Fix the bug of get worker host before read range using s3 api#97
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes a bug where the worker host was not being properly retrieved before reading a range using the S3 API, and introduces comprehensive logging improvements with tagged loggers for better observability across different components.
Key Changes:
- Introduced
_get_s3_worker_address()method to properly fetch worker address before S3 range reads - Refactored logging architecture with
TagAdapterandTagFilterfor component-specific log filtering - Consolidated logger initialization across all components (AlluxioClient, LocalCacheManager, CachedFileReader, prefetch policies, etc.)
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| setup.py | Version bumped to 1.1.18rc1 for release candidate |
| alluxiofs/core.py | Refactored logger initialization with TagAdapter; separated fallback logging; fixed print statements to use logger |
| alluxiofs/client/utils.py | Added TagAdapter and TagFilter classes; enhanced setup_logger with tag filtering; updated prefetch policy instantiation to pass config |
| alluxiofs/client/transfer.py | Updated UFSUpdater logger initialization with TagAdapter and proper config handling |
| alluxiofs/client/prefetch_policy.py | Added config parameter and logger initialization to all prefetch policy classes |
| alluxiofs/client/core.py | Removed logger parameter from AlluxioClient constructor; added _get_s3_worker_address method; removed unused path_id parameter from range read methods; simplified LocalCacheManager instantiation |
| alluxiofs/client/config.py | Added log_level, log_dir, and log_tag_allowlist configuration parameters with validation |
| alluxiofs/client/cache.py | Refactored LocalCacheManager and CachedFileReader to use config object; fixed hardcoded port 29998 bug; improved atomic file loading; enhanced logging with tags |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| setup( | ||
| name="alluxiofs", | ||
| version="1.1.18", | ||
| version="1.1.18rc1", |
There was a problem hiding this comment.
The version string "1.1.18rc1" uses an unconventional format. Python packaging conventions typically use either "1.1.18.rc1" or "1.1.18rc.1" for release candidates. The current format may not be properly recognized by tools like pip for version comparisons.
| version="1.1.18rc1", | |
| version="1.1.18rc.1", |
| "Exception in _atomic_write pre-check:", | ||
| e, | ||
| traceback.print_exc(), |
There was a problem hiding this comment.
The logger.info() call is incorrectly using multiple arguments. Logger methods should use a single formatted string, not multiple positional arguments. The call traceback.print_exc() returns None and prints to stderr, so it won't be captured in the log message. This should be formatted as a single string using traceback.format_exc() instead.
| "Exception in _atomic_write pre-check:", | |
| e, | |
| traceback.print_exc(), | |
| f"Exception in _atomic_write pre-check: {e}\n{traceback.format_exc()}" |
| log_tags=self.config.log_tag_allowlist | ||
| if self.config | ||
| else None, |
There was a problem hiding this comment.
When alluxio is None, self.config will also be None (line 38), but then at line 56 the code tries to access self.config.log_tag_allowlist which will raise an AttributeError. The conditional check at line 57-58 happens after the attribute access, not before it.
| log_tags=self.config.log_tag_allowlist | |
| if self.config | |
| else None, | |
| log_tags=self.config.log_tag_allowlist if self.config is not None else None, |
| def __init__(self, block_size, config=None): | ||
| super().__init__(block_size) | ||
| self.local_cache_prefetch_ahead_blocks = ( | ||
| local_cache_prefetch_ahead_blocks | ||
| self.config = config | ||
| base_logger = setup_logger( | ||
| self.config.log_dir, | ||
| self.config.log_level, | ||
| self.__class__.__name__, | ||
| self.config.log_tag_allowlist, | ||
| ) | ||
| self.logger = TagAdapter(base_logger, {"tag": "[PREFETCH]"}) |
There was a problem hiding this comment.
The config parameter could be None but is immediately dereferenced without a null check. In all three prefetch policy constructors (NoPrefetchPolicy, FixedWindowPrefetchPolicy, AdaptiveWindowPrefetchPolicy), config is accessed via config.log_dir, config.log_level, etc., which will raise AttributeError if config is None. Add a null check or make config a required parameter.
| except Exception as e: | ||
| print( | ||
| self.logger.info( | ||
| "Exception in _atomic_write pre-check:", | ||
| e, | ||
| traceback.print_exc(), | ||
| ) | ||
| try: | ||
| # Step 2: Write to temporary file | ||
| temp_file = tempfile.NamedTemporaryFile( | ||
| dir=tmp_pool_dir, delete=False | ||
| ) | ||
| temp_file.close() | ||
| with open(temp_file.name, "wb") as f: | ||
| with tempfile.NamedTemporaryFile( | ||
| dir=tmp_pool_dir, delete=False, mode="wb" |
There was a problem hiding this comment.
If the first try block (lines 240-244) raises an exception, tmp_pool_dir may not be initialized, causing a NameError when used at line 255. The exception is logged but execution continues, which will lead to a crash. Either re-raise the exception or ensure tmp_pool_dir has a default value.
| def __init__(self, block_size, config=None): | ||
| super().__init__(block_size) | ||
| self.config = config | ||
| base_logger = setup_logger( | ||
| self.config.log_dir, | ||
| self.config.log_level, | ||
| self.__class__.__name__, | ||
| self.config.log_tag_allowlist, | ||
| ) | ||
| self.logger = TagAdapter(base_logger, {"tag": "[PREFETCH]"}) |
There was a problem hiding this comment.
The config parameter could be None but is immediately dereferenced without a null check. In all three prefetch policy constructors (NoPrefetchPolicy, FixedWindowPrefetchPolicy, AdaptiveWindowPrefetchPolicy), config is accessed via config.log_dir, config.log_level, etc., which will raise AttributeError if config is None. Add a null check or make config a required parameter.
| def __init__(self, block_size, config=None): | ||
| super().__init__(block_size) | ||
| self.prefetch_ahead = 0 | ||
| self.max_prefetch = local_cache_max_prefetch_blocks | ||
| self.config = config | ||
| base_logger = setup_logger( | ||
| self.config.log_dir, | ||
| self.config.log_level, | ||
| self.__class__.__name__, | ||
| self.config.log_tag_allowlist, | ||
| ) | ||
| self.logger = TagAdapter(base_logger, {"tag": "[PREFETCH]"}) | ||
| self.max_prefetch = self.config.local_cache_max_prefetch_blocks |
There was a problem hiding this comment.
The config parameter could be None but is immediately dereferenced without a null check. In all three prefetch policy constructors (NoPrefetchPolicy, FixedWindowPrefetchPolicy, AdaptiveWindowPrefetchPolicy), config is accessed via config.log_dir, config.log_level, etc., which will raise AttributeError if config is None. Add a null check or make config a required parameter.
| self.config = config | ||
| self.logger = setup_logger( | ||
| self.config.log_dir, | ||
| self.config.log_level, | ||
| self.__class__.__name__, | ||
| self.config.log_tag_allowlist, | ||
| ) |
There was a problem hiding this comment.
The config parameter could be None but is immediately dereferenced without a null check at lines 532-535. This will raise AttributeError if config is None. Add a null check or make config a required parameter.
NO JIRA