Skip to content

Commit f9f6240

Browse files
committed
Use ingestion-client in the Shuffler
Avoid direct writes to Bifrost in the shuffler by routing them through a dedicated ingestion-client instance.
1 parent af82da1 commit f9f6240

File tree

9 files changed

+334
-196
lines changed

9 files changed

+334
-196
lines changed

crates/ingestion-client/src/chunks_size.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ where
6767
let item_size = (me.size_fn)(&item);
6868

6969
if me.items.is_empty() || *me.size + item_size <= *me.cap {
70+
*me.size += item_size;
7071
me.items.push(item);
7172
} else {
7273
// not empty and adding the item will go over the cap

crates/types/src/config/worker.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use restate_serde_util::NonZeroByteCount;
2020
use restate_time_util::{FriendlyDuration, NonZeroFriendlyDuration};
2121

2222
use super::{CommonOptions, ObjectStoreOptions, RocksDbOptions, RocksDbOptionsBuilder};
23+
use crate::config::IngestionOptions;
2324
use crate::identifiers::PartitionId;
2425
use crate::rate::Rate;
2526
use crate::retries::RetryPolicy;
@@ -96,6 +97,12 @@ pub struct WorkerOptions {
9697
#[cfg_attr(feature = "schemars", schemars(skip))]
9798
#[serde(default, skip_serializing_if = "FriendlyDuration::is_zero")]
9899
trim_delay_interval: FriendlyDuration,
100+
101+
/// # Worker Ingestion Options
102+
///
103+
/// Settings for the worker's dedicated ingestion client, which its partition
104+
/// processors use (e.g. in the shuffler) to ingest records across partitions.
105+
pub ingestion_options: IngestionOptions,
99106
}
100107

101108
impl WorkerOptions {
@@ -132,6 +139,7 @@ impl Default for WorkerOptions {
132139
snapshots: SnapshotsOptions::default(),
133140
trim_delay_interval: FriendlyDuration::ZERO,
134141
durability_mode: None,
142+
ingestion_options: Default::default(),
135143
}
136144
}
137145
}

crates/worker/src/lib.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ mod subscription_integration;
2222
use std::sync::Arc;
2323

2424
use codederror::CodedError;
25+
use restate_core::network::Swimlane;
26+
use restate_ingestion_client::SessionOptions;
27+
use restate_wal_protocol::Envelope;
2528
use tracing::info;
2629

2730
use restate_bifrost::Bifrost;
@@ -52,7 +55,6 @@ use restate_types::health::HealthStatus;
5255
use restate_types::partitions::state::PartitionReplicaSetStates;
5356
use restate_types::protobuf::common::WorkerStatus;
5457
use restate_types::schema::subscriptions::SubscriptionResolver;
55-
use restate_wal_protocol::Envelope;
5658

5759
use crate::partition::invoker_storage_reader::InvokerStorageReader;
5860
use crate::partition_processor_manager::PartitionProcessorManager;
@@ -98,7 +100,7 @@ pub struct Worker<T> {
98100
datafusion_remote_scanner: RemoteQueryScannerServer,
99101
ingress_kafka: IngressKafkaService<T>,
100102
subscription_controller_handle: SubscriptionControllerHandle,
101-
partition_processor_manager: PartitionProcessorManager,
103+
partition_processor_manager: PartitionProcessorManager<T>,
102104
}
103105

104106
impl<T> Worker<T>
@@ -142,6 +144,32 @@ where
142144
)));
143145
}
144146

147+
// A dedicated ingestion client for PPM that uses
148+
// BifrostData swimlane
149+
let ppm_ingestion_client = IngestionClient::new(
150+
networking.clone(),
151+
Metadata::with_current(|m| m.updateable_partition_table()),
152+
partition_routing.clone(),
153+
config
154+
.worker
155+
.ingestion_options
156+
.inflight_memory_budget
157+
.as_non_zero_usize(),
158+
Some(SessionOptions {
159+
batch_size: config
160+
.worker
161+
.ingestion_options
162+
.request_batch_size
163+
.as_usize(),
164+
connection_retry_policy: config
165+
.worker
166+
.ingestion_options
167+
.connection_retry_policy
168+
.clone(),
169+
swimlane: Swimlane::BifrostData,
170+
}),
171+
);
172+
145173
let partition_processor_manager = PartitionProcessorManager::new(
146174
health_status,
147175
Configuration::live(),
@@ -156,6 +184,7 @@ where
156184
)
157185
.await
158186
.map_err(BuildError::SnapshotRepository)?,
187+
ppm_ingestion_client,
159188
);
160189

