Skip to content

Commit 4e1134e

Browse files
Don't allow requests deployments using old protocol anymore (#3877)
* Don't allow requests deployments using old protocol anymore * Apply suggestions from code review Co-authored-by: Till Rohrmann <[email protected]> --------- Co-authored-by: Till Rohrmann <[email protected]>
1 parent cf64694 commit 4e1134e

File tree

11 files changed

+92
-16
lines changed

11 files changed

+92
-16
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
## RT0020
2+
3+
The requested service is exposed by a deployment using a deprecated service protocol version. New requests won't be accepted.
4+
Upgrade the SDK in your service code and register it as a new deployment to continue accepting requests to the requested service.

crates/errors/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ mod helper;
3636

3737
declare_restate_error_codes!(
3838
RT0001, RT0002, RT0003, RT0004, RT0005, RT0006, RT0007, RT0009, RT0010, RT0011, RT0012, RT0013,
39-
RT0014, RT0015, RT0016, RT0017, RT0018, RT0019, META0003, META0004, META0005, META0006,
39+
RT0014, RT0015, RT0016, RT0017, RT0018, RT0019, RT0020, META0003, META0004, META0005, META0006,
4040
META0009, META0010, META0011, META0012, META0013, META0014, META0015, META0016, META0017
4141
);
4242

crates/ingress-http/src/handler/error.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::RequestDispatcherError;
1414
use bytes::Bytes;
1515
use http::{Response, StatusCode, header};
1616
use restate_types::errors::{IdDecodeError, InvocationError};
17+
use restate_types::identifiers::DeploymentId;
1718
use restate_types::schema::invocation_target::InputValidationError;
1819
use serde::Serialize;
1920
use std::string;
@@ -28,6 +29,10 @@ pub(crate) enum HandlerError {
2829
"the service '{0}' exists, but the handler '{1}' was not found, check that the handler exists in the latest registered service version."
2930
)]
3031
ServiceHandlerNotFound(String, String),
32+
#[error(
33+
"the service {0} is exposed by the deprecated deployment {1}. Upgrade the SDK used by {0}."
34+
)]
35+
DeploymentDeprecated(String, DeploymentId),
3136
#[error("invocation not found")]
3237
InvocationNotFound,
3338
#[error(
@@ -128,7 +133,8 @@ impl HandlerError {
128133
| HandlerError::BadWorkflowPath
129134
| HandlerError::InputValidation(_)
130135
| HandlerError::UnsupportedIdempotencyKey
131-
| HandlerError::UnsupportedGetOutput => StatusCode::BAD_REQUEST,
136+
| HandlerError::UnsupportedGetOutput
137+
| HandlerError::DeploymentDeprecated(_, _) => StatusCode::BAD_REQUEST,
132138
HandlerError::DispatcherError(_) => {
133139
// TODO add more distinctions between different dispatcher errors (unavailable, etc)
134140
StatusCode::INTERNAL_SERVER_ERROR

crates/ingress-http/src/handler/service_handler.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use restate_types::invocation::{
3434
SpanRelation, WorkflowHandlerType,
3535
};
3636
use restate_types::schema::invocation_target::{
37-
InvocationTargetMetadata, InvocationTargetResolver,
37+
DeploymentStatus, InvocationTargetMetadata, InvocationTargetResolver,
3838
};
3939
use restate_types::time::MillisSinceEpoch;
4040

@@ -101,6 +101,9 @@ where
101101
handler_name.clone(),
102102
));
103103
};
104+
if let DeploymentStatus::Deprecated(dp_id) = invocation_target_meta.deployment_status {
105+
return Err(HandlerError::DeploymentDeprecated(service_name, dp_id));
106+
}
104107

105108
// Check if Idempotency-Key is available
106109
let idempotency_key = parse_idempotency(req.headers())?;

crates/ingress-kafka/src/dispatcher.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use std::borrow::Borrow;
12-
use std::sync::Arc;
13-
11+
use anyhow::bail;
1412
use bytes::Bytes;
1513
use opentelemetry::propagation::{Extractor, TextMapPropagator};
1614
use opentelemetry::trace::{Span, SpanContext, TraceContextExt};
1715
use opentelemetry_sdk::propagation::TraceContextPropagator;
16+
use std::borrow::Borrow;
17+
use std::sync::Arc;
1818
use tracing::debug;
1919

