Skip to content

Commit 819226a

Browse files
committed
[wip] shuffler with ingress client
1 parent cec1475 commit 819226a

File tree

7 files changed

+179
-82
lines changed

7 files changed

+179
-82
lines changed

crates/worker/src/lib.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod subscription_controller;
2020
mod subscription_integration;
2121

2222
use std::sync::Arc;
23+
use std::time::Duration;
2324

2425
use codederror::CodedError;
2526
use tracing::info;
@@ -35,6 +36,7 @@ use restate_core::worker_api::ProcessorsManagerHandle;
3536
use restate_core::{Metadata, TaskKind};
3637
use restate_core::{MetadataWriter, TaskCenter};
3738
use restate_ingress_client::IngressClient;
39+
use restate_ingress_client::SessionOptions;
3840
use restate_ingress_kafka::Service as IngressKafkaService;
3941
use restate_invoker_impl::InvokerHandle as InvokerChannelServiceHandle;
4042
use restate_partition_store::snapshots::SnapshotRepository;
@@ -97,7 +99,7 @@ pub struct Worker<T> {
9799
datafusion_remote_scanner: RemoteQueryScannerServer,
98100
ingress_kafka: IngressKafkaService<T>,
99101
subscription_controller_handle: SubscriptionControllerHandle,
100-
partition_processor_manager: PartitionProcessorManager,
102+
partition_processor_manager: PartitionProcessorManager<T>,
101103
}
102104

103105
impl<T> Worker<T>
@@ -140,6 +142,20 @@ where
140142
)));
141143
}
142144

145+
// This instance of ingress client is dedicated for the PPM.
146+
// so it has separate budget AND batch_timeout (set to zero)
147+
// to optimize for latency.
148+
let ppm_ingress = IngressClient::new(
149+
networking.clone(),
150+
Metadata::with_current(|m| m.updateable_partition_table()),
151+
partition_routing.clone(),
152+
1000,
153+
Some(SessionOptions {
154+
batch_timeout: Duration::ZERO,
155+
..Default::default()
156+
}),
157+
);
158+
143159
let partition_processor_manager = PartitionProcessorManager::new(
144160
health_status,
145161
Configuration::live(),
@@ -154,6 +170,7 @@ where
154170
)
155171
.await
156172
.map_err(BuildError::SnapshotRepository)?,
173+
ppm_ingress,
157174
);
158175

159176
let remote_scanner_manager = RemoteScannerManager::new(

crates/worker/src/partition/leadership/mod.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ use tokio_stream::wrappers::ReceiverStream;
2626
use tracing::{debug, instrument, warn};
2727

2828
use restate_bifrost::Bifrost;
29-
use restate_core::network::{Oneshot, Reciprocal};
29+
use restate_core::network::{Oneshot, Reciprocal, TransportConnect};
3030
use restate_core::{ShutdownError, TaskCenter, TaskKind, my_node_id};
3131
use restate_errors::NotRunningError;
32+
use restate_ingress_client::IngressClient;
3233
use restate_invoker_api::InvokeInputJournal;
3334
use restate_partition_store::PartitionStore;
3435
use restate_storage_api::deduplication_table::EpochSequenceNumber;
@@ -151,25 +152,28 @@ impl State {
151152
}
152153
}
153154

154-
pub(crate) struct LeadershipState<I> {
155+
pub(crate) struct LeadershipState<T, I> {
155156
state: State,
156157
last_seen_leader_epoch: Option<LeaderEpoch>,
157158

158159
partition: Arc<Partition>,
159160
invoker_tx: I,
161+
ingress: IngressClient<T>,
160162
bifrost: Bifrost,
161163
#[allow(unused)]
162164
trim_queue: TrimQueue,
163165
}
164166

165-
impl<I> LeadershipState<I>
167+
impl<T, I> LeadershipState<T, I>
166168
where
167169
I: restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>>,
170+
T: TransportConnect,
168171
{
169172
#[allow(clippy::too_many_arguments)]
170173
pub(crate) fn new(
171174
partition: Arc<Partition>,
172175
invoker_tx: I,
176+
ingress: IngressClient<T>,
173177
bifrost: Bifrost,
174178
last_seen_leader_epoch: Option<LeaderEpoch>,
175179
trim_queue: TrimQueue,
@@ -178,6 +182,7 @@ where
178182
state: State::Follower,
179183
partition,
180184
invoker_tx,
185+
ingress,
181186
bifrost,
182187
last_seen_leader_epoch,
183188
trim_queue,
@@ -374,7 +379,7 @@ where
374379
OutboxReader::from(partition_store.clone()),
375380
shuffle_tx,
376381
config.worker.internal_queue_length(),
377-
self.bifrost.clone(),
382+
self.ingress.clone(),
378383
);
379384

380385
let shuffle_hint_tx = shuffle.create_hint_sender();
@@ -562,7 +567,7 @@ where
562567
}
563568
}
564569