161190
let remote_scanner_manager = RemoteScannerManager::new(

crates/worker/src/partition/leadership/mod.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ use tokio_stream::wrappers::ReceiverStream;
2525
use tracing::{debug, instrument, warn};
2626

2727
use restate_bifrost::Bifrost;
28-
use restate_core::network::{Oneshot, Reciprocal};
28+
use restate_core::network::{Oneshot, Reciprocal, TransportConnect};
2929
use restate_core::{ShutdownError, TaskCenter, TaskKind, my_node_id};
3030
use restate_errors::NotRunningError;
31+
use restate_ingestion_client::IngestionClient;
3132
use restate_invoker_api::InvokeInputJournal;
3233
use restate_invoker_api::capacity::InvokerCapacity;
3334
use restate_partition_store::PartitionStore;
@@ -59,9 +60,9 @@ use restate_types::retries::with_jitter;
5960
use restate_types::schema::Schema;
6061
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
6162
use restate_vqueues::{SchedulerService, VQueuesMeta, VQueuesMetaMut};
62-
use restate_wal_protocol::Command;
6363
use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability};
6464
use restate_wal_protocol::timer::TimerKeyValue;
65+
use restate_wal_protocol::{Command, Envelope};
6566

6667
use crate::partition::cleaner::{self, Cleaner};
6768
use crate::partition::invoker_storage_reader::InvokerStorageReader;
@@ -156,26 +157,29 @@ impl State {
156157
}
157158
}
158159

159-
pub(crate) struct LeadershipState<I> {
160+
pub(crate) struct LeadershipState<T, I> {
160161
state: State,
161162
last_seen_leader_epoch: Option<LeaderEpoch>,
162163

163164
partition: Arc<Partition>,
164165
invoker_tx: I,
166+
ingestion_client: IngestionClient<T, Envelope>,
165167
invoker_capacity: InvokerCapacity,
166168
bifrost: Bifrost,
167169
trim_queue: TrimQueue,
168170
}
169171

170-
impl<I> LeadershipState<I>
172+
impl<T, I> LeadershipState<T, I>
171173
where
172174
I: restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>>,
175+
T: TransportConnect,
173176
{
174177
#[allow(clippy::too_many_arguments)]
175178
pub(crate) fn new(
176179
partition: Arc<Partition>,
177180
invoker_tx: I,
178181
invoker_capacity: InvokerCapacity,
182+
ingestion_client: IngestionClient<T, Envelope>,
179183
bifrost: Bifrost,
180184
last_seen_leader_epoch: Option<LeaderEpoch>,
181185
trim_queue: TrimQueue,
@@ -184,6 +188,7 @@ where
184188
state: State::Follower,
185189
partition,
186190
invoker_tx,
191+
ingestion_client,
187192
invoker_capacity,
188193
bifrost,
189194
last_seen_leader_epoch,
@@ -411,7 +416,7 @@ where
411416
OutboxReader::from(partition_store.clone()),
412417
shuffle_tx,
413418
config.worker.internal_queue_length(),
414-
self.bifrost.clone(),
419+
self.ingestion_client.clone(),
415420
);
416421

417422
let shuffle_hint_tx = shuffle.create_hint_sender();
@@ -598,7 +603,7 @@ where
598603
}
599604
}
600605