2020
use restate_bifrost::Bifrost;
@@ -25,7 +25,7 @@ use restate_types::live;
2525
use restate_types::message::MessageIndex;
2626
use restate_types::partition_table::PartitionTableError;
2727
use restate_types::schema::Schema;
28-
use restate_types::schema::invocation_target::InvocationTargetResolver;
28+
use restate_types::schema::invocation_target::{DeploymentStatus, InvocationTargetResolver};
2929
use restate_types::schema::subscriptions::{EventInvocationTargetTemplate, Sink, Subscription};
3030
use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
3131

@@ -102,13 +102,21 @@ impl KafkaIngressEvent {
102102
};
103103

104104
// Compute the retention values
105-
let invocation_retention = schema
105+
let target = schema
106106
.resolve_latest_invocation_target(
107107
invocation_target.service_name(),
108108
invocation_target.handler_name(),
109109
)
110-
.ok_or_else(|| anyhow::anyhow!("Service and handler are not registered"))?
111-
.compute_retention(false);
110+
.ok_or_else(|| anyhow::anyhow!("Service and handler are not registered"))?;
111+
112+
if let DeploymentStatus::Deprecated(dp_id) = target.deployment_status {
113+
bail!(
114+
"the service {} is exposed by the deprecated deployment {dp_id}, please upgrade the SDK.",
115+
invocation_target.service_name()
116+
)
117+
}
118+
119+
let invocation_retention = target.compute_retention(false);
112120

113121
// Time to generate invocation id
114122
let invocation_id = InvocationId::generate(&invocation_target, None);

crates/invoker-impl/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,9 @@ pub(crate) enum CommandPreconditionError {
293293
NoStateOperations,
294294
#[error("unsupported entry type, this handler type cannot write state")]
295295
NoWriteStateOperations,
296+
#[error("the service {0} is exposed by the deprecated deployment {1}.")]
297+
#[code(restate_errors::RT0020)]
298+
DeploymentDeprecated(String, DeploymentId),
296299
}
297300

298301
#[derive(Debug)]

crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use restate_types::journal_v2::{
5050
CommandIndex, CommandType, Entry, EntryType, NotificationId, RunCompletion, RunResult, SignalId,
5151
};
5252
use restate_types::schema::deployment::{Deployment, DeploymentType, ProtocolType};
53-
use restate_types::schema::invocation_target::InvocationTargetResolver;
53+
use restate_types::schema::invocation_target::{DeploymentStatus, InvocationTargetResolver};
5454
use restate_types::service_protocol::ServiceProtocolVersion;
5555

5656
use crate::Notification;
@@ -1027,6 +1027,13 @@ fn resolve_call_request(
10271027
)
10281028
})?;
10291029

