Skip to content

Commit 7dabf9e

Browse files
committed
feat: add stream-oriented AppendEntries API for pipelined replication
Add `Raft::stream_append()` method that allows processing multiple AppendEntries requests through a pipelined stream interface. This enables efficient batched log replication with backpressure support via a bounded channel. Example usage: ```rust use std::pin::pin; use futures::StreamExt; let input = futures::stream::iter(vec![request1, request2, request3]); let mut output = pin!(raft.stream_append(input)); while let Some(result) = output.next().await { match result { Ok(log_id) => println!("Flushed: {:?}", log_id), Err(err) => { println!("Error: {}", err); break; } } } ``` Changes: - Add `Raft::stream_append()` for stream-based AppendEntries processing - Add `StreamAppendError` with `Conflict` and `HigherVote` variants - Add `StreamAppendResult` type alias for stream item results - Add `AppendEntriesResponse::into_stream_result()` conversion method - Add `ProtocolApi::stream_append()` internal implementation - Add integration tests for success, conflict, and higher vote scenarios
1 parent 6b3aa09 commit 7dabf9e

File tree

8 files changed

+404
-6
lines changed

8 files changed

+404
-6
lines changed

openraft/src/raft/api/protocol.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
use std::sync::Arc;
12
use std::time::Duration;
23

4+
use futures::Stream;
35
use openraft_macros::since;
46

57
use crate::LogIdOptionExt;
68
use crate::LogIndexOptionExt;
9+
use crate::OptionalSend;
710
use crate::RaftMetrics;
811
use crate::RaftTypeConfig;
912
use crate::Snapshot;
@@ -20,6 +23,8 @@ use crate::raft::TransferLeaderRequest;
2023
use crate::raft::VoteRequest;
2124
use crate::raft::VoteResponse;
2225
use crate::raft::raft_inner::RaftInner;
26+
use crate::raft::stream_append;
27+
use crate::raft::stream_append::StreamAppendResult;
2328
use crate::type_config::TypeConfigExt;
2429
use crate::type_config::alias::SnapshotDataOf;
2530
use crate::type_config::alias::VoteOf;
@@ -57,16 +62,16 @@ use crate::vote::raft_vote::RaftVoteExt;
5762
/// Remote Leader Node Local Follower Node
5863
/// ```
5964
#[since(version = "0.10.0")]
60-
pub(crate) struct ProtocolApi<'a, C>
65+
pub(crate) struct ProtocolApi<C>
6166
where C: RaftTypeConfig
6267
{
63-
inner: &'a RaftInner<C>,
68+
inner: Arc<RaftInner<C>>,
6469
}
6570

