feat: add replicator service with NATS JetStream#1007

Open
bmuddha wants to merge 13 commits into bmuddha/accountsdb/archive-snapshots from bmuddha/replication/protocol

Conversation

@bmuddha
Collaborator

@bmuddha bmuddha commented Mar 1, 2026

Summary

Adds magicblock-replicator crate implementing primary-standby state replication via NATS JetStream:

  • Protocol: Transaction/Block/SuperBlock message types with bincode serialization
  • NATS module: Broker, Producer (with leader lock), Consumer, Snapshot, LockWatcher
  • Service layer: Automatic Primary↔Standby role transitions with leader election
  • Watcher: Directory monitor for AccountsDb snapshot archives

Compatibility

  • No breaking changes

Testing

  • unit tests in magicblock-replicator/src/tests.rs

Checklist

@coderabbitai
Contributor

coderabbitai bot commented Mar 1, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

The PR adds a new workspace crate magicblock-replicator and adds bytes = "1.0" to workspace dependencies. The crate provides a wire protocol (types and signing) in proto.rs, an error module, a length-prefixed bincode codec with Sender/Receiver wrappers in connection.rs, TCP helpers (tcp.rs) to connect/split streams into sender/receiver halves, crate root exports in lib.rs, and an async test suite validating framing, signatures, and end-to-end transport.

Assessment against linked issues

Objective (issue) Addressed Explanation
Message types defined with byte identifiers (951) Message enum uses Serde variants without an explicit byte identifier or separate type-prefix field in the wire format.
Bincode serialization/deserialization works (951)
Codec frame wrapper implemented (951)
Unit tests for round-trip encoding/decoding (951)

Out-of-scope changes

Code Change Explanation
Cryptographic signing/verification logic added (magicblock-replicator/src/proto.rs) The proto module adds signature creation/verification and uses Solana key types, which are application-layer crypto beyond the requested codec+frame wrapper objective.
Multiple Solana-specific dependencies added (magicblock-replicator/Cargo.toml) Introducing solana-* crates embeds blockchain-specific types into the crate that are not required by the framing/codec acceptance criteria.
Workspace dependency bytes = "1.0" added (Cargo.toml) Adding a workspace-level dependency changes workspace resolution scope rather than being limited to the new crate's implementation.

Suggested reviewers

  • thlorenz
  • GabrielePicco
  • taco-paco

Collaborator Author

bmuddha commented Mar 1, 2026

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 19-30: MessageEncoder::encode can produce frames larger than the
Receiver's 64KB limit, causing the remote side to reject them; update the
Encoder (MessageEncoder::encode) to compute the serialized message size (e.g.,
via bincode::serialized_size or by serializing into a temporary buffer), compare
it against the same 64 * 1024 frame limit used by the Receiver, and return an
Err(Error) if it exceeds that limit instead of writing an oversize frame; keep
the existing write path when size is within the limit so the existing
dst.put_u32_le / serialize_into logic is unchanged.
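The size check requested above can be sketched with the standard library alone. This is a minimal illustration, not the crate's actual encoder: `encode_frame`, `EncodeError`, and `MAX_FRAME_SIZE` are hypothetical names, the payload is assumed to be already serialized, and the 64 KiB constant is assumed to match the Receiver's limit.

```rust
/// Assumed to mirror the Receiver's 64 KiB frame limit (hypothetical constant name).
const MAX_FRAME_SIZE: usize = 64 * 1024;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum EncodeError {
    FrameTooLarge { size: usize, max: usize },
}

/// Writes a 4-byte little-endian length prefix followed by the payload.
/// Payloads above the limit are rejected before anything is written,
/// so `dst` never contains a partial oversize frame.
fn encode_frame(payload: &[u8], dst: &mut Vec<u8>) -> Result<(), EncodeError> {
    if payload.len() > MAX_FRAME_SIZE {
        return Err(EncodeError::FrameTooLarge {
            size: payload.len(),
            max: MAX_FRAME_SIZE,
        });
    }
    // The check above bounds the length far below u32::MAX, so the cast is lossless.
    dst.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    dst.extend_from_slice(payload);
    Ok(())
}
```

Checking before writing keeps the sender and receiver limits symmetric: a frame the remote side would reject fails locally with a descriptive error instead.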

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1acdff2 and 23c811a.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
  • Cargo.toml
  • magicblock-replicator/Cargo.toml
  • magicblock-replicator/src/connection.rs
  • magicblock-replicator/src/error.rs
  • magicblock-replicator/src/lib.rs
  • magicblock-replicator/src/proto.rs
  • magicblock-replicator/src/tcp.rs

@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 1acdff2 to 6e3a51e Compare March 1, 2026 17:18
@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from 23c811a to be09338 Compare March 1, 2026 17:18
@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 6e3a51e to 936c1c0 Compare March 2, 2026 10:41
@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from be09338 to 31292d0 Compare March 2, 2026 10:41
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 83-145: The signatures are created over raw bytes that overlap
across message types and omit the protocol version; update all signing and
verification to domain-separate and bind version by including an explicit
context string and the protocol version in the signed payload. Concretely,
change each signer call (the constructors that call keypair.sign_message in the
implementations for the first message type (the HandshakeRequest-like impl),
HandshakeResponse::new, and FailoverSignal::new) to sign a concatenation like
b"<MSG_TYPE>|v{PROTOCOL_VERSION}|"+payload (use a distinct MSG_TYPE per struct,
e.g., "HandshakeRequest", "HandshakeResponse", "FailoverSignal"), and update the
corresponding verify methods (the verify implementations for those types) to
reconstruct the same domain-separated bytes (including version) before calling
signature.verify; do the same for error branches in
HandshakeResponse::new/verify so both Ok and Err paths use the same
domain-separated scheme. Ensure VersionedTransaction/Transaction::decode is
unaffected but do not sign raw payloads elsewhere.

In `@magicblock-replicator/src/tcp.rs`:
- Around line 15-23: The connect/split helpers currently hard-code Tokio
TcpStream and so cannot support the WebSocket framing required by the objective;
replace or refactor TcpSender/TcpReceiver creation so the transport is
abstracted and supports WebSocket-frame wrapping (type prefix + bincode
payload). Update the public functions connect and split (and any constructors
like Sender::new / Receiver::new) to work against a Transport trait (or separate
WsStream implementation) that implements send/receive with WebSocket framing, or
provide alternate connect_ws/split_ws that wrap a WebSocket stream into
TcpSender/TcpReceiver equivalents using the required frame format; ensure the
new implementation still returns the same TcpSender/TcpReceiver types or
introduces clearly named WsSender/WsReceiver to avoid breaking callers.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 23c811a and 31292d0.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • Cargo.toml
  • magicblock-replicator/Cargo.toml
  • magicblock-replicator/src/connection.rs
  • magicblock-replicator/src/error.rs
  • magicblock-replicator/src/lib.rs
  • magicblock-replicator/src/proto.rs
  • magicblock-replicator/src/tcp.rs
  • magicblock-replicator/src/tests.rs

@bmuddha bmuddha marked this pull request as ready for review March 2, 2026 17:37
@bmuddha bmuddha self-assigned this Mar 2, 2026
@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 936c1c0 to 1642077 Compare March 3, 2026 18:28
@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from 31292d0 to 15c008a Compare March 3, 2026 18:28
@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 1642077 to 69f7765 Compare March 4, 2026 12:00
@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from 15c008a to 66c7f58 Compare March 4, 2026 12:00
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
magicblock-replicator/src/proto.rs (1)

83-145: ⚠️ Potential issue | 🟠 Major

Signatures are not domain-separated and do not bind protocol version.

These constructors/validators sign and verify raw payload bytes only. This permits cross-context replay and does not cryptographically protect version from tampering.

🔐 Suggested hardening pattern
+fn signature_preimage(domain: &'static [u8], version: u32, body: &[u8]) -> Vec<u8> {
+    let mut out = Vec::with_capacity(domain.len() + 1 + 4 + body.len());
+    out.extend_from_slice(domain);
+    out.push(0);
+    out.extend_from_slice(&version.to_le_bytes());
+    out.extend_from_slice(body);
+    out
+}
+
 impl HandshakeRequest {
     pub fn new(start_slot: Slot, keypair: &Keypair) -> Self {
+        let version = PROTOCOL_VERSION;
+        let msg = signature_preimage(b"HandshakeRequest", version, &start_slot.to_le_bytes());
         Self {
-            version: PROTOCOL_VERSION,
+            version,
             start_slot,
             identity: keypair.pubkey(),
-            signature: keypair.sign_message(&start_slot.to_le_bytes()),
+            signature: keypair.sign_message(&msg),
         }
     }

     pub fn verify(&self) -> bool {
-        self.signature
-            .verify(self.identity.as_array(), &self.start_slot.to_le_bytes())
+        let msg = signature_preimage(b"HandshakeRequest", self.version, &self.start_slot.to_le_bytes());
+        self.signature.verify(self.identity.as_array(), &msg)
     }
 }

Apply the same domain+version preimage approach to HandshakeResponse and FailoverSignal.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/proto.rs` around lines 83 - 145, Construct and
verify signatures over a domain-separated preimage that includes a static domain
tag and the PROTOCOL_VERSION to prevent cross-context replay and bind the
version; update Handshake::new/verify, HandshakeResponse::new/verify, and
FailoverSignal::new/verify so their sign_message input is a concatenation of a
fixed domain string (e.g. "magicblock-v1"), PROTOCOL_VERSION (as bytes), and the
existing payload bytes (start_slot/slot or result bytes), and make the
corresponding verify methods expect that same prefixed preimage when calling
signature.verify.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 80-82: The Receiver stream currently maps the inner stream's
Poll::Ready(None) to Poll::Ready(Some(Err(Error::ConnectionClosed))), violating
Stream's contract; update the poll_next implementation in the Receiver (the impl
Stream for Receiver / poll_next method) so that when the inner stream returns
Poll::Ready(None) it returns Poll::Ready(None) (i.e., signal EOF) instead of
emitting a ConnectionClosed error, preserving proper termination semantics for
consumers.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: fcc6e96e-ecb4-42d7-857e-e455fb474253

📥 Commits

Reviewing files that changed from the base of the PR and between 31292d0 and 66c7f58.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • Cargo.toml
  • magicblock-replicator/Cargo.toml
  • magicblock-replicator/src/connection.rs
  • magicblock-replicator/src/error.rs
  • magicblock-replicator/src/lib.rs
  • magicblock-replicator/src/proto.rs
  • magicblock-replicator/src/tcp.rs
  • magicblock-replicator/src/tests.rs

@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from 66c7f58 to 7a1d2ff Compare March 4, 2026 13:58
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
magicblock-replicator/src/connection.rs (1)

80-82: ⚠️ Potential issue | 🟠 Major

Return Poll::Ready(None) on EOF in Stream::poll_next.

Mapping EOF to Some(Err(ConnectionClosed)) prevents proper stream termination and can keep stream consumers looping indefinitely after disconnect.

Suggested fix
-            Poll::Ready(None) => {
-                Poll::Ready(Some(Err(Error::ConnectionClosed)))
-            }
+            Poll::Ready(None) => Poll::Ready(None),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/connection.rs` around lines 80 - 82, In the
Stream::poll_next implementation, change the EOF handling so it returns
Poll::Ready(None) instead of Poll::Ready(Some(Err(Error::ConnectionClosed)));
locate the match arm that currently maps Poll::Ready(None) =>
Poll::Ready(Some(Err(Error::ConnectionClosed))) and replace it with
Poll::Ready(None) so the stream terminates cleanly on EOF while preserving other
error paths that should still return Some(Err(...)).
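The termination semantics can be illustrated without the `futures` crate by using `std::task::Poll` directly. This is a sketch under stated assumptions: `Inner`, `Receiver`, `Error`, and `drain` are hypothetical stand-ins rather than the crate's types, and the plain `poll_next` methods only imitate the shape of `Stream::poll_next` (no `Pin` or `Context`).

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum Error {
    Decode,
}

/// Stand-in for the framed inner stream: yields queued items, then EOF (`None`).
struct Inner {
    items: VecDeque<Result<u32, Error>>,
}

impl Inner {
    fn poll_next(&mut self) -> Poll<Option<Result<u32, Error>>> {
        Poll::Ready(self.items.pop_front())
    }
}

struct Receiver {
    inner: Inner,
}

impl Receiver {
    fn poll_next(&mut self) -> Poll<Option<Result<u32, Error>>> {
        match self.inner.poll_next() {
            Poll::Ready(Some(Ok(v))) => Poll::Ready(Some(Ok(v))),
            // Real errors are still surfaced to the consumer.
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            // EOF is forwarded as `None` so consumers can stop.
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Imitates a `while let Some(item) = stream.next().await` consumer loop.
fn drain(rx: &mut Receiver) -> Vec<Result<u32, Error>> {
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = rx.poll_next() {
        out.push(item);
    }
    out
}
```

With the EOF arm forwarding `None`, `drain` returns after the last queued item. If that arm produced `Some(Err(..))` instead, the same loop would never exit on its own.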
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 23-30: The Message enum is missing the required Heartbeat variant
and currently contains Failover which diverges from the protocol; update the
enum definition (Message) to include Heartbeat(Heartbeat) and remove the
Failover(FailoverSignal) variant unless Failover is part of the spec, and ensure
the corresponding Heartbeat type/name is imported/defined so the protocol
matches the required contract.
- Around line 3-5: The protocol currently relies on bincode's implicit enum
discriminants for Message; change the wire format to prefix an explicit u8
message-type byte before the bincode payload and keep the existing 4-byte LE
length prefix logic. Update serde/serialization code paths that read/write
messages (the encoder/decoder that handle the 4-byte length and
bincode(Message)) to: write length = 1 + payload_len, then write a single u8
type tag mapped from the Message enum variants, followed by the bincode payload;
and on read, use the length to read the frame, read the u8 type tag first and
dispatch/deserialize the remainder into the corresponding Message variant.
Ensure the Message-to-u8 mapping and the inverse dispatch logic are defined
(e.g., a small match on Message variants and a match to construct/deserialize
variants) so the wire format no longer depends on bincode's internal
discriminant ordering.
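The framing described above (4-byte LE length, then a `u8` type tag, then the payload) can be sketched as follows. The variant set, the tag values 1 to 3, and the `FrameError` type are assumptions made for illustration; raw bytes stand in for the bincode-encoded payload of each variant.

```rust
/// Hypothetical message set; payloads are opaque bytes in this sketch.
#[derive(Debug, PartialEq)]
enum Message {
    Transaction(Vec<u8>),
    Block(Vec<u8>),
    SuperBlock(Vec<u8>),
}

/// Hypothetical decode errors.
#[derive(Debug, PartialEq)]
enum FrameError {
    Truncated,
    LengthMismatch,
    UnknownType(u8),
}

impl Message {
    /// Stable wire tag, independent of enum declaration order (values are assumed).
    fn type_id(&self) -> u8 {
        match self {
            Message::Transaction(_) => 1,
            Message::Block(_) => 2,
            Message::SuperBlock(_) => 3,
        }
    }

    fn payload(&self) -> &[u8] {
        match self {
            Message::Transaction(p) | Message::Block(p) | Message::SuperBlock(p) => p,
        }
    }
}

/// Frame layout: [len: u32 LE = 1 + payload_len][type: u8][payload].
fn encode(msg: &Message) -> Vec<u8> {
    let payload = msg.payload();
    let len = 1 + payload.len() as u32;
    let mut out = Vec::with_capacity(4 + len as usize);
    out.extend_from_slice(&len.to_le_bytes());
    out.push(msg.type_id());
    out.extend_from_slice(payload);
    out
}

/// Decodes exactly one complete frame, dispatching on the explicit type byte.
fn decode(frame: &[u8]) -> Result<Message, FrameError> {
    if frame.len() < 5 {
        return Err(FrameError::Truncated);
    }
    let len = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
    if frame.len() - 4 != len {
        return Err(FrameError::LengthMismatch);
    }
    let body = frame[5..].to_vec();
    match frame[4] {
        1 => Ok(Message::Transaction(body)),
        2 => Ok(Message::Block(body)),
        3 => Ok(Message::SuperBlock(body)),
        other => Err(FrameError::UnknownType(other)),
    }
}
```

Because the tag is assigned by an explicit `match`, reordering or inserting enum variants does not change the bytes on the wire.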


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: b72f2e54-f6b2-4403-9775-3af27b6b5f55

📥 Commits

Reviewing files that changed from the base of the PR and between 66c7f58 and 7a1d2ff.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • Cargo.toml
  • magicblock-replicator/Cargo.toml
  • magicblock-replicator/src/connection.rs
  • magicblock-replicator/src/error.rs
  • magicblock-replicator/src/lib.rs
  • magicblock-replicator/src/proto.rs
  • magicblock-replicator/src/tcp.rs
  • magicblock-replicator/src/tests.rs

@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from 7a1d2ff to 0aca470 Compare March 5, 2026 11:46
Contributor

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (3)
magicblock-replicator/src/connection.rs (1)

80-82: ⚠️ Potential issue | 🟠 Major

Return Poll::Ready(None) on EOF in Stream implementation.

This branch should terminate the stream instead of yielding ConnectionClosed; otherwise while let Some(...) consumers may never exit on disconnect.

♻️ Suggested fix
-            Poll::Ready(None) => {
-                Poll::Ready(Some(Err(Error::ConnectionClosed)))
-            }
+            Poll::Ready(None) => Poll::Ready(None),
#!/bin/bash
set -euo pipefail

echo "Inspecting EOF branch in Receiver::poll_next..."
nl -ba magicblock-replicator/src/connection.rs | sed -n '70,86p'

echo
echo "Searching for EOF mapping to ConnectionClosed..."
rg -n 'Poll::Ready\(None\)|ConnectionClosed' magicblock-replicator/src/connection.rs -C 2

Expected verification result: EOF path should be Poll::Ready(None) and not Poll::Ready(Some(Err(Error::ConnectionClosed))).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/connection.rs` around lines 80 - 82, In the Stream
implementation for Receiver (Receiver::poll_next) the EOF branch currently maps
Poll::Ready(None) to Poll::Ready(Some(Err(Error::ConnectionClosed))); change
this so the EOF case returns Poll::Ready(None) to properly terminate the stream
instead of yielding a ConnectionClosed error; locate the match/branch in
connection.rs that checks for Poll::Ready(None) and replace the wrapped
Some(Err(Error::ConnectionClosed)) return with a plain Poll::Ready(None).
magicblock-replicator/src/proto.rs (2)

23-30: ⚠️ Potential issue | 🟠 Major

Heartbeat message type is missing from protocol enum.

The issue objective lists Heartbeat as a required wire message. Please add it (or update the objective/spec reference if requirements changed) before merging.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/proto.rs` around lines 23 - 30, The Message enum is
missing the Heartbeat variant required by the protocol; add a
Heartbeat(Heartbeat) variant to the Message enum (the enum named Message in
proto.rs) and ensure the Heartbeat type (struct/alias) is defined or imported
into this module so serialization/deserialization and matches over Message
compile; update any match statements or serialization logic that enumerate
Message variants (e.g., handlers for HandshakeReq/Resp, Transaction, Block,
SuperBlock, Failover) to handle the new Heartbeat variant accordingly.

3-5: ⚠️ Potential issue | 🟠 Major

Wire format still depends on bincode’s implicit enum discriminants, not explicit byte type IDs.

The linked protocol objective calls for byte-identified message types with a type prefix. Encoding Message directly via bincode keeps the type tag implicit and serializer-coupled.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-replicator/src/proto.rs` around lines 3 - 5, The Message enum is
currently serialized via bincode which uses implicit, serializer-specific
discriminants; instead implement an explicit byte type-ID prefix to satisfy the
protocol: assign each Message variant a stable u8 type id, update the Message
(de)serialization paths to write/read one leading byte (the variant id) followed
by the bincode-encoded payload of that variant (or no payload) and use that byte
to dispatch decoding to the correct variant; update the corresponding
serialize/deserialize helpers (e.g., the Message::encode / Message::decode or
the read_message/write_message routines in proto.rs) to perform this two-step
process and remove reliance on bincode’s implicit enum tag.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 44576bcc-6961-4eb6-85ce-bc7f45a82c6b

📥 Commits

Reviewing files that changed from the base of the PR and between 7a1d2ff and 0aca470.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • Cargo.toml
  • magicblock-replicator/Cargo.toml
  • magicblock-replicator/src/connection.rs
  • magicblock-replicator/src/error.rs
  • magicblock-replicator/src/lib.rs
  • magicblock-replicator/src/proto.rs
  • magicblock-replicator/src/tcp.rs
  • magicblock-replicator/src/tests.rs

@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from 0aca470 to f025746 Compare March 5, 2026 16:55
@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from 40fa0de to ee85613 Compare March 5, 2026 16:55
@bmuddha bmuddha marked this pull request as draft March 5, 2026 17:33
@bmuddha bmuddha force-pushed the bmuddha/scheduler/dual-mode branch from ee85613 to ba76d02 Compare March 6, 2026 08:12
@bmuddha bmuddha changed the base branch from bmuddha/scheduler/dual-mode to graphite-base/1007 March 9, 2026 09:57
@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from 8c2636d to 109fa64 Compare March 10, 2026 19:23
@bmuddha bmuddha force-pushed the graphite-base/1007 branch from 56b9fbe to 43a9a23 Compare March 10, 2026 19:23
@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from 109fa64 to de66c05 Compare March 11, 2026 17:24
@bmuddha bmuddha changed the base branch from graphite-base/1007 to bmuddha/accountsdb/archive-snapshots March 11, 2026 17:24
@bmuddha bmuddha changed the title from "feat: added replcator crate with basic protocol impl" to "feat: add replicator service with NATS JetStream" Mar 12, 2026
@bmuddha bmuddha requested review from GabrielePicco and removed request for GabrielePicco March 12, 2026 18:45
@bmuddha bmuddha marked this pull request as ready for review March 12, 2026 18:45
Comment on lines +20 to +37
pub(crate) async fn new(broker: &Broker) -> Self {
    let watch = loop {
        let store = match broker.ctx.get_key_value(cfg::PRODUCER_LOCK).await
        {
            Ok(s) => s,
            Err(error) => {
                tracing::error!(%error, "failed to obtain lock object");
                continue;
            }
        };
        match store.watch(cfg::LOCK_KEY).await {
            Ok(w) => break Box::new(w),
            Err(error) => {
                tracing::error!(%error, "failed to create lock watcher");
                continue;
            }
        }
    };
Collaborator

new() loops forever on errors with no backoff and no cancellation path. If the KV store is unavailable for a while this will spin-loop consuming CPU. Consider adding a sleep between retries (like CONSUMER_RETRY_DELAY in context.rs) and/or accepting a CancellationToken so the caller can abort.
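The retry shape suggested here can be sketched synchronously with the standard library. This swaps in a blocking analogue: the PR's code is async, where `tokio::time::sleep` and a `CancellationToken` would presumably take the place of `thread::sleep` and the `AtomicBool`. The function name, the doubling policy, and the parameters are assumptions for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// Retries `op` until it succeeds or `cancelled` is set.
/// Sleeps between attempts; the delay doubles after each failure, capped at `max_delay`.
/// Returns `None` if cancellation was observed before a successful attempt.
fn retry_with_backoff<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    cancelled: &AtomicBool,
    initial_delay: Duration,
    max_delay: Duration,
) -> Option<T> {
    let mut delay = initial_delay;
    loop {
        // Cancellation is checked before every attempt so the caller can abort.
        if cancelled.load(Ordering::Relaxed) {
            return None;
        }
        match op() {
            Ok(value) => return Some(value),
            Err(_) => {
                // Back off instead of spinning while the dependency is unavailable.
                thread::sleep(delay);
                delay = (delay * 2).min(max_delay);
            }
        }
    }
}
```

A capped exponential delay keeps the first retry quick while bounding the wait during a long outage; a fixed delay would also address the spin-loop concern.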

Comment on lines +162 to +167
tokio::spawn(async move {
    if let Err(error) = store.put(meta, &mut file).await {
        error!(%error, "snapshot upload failed");
    }
});

Collaborator

Two concerns with this tokio::spawn:

  1. The error is logged but there's no feedback to the caller - the put_snapshot method returns Ok(()) immediately even if upload later fails. Consider returning a JoinHandle so the caller can optionally check.

  2. More importantly, if Service::spawn runs on a single-threaded current_thread runtime then a tokio::spawn inside that runtime will only make progress if the caller yields. Since Primary::run loops in tokio::select!, this should mostly work, but it's subtle, worth a comment or consider using tokio::task::spawn_local to make the intent explicit.

Comment on lines +99 to +102
let obsolete = self.ctx.slot == slot && self.ctx.index >= index;
if self.ctx.slot > slot || obsolete {
    return;
}
Collaborator

The duplicate-skip logic has a subtle issue: when self.ctx.slot == slot and self.ctx.index >= index, it skips (correct for same slot). But when self.ctx.slot > slot it also skips. However, after a role transition from primary->standby, the consumer starts from DeliverPolicy::All (no snapshot sequence), so it will replay everything from the beginning of the stream, meaning all messages will be skipped until the stream catches up to the current slot. This is correct for dedup, but it means an O(n) scan through the entire stream on every demotion. Is that intentional? If not, consider using start_seq from the broker's sequence when creating the consumer during demotion.
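The skip condition discussed above is a lexicographic comparison on (slot, index), which the following sketch makes explicit. `Position` and `is_obsolete` are hypothetical names, and the `u64`/`u32` field types are assumptions; the PR's context type may differ.

```rust
/// Position in the replicated stream. Field order matters: the derived `Ord`
/// compares `slot` first, then `index`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Position {
    slot: u64,
    index: u32,
}

/// True when `incoming` is at or before `current`, i.e. it was already applied.
/// Equivalent to: current.slot > slot || (current.slot == slot && current.index >= index).
fn is_obsolete(current: Position, incoming: Position) -> bool {
    incoming <= current
}
```

The predicate is cheap per message, so the cost raised in the comment comes from how many already-applied messages are delivered after a demotion, not from the check itself.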

Comment on lines +105 to +114
pub fn spawn(self) -> JoinHandle<Result<()>> {
    std::thread::spawn(move || {
        let runtime = Builder::new_current_thread()
            .thread_name("replication-service")
            .build()
            .expect("Failed to build replication service runtime");

        runtime.block_on(tokio::task::unconstrained(self.run()))
    })
}
Collaborator

tokio::task::unconstrained opts the future out of Tokio's cooperative scheduling budget, so on a current_thread runtime it is never forced to yield to the runtime's task scheduler. This works because the select! in Primary::run/Standby::run already yields at each iteration, but if any branch ever does a long synchronous operation it will block the entire runtime with no preemption. Worth documenting this invariant or reconsidering if you need cooperative scheduling guarantees in the future.

Comment on lines +151 to +158
pub async fn into_standby(
    self,
    messages: Receiver<Message>,
) -> Result<Standby> {
    let consumer = Box::new(self.create_consumer(None).await);
    let watcher = LockWatcher::new(&self.broker).await;
    self.enter_replica_mode().await;
    Ok(Standby::new(self, consumer, messages, watcher))
Collaborator

When transitioning to standby, the consumer is created with start_seq: None, which means DeliverPolicy::All, replaying from the very beginning of the stream. Combined with the dedup logic in Standby::handle_message, this means every demotion scans through the entire event history. Consider passing Some(self.broker.sequence) (or similar) as start_seq to resume from where the primary left off.

Comment on lines +74 to +91
async fn publish(&mut self, msg: Message) -> Result<()> {
    let payload = match bincode::serialize(&msg) {
        Ok(p) => p,
        Err(error) => {
            error!(%error, "serialization failed, should never happen");
            return Ok(());
        }
    };
    let subject = msg.subject();
    let (slot, index) = msg.slot_and_index();
    let ack = matches!(msg, Message::SuperBlock(_));

    self.ctx
        .broker
        .publish(subject, payload.into(), ack)
        .await?;
    self.ctx.update_position(slot, index);
    Ok(())
Collaborator

When serialization fails (line 78), the method logs the error and returns Ok(()), silently dropping the message. This means the standby will never see this transaction, creating a silent divergence. Since serialization of these types should never fail (they're all Serialize), consider at least adding identifying info (such as the slot and index) to the existing error! log, or even panicking if this truly "should never happen".
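One alternative to dropping the message, sketched here as an assumption rather than what the PR does, is to propagate the failure with identifying information so the caller decides how to react. All names below (`PublishError`, the simplified `Message`, the failing `serialize` stub, the `sent` buffer standing in for the broker) are hypothetical.

```rust
/// Hypothetical error carrying the position of the message that failed.
#[derive(Debug, PartialEq)]
enum PublishError {
    Serialization { slot: u64, index: u32, reason: String },
}

/// Simplified stand-in for the replicated message.
struct Message {
    slot: u64,
    index: u32,
    body: Vec<u8>,
}

/// Stub serializer that can fail, standing in for the real serialization call.
fn serialize(msg: &Message) -> Result<Vec<u8>, String> {
    if msg.body.is_empty() {
        Err("empty body".to_string())
    } else {
        Ok(msg.body.clone())
    }
}

/// Publishes into `sent` (a stand-in for the broker). A serialization failure
/// is returned to the caller instead of being swallowed as `Ok(())`.
fn publish(msg: &Message, sent: &mut Vec<Vec<u8>>) -> Result<(), PublishError> {
    let payload = serialize(msg).map_err(|reason| PublishError::Serialization {
        slot: msg.slot,
        index: msg.index,
        reason,
    })?;
    sent.push(payload);
    Ok(())
}
```

Returning the error keeps the primary from silently advancing past a message the standby never receives; whether to retry, halt, or panic is then an explicit decision at the call site.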

Comment on lines +65 to +80
pub async fn messages(&self) -> MessageStream {
    loop {
        let result = self
            .inner
            .stream()
            .max_messages_per_batch(cfg::BATCH_SIZE)
            .messages()
            .await;
        match result {
            Ok(s) => break s,
            Err(error) => {
                warn!(%error, "failed to create message stream")
            }
        }
    }
}
Collaborator

messages() loops forever on failure with no backoff, similar to LockWatcher::new. If the stream is temporarily unavailable this will spin. Consider adding a small delay on the error path.

Collaborator

@thlorenz thlorenz left a comment

Nice job!
The architecture is clean with well-separated concerns (nats layer, service layer, protocol).
I pointed out a few items worth addressing. Approving already, but please address them before merging.

@bmuddha bmuddha force-pushed the bmuddha/accountsdb/archive-snapshots branch from 2a3b174 to c501cd7 Compare March 13, 2026 13:58
@bmuddha bmuddha force-pushed the bmuddha/replication/protocol branch from edf70ca to 1e97a73 Compare March 13, 2026 13:58