unify tensor and non-tensor data paths into a single packed-buffer batch transfer#107

Open
xupinjie wants to merge 4 commits into
Ascend:mainfrom
xupinjie:pinjie/fix_multi_modal_inputs_v2
Conversation

xupinjie commented May 22, 2026

Problem

Multimodal RL puts nested-dict values into TransferQueue (e.g.
{"pixel_values": Tensor, "image_grid_thw": Tensor, ...}). The old
MooncakeStoreClient only zero-copied plain tensors; anything else — including
dicts that contain tensors — got pickled through Mooncake's internal bytes pool,
which saturated under concurrent multi-MB GETs and forced a
VERL_TQ_MC_GET_RETRIES retry workaround upstream.

Refactor

Treat every value as one opaque payload. Each value is encoded by the existing
zero-copy msgpack encoder, the whole batch is packed into one contiguous CPU
buffer, registered once, and shipped through Mooncake's RDMA-backed
batch_upsert_from / batch_get_into. The pickled bytes path and the retry
workaround it required are gone.
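
A minimal sketch of the packing step described above, assuming each value has already been encoded into a list of byte frames. The names `pack_batch` and `packed_size` are hypothetical, a plain `bytearray` stands in for the registered CPU buffer, and byte offsets stand in for the absolute pointers that would be handed to Mooncake; the actual Mooncake calls are not shown.

```python
import struct

HEADER = struct.Struct("<I")   # item_count
ENTRY = struct.Struct("<II")   # (payload_offset, payload_size), relative to the value's region


def packed_size(frames):
    """Bytes needed for one value: header + offset table + payloads."""
    return HEADER.size + len(frames) * ENTRY.size + sum(len(f) for f in frames)


def pack_batch(batch_frames):
    """Pack every value's frames into one contiguous buffer.

    Returns (buf, offsets, sizes): offsets[i] / sizes[i] locate value i in buf.
    """
    sizes = [packed_size(frames) for frames in batch_frames]
    buf = bytearray(sum(sizes))
    view = memoryview(buf)
    offsets = []
    cursor = 0
    for frames, size in zip(batch_frames, sizes):
        region = view[cursor:cursor + size]
        HEADER.pack_into(region, 0, len(frames))
        payload_at = HEADER.size + len(frames) * ENTRY.size
        for i, frame in enumerate(frames):
            ENTRY.pack_into(region, HEADER.size + i * ENTRY.size, payload_at, len(frame))
            region[payload_at:payload_at + len(frame)] = frame
            payload_at += len(frame)
        offsets.append(cursor)
        cursor += size
    return buf, offsets, sizes


buf, offsets, sizes = pack_batch([[b"meta", b"pixels"], [b"grid"]])
```

With one buffer per batch, only a single registration is needed regardless of how many values or frames the batch contains.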

Test

  • 30B-VL + onethinker, 2×8 GPU, RDMA: 3-step and 10-step runs clean — no retry / allocator failure / AssertionError.
[Image: datapath_perf_3way_v2]

@ascend-robot

CLA Signature Guide

@xupinjie , thanks for your pull request.

The following commit(s) are not associated with a signed Contributor License Agreement (CLA).

| Commit | Reason |
| --- | --- |
| 16a2b7cd refactor(mooncake): unify tensor... | The email used in the commit is not linked to a signed CLA! Please verify that it matches the email you used when signing the CLA. |

To sign CLA, click here.

To check if your email is configured correctly, refer to the FAQs.

Once you've signed the CLA or updated your email, please comment /check-cla to revalidate CLA status.

xupinjie (Author) commented May 22, 2026

gen per-step transfer time (s)

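| backend | step 1 | step 2 | step 3 | step 4 | step 5 | step 6 | step 7 | step 8 | step 9 | step 10 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| baseline (no TQ) | 33.7 | 28.4 | 41.8 | 36.1 | 42.8 | 37.7 | 48.5 | 34.4 | 36.8 | 34.9 |
| TQ + SimpleStorage | 5.8 | 0.43 | 10.2 | 6.1 | 19.3 | 0.54 | 41.2 | 0.19 | 0.59 | 1.6 |
| TQ + Mooncake refactor | 0.63 | 0.55 | 5.5 | 17.1 | 24.5 | 5.1 | 32.4 | 13.0 | 0.55 | 0.24 |
| TQ + Mooncake refactor (v2) | 1.1 | 0.38 | 32.9 | 25.5 | 8.4 | 4.1 | 5.2 | 0.19 | 0.54 | 0.53 |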

old_log_prob per-step transfer time (s)

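| backend | step 1 | step 2 | step 3 | step 4 | step 5 | step 6 | step 7 | step 8 | step 9 | step 10 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| baseline (no TQ) | 34.3 | 39.3 | 63.5 | 62.6 | 77.9 | 37.9 | 56.8 | 42.9 | 30.0 | 43.9 |
| TQ + SimpleStorage | 0.25 | 0.24 | 0.56 | 0.56 | 0.24 | 0.24 | 0.24 | 0.27 | 0.23 | 0.24 |
| TQ + Mooncake refactor | 0.87 | 0.42 | 0.46 | 0.92 | 0.42 | 0.46 | 0.54 | 0.42 | 0.43 | 0.90 |
| TQ + Mooncake refactor (v2) | 0.64 | 0.34 | 0.32 | 0.33 | 0.32 | 0.75 | 0.33 | 0.30 | 0.76 | 0.35 |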

adv per-step transfer time (s)

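| backend | step 1 | step 2 | step 3 | step 4 | step 5 | step 6 | step 7 | step 8 | step 9 | step 10 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| baseline (no TQ) | | | | | | | | | | |
| TQ + SimpleStorage | 0.30 | 0.41 | 0.41 | 0.32 | 0.64 | 0.62 | 0.31 | 0.38 | 0.31 | 0.31 |
| TQ + Mooncake refactor | 0.50 | 0.57 | 0.53 | 0.49 | 0.79 | 0.51 | 0.52 | 0.48 | 0.56 | 0.51 |
| TQ + Mooncake refactor (v2) | 0.39 | 0.34 | 0.37 | 0.37 | 0.47 | 0.40 | 0.37 | 0.38 | 0.37 | 0.38 |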

update_actor per-step transfer time (s)

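| backend | step 1 | step 2 | step 3 | step 4 | step 5 | step 6 | step 7 | step 8 | step 9 | step 10 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| baseline (no TQ) | 42.1 | 37.3 | 58.9 | 58.0 | 76.4 | 35.4 | 55.1 | 42.6 | 29.3 | 48.1 |
| TQ + SimpleStorage | 11.0 | 8.7 | 12.7 | 14.4 | 16.6 | 9.6 | 15.7 | 10.4 | 7.4 | 11.4 |
| TQ + Mooncake refactor | 7.1 | 5.8 | 8.0 | 7.9 | 9.3 | 6.3 | 7.9 | 6.1 | 5.0 | 6.9 |
| TQ + Mooncake refactor (v2) | 4.7 | 3.8 | 5.2 | 5.2 | 5.6 | 4.0 | 5.3 | 4.1 | 3.7 | 5.0 |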

@xupinjie xupinjie marked this pull request as ready for review May 22, 2026 16:40
@0oshowero0 0oshowero0 requested a review from Copilot May 25, 2026 01:24
Copilot AI (Contributor) left a comment

Pull request overview

This PR refactors the Mooncake storage backend to unify tensor and non-tensor transfer into a single packed-buffer pathway, aiming to avoid Mooncake’s internal bytes-pool saturation under concurrent multi-MB GETs by always sending one contiguous registered CPU buffer per batch.

Changes:

  • Added packed-buffer helpers in serial_utils to concatenate multiple encoded frames into a single contiguous buffer and split them back into memoryview slices.
  • Refactored MooncakeStoreClient to encode all values via serial_utils.encode, pack each value into a contiguous CPU buffer, and transfer via batch_upsert_from / batch_get_into using per-slice pointers and sizes.
  • Added deep-detach logic on decode to ensure decoded tensors/arrays own their storage after the packed receive buffer is discarded.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| transfer_queue/utils/serial_utils.py | Introduces a packed-buffer format + pack/unpack helpers to consolidate multi-frame encodings into one buffer. |
| transfer_queue/storage/clients/mooncake_client.py | Switches Mooncake put/get to a unified packed-buffer transfer path and returns/consumes per-key packed_size metadata. |
Comments suppressed due to low confidence (1)

transfer_queue/utils/serial_utils.py:436

  • unpack_from() trusts the packed header/table and will silently return truncated slices (or raise struct errors) if the buffer is corrupted or too small. Validating header/table sizes and each (offset, length) entry makes failures deterministic and easier to debug.
def unpack_from(source: bytestr) -> list[memoryview]:
    """Split a packed buffer back into N memoryview slices over ``source``."""
    mv = memoryview(source)
    item_count = struct.unpack_from(_PACK_HEADER_FMT, mv, 0)[0]
    result: list[memoryview] = []
    for i in range(item_count):
        offset, length = struct.unpack_from(_PACK_ENTRY_FMT, mv, _PACK_HEADER_SIZE + i * _PACK_ENTRY_SIZE)
        result.append(mv[offset : offset + length])
    return result
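
One way the validation suggested in this suppressed comment could look is sketched below. It assumes the packed layout quoted elsewhere in this thread (a `<I` item count followed by `<II` offset/size entries); the name `unpack_from_checked` and the error messages are hypothetical and not part of the PR.

```python
import struct

_HEADER = struct.Struct("<I")    # item_count
_ENTRY = struct.Struct("<II")    # (payload_offset, payload_size)


def unpack_from_checked(source_buffer):
    """Split a packed buffer into memoryview slices, rejecting malformed input."""
    mv = memoryview(source_buffer)
    if mv.nbytes < _HEADER.size:
        raise ValueError(f"buffer too small for header: {mv.nbytes} bytes")
    (item_count,) = _HEADER.unpack_from(mv, 0)
    table_end = _HEADER.size + item_count * _ENTRY.size
    if table_end > mv.nbytes:
        raise ValueError(f"offset table for {item_count} items exceeds buffer size")
    result = []
    for i in range(item_count):
        offset, length = _ENTRY.unpack_from(mv, _HEADER.size + i * _ENTRY.size)
        # A payload must start after the table and end inside the buffer.
        if offset < table_end or offset + length > mv.nbytes:
            raise ValueError(f"entry {i} out of bounds: offset={offset}, length={length}")
        result.append(mv[offset:offset + length])
    return result


good = struct.pack("<I", 2) + struct.pack("<II", 20, 3) + struct.pack("<II", 23, 2) + b"abcde"
slices = [bytes(s) for s in unpack_from_checked(good)]
```

A truncated copy of `good` then fails with a `ValueError` at the offending entry instead of returning a silently shortened slice.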

Comment thread transfer_queue/utils/serial_utils.py Outdated
Comment on lines +408 to +416
def pack_into(target: bytestr, items: Sequence[bytestr]) -> None:
    """Concatenate ``items`` into ``target``, which must be at least ``calc_packed_size(items)`` bytes."""
    target_mv = memoryview(target)
    struct.pack_into(_PACK_HEADER_FMT, target_mv, 0, len(items))

    entry_offset = _PACK_HEADER_SIZE
    payload_offset = _PACK_HEADER_SIZE + len(items) * _PACK_ENTRY_SIZE

    target_tensor = torch.frombuffer(target_mv, dtype=torch.uint8)
Comment on lines +154 to +160
batch_items = [serial_utils.encode(v) for v in batch_values]
batch_sizes = [serial_utils.calc_packed_size(items) for items in batch_items]
# TODO: switch to a MooncakeStore-allocated buffer once such an API exists.
big_buf = torch.empty(sum(batch_sizes), dtype=torch.uint8)
big_buf_mv = big_buf.numpy().data
base_ptr = big_buf.data_ptr()

Comment on lines +393 to +406
# Packed buffer layout:
# [item_count: uint32 LE]
# [N × (payload_offset: uint32 LE, payload_size: uint32 LE)]
# [payload_0 ... payload_{N-1}]
_PACK_HEADER_FMT = "<I"
_PACK_HEADER_SIZE = struct.calcsize(_PACK_HEADER_FMT)
_PACK_ENTRY_FMT = "<II"
_PACK_ENTRY_SIZE = struct.calcsize(_PACK_ENTRY_FMT)


def calc_packed_size(items: Sequence[bytestr]) -> int:
    """Total bytes required to pack ``items`` into one buffer."""
    return _PACK_HEADER_SIZE + len(items) * _PACK_ENTRY_SIZE + sum(memoryview(item).nbytes for item in items)

Comment thread transfer_queue/utils/serial_utils.py
Comment thread transfer_queue/storage/clients/mooncake_client.py
RETRY_DELAY_SECONDS = 1.0


def _detach_from_buffer(obj: Any) -> Any:
Collaborator

Why do we need to detach? How about initializing a new buffer for each new get operation?
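
The trade-off behind this question can be illustrated with plain `memoryview` semantics. This is a sketch of the general aliasing behavior, not of the PR's decode path: a zero-copy slice keeps pointing at the receive buffer, so it changes if that buffer is reused, while a copy (the "detach") or a fresh buffer per get avoids the problem.

```python
# A receive buffer that is reused across get operations (hypothetical scenario).
recv = bytearray(b"AAAABBBB")
view = memoryview(recv)[0:4]    # zero-copy slice into the receive buffer
owned = bytes(view)             # detached copy that owns its storage

recv[0:4] = b"ZZZZ"             # a later get overwrites the shared buffer

view_after = bytes(view)        # the view now sees the new contents
owned_after = owned             # the copy is unaffected

# Alternative raised in the review: allocate a new buffer for every get.
# Views into it stay valid because nothing else writes to that buffer,
# and the view itself keeps the buffer alive.
fresh = bytearray(b"CCCCDDDD")
fresh_view = memoryview(fresh)[4:8]
del fresh
```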

Author

I deleted the detach logic. Thx.

Comment on lines +139 to +141
if not type(self)._logged_first_put:
    logger.info("[TQ-MOONCAKE-REFACTOR] put() entered: unified pack-into data path")
    type(self)._logged_first_put = True
Collaborator

Remove the debug code.

Author

Done, thx.

Comment on lines +154 to +166
batch_items = [serial_utils.encode(v) for v in batch_values]
batch_sizes = [serial_utils.calc_packed_size(items) for items in batch_items]
# TODO: switch to a MooncakeStore-allocated buffer once such an API exists.
big_buf = torch.empty(sum(batch_sizes), dtype=torch.uint8)
big_buf_mv = big_buf.numpy().data
base_ptr = big_buf.data_ptr()

batch_ptrs: list[int] = []
offset = 0
for items, size in zip(batch_items, batch_sizes, strict=True):
    serial_utils.pack_into(big_buf_mv[offset : offset + size], items)
    batch_ptrs.append(base_ptr + offset)
    offset += size
Collaborator

Implement the core logic, except for buffer management, in serial_utils.py, as encode_info & decode_from.

Author

Done, thx.

batch_ptrs: list[int] = []
offset = 0
for items, size in zip(batch_items, batch_sizes, strict=True):
    serial_utils.pack_into(big_buf_mv[offset : offset + size], items)
Collaborator

We can put this in thread workers to speed up the process

Author

Done, thx.

Comment thread transfer_queue/storage/clients/mooncake_client.py
@xupinjie xupinjie closed this May 25, 2026
@xupinjie xupinjie deleted the pinjie/fix_multi_modal_inputs_v2 branch May 25, 2026 14:13
@xupinjie xupinjie restored the pinjie/fix_multi_modal_inputs_v2 branch May 25, 2026 14:36
@xupinjie xupinjie reopened this May 25, 2026
@xupinjie xupinjie force-pushed the pinjie/fix_multi_modal_inputs_v2 branch from 16a2b7c to 0a1e7ac Compare May 25, 2026 14:39
@ascend-robot

CLA Signature Guide

@xupinjie , thanks for your pull request.

The following commit(s) are not associated with a signed Contributor License Agreement (CLA).

| Commit | Reason |
| --- | --- |
| 0a1e7ac9 refactor(mooncake): non-tensor d... | The email used in the commit is not linked to a signed CLA! Please verify that it matches the email you used when signing the CLA. |

To sign CLA, click here.

To check if your email is configured correctly, refer to the FAQs.

Once you've signed the CLA or updated your email, please comment /check-cla to revalidate CLA status.

Comment thread transfer_queue/utils/serial_utils.py Outdated


def pack_into(target: bytestr, items: Sequence[bytestr]) -> None:
    """Concatenate ``items`` into ``target``, which must be at least ``calc_packed_size(items)`` bytes."""
Collaborator

Are there any checks to make sure the target is at least calc_packed_size(items) bytes?

Collaborator

target -> target_buffer?

Contributor

we can check sum(item_mv.nbytes) == target_mv.nbytes.
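
A sketch of such a check is shown below, assuming the packed layout from this thread. Note that the payload bytes alone are smaller than the packed size, because the `<I` header and the `<II` offset table also live in the target, so this sketch compares against the full packed size rather than the payload sum. The names `required_size` and `check_target` are hypothetical.

```python
import struct

HEADER_SIZE = struct.calcsize("<I")    # item_count
ENTRY_SIZE = struct.calcsize("<II")    # (payload_offset, payload_size)


def required_size(items):
    """Header + offset table + payload bytes."""
    payload = sum(memoryview(item).nbytes for item in items)
    return HEADER_SIZE + len(items) * ENTRY_SIZE + payload


def check_target(target_buffer, items):
    """Raise ValueError if ``target_buffer`` cannot hold the packed ``items``."""
    available = memoryview(target_buffer).nbytes
    needed = required_size(items)
    if available < needed:
        raise ValueError(f"target buffer has {available} bytes, requires {needed}")


items = [b"ab", b"cde"]
payload_bytes = sum(len(i) for i in items)
needed = required_size(items)
check_target(bytearray(needed), items)   # large enough: no exception
```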

Comment thread transfer_queue/utils/serial_utils.py Outdated
payload_offset += nbytes


def unpack_from(source: bytestr) -> list[memoryview]:
Collaborator

source -> source_buffer

Comment thread transfer_queue/utils/serial_utils.py Outdated


def batch_encode_into(
    values: list[Any],
Collaborator

values -> objs to align with encode

Comment thread transfer_queue/utils/serial_utils.py Outdated

def batch_encode_into(
    values: list[Any],
    alloc: Callable[[int], tuple[Any, int]],
Collaborator

alloc -> alloc_buff_func

return result


def batch_encode_into(
Collaborator

To be checked: is this thread-safe?

Comment thread transfer_queue/utils/serial_utils.py Outdated
batch_ptrs.append(base_ptr + offset)
offset += size

return buf, batch_ptrs, batch_sizes
Collaborator

To be checked: the life cycle of the allocated buffer.

Comment thread transfer_queue/utils/serial_utils.py Outdated
return buf, batch_ptrs, batch_sizes


def batch_decode_from(buffers: Sequence[Any]) -> list[Any]:
Collaborator

source_buffers


# bytes results arrive in non-tensor submit order, which matches the order of
# non-tensor values; walk values once to scatter packed_size back to its key slot.
custom_meta: list[dict | None] = [None] * len(keys)
Collaborator

custom_meta -> custom_backend_meta because custom_meta is sample-level, while custom_backend_meta is sample-field-level

bytes_futures.append(executor.submit(self._put_bytes_thread_worker, batch_keys, batch_values))

for future in as_completed(futures):
for future in tensor_futures:
Collaborator

I think we can let _put_tensors_thread_worker return None to avoid the multiple for loops below.

  1. initialize the custom_backend_meta list
  2. get thread return values in a for loop and directly put it to custom_backend_meta
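
The two steps above might be sketched as follows. Everything here is a stand-in: `put_worker` is a hypothetical worker that reports per-index metadata, `bytes` values play the role of non-tensor values, and the length of each value plays the role of its packed size.

```python
from concurrent.futures import ThreadPoolExecutor


def put_worker(indices, batch_values):
    """Hypothetical worker: returns (index, meta) pairs for its batch."""
    results = []
    for i, value in zip(indices, batch_values):
        # Stand-in rule: bytes values are "non-tensor" and report a packed size.
        meta = {"packed_size": len(value)} if isinstance(value, bytes) else None
        results.append((i, meta))
    return results


values = [b"abc", 1.0, b"hello", 2.0]
batches = [([0, 1], values[0:2]), ([2, 3], values[2:4])]

# Step 1: initialize the result list once.
custom_backend_meta = [None] * len(values)

# Step 2: a single loop writes each worker's return values into their slots.
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(put_worker, idx, vals) for idx, vals in batches]
    for future in futures:
        for i, meta in future.result():
            custom_backend_meta[i] = meta
```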

Comment on lines +183 to +185
def alloc(total: int) -> tuple[Tensor, int]:
    tensors, ptrs, _, _ = allocate_empty_tensors([torch.uint8], [(total,)])
    return tensors[0], ptrs[0]
Collaborator

optimize the var names & comments

Collaborator

BTW, allocate_empty_tensors receives the dtype and shape, which is not the same as the number of bytes calculated by calc_packed_size

    batch_sizes = [calc_packed_size(items) for items in batch_items]
    buf, base_ptr = alloc(sum(batch_sizes))

batch_shapes = [(sz,) for sz in batch_packed_sizes]
batch_dtypes = [torch.uint8] * len(batch_keys)
batch_nbytes = get_nbytes(batch_dtypes, batch_shapes)
batch_buffer_tensors, batch_buffer_ptrs, region_ptrs, region_sizes = allocate_empty_tensors(
Collaborator

See above comments

return result


def batch_encode_into(
Contributor

I believe the current method is not sufficiently general. alloc is a memory allocation function, and for Yuanrong it requires passing in a batch of keys. I think we could consider passing in a list of buffers, which would also be able to handle non-contiguous buffers.

Collaborator

This may not be a very general usage pattern and we can pack this for yuanrong client

Comment thread transfer_queue/utils/serial_utils.py Outdated
Comment on lines +472 to +475
for items, size in zip(batch_items, batch_sizes, strict=True):
    pack_into(buf_mv[offset : offset + size], items)
    batch_ptrs.append(base_ptr + offset)
    offset += size
Contributor

pack_into operations are serial. When BATCH_SIZE_LIMIT x NUM_THREAD of the upper-layer StorageClient is greater than num_values, the multi-thread optimization degrades significantly. By the way, BATCH_SIZE_LIMIT is 400 for Mooncake and 10000 for Yuanrong.

Collaborator

I suggest providing a num_workers parameter, based on which we set up multi-threaded tasks for pack_into. We will then have num_workers (for pack_into) x MAX_WORKER_THREADS (for batch splitting) threads.
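
A rough sketch of what a `num_workers` switch could look like, using only the standard library. The function `parallel_fill` and its helper are hypothetical; each task writes into its own disjoint slice of one shared buffer, so tasks never touch the same bytes. Whether threads give a real speedup depends on whether the underlying copy releases the GIL, which this sketch does not demonstrate.

```python
from concurrent.futures import ThreadPoolExecutor


def copy_into(region, payload):
    """Write one payload into its own (disjoint) slice of the shared buffer."""
    region[:] = payload


def parallel_fill(payloads, num_workers=1):
    """Concatenate payloads into one buffer, optionally with a thread pool."""
    sizes = [len(p) for p in payloads]
    buf = bytearray(sum(sizes))
    view = memoryview(buf)
    regions = []
    cursor = 0
    for size in sizes:
        regions.append(view[cursor:cursor + size])
        cursor += size
    if num_workers <= 1:
        for region, payload in zip(regions, payloads):
            copy_into(region, payload)
    else:
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            # list() forces completion and re-raises any worker exception.
            list(pool.map(copy_into, regions, payloads))
    return bytes(buf)


payloads = [bytes([i]) * (i + 1) for i in range(16)]
serial_result = parallel_fill(payloads, num_workers=1)
threaded_result = parallel_fill(payloads, num_workers=4)
```

Because the offsets are computed before any task starts, the serial and threaded paths produce identical bytes.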

Comment thread transfer_queue/utils/serial_utils.py Outdated
return buf, batch_ptrs, batch_sizes


def batch_decode_from(buffers: Sequence[Any]) -> list[Any]:
Contributor

I think batch_encode_into and batch_decode_from should be "inverses" of each other, like x = batch_decode_from(batch_encode_into(x))
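
The inverse property asked for here can be stated as a small test. The sketch below uses hypothetical stand-ins (`encode_batch`, `decode_batch`, JSON as the wire format) purely to show the shape of the contract: the decoder accepts exactly what the encoder returns.

```python
import json


def encode_batch(objs):
    """Hypothetical encoder: one buffer per object."""
    return [bytearray(json.dumps(obj).encode("utf-8")) for obj in objs]


def decode_batch(buffers):
    """Hypothetical decoder: consumes the encoder's return value unchanged."""
    return [json.loads(bytes(buf).decode("utf-8")) for buf in buffers]


x = [{"a": [1, 2]}, "text", 3]
roundtrip = decode_batch(encode_batch(x))
```

If the encoder returned extra values (for example sizes alongside buffers), the decoder would need to accept the same tuple for the composition to hold.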

    alloc_buff_func: Callable[[list[int]], list[Any]],
    *,
    num_workers: int = 1,
) -> tuple[list[Any], list[int]]:
Contributor

The returned buffers must be NumPy arrays or otherwise viewable as a memoryview.

Contributor

The type of buffers, "Any", conflicts with the type annotation in pack_into.

unpack_from and batch_decode_from have the same problem.

alive while ``ptrs`` are in flight (and for register/unregister).
* ``ptrs[i]`` / ``sizes[i]`` are the absolute pointer and byte length
of object ``i``'s packed slice within ``buf``.
``(buffers, batch_sizes)``: ``buffers`` is what ``alloc_buff_func``
Collaborator

Please align the format of the docstring with existing files. For reference: https://github.com/Ascend/TransferQueue/blob/main/transfer_queue/client.py#L1226

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

Comment on lines +411 to +416
target_mv = memoryview(target_buffer)
required = calc_packed_size(items)
if target_mv.nbytes < required:
    raise ValueError(
        f"pack_into: target buffer has {target_mv.nbytes} bytes, requires {required}"
    )
Comment on lines +439 to +442
for i in range(item_count):
    offset, length = struct.unpack_from(_PACK_ENTRY_FMT, mv, _PACK_HEADER_SIZE + i * _PACK_ENTRY_SIZE)
    result.append(mv[offset : offset + length])
return result
Comment on lines +233 to +237
if non_tensor_indices and (custom_backend_meta is None or len(custom_backend_meta) != len(keys)):
    raise ValueError(
        "custom_backend_meta with per-key packed_size is required when any dtype is None."
    )

Comment on lines +37 to +56
def test_calc_packed_size_then_pack_unpack_roundtrip():
    items = [b"hello", b"world!", b"x"]
    size = serial_utils.calc_packed_size(items)
    buf = bytearray(size)
    serial_utils.pack_into(buf, items)
    recovered = serial_utils.unpack_from(buf)
    assert [bytes(mv) for mv in recovered] == items


def test_pack_unpack_roundtrip_random_bytes():
    rng = np.random.default_rng(0)
    items = [rng.bytes(int(rng.integers(1, 2048))) for _ in range(16)]
    size = serial_utils.calc_packed_size(items)
    buf = bytearray(size)
    serial_utils.pack_into(buf, items)

    recovered = serial_utils.unpack_from(buf)
    assert len(recovered) == len(items)
    for r, item in zip(recovered, items, strict=True):
        assert bytes(r) == item
Collaborator

We only need to keep one of these.

assert b.nbytes == s


def test_batch_encode_into_alloc_called_with_per_obj_sizes():
Collaborator

We can delete this test

pytest.param([torch.arange(10, dtype=torch.float32)], id="single-tensor"),
pytest.param(
[
torch.arange(100, dtype=torch.float32),
Collaborator

Please add test coverage for

  1. Nested tensor (jagged/strided)
  2. 4D+ dim tensor

Comment on lines +218 to +239
@pytest.mark.parametrize("values", _ROUNDTRIP_PARAMS)
def test_batch_encode_into_parallel_matches_serial(values):
    serial_buffers, serial_sizes = serial_utils.batch_encode_into(
        values, _yuanrong_alloc, num_workers=1
    )
    par_buffers, par_sizes = serial_utils.batch_encode_into(
        values, _yuanrong_alloc, num_workers=4
    )

    assert serial_sizes == par_sizes
    assert [bytes(b) for b in serial_buffers] == [bytes(b) for b in par_buffers]


def test_batch_encode_into_parallel_roundtrip_many_objects():
    rng = np.random.default_rng(42)
    values = []
    for _ in range(64):
        n = int(rng.integers(1, 257))
        values.append(torch.from_numpy(rng.random(n).astype(np.float32)))

    decoded, *_ = _roundtrip(values, _yuanrong_alloc, "yuanrong", num_workers=8)
    _assert_equal_payloads(decoded, values)
Collaborator

We can merge the two tests by setting @pytest.mark.parametrize("values", _ROUNDTRIP_PARAMS) to the roundtrip test

Comment on lines +147 to +149
packed_sizes = []
for future in bytes_futures:
    packed_sizes.extend(future.result())
Collaborator

Consider using itertools to speed up:

from itertools import chain

packed_sizes = list(chain.from_iterable(future.result() for future in bytes_futures))
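
A small self-contained check that the suggested one-liner gathers the same sizes, in the same order, as the explicit loop. The `worker` function and the sample batches are hypothetical stand-ins for `_put_bytes_thread_worker` and its inputs.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import chain


def worker(batch):
    """Hypothetical worker: returns one size per value in its batch."""
    return [len(value) for value in batch]


batches = [[b"a", b"bb"], [b"ccc"], []]

with ThreadPoolExecutor(max_workers=2) as executor:
    bytes_futures = [executor.submit(worker, batch) for batch in batches]

    # Explicit loop, as in the reviewed code.
    looped = []
    for future in bytes_futures:
        looped.extend(future.result())

    # Suggested form: flatten the per-future lists in submit order.
    chained = list(chain.from_iterable(future.result() for future in bytes_futures))
```

Both forms iterate the futures in submit order, so the result order does not depend on which worker finishes first.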

for i, value in enumerate(values):
    if not isinstance(value, torch.Tensor):
        custom_backend_meta[i] = {"packed_size": next(sizes_iter)}

Collaborator

Consider using:

sizes_iter = iter(packed_sizes)
custom_backend_meta = [
    {"packed_size": next(sizes_iter)} if not isinstance(value, torch.Tensor) else None
    for value in values
]
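
To show how the suggested comprehension scatters sizes back to their slots, here is a runnable sketch with a stand-in `FakeTensor` class in place of `torch.Tensor` (an assumption made so the example needs no third-party imports). Sizes are consumed in order, once per non-tensor value.

```python
class FakeTensor:
    """Stand-in for torch.Tensor in this sketch."""


values = [FakeTensor(), {"k": 1}, "text", FakeTensor()]
packed_sizes = [128, 64]   # one entry per non-tensor value, in submit order

sizes_iter = iter(packed_sizes)
custom_backend_meta = [
    {"packed_size": next(sizes_iter)} if not isinstance(value, FakeTensor) else None
    for value in values
]

# Every size should have been consumed exactly once.
leftover = next(sizes_iter, None)
```

If `packed_sizes` had fewer entries than there are non-tensor values, `next(sizes_iter)` would raise `StopIteration` inside the comprehension, so the counts need to match.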


5 participants