Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions client/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2988,6 +2988,80 @@ TEST_F(ClientTest, PrefixSizeInconsistent) {
ASSERT_FALSE(pub2.ok());
}

// Virtual channels share storage on a multiplexer, so they must share a
// single prefix layout. The mux owns the layout: the first publisher to any
// vchan on a given mux fixes cs/ms, subsequent publishers (on the same vchan
// or any sibling vchan) must agree, and subscribers on any vchan see the
// authoritative sizes from the mux.
TEST_F(ClientTest, VirtualChannelMuxPrefixIsShared) {
auto client = EVAL_AND_ASSERT_OK(subspace::Client::Create(Socket()));

// First publisher on vchan_a with checksum_size=20, metadata_size=50:
// 48 + 20 + 50 = 118 → Aligned<64> = 128.
absl::StatusOr<Publisher> pub_a = client->CreatePublisher(
"vchan_a", {.slot_size = 256, .num_slots = 10, .mux = "shared_mux",
.checksum_size = 20, .metadata_size = 50});
ASSERT_OK(pub_a);
ASSERT_EQ(128, pub_a->PrefixSize());
ASSERT_EQ(20, pub_a->ChecksumSize());
ASSERT_EQ(50, pub_a->MetadataSize());

// A subscriber on a different vchan on the same mux must see the mux's
// sizes, not the per-vchan defaults (4/0/64).
absl::StatusOr<Subscriber> sub_b =
client->CreateSubscriber("vchan_b", {.mux = "shared_mux"});
ASSERT_OK(sub_b);
ASSERT_EQ(128, sub_b->PrefixSize());
ASSERT_EQ(20, sub_b->ChecksumSize());
ASSERT_EQ(50, sub_b->MetadataSize());

// A second publisher on a different vchan on the same mux with matching
// sizes must succeed and report the same prefix layout.
absl::StatusOr<Publisher> pub_b = client->CreatePublisher(
"vchan_b", {.slot_size = 256, .num_slots = 10, .mux = "shared_mux",
.checksum_size = 20, .metadata_size = 50});
ASSERT_OK(pub_b);
ASSERT_EQ(128, pub_b->PrefixSize());

// A second publisher on yet another vchan with mismatching cs/ms must be
// rejected, because virtual channels on a mux cannot disagree on layout.
absl::StatusOr<Publisher> pub_c = client->CreatePublisher(
"vchan_c", {.slot_size = 256, .num_slots = 10, .mux = "shared_mux",
.checksum_size = 32});
ASSERT_FALSE(pub_c.ok());

absl::StatusOr<Publisher> pub_d = client->CreatePublisher(
"vchan_d", {.slot_size = 256, .num_slots = 10, .mux = "shared_mux",
.metadata_size = 100});
ASSERT_FALSE(pub_d.ok());
}

// Symmetric scenario: a subscriber creates the placeholder mux + virtual
// channel first, then a publisher arrives with non-default sizes. Subscribers
// on sibling virtual channels created after that must see the publisher's
// sizes via the mux.
TEST_F(ClientTest, VirtualChannelMuxPrefixSubscriberFirst) {
auto client = EVAL_AND_ASSERT_OK(subspace::Client::Create(Socket()));

absl::StatusOr<Subscriber> sub_a =
client->CreateSubscriber("vchan_a", {.mux = "sub_first_mux"});
ASSERT_OK(sub_a);

absl::StatusOr<Publisher> pub_a = client->CreatePublisher(
"vchan_a", {.slot_size = 256, .num_slots = 10, .mux = "sub_first_mux",
.checksum_size = 20, .metadata_size = 50});
ASSERT_OK(pub_a);
ASSERT_EQ(128, pub_a->PrefixSize());

// Subscriber on a sibling vchan should now see the mux's sizes.
absl::StatusOr<Subscriber> sub_b =
client->CreateSubscriber("vchan_b", {.mux = "sub_first_mux"});
ASSERT_OK(sub_b);
ASSERT_EQ(128, sub_b->PrefixSize());
ASSERT_EQ(20, sub_b->ChecksumSize());
ASSERT_EQ(50, sub_b->MetadataSize());
}

TEST_F(ClientTest, SubscriberGetsSizes) {
auto client = EVAL_AND_ASSERT_OK(subspace::Client::Create(Socket()));

Expand Down
22 changes: 14 additions & 8 deletions common/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,14 +467,20 @@ class Channel : public std::enable_shared_from_this<Channel> {
}

// Total prefix area size in bytes (always a multiple of 64).
int32_t PrefixSize() const { return prefix_size_; }
void SetPrefixSize(int32_t size) { prefix_size_ = size; }

int32_t ChecksumSize() const { return checksum_size_; }
void SetChecksumSize(int32_t size) { checksum_size_ = size; }

int32_t MetadataSize() const { return metadata_size_; }
void SetMetadataSize(int32_t size) { metadata_size_ = size; }
//
// These accessors are virtual so that VirtualChannel (in the server) can
// forward them to its underlying multiplexer: storage and slot layout are
// owned by the mux, so all virtual channels sharing a mux must agree on
// the prefix layout. Plain (non-mux) channels and client-side channels
// simply use their own fields.
virtual int32_t PrefixSize() const { return prefix_size_; }
virtual void SetPrefixSize(int32_t size) { prefix_size_ = size; }

virtual int32_t ChecksumSize() const { return checksum_size_; }
virtual void SetChecksumSize(int32_t size) { checksum_size_ = size; }

virtual int32_t MetadataSize() const { return metadata_size_; }
virtual void SetMetadataSize(int32_t size) { metadata_size_ = size; }

uint64_t BufferSizeToSlotSize(uint64_t size) const {
if (size < NumSlots() * static_cast<uint64_t>(PrefixSize())) {
Expand Down
13 changes: 13 additions & 0 deletions server/server_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,19 @@ class VirtualChannel : public ServerChannel {
int NumSlots() const override { return mux_->NumSlots(); }
int GetChannelId() const override { return mux_->GetChannelId(); }

// The prefix / checksum / metadata sizes describe the per-slot prefix area
// laid out in the buffers owned by the multiplexer. Every virtual channel
// sharing a mux must use the same layout, so we delegate both reads and
// writes to the mux. This keeps consistency checks, subscriber responses,
// and any later layout-driven computations honest regardless of which
// virtual channel handle is used.
int32_t PrefixSize() const override { return mux_->PrefixSize(); }
void SetPrefixSize(int32_t size) override { mux_->SetPrefixSize(size); }
int32_t ChecksumSize() const override { return mux_->ChecksumSize(); }
void SetChecksumSize(int32_t size) override { mux_->SetChecksumSize(size); }
int32_t MetadataSize() const override { return mux_->MetadataSize(); }
void SetMetadataSize(int32_t size) override { mux_->SetMetadataSize(size); }

std::string ResolvedName() const override { return mux_->ResolvedName(); }
void RemoveBuffer(uint64_t session_id) override {
mux_->RemoveBuffer(session_id);
Expand Down
Loading