-
Notifications
You must be signed in to change notification settings - Fork 9
Fix the bug in getting the worker host before reading a range using the S3 API #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,18 +14,11 @@ | |
| from fsspec.caching import Fetcher | ||
| from fsspec.caching import ReadAheadCache | ||
|
|
||
| from .const import ALLUXIO_LOCAL_CACHE_EVICTION_HIGH_WATERMARK | ||
| from .const import ALLUXIO_LOCAL_CACHE_EVICTION_LOW_WATERMARK | ||
| from .const import ALLUXIO_LOCAL_CACHE_EVICTION_SCAN_INTERVAL_MINUTES | ||
| from .const import ALLUXIO_LOCAL_CACHE_TTL_TIME_MINUTES | ||
| from .const import ALLUXIO_REQUEST_MAX_RETRIES | ||
| from .const import ALLUXIO_REQUEST_MAX_TIMEOUT_SECONDS | ||
| from .const import DEFAULT_LOCAL_CACHE_BLOCK_SIZE_MB | ||
| from .const import DEFAULT_LOCAL_CACHE_SIZE_GB | ||
| from .const import LOCAL_CACHE_DIR_DEFAULT | ||
| from .const import MAGIC_SIZE | ||
| from .utils import _c_send_get_request_write_file | ||
| from .utils import get_prefetch_policy | ||
| from .utils import setup_logger | ||
| from .utils import TagAdapter | ||
|
|
||
|
|
||
| # ========================================================= | ||
|
|
@@ -42,34 +35,39 @@ class BlockStatus(Enum): | |
| class LocalCacheManager: | ||
| def __init__( | ||
| self, | ||
| cache_dir=LOCAL_CACHE_DIR_DEFAULT, | ||
| max_cache_size=DEFAULT_LOCAL_CACHE_SIZE_GB, | ||
| block_size=DEFAULT_LOCAL_CACHE_BLOCK_SIZE_MB, | ||
| http_max_retries=ALLUXIO_REQUEST_MAX_RETRIES, | ||
| http_timeouts=ALLUXIO_REQUEST_MAX_TIMEOUT_SECONDS, | ||
| eviction_high_watermark=ALLUXIO_LOCAL_CACHE_EVICTION_HIGH_WATERMARK, | ||
| eviction_low_watermark=ALLUXIO_LOCAL_CACHE_EVICTION_LOW_WATERMARK, | ||
| eviction_scan_interval=int( | ||
| ALLUXIO_LOCAL_CACHE_EVICTION_SCAN_INTERVAL_MINUTES * 60 | ||
| ), | ||
| ttl_time_seconds=int(ALLUXIO_LOCAL_CACHE_TTL_TIME_MINUTES * 60), | ||
| logger=None, | ||
| config, | ||
| ): | ||
| self.config = config | ||
| base_logger = setup_logger( | ||
| self.config.log_dir, | ||
| self.config.log_level, | ||
| self.__class__.__name__, | ||
| self.config.log_tag_allowlist, | ||
| ) | ||
| self.logger = TagAdapter(base_logger, {"tag": "[LOCAL_CACHE]"}) | ||
|
|
||
| self.block_size = ( | ||
| int(self.config.local_cache_block_size_mb) * 1024 * 1024 | ||
| ) | ||
| self.http_max_retries = self.config.http_max_retries | ||
| self.http_timeouts = self.config.http_timeouts | ||
| self.eviction_high_watermark = ( | ||
| self.config.local_cache_eviction_high_watermark | ||
| ) | ||
| self.eviction_low_watermark = ( | ||
| self.config.local_cache_eviction_low_watermark | ||
| ) | ||
| self.eviction_scan_interval = int( | ||
| self.config.local_cache_eviction_scan_interval_minutes * 60 | ||
| ) | ||
| self.ttl_time_seconds = int( | ||
| self.config.local_cache_ttl_time_minutes * 60 | ||
| ) | ||
| self.cache_dirs, self.max_cache_sizes = self._param_local_cache_dirs( | ||
| cache_dir, max_cache_size | ||
| self.config.local_cache_dir, self.config.local_cache_size_gb | ||
| ) | ||
| self.http_max_retries = http_max_retries | ||
| self.http_timeouts = http_timeouts | ||
| self.block_size = int(block_size * 1024 * 1024) | ||
| self.eviction_high_watermark = eviction_high_watermark | ||
| self.eviction_low_watermark = eviction_low_watermark | ||
| self.logger = logger | ||
| self.current_cache_sizes = [Value("l", 0) for _ in self.cache_dirs] | ||
| self.ttl_time_seconds = ttl_time_seconds | ||
|
|
||
| self._load_existing_cache() | ||
|
|
||
| self.eviction_scan_interval = eviction_scan_interval | ||
| self._stop_eviction_event = threading.Event() | ||
| self._eviction_thread = threading.Thread( | ||
| target=self._run_eviction_monitor, | ||
|
|
@@ -79,7 +77,7 @@ def __init__( | |
| self._eviction_thread.start() | ||
| if self.logger: | ||
| self.logger.debug( | ||
| f"[CACHE] Eviction monitor thread started with interval {self.eviction_scan_interval}s" | ||
| f"Eviction monitor thread started with interval {self.eviction_scan_interval}s" | ||
| ) | ||
| # ---------------------------- | ||
|
|
||
|
|
@@ -110,14 +108,12 @@ def _run_eviction_monitor(self): | |
| self._scan_and_clean_evicted() | ||
| except Exception as e: | ||
| if self.logger: | ||
| self.logger.error( | ||
| f"[CACHE] Error in eviction monitor: {e}" | ||
| ) | ||
| self.logger.error(f"Error in eviction monitor: {e}") | ||
|
|
||
| def _scan_and_clean_evicted(self): | ||
| """Scan cache directories for files marked as evicted and delete them.""" | ||
| if self.logger: | ||
| self.logger.debug("[CACHE] Starting background eviction scan...") | ||
| self.logger.debug("Starting background eviction scan...") | ||
|
|
||
| cleaned_count = 0 | ||
|
|
||
|
|
@@ -139,13 +135,11 @@ def _scan_and_clean_evicted(self): | |
| cleaned_count += 1 | ||
| except Exception as e: | ||
| if self.logger: | ||
| self.logger.warning( | ||
| f"[CACHE] Failed to scan dir {data_dir}: {e}" | ||
| ) | ||
| self.logger.warning(f"Failed to scan dir {data_dir}: {e}") | ||
|
|
||
| if self.logger and cleaned_count > 0: | ||
| self.logger.debug( | ||
| f"[CACHE] Background scan finished. Cleaned {cleaned_count} files." | ||
| f"Background scan finished. Cleaned {cleaned_count} files." | ||
| ) | ||
|
|
||
| def shutdown(self): | ||
|
|
@@ -154,7 +148,7 @@ def shutdown(self): | |
| if self._eviction_thread.is_alive(): | ||
| self._eviction_thread.join(timeout=2) | ||
| if self.logger: | ||
| self.logger.debug("[CACHE] LocalCacheManager shut down.") | ||
| self.logger.debug("LocalCacheManager shut down.") | ||
|
|
||
| def _param_local_cache_dirs(self, cache_dir, max_cache_size): | ||
| """Parse cache directories and their respective sizes.""" | ||
|
|
@@ -243,36 +237,46 @@ def _atomic_write( | |
| """ | ||
| import traceback | ||
|
|
||
| tmp_pool_dir = "/tmp/local_cache_tmp_pool" | ||
| hash_index = 0 | ||
| try: | ||
| hash_index = self._get_local_cache_index_dir_for_file(file_path) | ||
| tmp_pool_dir = self._get_cache_tmp_pool_dir(hash_index) | ||
|
|
||
| self._evict_if_needed(hash_index, end - start) | ||
| temp_file = None | ||
| except Exception as e: | ||
| print( | ||
| self.logger.info( | ||
| "Exception in _atomic_write pre-check:", | ||
| e, | ||
| traceback.print_exc(), | ||
| ) | ||
| try: | ||
| # Step 2: Write to temporary file | ||
| temp_file = tempfile.NamedTemporaryFile( | ||
| dir=tmp_pool_dir, delete=False | ||
| ) | ||
| temp_file.close() | ||
| with open(temp_file.name, "wb") as f: | ||
| with tempfile.NamedTemporaryFile( | ||
| dir=tmp_pool_dir, delete=False, mode="wb" | ||
|
Comment on lines 252 to +257
|
||
| ) as temp_file: | ||
| temp_file_name = temp_file.name | ||
|
|
||
| self._fetch_range_via_shell( | ||
| f, worker_host, worker_http_port, alluxio_path, start, end | ||
| temp_file, | ||
| worker_host, | ||
| worker_http_port, | ||
| alluxio_path, | ||
| start, | ||
| end, | ||
| ) | ||
| temp_file.flush() | ||
| os.fsync(temp_file.fileno()) | ||
|
|
||
| # Step 3: Atomic rename | ||
| os.rename(temp_file.name, file_path_hashed) | ||
| os.rename(temp_file_name, file_path_hashed) | ||
| self._set_file_cached(file_path_hashed) | ||
| with self.current_cache_sizes[hash_index].get_lock(): | ||
| self.current_cache_sizes[hash_index].value += end - start | ||
| if self.logger: | ||
| self.logger.debug( | ||
| f"[CACHE] Atomic write completed: {file_path_hashed}" | ||
| f"Atomic write completed: {file_path_hashed}" | ||
| ) | ||
| return True | ||
|
|
||
|
|
@@ -282,9 +286,7 @@ def _atomic_write( | |
| os.remove(temp_file.name) | ||
| self._set_file_absent(file_path_hashed) | ||
| if self.logger: | ||
| self.logger.debug( | ||
| f"[CACHE] Write failed for {file_path_hashed}: {e}" | ||
| ) | ||
| self.logger.debug(f"Write failed for {file_path_hashed}: {e}") | ||
| return False | ||
|
|
||
| def get_file_status(self, file_path, part_index): | ||
|
|
@@ -335,27 +337,43 @@ def _truly_evict_file(self, file_path_evicted, hash_index=None): | |
| self.current_cache_sizes[hash_index].value -= size | ||
| if self.logger: | ||
| self.logger.debug( | ||
| f"[LRU] Evicted old cache: {file_path_evicted} ({size} bytes)" | ||
| f"Evicted old cache: {file_path_evicted} ({size} bytes)" | ||
| ) | ||
| except FileNotFoundError: | ||
| pass | ||
| except Exception as e: | ||
| if self.logger: | ||
| self.logger.debug( | ||
| f"[LRU] Failed to evict cache {file_path_evicted}: {e}" | ||
| f"Failed to evict cache {file_path_evicted}: {e}" | ||
| ) | ||
|
|
||
| def set_file_loading(self, file_path, part_index): | ||
| file_path_hashed = self._get_local_path(file_path, part_index) | ||
| self._set_file_loading(file_path_hashed) | ||
| return self._set_file_loading(file_path_hashed) | ||
|
|
||
| def _set_file_loading(self, file_path_hashed): | ||
| """Set the file status to LOADING.""" | ||
| """ | ||
| Atomically try to set file status to LOADING. | ||
| Returns True if successful, False if already loading/cached. | ||
| """ | ||
| # Check if already cached first | ||
| if os.path.exists(file_path_hashed): | ||
| return False | ||
| try: | ||
| with open(file_path_hashed + "_loading", "x"): | ||
| pass | ||
| # Atomic exclusive create - only one thread can succeed | ||
| fd = os.open( | ||
| file_path_hashed + "_loading", | ||
| os.O_CREAT | os.O_EXCL | os.O_WRONLY, | ||
| ) | ||
| os.close(fd) | ||
| return True | ||
| except FileExistsError: | ||
| pass | ||
| # Another thread is already loading this block | ||
| return False | ||
| except OSError as e: | ||
| if self.logger: | ||
| self.logger.debug(f"Failed to set loading status: {e}") | ||
| return False | ||
|
|
||
| def _set_file_cached(self, file_path_hashed): | ||
| """Set the file status to CACHED.""" | ||
|
|
@@ -440,9 +458,7 @@ def read_from_cache(self, file_path, part_index, offset, length): | |
| data = os.pread(f.fileno(), length, offset) | ||
| except (IOError, OSError) as e: | ||
| if self.logger: | ||
| self.logger.debug( | ||
| f"[CACHE] Read error: {file_path_hashed}: {e}" | ||
| ) | ||
| self.logger.debug(f"Read error: {file_path_hashed}: {e}") | ||
| self._set_file_absent(file_path_hashed) | ||
| return None, BlockStatus.ABSENT | ||
|
|
||
|
|
@@ -511,11 +527,20 @@ def __init__( | |
| alluxio=None, | ||
| data_manager=None, | ||
| thread_pool=ThreadPoolExecutor(4), | ||
| logger=None, | ||
| config=None, | ||
| ): | ||
| self.config = config | ||
| self.logger = setup_logger( | ||
| self.config.log_dir, | ||
| self.config.log_level, | ||
| self.__class__.__name__, | ||
| self.config.log_tag_allowlist, | ||
| ) | ||
|
Comment on lines +532 to +538
|
||
|
|
||
| self.logger = TagAdapter(self.logger, {"tag": "[LOCAL_CACHE]"}) | ||
|
|
||
| self.cache = data_manager | ||
| self.block_size = data_manager.block_size | ||
| self.logger = logger if logger is not None else None | ||
| self.alluxio_client = alluxio | ||
| self.pool = thread_pool | ||
| self.prefetch_policy = get_prefetch_policy( | ||
|
|
@@ -525,14 +550,14 @@ def __init__( | |
|
|
||
| def close(self): | ||
| if self.logger: | ||
| self.logger.debug("[FileReader] Closing worker pool...") | ||
| self.logger.debug("Closing CacheFileReader...") | ||
| self.pool.shutdown(wait=True) | ||
| if self.cache: | ||
| self.cache.shutdown() | ||
|
|
||
| def _get_preferred_worker_address(self, file_path): | ||
| def _get_s3_worker_address(self, file_path): | ||
| """Mock: Returns the preferred worker host and HTTP port.""" | ||
| return self.alluxio_client._get_preferred_worker_address(file_path) | ||
| return self.alluxio_client._get_s3_worker_address(file_path) | ||
|
|
||
| def _get_path_hash(self, file_path): | ||
| """Generate a stable hash for the given file path.""" | ||
|
|
@@ -548,32 +573,32 @@ def _fetch_block(self, args): | |
| alluxio_path, | ||
| worker_host, | ||
| worker_http_port, | ||
| path_id, | ||
| block_index, | ||
| start, | ||
| end, | ||
| ) = args | ||
| try: | ||
| self.cache.set_file_loading(file_path, block_index) | ||
| if not self.cache.set_file_loading(file_path, block_index): | ||
| return | ||
| self.cache.add_to_cache( | ||
| file_path, | ||
| block_index, | ||
| worker_host, | ||
| 29998, | ||
| worker_http_port, | ||
| alluxio_path, | ||
| start, | ||
| end, | ||
| ) | ||
| if self.logger: | ||
| self.logger.debug( | ||
| f"[BLOCK] Block download complete: {file_path}_{block_index}, size={end - start}B" | ||
| f"Block download complete: {file_path}_{block_index}, size={end - start}B" | ||
| ) | ||
| except FileExistsError: | ||
| return | ||
| except Exception as e: | ||
| if self.logger: | ||
| self.logger.debug( | ||
| f"[ERROR] Failed to download block ({block_index}): {e}" | ||
| self.logger.warning( | ||
| f"Failed to download block ({block_index}): {e}" | ||
| ) | ||
|
|
||
| def _parallel_download_file( | ||
|
|
@@ -585,10 +610,7 @@ def _parallel_download_file( | |
| file_size=None, | ||
| ): | ||
| """Use multiprocessing to download the entire file in parallel (per block).""" | ||
| worker_host, worker_http_port = self._get_preferred_worker_address( | ||
| file_path | ||
| ) | ||
| path_id = self._get_path_hash(file_path) | ||
| worker_host, worker_http_port = self._get_s3_worker_address(file_path) | ||
| if file_size is None: | ||
| file_size = self.get_file_length(file_path) | ||
| start_block, end_block = self.get_blocks_prefetch( | ||
|
|
@@ -610,15 +632,14 @@ def _parallel_download_file( | |
| alluxio_path, | ||
| worker_host, | ||
| worker_http_port, | ||
| path_id, | ||
| i, | ||
| start, | ||
| end, | ||
| ) | ||
| ) | ||
|
|
||
| self.logger.debug( | ||
| f"[DOWNLOAD] Launching {end_block - start_block} processes to download {file_path}" | ||
| f"Launching {end_block - start_block} processes to download {file_path}" | ||
| ) | ||
| # self.pool.map_async(self._fetch_block, args_list) | ||
| for arg in args_list: | ||
|
|
@@ -658,7 +679,6 @@ def read_file_range( | |
| remaining_length -= part_length | ||
| else: | ||
| part_length = -1 | ||
|
|
||
| chunk, state = self.cache.read_from_cache( | ||
| file_path, blk, part_offset, part_length | ||
| ) | ||
|
|
@@ -786,6 +806,8 @@ def __init__( | |
| self.num_shards = num_shards | ||
| self.logger = logger | ||
|
|
||
| self.logger = TagAdapter(self.logger, {"tag": "[MEMORY_CACHE]"}) | ||
|
|
||
| # Use regular dict with separate access tracking for better performance | ||
| self.cache_shards = {} # file_path -> ReadAheadCache | ||
| self.access_order = OrderedDict() # file_path -> access_count | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
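The logger.info() call passes its arguments incorrectly. Logging methods treat extra positional arguments as %-format arguments for the message string, and this message contains no format placeholders, so the exception is not rendered into the log message (with the standard logging module this produces a formatting error at emit time rather than the intended output). In addition, traceback.print_exc() returns None and prints to stderr, so the traceback won't be captured in the log message. This should be formatted as a single string using traceback.format_exc() instead.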