Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 104 additions & 82 deletions alluxiofs/client/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,11 @@
from fsspec.caching import Fetcher
from fsspec.caching import ReadAheadCache

from .const import ALLUXIO_LOCAL_CACHE_EVICTION_HIGH_WATERMARK
from .const import ALLUXIO_LOCAL_CACHE_EVICTION_LOW_WATERMARK
from .const import ALLUXIO_LOCAL_CACHE_EVICTION_SCAN_INTERVAL_MINUTES
from .const import ALLUXIO_LOCAL_CACHE_TTL_TIME_MINUTES
from .const import ALLUXIO_REQUEST_MAX_RETRIES
from .const import ALLUXIO_REQUEST_MAX_TIMEOUT_SECONDS
from .const import DEFAULT_LOCAL_CACHE_BLOCK_SIZE_MB
from .const import DEFAULT_LOCAL_CACHE_SIZE_GB
from .const import LOCAL_CACHE_DIR_DEFAULT
from .const import MAGIC_SIZE
from .utils import _c_send_get_request_write_file
from .utils import get_prefetch_policy
from .utils import setup_logger
from .utils import TagAdapter


# =========================================================
Expand All @@ -42,34 +35,39 @@ class BlockStatus(Enum):
class LocalCacheManager:
def __init__(
self,
cache_dir=LOCAL_CACHE_DIR_DEFAULT,
max_cache_size=DEFAULT_LOCAL_CACHE_SIZE_GB,
block_size=DEFAULT_LOCAL_CACHE_BLOCK_SIZE_MB,
http_max_retries=ALLUXIO_REQUEST_MAX_RETRIES,
http_timeouts=ALLUXIO_REQUEST_MAX_TIMEOUT_SECONDS,
eviction_high_watermark=ALLUXIO_LOCAL_CACHE_EVICTION_HIGH_WATERMARK,
eviction_low_watermark=ALLUXIO_LOCAL_CACHE_EVICTION_LOW_WATERMARK,
eviction_scan_interval=int(
ALLUXIO_LOCAL_CACHE_EVICTION_SCAN_INTERVAL_MINUTES * 60
),
ttl_time_seconds=int(ALLUXIO_LOCAL_CACHE_TTL_TIME_MINUTES * 60),
logger=None,
config,
):
self.config = config
base_logger = setup_logger(
self.config.log_dir,
self.config.log_level,
self.__class__.__name__,
self.config.log_tag_allowlist,
)
self.logger = TagAdapter(base_logger, {"tag": "[LOCAL_CACHE]"})

self.block_size = (
int(self.config.local_cache_block_size_mb) * 1024 * 1024
)
self.http_max_retries = self.config.http_max_retries
self.http_timeouts = self.config.http_timeouts
self.eviction_high_watermark = (
self.config.local_cache_eviction_high_watermark
)
self.eviction_low_watermark = (
self.config.local_cache_eviction_low_watermark
)
self.eviction_scan_interval = int(
self.config.local_cache_eviction_scan_interval_minutes * 60
)
self.ttl_time_seconds = int(
self.config.local_cache_ttl_time_minutes * 60
)
self.cache_dirs, self.max_cache_sizes = self._param_local_cache_dirs(
cache_dir, max_cache_size
self.config.local_cache_dir, self.config.local_cache_size_gb
)
self.http_max_retries = http_max_retries
self.http_timeouts = http_timeouts
self.block_size = int(block_size * 1024 * 1024)
self.eviction_high_watermark = eviction_high_watermark
self.eviction_low_watermark = eviction_low_watermark
self.logger = logger
self.current_cache_sizes = [Value("l", 0) for _ in self.cache_dirs]
self.ttl_time_seconds = ttl_time_seconds

self._load_existing_cache()

