
Integrate prefetcher engine with zonal buckets#805

Open
googlyrahman wants to merge 9 commits into fsspec:main from ankitaluthra1:zonal

Conversation

@googlyrahman
Collaborator

@googlyrahman googlyrahman commented Apr 13, 2026

This PR integrates the prefetcher engine with Zonal Buckets to improve read throughput and introduces concurrency support for fetching data. To maximize performance and avoid Python's memory and concurrency bottlenecks, this PR also introduces a highly optimized DirectMemmoveBuffer and an MRDPool for managing connection states.

Zero-Copy, GIL-Free Memory Management (DirectMemmoveBuffer)

Standard Python buffers (like bytearray or io.BytesIO) were GIL-bound in my experiments and require multiple memory copies. For example, if you assemble fifty 2MB chunks into a 100MB object using a bytearray, you must create a completely separate 100MB copy when you finally retrieve the bytes object, because the buffer you filled was a bytearray. Furthermore, these standard I/O objects hold the Global Interpreter Lock heavily (e.g., holding the GIL for 20s during 60s of high I/O).

To solve this, we introduced DirectMemmoveBuffer. It uses ctypes.memmove to write data directly into pre-allocated memory off the main thread, sidestepping the GIL and eliminating the final copy overhead.
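A minimal sketch of the idea (the class name matches the PR, but these methods are assumptions, not the actual API):

```python
import ctypes

class DirectMemmoveBuffer:
    """Illustrative sketch: one up-front allocation, chunks written in place
    with ctypes.memmove, no per-chunk intermediate copies."""

    def __init__(self, size: int):
        self._buf = ctypes.create_string_buffer(size)  # single allocation
        self._size = size

    def write_at(self, offset: int, data: bytes) -> None:
        if offset + len(data) > self._size:
            raise ValueError("write past end of buffer")
        # ctypes foreign calls release the GIL for the duration of the copy,
        # so worker threads can fill disjoint ranges in parallel.
        ctypes.memmove(ctypes.addressof(self._buf) + offset, data, len(data))

    def getvalue(self) -> memoryview:
        # Returning a view avoids the extra full-size copy that
        # bytes(bytearray) would force when handing back the assembled object.
        return memoryview(self._buf)
```

Worker threads can call write_at concurrently on disjoint ranges; the real implementation presumably adds bounds bookkeeping and lifetime management on top.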

Concurrency Support for Zonal Buckets

  • Implemented _concurrent_mrd_fetch: a new method to handle downloading multiple data chunks concurrently (source).
  • Refactored _fetch_range_split: upgraded this method (used by the readahead_chunked cache) to fully support concurrent chunk fetching (source).
  • Refactored _cat_file: updated to support concurrency out of the box (source).

Connection Pooling (MRDPool)

Introduced MRDPool to manage a pool of AsyncMultiRangeDownloader (MRD) instances. This pool enables safe and efficient concurrent fetching by reusing connections and scaling downloaders on demand.

Prefetcher Engine Integration & Heuristic Tuning

  • Zonal Bucket Integration: Fully integrated the prefetcher engine into the zonal file read path.

  • Tuned Prefetching Trigger (MIN_STREAKS_FOR_PREFETCHING): Modified the prefetching heuristic to start on the 3rd sequential read (>= 3) instead of the 2nd (>= 2) (source). Why? The readahead_chunked cache typically issues two requests to the prefetch engine simultaneously (the required chunk and the readahead chunk). Previously, this immediately and falsely signaled to the prefetcher that the user was performing sequential reads, triggering aggressive prefetching too early. Raising the limit to 3 prevents these false positives.
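A hedged sketch of the heuristic (class and method names here are illustrative; only MIN_STREAKS_FOR_PREFETCHING comes from the PR):

```python
# Illustrative sketch of a streak-based prefetch trigger; only the
# MIN_STREAKS_FOR_PREFETCHING constant is taken from the PR itself.
MIN_STREAKS_FOR_PREFETCHING = 3

class SequentialReadDetector:
    def __init__(self):
        self._streak = 0
        self._next_expected = None  # offset the next sequential read would start at

    def record_read(self, start: int, length: int) -> bool:
        """Record one read; return True once prefetching should start."""
        if start == self._next_expected:
            self._streak += 1
        else:
            self._streak = 1  # a seek resets the streak
        self._next_expected = start + length
        return self._streak >= MIN_STREAKS_FOR_PREFETCHING
```

With a threshold of 2, the required chunk plus its readahead chunk (two consecutive requests) would already trip the trigger; a threshold of 3 requires a genuine third sequential read.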

@googlyrahman googlyrahman force-pushed the zonal branch 5 times, most recently from 2469216 to 675b731 Compare April 14, 2026 03:29
@codecov

codecov Bot commented Apr 14, 2026

Codecov Report

❌ Patch coverage is 98.56631% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.32%. Comparing base (286ee3a) to head (8d9b2ef).

Files with missing lines Patch % Lines
gcsfs/zb_hns_utils.py 97.82% 3 Missing ⚠️
gcsfs/extended_gcsfs.py 99.09% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #805      +/-   ##
==========================================
+ Coverage   86.91%   88.32%   +1.41%     
==========================================
  Files          15       15              
  Lines        2743     2972     +229     
==========================================
+ Hits         2384     2625     +241     
+ Misses        359      347      -12     

☔ View full report in Codecov by Sentry.

@zhixiangli
Collaborator

/gcbrun

@googlyrahman googlyrahman marked this pull request as ready for review April 21, 2026 11:06
@martindurant
Member

if user passes buffer of size 0, it should not consider it as EOF

Good idea to check what happens with read(0), though - does it count as sequential? Can enough of them make the prefetch size go to 0?

@googlyrahman
Collaborator Author

Good idea to check what happens with read(0), though - does it count as sequential? Can enough of them make the prefetch size go to 0?

It doesn't count as sequential, nor does the prefetch size go to zero: we return early with empty bytes when size <= 0 (i.e., start >= end).
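In other words (a hypothetical sketch, not the actual gcsfs code):

```python
# Hypothetical sketch of the guard: a zero-sized read returns early and
# never reaches the sequential-read bookkeeping, so it can neither extend
# a streak nor shrink the prefetch size.
def clamp_read(start: int, end: int, data: bytes) -> bytes:
    if start >= end:  # size <= 0, including read(0)
        return b""
    return data[start:end]
```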

@zhixiangli
Collaborator

zhixiangli commented Apr 21, 2026

docs/readthedocs.org:gcsfs — Read the Docs build failed!

Rebasing from main may address this error.

@googlyrahman
Collaborator Author

@martindurant please review this

Member

@martindurant martindurant left a comment

I have read the code but not the tests so far.

Again, there is a lot of complexity here...

Comment thread gcsfs/zonal_file.py
self.gcsfs, bucket, key, generation, self.pool_size
)
object_size = self.mrd.persisted_size
asyn.sync(self.gcsfs.loop, self.mrd_pool.initialize)
Member

Shouldn't this only be done on first write?

Collaborator Author

Ideally, yes, but take a look at the existing code. ZonalFile initializes the MRD, gets its persisted size, and passes it to super(). I just kept the workflow as is while integrating the pool.

I think it was done for some reason here #744

Comment thread gcsfs/zonal_file.py
fixed_key_metadata=None,
generation=None,
kms_key_name=None,
pool_size=zb_hns_utils.DEFAULT_CONCURRENCY,
Member

Elsewhere instead of "pool", "concurrency" is the preferred term, right?

Collaborator Author

Yes, we have two terms: pool and concurrency. The pool represents the number of underlying MRDs, while concurrency represents the number of splits we need to make for downloading.

Member

But the pool represents concurrency too

Collaborator Author

They actually serve two distinct purposes.

Think of an MRD as a single connection stream to Google Cloud Storage. The pool_size parameter decides how many of these streams you want to open, while concurrency dictates how many requests your application is trying to make at the same time.

For example, if you set concurrency=4 and pool_size=1, your application will send all four concurrent requests through a single MRD. The MRD is capable of handling multiple requests simultaneously, but funneling everything through one stream can create a bottleneck and slow things down.

On the other hand, if you set concurrency=4 and pool_size=4, those four requests will be distributed evenly across four separate MRD connections in a round-robin fashion.

A helpful way to look at it is to treat pool_size as your underlying resource limit (similar to the worker count in a ThreadPoolExecutor) and concurrency as the volume of tasks your application is trying to execute at once.
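A tiny illustration of that split (the function name is hypothetical, not the PR's API):

```python
import itertools

# Illustrative round-robin distribution: which MRD connection index handles
# each of the application's concurrent requests.
def assign_requests(concurrency: int, pool_size: int) -> list[int]:
    rr = itertools.cycle(range(pool_size))
    return [next(rr) for _ in range(concurrency)]

print(assign_requests(4, 1))  # [0, 0, 0, 0] -> one MRD becomes a bottleneck
print(assign_requests(4, 4))  # [0, 1, 2, 3] -> spread across the pool
```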

Member

Sounds like something that would be helped by a well-crafted diagram :)

Collaborator Author

Took note of it and created an internal ticket https://b.corp.google.com/issues/508054742; I'll ensure the diagram is added in a follow-up PR.

@googlyrahman
Collaborator Author

@martindurant all your comments are addressed; please take a look again. Thanks for taking the time to review this.

Member

@martindurant martindurant left a comment

I had a closer look at MRDpool and have a few more comments.
Am I right in thinking that this pool concept should/will be upstreamed to the grpc layer?

Comment thread gcsfs/zb_hns_utils.py
raise RuntimeError("Cannot initialize a closed MRDPool.")

if not self._initialized:
mrd = await self._create_mrd()
Member

lock should apply only to this inner block: if closed or already initialised, we don't need it.

Collaborator Author

This suggestion might actually introduce a race condition. If multiple tasks call initialize() concurrently, the first task will yield control during the await self._create_mrd() step. If the check is outside the lock, a second task could jump in during that await, see that _initialized is still False, and erroneously spin up a duplicate connection.
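A minimal sketch of the point (names mirror the discussion, but the body is illustrative, not the PR's code): with the check held inside the lock, check-and-create is atomic across the await, so ten concurrent initializers create exactly one connection.

```python
import asyncio

class PoolSketch:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._initialized = False
        self.created = 0  # how many MRDs were actually created

    async def _create_mrd(self):
        await asyncio.sleep(0)  # yields control, like a real network call
        self.created += 1

    async def initialize(self):
        async with self._lock:
            if not self._initialized:  # checked while holding the lock
                await self._create_mrd()
                self._initialized = True

async def main():
    pool = PoolSketch()
    # ten concurrent initializers still create exactly one MRD
    await asyncio.gather(*(pool.initialize() for _ in range(10)))
    return pool.created
```

Moving the `if not self._initialized` check outside the lock reopens the window: a second task resuming during the await would see False and create a duplicate.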

Comment thread gcsfs/zb_hns_utils.py
if not self._initialized:
mrd = await self._create_mrd()
self.persisted_size = mrd.persisted_size
self._free_mrds.put_nowait(mrd)
Member

Can raise? We are assuming the q should be empty to start, but it's not enforced here.

Collaborator Author

Hmm, ideally this shouldn't happen, but I'll add a check for active_count == 0.

Comment thread gcsfs/zb_hns_utils.py
mrd = await self._create_mrd()
self.persisted_size = mrd.persisted_size
self._free_mrds.put_nowait(mrd)
self._active_count += 1
Member

Any way to tie active count (or _initialized) to the q size?

Collaborator Author

The active count tracks created MRDs, while the queue shows the free MRDs waiting. Because of this, there is no direct tie between them. Let me know if I'm off base here.

Comment thread gcsfs/zb_hns_utils.py Outdated
mrd = self._all_mrds[self._rr_index]
self._rr_index = (self._rr_index + 1) % len(self._all_mrds)

if spawn_new:
Member

I don't like the word "spawn". We are creating connections, not new execution contexts.

Collaborator Author

I'll rename the variable to create_new.

Comment thread gcsfs/zb_hns_utils.py
# or if we just spawned it. This prevents duplicate entries in the queue
# when multiple concurrent tasks share the same MRD via round-robin.
if (spawn_new or used_from_queue) and not self._closed:
self._free_mrds.put_nowait(mrd)
Member

q could have become full

Collaborator Author

The number of elements in _free_mrds will never exceed the pool_size, so this scenario is mathematically impossible. We only ever place an MRD into the queue if we just took it out, or if it is a newly created connection. Since the total number of MRDs we create is strictly capped at pool_size, the queue can never become full.

Comment thread gcsfs/zb_hns_utils.py
elif self.mrd_supports_multi_request and self._all_mrds:
# Pool is full, queue is empty, and we are allowed to share a busy MRD.
mrd = self._all_mrds[self._rr_index]
self._rr_index = (self._rr_index + 1) % len(self._all_mrds)
Member

_rr_index is undocumented. Round-robin on which connection to use?

Collaborator Author

I'll document it as round robin.

Comment thread gcsfs/zb_hns_utils.py
Comment on lines +392 to +394
async with self._lock:
if self._closed:
raise RuntimeError("MRDPool is closed.")
Member

Suggested change
async with self._lock:
if self._closed:
raise RuntimeError("MRDPool is closed.")
if self._closed:
raise RuntimeError("MRDPool is closed.")
async with self._lock:

Collaborator Author

This might introduce a race condition. If an await yields execution while a close operation is in progress, this task could resume without knowing the state had changed. It would then blindly create a new MRD and add it to the queue, even though the MRD pool is already closed.

Comment thread gcsfs/zb_hns_utils.py
Comment on lines +440 to +443
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
raise result
Member

Why not return_exceptions=False if we raise them anyway

Collaborator Author

If we set return_exceptions=False, the cleanup will fail fast on the first error and leave the remaining tasks orphaned. We need to avoid this. If a user catches that initial exception and keeps their application running, those abandoned background tasks will stick around, and their unhandled exceptions will not be properly garbage collected.
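An illustrative sketch of the pattern (close_all and demo are hypothetical names, not the PR's code): return_exceptions=True lets every cleanup task run to completion, and the first captured failure is re-raised afterwards.

```python
import asyncio

async def close_all(closers):
    # Capture exceptions so no task is orphaned, then re-raise the first.
    results = await asyncio.gather(*closers, return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            raise result

async def demo():
    log = []

    async def close(i, fail=False):
        await asyncio.sleep(0)
        log.append(i)  # records that this closer actually ran
        if fail:
            raise RuntimeError(f"close {i} failed")

    try:
        await close_all([close(0, fail=True), close(1), close(2)])
    except RuntimeError:
        pass
    return log  # all three closers ran despite the first one failing
```

With return_exceptions=False, close(1) and close(2) would be left pending when close(0) raised.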

Member

This little loop really seems like something that asyncio ought to do for us. So be it.

Collaborator

From Python 3.11, we can use asyncio.TaskGroup to ensure that no dangling tasks are left.

Collaborator Author

Yeah, I know; we've discussed this in the past. Python 3.10's end of life is scheduled for later this year, and until then we can't drop support for Python 3.10.
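For reference, a sketch of the 3.11+ alternative (close_all_tg is a hypothetical name; this is not proposed for the PR while 3.10 is supported):

```python
import asyncio

async def close_all_tg(closers):
    # Leaving the TaskGroup block waits for every child task, so no
    # dangling tasks survive; failures re-raise as an ExceptionGroup.
    async with asyncio.TaskGroup() as tg:
        for coro in closers:
            tg.create_task(coro)
```

One caveat: unlike gather(return_exceptions=True), a TaskGroup cancels its remaining children on the first failure rather than letting them finish, so the two patterns are not exactly equivalent for cleanup.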


@googlyrahman
Collaborator Author

I had a closer look at MRDpool and have a few more comments.
Am I right in thinking that this pool concept should/will be upstreamed to the grpc layer?

The pool concept will be upstreamed to the Python SDK in the future, not the grpc layer.

@googlyrahman
Collaborator Author

@martindurant, Addressed your comments - please take a look again

@googlyrahman
Collaborator Author

/gcbrun


@googlyrahman
Collaborator Author

The e2e tests are failing for this issue - #830

@martindurant please check the existing comments; if you feel everything is addressed, we can merge this.
