Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 159 additions & 28 deletions acapy_agent/anoncreds/revocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import os
import time
import redis.asyncio as redis
from pathlib import Path
from typing import List, Mapping, NamedTuple, Optional, Sequence, Tuple, Union
from urllib.parse import urlparse
Expand Down Expand Up @@ -61,8 +62,133 @@
STATE_REVOCATION_PENDING = "pending"
REV_REG_DEF_STATE_ACTIVE = "active"

# Module level lock
_credential_creation_lock = asyncio.Lock()

class AsyncRedisLock:
"""Class to manage Redis locks for concurrent revocation operations."""

def __init__(self, lock_key: str):
"""Initialize the AsyncRedisLock instance.

Args:
lock_key (str): The key to use for the Redis lock.
"""
self.lock_key = lock_key
self.lock_value = None
self._redis = None
self.acquired_at = None # NEW: Track acquisition time

async def _get_redis(self):
if self._redis is None:
self._redis = redis.from_url(
"redis://valkey-primary:6379", decode_responses=True
)
return self._redis

async def __aenter__(self):
"""Acquire the Redis lock."""
redis_client = await self._get_redis()
self.lock_value = str(uuid4())
attempt_count = 0
start_time = time.time()

# NEW: Initial logging
LOGGER.info(
"Attempting to acquire lock '%s' with value '%s'",
self.lock_key,
self.lock_value,
)

while True:
attempt_count += 1
acquired = await redis_client.set(
self.lock_key, self.lock_value, nx=True, ex=30
)
if acquired:
self.acquired_at = time.time()
# NEW: Success logging with metrics
LOGGER.info(
"Lock '%s' acquired successfully by '%s' after %d attempts in %.2f seconds",
self.lock_key,
self.lock_value,
attempt_count,
self.acquired_at - start_time,
)
break

# NEW: Timeout protection (prevents infinite waiting)
elapsed = time.time() - start_time
if elapsed > 25: # 25 second timeout (5s buffer before Redis expires lock)
LOGGER.error(
"Failed to acquire lock '%s' after %.2f seconds and %d attempts - timeout exceeded",
self.lock_key,
elapsed,
attempt_count,
)
raise AnonCredsRevocationError(
f"Timeout waiting for lock '{self.lock_key}' after {elapsed:.2f} seconds"
)

# NEW: Periodic progress logging
if attempt_count % 50 == 0: # Log every 5 seconds
LOGGER.warning(
"Still waiting for lock '%s' after %d attempts (%.2f seconds)",
self.lock_key,
attempt_count,
elapsed,
)

await asyncio.sleep(0.5)

return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Release the Redis lock."""
if self.lock_value and self._redis:
lua_script = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"""
try:
result = await self._redis.eval(
lua_script, 1, self.lock_key, self.lock_value
)
held_duration = time.time() - self.acquired_at if self.acquired_at else 0

# NEW: Release logging with duration tracking
if result == 1:
LOGGER.info(
"Lock '%s' released successfully by '%s' after being held for %.2f seconds",
self.lock_key,
self.lock_value,
held_duration,
)
else:
# NEW: Warning for expired locks
LOGGER.warning(
"Lock '%s' was already expired or released. Expected value '%s', held for %.2f seconds",
self.lock_key,
self.lock_value,
held_duration,
)
except Exception as e:
# NEW: Error handling for release failures
LOGGER.error(
"Error releasing lock '%s' with value '%s': %s",
self.lock_key,
self.lock_value,
str(e),
)
finally:
try:
await self._redis.close()
LOGGER.info("Redis connection closed for lock '%s'", self.lock_key)
except Exception as e:
LOGGER.error("Error closing Redis connection: %s", str(e))


class AnonCredsRevocationError(BaseError):
"""Generic revocation error."""

