Support skip alluxio #102
Pull request overview
This PR adds support for bypassing Alluxio and directly accessing the underlying file systems (UFS) by introducing a new skip_alluxio parameter. This enables use cases where Alluxio caching is not desired or available, while still leveraging the UFS access capabilities.
Key Changes:
- Introduced UFSManager as a unified interface for managing UFS access, supporting both Alluxio-based and configuration-based UFS registration
- Added LocalUFSUpdater to parse and register UFS configurations directly without requiring an Alluxio client
- Refactored UFSUpdater to extend BaseUFSUpdater for better code reuse
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| alluxiofs/core.py | Integrated skip_alluxio parameter, replaced UFSUpdater with UFSManager, refactored fallback execution logic into separate methods, added @fallback_handler decorator to open method |
| alluxiofs/client/core.py | Renamed data_manager to local_cache_manager for clarity |
| alluxiofs/client/ufs_manager.py | Introduced BaseUFSUpdater, LocalUFSUpdater, and UFSManager classes; refactored UFSUpdater to extend base class |
| alluxiofs/client/utils.py | Enhanced get_protocol_from_path to support protocol-only strings (e.g., "s3") in addition to full URIs |
| tests/local_cache/test_utils.py | Added test cases for updated get_protocol_from_path and convert_ufs_info_to functions |
| tests/local_cache/test_ufs_manager.py | Added comprehensive tests for LocalUFSUpdater including config parsing and validation |
| setup.py | Version bump from 1.1.18rc1 to 1.1.18rc3 |

        ufs_full_path = ufs_full_path[:-1]

    ufs_info = UfsInfo(
        alluxio_path=ufs_full_path,
        ufs_full_path=ufs_full_path,

The loop variable ufs_full_path is reassigned inside the loop body (line 344), which modifies the value used in subsequent operations. While this works correctly in this case, it's a maintainability concern as it can be confusing to readers. Consider using a different variable name for the modified path (e.g., normalized_path) to make the code clearer and avoid potential bugs if the original value is needed later.

Suggested change:

    -     ufs_full_path = ufs_full_path[:-1]
    - ufs_info = UfsInfo(
    -     alluxio_path=ufs_full_path,
    -     ufs_full_path=ufs_full_path,
    +     normalized_alluxio_path = ufs_full_path[:-1]
    + else:
    +     normalized_alluxio_path = ufs_full_path
    + ufs_info = UfsInfo(
    +     alluxio_path=normalized_alluxio_path,
    +     ufs_full_path=normalized_alluxio_path,

    client = AlluxioClient(**kwargs)
    self.ufs_updater = UFSUpdater(client)
    self.ufs_updater.start_updater()
    self.alluxio = None if skip_alluxio else client
    if self.skip_alluxio is True:
        ufs_config = kwargs.get("ufs_config", {})
        if not ufs_config:
            raise ValueError(
                "ufs_config must be provided when skip_alluxio is True."
            )
        self.ufs_manager = UFSManager(config=ufs_config)
        self.ufs_manager.initialize_ufs_manager()
        self.alluxio = None

When skip_alluxio is True, an AlluxioClient is still being instantiated even though it will never be used (self.alluxio is set to None on line 152). This is inefficient and could cause initialization failures or unnecessary resource allocation. Consider moving the client instantiation inside the else block (after line 153) so it's only created when actually needed.
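
A minimal sketch of that restructuring, using stand-ins so it runs on its own. `FakeAlluxioClient`, `FakeUFSManager`, and `build_backends` are hypothetical names for illustration, not part of alluxiofs, and the shape of `ufs_config` is a placeholder. The point is only that the client is constructed in the branch that needs it:

```python
from typing import Any, Dict, Optional, Tuple


class FakeAlluxioClient:
    """Stand-in for AlluxioClient; counts how many instances are created."""

    instances = 0

    def __init__(self, **kwargs: Any) -> None:
        FakeAlluxioClient.instances += 1
        self.kwargs = kwargs


class FakeUFSManager:
    """Stand-in for UFSManager; records which source it was built from."""

    def __init__(
        self,
        alluxio: Optional[FakeAlluxioClient] = None,
        config: Optional[Dict] = None,
    ) -> None:
        self.alluxio = alluxio
        self.config = config


def build_backends(
    **kwargs: Any,
) -> Tuple[Optional[FakeAlluxioClient], FakeUFSManager]:
    """Return (alluxio, ufs_manager), creating the client only when needed."""
    if kwargs.get("skip_alluxio") is True:
        ufs_config = kwargs.get("ufs_config", {})
        if not ufs_config:
            raise ValueError(
                "ufs_config must be provided when skip_alluxio is True."
            )
        # No client is constructed on this path.
        return None, FakeUFSManager(config=ufs_config)
    # Only the Alluxio-backed path pays for client construction.
    client = FakeAlluxioClient(**kwargs)
    return client, FakeUFSManager(alluxio=client)
```

With this layout, a failure inside the client constructor can no longer break the skip_alluxio path.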

    )
    local_cache = LocalCacheManager(self.config)
    self.data_manager = CachedFileReader(
    self.local_cache_manager = CachedFileReader(

The variable is being renamed from data_manager to local_cache_manager, but this appears to be an incomplete refactoring. The codebase still has many references to data_manager (e.g., at lines 546, 574, 638, 932, 934, 952, 987, 1255, 1414 in this file). This will cause AttributeError exceptions when those code paths are executed. All references to data_manager should be updated to local_cache_manager to complete this refactoring.
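
If updating every call site in one pass is not practical, one option (not something this PR does) is a temporary read-only alias that keeps old references working while warning about them. The sketch below uses hypothetical stand-in classes, `CachedFileReaderStub` and `ClientSketch`, rather than the real client:

```python
import warnings


class CachedFileReaderStub:
    """Hypothetical stand-in for the real CachedFileReader."""


class ClientSketch:
    """Hypothetical stand-in for the client class being refactored."""

    def __init__(self) -> None:
        self.local_cache_manager = CachedFileReaderStub()

    @property
    def data_manager(self) -> CachedFileReaderStub:
        # Temporary alias so un-migrated code paths keep working.
        warnings.warn(
            "data_manager is deprecated; use local_cache_manager",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.local_cache_manager
```

The alias would be removed once no reference to data_manager remains.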

    def get_protocol_from_path(self, path):
        return get_protocol_from_path(path)

    def register_ufs_fallback(self, ufs_info_list: list[UfsInfo]):

The type hint list[UfsInfo] uses the modern Python 3.9+ syntax. For compatibility with older Python versions (3.7-3.8), consider using List[UfsInfo] from the typing module instead, which is already imported at the top of the file. This ensures the code works across a wider range of Python versions unless the project explicitly requires Python 3.9+.
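
For illustration, the `typing.List` form looks like this. `UfsInfo` here is a hypothetical two-field stand-in, the function is shown as a free function rather than a method, and its body and return type are placeholders:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class UfsInfo:
    """Hypothetical stand-in; the real UfsInfo is defined in alluxiofs."""

    alluxio_path: str
    ufs_full_path: str


def register_ufs_fallback(ufs_info_list: List[UfsInfo]) -> List[str]:
    """Placeholder body: return the paths that would be registered."""
    return [info.ufs_full_path for info in ufs_info_list]
```

An alternative that keeps the `list[UfsInfo]` spelling on Python 3.7 and 3.8 is `from __future__ import annotations` at the top of the module, since annotations are then not evaluated at definition time.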

    class UFSManager:
        """
        Class responsible for managing Ufs Info.
        """

        def __init__(self, alluxio=None, config: Dict = None):
            self.ufs_updater: Optional[BaseUFSUpdater] = None
            if alluxio is not None:
                self.ufs_updater = UFSUpdater(alluxio)
            elif config is not None:
                self.ufs_updater = LocalUFSUpdater(config)

        def initialize_ufs_manager(self):
            """
            Initialize UFS Manager with UFS Updater.
            """
            if self.ufs_updater:
                self.ufs_updater.start_updater()

        def shutdown_ufs_manager(self):
            """
            Shutdown UFS Manager and stop the UFS Updater.
            """
            if self.ufs_updater:
                self.ufs_updater.stop_updater()

        def must_get_ufs_count(self):
            if self.ufs_updater:
                return self.ufs_updater.must_get_ufs_count()
            return 0

        def must_get_ufs_from_path(self, path: str):
            if self.ufs_updater:
                return self.ufs_updater.must_get_ufs_from_path(path)
            return None

        def must_get_alluxio_path_from_ufs_full_path(self, path: str):
            if self.ufs_updater:
                return self.ufs_updater.must_get_alluxio_path_from_ufs_full_path(
                    path
                )
            return None

        def get_ufs_count(self):
            if self.ufs_updater:
                return self.ufs_updater.get_ufs_count()
            return 0

        def get_ufs_from_cache(self, path: str):
            if self.ufs_updater:
                return self.ufs_updater.get_ufs_from_cache(path)
            return None

        def get_alluxio_path_from_ufs_full_path(self, path: str):
            if self.ufs_updater:
                return self.ufs_updater.get_alluxio_path_from_ufs_full_path(path)
            return None

The new UFSManager class lacks test coverage. While LocalUFSUpdater and the existing UFSUpdater have tests, there are no tests for the UFSManager wrapper class itself. Consider adding tests that verify the initialization with both alluxio and config parameters, the delegation to the underlying updater, and the lifecycle methods (initialize_ufs_manager, shutdown_ufs_manager).
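
A rough sketch of what such delegation tests could look like. To stay self-contained it tests `UFSManagerSketch`, a trimmed, hypothetical copy of the wrapper with the updater injected directly; tests against the real UFSManager would instead need to construct or patch UFSUpdater and LocalUFSUpdater:

```python
from typing import Optional
from unittest.mock import MagicMock


class UFSManagerSketch:
    """Trimmed copy of the wrapper's delegation logic, for illustration only.

    Unlike the real UFSManager, the updater is injected directly so the
    example needs neither an AlluxioClient nor a UFS config.
    """

    def __init__(self, ufs_updater: Optional[object] = None) -> None:
        self.ufs_updater = ufs_updater

    def initialize_ufs_manager(self) -> None:
        if self.ufs_updater:
            self.ufs_updater.start_updater()

    def shutdown_ufs_manager(self) -> None:
        if self.ufs_updater:
            self.ufs_updater.stop_updater()

    def get_ufs_count(self) -> int:
        if self.ufs_updater:
            return self.ufs_updater.get_ufs_count()
        return 0


def test_lifecycle_is_delegated() -> None:
    updater = MagicMock()
    manager = UFSManagerSketch(updater)
    manager.initialize_ufs_manager()
    manager.shutdown_ufs_manager()
    updater.start_updater.assert_called_once_with()
    updater.stop_updater.assert_called_once_with()


def test_count_is_delegated() -> None:
    updater = MagicMock()
    updater.get_ufs_count.return_value = 3
    assert UFSManagerSketch(updater).get_ufs_count() == 3


def test_defaults_without_updater() -> None:
    manager = UFSManagerSketch()
    manager.initialize_ufs_manager()  # must not raise without an updater
    manager.shutdown_ufs_manager()
    assert manager.get_ufs_count() == 0
```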

    test_skip_alluxio = test_options.get("skip_alluxio") is True
    self.skip_alluxio = kwargs.get("skip_alluxio")
    client = AlluxioClient(**kwargs)
    self.ufs_updater = UFSUpdater(client)
    self.ufs_updater.start_updater()
    self.alluxio = None if skip_alluxio else client
    if self.skip_alluxio is True:
        ufs_config = kwargs.get("ufs_config", {})
        if not ufs_config:
            raise ValueError(
                "ufs_config must be provided when skip_alluxio is True."
            )
        self.ufs_manager = UFSManager(config=ufs_config)
        self.ufs_manager.initialize_ufs_manager()
        self.alluxio = None
    else:
        self.ufs_manager = UFSManager(alluxio=client)
        self.ufs_manager.initialize_ufs_manager()
        self.alluxio = None if test_skip_alluxio else client

The new skip_alluxio parameter and its integration with ufs_config lack test coverage. While tests exist for test_options={"skip_alluxio": True}, there are no tests verifying: 1) the behavior when skip_alluxio=True is passed as a top-level parameter, 2) that ufs_config is properly validated and used, 3) that operations are correctly routed to UFS when skip_alluxio is True, and 4) that the error is raised when ufs_config is missing. Consider adding integration tests for this new functionality.

    client = AlluxioClient(**kwargs)
    self.ufs_updater = UFSUpdater(client)
    self.ufs_updater.start_updater()
    self.alluxio = None if skip_alluxio else client
    if self.skip_alluxio is True:
        ufs_config = kwargs.get("ufs_config", {})
        if not ufs_config:
            raise ValueError(
                "ufs_config must be provided when skip_alluxio is True."
            )
        self.ufs_manager = UFSManager(config=ufs_config)
        self.ufs_manager.initialize_ufs_manager()
        self.alluxio = None
    else:
        self.ufs_manager = UFSManager(alluxio=client)
        self.ufs_manager.initialize_ufs_manager()
        self.alluxio = None if test_skip_alluxio else client

When skip_alluxio is True, an AlluxioClient is still being instantiated on line 143 even though it will not be used. This is inefficient and could cause initialization failures or unnecessary resource allocation. The client should only be instantiated when it's actually needed (i.e., when skip_alluxio is False or when test_skip_alluxio is False).

    if "/" in path or "\\" in path:
        return None
    return path

The new logic at lines 86-88 returns the entire path as a protocol if it doesn't contain "://" and doesn't contain "/" or "\". This means get_protocol_from_path("s3") would return "s3", which seems correct for protocol-only strings. However, this could incorrectly identify other strings without slashes as protocols (e.g., a filename like "file.txt" would be considered a protocol). Consider adding validation to ensure the returned value is a known protocol, or document this behavior clearly.
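
One possible shape for that validation, as a sketch only. `get_protocol_from_path_strict` and `KNOWN_PROTOCOLS` are hypothetical names, the allow-list contents are an assumption, and the handling of full URIs is assumed to be "everything before ://" rather than taken from the real function:

```python
from typing import Optional

# Hypothetical allow-list; the set of schemes actually supported is an assumption.
KNOWN_PROTOCOLS = frozenset({"s3", "hdfs", "gs", "oss", "file"})


def get_protocol_from_path_strict(path: str) -> Optional[str]:
    """Return the protocol of a URI or protocol-only string, else None."""
    if "://" in path:
        # Assumed behaviour for full URIs: the scheme is the text before "://".
        return path.split("://", 1)[0]
    if "/" in path or "\\" in path:
        return None
    # Bare string: accept it only if it names a known protocol.
    return path if path in KNOWN_PROTOCOLS else None
```

Under this sketch a bare "s3" is still accepted, while a slash-free filename such as "file.txt" is rejected instead of being reported as a protocol.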

    import fsspec
    from fsspec import filesystem

    from alluxiofs import AlluxioClient

This import creates a circular dependency: alluxiofs/__init__.py imports from alluxiofs.core, which imports from alluxiofs.client.ufs_manager, which now imports from alluxiofs (top-level). While Python can sometimes handle circular imports if they're structured carefully, this is fragile and can cause import failures depending on the import order. Consider importing directly from alluxiofs.client.core instead: from alluxiofs.client.core import AlluxioClient.

Suggested change:

    - from alluxiofs import AlluxioClient
    + from alluxiofs.client.core import AlluxioClient

    super().__init__()
    assert (
        isinstance(alluxio, AlluxioClient) or alluxio is None
    ), "alluxio must be an instance of AlluxioClient or None"

The assertion message could be more descriptive. Instead of just stating what the type should be, it should explain why this is a requirement or what the consequence of passing the wrong type would be. This helps developers understand the constraint better when debugging assertion failures.

Suggested change:

    - ), "alluxio must be an instance of AlluxioClient or None"
    + ), (
    +     "alluxio must be an instance of AlluxioClient or None so that "
    +     "UFSUpdater can access the Alluxio configuration and correctly "
    +     "initialize background UFS info refresh; passing any other type "
    +     "will prevent proper setup of periodic updates and logging."
    + )

NO JIRA