Skip to content

Commit 7dc9605

Browse files
committed
[Ingress] ingress-core crate
- `ingress-core` implements the runtime layer that receives ingress traffic, fans it out to the correct partition, and tracks completion. It exposes: - `Ingress`, which enforces inflight budgets and resolves partition IDs before sending work downstream. - The session subsystem, which batches `IngestRecords`, retries connections, and reports commit status to callers. - `ingress-core` only ingests records and notifies the caller once a record is "committed" to bifrost by the PP. This makes it useful for implementing Kafka ingress and other external ingestion sources.
1 parent 3f66500 commit 7dc9605

File tree

9 files changed

+881
-0
lines changed

9 files changed

+881
-0
lines changed

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ restate-types = { path = "crates/types" }
8585
restate-utoipa = { path = "crates/utoipa" }
8686
restate-wal-protocol = { path = "crates/wal-protocol" }
8787
restate-worker = { path = "crates/worker" }
88+
restate-ingress-core = { path = "crates/ingress-core" }
8889

8990
# this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
9091
# outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead

crates/ingress-core/Cargo.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "restate-ingress-core"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
license.workspace = true
8+
repository.workspace = true
9+
homepage.workspace = true
10+
description.workspace = true
11+
12+
[dependencies]
13+
arc-swap = { workspace = true }
14+
dashmap = { workspace = true }
15+
futures = { workspace = true }
16+
pin-project-lite = { workspace = true }
17+
thiserror = { workspace = true }
18+
tokio = { workspace = true }
19+
tokio-stream = { workspace = true }
20+
tokio-util = { workspace = true }
21+
tracing = { workspace = true }
22+
23+
restate-workspace-hack = { workspace = true }
24+
restate-core = { workspace = true }
25+
restate-types = { workspace = true }
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use tokio::time::{Sleep, sleep};
2+
use tokio_stream::adapters::Fuse;
3+
use tokio_stream::{Stream, StreamExt};
4+
5+
use core::future::Future;
6+
use core::pin::Pin;
7+
use core::task::{Context, Poll, ready};
8+
use pin_project_lite::pin_project;
9+
use std::time::Duration;
10+
11+
// This file is a copy of the `tokio_stream` implementation, kept here until PR https://github.com/tokio-rs/tokio/pull/7715 is released.
12+
13+
pin_project! {
    /// Stream adapter that groups the items of an inner stream into `Vec` batches.
    ///
    /// A batch is emitted as soon as it holds `cap` items, when the timer armed by the
    /// first item of the batch fires, or when the inner stream ends with items still
    /// buffered. Instances are created with [`ChunksTimeout::new`].
    #[must_use = "streams do nothing unless polled"]
    #[derive(Debug)]
    pub struct ChunksTimeout<S: Stream> {
        // Fused inner stream; `poll_next` relies on the fuse when it returns the final
        // partial batch after the inner stream has ended.
        #[pin]
        stream: Fuse<S>,
        // Timer for the batch currently being collected; armed when its first item arrives.
        #[pin]
        deadline: Option<Sleep>,
        // How long a non-empty batch may wait for more items before it is emitted.
        duration: Duration,
        // Items collected so far for the batch in progress.
        items: Vec<S::Item>,
        // Batch size limit, tracked separately from the capacity of `items`, see
        cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
    }
}
27+
28+
impl<S: Stream> ChunksTimeout<S> {
29+
pub fn new(stream: S, max_size: usize, duration: Duration) -> Self {
30+
ChunksTimeout {
31+
stream: stream.fuse(),
32+
deadline: None,
33+
duration,
34+
items: Vec::with_capacity(max_size),
35+
cap: max_size,
36+
}
37+
}
38+
/// Drains the buffered items, returning them without waiting for the timeout or capacity limit.
39+
pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
40+
let me = self.as_mut().project();
41+
std::mem::take(me.items)
42+
}
43+
}
44+
45+
impl<S: Stream> Stream for ChunksTimeout<S> {
46+
type Item = Vec<S::Item>;
47+
48+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49+
let mut me = self.as_mut().project();
50+
loop {
51+
match me.stream.as_mut().poll_next(cx) {
52+
Poll::Pending => break,
53+
Poll::Ready(Some(item)) => {
54+
if me.items.is_empty() {
55+
me.deadline.set(Some(sleep(*me.duration)));
56+
me.items.reserve_exact(*me.cap);
57+
}
58+
me.items.push(item);
59+
if me.items.len() >= *me.cap {
60+
return Poll::Ready(Some(std::mem::take(me.items)));
61+
}
62+
}
63+
Poll::Ready(None) => {
64+
// Returning Some here is only correct because we fuse the inner stream.
65+
let last = if me.items.is_empty() {
66+
None
67+
} else {
68+
Some(std::mem::take(me.items))
69+
};
70+
71+
return Poll::Ready(last);
72+
}
73+
}
74+
}
75+
76+
if !me.items.is_empty() {
77+
if let Some(deadline) = me.deadline.as_pin_mut() {
78+
ready!(deadline.poll(cx));
79+
}
80+
return Poll::Ready(Some(std::mem::take(me.items)));
81+
}
82+
83+
Poll::Pending
84+
}
85+
86+
fn size_hint(&self) -> (usize, Option<usize>) {
87+
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
88+
let (lower, upper) = self.stream.size_hint();
89+
let lower = (lower / self.cap).saturating_add(chunk_len);
90+
let upper = upper.and_then(|x| x.checked_add(chunk_len));
91+
(lower, upper)
92+
}
93+
}

