Integrate prefetcher engine with zonal buckets #805

googlyrahman wants to merge 9 commits into fsspec:main from
Conversation
googlyrahman force-pushed from 2469216 to 675b731
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main     #805      +/-   ##
==========================================
+ Coverage   86.91%   88.32%   +1.41%
==========================================
  Files          15       15
  Lines        2743     2972     +229
==========================================
+ Hits         2384     2625     +241
+ Misses        359      347      -12
```

View full report in Codecov by Sentry.
/gcbrun
Good idea to check what happens with read(0), though - does it count as sequential? Can enough of them make the prefetch size go to 0?

It doesn't count as sequential, nor does the prefetch size go to zero: we return early with empty bytes in the case of read(0).
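A minimal sketch of that early return - `SequentialTracker` and its fields are invented for illustration, not the PR's actual code:

```python
class SequentialTracker:
    """Illustrative only: read(0) returns early, so it never touches
    the sequential-streak bookkeeping or the prefetch size."""

    def __init__(self):
        self.streak = 0
        self.last_end = 0

    def read(self, data: bytes, offset: int, length: int) -> bytes:
        if length == 0:
            return b""  # early return: no streak update, no prefetch resize
        if offset == self.last_end:
            self.streak += 1  # only non-empty reads count as sequential
        else:
            self.streak = 1
        self.last_end = offset + length
        return data[offset : offset + length]
```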
Rebasing from main may address this error.

@martindurant please review this
martindurant left a comment:
I have read the code but not the tests so far.
Again, there is a lot of complexity here...
```python
    self.gcsfs, bucket, key, generation, self.pool_size
)
object_size = self.mrd.persisted_size
asyn.sync(self.gcsfs.loop, self.mrd_pool.initialize)
```
Shouldn't this only be done on first write?
Ideally, yes, but take a look at the existing code. ZonalFile initializes the MRD, gets its persisted size, and passes it to super(). I just kept the workflow as is while integrating the pool.
I think it was done for some reason; see #744.
```python
fixed_key_metadata=None,
generation=None,
kms_key_name=None,
pool_size=zb_hns_utils.DEFAULT_CONCURRENCY,
```
Elsewhere instead of "pool", "concurrency" is the preferred term, right?
Yes, we have two terms: pool and concurrency. The pool represents the number of underlying MRDs, while concurrency represents the number of splits we need to make for downloading.
But the pool represents concurrency too
They actually serve two distinct purposes.
Think of an MRD as a single connection stream to Google Cloud Storage. The pool_size parameter decides how many of these streams you want to open, while concurrency dictates how many requests your application is trying to make at the same time.
For example, if you set concurrency=4 and pool_size=1, your application will send all four concurrent requests through a single MRD. The MRD is capable of handling multiple requests simultaneously, but funneling everything through one stream can create a bottleneck and slow things down.
On the other hand, if you set concurrency=4 and pool_size=4, those four requests will be distributed evenly across four separate MRD connections in a round-robin fashion.
A helpful way to look at it is to treat pool_size as your underlying resource limit (similar to the worker count in a ThreadPoolExecutor) and concurrency as the volume of tasks your application is trying to execute at once.
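A minimal sketch of the distinction, using hypothetical names (`FakeMRD`, `demo`) rather than the PR's actual classes - `concurrency` requests are spread round-robin over `pool_size` connection streams:

```python
import asyncio

class FakeMRD:
    """Stand-in for one MRD connection stream (illustrative only)."""

    def __init__(self, conn_id: int):
        self.conn_id = conn_id

    async def fetch(self, start: int, end: int) -> str:
        await asyncio.sleep(0.01)  # pretend network I/O
        return f"bytes {start}-{end} via connection {self.conn_id}"

async def demo(concurrency: int = 4, pool_size: int = 2):
    pool = [FakeMRD(i) for i in range(pool_size)]
    # concurrency tasks share pool_size streams, round-robin:
    tasks = [
        pool[i % pool_size].fetch(i * 1024, (i + 1) * 1024)
        for i in range(concurrency)
    ]
    for line in await asyncio.gather(*tasks):
        print(line)

asyncio.run(demo())
```

With `pool_size=1`, all four requests funnel through connection 0; with `pool_size=4`, each request gets its own stream.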
Sounds like something that would be helped by a well-crafted diagram :)
Took a note of it and created an internal ticket: https://b.corp.google.com/issues/508054742. Will ensure the diagram is added in a follow-up PR.
@martindurant, all your comments are addressed - please take a look again. Thanks for taking the time to review this.
martindurant left a comment:
I had a closer look at MRDPool and have a few more comments.
Am I right in thinking that this pool concept should/will be upstreamed to the grpc layer?
```python
raise RuntimeError("Cannot initialize a closed MRDPool.")

if not self._initialized:
    mrd = await self._create_mrd()
```
lock should apply only to this inner block: if closed or already initialised, we don't need it.
This suggestion might actually introduce a race condition. If multiple tasks call initialize() concurrently, the first task will yield control during the await self._create_mrd() step. If the check is outside the lock, a second task could jump in during that await, see that _initialized is still False, and erroneously spin up a duplicate connection.
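A minimal sketch of the pattern being defended here (an asyncio.Lock plus a hypothetical `_create_mrd` coroutine; not the PR's exact code) - the state checks are repeated inside the lock because the awaits can interleave tasks:

```python
import asyncio

class PoolInitSketch:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._initialized = False
        self._closed = False
        self._mrds = []

    async def _create_mrd(self):
        await asyncio.sleep(0.01)  # yields control, like a real network call
        return object()

    async def initialize(self):
        if self._closed:  # cheap fast-path check
            raise RuntimeError("Cannot initialize a closed MRDPool.")
        async with self._lock:
            # Re-check *inside* the lock: another task may have initialized
            # or closed the pool while we were waiting to acquire it.
            if self._closed:
                raise RuntimeError("Cannot initialize a closed MRDPool.")
            if not self._initialized:
                self._mrds.append(await self._create_mrd())
                self._initialized = True
```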
```python
if not self._initialized:
    mrd = await self._create_mrd()
    self.persisted_size = mrd.persisted_size
    self._free_mrds.put_nowait(mrd)
```
Can raise? We are assuming the q should be empty to start, but it's not enforced here.
Hmmm, ideally this shouldn't happen - but I would add a check for active_count == 0.
```python
mrd = await self._create_mrd()
self.persisted_size = mrd.persisted_size
self._free_mrds.put_nowait(mrd)
self._active_count += 1
```
Any way to tie active count (or _initialized) to the q size?
The active count tracks created MRDs, while the queue shows the free MRDs waiting. Because of this, there is no direct tie between them. Let me know if I'm off base here.
```python
mrd = self._all_mrds[self._rr_index]
self._rr_index = (self._rr_index + 1) % len(self._all_mrds)

if spawn_new:
```
I don't like the word "spawn". We are creating connections, not new execution contexts.
I'll change it to a create_new variable.
```python
# or if we just spawned it. This prevents duplicate entries in the queue
# when multiple concurrent tasks share the same MRD via round-robin.
if (spawn_new or used_from_queue) and not self._closed:
    self._free_mrds.put_nowait(mrd)
```
The number of elements in _free_mrds will never exceed the pool_size, so this scenario is mathematically impossible. We only ever place an MRD into the queue if we just took it out, or if it is a newly created connection. Since the total number of MRDs we create is strictly capped at pool_size, the queue can never become full.
```python
elif self.mrd_supports_multi_request and self._all_mrds:
    # Pool is full, queue is empty, and we are allowed to share a busy MRD.
    mrd = self._all_mrds[self._rr_index]
    self._rr_index = (self._rr_index + 1) % len(self._all_mrds)
```
_rr_index is undocumented. Round-robin on which connection to use?
I'll document it as round-robin.
```python
async with self._lock:
    if self._closed:
        raise RuntimeError("MRDPool is closed.")
```

Suggested change:

```python
if self._closed:
    raise RuntimeError("MRDPool is closed.")
async with self._lock:
```
This might introduce a race condition. If an await yields execution while a close operation is in progress, this task could resume in between without knowing the state changed. It would then blindly create a new MRD and add it to the queue, even though the MRD pool is already closed.
```python
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
    if isinstance(result, Exception):
        raise result
```
Why not return_exceptions=False if we raise them anyway
If we set return_exceptions=False, the cleanup will fail fast on the first error and leave the remaining tasks orphaned. We need to avoid this: if a user catches that initial exception and keeps their application running, those abandoned background tasks will stick around, and their exceptions will surface later as "Task exception was never retrieved" warnings.
This little loop really seems like something that asyncio ought to do for us. So be it.
From Python 3.11, we can use asyncio.TaskGroup to ensure that no dangling tasks are left.
Yeah, I know - we already discussed this in the past. Python 3.10's end of life is scheduled for later this year; until then we can't drop support for Python 3.10.
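A sketch of both cleanup patterns (hypothetical `close_one` coroutine; the `asyncio.TaskGroup` variant requires Python 3.11+):

```python
import asyncio

async def close_one(i: int):
    if i == 2:
        raise OSError(f"connection {i} failed to close")
    await asyncio.sleep(0.01)

async def cleanup_gather(n: int = 4):
    # Pre-3.11 pattern: let every task finish, then re-raise the first error,
    # so no orphaned tasks are left behind for a caller who catches it.
    results = await asyncio.gather(
        *(close_one(i) for i in range(n)), return_exceptions=True
    )
    for result in results:
        if isinstance(result, Exception):
            raise result

async def cleanup_taskgroup(n: int = 4):
    # 3.11+ alternative: TaskGroup awaits all children (cancelling siblings
    # on failure) before raising, so nothing dangles.
    async with asyncio.TaskGroup() as tg:
        for i in range(n):
            tg.create_task(close_one(i))
```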
The pool concept would be upstreamed to the Python SDK in the future, not the gRPC layer.
@martindurant, addressed your comments - please take a look again.
/gcbrun

/gcbrun
The e2e tests are failing due to this issue: #830. @martindurant, please check the existing comments; if you feel everything is addressed, we can merge this.
This PR integrates the prefetcher engine with Zonal Buckets to improve read throughput and introduces concurrency support for fetching data. To maximize performance and avoid Python's memory and concurrency bottlenecks, this PR also introduces a highly optimized DirectMemmoveBuffer and an MRDPool for managing connection states.
Zero-Copy, GIL-Free Memory Management (DirectMemmoveBuffer)
Standard Python buffers (like `bytearray` or `io.BytesIO`) were GIL-bound in my experiments and require multiple memory copies. For example, if you assemble fifty 2MB chunks into a 100MB object using a `bytearray`, you must create a completely separate 100MB copy when finally retrieving the `bytes` object, because the object you filled was a `bytearray`. Furthermore, these standard I/O objects hold the Global Interpreter Lock heavily (e.g., holding the GIL for 20s during 60s of high I/O).

To solve this, we introduced `DirectMemmoveBuffer`. It uses `ctypes.memmove` to write data directly into pre-allocated memory off the main thread, effectively bypassing the GIL and eliminating the final copy overhead.
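A minimal sketch of the idea - not the PR's actual `DirectMemmoveBuffer`, and all names below are illustrative. A fixed-size buffer is allocated once, and each chunk is copied into place with `ctypes.memmove`, a plain C call that ctypes runs with the GIL released:

```python
import ctypes

class MemmoveBufferSketch:
    """Illustrative sketch only, not the PR's DirectMemmoveBuffer.

    The full object size is allocated up front; writers copy chunks
    directly into place, so assembly never needs a final full copy.
    """

    def __init__(self, size: int):
        self._buf = ctypes.create_string_buffer(size)  # pre-allocated memory
        self._size = size

    def write_at(self, offset: int, data: bytes) -> None:
        if offset + len(data) > self._size:
            raise ValueError("write past end of buffer")
        # ctypes.memmove is a foreign C call, so ctypes releases the GIL
        # around it; copies can run concurrently off the main thread.
        ctypes.memmove(ctypes.addressof(self._buf) + offset, data, len(data))

    def view(self) -> memoryview:
        # Zero-copy view of the assembled object.
        return memoryview(self._buf)[: self._size]
```

Consumers that accept a memoryview avoid the final duplication entirely; calling `bytes(view)` would still copy once.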
Concurrency Support for Zonal Buckets

- `_concurrent_mrd_fetch`: A new method to handle downloading multiple data chunks concurrently (source).
- `_fetch_range_split`: Upgraded this method (used by the readahead_chunked cache) to fully support concurrent chunk fetching (source).
- `_cat_file`: Updated to support concurrency out of the box (source).

Connection Pooling (MRDPool)
Introduced MRDPool to manage a pool of AsyncMultiRangeDownloader (MRD) instances. This pool enables safe and efficient concurrent fetching by reusing connections and scaling downloaders on demand.
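A condensed sketch of the acquisition logic discussed in the review above (a hypothetical class; the real MRDPool adds initialization, locking, and close handling):

```python
import asyncio

class MRDPoolSketch:
    """Illustrative: reuse a free connection, grow up to pool_size,
    then share busy connections round-robin."""

    def __init__(self, pool_size: int):
        self.pool_size = pool_size
        self._free_mrds: asyncio.Queue = asyncio.Queue()
        self._all_mrds: list = []
        self._rr_index = 0  # round-robin cursor over _all_mrds

    async def _create_mrd(self):
        await asyncio.sleep(0)  # stand-in for opening a real connection
        return object()

    async def acquire(self):
        try:
            return self._free_mrds.get_nowait()  # reuse a free connection
        except asyncio.QueueEmpty:
            pass
        if len(self._all_mrds) < self.pool_size:
            mrd = await self._create_mrd()  # grow the pool on demand
            self._all_mrds.append(mrd)
            return mrd
        # Pool full, none free: share a busy MRD via round-robin.
        mrd = self._all_mrds[self._rr_index]
        self._rr_index = (self._rr_index + 1) % len(self._all_mrds)
        return mrd
```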
Prefetcher Engine Integration & Heuristic Tuning
Zonal Bucket Integration: Fully integrated the prefetcher engine into the zonal file read path.
Tuned Prefetching Trigger (MIN_STREAKS_FOR_PREFETCHING): Modified the prefetching heuristic to start on the 3rd sequential read (>= 3) instead of the 2nd (>= 2) (source). Why? The readahead_chunked cache typically issues two requests to the prefetch engine simultaneously (the required chunk and the readahead chunk). Previously, this immediately and falsely signaled to the prefetcher that the user was performing sequential reads, triggering aggressive prefetching too early. Raising the limit to 3 prevents these false positives, as the toy illustration below shows.
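(The constant name matches the PR; the tracker class and its fields are invented for illustration.)

```python
MIN_STREAKS_FOR_PREFETCHING = 3  # raised from 2 in this PR

class StreakSketch:
    """Prefetching starts only once back-to-back sequential reads
    reach MIN_STREAKS_FOR_PREFETCHING."""

    def __init__(self):
        self.streak = 0
        self.next_offset = 0

    def on_read(self, offset: int, length: int) -> bool:
        if offset == self.next_offset:
            self.streak += 1
        else:
            self.streak = 1  # random access resets the streak
        self.next_offset = offset + length
        return self.streak >= MIN_STREAKS_FOR_PREFETCHING

# readahead_chunked issues two adjacent requests per user read, so a
# threshold of 2 would fire immediately; 3 requires a genuine pattern.
h = StreakSketch()
print(h.on_read(0, 1024))     # False: required chunk
print(h.on_read(1024, 1024))  # False: readahead companion
print(h.on_read(2048, 1024))  # True: real sequential access
```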