From 941d45f685f5cfa4f31718fbdd7d6f54c7bcd006 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 4 Jun 2026 14:46:18 -0400 Subject: [PATCH 1/3] upgrade on CAN --- .cargo/config.toml | 2 +- .../integ_tests/worker_versioning_tests.rs | 479 +++++++++++++++++- .../workflow_tests/continue_as_new.rs | 12 +- crates/sdk/src/lib.rs | 1 + crates/workflow/src/workflow_context.rs | 48 +- .../workflow/src/workflow_context/options.rs | 11 +- 6 files changed, 542 insertions(+), 11 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 7f5daf4df..4bb521b75 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,6 +1,6 @@ [env] # This temporarily overrides the version of the CLI used for integration tests, locally and in CI -# CLI_VERSION_OVERRIDE = "v1.6.3-serverless" +CLI_VERSION_OVERRIDE = "v1.7.0" [alias] # Not sure why --all-features doesn't work diff --git a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs index 53fd5fa0c..ebb40bc5a 100644 --- a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs @@ -1,6 +1,8 @@ use crate::common::{CoreWfStarter, activity_functions::StdActivities, eventually}; use std::time::Duration; -use temporalio_client::{NamespacedClient, WorkflowStartOptions, grpc::WorkflowService}; +use temporalio_client::{ + Client, NamespacedClient, WorkflowSignalOptions, WorkflowStartOptions, grpc::WorkflowService, +}; use temporalio_common::{ protos::{ coresdk::{ @@ -8,17 +10,22 @@ use temporalio_common::{ workflow_completion::WorkflowActivationCompletion, }, temporal::api::{ - enums::v1::VersioningBehavior, + common::v1::WorkflowExecution, + enums::v1::{RoutingConfigUpdateState, VersioningBehavior}, history::v1::history_event::Attributes, workflowservice::v1::{ - DescribeWorkerDeploymentRequest, SetWorkerDeploymentCurrentVersionRequest, + DescribeWorkerDeploymentRequest, DescribeWorkflowExecutionRequest, + SetWorkerDeploymentCurrentVersionRequest, SetWorkerDeploymentRampingVersionRequest, }, }, }, worker::{WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerTaskTypes}, }; use temporalio_macros::{workflow, workflow_methods}; -use temporalio_sdk::{ActivityOptions, WorkflowContext, WorkflowResult}; +use temporalio_sdk::{ + ActivityOptions, ContinueAsNewOptions, ContinueAsNewVersioningBehavior, SyncWorkflowContext, + WorkflowContext, WorkflowResult, +}; use temporalio_sdk_core::test_help::WorkerTestHelpers; use tokio::join; use tonic::IntoRequest; @@ -290,6 +297,390 @@ async fn versioning_off_with_custom_build_id() { ); } +#[tokio::test] +async fn continue_as_new_auto_upgrade_uses_current_deployment_version() { + let wf_type = "continue_as_new_auto_upgrade_uses_current_deployment_version"; + let mut starter = CoreWfStarter::new(wf_type); + let deploy_name = format!("deployment-{}", starter.get_task_queue()); + let v1 = WorkerDeploymentVersion { + deployment_name: deploy_name.clone(), + build_id: "1.0".to_string(), + }; + let v2 = WorkerDeploymentVersion { + deployment_name: deploy_name.clone(), + build_id: "2.0".to_string(), + }; + starter.sdk_config.deployment_options = versioned_worker_options(v1.clone()); + starter.sdk_config.task_types = WorkerTaskTypes::workflow_only(); + let mut worker1 = starter.worker().await; + worker1 + .register_workflow::() + .unwrap(); + + let mut starter2 = starter.clone_no_worker(); + starter2.sdk_config.deployment_options = versioned_worker_options(v2.clone()); + starter2.sdk_config.task_types = WorkerTaskTypes::workflow_only(); + let mut worker2 = starter2.worker().await; + worker2 + .register_workflow::() + .unwrap(); + + let client = starter.get_client().await; + let task_queue = starter.get_task_queue().to_owned(); + let workflow_id = starter.get_wf_id(); + let shutdown1 = worker1.inner_mut().shutdown_handle(); + let shutdown2 = worker2.inner_mut().shutdown_handle(); + + let client_task = async { + wait_for_worker_deployment_version(&client, &deploy_name, &v1).await; + wait_for_worker_deployment_version(&client, &deploy_name, &v2).await; + set_current_deployment_version(&client, &deploy_name, &v1).await; + wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v1), None).await; + + let handle = client + .start_workflow( + ContinueAsNewAutoUpgradeV1::run, + 0_u8, + WorkflowStartOptions::new(task_queue, workflow_id).build(), + ) + .await + .unwrap(); + wait_for_workflow_deployment_version( + &client, + &handle.info().workflow_id, + handle.run_id().unwrap_or_default(), + &v1, + ) + .await; + + set_current_deployment_version(&client, &deploy_name, &v2).await; + wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v2), None).await; + handle + .signal( + ContinueAsNewAutoUpgradeV1::continue_as_new, + (), + WorkflowSignalOptions::default(), + ) + .await + .unwrap(); + + let result = handle.get_result(Default::default()).await.unwrap(); + assert_eq!(result, "v2.0"); + shutdown1(); + shutdown2(); + }; + + tokio::time::timeout(Duration::from_secs(60), async { + join!( + async { + worker1.inner_mut().run().await.unwrap(); + }, + async { + worker2.inner_mut().run().await.unwrap(); + }, + client_task + ); + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn continue_as_new_use_ramping_version_uses_ramping_deployment_version() { + let wf_type = "continue_as_new_use_ramping_version_uses_ramping_deployment_version"; + let mut starter = CoreWfStarter::new(wf_type); + let deploy_name = format!("deployment-{}", starter.get_task_queue()); + let v1 = WorkerDeploymentVersion { + deployment_name: deploy_name.clone(), + build_id: "1.0".to_string(), + }; + let v2 = WorkerDeploymentVersion { + deployment_name: deploy_name.clone(), + build_id: "2.0".to_string(), + }; + starter.sdk_config.deployment_options = versioned_worker_options(v1.clone()); + starter.sdk_config.task_types = WorkerTaskTypes::workflow_only(); + let mut worker1 = starter.worker().await; + worker1 + .register_workflow::() + .unwrap(); + + let mut starter2 = starter.clone_no_worker(); + starter2.sdk_config.deployment_options = versioned_worker_options(v2.clone()); + starter2.sdk_config.task_types = WorkerTaskTypes::workflow_only(); + let mut worker2 = starter2.worker().await; + worker2 + .register_workflow::() + .unwrap(); + + let client = starter.get_client().await; + let task_queue = starter.get_task_queue().to_owned(); + let workflow_id = starter.get_wf_id(); + let shutdown1 = worker1.inner_mut().shutdown_handle(); + let shutdown2 = worker2.inner_mut().shutdown_handle(); + + let client_task = async { + wait_for_worker_deployment_version(&client, &deploy_name, &v1).await; + wait_for_worker_deployment_version(&client, &deploy_name, &v2).await; + set_current_deployment_version(&client, &deploy_name, &v1).await; + wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v1), None).await; + + let handle = client + .start_workflow( + ContinueAsNewUseRampingVersionV1::run, + 0_u8, + WorkflowStartOptions::new(task_queue, workflow_id).build(), + ) + .await + .unwrap(); + wait_for_workflow_deployment_version( + &client, + &handle.info().workflow_id, + handle.run_id().unwrap_or_default(), + &v1, + ) + .await; + + set_ramping_deployment_version(&client, &deploy_name, &v2, 0.0).await; + wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v1), Some(&v2)).await; + handle + .signal( + ContinueAsNewUseRampingVersionV1::continue_as_new, + (), + WorkflowSignalOptions::default(), + ) + .await + .unwrap(); + + let result = handle.get_result(Default::default()).await.unwrap(); + assert_eq!(result, "v2.0"); + shutdown1(); + shutdown2(); + }; + + tokio::time::timeout(Duration::from_secs(60), async { + join!( + async { + worker1.inner_mut().run().await.unwrap(); + }, + async { + worker2.inner_mut().run().await.unwrap(); + }, + client_task + ); + }) + .await + .unwrap(); +} + +fn versioned_worker_options(version: WorkerDeploymentVersion) -> WorkerDeploymentOptions { + WorkerDeploymentOptions { + version, + use_worker_versioning: true, + default_versioning_behavior: VersioningBehavior::Pinned.into(), + } +} + +async fn try_describe_worker_deployment( + client: &Client, + deployment_name: &str, +) -> Result< + temporalio_common::protos::temporal::api::workflowservice::v1::DescribeWorkerDeploymentResponse, + tonic::Status, +> { + client + .connection() + .clone() + .describe_worker_deployment( + DescribeWorkerDeploymentRequest { + namespace: client.namespace(), + deployment_name: deployment_name.to_string(), + } + .into_request(), + ) + .await + .map(|resp| resp.into_inner()) +} + +async fn wait_for_worker_deployment_version( + client: &Client, + deployment_name: &str, + expected: &WorkerDeploymentVersion, +) { + eventually( + async || { + let resp = try_describe_worker_deployment(client, deployment_name) + .await + .map_err(|err| format!("{err:?}"))?; + let info = resp + .worker_deployment_info + .ok_or_else(|| "missing worker deployment info".to_string())?; + if info + .version_summaries + .iter() + .filter_map(|summary| summary.deployment_version.clone()) + .map(WorkerDeploymentVersion::from) + .any(|version| version == *expected) + { + Ok(()) + } else { + Err(format!("deployment version {expected:?} not visible yet")) + } + }, + Duration::from_secs(50), + ) + .await + .unwrap(); +} + +async fn wait_for_worker_deployment_routing( + client: &Client, + deployment_name: &str, + expected_current: Option<&WorkerDeploymentVersion>, + expected_ramping: Option<&WorkerDeploymentVersion>, +) { + eventually( + async || { + let resp = try_describe_worker_deployment(client, deployment_name) + .await + .map_err(|err| format!("{err:?}"))?; + let info = resp + .worker_deployment_info + .ok_or_else(|| "missing worker deployment info".to_string())?; + let routing = info + .routing_config + .ok_or_else(|| "missing routing config".to_string())?; + if RoutingConfigUpdateState::try_from(info.routing_config_update_state) + .unwrap_or(RoutingConfigUpdateState::Unspecified) + == RoutingConfigUpdateState::InProgress + { + return Err("routing config update still in progress".to_string()); + } + let current = routing + .current_deployment_version + .map(WorkerDeploymentVersion::from); + if current.as_ref() != expected_current { + return Err(format!( + "current deployment version mismatch: {:?}", + current + )); + } + let ramping = routing + .ramping_deployment_version + .map(WorkerDeploymentVersion::from); + if ramping.as_ref() != expected_ramping { + return Err(format!("ramping deployment version mismatch: {ramping:?}")); + } + Ok(()) + }, + Duration::from_secs(50), + ) + .await + .unwrap(); +} + +async fn set_current_deployment_version( + client: &Client, + deployment_name: &str, + version: &WorkerDeploymentVersion, +) { + let desc = try_describe_worker_deployment(client, deployment_name) + .await + .unwrap(); + client + .connection() + .clone() + .set_worker_deployment_current_version( + SetWorkerDeploymentCurrentVersionRequest { + namespace: client.namespace(), + deployment_name: deployment_name.to_string(), + build_id: version.build_id.clone(), + conflict_token: desc.conflict_token, + identity: client.identity(), + ..Default::default() + } + .into_request(), + ) + .await + .unwrap(); +} + +async fn set_ramping_deployment_version( + client: &Client, + deployment_name: &str, + version: &WorkerDeploymentVersion, + percentage: f32, +) { + let desc = try_describe_worker_deployment(client, deployment_name) + .await + .unwrap(); + client + .connection() + .clone() + .set_worker_deployment_ramping_version( + SetWorkerDeploymentRampingVersionRequest { + namespace: client.namespace(), + deployment_name: deployment_name.to_string(), + build_id: version.build_id.clone(), + percentage, + conflict_token: desc.conflict_token, + identity: client.identity(), + ..Default::default() + } + .into_request(), + ) + .await + .unwrap(); +} + +async fn wait_for_workflow_deployment_version( + client: &Client, + workflow_id: &str, + run_id: &str, + expected: &WorkerDeploymentVersion, +) { + eventually( + async || { + let resp = client + .connection() + .clone() + .describe_workflow_execution( + DescribeWorkflowExecutionRequest { + namespace: client.namespace(), + execution: Some(WorkflowExecution { + workflow_id: workflow_id.to_string(), + run_id: run_id.to_string(), + }), + } + .into_request(), + ) + .await + .map_err(|err| format!("{err:?}"))? + .into_inner(); + let info = resp + .workflow_execution_info + .ok_or_else(|| "missing workflow execution info".to_string())?; + let versioning_info = info + .versioning_info + .ok_or_else(|| "missing workflow versioning info".to_string())?; + let deployment_version = versioning_info + .deployment_version + .map(WorkerDeploymentVersion::from) + .ok_or_else(|| "missing workflow deployment version".to_string())?; + if deployment_version == *expected { + Ok(()) + } else { + Err(format!( + "workflow deployment version mismatch: {deployment_version:?}" + )) + } + }, + Duration::from_secs(50), + ) + .await + .unwrap(); +} + #[workflow] #[derive(Default)] struct ActivityHasDeploymentStampWf; @@ -308,3 +699,83 @@ impl ActivityHasDeploymentStampWf { Ok(()) } } + +#[workflow] +#[derive(Default)] +struct ContinueAsNewAutoUpgradeV1 { + should_continue_as_new: bool, +} + +#[workflow_methods] +impl ContinueAsNewAutoUpgradeV1 { + #[run(name = "continue_as_new_auto_upgrade_uses_current_deployment_version")] + async fn run(ctx: &mut WorkflowContext, attempt: u8) -> WorkflowResult { + if attempt > 0 { + return Ok("v1.0".to_string()); + } + ctx.wait_condition(|state| state.should_continue_as_new) + .await; + assert!(ctx.target_worker_deployment_version_changed()); + let mut options = ContinueAsNewOptions::default(); + options.initial_versioning_behavior = ContinueAsNewVersioningBehavior::AutoUpgrade.into(); + ctx.continue_as_new(&(attempt + 1), options)?; + Ok("v1.0".to_string()) + } + + #[signal] + fn continue_as_new(&mut self, _ctx: &mut SyncWorkflowContext, _: ()) { + self.should_continue_as_new = true; + } +} + +#[workflow] +#[derive(Default)] +struct ContinueAsNewAutoUpgradeV2; + +#[workflow_methods] +impl ContinueAsNewAutoUpgradeV2 { + #[run(name = "continue_as_new_auto_upgrade_uses_current_deployment_version")] + async fn run(_ctx: &mut WorkflowContext, _attempt: u8) -> WorkflowResult { + Ok("v2.0".to_string()) + } +} + +#[workflow] +#[derive(Default)] +struct ContinueAsNewUseRampingVersionV1 { + should_continue_as_new: bool, +} + +#[workflow_methods] +impl ContinueAsNewUseRampingVersionV1 { + #[run(name = "continue_as_new_use_ramping_version_uses_ramping_deployment_version")] + async fn run(ctx: &mut WorkflowContext, attempt: u8) -> WorkflowResult { + if attempt > 0 { + return Ok("v1.0".to_string()); + } + ctx.wait_condition(|state| state.should_continue_as_new) + .await; + let mut options = ContinueAsNewOptions::default(); + options.initial_versioning_behavior = + ContinueAsNewVersioningBehavior::UseRampingVersion.into(); + ctx.continue_as_new(&(attempt + 1), options)?; + Ok("v1.0".to_string()) + } + + #[signal] + fn continue_as_new(&mut self, _ctx: &mut SyncWorkflowContext, _: ()) { + self.should_continue_as_new = true; + } +} + +#[workflow] +#[derive(Default)] +struct ContinueAsNewUseRampingVersionV2; + +#[workflow_methods] +impl ContinueAsNewUseRampingVersionV2 { + #[run(name = "continue_as_new_use_ramping_version_uses_ramping_deployment_version")] + async fn run(_ctx: &mut WorkflowContext, _attempt: u8) -> WorkflowResult { + Ok("v2.0".to_string()) + } +} diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs index 35705cd57..edb291047 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs @@ -5,16 +5,17 @@ use temporalio_common::{ protos::{ coresdk::AsJsonPayloadExt, temporal::api::{ - command::v1::command::Attributes, - common::v1::SearchAttributes, - enums::v1::{CommandType, ContinueAsNewVersioningBehavior}, + command::v1::command::Attributes, common::v1::SearchAttributes, enums::v1::CommandType, history::v1::history_event, }, }, worker::WorkerTaskTypes, }; use temporalio_macros::{workflow, workflow_methods}; -use temporalio_sdk::{ContinueAsNewOptions, WorkflowContext, WorkflowResult, WorkflowTermination}; +use temporalio_sdk::{ + ContinueAsNewOptions, ContinueAsNewVersioningBehavior, WorkflowContext, WorkflowResult, + WorkflowTermination, +}; use temporalio_sdk_core::{ TunerHolder, replay::{DEFAULT_WORKFLOW_TYPE, canned_histories}, @@ -139,9 +140,11 @@ impl ContinueAsNewSuggestedWf { async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { // First WFT: flag should be false assert!(!ctx.continue_as_new_suggested()); + assert!(!ctx.target_worker_deployment_version_changed()); ctx.timer(Duration::from_millis(500)).await; // Second WFT: flag should be true (set on WFT started event 8) assert!(ctx.continue_as_new_suggested()); + assert!(ctx.target_worker_deployment_version_changed()); ctx.continue_as_new(&(), ContinueAsNewOptions::default())?; Ok(()) } @@ -156,6 +159,7 @@ async fn continue_as_new_suggested_flag_exposed() { he.attributes { attrs.suggest_continue_as_new = true; + attrs.target_worker_deployment_version_changed = true; } }); diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs index def7dc1f4..7daed730a 100644 --- a/crates/sdk/src/lib.rs +++ b/crates/sdk/src/lib.rs @@ -87,6 +87,7 @@ pub use crate::error::{ OutgoingWorkflowError, WorkflowRegistrationError, }; pub use temporalio_client::Namespace; +pub use temporalio_common::protos::temporal::api::enums::v1::ContinueAsNewVersioningBehavior; pub use temporalio_workflow::{ ActivityCloseTimeouts, ActivityOptions, BaseWorkflowContext, CancellableFuture, ChildWorkflowOptions, ContinueAsNewOptions, ExternalWorkflowHandle, LocalActivityOptions, diff --git a/crates/workflow/src/workflow_context.rs b/crates/workflow/src/workflow_context.rs index 632cc1334..eaec81a2b 100644 --- a/crates/workflow/src/workflow_context.rs +++ b/crates/workflow/src/workflow_context.rs @@ -812,6 +812,18 @@ impl SyncWorkflowContext { .continue_as_new_suggested } + /// Returns true if the workflow's target worker deployment version changed. + /// + /// This experimental signal is intended for workers using worker deployment versioning. + pub fn target_worker_deployment_version_changed(&self) -> bool { + self.base + .inner + .shared + .borrow() + .activation + .target_worker_deployment_version_changed + } + /// Returns the headers for the current handler invocation (signal, update, query, etc.). /// /// When called from within a signal handler, returns the headers that were sent with that @@ -1156,6 +1168,13 @@ impl WorkflowContext { self.sync.continue_as_new_suggested() } + /// Returns true if the workflow's target worker deployment version changed. + /// + /// This experimental signal is intended for workers using worker deployment versioning. + pub fn target_worker_deployment_version_changed(&self) -> bool { + self.sync.target_worker_deployment_version_changed() + } + /// Returns the headers for the current handler invocation (signal, update, query, etc.). pub fn headers(&self) -> &HashMap { self.sync.headers() @@ -2460,6 +2479,9 @@ mod tests { ..Default::default() }), versioning_intent: Some(ProtoVersioningIntent::Compatible), + initial_versioning_behavior: Some( + ContinueAsNewVersioningBehavior::UseRampingVersion, + ), }, ) .expect_err("continue_as_new should terminate the workflow"); @@ -2487,7 +2509,8 @@ mod tests { ..Default::default() }), versioning_intent: ProtoVersioningIntent::Compatible.into(), - initial_versioning_behavior: ContinueAsNewVersioningBehavior::Unspecified.into(), + initial_versioning_behavior: ContinueAsNewVersioningBehavior::UseRampingVersion + as i32, } ); } @@ -2513,6 +2536,29 @@ mod tests { assert_eq!(cmd.search_attributes, Some(SearchAttributes::default())); } + #[test] + fn workflow_context_continue_as_new_applies_auto_upgrade_versioning_behavior() { + let ctx = test_context(); + + let termination = ctx + .continue_as_new( + &13, + ContinueAsNewOptions { + initial_versioning_behavior: Some(ContinueAsNewVersioningBehavior::AutoUpgrade), + ..Default::default() + }, + ) + .expect_err("continue_as_new should terminate the workflow"); + let WorkflowTermination::ContinueAsNew(cmd) = termination else { + unreachable!() + }; + + assert_eq!( + cmd.initial_versioning_behavior, + ContinueAsNewVersioningBehavior::AutoUpgrade as i32 + ); + } + #[test] fn continue_as_new_reports_serialization_errors() { #[derive(Debug)] diff --git a/crates/workflow/src/workflow_context/options.rs b/crates/workflow/src/workflow_context/options.rs index 9f264466a..7d18a3f1a 100644 --- a/crates/workflow/src/workflow_context/options.rs +++ b/crates/workflow/src/workflow_context/options.rs @@ -518,6 +518,12 @@ pub struct ContinueAsNewOptions { pub retry_policy: Option, /// Whether the new workflow should run on a worker with a compatible build id. pub versioning_intent: Option, + /// Versioning behavior to use for the first workflow task of the new run. + /// + /// This experimental option is only meaningful for workers using worker deployment + /// versioning. `AutoUpgrade` routes the new run to the current deployment version; + /// `UseRampingVersion` routes it to the ramping deployment version when one is configured. + pub initial_versioning_behavior: Option, } impl ContinueAsNewOptions { @@ -544,7 +550,10 @@ impl ContinueAsNewOptions { .versioning_intent .unwrap_or(VersioningIntent::Unspecified) .into(), - initial_versioning_behavior: ContinueAsNewVersioningBehavior::Unspecified.into(), + initial_versioning_behavior: self + .initial_versioning_behavior + .unwrap_or(ContinueAsNewVersioningBehavior::Unspecified) + .into(), } } } From 55e8033dcf6674b0482d7557d554ac20bb4b65ec Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 8 Jun 2026 22:35:29 -0400 Subject: [PATCH 2/3] address PR suggestions --- .cargo/config.toml | 2 +- .../integ_tests/worker_versioning_tests.rs | 198 +++++++++--------- 2 files changed, 100 insertions(+), 100 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 4bb521b75..7f5daf4df 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,6 +1,6 @@ [env] # This temporarily overrides the version of the CLI used for integration tests, locally and in CI -CLI_VERSION_OVERRIDE = "v1.7.0" +# CLI_VERSION_OVERRIDE = "v1.6.3-serverless" [alias] # Not sure why --all-features doesn't work diff --git a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs index ebb40bc5a..80d915ef0 100644 --- a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs @@ -147,6 +147,25 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau assert_eq!(dv.build_id, "1.0"); } +#[workflow] +#[derive(Default)] +struct ActivityHasDeploymentStampWf; + +#[workflow_methods] +impl ActivityHasDeploymentStampWf { + #[run(name = "activity_has_deployment_stamp")] + async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { + let _ = ctx + .start_activity( + StdActivities::echo, + "hi!".to_string(), + ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), + ) + .await; + Ok(()) + } +} + #[tokio::test] async fn activity_has_deployment_stamp() { let wf_name = "activity_has_deployment_stamp"; @@ -297,6 +316,46 @@ async fn versioning_off_with_custom_build_id() { ); } +#[workflow] +#[derive(Default)] +struct ContinueAsNewAutoUpgradeV1 { + should_continue_as_new: bool, +} + +#[workflow_methods] +impl ContinueAsNewAutoUpgradeV1 { + #[run(name = "continue_as_new_auto_upgrade_uses_current_deployment_version")] + async fn run(ctx: &mut WorkflowContext, attempt: u8) -> WorkflowResult { + if attempt > 0 { + return Ok("v1.0".to_string()); + } + ctx.wait_condition(|state| state.should_continue_as_new) + .await; + assert!(ctx.target_worker_deployment_version_changed()); + let mut options = ContinueAsNewOptions::default(); + options.initial_versioning_behavior = ContinueAsNewVersioningBehavior::AutoUpgrade.into(); + ctx.continue_as_new(&(attempt + 1), options)?; + Ok("v1.0".to_string()) + } + + #[signal] + fn continue_as_new(&mut self, _ctx: &mut SyncWorkflowContext, _: ()) { + self.should_continue_as_new = true; + } +} + +#[workflow] +#[derive(Default)] +struct ContinueAsNewAutoUpgradeV2; + +#[workflow_methods] +impl ContinueAsNewAutoUpgradeV2 { + #[run(name = "continue_as_new_auto_upgrade_uses_current_deployment_version")] + async fn run(_ctx: &mut WorkflowContext, _attempt: u8) -> WorkflowResult { + Ok("v2.0".to_string()) + } +} + #[tokio::test] async fn continue_as_new_auto_upgrade_uses_current_deployment_version() { let wf_type = "continue_as_new_auto_upgrade_uses_current_deployment_version"; @@ -385,6 +444,46 @@ async fn continue_as_new_auto_upgrade_uses_current_deployment_version() { .unwrap(); } +#[workflow] +#[derive(Default)] +struct ContinueAsNewUseRampingVersionV1 { + should_continue_as_new: bool, +} + +#[workflow_methods] +impl ContinueAsNewUseRampingVersionV1 { + #[run(name = "continue_as_new_use_ramping_version_uses_ramping_deployment_version")] + async fn run(ctx: &mut WorkflowContext, attempt: u8) -> WorkflowResult { + if attempt > 0 { + return Ok("v1.0".to_string()); + } + ctx.wait_condition(|state| state.should_continue_as_new) + .await; + let mut options = ContinueAsNewOptions::default(); + options.initial_versioning_behavior = + ContinueAsNewVersioningBehavior::UseRampingVersion.into(); + ctx.continue_as_new(&(attempt + 1), options)?; + Ok("v1.0".to_string()) + } + + #[signal] + fn continue_as_new(&mut self, _ctx: &mut SyncWorkflowContext, _: ()) { + self.should_continue_as_new = true; + } +} + +#[workflow] +#[derive(Default)] +struct ContinueAsNewUseRampingVersionV2; + +#[workflow_methods] +impl ContinueAsNewUseRampingVersionV2 { + #[run(name = "continue_as_new_use_ramping_version_uses_ramping_deployment_version")] + async fn run(_ctx: &mut WorkflowContext, _attempt: u8) -> WorkflowResult { + Ok("v2.0".to_string()) + } +} + #[tokio::test] async fn continue_as_new_use_ramping_version_uses_ramping_deployment_version() { let wf_type = "continue_as_new_use_ramping_version_uses_ramping_deployment_version"; @@ -680,102 +779,3 @@ async fn wait_for_workflow_deployment_version( .await .unwrap(); } - -#[workflow] -#[derive(Default)] -struct ActivityHasDeploymentStampWf; - -#[workflow_methods] -impl ActivityHasDeploymentStampWf { - #[run(name = "activity_has_deployment_stamp")] - async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { - let _ = ctx - .start_activity( - StdActivities::echo, - "hi!".to_string(), - ActivityOptions::start_to_close_timeout(Duration::from_secs(5)), - ) - .await; - Ok(()) - } -} - -#[workflow] -#[derive(Default)] -struct ContinueAsNewAutoUpgradeV1 { - should_continue_as_new: bool, -} - -#[workflow_methods] -impl ContinueAsNewAutoUpgradeV1 { - #[run(name = "continue_as_new_auto_upgrade_uses_current_deployment_version")] - async fn run(ctx: &mut WorkflowContext, attempt: u8) -> WorkflowResult { - if attempt > 0 { - return Ok("v1.0".to_string()); - } - ctx.wait_condition(|state| state.should_continue_as_new) - .await; - assert!(ctx.target_worker_deployment_version_changed()); - let mut options = ContinueAsNewOptions::default(); - options.initial_versioning_behavior = ContinueAsNewVersioningBehavior::AutoUpgrade.into(); - ctx.continue_as_new(&(attempt + 1), options)?; - Ok("v1.0".to_string()) - } - - #[signal] - fn continue_as_new(&mut self, _ctx: &mut SyncWorkflowContext, _: ()) { - self.should_continue_as_new = true; - } -} - -#[workflow] -#[derive(Default)] -struct ContinueAsNewAutoUpgradeV2; - -#[workflow_methods] -impl ContinueAsNewAutoUpgradeV2 { - #[run(name = "continue_as_new_auto_upgrade_uses_current_deployment_version")] - async fn run(_ctx: &mut WorkflowContext, _attempt: u8) -> WorkflowResult { - Ok("v2.0".to_string()) - } -} - -#[workflow] -#[derive(Default)] -struct ContinueAsNewUseRampingVersionV1 { - should_continue_as_new: bool, -} - -#[workflow_methods] -impl ContinueAsNewUseRampingVersionV1 { - #[run(name = "continue_as_new_use_ramping_version_uses_ramping_deployment_version")] - async fn run(ctx: &mut WorkflowContext, attempt: u8) -> WorkflowResult { - if attempt > 0 { - return Ok("v1.0".to_string()); - } - ctx.wait_condition(|state| state.should_continue_as_new) - .await; - let mut options = ContinueAsNewOptions::default(); - options.initial_versioning_behavior = - ContinueAsNewVersioningBehavior::UseRampingVersion.into(); - ctx.continue_as_new(&(attempt + 1), options)?; - Ok("v1.0".to_string()) - } - - #[signal] - fn continue_as_new(&mut self, _ctx: &mut SyncWorkflowContext, _: ()) { - self.should_continue_as_new = true; - } -} - -#[workflow] -#[derive(Default)] -struct ContinueAsNewUseRampingVersionV2; - -#[workflow_methods] -impl ContinueAsNewUseRampingVersionV2 { - #[run(name = "continue_as_new_use_ramping_version_uses_ramping_deployment_version")] - async fn run(_ctx: &mut WorkflowContext, _attempt: u8) -> WorkflowResult { - Ok("v2.0".to_string()) - } -} From 0dbcfd5e3d9b2d323b498ceeddd9a8c32fbd838c Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 9 Jun 2026 16:34:56 -0400 Subject: [PATCH 3/3] make rust native enum for CAN versioning behavior --- .../integ_tests/worker_versioning_tests.rs | 22 +++++-- .../workflow_tests/continue_as_new.rs | 16 ++--- crates/sdk/src/lib.rs | 11 ++-- crates/workflow/src/lib.rs | 8 +-- crates/workflow/src/workflow_context.rs | 12 ++-- .../workflow/src/workflow_context/options.rs | 59 +++++++++++++++++-- 6 files changed, 95 insertions(+), 33 deletions(-) diff --git a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs index 80d915ef0..2175ddce7 100644 --- a/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_versioning_tests.rs @@ -333,7 +333,7 @@ impl ContinueAsNewAutoUpgradeV1 { .await; assert!(ctx.target_worker_deployment_version_changed()); let mut options = ContinueAsNewOptions::default(); - options.initial_versioning_behavior = ContinueAsNewVersioningBehavior::AutoUpgrade.into(); + options.initial_versioning_behavior = Some(ContinueAsNewVersioningBehavior::AutoUpgrade); ctx.continue_as_new(&(attempt + 1), options)?; Ok("v1.0".to_string()) } @@ -394,7 +394,7 @@ async fn continue_as_new_auto_upgrade_uses_current_deployment_version() { wait_for_worker_deployment_version(&client, &deploy_name, &v1).await; wait_for_worker_deployment_version(&client, &deploy_name, &v2).await; set_current_deployment_version(&client, &deploy_name, &v1).await; - wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v1), None).await; + wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v1), None, None).await; let handle = client .start_workflow( @@ -413,7 +413,7 @@ async fn continue_as_new_auto_upgrade_uses_current_deployment_version() { .await; set_current_deployment_version(&client, &deploy_name, &v2).await; - wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v2), None).await; + wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v2), None, None).await; handle .signal( ContinueAsNewAutoUpgradeV1::continue_as_new, @@ -461,7 +461,7 @@ impl ContinueAsNewUseRampingVersionV1 { .await; let mut options = ContinueAsNewOptions::default(); options.initial_versioning_behavior = - ContinueAsNewVersioningBehavior::UseRampingVersion.into(); + Some(ContinueAsNewVersioningBehavior::UseRampingVersion); ctx.continue_as_new(&(attempt + 1), options)?; Ok("v1.0".to_string()) } @@ -522,7 +522,7 @@ async fn continue_as_new_use_ramping_version_uses_ramping_deployment_version() { wait_for_worker_deployment_version(&client, &deploy_name, &v1).await; wait_for_worker_deployment_version(&client, &deploy_name, &v2).await; set_current_deployment_version(&client, &deploy_name, &v1).await; - wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v1), None).await; + wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v1), None, None).await; let handle = client .start_workflow( @@ -541,7 +541,8 @@ async fn continue_as_new_use_ramping_version_uses_ramping_deployment_version() { .await; set_ramping_deployment_version(&client, &deploy_name, &v2, 0.0).await; - wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v1), Some(&v2)).await; + wait_for_worker_deployment_routing(&client, &deploy_name, Some(&v1), Some(&v2), Some(0.0)) + .await; handle .signal( ContinueAsNewUseRampingVersionV1::continue_as_new, @@ -637,6 +638,7 @@ async fn wait_for_worker_deployment_routing( deployment_name: &str, expected_current: Option<&WorkerDeploymentVersion>, expected_ramping: Option<&WorkerDeploymentVersion>, + expected_ramping_percentage: Option, ) { eventually( async || { @@ -670,6 +672,14 @@ async fn wait_for_worker_deployment_routing( if ramping.as_ref() != expected_ramping { return Err(format!("ramping deployment version mismatch: {ramping:?}")); } + if let Some(expected_ramping_percentage) = expected_ramping_percentage { + let actual = routing.ramping_version_percentage; + if (actual - expected_ramping_percentage).abs() > f32::EPSILON { + return Err(format!( + "ramping percentage mismatch: expected {expected_ramping_percentage}, got {actual}" + )); + } + } Ok(()) }, Duration::from_secs(50), diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs index edb291047..8b40654ab 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/continue_as_new.rs @@ -5,17 +5,19 @@ use temporalio_common::{ protos::{ coresdk::AsJsonPayloadExt, temporal::api::{ - command::v1::command::Attributes, common::v1::SearchAttributes, enums::v1::CommandType, + command::v1::command::Attributes, + common::v1::SearchAttributes, + enums::v1::{ + CommandType, + ContinueAsNewVersioningBehavior as ProtoContinueAsNewVersioningBehavior, + }, history::v1::history_event, }, }, worker::WorkerTaskTypes, }; use temporalio_macros::{workflow, workflow_methods}; -use temporalio_sdk::{ - ContinueAsNewOptions, ContinueAsNewVersioningBehavior, WorkflowContext, WorkflowResult, - WorkflowTermination, -}; +use temporalio_sdk::{ContinueAsNewOptions, WorkflowContext, WorkflowResult, WorkflowTermination}; use temporalio_sdk_core::{ TunerHolder, replay::{DEFAULT_WORKFLOW_TYPE, canned_histories}, @@ -95,7 +97,7 @@ impl WfWithTimer { ctx.timer(Duration::from_millis(500)).await; Err(WorkflowTermination::continue_as_new(ContinueAsNewRequest { arguments: vec![[1].into()], - initial_versioning_behavior: ContinueAsNewVersioningBehavior::AutoUpgrade.into(), + initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::AutoUpgrade.into(), ..Default::default() })) } @@ -120,7 +122,7 @@ async fn wf_completing_with_continue_as_new() { assert_matches!( wft.commands[0].attributes.as_ref().unwrap(), Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(can_attrs) - if can_attrs.initial_versioning_behavior == ContinueAsNewVersioningBehavior::AutoUpgrade as i32 + if can_attrs.initial_versioning_behavior == ProtoContinueAsNewVersioningBehavior::AutoUpgrade as i32 ); }); }); diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs index 7daed730a..6c4ed242a 100644 --- a/crates/sdk/src/lib.rs +++ b/crates/sdk/src/lib.rs @@ -87,14 +87,13 @@ pub use crate::error::{ OutgoingWorkflowError, WorkflowRegistrationError, }; pub use temporalio_client::Namespace; -pub use temporalio_common::protos::temporal::api::enums::v1::ContinueAsNewVersioningBehavior; pub use temporalio_workflow::{ ActivityCloseTimeouts, ActivityOptions, BaseWorkflowContext, CancellableFuture, - ChildWorkflowOptions, ContinueAsNewOptions, ExternalWorkflowHandle, LocalActivityOptions, - NexusOperationOptions, ParentWorkflowInfo, RootWorkflowInfo, Signal, SignalData, - StartChildWorkflowExecutionFailedCause, StartedChildWorkflow, SyncWorkflowContext, - TimerOptions, TimerResult, WorkflowContext, WorkflowContextView, WorkflowResult, - WorkflowTermination, + ChildWorkflowOptions, ContinueAsNewOptions, ContinueAsNewVersioningBehavior, + ExternalWorkflowHandle, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo, + RootWorkflowInfo, Signal, SignalData, StartChildWorkflowExecutionFailedCause, + StartedChildWorkflow, SyncWorkflowContext, TimerOptions, TimerResult, WorkflowContext, + WorkflowContextView, WorkflowResult, WorkflowTermination, }; #[cfg(feature = "wasm-workflows")] pub use workflow_wasm::WasmWorkflowComponent; diff --git a/crates/workflow/src/lib.rs b/crates/workflow/src/lib.rs index 334ec9d7b..3ece0ea48 100644 --- a/crates/workflow/src/lib.rs +++ b/crates/workflow/src/lib.rs @@ -32,10 +32,10 @@ pub use temporalio_common_wasm::error::{ }; pub use workflow_context::{ ActivityCloseTimeouts, ActivityOptions, BaseWorkflowContext, CancellableFuture, - ChildWorkflowOptions, ContinueAsNewOptions, ExternalWorkflowHandle, LocalActivityOptions, - NexusOperationOptions, ParentWorkflowInfo, RootWorkflowInfo, Signal, SignalData, - StartChildWorkflowExecutionFailedCause, StartedChildWorkflow, SyncWorkflowContext, - TimerOptions, WorkflowContext, WorkflowContextView, + ChildWorkflowOptions, ContinueAsNewOptions, ContinueAsNewVersioningBehavior, + ExternalWorkflowHandle, LocalActivityOptions, NexusOperationOptions, ParentWorkflowInfo, + RootWorkflowInfo, Signal, SignalData, StartChildWorkflowExecutionFailedCause, + StartedChildWorkflow, SyncWorkflowContext, TimerOptions, WorkflowContext, WorkflowContextView, }; pub use workflows::{join, join_all, select}; diff --git a/crates/workflow/src/workflow_context.rs b/crates/workflow/src/workflow_context.rs index eaec81a2b..7cb41489d 100644 --- a/crates/workflow/src/workflow_context.rs +++ b/crates/workflow/src/workflow_context.rs @@ -2,7 +2,8 @@ mod options; pub use options::{ ActivityCloseTimeouts, ActivityOptions, ChildWorkflowOptions, ContinueAsNewOptions, - LocalActivityOptions, NexusOperationOptions, Signal, SignalData, TimerOptions, + ContinueAsNewVersioningBehavior, LocalActivityOptions, NexusOperationOptions, Signal, + SignalData, TimerOptions, }; pub use temporalio_common_wasm::protos::coresdk::child_workflow::StartChildWorkflowExecutionFailedCause; @@ -2368,7 +2369,7 @@ mod tests { }, temporal::api::{ common::v1::{Payload, RetryPolicy}, - enums::v1::ContinueAsNewVersioningBehavior, + enums::v1::ContinueAsNewVersioningBehavior as ProtoContinueAsNewVersioningBehavior, }, }, }; @@ -2438,7 +2439,8 @@ mod tests { search_attributes: None, retry_policy: None, versioning_intent: ProtoVersioningIntent::Unspecified.into(), - initial_versioning_behavior: ContinueAsNewVersioningBehavior::Unspecified.into(), + initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::Unspecified + .into(), } ); } @@ -2509,7 +2511,7 @@ mod tests { ..Default::default() }), versioning_intent: ProtoVersioningIntent::Compatible.into(), - initial_versioning_behavior: ContinueAsNewVersioningBehavior::UseRampingVersion + initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::UseRampingVersion as i32, } ); @@ -2555,7 +2557,7 @@ mod tests { assert_eq!( cmd.initial_versioning_behavior, - ContinueAsNewVersioningBehavior::AutoUpgrade as i32 + ProtoContinueAsNewVersioningBehavior::AutoUpgrade as i32 ); } diff --git a/crates/workflow/src/workflow_context/options.rs b/crates/workflow/src/workflow_context/options.rs index 7d18a3f1a..f186f5f98 100644 --- a/crates/workflow/src/workflow_context/options.rs +++ b/crates/workflow/src/workflow_context/options.rs @@ -20,7 +20,10 @@ use temporalio_common_wasm::{ }, temporal::api::{ common::v1::{Payload, RetryPolicy, SearchAttributes}, - enums::v1::{ContinueAsNewVersioningBehavior, WorkflowIdReusePolicy}, + enums::v1::{ + ContinueAsNewVersioningBehavior as ProtoContinueAsNewVersioningBehavior, + WorkflowIdReusePolicy, + }, sdk::v1::UserMetadata, }, }, @@ -493,6 +496,51 @@ impl NexusOperationOptions { } } +/// Versioning behavior to use for the first workflow task of a new continue-as-new run. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +#[non_exhaustive] +pub enum ContinueAsNewVersioningBehavior { + /// No initial versioning behavior was specified. + #[default] + Unspecified, + /// Start the new run with AutoUpgrade behavior. + AutoUpgrade, + /// Start the new run on the task queue's ramping deployment version. + UseRampingVersion, +} + +impl From for ProtoContinueAsNewVersioningBehavior { + fn from(value: ContinueAsNewVersioningBehavior) -> Self { + match value { + ContinueAsNewVersioningBehavior::Unspecified => { + ProtoContinueAsNewVersioningBehavior::Unspecified + } + ContinueAsNewVersioningBehavior::AutoUpgrade => { + ProtoContinueAsNewVersioningBehavior::AutoUpgrade + } + ContinueAsNewVersioningBehavior::UseRampingVersion => { + ProtoContinueAsNewVersioningBehavior::UseRampingVersion + } + } + } +} + +impl From for ContinueAsNewVersioningBehavior { + fn from(value: ProtoContinueAsNewVersioningBehavior) -> Self { + match value { + ProtoContinueAsNewVersioningBehavior::Unspecified => { + ContinueAsNewVersioningBehavior::Unspecified + } + ProtoContinueAsNewVersioningBehavior::AutoUpgrade => { + ContinueAsNewVersioningBehavior::AutoUpgrade + } + ProtoContinueAsNewVersioningBehavior::UseRampingVersion => { + ContinueAsNewVersioningBehavior::UseRampingVersion + } + } + } +} + /// Options for continuing a workflow as a new execution. /// /// Unset fields inherit the current workflow's values where applicable. @@ -550,10 +598,11 @@ impl ContinueAsNewOptions { .versioning_intent .unwrap_or(VersioningIntent::Unspecified) .into(), - initial_versioning_behavior: self - .initial_versioning_behavior - .unwrap_or(ContinueAsNewVersioningBehavior::Unspecified) - .into(), + initial_versioning_behavior: ProtoContinueAsNewVersioningBehavior::from( + self.initial_versioning_behavior + .unwrap_or(ContinueAsNewVersioningBehavior::Unspecified), + ) + .into(), } } }