Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ restate-utoipa = { path = "crates/utoipa" }
restate-vqueues = { path = "crates/vqueues" }
restate-wal-protocol = { path = "crates/wal-protocol" }
restate-worker = { path = "crates/worker" }
restate-ingestion-client = { path = "crates/ingestion-client" }

# this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
# outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead
Expand Down
1 change: 1 addition & 0 deletions crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ restate-admin-rest-model = { workspace = true, features = ["schema"] }
restate-bifrost = { workspace = true, features = ["local-loglet", "replicated-loglet"] }
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-ingestion-client = { workspace = true }
restate-metadata-store = { workspace = true }
restate-service-client = { workspace = true }
restate-service-protocol = { workspace = true, features = ["discovery"] }
Expand Down
20 changes: 10 additions & 10 deletions crates/admin/src/rest_api/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ use serde::Deserialize;
from_type = "MetaApiError",
)
)]
pub async fn create_deployment<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn create_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Extension(version): Extension<AdminApiVersion>,
#[request_body(required = true)] Json(payload): Json<RegisterDeploymentRequest>,
) -> Result<impl IntoResponse, MetaApiError>
Expand Down Expand Up @@ -188,8 +188,8 @@ where
schema = "std::string::String"
))
)]
pub async fn get_deployment<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn get_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(deployment_id): Path<DeploymentId>,
) -> Result<Json<DetailedDeploymentResponse>, MetaApiError>
where
Expand All @@ -210,8 +210,8 @@ where
operation_id = "list_deployments",
tags = "deployment"
)]
pub async fn list_deployments<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn list_deployments<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
) -> Json<ListDeploymentsResponse>
where
Metadata: MetadataService,
Expand Down Expand Up @@ -267,8 +267,8 @@ pub struct DeleteDeploymentParams {
from_type = "MetaApiError",
)
)]
pub async fn delete_deployment<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn delete_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(deployment_id): Path<DeploymentId>,
Query(DeleteDeploymentParams { force }): Query<DeleteDeploymentParams>,
) -> Result<StatusCode, MetaApiError>
Expand Down Expand Up @@ -302,8 +302,8 @@ where
schema = "std::string::String"
))
)]
pub async fn update_deployment<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn update_deployment<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Extension(version): Extension<AdminApiVersion>,
method: Method,
Path(deployment_id): Path<DeploymentId>,
Expand Down
8 changes: 4 additions & 4 deletions crates/admin/src/rest_api/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use restate_types::schema::service::HandlerMetadata;
schema = "std::string::String"
))
)]
pub async fn list_service_handlers<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn list_service_handlers<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(service_name): Path<String>,
) -> Result<Json<ListServiceHandlersResponse>, MetaApiError>
where
Expand Down Expand Up @@ -62,8 +62,8 @@ where
)
)
)]
pub async fn get_service_handler<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn get_service_handler<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path((service_name, handler_name)): Path<(String, String)>,
) -> Result<Json<HandlerMetadata>, MetaApiError>
where
Expand Down
138 changes: 20 additions & 118 deletions crates/admin/src/rest_api/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,121 +8,23 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use super::error::*;
use crate::generate_meta_api_error;
use crate::rest_api::create_envelope_header;
use crate::state::AdminServiceState;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use okapi_operation::*;
use serde::Deserialize;

use restate_admin_rest_model::invocations::RestartAsNewInvocationResponse;
use restate_types::identifiers::{
DeploymentId, InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey,
};
use restate_types::identifiers::{DeploymentId, InvocationId, PartitionProcessorRpcRequestId};
use restate_types::invocation::client::{
self, CancelInvocationResponse, InvocationClient, KillInvocationResponse,
PauseInvocationResponse, PurgeInvocationResponse, ResumeInvocationResponse,
};
use restate_types::invocation::{InvocationTermination, PurgeInvocationRequest, TerminationFlavor};
use restate_types::journal_v2::EntryIndex;
use restate_wal_protocol::{Command, Envelope};
use serde::Deserialize;
use std::sync::Arc;
use tracing::warn;

#[derive(Debug, Default, Deserialize, JsonSchema)]
pub enum DeletionMode {
#[default]
#[serde(alias = "cancel")]
Cancel,
#[serde(alias = "kill")]
Kill,
#[serde(alias = "purge")]
Purge,
}
#[derive(Debug, Default, Deserialize, JsonSchema)]
pub struct DeleteInvocationParams {
pub mode: Option<DeletionMode>,
}

