Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions video2commons/backend/download/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,22 @@
from urllib.parse import urlparse

from celery.utils.log import get_logger
import yt_dlp
from yt_dlp.utils import std_headers, DownloadError

from video2commons.exceptions import TaskError
from video2commons.shared.ratelimiting import YoutubeDLRateLimited
from video2commons.shared.yt_dlp import add_youtube_params


def download(
url, ie_key, formats, subtitles, outputdir, statuscallback=None, errorcallback=None
conn,
url,
ie_key,
formats,
subtitles,
outputdir,
statuscallback=None,
errorcallback=None,
):
"""Download a video from url to outputdir."""

Expand Down Expand Up @@ -101,7 +108,7 @@ def progresshook(d):
try:
# Not using provided ie_key because of the existence of extractors that
# target another extractor, such as TwitterIE.
with yt_dlp.YoutubeDL(params) as dl:
with YoutubeDLRateLimited(conn, "backend", url, params) as dl:
dl.add_progress_hook(progresshook)
statuscallback("Preprocessing...", -1)
info = dl.extract_info(url, download=True, ie_key=None)
Expand All @@ -110,7 +117,7 @@ def progresshook(d):
statuscallback(
"Download failed. creating YoutubeDL instance without local cache", -1
)
with yt_dlp.YoutubeDL(params) as dl:
with YoutubeDLRateLimited(conn, "backend", url, params) as dl:
dl.add_progress_hook(progresshook)
info = dl.extract_info(url, download=True, ie_key=None)

Expand Down
4 changes: 4 additions & 0 deletions video2commons/backend/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

"""video2commons backend worker."""

import logging
import os
import sys
import shutil
Expand All @@ -43,6 +44,8 @@
)
from video2commons.shared.stats import update_task_stats

logging.basicConfig(level=logging.INFO)

redisurl = "redis://:" + redis_pw + "@" + redis_host + ":6379/"
app = celery.Celery("v2cbackend", backend=redisurl + "1", broker=redisurl + "2")
app.conf.result_expires = 30 * 24 * 3600 # 1 month
Expand Down Expand Up @@ -136,6 +139,7 @@ def errorcallback(text):
try:
statuscallback("Downloading...", -1)
d = download.download(
redisconnection,
url,
ie_key,
downloadkey,
Expand Down
1 change: 1 addition & 0 deletions video2commons/frontend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@

app = Flask(__name__)

logging.basicConfig(level=logging.INFO)
app.logger.setLevel(logging.INFO)

app.session_cookie_name = "v2c-session"
Expand Down
20 changes: 19 additions & 1 deletion video2commons/frontend/urlextract.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import pywikibot
import yt_dlp

from video2commons.shared.ratelimiting import YoutubeDLRateLimited
from video2commons.shared.yt_dlp import add_youtube_params
from video2commons.frontend.wcqs import WcqsSession
from video2commons.frontend.shared import redisconnection
Expand Down Expand Up @@ -76,6 +77,15 @@
DEFAULT_QUEUE = "celery"
HEAVY_QUEUE = "heavy"

# The frontend has a shorter timeout for ratelimiting because it only fetches
# metadata. Metadata should always be returnable within a minute, so the lock
# is held only briefly and URL extractions don't hang for long.
RATELIMIT_LOCK_TIMEOUT = 60

# The frontend only extracts metadata (download=False), so extraction request
# sleeps can be shorter than the default used by the workers.
RATELIMIT_SLEEP_INTERVAL_REQUESTS = 1

REDIS_PREFIX_BLACKLIST_KEY = "commons:filename-prefix-blacklist"
REDIS_PREFIX_BLACKLIST_TTL = 24 * 3600 # 1 day

Expand Down Expand Up @@ -179,6 +189,7 @@ def do_extract_url(url):
"subtitlesformat": "srt/ass/vtt/best",
"cachedir": "/tmp/",
"noplaylist": False,
"sleep_interval_requests": RATELIMIT_SLEEP_INTERVAL_REQUESTS,
}

if ".youtube.com/" in url:
Expand All @@ -192,7 +203,14 @@ def do_extract_url(url):
if "/playlist" in url or re.search(r"[?&]list=", url):
params["ignoreerrors"] = True

with yt_dlp.YoutubeDL(params) as dl:
with YoutubeDLRateLimited(
conn=redisconnection,
source="frontend",
url=url,
params=params,
timeout=RATELIMIT_LOCK_TIMEOUT,
blocking_timeout=RATELIMIT_LOCK_TIMEOUT,
) as dl:
info = dl.extract_info(url, download=False)

# Extract playlist entries if this is a playlist.
Expand Down
170 changes: 170 additions & 0 deletions video2commons/shared/ratelimiting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>

"""Helpers for distributed rate limiting of yt-dlp requests."""

import logging

from contextlib import contextmanager
from urllib.parse import urlparse

import yt_dlp

from redis import Redis
from redis.lock import Lock
from redis.exceptions import LockError, LockNotOwnedError

logger = logging.getLogger(__name__)

# Redis key prefix for rate limit locks.
LOCK_KEY_PREFIX = "yt_dlp_lock"

# Default lock timeout in seconds. 20 minutes was chosen to accommodate the
# cumulative sleep delays from rate limiting (e.g. 10s per media segment, 3s
# per extraction request), while still expiring in a reasonable time if a
# worker dies without releasing the lock.
DEFAULT_LOCK_TIMEOUT = 20 * 60