66-
impl<'a, C> ProtocolApi<'a, C>
71+
impl<C> ProtocolApi<C>
6772
where C: RaftTypeConfig
6873
{
69-
pub(in crate::raft) fn new(inner: &'a RaftInner<C>) -> Self {
74+
pub(in crate::raft) fn new(inner: Arc<RaftInner<C>>) -> Self {
7075
Self { inner }
7176
}
7277

@@ -91,6 +96,17 @@ where C: RaftTypeConfig
9196
self.inner.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await
9297
}
9398

99+
#[since(version = "0.10.0")]
100+
pub(crate) fn stream_append<S>(
101+
self,
102+
stream: S,
103+
) -> impl Stream<Item = StreamAppendResult<C>> + OptionalSend + 'static
104+
where
105+
S: Stream<Item = AppendEntriesRequest<C>> + OptionalSend + 'static,
106+
{
107+
stream_append::stream_append(self.inner, stream)
108+
}
109+
94110
#[since(version = "0.10.0")]
95111
#[tracing::instrument(level = "debug", skip_all)]
96112
pub(crate) async fn get_snapshot(&self) -> Result<Option<Snapshot<C>>, Fatal<C>> {

openraft/src/raft/message/append_entries.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use std::fmt;
33
use crate::RaftTypeConfig;
44
use crate::display_ext::DisplayOptionExt;
55
use crate::display_ext::DisplaySlice;
6+
use crate::raft::StreamAppendError;
7+
use crate::raft::stream_append::StreamAppendResult;
68
use crate::type_config::alias::LogIdOf;
79
use crate::type_config::alias::VoteOf;
810

@@ -115,6 +117,24 @@ where C: RaftTypeConfig
115117
pub fn is_conflict(&self) -> bool {
116118
matches!(*self, AppendEntriesResponse::Conflict)
117119
}
120+
121+
/// Convert this response to a stream append result.
122+
///
123+
/// Arguments:
124+
/// - `prev_log_id`: The prev_log_id from the request, used for Conflict errors.
125+
/// - `last_log_id`: The last_log_id of the sent entries, used for Success.
126+
pub fn into_stream_result(
127+
self,
128+
prev_log_id: Option<LogIdOf<C>>,
129+
last_log_id: Option<LogIdOf<C>>,
130+
) -> StreamAppendResult<C> {
131+
match self {
132+
AppendEntriesResponse::Success => Ok(last_log_id),
133+
AppendEntriesResponse::PartialSuccess(log_id) => Ok(log_id),
134+
AppendEntriesResponse::Conflict => Err(StreamAppendError::Conflict(prev_log_id)),
135+
AppendEntriesResponse::HigherVote(vote) => Err(StreamAppendError::HigherVote(vote)),
136+
}
137+
}
118138
}
119139

120140
impl<C> fmt::Display for AppendEntriesResponse<C>

openraft/src/raft/message/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
66
mod append_entries;
77
mod install_snapshot;
8+
mod stream_append_error;
89
mod transfer_leader;
910
mod vote;
1011

@@ -18,6 +19,7 @@ pub use client_write::ClientWriteResult;
1819
pub use install_snapshot::InstallSnapshotRequest;
1920
pub use install_snapshot::InstallSnapshotResponse;
2021
pub use install_snapshot::SnapshotResponse;
22+
pub use stream_append_error::StreamAppendError;
2123
pub use transfer_leader::TransferLeaderRequest;
2224
pub use vote::VoteRequest;
2325
pub use vote::VoteResponse;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use std::fmt;
2+
3+
use crate::RaftTypeConfig;
4+
use crate::display_ext::DisplayOptionExt;
5+
use crate::type_config::alias::LogIdOf;
6+
use crate::type_config::alias::VoteOf;
7+
8+
/// Error type for stream append entries.
9+
///
10+
/// When this error is returned, the stream is terminated.
11+
#[derive(Debug, Clone, PartialEq, Eq)]
12+
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
13+
pub enum StreamAppendError<C: RaftTypeConfig> {
14+
/// Log conflict at the given prev_log_id.
15+
///
16+
/// The follower's log at this position does not match the leader's.
17+
Conflict(Option<LogIdOf<C>>),
18+
19+
/// Seen a higher vote from another leader.
20+
HigherVote(VoteOf<C>),
21+
}
22+
23+
impl<C> fmt::Display for StreamAppendError<C>
24+
where C: RaftTypeConfig
25+
{
26+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27+
match self {
28+
StreamAppendError::Conflict(log_id) => {
29+
write!(f, "Conflict({})", log_id.display())
30+
}
31+
StreamAppendError::HigherVote(vote) => {
32+
write!(f, "HigherVote({})", vote)
33+
}
34+
}
35+
}
36+
}

openraft/src/raft/mod.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub(crate) mod message;
1717
mod raft_inner;
1818
pub mod responder;
1919
mod runtime_config_handle;
20+
mod stream_append;
2021
pub mod trigger;
2122

