Skip to content

Commit 6fcdd50

Browse files
committed
[Ingress] ingress-client crate
- `ingress-client` implements the runtime layer that receives ingress traffic, fans it out to the correct partition, and tracks completion. It exposes: - `Ingress`, enforces inflight budgets, and resolves partition IDs before sending work downstream. - The session subsystem that batches `IngestRecords`, retries connections, and reports commit status to callers. - `ingress-core` only ingests records and notify the caller once the record is "committed" to bifrost by the PP. This makes it useful to implement kafka ingress and other external ingestion
1 parent cae4c4d commit 6fcdd50

File tree

9 files changed

+1207
-0
lines changed

9 files changed

+1207
-0
lines changed

Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ restate-types = { path = "crates/types" }
8585
restate-utoipa = { path = "crates/utoipa" }
8686
restate-wal-protocol = { path = "crates/wal-protocol" }
8787
restate-worker = { path = "crates/worker" }
88+
restate-ingress-client = { path = "crates/ingress-client" }
8889

8990
# this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
9091
# outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead

crates/ingress-client/Cargo.toml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
[package]
2+
name = "restate-ingress-client"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
license.workspace = true
8+
publish = false
9+
10+
[dependencies]
11+
arc-swap = { workspace = true }
12+
dashmap = { workspace = true }
13+
futures = { workspace = true }
14+
pin-project-lite = { workspace = true }
15+
thiserror = { workspace = true }
16+
tokio = { workspace = true }
17+
tokio-stream = { workspace = true }
18+
tokio-util = { workspace = true }
19+
tracing = { workspace = true }
20+
21+
restate-workspace-hack = { workspace = true }
22+
restate-core = { workspace = true }
23+
restate-types = { workspace = true }
24+
25+
[dev-dependencies]
26+
restate-core = { workspace = true, features = ["test-util"] }
27+
test-log = { workspace = true }
28+
bytes = { workspace = true }
29+
googletest = { workspace = true }
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use tokio::time::{Sleep, sleep};
2+
use tokio_stream::adapters::Fuse;
3+
use tokio_stream::{Stream, StreamExt};
4+
5+
use core::future::Future;
6+
use core::pin::Pin;
7+
use core::task::{Context, Poll, ready};
8+
use pin_project_lite::pin_project;
9+
use std::time::Duration;
10+
11+
// This file is a copy from `tokio_stream` until PR https://github.com/tokio-rs/tokio/pull/7715 is released
12+
13+
pin_project! {
14+
/// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
15+
#[must_use = "streams do nothing unless polled"]
16+
#[derive(Debug)]
17+
pub struct ChunksTimeout<S: Stream> {
18+
#[pin]
19+
stream: Fuse<S>,
20+
#[pin]
21+
deadline: Option<Sleep>,
22+
duration: Duration,
23+
items: Vec<S::Item>,
24+
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
25+
}
26+
}
27+
28+
impl<S: Stream> ChunksTimeout<S> {
29+
pub fn new(stream: S, max_size: usize, duration: Duration) -> Self {
30+
ChunksTimeout {
31+
stream: stream.fuse(),
32+
deadline: None,
33+
duration,
34+
items: Vec::with_capacity(max_size),
35+
cap: max_size,
36+
}
37+
}
38+
/// Drains the buffered items, returning them without waiting for the timeout or capacity limit.
39+
pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
40+
let me = self.as_mut().project();
41+
std::mem::take(me.items)
42+
}
43+
}
44+
45+
impl<S: Stream> Stream for ChunksTimeout<S> {
46+
type Item = Vec<S::Item>;
47+
48+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49+
let mut me = self.as_mut().project();
50+
loop {
51+
match me.stream.as_mut().poll_next(cx) {
52+
Poll::Pending => break,
53+
Poll::Ready(Some(item)) => {
54+
if me.items.is_empty() {
55+
me.deadline.set(Some(sleep(*me.duration)));
56+
me.items.reserve_exact(*me.cap);
57+
}
58+
me.items.push(item);
59+
if me.items.len() >= *me.cap {
60+
return Poll::Ready(Some(std::mem::take(me.items)));
61+
}
62+
}
63+
Poll::Ready(None) => {
64+
// Returning Some here is only correct because we fuse the inner stream.
65+
let last = if me.items.is_empty() {
66+
None
67+
} else {
68+
Some(std::mem::take(me.items))
69+
};
70+
71+
return Poll::Ready(last);
72+
}
73+
}
74+
}
75+
76+
if !me.items.is_empty() {
77+
if let Some(deadline) = me.deadline.as_pin_mut() {
78+
ready!(deadline.poll(cx));
79+
}
80+
return Poll::Ready(Some(std::mem::take(me.items)));
81+
}
82+
83+
Poll::Pending
84+
}
85+
86+
fn size_hint(&self) -> (usize, Option<usize>) {
87+
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
88+
let (lower, upper) = self.stream.size_hint();
89+
let lower = (lower / self.cap).saturating_add(chunk_len);
90+
let upper = upper.and_then(|x| x.checked_add(chunk_len));
91+
(lower, upper)
92+
}
93+
}

0 commit comments

Comments
 (0)