565-
impl<I> LeadershipState<I> {
570+
impl<T, I> LeadershipState<T, I> {
566571
pub async fn handle_rpc_proposal_command(
567572
&mut self,
568573
request_id: PartitionProcessorRpcRequestId,
@@ -682,7 +687,9 @@ mod tests {
682687
use crate::partition::leadership::{LeadershipState, State};
683688
use assert2::let_assert;
684689
use restate_bifrost::Bifrost;
690+
use restate_core::partitions::PartitionRouting;
685691
use restate_core::{TaskCenter, TestCoreEnv};
692+
use restate_ingress_client::IngressClient;
686693
use restate_invoker_api::test_util::MockInvokerHandle;
687694
use restate_partition_store::PartitionStoreManager;
688695
use restate_rocksdb::RocksDbManager;
@@ -714,10 +721,19 @@ mod tests {
714721

715722
let partition_store_manager = PartitionStoreManager::create().await?;
716723

724+
let ingress = IngressClient::new(
725+
env.networking.clone(),
726+
env.metadata.updateable_partition_table(),
727+
PartitionRouting::new(replica_set_states.clone(), TaskCenter::current()),
728+
1000,
729+
None,
730+
);
731+
717732
let invoker_tx = MockInvokerHandle::default();
718733
let mut state = LeadershipState::new(
719734
Arc::new(PARTITION),
720735
invoker_tx,
736+
ingress,
721737
bifrost.clone(),
722738
None,
723739
TrimQueue::default(),

crates/worker/src/partition/mod.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ use tracing::{Span, debug, error, info, instrument, trace, warn};
3131

3232
use restate_bifrost::loglet::FindTailOptions;
3333
use restate_bifrost::{Bifrost, LogEntry, MaybeRecord};
34-
use restate_core::network::{Incoming, Oneshot, Reciprocal, Rpc, ServiceMessage, Verdict};
34+
use restate_core::network::{
35+
Incoming, Oneshot, Reciprocal, Rpc, ServiceMessage, TransportConnect, Verdict,
36+
};
3537
use restate_core::{Metadata, ShutdownError, cancellation_watcher, my_node_id};
38+
use restate_ingress_client::IngressClient;
3639
use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
3740
use restate_storage_api::deduplication_table::{
3841
DedupInformation, DedupSequenceNumber, ProducerId, ReadDeduplicationTable,
@@ -107,12 +110,16 @@ where
107110
}
108111
}
109112

110-
pub async fn build(
113+
pub async fn build<T>(
111114
self,
112115
bifrost: Bifrost,
116+
ingress: IngressClient<T>,
113117
mut partition_store: PartitionStore,
114118
replica_set_states: PartitionReplicaSetStates,
115-
) -> Result<PartitionProcessor<InvokerInputSender>, state_machine::Error> {
119+
) -> Result<PartitionProcessor<T, InvokerInputSender>, state_machine::Error>
120+
where
121+
T: TransportConnect,
122+
{
116123
let PartitionProcessorBuilder {
117124
invoker_tx,
118125
target_leader_state_rx,
@@ -155,6 +162,7 @@ where
155162
let leadership_state = LeadershipState::new(
156163
Arc::clone(partition_store.partition()),
157164
invoker_tx,
165+
ingress,
158166
bifrost.clone(),
159167
last_seen_leader_epoch,
160168
trim_queue.clone(),
@@ -208,9 +216,9 @@ where
208216
}
209217
}
210218

211-
pub struct PartitionProcessor<InvokerSender> {
219+
pub struct PartitionProcessor<T, InvokerSender> {
212220
partition_id_str: SharedString,
213-
leadership_state: LeadershipState<InvokerSender>,
221+
leadership_state: LeadershipState<T, InvokerSender>,
214222
state_machine: StateMachine,
215223
bifrost: Bifrost,
216224
target_leader_state_rx: watch::Receiver<TargetLeaderState>,
@@ -274,8 +282,9 @@ struct LsnEnvelope {
274282
pub envelope: Arc<Envelope>,
275283
}
276284

277-
impl<InvokerSender> PartitionProcessor<InvokerSender>
285+
impl<T, InvokerSender> PartitionProcessor<T, InvokerSender>
278286
where
287+
T: TransportConnect,
279288
InvokerSender: restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>> + Clone,
280289
{
281290
#[instrument(

crates/worker/src/partition/rpc/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ mod resume_invocation;
2222

2323
use crate::partition;
2424
use crate::partition::leadership::LeadershipState;
25-
use restate_core::network::{Oneshot, Reciprocal};
25+
use restate_core::network::{Oneshot, Reciprocal, TransportConnect};
2626
use restate_invoker_api::InvokerHandle;
2727
use restate_storage_api::idempotency_table::ReadOnlyIdempotencyTable;
2828
use restate_storage_api::invocation_status_table::ReadInvocationStatusTable;
@@ -71,13 +71,14 @@ pub(super) trait Actuator {
7171
);
7272
}
7373

74-
impl<
74+
impl<T, I> Actuator for LeadershipState<T, I>
75+
where
76+
T: TransportConnect,
7577
I: InvokerHandle<
7678
partition::invoker_storage_reader::InvokerStorageReader<
7779
restate_partition_store::PartitionStore,
7880
>,
7981
>,
80-
> Actuator for LeadershipState<I>
8182
{
8283
async fn self_propose_and_respond_asynchronously<O: Into<PartitionProcessorRpcResponse>>(
8384
&mut self,

0 commit comments

Comments
 (0)