diff --git a/client/client_test.cc b/client/client_test.cc index 20daa1c..eb8efd3 100644 --- a/client/client_test.cc +++ b/client/client_test.cc @@ -2988,6 +2988,80 @@ TEST_F(ClientTest, PrefixSizeInconsistent) { ASSERT_FALSE(pub2.ok()); } +// Virtual channels share storage on a multiplexer, so they must share a +// single prefix layout. The mux owns the layout: the first publisher to any +// vchan on a given mux fixes cs/ms, subsequent publishers (on the same vchan +// or any sibling vchan) must agree, and subscribers on any vchan see the +// authoritative sizes from the mux. +TEST_F(ClientTest, VirtualChannelMuxPrefixIsShared) { + auto client = EVAL_AND_ASSERT_OK(subspace::Client::Create(Socket())); + + // First publisher on vchan_a with checksum_size=20, metadata_size=50: + // 48 + 20 + 50 = 118 → Aligned<64> = 128. + absl::StatusOr pub_a = client->CreatePublisher( + "vchan_a", {.slot_size = 256, .num_slots = 10, .mux = "shared_mux", + .checksum_size = 20, .metadata_size = 50}); + ASSERT_OK(pub_a); + ASSERT_EQ(128, pub_a->PrefixSize()); + ASSERT_EQ(20, pub_a->ChecksumSize()); + ASSERT_EQ(50, pub_a->MetadataSize()); + + // A subscriber on a different vchan on the same mux must see the mux's + // sizes, not the per-vchan defaults (4/0/64). + absl::StatusOr sub_b = + client->CreateSubscriber("vchan_b", {.mux = "shared_mux"}); + ASSERT_OK(sub_b); + ASSERT_EQ(128, sub_b->PrefixSize()); + ASSERT_EQ(20, sub_b->ChecksumSize()); + ASSERT_EQ(50, sub_b->MetadataSize()); + + // A second publisher on a different vchan on the same mux with matching + // sizes must succeed and report the same prefix layout. + absl::StatusOr pub_b = client->CreatePublisher( + "vchan_b", {.slot_size = 256, .num_slots = 10, .mux = "shared_mux", + .checksum_size = 20, .metadata_size = 50}); + ASSERT_OK(pub_b); + ASSERT_EQ(128, pub_b->PrefixSize()); + + // A second publisher on yet another vchan with mismatching cs/ms must be + // rejected, because virtual channels on a mux cannot disagree on layout. + absl::StatusOr pub_c = client->CreatePublisher( + "vchan_c", {.slot_size = 256, .num_slots = 10, .mux = "shared_mux", + .checksum_size = 32}); + ASSERT_FALSE(pub_c.ok()); + + absl::StatusOr pub_d = client->CreatePublisher( + "vchan_d", {.slot_size = 256, .num_slots = 10, .mux = "shared_mux", + .metadata_size = 100}); + ASSERT_FALSE(pub_d.ok()); +} + +// Symmetric scenario: a subscriber creates the placeholder mux + virtual +// channel first, then a publisher arrives with non-default sizes. Subscribers +// on sibling virtual channels created after that must see the publisher's +// sizes via the mux. +TEST_F(ClientTest, VirtualChannelMuxPrefixSubscriberFirst) { + auto client = EVAL_AND_ASSERT_OK(subspace::Client::Create(Socket())); + + absl::StatusOr sub_a = + client->CreateSubscriber("vchan_a", {.mux = "sub_first_mux"}); + ASSERT_OK(sub_a); + + absl::StatusOr pub_a = client->CreatePublisher( + "vchan_a", {.slot_size = 256, .num_slots = 10, .mux = "sub_first_mux", + .checksum_size = 20, .metadata_size = 50}); + ASSERT_OK(pub_a); + ASSERT_EQ(128, pub_a->PrefixSize()); + + // Subscriber on a sibling vchan should now see the mux's sizes. + absl::StatusOr sub_b = + client->CreateSubscriber("vchan_b", {.mux = "sub_first_mux"}); + ASSERT_OK(sub_b); + ASSERT_EQ(128, sub_b->PrefixSize()); + ASSERT_EQ(20, sub_b->ChecksumSize()); + ASSERT_EQ(50, sub_b->MetadataSize()); +} + TEST_F(ClientTest, SubscriberGetsSizes) { auto client = EVAL_AND_ASSERT_OK(subspace::Client::Create(Socket())); diff --git a/common/channel.h b/common/channel.h index 04d3189..21929cc 100644 --- a/common/channel.h +++ b/common/channel.h @@ -467,14 +467,20 @@ class Channel : public std::enable_shared_from_this { } // Total prefix area size in bytes (always a multiple of 64). - int32_t PrefixSize() const { return prefix_size_; } - void SetPrefixSize(int32_t size) { prefix_size_ = size; } - - int32_t ChecksumSize() const { return checksum_size_; } - void SetChecksumSize(int32_t size) { checksum_size_ = size; } - - int32_t MetadataSize() const { return metadata_size_; } - void SetMetadataSize(int32_t size) { metadata_size_ = size; } + // + // These accessors are virtual so that VirtualChannel (in the server) can + // forward them to its underlying multiplexer: storage and slot layout are + // owned by the mux, so all virtual channels sharing a mux must agree on + // the prefix layout. Plain (non-mux) channels and client-side channels + // simply use their own fields. + virtual int32_t PrefixSize() const { return prefix_size_; } + virtual void SetPrefixSize(int32_t size) { prefix_size_ = size; } + + virtual int32_t ChecksumSize() const { return checksum_size_; } + virtual void SetChecksumSize(int32_t size) { checksum_size_ = size; } + + virtual int32_t MetadataSize() const { return metadata_size_; } + virtual void SetMetadataSize(int32_t size) { metadata_size_ = size; } uint64_t BufferSizeToSlotSize(uint64_t size) const { if (size < NumSlots() * static_cast(PrefixSize())) { diff --git a/server/server_channel.h b/server/server_channel.h index 45d5ee9..a601fcc 100644 --- a/server/server_channel.h +++ b/server/server_channel.h @@ -447,6 +447,19 @@ class VirtualChannel : public ServerChannel { int NumSlots() const override { return mux_->NumSlots(); } int GetChannelId() const override { return mux_->GetChannelId(); } + // The prefix / checksum / metadata sizes describe the per-slot prefix area + // laid out in the buffers owned by the multiplexer. Every virtual channel + // sharing a mux must use the same layout, so we delegate both reads and + // writes to the mux. This keeps consistency checks, subscriber responses, + // and any later layout-driven computations honest regardless of which + // virtual channel handle is used. + int32_t PrefixSize() const override { return mux_->PrefixSize(); } + void SetPrefixSize(int32_t size) override { mux_->SetPrefixSize(size); } + int32_t ChecksumSize() const override { return mux_->ChecksumSize(); } + void SetChecksumSize(int32_t size) override { mux_->SetChecksumSize(size); } + int32_t MetadataSize() const override { return mux_->MetadataSize(); } + void SetMetadataSize(int32_t size) override { mux_->SetMetadataSize(size); } + std::string ResolvedName() const override { return mux_->ResolvedName(); } void RemoveBuffer(uint64_t session_id) override { mux_->RemoveBuffer(session_id);