/// Terminate an invocation
#[openapi(
summary = "Delete an invocation",
deprecated = true,
description = "Use kill_invocation/cancel_invocation/purge_invocation instead.",
operation_id = "delete_invocation",
tags = "invocation",
parameters(
path(
name = "invocation_id",
description = "Invocation identifier.",
schema = "std::string::String"
),
query(
name = "mode",
description = "If cancel, it will gracefully terminate the invocation. \
If kill, it will terminate the invocation with a hard stop. \
If purge, it will only cleanup the response for completed invocations, and leave unaffected an in-flight invocation.",
required = false,
style = "simple",
allow_empty_value = false,
schema = "DeletionMode",
)
),
responses(
ignore_return_type = true,
response(
status = "202",
description = "Accepted",
content = "okapi_operation::Empty",
),
from_type = "MetaApiError",
)
)]
pub async fn delete_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
Path(invocation_id): Path<String>,
Query(DeleteInvocationParams { mode }): Query<DeleteInvocationParams>,
) -> Result<StatusCode, MetaApiError> {
let invocation_id = invocation_id
.parse::<InvocationId>()
.map_err(|e| MetaApiError::InvalidField("invocation_id", e.to_string()))?;

let cmd = match mode.unwrap_or_default() {
DeletionMode::Cancel => Command::TerminateInvocation(InvocationTermination {
invocation_id,
flavor: TerminationFlavor::Cancel,
response_sink: None,
}),
DeletionMode::Kill => Command::TerminateInvocation(InvocationTermination {
invocation_id,
flavor: TerminationFlavor::Kill,
response_sink: None,
}),
DeletionMode::Purge => Command::PurgeInvocation(PurgeInvocationRequest {
invocation_id,
response_sink: None,
}),
};

let partition_key = invocation_id.partition_key();

let result = restate_bifrost::append_to_bifrost(
&state.bifrost,
Arc::new(Envelope::new(create_envelope_header(partition_key), cmd)),
)
.await;

if let Err(err) = result {
warn!("Could not append invocation termination command to Bifrost: {err}");
Err(MetaApiError::Internal(
"Failed sending invocation termination to the cluster.".to_owned(),
))
} else {
Ok(StatusCode::ACCEPTED)
}
}
use super::error::*;
use crate::generate_meta_api_error;
use crate::state::AdminServiceState;

generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, InvocationClientError, InvalidFieldError, InvocationWasAlreadyCompletedError]);

Expand All @@ -139,8 +41,8 @@ generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, Invocati
schema = "std::string::String"
))
)]
pub async fn kill_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn kill_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<(), KillInvocationError>
where
Expand Down Expand Up @@ -199,8 +101,8 @@ generate_meta_api_error!(CancelInvocationError: [InvocationNotFoundError, Invoca
from_type = "CancelInvocationError",
)
)]
pub async fn cancel_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn cancel_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<StatusCode, CancelInvocationError>
where
Expand Down Expand Up @@ -241,8 +143,8 @@ generate_meta_api_error!(PurgeInvocationError: [InvocationNotFoundError, Invocat
schema = "std::string::String"
))
)]
pub async fn purge_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn purge_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<(), PurgeInvocationError>
where
Expand Down Expand Up @@ -284,8 +186,8 @@ generate_meta_api_error!(PurgeJournalError: [InvocationNotFoundError, Invocation
schema = "std::string::String"
))
)]
pub async fn purge_journal<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn purge_journal<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<(), PurgeJournalError>
where
Expand Down Expand Up @@ -398,8 +300,8 @@ generate_meta_api_error!(RestartInvocationError: [
),
)
)]
pub async fn restart_as_new_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn restart_as_new_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
Query(RestartAsNewInvocationQueryParams { from, deployment }): Query<
RestartAsNewInvocationQueryParams,
Expand Down Expand Up @@ -510,8 +412,8 @@ generate_meta_api_error!(ResumeInvocationError: [
)
)
)]
pub async fn resume_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn resume_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
Query(ResumeInvocationQueryParams { deployment }): Query<ResumeInvocationQueryParams>,
) -> Result<(), ResumeInvocationError>
Expand Down Expand Up @@ -596,8 +498,8 @@ generate_meta_api_error!(PauseInvocationError: [
from_type = "PauseInvocationError",
)
)]
pub async fn pause_invocation<Metadata, Discovery, Telemetry, Invocations>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations>>,
pub async fn pause_invocation<Metadata, Discovery, Telemetry, Invocations, Transport>(
State(state): State<AdminServiceState<Metadata, Discovery, Telemetry, Invocations, Transport>>,
Path(invocation_id): Path<String>,
) -> Result<StatusCode, PauseInvocationError>
where
Expand Down
Loading