Expand Down Expand Up @@ -975,47 +1101,50 @@ async def _create_credential(
A tuple of created credential and revocation ID

"""
async with _credential_creation_lock:
def _handle_missing_entries(rev_list: Entry, rev_reg_def: Entry, rev_key: Entry):
if not rev_list:
raise AnonCredsRevocationError("Revocation registry list not found")
if not rev_reg_def:
raise AnonCredsRevocationError("Revocation registry definition not found")
if not rev_key:
raise AnonCredsRevocationError(
"Revocation registry definition private data not found"
)

def _has_required_id_and_tails_path():
return rev_reg_def_id and tails_file_path


def _handle_missing_entries(rev_list: Entry, rev_reg_def: Entry, rev_key: Entry):
if not rev_list:
raise AnonCredsRevocationError("Revocation registry list not found")
if not rev_reg_def:
raise AnonCredsRevocationError("Revocation registry definition not found")
if not rev_key:
raise AnonCredsRevocationError(
"Revocation registry definition private data not found"
)

def _has_required_id_and_tails_path():
return rev_reg_def_id and tails_file_path

async with AsyncRedisLock("acapy_credential_creation_lock"):
revoc = None
credential_revocation_id = None
rev_list = None

if _has_required_id_and_tails_path():
async with self.profile.session() as session:
rev_reg_def = await session.handle.fetch(
CATEGORY_REV_REG_DEF, rev_reg_def_id
)
rev_list = await session.handle.fetch(CATEGORY_REV_LIST, rev_reg_def_id)
rev_list = await session.handle.fetch(
CATEGORY_REV_LIST, rev_reg_def_id
)
rev_key = await session.handle.fetch(
CATEGORY_REV_REG_DEF_PRIVATE, rev_reg_def_id
)

_handle_missing_entries(rev_list, rev_reg_def, rev_key)

rev_list_value_json = rev_list.value_json
rev_list_tags = rev_list.tags

# If the rev_list state is failed then the tails file was never uploaded,
# try to upload it now and finish the revocation list
if rev_list_tags.get("state") == RevListState.STATE_FAILED:
await self.upload_tails_file(
RevRegDef.deserialize(rev_reg_def.value_json)
)
rev_list_tags["state"] = RevListState.STATE_FINISHED

rev_reg_index = rev_list_value_json["next_index"]
try:
rev_reg_def = RevocationRegistryDefinition.load(rev_reg_def.raw_value)
Expand All @@ -1024,14 +1153,16 @@ def _has_required_id_and_tails_path():
raise AnonCredsRevocationError(
"Error loading revocation registry"
) from err

# NOTE: we increment the index ahead of time to keep the
# transaction short. The revocation registry itself will NOT
# be updated because we always use ISSUANCE_BY_DEFAULT.
# If something goes wrong later, the index will be skipped.
# FIXME - double check issuance type in case of upgraded wallet?
if rev_reg_index > rev_reg_def.max_cred_num:
raise AnonCredsRevocationRegistryFullError("Revocation registry is full")
raise AnonCredsRevocationRegistryFullError(
"Revocation registry is full"
)
rev_list_value_json["next_index"] = rev_reg_index + 1
async with self.profile.transaction() as txn:
await txn.handle.replace(
Expand All @@ -1041,19 +1172,19 @@ def _has_required_id_and_tails_path():
tags=rev_list_tags,
)
await txn.commit()

revoc = CredentialRevocationConfig(
rev_reg_def,
rev_key.raw_value,
rev_list,
rev_reg_index,
)
credential_revocation_id = str(rev_reg_index)

cred_def, cred_def_private = await self._get_cred_def_objects(
credential_definition_id
)

try:
credential = await asyncio.get_event_loop().run_in_executor(
None,
Expand All @@ -1070,7 +1201,7 @@ def _has_required_id_and_tails_path():
)
except AnoncredsError as err:
raise AnonCredsRevocationError("Error creating credential") from err

return credential.to_json(), credential_revocation_id

async def create_credential(
Expand Down
24 changes: 23 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ursa-bbs-signatures = { version = "~1.0.1", optional = true }
# didcommv2
didcomm-messaging = { version = "^0.1.1a0", optional = true }
canonicaljson = "^2.0.0"
redis = "^6.2.0"

[tool.poetry.group.dev.dependencies]
# Sync with version in .pre-commit-config.yaml and .github/workflows/format.yml
Expand Down