diff --git a/acapy_agent/anoncreds/revocation.py b/acapy_agent/anoncreds/revocation.py index e47a02649c..38a0e82637 100644 --- a/acapy_agent/anoncreds/revocation.py +++ b/acapy_agent/anoncreds/revocation.py @@ -7,6 +7,7 @@ import logging import os import time +import redis.asyncio as redis from pathlib import Path from typing import List, Mapping, NamedTuple, Optional, Sequence, Tuple, Union from urllib.parse import urlparse @@ -61,8 +62,133 @@ STATE_REVOCATION_PENDING = "pending" REV_REG_DEF_STATE_ACTIVE = "active" -# Module level lock -_credential_creation_lock = asyncio.Lock() + +class AsyncRedisLock: + """Class to manage Redis locks for concurrent revocation operations.""" + + def __init__(self, lock_key: str): + """Initialize the AsyncRedisLock instance. + + Args: + lock_key (str): The key to use for the Redis lock. + """ + self.lock_key = lock_key + self.lock_value = None + self._redis = None + self.acquired_at = None # NEW: Track acquisition time + + async def _get_redis(self): + if self._redis is None: + self._redis = redis.from_url( + "redis://valkey-primary:6379", decode_responses=True + ) + return self._redis + + async def __aenter__(self): + """Acquire the Redis lock.""" + redis_client = await self._get_redis() + self.lock_value = str(uuid4()) + attempt_count = 0 + start_time = time.time() + + # NEW: Initial logging + LOGGER.info( + "Attempting to acquire lock '%s' with value '%s'", + self.lock_key, + self.lock_value, + ) + + while True: + attempt_count += 1 + acquired = await redis_client.set( + self.lock_key, self.lock_value, nx=True, ex=30 + ) + if acquired: + self.acquired_at = time.time() + # NEW: Success logging with metrics + LOGGER.info( + "Lock '%s' acquired successfully by '%s' after %d attempts in %.2f seconds", + self.lock_key, + self.lock_value, + attempt_count, + self.acquired_at - start_time, + ) + break + + # NEW: Timeout protection (prevents infinite waiting) + elapsed = time.time() - start_time + if elapsed > 25: # 25 second timeout (5s buffer before Redis expires lock) + LOGGER.error( + "Failed to acquire lock '%s' after %.2f seconds and %d attempts - timeout exceeded", + self.lock_key, + elapsed, + attempt_count, + ) + raise AnonCredsRevocationError( + f"Timeout waiting for lock '{self.lock_key}' after {elapsed:.2f} seconds" + ) + + # NEW: Periodic progress logging + if attempt_count % 50 == 0: # Log every 5 seconds + LOGGER.warning( + "Still waiting for lock '%s' after %d attempts (%.2f seconds)", + self.lock_key, + attempt_count, + elapsed, + ) + + await asyncio.sleep(0.5) + + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Release the Redis lock.""" + if self.lock_value and self._redis: + lua_script = """ + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) + else + return 0 + end + """ + try: + result = await self._redis.eval( + lua_script, 1, self.lock_key, self.lock_value + ) + held_duration = time.time() - self.acquired_at if self.acquired_at else 0 + + # NEW: Release logging with duration tracking + if result == 1: + LOGGER.info( + "Lock '%s' released successfully by '%s' after being held for %.2f seconds", + self.lock_key, + self.lock_value, + held_duration, + ) + else: + # NEW: Warning for expired locks + LOGGER.warning( + "Lock '%s' was already expired or released. Expected value '%s', held for %.2f seconds", + self.lock_key, + self.lock_value, + held_duration, + ) + except Exception as e: + # NEW: Error handling for release failures + LOGGER.error( + "Error releasing lock '%s' with value '%s': %s", + self.lock_key, + self.lock_value, + str(e), + ) + finally: + try: + await self._redis.close() + LOGGER.info("Redis connection closed for lock '%s'", self.lock_key) + except Exception as e: + LOGGER.error("Error closing Redis connection: %s", str(e)) + + class AnonCredsRevocationError(BaseError): """Generic revocation error.""" @@ -975,39 +1101,42 @@ async def _create_credential( A tuple of created credential and revocation ID """ - async with _credential_creation_lock: - def _handle_missing_entries(rev_list: Entry, rev_reg_def: Entry, rev_key: Entry): - if not rev_list: - raise AnonCredsRevocationError("Revocation registry list not found") - if not rev_reg_def: - raise AnonCredsRevocationError("Revocation registry definition not found") - if not rev_key: - raise AnonCredsRevocationError( - "Revocation registry definition private data not found" - ) - - def _has_required_id_and_tails_path(): - return rev_reg_def_id and tails_file_path - + + def _handle_missing_entries(rev_list: Entry, rev_reg_def: Entry, rev_key: Entry): + if not rev_list: + raise AnonCredsRevocationError("Revocation registry list not found") + if not rev_reg_def: + raise AnonCredsRevocationError("Revocation registry definition not found") + if not rev_key: + raise AnonCredsRevocationError( + "Revocation registry definition private data not found" + ) + + def _has_required_id_and_tails_path(): + return rev_reg_def_id and tails_file_path + + async with AsyncRedisLock("acapy_credential_creation_lock"): revoc = None credential_revocation_id = None rev_list = None - + if _has_required_id_and_tails_path(): async with self.profile.session() as session: rev_reg_def = await session.handle.fetch( CATEGORY_REV_REG_DEF, rev_reg_def_id ) - rev_list = await session.handle.fetch(CATEGORY_REV_LIST, rev_reg_def_id) + rev_list = await session.handle.fetch( + CATEGORY_REV_LIST, rev_reg_def_id + ) rev_key = await session.handle.fetch( CATEGORY_REV_REG_DEF_PRIVATE, rev_reg_def_id ) - + _handle_missing_entries(rev_list, rev_reg_def, rev_key) - + rev_list_value_json = rev_list.value_json rev_list_tags = rev_list.tags - + # If the rev_list state is failed then the tails file was never uploaded, # try to upload it now and finish the revocation list if rev_list_tags.get("state") == RevListState.STATE_FAILED: @@ -1015,7 +1144,7 @@ def _has_required_id_and_tails_path(): RevRegDef.deserialize(rev_reg_def.value_json) ) rev_list_tags["state"] = RevListState.STATE_FINISHED - + rev_reg_index = rev_list_value_json["next_index"] try: rev_reg_def = RevocationRegistryDefinition.load(rev_reg_def.raw_value) @@ -1024,14 +1153,16 @@ def _has_required_id_and_tails_path(): raise AnonCredsRevocationError( "Error loading revocation registry" ) from err - + # NOTE: we increment the index ahead of time to keep the # transaction short. The revocation registry itself will NOT # be updated because we always use ISSUANCE_BY_DEFAULT. # If something goes wrong later, the index will be skipped. # FIXME - double check issuance type in case of upgraded wallet? if rev_reg_index > rev_reg_def.max_cred_num: - raise AnonCredsRevocationRegistryFullError("Revocation registry is full") + raise AnonCredsRevocationRegistryFullError( + "Revocation registry is full" + ) rev_list_value_json["next_index"] = rev_reg_index + 1 async with self.profile.transaction() as txn: await txn.handle.replace( @@ -1041,7 +1172,7 @@ def _has_required_id_and_tails_path(): tags=rev_list_tags, ) await txn.commit() - + revoc = CredentialRevocationConfig( rev_reg_def, rev_key.raw_value, @@ -1049,11 +1180,11 @@ def _has_required_id_and_tails_path(): rev_reg_index, ) credential_revocation_id = str(rev_reg_index) - + cred_def, cred_def_private = await self._get_cred_def_objects( credential_definition_id ) - + try: credential = await asyncio.get_event_loop().run_in_executor( None, @@ -1070,7 +1201,7 @@ def _has_required_id_and_tails_path(): ) except AnoncredsError as err: raise AnonCredsRevocationError("Error creating credential") from err - + return credential.to_json(), credential_revocation_id async def create_credential( diff --git a/poetry.lock b/poetry.lock index f7bc021218..429457fd72 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1324,6 +1324,8 @@ python-versions = "*" groups = ["main"] files = [ {file = "jsonpath-ng-1.7.0.tar.gz", hash = "sha256:f6f5f7fd4e5ff79c785f1573b394043b39849fb2bb47bcead935d12b00beab3c"}, + {file = "jsonpath_ng-1.7.0-py2-none-any.whl", hash = "sha256:898c93fc173f0c336784a3fa63d7434297544b7198124a68f9a3ef9597b0ae6e"}, + {file = "jsonpath_ng-1.7.0-py3-none-any.whl", hash = "sha256:f3d7f9e848cba1b6da28c55b1c26ff915dc9e0b1ba7e752a53d6da8d5cbd00b6"}, ] [package.dependencies] @@ -1431,8 +1433,11 @@ files = [ {file = "lxml-5.4.0-cp36-cp36m-win_amd64.whl", hash = "sha256:7ce1a171ec325192c6a636b64c94418e71a1964f56d002cc28122fceff0b6121"}, {file = "lxml-5.4.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:795f61bcaf8770e1b37eec24edf9771b307df3af74d1d6f27d812e15a9ff3872"}, {file = "lxml-5.4.0-cp37-cp37m-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:29f451a4b614a7b5b6c2e043d7b64a15bd8304d7e767055e8ab68387a8cacf4e"}, + {file = "lxml-5.4.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:891f7f991a68d20c75cb13c5c9142b2a3f9eb161f1f12a9489c82172d1f133c0"}, {file = "lxml-5.4.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4aa412a82e460571fad592d0f93ce9935a20090029ba08eca05c614f99b0cc92"}, + {file = "lxml-5.4.0-cp37-cp37m-manylinux_2_28_aarch64.whl", hash = "sha256:ac7ba71f9561cd7d7b55e1ea5511543c0282e2b6450f122672a2694621d63b7e"}, {file = "lxml-5.4.0-cp37-cp37m-manylinux_2_28_x86_64.whl", hash = "sha256:c5d32f5284012deaccd37da1e2cd42f081feaa76981f0eaa474351b68df813c5"}, + {file = "lxml-5.4.0-cp37-cp37m-musllinux_1_2_aarch64.whl", hash = "sha256:ce31158630a6ac85bddd6b830cffd46085ff90498b397bd0a259f59d27a12188"}, {file = "lxml-5.4.0-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:31e63621e073e04697c1b2d23fcb89991790eef370ec37ce4d5d469f40924ed6"}, {file = "lxml-5.4.0-cp37-cp37m-win32.whl", hash = "sha256:be2ba4c3c5b7900246a8f866580700ef0d538f2ca32535e991027bdaba944063"}, {file = "lxml-5.4.0-cp37-cp37m-win_amd64.whl", hash = "sha256:09846782b1ef650b321484ad429217f5154da4d6e786636c38e434fa32e94e49"}, @@ -2621,6 +2626,23 @@ all = ["pillow (>=9.1.0)", "pypng"] pil = ["pillow (>=9.1.0)"] png = ["pypng"] +[[package]] +name = "redis" +version = "6.2.0" +description = "Python client for Redis database and key-value store" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "redis-6.2.0-py3-none-any.whl", hash = "sha256:c8ddf316ee0aab65f04a11229e94a64b2618451dab7a67cb2f77eb799d872d5e"}, + {file = "redis-6.2.0.tar.gz", hash = "sha256:e821f129b75dde6cb99dd35e5c76e8c49512a5a0d8dfdc560b2fbd44b85ca977"}, +] + +[package.extras] +hiredis = ["hiredis (>=3.2.0)"] +jwt = ["pyjwt (>=2.9.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)"] + [[package]] name = "requests" version = "2.32.3" @@ -3037,4 +3059,4 @@ didcommv2 = ["didcomm-messaging"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "e84b0518100bb7fe87bcb9140d011a095c1a3a5ae89c71d5ead2f5887c65c565" +content-hash = "bb0ca5cc0d7bf7fec64c02605089bb4e10fcc36afaf4049007b287f64c334be8" diff --git a/pyproject.toml b/pyproject.toml index 76748cb1ee..c84e9ad892 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,7 @@ ursa-bbs-signatures = { version = "~1.0.1", optional = true } # didcommv2 didcomm-messaging = { version = "^0.1.1a0", optional = true } canonicaljson = "^2.0.0" +redis = "^6.2.0" [tool.poetry.group.dev.dependencies] # Sync with version in .pre-commit-config.yaml and .github/workflows/format.yml