feat: add replicator service with NATS JetStream #1007
bmuddha wants to merge 13 commits into bmuddha/accountsdb/archive-snapshots
Warning: This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 19-30: MessageEncoder::encode can produce frames larger than the
Receiver's 64KB limit, causing the remote side to reject them; update the
Encoder (MessageEncoder::encode) to compute the serialized message size (e.g.,
via bincode::serialized_size or by serializing into a temporary buffer), compare
it against the same 64 * 1024 frame limit used by the Receiver, and return an
Err(Error) if it exceeds that limit instead of writing an oversize frame; keep
the existing write path when size is within the limit so the existing
dst.put_u32_le / serialize_into logic is unchanged.
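The suggested size guard can be sketched as a stand-alone function; note this is an illustrative version (the names `encode_frame`, `MAX_FRAME`, and `EncodeError` are assumptions, and the real encoder works through `tokio_util`'s `Encoder` trait and `BytesMut` rather than a plain `Vec`):

```rust
// Mirrors the Receiver's 64 KiB frame limit (illustrative constant name).
const MAX_FRAME: usize = 64 * 1024;

#[derive(Debug, PartialEq)]
enum EncodeError {
    FrameTooLarge(usize),
}

// Wire format assumed from the review comment: a 4-byte little-endian
// length prefix followed by the serialized payload. Oversize frames are
// rejected up front instead of letting the remote side drop the
// connection after the fact.
fn encode_frame(payload: &[u8], dst: &mut Vec<u8>) -> Result<(), EncodeError> {
    if payload.len() > MAX_FRAME {
        return Err(EncodeError::FrameTooLarge(payload.len()));
    }
    dst.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    dst.extend_from_slice(payload);
    Ok(())
}
```

Checking the size before writing keeps the existing write path untouched for in-limit messages, which is what the comment asks for.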
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
Cargo.toml, magicblock-replicator/Cargo.toml, magicblock-replicator/src/connection.rs, magicblock-replicator/src/error.rs, magicblock-replicator/src/lib.rs, magicblock-replicator/src/proto.rs, magicblock-replicator/src/tcp.rs
Force-pushed 1acdff2 to 6e3a51e
Force-pushed 23c811a to be09338
Force-pushed 6e3a51e to 936c1c0
Force-pushed be09338 to 31292d0
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 83-145: The signatures are created over raw bytes that overlap
across message types and omit the protocol version; update all signing and
verification to domain-separate and bind version by including an explicit
context string and the protocol version in the signed payload. Concretely,
change each signer call (the constructors that call keypair.sign_message in the
implementations for the first message type (the HandshakeRequest-like impl),
HandshakeResponse::new, and FailoverSignal::new) to sign a concatenation like
b"<MSG_TYPE>|v{PROTOCOL_VERSION}|"+payload (use a distinct MSG_TYPE per struct,
e.g., "HandshakeRequest", "HandshakeResponse", "FailoverSignal"), and update the
corresponding verify methods (the verify implementations for those types) to
reconstruct the same domain-separated bytes (including version) before calling
signature.verify; do the same for error branches in
HandshakeResponse::new/verify so both Ok and Err paths use the same
domain-separated scheme. Ensure VersionedTransaction/Transaction::decode is
unaffected but do not sign raw payloads elsewhere.
In `@magicblock-replicator/src/tcp.rs`:
- Around line 15-23: The connect/split helpers currently hard-code Tokio
TcpStream and so cannot support the WebSocket framing required by the objective;
replace or refactor TcpSender/TcpReceiver creation so the transport is
abstracted and supports WebSocket-frame wrapping (type prefix + bincode
payload). Update the public functions connect and split (and any constructors
like Sender::new / Receiver::new) to work against a Transport trait (or separate
WsStream implementation) that implements send/receive with WebSocket framing, or
provide alternate connect_ws/split_ws that wrap a WebSocket stream into
TcpSender/TcpReceiver equivalents using the required frame format; ensure the
new implementation still returns the same TcpSender/TcpReceiver types or
introduces clearly named WsSender/WsReceiver to avoid breaking callers.
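The transport abstraction being requested can be sketched as follows. This is a simplified, blocking illustration (the names `Transport` and `LengthPrefixed` are assumptions, and the real code would be async over Tokio streams); the point is that `Sender`/`Receiver` program against a trait, so a WebSocket-framed implementation can be slotted in beside the plain TCP one:

```rust
use std::io::{Read, Result, Write};

// Hypothetical transport trait: a frame in, a frame out. A WebSocket
// implementation would wrap each frame in a WS binary message instead
// of a raw length prefix.
trait Transport {
    fn send_frame(&mut self, frame: &[u8]) -> Result<()>;
    fn recv_frame(&mut self) -> Result<Vec<u8>>;
}

// Plain stream implementation: 4-byte LE length prefix, then payload.
struct LengthPrefixed<S>(S);

impl<S: Read + Write> Transport for LengthPrefixed<S> {
    fn send_frame(&mut self, frame: &[u8]) -> Result<()> {
        self.0.write_all(&(frame.len() as u32).to_le_bytes())?;
        self.0.write_all(frame)
    }

    fn recv_frame(&mut self) -> Result<Vec<u8>> {
        let mut len = [0u8; 4];
        self.0.read_exact(&mut len)?;
        let mut buf = vec![0u8; u32::from_le_bytes(len) as usize];
        self.0.read_exact(&mut buf)?;
        Ok(buf)
    }
}
```

With this shape, `connect`/`split` can return `impl Transport` (or separate `WsSender`/`WsReceiver` types) without breaking existing callers.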
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
Cargo.toml, magicblock-replicator/Cargo.toml, magicblock-replicator/src/connection.rs, magicblock-replicator/src/error.rs, magicblock-replicator/src/lib.rs, magicblock-replicator/src/proto.rs, magicblock-replicator/src/tcp.rs, magicblock-replicator/src/tests.rs
Force-pushed 936c1c0 to 1642077
Force-pushed 31292d0 to 15c008a
Force-pushed 1642077 to 69f7765
Force-pushed 15c008a to 66c7f58
Actionable comments posted: 1
♻️ Duplicate comments (1)
magicblock-replicator/src/proto.rs (1)
83-145: ⚠️ Potential issue | 🟠 Major: Signatures are not domain-separated and do not bind protocol version.
These constructors/validators sign and verify raw payload bytes only. This permits cross-context replay and does not cryptographically protect version from tampering.
🔐 Suggested hardening pattern

```diff
+fn signature_preimage(domain: &'static [u8], version: u32, body: &[u8]) -> Vec<u8> {
+    let mut out = Vec::with_capacity(domain.len() + 1 + 4 + body.len());
+    out.extend_from_slice(domain);
+    out.push(0);
+    out.extend_from_slice(&version.to_le_bytes());
+    out.extend_from_slice(body);
+    out
+}
+
 impl HandshakeRequest {
     pub fn new(start_slot: Slot, keypair: &Keypair) -> Self {
+        let version = PROTOCOL_VERSION;
+        let msg = signature_preimage(b"HandshakeRequest", version, &start_slot.to_le_bytes());
         Self {
-            version: PROTOCOL_VERSION,
+            version,
             start_slot,
             identity: keypair.pubkey(),
-            signature: keypair.sign_message(&start_slot.to_le_bytes()),
+            signature: keypair.sign_message(&msg),
         }
     }

     pub fn verify(&self) -> bool {
-        self.signature
-            .verify(self.identity.as_array(), &self.start_slot.to_le_bytes())
+        let msg = signature_preimage(b"HandshakeRequest", self.version, &self.start_slot.to_le_bytes());
+        self.signature.verify(self.identity.as_array(), &msg)
     }
 }
```

Apply the same domain+version preimage approach to HandshakeResponse and FailoverSignal.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/proto.rs` around lines 83 - 145, Construct and verify signatures over a domain-separated preimage that includes a static domain tag and the PROTOCOL_VERSION to prevent cross-context replay and bind the version; update Handshake::new/verify, HandshakeResponse::new/verify, and FailoverSignal::new/verify so their sign_message input is a concatenation of a fixed domain string (e.g. "magicblock-v1"), PROTOCOL_VERSION (as bytes), and the existing payload bytes (start_slot/slot or result bytes), and make the corresponding verify methods expect that same prefixed preimage when calling signature.verify.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 80-82: The Receiver stream currently maps the inner stream's
Poll::Ready(None) to Poll::Ready(Some(Err(Error::ConnectionClosed))), violating
Stream's contract; update the poll_next implementation in the Receiver (the impl
Stream for Receiver / poll_next method) so that when the inner stream returns
Poll::Ready(None) it returns Poll::Ready(None) (i.e., signal EOF) instead of
emitting a ConnectionClosed error, preserving proper termination semantics for
consumers.
---
Duplicate comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 83-145: Construct and verify signatures over a domain-separated
preimage that includes a static domain tag and the PROTOCOL_VERSION to prevent
cross-context replay and bind the version; update Handshake::new/verify,
HandshakeResponse::new/verify, and FailoverSignal::new/verify so their
sign_message input is a concatenation of a fixed domain string (e.g.
"magicblock-v1"), PROTOCOL_VERSION (as bytes), and the existing payload bytes
(start_slot/slot or result bytes), and make the corresponding verify methods
expect that same prefixed preimage when calling signature.verify.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: fcc6e96e-ecb4-42d7-857e-e455fb474253
⛔ Files ignored due to path filters (2)
Cargo.lock is excluded by !**/*.lock; test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
Cargo.toml, magicblock-replicator/Cargo.toml, magicblock-replicator/src/connection.rs, magicblock-replicator/src/error.rs, magicblock-replicator/src/lib.rs, magicblock-replicator/src/proto.rs, magicblock-replicator/src/tcp.rs, magicblock-replicator/src/tests.rs
Force-pushed 66c7f58 to 7a1d2ff
Actionable comments posted: 2
♻️ Duplicate comments (1)
magicblock-replicator/src/connection.rs (1)
80-82: ⚠️ Potential issue | 🟠 Major: Return Poll::Ready(None) on EOF in Stream::poll_next.
Mapping EOF to Some(Err(ConnectionClosed)) prevents proper stream termination and can keep stream consumers looping indefinitely after disconnect.
Suggested fix

```diff
-            Poll::Ready(None) => {
-                Poll::Ready(Some(Err(Error::ConnectionClosed)))
-            }
+            Poll::Ready(None) => Poll::Ready(None),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/connection.rs` around lines 80 - 82, In the Stream::poll_next implementation, change the EOF handling so it returns Poll::Ready(None) instead of Poll::Ready(Some(Err(Error::ConnectionClosed))); locate the match arm that currently maps Poll::Ready(None) => Poll::Ready(Some(Err(Error::ConnectionClosed))) and replace it with Poll::Ready(None) so the stream terminates cleanly on EOF while preserving other error paths that should still return Some(Err(...)).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@magicblock-replicator/src/proto.rs`:
- Around line 23-30: The Message enum is missing the required Heartbeat variant
and currently contains Failover which diverges from the protocol; update the
enum definition (Message) to include Heartbeat(Heartbeat) and remove the
Failover(FailoverSignal) variant unless Failover is part of the spec, and ensure
the corresponding Heartbeat type/name is imported/defined so the protocol
matches the required contract.
- Around line 3-5: The protocol currently relies on bincode's implicit enum
discriminants for Message; change the wire format to prefix an explicit u8
message-type byte before the bincode payload and keep the existing 4-byte LE
length prefix logic. Update serde/serialization code paths that read/write
messages (the encoder/decoder that handle the 4-byte length and
bincode(Message)) to: write length = 1 + payload_len, then write a single u8
type tag mapped from the Message enum variants, followed by the bincode payload;
and on read, use the length to read the frame, read the u8 type tag first and
dispatch/deserialize the remainder into the corresponding Message variant.
Ensure the Message-to-u8 mapping and the inverse dispatch logic are defined
(e.g., a small match on Message variants and a match to construct/deserialize
variants) so the wire format no longer depends on bincode's internal
discriminant ordering.
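The explicit type-tag scheme being asked for can be sketched like this. The variant set, payloads, and id assignments below are illustrative stand-ins for the real protocol enum; the point is that each variant gets a stable `u8` id written before its payload, so the wire format no longer depends on bincode's internal enum discriminants:

```rust
// Illustrative protocol enum; the real Message has more variants
// (HandshakeReq/Resp, Transaction, Block, SuperBlock, ...).
#[derive(Debug, PartialEq)]
enum Message {
    Heartbeat,
    Block(Vec<u8>),
}

impl Message {
    // Stable on-wire type ids, decoupled from declaration order.
    fn type_id(&self) -> u8 {
        match self {
            Message::Heartbeat => 0,
            Message::Block(_) => 1,
        }
    }

    // Frame body: one type-tag byte, then the variant payload. The real
    // code would bincode-serialize the payload; raw bytes stand in here.
    fn encode(&self) -> Vec<u8> {
        let mut out = vec![self.type_id()];
        if let Message::Block(payload) = self {
            out.extend_from_slice(payload);
        }
        out
    }

    // Read the tag first, then dispatch to the matching variant decoder.
    fn decode(frame: &[u8]) -> Option<Message> {
        match frame.split_first()? {
            (&0, _) => Some(Message::Heartbeat),
            (&1, rest) => Some(Message::Block(rest.to_vec())),
            _ => None,
        }
    }
}
```

The outer 4-byte length prefix stays unchanged; it just covers `1 + payload_len` bytes instead of the bare payload.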
---
Duplicate comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 80-82: In the Stream::poll_next implementation, change the EOF
handling so it returns Poll::Ready(None) instead of
Poll::Ready(Some(Err(Error::ConnectionClosed))); locate the match arm that
currently maps Poll::Ready(None) =>
Poll::Ready(Some(Err(Error::ConnectionClosed))) and replace it with
Poll::Ready(None) so the stream terminates cleanly on EOF while preserving other
error paths that should still return Some(Err(...)).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: b72f2e54-f6b2-4403-9775-3af27b6b5f55
⛔ Files ignored due to path filters (2)
Cargo.lock is excluded by !**/*.lock; test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
Cargo.toml, magicblock-replicator/Cargo.toml, magicblock-replicator/src/connection.rs, magicblock-replicator/src/error.rs, magicblock-replicator/src/lib.rs, magicblock-replicator/src/proto.rs, magicblock-replicator/src/tcp.rs, magicblock-replicator/src/tests.rs
Force-pushed 7a1d2ff to 0aca470
♻️ Duplicate comments (3)
magicblock-replicator/src/connection.rs (1)
80-82: ⚠️ Potential issue | 🟠 Major: Return Poll::Ready(None) on EOF in Stream implementation.
This branch should terminate the stream instead of yielding ConnectionClosed; otherwise while let Some(...) consumers may never exit on disconnect.
♻️ Suggested fix

```diff
-            Poll::Ready(None) => {
-                Poll::Ready(Some(Err(Error::ConnectionClosed)))
-            }
+            Poll::Ready(None) => Poll::Ready(None),
```

```bash
#!/bin/bash
set -euo pipefail
echo "Inspecting EOF branch in Receiver::poll_next..."
nl -ba magicblock-replicator/src/connection.rs | sed -n '70,86p'
echo
echo "Searching for EOF mapping to ConnectionClosed..."
rg -n 'Poll::Ready\(None\)|ConnectionClosed' magicblock-replicator/src/connection.rs -C 2
```

Expected verification result: the EOF path should be Poll::Ready(None) and not Poll::Ready(Some(Err(Error::ConnectionClosed))).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/connection.rs` around lines 80 - 82, in the Stream implementation for Receiver (Receiver::poll_next) the EOF branch currently maps Poll::Ready(None) to Poll::Ready(Some(Err(Error::ConnectionClosed))); change this so the EOF case returns Poll::Ready(None) to properly terminate the stream instead of yielding a ConnectionClosed error; locate the match/branch in connection.rs that checks for Poll::Ready(None) and replace the wrapped Some(Err(Error::ConnectionClosed)) return with a plain Poll::Ready(None).
magicblock-replicator/src/proto.rs (2)
23-30: ⚠️ Potential issue | 🟠 Major: Heartbeat message type is missing from protocol enum.
The issue objective lists Heartbeat as a required wire message. Please add it (or update the objective/spec reference if requirements changed) before merging.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/proto.rs` around lines 23 - 30, The Message enum is missing the Heartbeat variant required by the protocol; add a Heartbeat(Heartbeat) variant to the Message enum (the enum named Message in proto.rs) and ensure the Heartbeat type (struct/alias) is defined or imported into this module so serialization/deserialization and matches over Message compile; update any match statements or serialization logic that enumerate Message variants (e.g., handlers for HandshakeReq/Resp, Transaction, Block, SuperBlock, Failover) to handle the new Heartbeat variant accordingly.
3-5: ⚠️ Potential issue | 🟠 Major: Wire format still depends on bincode's implicit enum discriminants, not explicit byte type IDs.
The linked protocol objective calls for byte-identified message types with a type prefix. Encoding Message directly via bincode keeps the type tag implicit and serializer-coupled.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@magicblock-replicator/src/proto.rs` around lines 3 - 5, The Message enum is currently serialized via bincode which uses implicit, serializer-specific discriminants; instead implement an explicit byte type-ID prefix to satisfy the protocol: assign each Message variant a stable u8 type id, update the Message (de)serialization paths to write/read one leading byte (the variant id) followed by the bincode-encoded payload of that variant (or no payload) and use that byte to dispatch decoding to the correct variant; update the corresponding serialize/deserialize helpers (e.g., the Message::encode / Message::decode or the read_message/write_message routines in proto.rs) to perform this two-step process and remove reliance on bincode’s implicit enum tag.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@magicblock-replicator/src/connection.rs`:
- Around line 80-82: In the Stream implementation for Receiver
(Receiver::poll_next) the EOF branch currently maps Poll::Ready(None) to
Poll::Ready(Some(Err(Error::ConnectionClosed))); change this so the EOF case
returns Poll::Ready(None) to properly terminate the stream instead of yielding a
ConnectionClosed error; locate the match/branch in connection.rs that checks for
Poll::Ready(None) and replace the wrapped Some(Err(Error::ConnectionClosed))
return with a plain Poll::Ready(None).
In `@magicblock-replicator/src/proto.rs`:
- Around line 23-30: The Message enum is missing the Heartbeat variant required
by the protocol; add a Heartbeat(Heartbeat) variant to the Message enum (the
enum named Message in proto.rs) and ensure the Heartbeat type (struct/alias) is
defined or imported into this module so serialization/deserialization and
matches over Message compile; update any match statements or serialization logic
that enumerate Message variants (e.g., handlers for HandshakeReq/Resp,
Transaction, Block, SuperBlock, Failover) to handle the new Heartbeat variant
accordingly.
- Around line 3-5: The Message enum is currently serialized via bincode which
uses implicit, serializer-specific discriminants; instead implement an explicit
byte type-ID prefix to satisfy the protocol: assign each Message variant a
stable u8 type id, update the Message (de)serialization paths to write/read one
leading byte (the variant id) followed by the bincode-encoded payload of that
variant (or no payload) and use that byte to dispatch decoding to the correct
variant; update the corresponding serialize/deserialize helpers (e.g., the
Message::encode / Message::decode or the read_message/write_message routines in
proto.rs) to perform this two-step process and remove reliance on bincode’s
implicit enum tag.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 44576bcc-6961-4eb6-85ce-bc7f45a82c6b
⛔ Files ignored due to path filters (2)
Cargo.lock is excluded by !**/*.lock; test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
Cargo.toml, magicblock-replicator/Cargo.toml, magicblock-replicator/src/connection.rs, magicblock-replicator/src/error.rs, magicblock-replicator/src/lib.rs, magicblock-replicator/src/proto.rs, magicblock-replicator/src/tcp.rs, magicblock-replicator/src/tests.rs
Force-pushed 0aca470 to f025746
Force-pushed 40fa0de to ee85613
Force-pushed ee85613 to ba76d02
Force-pushed 8c2636d to 109fa64
Force-pushed 56b9fbe to 43a9a23
Force-pushed 109fa64 to de66c05
```rust
pub(crate) async fn new(broker: &Broker) -> Self {
    let watch = loop {
        let store = match broker.ctx.get_key_value(cfg::PRODUCER_LOCK).await {
            Ok(s) => s,
            Err(error) => {
                tracing::error!(%error, "failed to obtain lock object");
                continue;
            }
        };
        match store.watch(cfg::LOCK_KEY).await {
            Ok(w) => break Box::new(w),
            Err(error) => {
                tracing::error!(%error, "failed to create lock watcher");
                continue;
            }
        }
    };
```
new() loops forever on errors with no backoff and no cancellation path. If the KV store is unavailable for a while this will spin-loop consuming CPU. Consider adding a sleep between retries (like CONSUMER_RETRY_DELAY in context.rs) and/or accepting a CancellationToken so the caller can abort.
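A capped exponential backoff for retry loops like this one can be computed with a small helper; the base delay and cap below are illustrative values, not the crate's actual constants. In the real async loop, the resulting delay would be passed to `tokio::time::sleep` inside a `tokio::select!` alongside a `CancellationToken`'s `cancelled()` future so the caller can abort:

```rust
use std::time::Duration;

// Illustrative constants; the real code might reuse something like
// CONSUMER_RETRY_DELAY from context.rs as the base.
const BASE_DELAY: Duration = Duration::from_millis(100);
const MAX_DELAY: Duration = Duration::from_secs(5);

// Delay before retry number `attempt` (0-based): doubles each time,
// saturating at MAX_DELAY. The shift is clamped to avoid overflow.
fn backoff_delay(attempt: u32) -> Duration {
    BASE_DELAY
        .saturating_mul(1u32 << attempt.min(16))
        .min(MAX_DELAY)
}
```

Even a constant sleep on the error path would fix the spin-loop; the exponential shape just avoids hammering a store that stays down for a while.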
```rust
tokio::spawn(async move {
    if let Err(error) = store.put(meta, &mut file).await {
        error!(%error, "snapshot upload failed");
    }
});
```
Two concerns with this tokio::spawn:

- The error is logged but there is no feedback to the caller: the put_snapshot method returns Ok(()) immediately even if the upload later fails. Consider returning a JoinHandle so the caller can optionally check.
- More importantly, if Service::spawn runs on a single-threaded current_thread runtime then a tokio::spawn inside that runtime will only make progress if the caller yields. Since Primary::run loops in tokio::select!, this should mostly work, but it's subtle; worth a comment, or consider using tokio::task::spawn_local to make the intent explicit.
```rust
let obsolete = self.ctx.slot == slot && self.ctx.index >= index;
if self.ctx.slot > slot || obsolete {
    return;
}
```
The duplicate-skip logic has a subtle issue: when self.ctx.slot == slot and self.ctx.index >= index, it skips (correct for same slot). But when self.ctx.slot > slot it also skips. However, after a role transition from primary->standby, the consumer starts from DeliverPolicy::All (no snapshot sequence), so it will replay everything from the beginning of the stream, meaning all messages will be skipped until the stream catches up to the current slot. This is correct for dedup, but it means an O(n) scan through the entire stream on every demotion. Is that intentional? If not, consider using start_seq from the broker's sequence when creating the consumer during demotion.
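Restated as a stand-alone predicate (field names flattened into `(slot, index)` tuples for illustration), the skip rule is: drop a message from an older slot, or from the same slot with an index at or below the last applied one:

```rust
// Illustrative restatement of the quoted dedup check; `applied` is the
// consumer's current (slot, index) position, `incoming` the message's.
fn is_duplicate(applied: (u64, u64), incoming: (u64, u64)) -> bool {
    let ((cur_slot, cur_index), (slot, index)) = (applied, incoming);
    cur_slot > slot || (cur_slot == slot && cur_index >= index)
}
```

The predicate itself is correct; the cost concern above is that after a demotion every historical message is fed through it, so starting the consumer from a known sequence would skip that replay entirely.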
```rust
pub fn spawn(self) -> JoinHandle<Result<()>> {
    std::thread::spawn(move || {
        let runtime = Builder::new_current_thread()
            .thread_name("replication-service")
            .build()
            .expect("Failed to build replication service runtime");

        runtime.block_on(tokio::task::unconstrained(self.run()))
    })
}
```
tokio::task::unconstrained on a current_thread runtime means the future runs to completion without yielding to the runtime's task scheduler. This works because the select! in Primary::run/Standby::run already yields at each iteration, but if any branch ever does a long synchronous operation it will block the entire runtime with no preemption. Worth documenting this invariant or reconsidering if you need cooperative scheduling guarantees in the future.
```rust
pub async fn into_standby(
    self,
    messages: Receiver<Message>,
) -> Result<Standby> {
    let consumer = Box::new(self.create_consumer(None).await);
    let watcher = LockWatcher::new(&self.broker).await;
    self.enter_replica_mode().await;
    Ok(Standby::new(self, consumer, messages, watcher))
}
```
When transitioning to standby, the consumer is created with start_seq: None, which means DeliverPolicy::All, replaying from the very beginning of the stream. Combined with the dedup logic in Standby::handle_message, this means every demotion scans through the entire event history. Consider passing Some(self.broker.sequence) (or similar) as start_seq to resume from where the primary left off.
```rust
async fn publish(&mut self, msg: Message) -> Result<()> {
    let payload = match bincode::serialize(&msg) {
        Ok(p) => p,
        Err(error) => {
            error!(%error, "serialization failed, should never happen");
            return Ok(());
        }
    };
    let subject = msg.subject();
    let (slot, index) = msg.slot_and_index();
    let ack = matches!(msg, Message::SuperBlock(_));

    self.ctx
        .broker
        .publish(subject, payload.into(), ack)
        .await?;
    self.ctx.update_position(slot, index);
    Ok(())
}
```
When serialization fails (line 78), the method returns Ok(()), silently dropping the message. This means the standby will never see this transaction, creating a silent divergence. Since serialization of these types should never fail (they're all Serialize), consider at least bumping this to error! level with some identifying info, or even panicking if this truly "should never happen".
```rust
pub async fn messages(&self) -> MessageStream {
    loop {
        let result = self
            .inner
            .stream()
            .max_messages_per_batch(cfg::BATCH_SIZE)
            .messages()
            .await;
        match result {
            Ok(s) => break s,
            Err(error) => {
                warn!(%error, "failed to create message stream")
            }
        }
    }
}
```
messages() loops forever on failure with no backoff, similar to LockWatcher::new. If the stream is temporarily unavailable this will spin. Consider adding a small delay on the error path.
thlorenz left a comment:

Nice job!
The architecture is clean with well-separated concerns (nats layer, service layer, protocol).
A few items worth addressing that I pointed out, approving already though, but please address before merging.
Force-pushed 2a3b174 to c501cd7
Force-pushed edf70ca to 1e97a73

Summary

Adds the magicblock-replicator crate implementing primary-standby state replication via NATS JetStream.

Compatibility

Testing

magicblock-replicator/src/tests.rs

Checklist