601-
impl<I> LeadershipState<I> {
606+
impl<T, I> LeadershipState<T, I> {
602607
pub async fn handle_rpc_proposal_command(
603608
&mut self,
604609
request_id: PartitionProcessorRpcRequestId,
@@ -718,7 +723,9 @@ mod tests {
718723
use crate::partition::leadership::{LeadershipState, State};
719724
use assert2::let_assert;
720725
use restate_bifrost::Bifrost;
726+
use restate_core::partitions::PartitionRouting;
721727
use restate_core::{TaskCenter, TestCoreEnv};
728+
use restate_ingestion_client::IngestionClient;
722729
use restate_invoker_api::capacity::InvokerCapacity;
723730
use restate_invoker_api::test_util::MockInvokerHandle;
724731
use restate_partition_store::PartitionStoreManager;
@@ -732,6 +739,7 @@ mod tests {
732739
use restate_vqueues::VQueuesMetaMut;
733740
use restate_wal_protocol::control::AnnounceLeader;
734741
use restate_wal_protocol::{Command, Envelope};
742+
use std::num::NonZeroUsize;
735743
use std::ops::RangeInclusive;
736744
use std::sync::Arc;
737745
use test_log::test;
@@ -752,11 +760,20 @@ mod tests {
752760

753761
let partition_store_manager = PartitionStoreManager::create().await?;
754762

763+
let ingress = IngestionClient::new(
764+
env.networking.clone(),
765+
env.metadata.updateable_partition_table(),
766+
PartitionRouting::new(replica_set_states.clone(), TaskCenter::current()),
767+
NonZeroUsize::new(10 * 1024 * 1024).unwrap(),
768+
None,
769+
);
770+
755771
let invoker_tx = MockInvokerHandle::default();
756772
let mut state = LeadershipState::new(
757773
Arc::new(PARTITION),
758774
invoker_tx,
759775
InvokerCapacity::new_unlimited(),
776+
ingress,
760777
bifrost.clone(),
761778
None,
762779
TrimQueue::default(),

crates/worker/src/partition/mod.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ use tracing::{Span, debug, error, info, instrument, trace, warn};
3131

3232
use restate_bifrost::loglet::FindTailOptions;
3333
use restate_bifrost::{Bifrost, LogEntry, MaybeRecord};
34-
use restate_core::network::{Incoming, Oneshot, Reciprocal, Rpc, ServiceMessage, Verdict};
34+
use restate_core::network::{
35+
Incoming, Oneshot, Reciprocal, Rpc, ServiceMessage, TransportConnect, Verdict,
36+
};
3537
use restate_core::{Metadata, ShutdownError, cancellation_watcher, my_node_id};
38+
use restate_ingestion_client::IngestionClient;
3639
use restate_invoker_api::capacity::InvokerCapacity;
3740
use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
3841
use restate_storage_api::deduplication_table::{
@@ -115,12 +118,16 @@ where
115118
}
116119
}
117120

118-
pub async fn build(
121+
pub async fn build<T>(
119122
self,
120123
bifrost: Bifrost,
124+
ingestion_client: IngestionClient<T, Envelope>,
121125
mut partition_store: PartitionStore,
122126
replica_set_states: PartitionReplicaSetStates,
123-
) -> Result<PartitionProcessor<InvokerInputSender>, state_machine::Error> {
127+
) -> Result<PartitionProcessor<T, InvokerInputSender>, state_machine::Error>
128+
where
129+
T: TransportConnect,
130+
{
124131
let PartitionProcessorBuilder {
125132
invoker_tx,
126133
target_leader_state_rx,
@@ -165,6 +172,7 @@ where
165172
Arc::clone(partition_store.partition()),
166173
invoker_tx,
167174
invoker_capacity,
175+
ingestion_client,
168176
bifrost.clone(),
169177
last_seen_leader_epoch,
170178
trim_queue.clone(),
@@ -218,9 +226,9 @@ where
218226
}
219227
}
220228

221-
pub struct PartitionProcessor<InvokerSender> {
229+
pub struct PartitionProcessor<T, InvokerSender> {
222230
partition_id_str: SharedString,
223-
leadership_state: LeadershipState<InvokerSender>,
231+
leadership_state: LeadershipState<T, InvokerSender>,
224232
state_machine: StateMachine,
225233
bifrost: Bifrost,
226234
target_leader_state_rx: watch::Receiver<TargetLeaderState>,
@@ -284,8 +292,9 @@ struct LsnEnvelope {
284292
pub envelope: Arc<Envelope>,
285293
}
286294

287-
impl<InvokerSender> PartitionProcessor<InvokerSender>
295+
impl<T, InvokerSender> PartitionProcessor<T, InvokerSender>
288296
where
297+
T: TransportConnect,
289298
InvokerSender: restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>> + Clone,
290299
{
291300
#[instrument(

crates/worker/src/partition/rpc/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ mod resume_invocation;
2222

2323
use crate::partition;
2424
use crate::partition::leadership::LeadershipState;
25-
use restate_core::network::{Oneshot, Reciprocal};
25+
use restate_core::network::{Oneshot, Reciprocal, TransportConnect};
2626
use restate_invoker_api::InvokerHandle;
2727
use restate_storage_api::idempotency_table::ReadOnlyIdempotencyTable;
2828
use restate_storage_api::invocation_status_table::ReadInvocationStatusTable;
@@ -71,13 +71,14 @@ pub(super) trait Actuator {
7171
);
7272
}
7373

74-
impl<
74+
impl<T, I> Actuator for LeadershipState<T, I>
75+
where
76+
T: TransportConnect,
7577
I: InvokerHandle<
7678
partition::invoker_storage_reader::InvokerStorageReader<
7779
restate_partition_store::PartitionStore,
7880
>,
7981
>,
80-
> Actuator for LeadershipState<I>
8182
{
8283
async fn self_propose_and_respond_asynchronously<O: Into<PartitionProcessorRpcResponse>>(
8384
&mut self,

0 commit comments

Comments
 (0)