2223
use std::any::Any;
@@ -42,11 +43,13 @@ pub use message::ClientWriteResult;
4243
pub use message::InstallSnapshotRequest;
4344
pub use message::InstallSnapshotResponse;
4445
pub use message::SnapshotResponse;
46+
pub use message::StreamAppendError;
4547
pub use message::TransferLeaderRequest;
4648
pub use message::VoteRequest;
4749
pub use message::VoteResponse;
4850
pub use message::WriteRequest;
4951
use openraft_macros::since;
52+
pub use stream_append::StreamAppendResult;
5053
use tracing::Instrument;
5154
use tracing::Level;
5255
use tracing::trace_span;
@@ -655,8 +658,8 @@ where C: RaftTypeConfig
655658
/// - [`ProtocolApi::begin_receiving_snapshot`]
656659
/// - [`ProtocolApi::install_full_snapshot`]
657660
/// - [`ProtocolApi::handle_transfer_leader`]
658-
pub(crate) fn protocol_api(&self) -> ProtocolApi<'_, C> {
659-
ProtocolApi::new(self.inner.as_ref())
661+
pub(crate) fn protocol_api(&self) -> ProtocolApi<C> {
662+
ProtocolApi::new(self.inner.clone())
660663
}
661664

662665
pub(crate) fn app_api(&self) -> AppApi<'_, C> {
@@ -688,6 +691,72 @@ where C: RaftTypeConfig
688691
self.protocol_api().append_entries(rpc).await.into_raft_result()
689692
}
690693

694+
/// Submit a stream of AppendEntries RPCs to this Raft node.
695+
///
696+
/// This is a stream-oriented version of [`Self::append_entries`] with pipelining support.
697+
/// It spawns a background task that reads from the input stream, sends requests to RaftCore,
698+
/// and forwards response receivers to the output stream. Responses are yielded in order.
699+
///
700+
/// ## Pipelining Behavior
701+
///
702+
/// - A background task reads from the input stream and sends to RaftCore
703+
/// - Uses a bounded channel (64 slots) for backpressure
704+
/// - Responses are yielded in order (FIFO) as they complete
705+
///
706+
/// ## Output
707+
///
708+
/// The output stream emits:
709+
/// - `Ok(log_id)` when logs are successfully flushed
710+
/// - `Err(e)` when an error occurs, which terminates the stream
711+
///
712+
/// ## Pinning
713+
///
714+
/// The returned stream is `!Unpin` because it uses async closures internally.
715+
/// You must pin the stream before calling `.next()`:
716+
///
717+
/// ```ignore
718+
/// use std::pin::pin;
719+
///
720+
/// let mut output = pin!(raft.stream_append(input));
721+
/// while let Some(result) = output.next().await { /* ... */ }
722+
/// ```
723+
///
724+
/// Alternatively, use `Box::pin` for heap pinning if the stream needs to be stored or returned:
725+
///
726+
/// ```ignore
727+
/// let mut output = Box::pin(raft.stream_append(input));
728+
/// ```
729+
///
730+
/// # Example
731+
///
732+
/// ```ignore
733+
/// use std::pin::pin;
734+
/// use futures::StreamExt;
735+
///
736+
/// let input_stream = futures::stream::iter(vec![request1, request2, request3]);
737+
/// let mut output_stream = pin!(raft.stream_append(input_stream));
738+
///
739+
/// while let Some(result) = output_stream.next().await {
740+
/// match result {
741+
/// Ok(log_id) => println!("Flushed: {:?}", log_id),
742+
/// Err(err) => {
743+
/// println!("Error: {}", err);
744+
/// break;
745+
/// }
746+
/// }
747+
/// }
748+
/// ```
749+
#[since(version = "0.10.0")]
750+
pub fn stream_append<S>(
751+
&self,
752+
stream: S,
753+
) -> impl futures::Stream<Item = StreamAppendResult<C>> + OptionalSend + 'static
754+
where
755+
S: futures::Stream<Item = AppendEntriesRequest<C>> + OptionalSend + 'static,
756+
{
757+
self.protocol_api().stream_append(stream)
758+
}
759+
691760
/// Submit a VoteRequest (RequestVote in the spec) RPC to this Raft node.
692761
///
693762
/// These RPCs are sent by cluster peers which are in candidate state attempting to gather votes