# Domains where rate limiting logic should be applied, grouped by site.
# A URL matches a group when its hostname equals one of these domains or is
# a subdomain of one (see _get_ratelimit_group).
RATE_LIMITED_DOMAINS = {
    "youtube": {
        "youtube.com",
        "youtube-nocookie.com",
        "youtubekids.com",
        "youtu.be",
        "youtube.googleapis.com",
    },
}
Comment on lines +39 to +47
Copy link
Copy Markdown
Collaborator Author

@Amdrel Amdrel Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find a way to use the extractor since that appears to be determined after the params are created and sent to yt-dlp, so this is a bit of a hack. I'm open to replacing this if there's a better way.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks fine to me


# Rate limiting parameters for each type of request being made to YouTube.
# These are tweaked to work best with anonymous calls to YouTube that are made
# without session cookies. Callers may override any of these via the params
# argument of YoutubeDLRateLimited (caller-supplied values take precedence).
SLEEP_PARAMS = {
    "sleep_interval": 10,  # Media file downloads (video/audio).
    "sleep_interval_requests": 3,  # Extraction requests (metadata, pages).
    "sleep_interval_subtitles": 6,  # Subtitle downloads.
}


@contextmanager
def YoutubeDLRateLimited(
    conn: Redis,
    source: str,
    url: str,
    params: dict | None = None,
    auto_init=True,
    timeout=DEFAULT_LOCK_TIMEOUT,
    blocking_timeout=DEFAULT_LOCK_TIMEOUT,
):
    """Context manager yielding a yt_dlp.YoutubeDL guarded by a Redis lock.

    When the URL belongs to a rate-limited site group (currently only
    YouTube), a distributed lock is acquired before the YoutubeDL instance is
    created, so only one worker at a time talks to that site. This reduces
    the risk of being blocked by heavily rate-limited sites.

    conn is the Redis connection used for locking. source names the
    requesting app (e.g. "frontend" or "backend") and is part of the lock
    key, so each app gets its own lock. url determines which site group, if
    any, is rate limited.

    params/auto_init are forwarded to yt_dlp.YoutubeDL; for rate-limited
    groups the SLEEP_PARAMS defaults are applied first, with caller-supplied
    params taking precedence.

    timeout controls how long the lock exists in Redis before auto-expiring.
    blocking_timeout controls how long to wait for the lock before giving up.
    """
    group = _get_ratelimit_group(url)

    # Start from the ratelimit sleep defaults (if applicable) and layer the
    # caller's params on top so callers can override individual values.
    merged_params = dict(SLEEP_PARAMS) if group else {}
    merged_params.update(params or {})

    # Hold the distributed lock for the whole download so concurrent workers
    # never hit a rate-limited site at the same time.
    lock = None
    if group:
        lock = _acquire_lock(conn, source, group, timeout, blocking_timeout)

    try:
        with yt_dlp.YoutubeDL(merged_params, auto_init=auto_init) as dl:
            yield dl
    finally:
        # Release on every exit path, success or failure.
        if lock is not None:
            _release_lock(lock)


def _get_ratelimit_group(url: str) -> str | None:
    """Return the name of the ratelimit group matching url, or None.

    A URL matches a group when its hostname equals one of the group's
    domains or is a subdomain of one. Comparison is case-insensitive.
    """
    hostname = urlparse(url).hostname
    if hostname is None:
        return None

    hostname = hostname.lower()

    for group, domains in RATE_LIMITED_DOMAINS.items():
        for domain in domains:
            if hostname == domain or hostname.endswith("." + domain):
                logger.debug("URL '%s' matched ratelimit group '%s'", url, group)
                return group

    return None


def _key(source: str, group: str):
    """Build the Redis lock key name for a source app and domain group."""
    return ":".join((LOCK_KEY_PREFIX, source, group))


def _acquire_lock(
    conn: Redis,
    source: str,
    group: str,
    timeout=DEFAULT_LOCK_TIMEOUT,
    blocking_timeout=DEFAULT_LOCK_TIMEOUT,
) -> Lock:
    """Acquire a distributed Redis lock for the given source and group.

    timeout is how long the lock lives in Redis before auto-expiring;
    blocking_timeout is how long to block waiting for the lock.

    Returns the held Lock on success. Raises RuntimeError if the lock could
    not be acquired (either the wait timed out or Redis raised a LockError);
    in the latter case the underlying LockError is chained as the cause so
    the root failure is visible in tracebacks.
    """
    key = _key(source, group)

    lock = conn.lock(
        name=key,
        timeout=timeout,
        blocking_timeout=blocking_timeout,
    )
    try:
        acquired = lock.acquire()
    except LockError as err:
        # Chain the redis error instead of discarding it, so operators can
        # see why acquisition failed (previously the cause was swallowed).
        message = "Failed to acquire lock '%s'"
        logger.error(message, key)
        raise RuntimeError(message % key) from err

    if not acquired:
        # Blocking wait timed out without an exception.
        message = "Failed to acquire lock '%s'"
        logger.error(message, key)
        raise RuntimeError(message % key)

    logger.info("Acquired lock '%s'", key)

    return lock


def _release_lock(lock: Lock):
    """Release a distributed Redis lock, logging instead of raising.

    A lock that already expired (LockNotOwnedError) is logged as a warning;
    any other release failure (LockError) is logged as an error. Neither is
    propagated to the caller.
    """
    try:
        lock.release()
    except LockNotOwnedError:
        # Must be caught before LockError, its parent class.
        logger.warning(
            "Lock '%s' expired before release (owned by another worker)",
            lock.name,
        )
    except LockError:
        message = "Failed to release lock '%s'"
        logger.error(message, lock.name)
    else:
        logger.info("Released lock '%s'", lock.name)
Loading