unify tensor and non-tensor data paths into a single packed-buffer batch transfer #107
xupinjie wants to merge 4 commits into
CLA Signature Guide@xupinjie , thanks for your pull request. The following commit(s) are not associated with a signed Contributor License Agreement (CLA).
To sign CLA, click here. To check if your email is configured correctly, refer to the FAQs. Once you've signed the CLA or updated your email, please comment

| backend | step 1 | step 2 | step 3 | step 4 | step 5 | step 6 | step 7 | step 8 | step 9 | step 10 |
|---|---|---|---|---|---|---|---|---|---|---|
| baseline (no TQ) | 33.7 | 28.4 | 41.8 | 36.1 | 42.8 | 37.7 | 48.5 | 34.4 | 36.8 | 34.9 |
| TQ + SimpleStorage | 5.8 | 0.43 | 10.2 | 6.1 | 19.3 | 0.54 | 41.2 | 0.19 | 0.59 | 1.6 |
| TQ + Mooncake refactor | 0.63 | 0.55 | 5.5 | 17.1 | 24.5 | 5.1 | 32.4 | 13.0 | 0.55 | 0.24 |
| TQ + Mooncake refactor (v2) | 1.1 | 0.38 | 32.9 | 25.5 | 8.4 | 4.1 | 5.2 | 0.19 | 0.54 | 0.53 |
old_log_prob per-step transfer time (s)
| backend | step 1 | step 2 | step 3 | step 4 | step 5 | step 6 | step 7 | step 8 | step 9 | step 10 |
|---|---|---|---|---|---|---|---|---|---|---|
| baseline (no TQ) | 34.3 | 39.3 | 63.5 | 62.6 | 77.9 | 37.9 | 56.8 | 42.9 | 30.0 | 43.9 |
| TQ + SimpleStorage | 0.25 | 0.24 | 0.56 | 0.56 | 0.24 | 0.24 | 0.24 | 0.27 | 0.23 | 0.24 |
| TQ + Mooncake refactor | 0.87 | 0.42 | 0.46 | 0.92 | 0.42 | 0.46 | 0.54 | 0.42 | 0.43 | 0.90 |
| TQ + Mooncake refactor (v2) | 0.64 | 0.34 | 0.32 | 0.33 | 0.32 | 0.75 | 0.33 | 0.30 | 0.76 | 0.35 |
adv per-step transfer time (s)
| backend | step 1 | step 2 | step 3 | step 4 | step 5 | step 6 | step 7 | step 8 | step 9 | step 10 |
|---|---|---|---|---|---|---|---|---|---|---|
| baseline (no TQ) | — | — | — | — | — | — | — | — | — | — |
| TQ + SimpleStorage | 0.30 | 0.41 | 0.41 | 0.32 | 0.64 | 0.62 | 0.31 | 0.38 | 0.31 | 0.31 |
| TQ + Mooncake refactor | 0.50 | 0.57 | 0.53 | 0.49 | 0.79 | 0.51 | 0.52 | 0.48 | 0.56 | 0.51 |
| TQ + Mooncake refactor (v2) | 0.39 | 0.34 | 0.37 | 0.37 | 0.47 | 0.40 | 0.37 | 0.38 | 0.37 | 0.38 |
update_actor per-step transfer time (s)
| backend | step 1 | step 2 | step 3 | step 4 | step 5 | step 6 | step 7 | step 8 | step 9 | step 10 |
|---|---|---|---|---|---|---|---|---|---|---|
| baseline (no TQ) | 42.1 | 37.3 | 58.9 | 58.0 | 76.4 | 35.4 | 55.1 | 42.6 | 29.3 | 48.1 |
| TQ + SimpleStorage | 11.0 | 8.7 | 12.7 | 14.4 | 16.6 | 9.6 | 15.7 | 10.4 | 7.4 | 11.4 |
| TQ + Mooncake refactor | 7.1 | 5.8 | 8.0 | 7.9 | 9.3 | 6.3 | 7.9 | 6.1 | 5.0 | 6.9 |
| TQ + Mooncake refactor (v2) | 4.7 | 3.8 | 5.2 | 5.2 | 5.6 | 4.0 | 5.3 | 4.1 | 3.7 | 5.0 |
Pull request overview
This PR refactors the Mooncake storage backend to unify tensor and non-tensor transfer into a single packed-buffer pathway, aiming to avoid Mooncake’s internal bytes-pool saturation under concurrent multi-MB GETs by always sending one contiguous registered CPU buffer per batch.
Changes:
- Added packed-buffer helpers in `serial_utils` to concatenate multiple encoded frames into a single contiguous buffer and split them back into memoryview slices.
- Refactored `MooncakeStoreClient` to encode all values via `serial_utils.encode`, pack each value into a contiguous CPU buffer, and transfer via `batch_upsert_from`/`batch_get_into` using per-slice pointers and sizes.
- Added deep-detach logic on decode to ensure decoded tensors/arrays own their storage after the packed receive buffer is discarded.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| transfer_queue/utils/serial_utils.py | Introduces a packed-buffer format + pack/unpack helpers to consolidate multi-frame encodings into one buffer. |
| transfer_queue/storage/clients/mooncake_client.py | Switches Mooncake put/get to a unified packed-buffer transfer path and returns/consumes per-key packed_size metadata. |
Comments suppressed due to low confidence (1)
transfer_queue/utils/serial_utils.py:436
- unpack_from() trusts the packed header/table and will silently return truncated slices (or raise struct errors) if the buffer is corrupted or too small. Validating header/table sizes and each (offset, length) entry makes failures deterministic and easier to debug.
def unpack_from(source: bytestr) -> list[memoryview]:
"""Split a packed buffer back into N memoryview slices over ``source``."""
mv = memoryview(source)
item_count = struct.unpack_from(_PACK_HEADER_FMT, mv, 0)[0]
result: list[memoryview] = []
for i in range(item_count):
offset, length = struct.unpack_from(_PACK_ENTRY_FMT, mv, _PACK_HEADER_SIZE + i * _PACK_ENTRY_SIZE)
result.append(mv[offset : offset + length])
return result
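The validation suggested in this suppressed comment could look roughly like the sketch below. It is not the PR's code: the name `unpack_from_checked` is hypothetical, the input is assumed to be a plain bytes-like object rather than the project's `bytestr` alias, and the format constants are copied from the layout comment quoted later in this review.

```python
import struct

# Constants mirror the packed-buffer layout quoted in this PR.
_PACK_HEADER_FMT = "<I"
_PACK_HEADER_SIZE = struct.calcsize(_PACK_HEADER_FMT)
_PACK_ENTRY_FMT = "<II"
_PACK_ENTRY_SIZE = struct.calcsize(_PACK_ENTRY_FMT)


def unpack_from_checked(source_buffer) -> list[memoryview]:
    """Hypothetical variant of unpack_from that validates before slicing."""
    mv = memoryview(source_buffer)
    if mv.nbytes < _PACK_HEADER_SIZE:
        raise ValueError(f"buffer too small for header: {mv.nbytes} bytes")

    (item_count,) = struct.unpack_from(_PACK_HEADER_FMT, mv, 0)
    table_end = _PACK_HEADER_SIZE + item_count * _PACK_ENTRY_SIZE
    if table_end > mv.nbytes:
        raise ValueError(f"entry table needs {table_end} bytes, buffer has {mv.nbytes}")

    result: list[memoryview] = []
    for i in range(item_count):
        entry_pos = _PACK_HEADER_SIZE + i * _PACK_ENTRY_SIZE
        offset, length = struct.unpack_from(_PACK_ENTRY_FMT, mv, entry_pos)
        # A payload must start after the table and end inside the buffer.
        if offset < table_end or offset + length > mv.nbytes:
            raise ValueError(f"entry {i} out of bounds: offset={offset}, length={length}")
        result.append(mv[offset : offset + length])
    return result
```

With these checks a truncated or corrupted buffer fails with a `ValueError` that names the offending entry, instead of returning a silently shortened slice.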
| def pack_into(target: bytestr, items: Sequence[bytestr]) -> None: | ||
| """Concatenate ``items`` into ``target``, which must be at least ``calc_packed_size(items)`` bytes.""" | ||
| target_mv = memoryview(target) | ||
| struct.pack_into(_PACK_HEADER_FMT, target_mv, 0, len(items)) | ||
|
|
||
| entry_offset = _PACK_HEADER_SIZE | ||
| payload_offset = _PACK_HEADER_SIZE + len(items) * _PACK_ENTRY_SIZE | ||
|
|
||
| target_tensor = torch.frombuffer(target_mv, dtype=torch.uint8) |
| batch_items = [serial_utils.encode(v) for v in batch_values] | ||
| batch_sizes = [serial_utils.calc_packed_size(items) for items in batch_items] | ||
| # TODO: switch to a MooncakeStore-allocated buffer once such an API exists. | ||
| big_buf = torch.empty(sum(batch_sizes), dtype=torch.uint8) | ||
| big_buf_mv = big_buf.numpy().data | ||
| base_ptr = big_buf.data_ptr() | ||
|
|
| # Packed buffer layout: | ||
| # [item_count: uint32 LE] | ||
| # [N × (payload_offset: uint32 LE, payload_size: uint32 LE)] | ||
| # [payload_0 ... payload_{N-1}] | ||
| _PACK_HEADER_FMT = "<I" | ||
| _PACK_HEADER_SIZE = struct.calcsize(_PACK_HEADER_FMT) | ||
| _PACK_ENTRY_FMT = "<II" | ||
| _PACK_ENTRY_SIZE = struct.calcsize(_PACK_ENTRY_FMT) | ||
|
|
||
|
|
||
| def calc_packed_size(items: Sequence[bytestr]) -> int: | ||
| """Total bytes required to pack ``items`` into one buffer.""" | ||
| return _PACK_HEADER_SIZE + len(items) * _PACK_ENTRY_SIZE + sum(memoryview(item).nbytes for item in items) | ||
|
|
| RETRY_DELAY_SECONDS = 1.0 | ||
|
|
||
|
|
||
| def _detach_from_buffer(obj: Any) -> Any: |
Why do we need to detach? How about initializing a new buffer for each get operation?
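As background for this question: slices produced by unpacking are views over the receive buffer, so their contents follow whatever happens to that buffer afterwards. The sketch below uses only the standard library to show the aliasing; it does not reproduce `_detach_from_buffer`, and the variable names are illustrative.

```python
# A stand-in for the packed receive buffer.
recv_buf = bytearray(b"\x01\x02\x03\x04")

# Zero-copy slice: shares memory with recv_buf.
view = memoryview(recv_buf)[1:3]

# Detached copy: owns its own storage.
owned = bytes(view)

# Simulate the buffer being reused for a later transfer.
recv_buf[1] = 0xFF

# The view now reflects the overwritten byte; the copy does not.
view_after = bytes(view)
```

For tensors decoded over such a buffer, the analogous step would be a copy such as `Tensor.clone()`; whether the PR detaches exactly that way is an assumption here.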
| if not type(self)._logged_first_put: | ||
| logger.info("[TQ-MOONCAKE-REFACTOR] put() entered: unified pack-into data path") | ||
| type(self)._logged_first_put = True |
Remove the debug code.
| batch_items = [serial_utils.encode(v) for v in batch_values] | ||
| batch_sizes = [serial_utils.calc_packed_size(items) for items in batch_items] | ||
| # TODO: switch to a MooncakeStore-allocated buffer once such an API exists. | ||
| big_buf = torch.empty(sum(batch_sizes), dtype=torch.uint8) | ||
| big_buf_mv = big_buf.numpy().data | ||
| base_ptr = big_buf.data_ptr() | ||
|
|
||
| batch_ptrs: list[int] = [] | ||
| offset = 0 | ||
| for items, size in zip(batch_items, batch_sizes, strict=True): | ||
| serial_utils.pack_into(big_buf_mv[offset : offset + size], items) | ||
| batch_ptrs.append(base_ptr + offset) | ||
| offset += size |
Implement the core logic, except buffer management, in serial_utils.py, as encode_info & decode_from
| batch_ptrs: list[int] = [] | ||
| offset = 0 | ||
| for items, size in zip(batch_items, batch_sizes, strict=True): | ||
| serial_utils.pack_into(big_buf_mv[offset : offset + size], items) |
We can put this in thread workers to speed up the process
16a2b7c to 0a1e7ac
|
|
||
|
|
||
| def pack_into(target: bytestr, items: Sequence[bytestr]) -> None: | ||
| """Concatenate ``items`` into ``target``, which must be at least ``calc_packed_size(items)`` bytes.""" |
Are there any checks to make sure the target >= calc_packed_size(items)?
target -> target_buffer?
We can check sum(item_mv.nbytes) == target_mv.nbytes.
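For reference when reading these comments, the required size is larger than the payload sum alone, because the layout also carries a header and an entry table. The following is a minimal sketch of a size-checked pack, assuming the layout constants from this PR; `pack_into_sketch` is a hypothetical name and it copies with memoryview slice assignment rather than the `torch.frombuffer` path used in the diff.

```python
import struct
from collections.abc import Sequence

_PACK_HEADER_FMT = "<I"
_PACK_HEADER_SIZE = struct.calcsize(_PACK_HEADER_FMT)  # 4 bytes
_PACK_ENTRY_FMT = "<II"
_PACK_ENTRY_SIZE = struct.calcsize(_PACK_ENTRY_FMT)  # 8 bytes


def calc_packed_size(items: Sequence[bytes]) -> int:
    """Header + one (offset, size) entry per item + all payload bytes."""
    payload = sum(memoryview(item).nbytes for item in items)
    return _PACK_HEADER_SIZE + len(items) * _PACK_ENTRY_SIZE + payload


def pack_into_sketch(target_buffer: bytearray, items: Sequence[bytes]) -> None:
    """Write header, entry table and payloads into target_buffer."""
    target_mv = memoryview(target_buffer)
    required = calc_packed_size(items)
    if target_mv.nbytes < required:
        raise ValueError(f"target buffer has {target_mv.nbytes} bytes, requires {required}")

    struct.pack_into(_PACK_HEADER_FMT, target_mv, 0, len(items))
    entry_offset = _PACK_HEADER_SIZE
    payload_offset = _PACK_HEADER_SIZE + len(items) * _PACK_ENTRY_SIZE
    for item in items:
        item_mv = memoryview(item)
        nbytes = item_mv.nbytes
        struct.pack_into(_PACK_ENTRY_FMT, target_mv, entry_offset, payload_offset, nbytes)
        target_mv[payload_offset : payload_offset + nbytes] = item_mv
        entry_offset += _PACK_ENTRY_SIZE
        payload_offset += nbytes
```

For the three items `b"hello"`, `b"world!"`, `b"x"` this gives 4 + 3 × 8 + 12 = 40 bytes, with the first payload starting at offset 28.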
| payload_offset += nbytes | ||
|
|
||
|
|
||
| def unpack_from(source: bytestr) -> list[memoryview]: |
source -> source_buffer
|
|
||
|
|
||
| def batch_encode_into( | ||
| values: list[Any], |
values -> objs to align with encode
|
|
||
| def batch_encode_into( | ||
| values: list[Any], | ||
| alloc: Callable[[int], tuple[Any, int]], |
alloc -> alloc_buff_func
| return result | ||
|
|
||
|
|
||
| def batch_encode_into( |
TO BE CHECKED: is this thread-safe?
| batch_ptrs.append(base_ptr + offset) | ||
| offset += size | ||
|
|
||
| return buf, batch_ptrs, batch_sizes |
TO BE CHECKED: the life cycle of the allocated buffer
| return buf, batch_ptrs, batch_sizes | ||
|
|
||
|
|
||
| def batch_decode_from(buffers: Sequence[Any]) -> list[Any]: |
|
|
||
| # bytes results arrive in non-tensor submit order, which matches the order of | ||
| # non-tensor values; walk values once to scatter packed_size back to its key slot. | ||
| custom_meta: list[dict | None] = [None] * len(keys) |
custom_meta -> custom_backend_meta because custom_meta is sample-level, while custom_backend_meta is sample-field-level
| bytes_futures.append(executor.submit(self._put_bytes_thread_worker, batch_keys, batch_values)) | ||
|
|
||
| for future in as_completed(futures): | ||
| for future in tensor_futures: |
I think we can let _put_tensors_thread_worker return None to avoid the multiple for loops below.
- initialize the custom_backend_meta list
- get thread return values in a for loop and directly put it to custom_backend_meta
| def alloc(total: int) -> tuple[Tensor, int]: | ||
| tensors, ptrs, _, _ = allocate_empty_tensors([torch.uint8], [(total,)]) | ||
| return tensors[0], ptrs[0] |
Optimize the var names & comments
BTW, allocate_empty_tensors receives the dtype and shape, which is not the same as the number of bytes calculated by calc_packed_size:
batch_sizes = [calc_packed_size(items) for items in batch_items]
buf, base_ptr = alloc(sum(batch_sizes))
| batch_shapes = [(sz,) for sz in batch_packed_sizes] | ||
| batch_dtypes = [torch.uint8] * len(batch_keys) | ||
| batch_nbytes = get_nbytes(batch_dtypes, batch_shapes) | ||
| batch_buffer_tensors, batch_buffer_ptrs, region_ptrs, region_sizes = allocate_empty_tensors( |
| return result | ||
|
|
||
|
|
||
| def batch_encode_into( |
I believe the current method is not sufficiently general. alloc is a memory allocation function, and for Yuanrong it requires passing in a batch of keys. I think we could consider passing in a list of buffers, which would also be able to handle non-contiguous buffers.
This may not be a very general usage pattern, and we can pack this for the yuanrong client.
| for items, size in zip(batch_items, batch_sizes, strict=True): | ||
| pack_into(buf_mv[offset : offset + size], items) | ||
| batch_ptrs.append(base_ptr + offset) | ||
| offset += size |
pack_into operations are serial. When BATCH_SIZE_LIMIT x NUM_THREAD of the upper-layer StorageClient is greater than num_values, the multi-thread optimization deteriorates significantly. Btw, BATCH_SIZE_LIMIT is 400 for mooncake and 10000 for yuanrong.
I suggest providing a num_workers parameter, based on which we set up multi-thread tasks for pack_into. We will then have num_workers (for pack_into) x MAX_WORKER_THREADS (for batch splitting) threads.
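The `num_workers` suggestion can be sketched as follows. This is an illustration only: `pack_batch_parallel` is a hypothetical helper, a plain byte copy stands in for `pack_into`, and a `bytearray` stands in for the registered buffer. Offsets are computed up front so each worker writes to a disjoint slice, which is what keeps the concurrent writes independent of each other.

```python
from concurrent.futures import ThreadPoolExecutor


def pack_batch_parallel(
    payloads: list[bytes], num_workers: int = 1
) -> tuple[bytearray, list[int], list[int]]:
    """Copy payloads into one contiguous buffer, optionally with worker threads."""
    sizes = [len(p) for p in payloads]

    # Prefix sums give each payload its own non-overlapping region.
    offsets: list[int] = []
    running = 0
    for size in sizes:
        offsets.append(running)
        running += size

    buf = bytearray(running)
    buf_mv = memoryview(buf)

    def copy_one(index: int) -> None:
        start = offsets[index]
        buf_mv[start : start + sizes[index]] = payloads[index]

    if num_workers <= 1:
        for i in range(len(payloads)):
            copy_one(i)
    else:
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            # list() forces completion and re-raises any worker exception.
            list(pool.map(copy_one, range(len(payloads))))

    return buf, offsets, sizes
```

Whether extra threads actually help depends on the copy releasing the GIL; with pure-Python copies as in this sketch the result is identical to the serial path but not necessarily faster.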
| return buf, batch_ptrs, batch_sizes | ||
|
|
||
|
|
||
| def batch_decode_from(buffers: Sequence[Any]) -> list[Any]: |
I think batch_encode_into and batch_decode_from should be "inverses" of each other, like x = batch_decode_from(batch_encode_into(x))
| alloc_buff_func: Callable[[list[int]], list[Any]], | ||
| *, | ||
| num_workers: int = 1, | ||
| ) -> tuple[list[Any], list[int]]: |
The returned buffers must be NumPy arrays or be viewable by memoryview.
The type of buffers, "Any", conflicts with the type annotation in pack_into.
unpack_from and batch_decode_from have the same problem.
| alive while ``ptrs`` are in flight (and for register/unregister). | ||
| * ``ptrs[i]`` / ``sizes[i]`` are the absolute pointer and byte length | ||
| of object ``i``'s packed slice within ``buf``. | ||
| ``(buffers, batch_sizes)``: ``buffers`` is what ``alloc_buff_func`` |
Please align the format of the docstring with existing files. For reference: https://github.com/Ascend/TransferQueue/blob/main/transfer_queue/client.py#L1226
| target_mv = memoryview(target_buffer) | ||
| required = calc_packed_size(items) | ||
| if target_mv.nbytes < required: | ||
| raise ValueError( | ||
| f"pack_into: target buffer has {target_mv.nbytes} bytes, requires {required}" | ||
| ) |
| for i in range(item_count): | ||
| offset, length = struct.unpack_from(_PACK_ENTRY_FMT, mv, _PACK_HEADER_SIZE + i * _PACK_ENTRY_SIZE) | ||
| result.append(mv[offset : offset + length]) | ||
| return result |
| if non_tensor_indices and (custom_backend_meta is None or len(custom_backend_meta) != len(keys)): | ||
| raise ValueError( | ||
| "custom_backend_meta with per-key packed_size is required when any dtype is None." | ||
| ) | ||
|
|
| def test_calc_packed_size_then_pack_unpack_roundtrip(): | ||
| items = [b"hello", b"world!", b"x"] | ||
| size = serial_utils.calc_packed_size(items) | ||
| buf = bytearray(size) | ||
| serial_utils.pack_into(buf, items) | ||
| recovered = serial_utils.unpack_from(buf) | ||
| assert [bytes(mv) for mv in recovered] == items | ||
|
|
||
|
|
||
| def test_pack_unpack_roundtrip_random_bytes(): | ||
| rng = np.random.default_rng(0) | ||
| items = [rng.bytes(int(rng.integers(1, 2048))) for _ in range(16)] | ||
| size = serial_utils.calc_packed_size(items) | ||
| buf = bytearray(size) | ||
| serial_utils.pack_into(buf, items) | ||
|
|
||
| recovered = serial_utils.unpack_from(buf) | ||
| assert len(recovered) == len(items) | ||
| for r, item in zip(recovered, items, strict=True): | ||
| assert bytes(r) == item |
We only need to keep one of these.
| assert b.nbytes == s | ||
|
|
||
|
|
||
| def test_batch_encode_into_alloc_called_with_per_obj_sizes(): |
We can delete this test
| pytest.param([torch.arange(10, dtype=torch.float32)], id="single-tensor"), | ||
| pytest.param( | ||
| [ | ||
| torch.arange(100, dtype=torch.float32), |
Please add test coverage for
- Nested tensor (jagged/strided)
- 4D+ dim tensor
| @pytest.mark.parametrize("values", _ROUNDTRIP_PARAMS) | ||
| def test_batch_encode_into_parallel_matches_serial(values): | ||
| serial_buffers, serial_sizes = serial_utils.batch_encode_into( | ||
| values, _yuanrong_alloc, num_workers=1 | ||
| ) | ||
| par_buffers, par_sizes = serial_utils.batch_encode_into( | ||
| values, _yuanrong_alloc, num_workers=4 | ||
| ) | ||
|
|
||
| assert serial_sizes == par_sizes | ||
| assert [bytes(b) for b in serial_buffers] == [bytes(b) for b in par_buffers] | ||
|
|
||
|
|
||
| def test_batch_encode_into_parallel_roundtrip_many_objects(): | ||
| rng = np.random.default_rng(42) | ||
| values = [] | ||
| for _ in range(64): | ||
| n = int(rng.integers(1, 257)) | ||
| values.append(torch.from_numpy(rng.random(n).astype(np.float32))) | ||
|
|
||
| decoded, *_ = _roundtrip(values, _yuanrong_alloc, "yuanrong", num_workers=8) | ||
| _assert_equal_payloads(decoded, values) |
We can merge the two tests by setting @pytest.mark.parametrize("values", _ROUNDTRIP_PARAMS) to the roundtrip test
| packed_sizes = [] | ||
| for future in bytes_futures: | ||
| packed_sizes.extend(future.result()) |
Consider using itertools to speed up:
from itertools import chain
packed_sizes = list(chain.from_iterable(future.result() for future in bytes_futures))
| for i, value in enumerate(values): | ||
| if not isinstance(value, torch.Tensor): | ||
| custom_backend_meta[i] = {"packed_size": next(sizes_iter)} | ||
|
|
Consider using:
sizes_iter = iter(packed_sizes)
custom_backend_meta = [
{"packed_size": next(sizes_iter)} if not isinstance(value, torch.Tensor) else None
for value in values
]
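The two suggestions above can be tried together in a small standalone form. Everything here is a stand-in: `FakeTensor` replaces `torch.Tensor` so the snippet runs without PyTorch, and `per_batch_sizes` replaces the lists returned by `future.result()`.

```python
from itertools import chain


class FakeTensor:
    """Stand-in for torch.Tensor, used only to drive the isinstance check."""


# Stand-in for the per-batch packed sizes returned by the bytes workers.
per_batch_sizes = [[10, 20], [30]]
packed_sizes = list(chain.from_iterable(per_batch_sizes))

# Mixed tensor / non-tensor values in key order.
values = [FakeTensor(), {"k": 1}, FakeTensor(), "text", [1, 2]]

# Scatter sizes back to the non-tensor slots, leaving tensor slots as None.
sizes_iter = iter(packed_sizes)
custom_backend_meta = [
    {"packed_size": next(sizes_iter)} if not isinstance(value, FakeTensor) else None
    for value in values
]
```

This relies on the sizes arriving in the same order as the non-tensor values. If there are fewer sizes than non-tensor values, `next` raises `StopIteration`, so an explicit length check before the comprehension would give a clearer error.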
Problem
Multimodal RL puts nested-dict values into TransferQueue (e.g. `{"pixel_values": Tensor, "image_grid_thw": Tensor, ...}`). The old `MooncakeStoreClient` only zero-copied plain tensors; anything else, including dicts that contain tensors, got pickled through Mooncake's internal bytes pool, which saturated under concurrent multi-MB GETs and forced a `VERL_TQ_MC_GET_RETRIES` retry workaround upstream.
Refactor
Treat every value as one opaque payload. Each value is encoded by the existing zero-copy msgpack encoder, the whole batch is packed into one contiguous CPU buffer, registered once, and shipped through Mooncake's RDMA-backed `batch_upsert_from`/`batch_get_into`. The pickled bytes path and the retry workaround it required are gone.
Test