crates/ingress-core/src/ingress.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use std::{collections::HashMap, sync::Arc};
12+
13+
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
14+
15+
use restate_core::{
16+
network::{Networking, TransportConnect},
17+
partitions::PartitionRouting,
18+
};
19+
use restate_types::{
20+
identifiers::{PartitionId, PartitionKey},
21+
live::Live,
22+
net::ingress::IngestRecord,
23+
partitions::{FindPartition, PartitionTable, PartitionTableError},
24+
};
25+
26+
use crate::{
27+
RecordCommit, SessionOptions,
28+
session::{SessionHandle, SessionManager},
29+
};
30+
31+
/// Errors that can be observed when interacting with the ingress facade.
32+
#[derive(Debug, thiserror::Error)]
33+
pub enum IngestionError {
34+
#[error("Ingress closed")]
35+
Closed,
36+
#[error(transparent)]
37+
PartitionTableError(#[from] PartitionTableError),
38+
}
39+
40+
/// High-level ingress entry point that allocates permits and hands out session handles per partition.
41+
/// Ingress can be cloned and shared across different routines. All users will share the same budget
42+
/// and underlying partition sessions.
43+
#[derive(Clone)]
44+
pub struct Ingress<T> {
45+
manager: SessionManager<T>,
46+
partition_table: Live<PartitionTable>,
47+
// budget for inflight invocations.
48+
// this should be a memory budget but it's
49+
// not possible atm to compute the serialization
50+
// size of an invocation.
51+
permits: Arc<Semaphore>,
52+
53+
// session handles cache just to avoid
54+
// cloning the handle on each ingest request
55+
handles: HashMap<PartitionId, SessionHandle>,
56+
}
57+
58+
impl<T> Ingress<T> {
59+
/// Builds a new ingress facade with the provided networking stack, partition metadata, and
60+
/// budget for inflight records.
61+
pub fn new(
62+
networking: Networking<T>,
63+
partition_table: Live<PartitionTable>,
64+
partition_routing: PartitionRouting,
65+
budget: usize,
66+
opts: Option<SessionOptions>,
67+
) -> Self {
68+
Self {
69+
manager: SessionManager::new(networking, partition_routing, opts),
70+
partition_table,
71+
permits: Arc::new(Semaphore::new(budget)),
72+
handles: HashMap::default(),
73+
}
74+
}
75+
}
76+
77+
impl<T> Ingress<T>
78+
where
79+
T: TransportConnect,
80+
{
81+
/// Reserves capacity to send exactly one record.
82+
pub async fn reserve(&mut self) -> Result<IngressPermit<'_, T>, IngestionError> {
83+
let permit = self
84+
.permits
85+
.clone()
86+
.acquire_owned()
87+
.await
88+
.map_err(|_| IngestionError::Closed)?;
89+
90+
Ok(IngressPermit {
91+
permit,
92+
ingress: self,
93+
})
94+
}
95+
96+
/// Once closed, calls to ingest will return [`IngestionError::Closed`].
97+
/// Inflight records might still get committed.
98+
pub fn close(&self) {
99+
self.permits.close();
100+
self.manager.close();
101+
}
102+
}
103+
104+
/// Permit that owns capacity for a single record ingest against an [`Ingress`] instance.
105+
pub struct IngressPermit<'a, T> {
106+
permit: OwnedSemaphorePermit,
107+
ingress: &'a mut Ingress<T>,
108+
}
109+
110+
impl<'a, T> IngressPermit<'a, T>
111+
where
112+
T: TransportConnect,
113+
{
114+
/// Sends a record to the partition derived from the supplied [`PartitionKey`], consuming the permit.
115+
pub fn ingest(
116+
self,
117+
partition_key: PartitionKey,
118+
record: impl Into<IngestRecord>,
119+
) -> Result<RecordCommit, IngestionError> {
120+
let partition_id = self
121+
.ingress
122+
.partition_table
123+
.pinned()
124+
.find_partition_id(partition_key)?;
125+
126+
let handle = self
127+
.ingress
128+
.handles
129+
.entry(partition_id)
130+
.or_insert_with(|| self.ingress.manager.get(partition_id));
131+
132+
handle
133+
.ingest(self.permit, record.into())
134+
.map_err(|_| IngestionError::Closed)
135+
}
136+
}

crates/ingress-core/src/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
mod chunks_timeout;
12+
mod ingress;
13+
mod session;
14+
15+
pub use ingress::{IngestionError, Ingress, IngressPermit};
16+
pub use session::{CommitError, RecordCommit, SessionOptions};

0 commit comments

Comments
 (0)