self.eviction_scan_interval = eviction_scan_interval
self._stop_eviction_event = threading.Event()
self._eviction_thread = threading.Thread(
target=self._run_eviction_monitor,
Expand All @@ -79,7 +77,7 @@ def __init__(
self._eviction_thread.start()
if self.logger:
self.logger.debug(
f"[CACHE] Eviction monitor thread started with interval {self.eviction_scan_interval}s"
f"Eviction monitor thread started with interval {self.eviction_scan_interval}s"
)
# ----------------------------

Expand Down Expand Up @@ -110,14 +108,12 @@ def _run_eviction_monitor(self):
self._scan_and_clean_evicted()
except Exception as e:
if self.logger:
self.logger.error(
f"[CACHE] Error in eviction monitor: {e}"
)
self.logger.error(f"Error in eviction monitor: {e}")

def _scan_and_clean_evicted(self):
"""Scan cache directories for files marked as evicted and delete them."""
if self.logger:
self.logger.debug("[CACHE] Starting background eviction scan...")
self.logger.debug("Starting background eviction scan...")

cleaned_count = 0

Expand All @@ -139,13 +135,11 @@ def _scan_and_clean_evicted(self):
cleaned_count += 1
except Exception as e:
if self.logger:
self.logger.warning(
f"[CACHE] Failed to scan dir {data_dir}: {e}"
)
self.logger.warning(f"Failed to scan dir {data_dir}: {e}")

if self.logger and cleaned_count > 0:
self.logger.debug(
f"[CACHE] Background scan finished. Cleaned {cleaned_count} files."
f"Background scan finished. Cleaned {cleaned_count} files."
)

def shutdown(self):
Expand All @@ -154,7 +148,7 @@ def shutdown(self):
if self._eviction_thread.is_alive():
self._eviction_thread.join(timeout=2)
if self.logger:
self.logger.debug("[CACHE] LocalCacheManager shut down.")
self.logger.debug("LocalCacheManager shut down.")

def _param_local_cache_dirs(self, cache_dir, max_cache_size):
"""Parse cache directories and their respective sizes."""
Expand Down Expand Up @@ -243,36 +237,46 @@ def _atomic_write(
"""
import traceback

tmp_pool_dir = "/tmp/local_cache_tmp_pool"
hash_index = 0
try:
hash_index = self._get_local_cache_index_dir_for_file(file_path)
tmp_pool_dir = self._get_cache_tmp_pool_dir(hash_index)

self._evict_if_needed(hash_index, end - start)
temp_file = None
except Exception as e:
print(
self.logger.info(
"Exception in _atomic_write pre-check:",
e,
traceback.print_exc(),
Comment on lines 250 to 252

Copilot AI Dec 16, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logger.info() call is incorrectly using multiple arguments. Logger methods should use a single formatted string, not multiple positional arguments. The call traceback.print_exc() returns None and prints to stderr, so it won't be captured in the log message. This should be formatted as a single string using traceback.format_exc() instead.

Suggested change
"Exception in _atomic_write pre-check:",
e,
traceback.print_exc(),
f"Exception in _atomic_write pre-check: {e}\n{traceback.format_exc()}"

Copilot uses AI. Check for mistakes.
)
try:
# Step 2: Write to temporary file
temp_file = tempfile.NamedTemporaryFile(
dir=tmp_pool_dir, delete=False
)
temp_file.close()
with open(temp_file.name, "wb") as f:
with tempfile.NamedTemporaryFile(
dir=tmp_pool_dir, delete=False, mode="wb"
Comment on lines 252 to +257

Copilot AI Dec 16, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the first try block (lines 240-244) raises an exception, tmp_pool_dir may not be initialized, causing a NameError when used at line 255. The exception is logged but execution continues, which will lead to a crash. Either re-raise the exception or ensure tmp_pool_dir has a default value.

Copilot uses AI. Check for mistakes.
) as temp_file:
temp_file_name = temp_file.name

self._fetch_range_via_shell(
f, worker_host, worker_http_port, alluxio_path, start, end
temp_file,
worker_host,
worker_http_port,
alluxio_path,
start,
end,
)
temp_file.flush()
os.fsync(temp_file.fileno())

# Step 3: Atomic rename
os.rename(temp_file.name, file_path_hashed)
os.rename(temp_file_name, file_path_hashed)
self._set_file_cached(file_path_hashed)
with self.current_cache_sizes[hash_index].get_lock():
self.current_cache_sizes[hash_index].value += end - start
if self.logger:
self.logger.debug(
f"[CACHE] Atomic write completed: {file_path_hashed}"
f"Atomic write completed: {file_path_hashed}"
)
return True

Expand All @@ -282,9 +286,7 @@ def _atomic_write(
os.remove(temp_file.name)
self._set_file_absent(file_path_hashed)
if self.logger:
self.logger.debug(
f"[CACHE] Write failed for {file_path_hashed}: {e}"
)
self.logger.debug(f"Write failed for {file_path_hashed}: {e}")
return False

def get_file_status(self, file_path, part_index):
Expand Down Expand Up @@ -335,27 +337,43 @@ def _truly_evict_file(self, file_path_evicted, hash_index=None):
self.current_cache_sizes[hash_index].value -= size
if self.logger:
self.logger.debug(
f"[LRU] Evicted old cache: {file_path_evicted} ({size} bytes)"
f"Evicted old cache: {file_path_evicted} ({size} bytes)"
)
except FileNotFoundError:
pass
except Exception as e:
if self.logger:
self.logger.debug(
f"[LRU] Failed to evict cache {file_path_evicted}: {e}"
f"Failed to evict cache {file_path_evicted}: {e}"
)

def set_file_loading(self, file_path, part_index):
file_path_hashed = self._get_local_path(file_path, part_index)
self._set_file_loading(file_path_hashed)
return self._set_file_loading(file_path_hashed)

def _set_file_loading(self, file_path_hashed):
"""Set the file status to LOADING."""
"""
Atomically try to set file status to LOADING.
Returns True if successful, False if already loading/cached.
"""
# Check if already cached first
if os.path.exists(file_path_hashed):
return False
try:
with open(file_path_hashed + "_loading", "x"):
pass
# Atomic exclusive create - only one thread can succeed
fd = os.open(
file_path_hashed + "_loading",
os.O_CREAT | os.O_EXCL | os.O_WRONLY,
)
os.close(fd)
return True
except FileExistsError:
pass
# Another thread is already loading this block
return False
except OSError as e:
if self.logger:
self.logger.debug(f"Failed to set loading status: {e}")
return False

def _set_file_cached(self, file_path_hashed):
"""Set the file status to CACHED."""
Expand Down Expand Up @@ -440,9 +458,7 @@ def read_from_cache(self, file_path, part_index, offset, length):
data = os.pread(f.fileno(), length, offset)
except (IOError, OSError) as e:
if self.logger:
self.logger.debug(
f"[CACHE] Read error: {file_path_hashed}: {e}"
)
self.logger.debug(f"Read error: {file_path_hashed}: {e}")
self._set_file_absent(file_path_hashed)
return None, BlockStatus.ABSENT

Expand Down Expand Up @@ -511,11 +527,20 @@ def __init__(
alluxio=None,
data_manager=None,
thread_pool=ThreadPoolExecutor(4),
logger=None,
config=None,
):
self.config = config
self.logger = setup_logger(
self.config.log_dir,
self.config.log_level,
self.__class__.__name__,
self.config.log_tag_allowlist,
)
Comment on lines +532 to +538

Copilot AI Dec 16, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The config parameter could be None but is immediately dereferenced without a null check at lines 532-535. This will raise AttributeError if config is None. Add a null check or make config a required parameter.

Copilot uses AI. Check for mistakes.

self.logger = TagAdapter(self.logger, {"tag": "[LOCAL_CACHE]"})

self.cache = data_manager
self.block_size = data_manager.block_size
self.logger = logger if logger is not None else None
self.alluxio_client = alluxio
self.pool = thread_pool
self.prefetch_policy = get_prefetch_policy(
Expand All @@ -525,14 +550,14 @@ def __init__(

def close(self):
if self.logger:
self.logger.debug("[FileReader] Closing worker pool...")
self.logger.debug("Closing CacheFileReader...")
self.pool.shutdown(wait=True)
if self.cache:
self.cache.shutdown()

def _get_preferred_worker_address(self, file_path):
def _get_s3_worker_address(self, file_path):
"""Mock: Returns the preferred worker host and HTTP port."""
return self.alluxio_client._get_preferred_worker_address(file_path)
return self.alluxio_client._get_s3_worker_address(file_path)

def _get_path_hash(self, file_path):
"""Generate a stable hash for the given file path."""
Expand All @@ -548,32 +573,32 @@ def _fetch_block(self, args):
alluxio_path,
worker_host,
worker_http_port,
path_id,
block_index,
start,
end,
) = args
try:
self.cache.set_file_loading(file_path, block_index)
if not self.cache.set_file_loading(file_path, block_index):
return
self.cache.add_to_cache(
file_path,
block_index,
worker_host,
29998,
worker_http_port,
alluxio_path,
start,
end,
)
if self.logger:
self.logger.debug(
f"[BLOCK] Block download complete: {file_path}_{block_index}, size={end - start}B"
f"Block download complete: {file_path}_{block_index}, size={end - start}B"
)
except FileExistsError:
return
except Exception as e:
if self.logger:
self.logger.debug(
f"[ERROR] Failed to download block ({block_index}): {e}"
self.logger.warning(
f"Failed to download block ({block_index}): {e}"
)

def _parallel_download_file(
Expand All @@ -585,10 +610,7 @@ def _parallel_download_file(
file_size=None,
):
"""Use multiprocessing to download the entire file in parallel (per block)."""
worker_host, worker_http_port = self._get_preferred_worker_address(
file_path
)
path_id = self._get_path_hash(file_path)
worker_host, worker_http_port = self._get_s3_worker_address(file_path)
if file_size is None:
file_size = self.get_file_length(file_path)
start_block, end_block = self.get_blocks_prefetch(
Expand All @@ -610,15 +632,14 @@ def _parallel_download_file(
alluxio_path,
worker_host,
worker_http_port,
path_id,
i,
start,
end,
)
)

self.logger.debug(
f"[DOWNLOAD] Launching {end_block - start_block} processes to download {file_path}"
f"Launching {end_block - start_block} processes to download {file_path}"
)
# self.pool.map_async(self._fetch_block, args_list)
for arg in args_list:
Expand Down Expand Up @@ -658,7 +679,6 @@ def read_file_range(
remaining_length -= part_length
else:
part_length = -1

chunk, state = self.cache.read_from_cache(
file_path, blk, part_offset, part_length
)
Expand Down Expand Up @@ -786,6 +806,8 @@ def __init__(
self.num_shards = num_shards
self.logger = logger

self.logger = TagAdapter(self.logger, {"tag": "[MEMORY_CACHE]"})

# Use regular dict with separate access tracking for better performance
self.cache_shards = {} # file_path -> ReadAheadCache
self.access_order = OrderedDict() # file_path -> access_count
Expand Down
Loading
Loading