openraft/src/raft/stream_append.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
//! Stream-based AppendEntries API implementation with pipelining.
2+
3+
use std::sync::Arc;
4+
5+
use futures::Stream;
6+
use futures::StreamExt;
7+
8+
use crate::AsyncRuntime;
9+
use crate::OptionalSend;
10+
use crate::RaftTypeConfig;
11+
use crate::core::raft_msg::RaftMsg;
12+
use crate::entry::RaftEntry;
13+
use crate::raft::AppendEntriesRequest;
14+
use crate::raft::AppendEntriesResponse;
15+
use crate::raft::StreamAppendError;
16+
use crate::raft::raft_inner::RaftInner;
17+
use crate::type_config::alias::LogIdOf;
18+
use crate::type_config::alias::OneshotReceiverOf;
19+
use crate::type_config::async_runtime::MpscReceiver;
20+
use crate::type_config::async_runtime::MpscSender;
21+
use crate::type_config::util::TypeConfigExt;
22+
23+
/// Result type for stream append operations.
24+
pub type StreamAppendResult<C> = Result<Option<LogIdOf<C>>, StreamAppendError<C>>;
25+
26+
const PIPELINE_BUFFER_SIZE: usize = 64;
27+
28+
struct Pending<C: RaftTypeConfig> {
29+
response_rx: OneshotReceiverOf<C, AppendEntriesResponse<C>>,
30+
prev_log_id: Option<LogIdOf<C>>,
31+
last_log_id: Option<LogIdOf<C>>,
32+
}
33+
34+
/// Create a pipelined stream that processes AppendEntries requests.
35+
///
36+
/// Spawns a background task that reads from input, sends to RaftCore,
37+
/// and forwards response receivers. The returned stream awaits responses in order.
38+
pub(crate) fn stream_append<C, S>(
39+
inner: Arc<RaftInner<C>>,
40+
input: S,
41+
) -> impl Stream<Item = StreamAppendResult<C>> + OptionalSend + 'static
42+
where
43+
C: RaftTypeConfig,
44+
S: Stream<Item = AppendEntriesRequest<C>> + OptionalSend + 'static,
45+
{
46+
let (tx, rx) = C::mpsc::<Pending<C>>(PIPELINE_BUFFER_SIZE);
47+
48+
let inner2 = inner.clone();
49+
let _handle = C::AsyncRuntime::spawn(async move {
50+
let inner = inner2;
51+
futures::pin_mut!(input);
52+
53+
while let Some(req) = input.next().await {
54+
let prev = req.prev_log_id.clone();
55+
let last = req.entries.last().map(|e| e.log_id()).or(prev.clone());
56+
let (resp_tx, resp_rx) = C::oneshot();
57+
58+
if inner.send_msg(RaftMsg::AppendEntries { rpc: req, tx: resp_tx }).await.is_err() {
59+
break;
60+
}
61+
let pending = Pending {
62+
response_rx: resp_rx,
63+
prev_log_id: prev,
64+
last_log_id: last,
65+
};
66+
if MpscSender::send(&tx, pending).await.is_err() {
67+
break;
68+
}
69+
}
70+
});
71+
72+
futures::stream::unfold(Some((rx, inner)), |state| async move {
73+
let (mut rx, inner) = state?;
74+
let p: Pending<C> = MpscReceiver::recv(&mut rx).await?;
75+
76+
let resp = inner.recv_msg(p.response_rx).await.ok()?;
77+
78+
let result = resp.into_stream_result(p.prev_log_id, p.last_log_id);
79+
let cont = result.is_ok();
80+
81+
Some((result, if cont { Some((rx, inner)) } else { None }))
82+
})
83+
}

tests/tests/append_entries/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod fixtures;
99

1010
mod t10_conflict_with_empty_entries;
1111
mod t10_see_higher_vote;
12+
mod t10_stream_append;
1213
mod t11_append_conflicts;
1314
mod t11_append_entries_with_bigger_term;
1415
mod t11_append_inconsistent_log;

0 commit comments

Comments
 (0)