1030+
if let DeploymentStatus::Deprecated(dp_id) = meta.deployment_status {
1031+
return Err(CommandPreconditionError::DeploymentDeprecated(
1032+
request.service_name.to_string(),
1033+
dp_id,
1034+
));
1035+
}
1036+
10301037
if !request.key.is_empty() && !meta.target_ty.is_keyed() {
10311038
return Err(CommandPreconditionError::UnexpectedKey(
10321039
request.service_name.to_string(),

crates/types/src/schema/invocation_target.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ pub enum OnMaxAttempts {
7272
Kill,
7373
}
7474

75+
#[derive(Debug, Clone, Default)]
76+
pub enum DeploymentStatus {
77+
#[default]
78+
Enabled,
79+
/// Deployment is disabled, and new requests should not be accepted.
80+
Deprecated(DeploymentId),
81+
}
82+
7583
#[derive(Debug, Clone)]
7684
pub struct InvocationTargetMetadata {
7785
pub public: bool,
@@ -84,6 +92,8 @@ pub struct InvocationTargetMetadata {
8492
pub target_ty: InvocationTargetType,
8593
pub input_rules: InputRules,
8694
pub output_rules: OutputRules,
95+
96+
pub deployment_status: DeploymentStatus,
8797
}
8898

8999
impl InvocationTargetMetadata {
@@ -577,6 +587,7 @@ pub mod test_util {
577587
target_ty: invocation_target_type,
578588
input_rules: Default::default(),
579589
output_rules: Default::default(),
590+
deployment_status: DeploymentStatus::Enabled,
580591
}
581592
}
582593
}

crates/types/src/schema/metadata/mod.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,17 @@ use crate::net::metadata::{MetadataContainer, MetadataKind};
3131
use crate::retries::{RetryIter, RetryPolicy};
3232
use crate::schema::deployment::{DeploymentResolver, DeploymentType};
3333
use crate::schema::invocation_target::{
34-
DEFAULT_IDEMPOTENCY_RETENTION, DEFAULT_WORKFLOW_COMPLETION_RETENTION, InputRules,
35-
InvocationAttemptOptions, InvocationTargetMetadata, InvocationTargetResolver, OnMaxAttempts,
36-
OutputRules,
34+
DEFAULT_IDEMPOTENCY_RETENTION, DEFAULT_WORKFLOW_COMPLETION_RETENTION, DeploymentStatus,
35+
InputRules, InvocationAttemptOptions, InvocationTargetMetadata, InvocationTargetResolver,
36+
OnMaxAttempts, OutputRules,
3737
};
3838
use crate::schema::metadata::openapi::ServiceOpenAPI;
3939
use crate::schema::service::{
4040
HandlerRetryPolicyMetadata, ServiceMetadataResolver, ServiceRetryPolicyMetadata,
4141
};
4242
use crate::schema::subscriptions::{ListSubscriptionFilter, Subscription, SubscriptionResolver};
4343
use crate::schema::{deployment, service};
44+
use crate::service_protocol::ServiceProtocolVersion;
4445
use crate::time::MillisSinceEpoch;
4546
use crate::{Version, Versioned, identifiers};
4647

@@ -597,7 +598,8 @@ impl InvocationTargetResolver for Schema {
597598
let handler_name = handler_name.as_ref();
598599

599600
let ActiveServiceRevision {
600-
service_revision, ..
601+
service_revision,
602+
deployment_id,
601603
} = self.active_service_revisions.get(service_name)?;
602604
let handler = service_revision.handlers.get(handler_name)?;
603605

@@ -627,13 +629,32 @@ impl InvocationTargetResolver for Schema {
627629
)
628630
.unwrap_or(Duration::ZERO);
629631

632+
let deployment_status = self
633+
.deployments
634+
.get(deployment_id)
635+
.map(|dp| {
636+
if ServiceProtocolVersion::is_acceptable_for_new_invocations(
637+
*dp.supported_protocol_versions.start(),
638+
*dp.supported_protocol_versions.end(),
639+
) {
640+
DeploymentStatus::Enabled
641+
} else {
642+
DeploymentStatus::Deprecated(dp.id)
643+
}
644+
})
645+
// It should never happen that the deployment doesn't exist,
646+
// this is an invalid schema registry otherwise.
647+
// But let's not panic yet, this will fail later on.
648+
.unwrap_or_default();
649+
630650
Some(InvocationTargetMetadata {
631651
public: handler.public.unwrap_or(service_revision.public),
632652
completion_retention,
633653
journal_retention,
634654
target_ty: handler.target_ty,
635655
input_rules: handler.input_rules.clone(),
636656
output_rules: handler.output_rules.clone(),
657+
deployment_status,
637658
})
638659
}
639660

crates/types/src/service_protocol.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ impl ServiceProtocolVersion {
3737
&& max_version >= i32::from(MIN_DISCOVERABLE_SERVICE_PROTOCOL_VERSION)
3838
}
3939

40+
pub fn is_acceptable_for_new_invocations(min_version: i32, max_version: i32) -> bool {
41+
Self::is_acceptable_for_discovery(min_version, max_version)
42+
}
43+
4044
pub fn is_supported_for_inflight_invocation(&self) -> bool {
4145
MIN_INFLIGHT_SERVICE_PROTOCOL_VERSION <= *self
4246
&& *self <= MAX_INFLIGHT_SERVICE_PROTOCOL_VERSION

0 commit comments

Comments
 (0)