diff --git a/video2commons/backend/download/__init__.py b/video2commons/backend/download/__init__.py index a1405578..7007eda3 100644 --- a/video2commons/backend/download/__init__.py +++ b/video2commons/backend/download/__init__.py @@ -21,15 +21,22 @@ from urllib.parse import urlparse from celery.utils.log import get_logger -import yt_dlp from yt_dlp.utils import std_headers, DownloadError from video2commons.exceptions import TaskError +from video2commons.shared.ratelimiting import YoutubeDLRateLimited from video2commons.shared.yt_dlp import add_youtube_params def download( - url, ie_key, formats, subtitles, outputdir, statuscallback=None, errorcallback=None + conn, + url, + ie_key, + formats, + subtitles, + outputdir, + statuscallback=None, + errorcallback=None, ): """Download a video from url to outputdir.""" @@ -101,7 +108,7 @@ def progresshook(d): try: # Not using provided ie_key because of the existance of extractors that # targets another extractor, such as TwitterIE. - with yt_dlp.YoutubeDL(params) as dl: + with YoutubeDLRateLimited(conn, "backend", url, params) as dl: dl.add_progress_hook(progresshook) statuscallback("Preprocessing...", -1) info = dl.extract_info(url, download=True, ie_key=None) @@ -110,7 +117,7 @@ def progresshook(d): statuscallback( "Download failed. 
creating YoutubeDL instance without local cache", -1 ) - with yt_dlp.YoutubeDL(params) as dl: + with YoutubeDLRateLimited(conn, "backend", url, params) as dl: dl.add_progress_hook(progresshook) info = dl.extract_info(url, download=True, ie_key=None) diff --git a/video2commons/backend/worker.py b/video2commons/backend/worker.py index d15de971..acfab1c4 100644 --- a/video2commons/backend/worker.py +++ b/video2commons/backend/worker.py @@ -17,6 +17,7 @@ """video2commons backend worker.""" +import logging import os import sys import shutil @@ -43,6 +44,8 @@ ) from video2commons.shared.stats import update_task_stats +logging.basicConfig(level=logging.INFO) + redisurl = "redis://:" + redis_pw + "@" + redis_host + ":6379/" app = celery.Celery("v2cbackend", backend=redisurl + "1", broker=redisurl + "2") app.conf.result_expires = 30 * 24 * 3600 # 1 month @@ -136,6 +139,7 @@ def errorcallback(text): try: statuscallback("Downloading...", -1) d = download.download( + redisconnection, url, ie_key, downloadkey, diff --git a/video2commons/frontend/app.py b/video2commons/frontend/app.py index 92a386b0..6e7d398e 100644 --- a/video2commons/frontend/app.py +++ b/video2commons/frontend/app.py @@ -57,6 +57,7 @@ app = Flask(__name__) +logging.basicConfig(level=logging.INFO) app.logger.setLevel(logging.INFO) app.session_cookie_name = "v2c-session" diff --git a/video2commons/frontend/urlextract.py b/video2commons/frontend/urlextract.py index ba05f8c9..01952df3 100644 --- a/video2commons/frontend/urlextract.py +++ b/video2commons/frontend/urlextract.py @@ -31,6 +31,7 @@ import pywikibot import yt_dlp +from video2commons.shared.ratelimiting import YoutubeDLRateLimited from video2commons.shared.yt_dlp import add_youtube_params from video2commons.frontend.wcqs import WcqsSession from video2commons.frontend.shared import redisconnection @@ -76,6 +77,15 @@ DEFAULT_QUEUE = "celery" HEAVY_QUEUE = "heavy" +# The frontend has a shorter timeout for ratelimiting because it only fetches +# metadata. 
Metadata should always be returnable within a minute, so the lock +# shouldn't be held for long, which keeps URL extractions from hanging. +RATELIMIT_LOCK_TIMEOUT = 60 + +# The frontend only extracts metadata (download=False), so extraction request +# sleeps can be shorter than the default used by the workers. +RATELIMIT_SLEEP_INTERVAL_REQUESTS = 1 + REDIS_PREFIX_BLACKLIST_KEY = "commons:filename-prefix-blacklist" REDIS_PREFIX_BLACKLIST_TTL = 24 * 3600 # 1 day @@ -179,6 +189,7 @@ def do_extract_url(url): "subtitlesformat": "srt/ass/vtt/best", "cachedir": "/tmp/", "noplaylist": False, + "sleep_interval_requests": RATELIMIT_SLEEP_INTERVAL_REQUESTS, } if ".youtube.com/" in url: @@ -192,7 +203,14 @@ if "/playlist" in url or re.search(r"[?&]list=", url): params["ignoreerrors"] = True - with yt_dlp.YoutubeDL(params) as dl: + with YoutubeDLRateLimited( + conn=redisconnection, + source="frontend", + url=url, + params=params, + timeout=RATELIMIT_LOCK_TIMEOUT, + blocking_timeout=RATELIMIT_LOCK_TIMEOUT, + ) as dl: info = dl.extract_info(url, download=False) # Extract playlist entries if this is a playlist. diff --git a/video2commons/shared/ratelimiting.py b/video2commons/shared/ratelimiting.py new file mode 100644 index 00000000..5b31ef12 --- /dev/null +++ b/video2commons/shared/ratelimiting.py @@ -0,0 +1,170 @@ +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.
If not, see + +"""Helpers for distributed rate limiting of yt-dlp requests.""" + +import logging + +from contextlib import contextmanager +from urllib.parse import urlparse + +import yt_dlp + +from redis import Redis +from redis.lock import Lock +from redis.exceptions import LockError, LockNotOwnedError + +logger = logging.getLogger(__name__) + +# Redis key prefix for rate limit locks. +LOCK_KEY_PREFIX = "yt_dlp_lock" + +# Default lock timeout in seconds. 20 minutes was chosen to accommodate the +# cumulative sleep delays from rate limiting (e.g. 10s per media segment, 3s +# per extraction request), while still expiring in a reasonable time if a +# worker dies without releasing the lock. +DEFAULT_LOCK_TIMEOUT = 20 * 60 + +# Domains where rate limiting logic should be applied, grouped by site. +RATE_LIMITED_DOMAINS = { + "youtube": { + "youtube.com", + "youtube-nocookie.com", + "youtubekids.com", + "youtu.be", + "youtube.googleapis.com", + }, +} + +# Rate limiting parameters for each type of request being made to YouTube. +# These are tweaked to work best with anonymous calls to YouTube that are made +# without session cookies. +SLEEP_PARAMS = { + "sleep_interval": 10, # Media file downloads (video/audio). + "sleep_interval_requests": 3, # Extraction requests (metadata, pages). + "sleep_interval_subtitles": 6, # Subtitle downloads. +} + + +@contextmanager +def YoutubeDLRateLimited( + conn: Redis, + source: str, + url: str, + params: dict | None = None, + auto_init=True, + timeout=DEFAULT_LOCK_TIMEOUT, + blocking_timeout=DEFAULT_LOCK_TIMEOUT, +): + """Wrapper around yt_dlp.YoutubeDL that applies distributed rate limiting. + + This context manager wraps yt-dlp's YoutubeDL class and automatically + prevents multiple workers from downloading videos and metadata + simultaneously based on the URL. This effectively reduces the risk of us + being blocked by heavily rate-limited sites like YouTube. 
+ + In addition to the existing YoutubeDL parameters, this context manager also + requires that a Redis connection, source, and URL be provided. The source + is the name of the app requesting a lock and is incorporated into the + lock's key name. This allows a different lock to be used for the frontend + than for the backend workers. + + The URL is needed to allow different groups of sites to be ratelimited + separately (or not at all). Currently, only YouTube is ratelimited. + + timeout controls how long the lock exists in Redis before auto-expiring. + blocking_timeout controls how long to wait for the lock before giving up. + """ + # Apply ratelimit parameters if the URL is from a ratelimited domain. + group = _get_ratelimit_group(url) + params = (SLEEP_PARAMS if group else {}) | (params or {}) + + # Acquire the distributed lock before starting the download to prevent + # multiple workers from downloading videos and metadata at the same time, + # which can potentially cause sites like YouTube to block us. + lock = None + if group: + lock = _acquire_lock(conn, source, group, timeout, blocking_timeout) + + try: + with yt_dlp.YoutubeDL(params, auto_init=auto_init) as dl: + yield dl + finally: + # Always release the lock regardless of success or failure. + if lock: + _release_lock(lock) + + +def _get_ratelimit_group(url: str) -> str | None: + """Return the ratelimit group name for a URL if it's ratelimited.""" + if (hostname := urlparse(url).hostname) is None: + return None + + hostname = hostname.lower() + + for group, domains in RATE_LIMITED_DOMAINS.items(): + if any(hostname == d or hostname.endswith("."
+ d) for d in domains): + logger.debug("URL '%s' matched ratelimit group '%s'", url, group) + return group + + return None + + +def _key(source: str, group: str): + """Generate a lock key for the given source and domain group.""" + return f"{LOCK_KEY_PREFIX}:{source}:{group}" + + +def _acquire_lock( + conn: Redis, + source: str, + group: str, + timeout=DEFAULT_LOCK_TIMEOUT, + blocking_timeout=DEFAULT_LOCK_TIMEOUT, +) -> Lock: + """Acquire a distributed Redis lock.""" + key = _key(source, group) + + lock = conn.lock( + name=key, + timeout=timeout, + blocking_timeout=blocking_timeout, + ) + try: + acquired = lock.acquire() + except LockError: + acquired = False + + if not acquired: + message = "Failed to acquire lock '%s'" + logger.error(message, key) + raise RuntimeError(message % key) + + logger.info("Acquired lock '%s'", key) + + return lock + + +def _release_lock(lock: Lock): + """Release a distributed Redis lock.""" + try: + lock.release() + logger.info("Released lock '%s'", lock.name) + except LockNotOwnedError: + logger.warning( + "Lock '%s' expired before release (owned by another worker)", + lock.name, + ) + except LockError: + message = "Failed to release lock '%s'" + logger.error(message, lock.name)