diff --git a/.github/workflows/per-pr.yml b/.github/workflows/per-pr.yml index 93aed2d4c..d2243b299 100644 --- a/.github/workflows/per-pr.yml +++ b/.github/workflows/per-pr.yml @@ -56,6 +56,7 @@ jobs: - os: windows-latest timeoutMinutes: 20 - os: ubuntu-latest + useWftChunkingV2: "true" - os: ubuntu-arm runsOn: ubuntu-24.04-arm64-2-core - os: macos-arm @@ -87,6 +88,8 @@ jobs: cache-bin: false save-if: ${{ github.ref == 'refs/heads/main' }} - run: cargo test -- --include-ignored --nocapture + env: + TEMPORAL_USE_WFT_CHUNKING_V2: ${{ matrix.useWftChunkingV2 || 'false' }} - name: Find test executable for cgroup tests id: find-cgroup-test if: runner.os == 'Linux' && runner.arch == 'X64' @@ -162,6 +165,7 @@ jobs: include: - os: windows-latest - os: ubuntu-latest + useWftChunkingV2: "true" - os: ubuntu-arm runsOn: ubuntu-24.04-arm64-2-core - os: macos-arm @@ -192,6 +196,8 @@ jobs: cache-bin: false save-if: ${{ github.ref == 'refs/heads/main' }} - run: cargo integ-test + env: + TEMPORAL_USE_WFT_CHUNKING_V2: ${{ matrix.useWftChunkingV2 || 'false' }} cloud-tests: if: github.event.pull_request.head.repo.full_name == '' || github.event.pull_request.head.repo.full_name == 'temporalio/sdk-rust' diff --git a/arch_docs/workflow_task_chunking.md b/arch_docs/workflow_task_chunking.md index 51c88f790..9f1922ca3 100644 --- a/arch_docs/workflow_task_chunking.md +++ b/arch_docs/workflow_task_chunking.md @@ -1,51 +1,240 @@ # Workflow Task Chunking +## Workflow Tasks Model + +Conceptually, Workflow Tasks are usually presented using the following model: + +- \[Preceding Events\] (optional) +- WFT Scheduled +- WFT Started +- WFT Completed +- \[Command Events\] (optional) + +That is, some events happen that the workflow needs to react to (Preceding Events), so the server +schedules a Workflow Task (WFT Scheduled). A worker then polls the task (WFT Started), and executes +workflow code. 
The workflow may produce commands, which are sent back to the server when the task +completes (WFT Completed). The server interprets those commands and records the corresponding +history events (Command Events). + +Commands and events are both "optional" in the sense that: + +- Workflow code, after being woken up, may not do anything and produce no new commands. +- There may be no inbound events, for a few reasons: + 1. The workflow might have been running a long-running local activity (LA). In that case the + workflow must "WFT heartbeat" — completing the current WFT with no commands while the LA is + still in progress — in order to avoid timing out the WFT on the server. + 2. The workflow might have received an Update, which is not represented as a history event but + rather as a "protocol message" attached to the task. + 3. The server can occasionally force a new WFT via some obscure internal APIs. + +## Logical Workflow Tasks Model + +Though useful as a general mental model, the sequence described previously does not faithfully align +with how Workflow Tasks are actually _processed_ by a Worker. + +Indeed, when a given Workflow Task is processed for the first time, that Task has obviously not yet +been completed and no commands have yet been produced, so the corresponding events are not present +in history. The Workflow Task handed to the Worker therefore ends at the WFT Started event. The WFT +Completed and Command Events resulting from that WFT execution only become visible to the Worker on +the _next_ Workflow Task. + +Therefore, from the perspective of how the Worker advances through a Workflow history, the +processing unit is better described as follows: + +- WFT Completed +- [Command Events] (optional) +- [Preceding Events] (optional) +- WFT Scheduled +- WFT Started + +Another particularity of the processing-oriented model is that it is possible, under some +conditions, for multiple consecutive Workflow Tasks to be collapsed into a single processing unit. 
+This is notably the case for Workflow Tasks that completed unsuccessfully (i.e. failed or timed out), +as well as most Workflow Tasks that belong to a Workflow Task Heartbeat sequence. + +This processing-oriented model is what we call **logical Workflow Tasks** (LWFTs). Note that LWFTs do +not represent a different history: all events in the Workflow Execution history remain present and +in the same order. The model only changes how those events are aligned and grouped into processing units as +the Worker advances through history. + +// FIXME: What are absolute must-haves for LWFT? + +## Workflow History Pagination + +There are various ways the Worker obtains Workflow history from the server. + +## Chunking history into Logical Workflow Tasks + +It is critical that, on replay, Workflow history be split into LWFTs that correctly align with the +LWFTs processed during the original execution. Differences in how events are grouped affect how and +when those events are presented to workflow code. That, in turn, may affect which commands the +workflow produces, or when it produces them, resulting in non-determinism errors. + +It is also critical that … (FIXME: sentence left unfinished) + +Note that we said that LWFTs have to _align_ with the original execution, not that they have to be +_strictly the same_. In some cases, it is … (FIXME: sentence left unfinished) + One source of complexity in Core is the chunking of history into "logical" Workflow Tasks. -Workflow tasks (WFTs) always take the following form in event history: +An LWFT that contains no inbound events, no commands, and no protocol messages is a no-op from the +workflow's perspective. To avoid waking lang for no purpose, Core collapses such empty LWFTs into +the preceding non-empty one. -* \[Preceding Events\] (optional) -* WFT Scheduled -* WFT Started -* WFT Completed -* \[Commands\] (optional) +Deciding which sequences of WFTs are safe to collapse is the job of the WFT chunking algorithm. 
-In the typical case, the "logical" WFT consists of all the commands from the last workflow task, -any events generated in the interrim, and the scheduled/started preamble. So: +Two chunking algorithms exist: **v1** (the legacy heuristic) and **v2** (the current, more rigorous +algorithm). Both ship in this version of Core. Which one a given workflow uses is recorded as the +`WftChunkingV2` SDK flag on its first `WorkflowTaskCompleted` event. Workflows that don't carry the +flag use v1; workflows that do, use v2. v2 is opt-in for new workflows via the +`TEMPORAL_USE_WFT_CHUNKING_V2` environment variable. -* WFT Completed -* \[Commands\] (optional) -* \[Events\] (optional) -* WFT Scheduled -* WFT Started +## v1 — heartbeat detection heuristic -Commands and events are both "optional" in the sense that: +v1 (`find_end_index_of_next_wft_seq_v1`) walks history events linearly. At each +`WorkflowTaskStarted`, it peeks two events ahead and applies a simple heuristic to +decide whether the current WFT is a "WFT heartbeat" and should therefore be collapsed +into the next one. In essence: + +```rust +if !saw_command && next_next_event.event_type() == EventType::WorkflowTaskScheduled { + // This WFT contained no commands, and history immediately rolls into the next + // WFTScheduled with no intervening events. Treat it as a heartbeat: don't + // conclude the current LWFT here, keep walking. + continue; +} +``` + +`saw_command` tracks whether any _command event_ — i.e. an event generated by a worker +command, like `ActivityTaskScheduled`, `TimerStarted`, `MarkerRecorded`, etc. — appeared +before the current `WorkflowTaskStarted`. The combination "no commands + no inbound +events between WFTCompleted and the next WFTScheduled" is v1's signature of a heartbeat. + +This compact heuristic is the heart of v1's chunking decisions. + +### Downsides of v1 + +v1's compactness comes at the cost of accuracy. 
Several edge cases led to incorrect +chunking and, in some cases, non-determinism errors (NDE) at replay time: + +1. **Updates after empty WFTs.** Workflow Updates are delivered as protocol messages + attached to a WFT, not as history events. When a server delivers an Update right + after a WFT heartbeat, v1 sees the heartbeat as "empty" (no commands, nothing + between WFTCompleted and the next WFTScheduled) and collapses it into the next + WFT. The Update is then delivered as part of the combined LWFT, but the original + execution produced two separate activations — the heartbeat, then the Update. + On replay this discrepancy typically manifests as an NDE. + +2. **Brittle handling of paginated history.** v1's heuristic needs to look two events + past each `WorkflowTaskStarted` to commit a decision. When history is fetched in + pages, page boundaries can fall mid-decision. v1 returns `Incomplete(...)` in + that case, but the paginator's handling of `Incomplete` interacts subtly with + the heuristic, and the chunking outcome can diverge from the original run. + +3. **Inbound events could be absorbed into a heartbeat chain.** v1's heuristic + tracks only _command_ events explicitly (via the `saw_command` flag). Other + event types that can affect workflow state — signals, timer fires, accepted + updates, child-workflow events, etc. — are only handled indirectly through the + two-event look-ahead at each `WorkflowTaskStarted`. In some scenarios — most + visibly when the heuristic interacts with paginated history, but also in plain + in-memory chunking when events fall just-so — v1 ended up considering WFTs that + actually carried inbound events as part of the preceding heartbeat chain. + Because a collapsed LWFT presents _one_ timestamp (that of the first WFT in + the chain) to user code, any code that runs for the absorbed WFT observes + `workflow.now()` at the wrong moment in time, which leads to a non-determinism + error at replay. 
+ +These problems motivated the v2 rewrite. + +## v2 — explicit state machine with time-sensitive events + +v2 (`find_end_index_of_next_wft_seq_v2`) replaces v1's compact heuristic with an +explicit state machine over the structure of WFT sequences in history. Three ideas +make v2 more robust than v1: + +### 1. Time-sensitive events explicitly prevent collapsing + +v2 introduces the concept of a **time-sensitive event**: any event that can cause user +workflow code to run on replay. Examples include `WorkflowExecutionStarted`, +`WorkflowExecutionSignaled`, `TimerStarted`/`TimerFired`, `ActivityTaskScheduled` and +its terminal counterparts, `WorkflowExecutionUpdateAccepted`, child-workflow events, +Nexus events, and so on. Events that cannot independently wake user code — WFT framing +events, `MarkerRecorded`, `UpsertWorkflowSearchAttributes`, etc. — are _not_ +time-sensitive. + +The full classification lives in `HistoryEvent::is_wft_time_sensitive_event` and uses +an exhaustive `match` (no catch-all) so any new event type added to the proto must be +explicitly classified. + +While walking events, v2 maintains a `prevent_heartbeat` flag. Once any time-sensitive +event is seen, the flag is latched on and v2 will not collapse the current LWFT into +the next WFT. This replaces v1's implicit "saw a command" check with an explicit, +complete list of which event types force a boundary. + +### 2. Pending speculative Updates are taken into account + +The chunker is parameterized with `has_pending_speculative_updates` — set true when the +current task carries Update protocol messages that haven't yet produced +`WorkflowExecutionUpdateAccepted` events in history. When set, v2 refuses to collapse +the **last** WFT in a heartbeat chain into the preceding chain, because the pending +Update message must be delivered in its own activation in order to match the original +execution. Earlier (non-last) heartbeats in the same chain can still be collapsed +together. 
+ +This is the structural fix for the "Update after heartbeat" class of NDE that v1 +caused. + +### 3. Bounded look-ahead with explicit `NeedMore` + +v2 looks up to five events past the current `WorkflowTaskStarted` to make its +decision, and exposes its outcome via a three-variant enum: + +- `Complete(ix)` — a LWFT boundary has been found at the `WorkflowTaskStarted` at + index `ix`. +- `NeedMore` — not enough events are available to commit a decision; the caller + should fetch more history pages and retry. +- `Tail` — no more LWFT boundaries exist in the slice; any remaining events are + trailing matter (terminal `WorkflowExecution*` events, `WorkflowTaskCompleted` + with commands). + +When the look-ahead runs past the end of the slice, v2 either commits a decision +(if the slice covers the last WFT of the workflow) or returns `NeedMore`. This +moves "I'm not sure yet, fetch more" out of the look-ahead's blind spot and into +an explicit return value, which the paginator can act on cleanly. -Workflow code, after being woken up, might not do anything, and thus generate no new commands +### How v2 addresses each v1 downside -There may be no events for more nuanced reasons: +1. **Update-after-heartbeat NDE.** Fixed structurally: `has_pending_speculative_updates` + prevents the last WFT in a heartbeat chain from being collapsed. -1. The workflow might have been running a long-running local activity. In such cases, the workflow - must "workflow task heartbeat" in order to avoid timing out the workflow task. This means - completing the WFT with no commands while the LA is ongoing. -2. The workflow might have received an update, which does not come as an event in history, but - rather as a "protocol message" attached to the task. -3. Server can forcibly generate a new WFT with some obscure APIs +2. 
**Pagination correctness.** Fixed by the explicit `NeedMore` return value, which + gives the paginator a clean signal to fetch more events rather than relying on + ambiguous `Incomplete` handling. -Core does not consider such empty WFT sequences as worthy of waking lang (on replay - as a new -task, they always will), since nothing meaningful has happened. Thus, they are grouped together -as part of a "logical" WFT with the last WFT that had any real work in it. +3. **Inbound events absorbed into a heartbeat chain.** Fixed by + `is_wft_time_sensitive_event` and the `prevent_heartbeat` flag — every event + type that can wake user workflow code is enumerated and classified explicitly, + and seeing any one of them latches the chunker out of heartbeat-collapsing + mode for the remainder of the current LWFT. A WFT that carries any such event + can no longer be silently merged with a preceding heartbeat chain, so user + code always observes `workflow.now()` at the timestamp of the WFT it really + belongs to. -## Possible issues as of this writing (5/25) +## Rollout -The "new WFT force-issued by server" case would, currently, not cause a wakeup on replay for the -reasons discussed above. In some obscure edge cases (inspecting workflow clock) this could cause -NDE. +Per Core's SDK-flag rollout policy, this version of Core ships **support** for v2 but +leaves it off by default. Operators opt in via the `TEMPORAL_USE_WFT_CHUNKING_V2` +environment variable. When set to a truthy value (`"true"` or `"1"`, case-insensitive), +the worker, for each newly started workflow: -### Possible solutions +- records `WftChunkingV2` in `sdk_metadata.core_used_flags` on the first + `WorkflowTaskCompleted` event, and +- uses v2 chunking for that workflow from then on, regardless of whether the env + variable is set on subsequent worker versions (the flag in history is the source + of truth). 
-* Core can attach a flag on WFT completes in order to be explicit that that WFT may be skipped on - replay. IE: During WFT heartbeating for LAs. -* We could legislate that server should never send empty WFTs. Seemingly the only case of this - is - the [obscure api](https://github.com/temporalio/temporal/blob/d189737aa2ed1b07c221abb9fbdd28ecf68f0492/proto/internal/temporal/server/api/adminservice/v1/service.proto#L151) +Workflows that started without the flag — either before opt-in, or on a worker where +opt-in was not enabled — continue to use v1 indefinitely. This guarantees that a +worker can be rolled back to the previous version without stranding workflows that +already adopted v2, since the previous version also understood the flag (it just +didn't write it by default). diff --git a/crates/common/src/protos/mod.rs b/crates/common/src/protos/mod.rs index 6e39904f8..06e90ee82 100644 --- a/crates/common/src/protos/mod.rs +++ b/crates/common/src/protos/mod.rs @@ -2347,6 +2347,102 @@ pub mod temporal { } } + /// Returns true if safe replay of this event requires restoring the original + /// WFT start time. The WFT chunking algorithm v2 may collapse away a WFT + /// sequence only if that WFT contains no time-sensitive events. + /// + /// Any event that may result in user code being executed is time-sensitive. + /// Ignorable events and events that modify the workflow's metadata/internal + /// without triggering user code are not time-sensitive. + pub fn is_wft_time_sensitive_event(&self) -> bool { + // Never add a catch-all case to this match statement. + // We need to explicitly mark any new event types as time-sensitive or not. + match self.event_type() { + // WFExecutionStarted is time-sensitive because it triggers execution + // of the workflow's main function. WFExec terminal events are not. 
+ EventType::WorkflowExecutionStarted => true, + EventType::WorkflowExecutionCompleted => false, + EventType::WorkflowExecutionFailed => false, + EventType::WorkflowExecutionTimedOut => false, + EventType::WorkflowExecutionContinuedAsNew => false, + EventType::WorkflowExecutionTerminated => false, + EventType::WorkflowExecutionCanceled => false, + + // WFT framing events are not time-sensitive, because + // they do not result in user code execution by themselves. + EventType::WorkflowTaskScheduled => false, + EventType::WorkflowTaskStarted => false, + EventType::WorkflowTaskCompleted => false, + EventType::WorkflowTaskTimedOut => false, + EventType::WorkflowTaskFailed => false, + + EventType::ActivityTaskScheduled => true, + EventType::ActivityTaskStarted => true, + EventType::ActivityTaskCompleted => true, + EventType::ActivityTaskFailed => true, + EventType::ActivityTaskTimedOut => true, + EventType::ActivityTaskCancelRequested => true, + EventType::ActivityTaskCanceled => true, + + EventType::TimerStarted => true, + EventType::TimerFired => true, + EventType::TimerCanceled => true, + + EventType::StartChildWorkflowExecutionInitiated => true, + EventType::StartChildWorkflowExecutionFailed => true, + EventType::ChildWorkflowExecutionStarted => true, + EventType::ChildWorkflowExecutionCompleted => true, + EventType::ChildWorkflowExecutionFailed => true, + EventType::ChildWorkflowExecutionCanceled => true, + EventType::ChildWorkflowExecutionTimedOut => true, + EventType::ChildWorkflowExecutionTerminated => true, + + EventType::WorkflowExecutionSignaled => true, + EventType::WorkflowExecutionCancelRequested => true, + + EventType::SignalExternalWorkflowExecutionInitiated => true, + EventType::SignalExternalWorkflowExecutionFailed => true, + EventType::ExternalWorkflowExecutionSignaled => true, + + EventType::RequestCancelExternalWorkflowExecutionInitiated => true, + EventType::RequestCancelExternalWorkflowExecutionFailed => true, + 
EventType::ExternalWorkflowExecutionCancelRequested => true, + + EventType::WorkflowExecutionUpdateAdmitted => true, + EventType::WorkflowExecutionUpdateAccepted => true, + EventType::WorkflowExecutionUpdateCompleted => true, + + // Update rejection doesn't trigger user code execution. + EventType::WorkflowExecutionUpdateRejected => false, + + EventType::NexusOperationScheduled => true, + EventType::NexusOperationStarted => true, + EventType::NexusOperationCompleted => true, + EventType::NexusOperationFailed => true, + EventType::NexusOperationCanceled => true, + EventType::NexusOperationTimedOut => true, + EventType::NexusOperationCancelRequested => true, + EventType::NexusOperationCancelRequestCompleted => true, + EventType::NexusOperationCancelRequestFailed => true, + + EventType::MarkerRecorded => false, + EventType::UpsertWorkflowSearchAttributes => false, + EventType::WorkflowPropertiesModifiedExternally => false, + EventType::ActivityPropertiesModifiedExternally => false, + EventType::WorkflowPropertiesModified => false, + EventType::WorkflowExecutionOptionsUpdated => false, + + // Ignored events obviously can't be time-sensitive. 
+ EventType::WorkflowExecutionPaused => false, + EventType::WorkflowExecutionUnpaused => false, + EventType::WorkflowExecutionTimeSkippingTransitioned => false, + + EventType::Unspecified => { + panic!("Unexpected unspecified event type."); + } + } + } + pub fn is_ignorable(&self) -> bool { if !self.worker_may_ignore { return false; diff --git a/crates/sdk-core/src/core_tests/workflow_tasks.rs b/crates/sdk-core/src/core_tests/workflow_tasks.rs index a4d0aae6b..ade57d0cb 100644 --- a/crates/sdk-core/src/core_tests/workflow_tasks.rs +++ b/crates/sdk-core/src/core_tests/workflow_tasks.rs @@ -2457,15 +2457,17 @@ async fn core_internal_flags() { mock_worker_client(), ); mh.completion_mock_fn = Some(Box::new(move |c| { + let expected: HashSet = CoreInternalFlags::all_cumulative_default_enabled() + .chain(CoreInternalFlags::all_first_wft_only_default_enabled()) + .map(|f| f as u32) + .collect(); assert_eq!( c.sdk_metadata .core_used_flags .iter() .copied() .collect::>(), - CoreInternalFlags::all_except_too_high() - .map(|f| f as u32) - .collect() + expected ); Ok(Default::default()) })); diff --git a/crates/sdk-core/src/internal_flags.rs b/crates/sdk-core/src/internal_flags.rs index b03086f68..e46d18ba1 100644 --- a/crates/sdk-core/src/internal_flags.rs +++ b/crates/sdk-core/src/internal_flags.rs @@ -5,12 +5,30 @@ use itertools::Either; use std::{ collections::{BTreeSet, HashSet}, iter, + sync::OnceLock, }; use temporalio_common::protos::temporal::api::{ history::v1::WorkflowTaskCompletedEventAttributes, sdk::v1::WorkflowTaskCompletedMetadata, workflowservice::v1::get_system_info_response, }; +/// Environment variable that opts new workflow executions into WFT chunking v2 +/// (the `WftChunkingV2` flag). When set to a truthy value (`"true"` or `"1"`, +/// case-insensitive), the worker will set the flag on the first WFT completion +/// of newly started workflow executions. 
Existing workflows are unaffected +/// (they continue using whatever chunking version matches their history). +const USE_WFT_CHUNKING_V2_ENV_VAR: &str = "TEMPORAL_USE_WFT_CHUNKING_V2"; + +fn use_wft_chunking_v2_opt_in() -> bool { + static CACHED: OnceLock = OnceLock::new(); + *CACHED.get_or_init(|| { + std::env::var(USE_WFT_CHUNKING_V2_ENV_VAR) + .ok() + .map(|v| matches!(v.to_ascii_lowercase().as_str(), "true" | "1")) + .unwrap_or(false) + }) +} + /// This enumeration contains internal flags that may result in incompatible history changes with /// older workflows, or other breaking changes. /// @@ -25,9 +43,11 @@ pub enum CoreInternalFlags { /// In this flag additional checks were added to a number of state machines to ensure that /// the ID and type of activities, local activities, and child workflows match during replay. IdAndTypeDeterminismChecks = 1, + /// Introduced automatically upserting search attributes for each patched call, and /// nondeterminism checks for upserts. UpsertSearchAttributeOnPatch = 2, + /// Prior to this flag, we truncated commands received from lang at the /// first terminal (i.e. workflow-terminating) command. With this flag, we /// reorder commands such that all non-terminal commands come first, @@ -37,8 +57,64 @@ pub enum CoreInternalFlags { /// if in the sequence delivered by lang they came after a terminal command. /// See . MoveTerminalCommands = 3, + + /// Indicates that this workflow uses WFT chunking v2 — the second-generation + /// algorithm for grouping history events into "logical" workflow tasks. v2 is + /// more conservative than v1 about collapsing consecutive empty WFTs, correctly + /// handles updates that follow empty WFTs, and requires more look-ahead before + /// committing to a chunking decision when history is paginated. + /// + /// Only set on workflows started after this flag was introduced (first-WFT-only). + /// Existing workflows continue to use WFT chunking v1 (the legacy algorithm). 
+ WftChunkingV2 = 4, + /// We received a value higher than this code can understand. - TooHigh = u32::MAX, + UnknownFlag = u32::MAX, +} + +impl CoreInternalFlags { + fn from_u32(v: u32) -> Self { + match v { + 1 => Self::IdAndTypeDeterminismChecks, + 2 => Self::UpsertSearchAttributeOnPatch, + 3 => Self::MoveTerminalCommands, + 4 => Self::WftChunkingV2, + _ => Self::UnknownFlag, + } + } + + /// Returns all cumulative flags that should be enabled by default on every WFT completion. + pub(crate) fn all_cumulative_default_enabled() -> impl Iterator { + [ + CoreInternalFlags::IdAndTypeDeterminismChecks, + CoreInternalFlags::UpsertSearchAttributeOnPatch, + CoreInternalFlags::MoveTerminalCommands, + ] + .iter() + .copied() + } + + /// Returns cumulative flags that should only be enabled on the first WFT of new workflows. + /// These are not written on subsequent WFTs to avoid changing behavior of existing workflows. + /// + /// `WftChunkingV2` is opt-in via the [`USE_WFT_CHUNKING_V2_ENV_VAR`] environment + /// variable. Per Core's SDK-flag rollout policy, we ship a version that supports + /// the flag but leaves it off by default so users can roll back without stranding + /// workflows that already adopted v2. + pub(crate) fn all_first_wft_only_default_enabled() -> impl Iterator { + if use_wft_chunking_v2_opt_in() { + Either::Left(iter::once(CoreInternalFlags::WftChunkingV2)) + } else { + Either::Right(iter::empty()) + } + } + + /// Returns all known flag variants (excluding the sentinel). + #[cfg(test)] + pub(crate) fn all_except_unknown() -> impl Iterator { + enum_iterator::all::() + .filter(|f| !matches!(f, CoreInternalFlags::UnknownFlag)) + } } #[allow(clippy::large_enum_variant)] @@ -137,15 +213,19 @@ impl InternalFlags { } } - /// Writes all known core flags to the set which should be recorded in the current WFT if not - /// already known. Must only be called if not replaying. 
- pub(crate) fn write_all_known(&mut self) { + /// Writes all core flags that should be enabled by default to the set which should be recorded + /// in the current WFT if not already known. Must only be called if not replaying. + pub(crate) fn write_all_cumulative_default_enabled(&mut self, is_first_wft: bool) { if let Self::Enabled { core_since_last_complete, .. } = self { - core_since_last_complete.extend(CoreInternalFlags::all_except_too_high()); + if is_first_wft { + core_since_last_complete + .extend(CoreInternalFlags::all_first_wft_only_default_enabled()); + } + core_since_last_complete.extend(CoreInternalFlags::all_cumulative_default_enabled()); } } @@ -214,22 +294,6 @@ impl InternalFlags { } } -impl CoreInternalFlags { - fn from_u32(v: u32) -> Self { - match v { - 1 => Self::IdAndTypeDeterminismChecks, - 2 => Self::UpsertSearchAttributeOnPatch, - 3 => Self::MoveTerminalCommands, - _ => Self::TooHigh, - } - } - - pub(crate) fn all_except_too_high() -> impl Iterator { - enum_iterator::all::() - .filter(|f| !matches!(f, CoreInternalFlags::TooHigh)) - } -} - #[cfg(test)] mod tests { use super::*; @@ -266,7 +330,7 @@ mod tests { #[test] fn all_have_u32_from_impl() { - let all_known = CoreInternalFlags::all_except_too_high(); + let all_known = CoreInternalFlags::all_except_unknown(); for flag in all_known { let as_u32 = flag as u32; assert_eq!(CoreInternalFlags::from_u32(as_u32), flag); diff --git a/crates/sdk-core/src/replay/history_builder.rs b/crates/sdk-core/src/replay/history_builder.rs index 0bcbf5453..b2e972cab 100644 --- a/crates/sdk-core/src/replay/history_builder.rs +++ b/crates/sdk-core/src/replay/history_builder.rs @@ -1,4 +1,4 @@ -use crate::{replay::HistoryInfo, test_help::CoreInternalFlags}; +use crate::{internal_flags::CoreInternalFlags, replay::HistoryInfo}; use anyhow::bail; use prost_types::Timestamp; use std::{ @@ -48,6 +48,10 @@ pub struct TestHistoryBuilder { final_workflow_task_started_event_id: i64, previous_task_completed_id: i64, 
original_run_id: String, + /// When true, the builder auto-sets `WftChunkingV2` on the first WFTCompleted. + use_wft_chunking_v2: bool, + /// Tracks whether we've already emitted the first WFTCompleted + has_seen_wft_completed: bool, } impl TestHistoryBuilder { @@ -69,6 +73,8 @@ impl TestHistoryBuilder { original_run_id: extract_original_run_id_from_events(&events) .expect("Run id must be discoverable") .to_string(), + use_wft_chunking_v2: false, + has_seen_wft_completed: false, events, } } @@ -89,6 +95,15 @@ impl TestHistoryBuilder { self.add(attribs) } + /// Enable WFT chunking v2 for this builder. When enabled, the first + /// WFTCompleted event gets the cumulative `WftChunkingV2` flag. + pub fn set_use_wft_chunking_v2(&mut self) { + if self.has_seen_wft_completed { + panic!("WFT chunking v2 can only be enabled before the first WFTCompleted"); + } + self.use_wft_chunking_v2 = true; + } + /// Adds the following events: /// ```text /// EVENT_TYPE_WORKFLOW_TASK_SCHEDULED @@ -127,6 +142,10 @@ impl TestHistoryBuilder { ..Default::default() }); self.previous_task_completed_id = id; + if self.use_wft_chunking_v2 && !self.has_seen_wft_completed { + self.set_flags_on_wft_completed(id, &[CoreInternalFlags::WftChunkingV2]); + } + self.has_seen_wft_completed = true; } /// Add a workflow task timed out event. 
@@ -600,20 +619,25 @@ impl TestHistoryBuilder { /// Sets internal patches which should appear in the first WFT complete event pub fn set_flags_first_wft(&mut self, core: &[CoreInternalFlags], lang: &[u32]) { - Self::set_flags( - self.events.iter_mut(), - &core.iter().map(|f| *f as u32).collect::>(), - lang, - ) + Self::set_flags(self.events.iter_mut(), core, lang) } /// Sets internal patches which should appear in the most recent complete event pub fn set_flags_last_wft(&mut self, core: &[CoreInternalFlags], lang: &[u32]) { - Self::set_flags( - self.events.iter_mut().rev(), - &core.iter().map(|f| *f as u32).collect::>(), - lang, - ) + Self::set_flags(self.events.iter_mut().rev(), core, lang) + } + + /// Sets core flags on the WFTCompleted event with the given event ID. + pub fn set_flags_on_wft_completed(&mut self, event_id: i64, core: &[CoreInternalFlags]) { + if let Some(event) = self.events.iter_mut().find(|e| e.event_id == event_id) + && let Some(Attributes::WorkflowTaskCompletedEventAttributes(ref mut a)) = + event.attributes + { + let sdk_dat = a.sdk_metadata.get_or_insert_with(Default::default); + sdk_dat + .core_used_flags + .extend(core.iter().map(|f| *f as u32)); + } } /// Get the event ID of the most recently added event @@ -628,7 +652,7 @@ impl TestHistoryBuilder { fn set_flags<'a>( mut events: impl Iterator, - core: &[u32], + core: &[CoreInternalFlags], lang: &[u32], ) { if let Some(first_attrs) = events.find_map(|e| { @@ -642,7 +666,7 @@ impl TestHistoryBuilder { let sdk_dat = first_attrs .sdk_metadata .get_or_insert_with(Default::default); - sdk_dat.core_used_flags = core.to_vec(); + sdk_dat.core_used_flags = core.iter().map(|f| *f as u32).collect(); sdk_dat.lang_used_flags = lang.to_vec(); } } diff --git a/crates/sdk-core/src/worker/workflow/history_update.rs b/crates/sdk-core/src/worker/workflow/history_update.rs index 856589946..8663a60d1 100644 --- a/crates/sdk-core/src/worker/workflow/history_update.rs +++ 
b/crates/sdk-core/src/worker/workflow/history_update.rs @@ -1,4 +1,6 @@ use crate::{ + abstractions::dbg_panic, + internal_flags::CoreInternalFlags, protosext::ValidPollWFTQResponse, worker::{ client::WorkerClient, @@ -46,6 +48,15 @@ pub(crate) struct HistoryUpdate { /// additional updates should be made. has_last_wft: bool, wft_count: usize, + /// True if the speculative WFT (i.e. the current, non-replayed task from the + /// server) carries pending update messages. When set, the heartbeat-collapsing + /// heuristic will avoid merging the last WFT in history into a preceding + /// heartbeat chain, because the update needs its own activation. + has_pending_speculative_updates: bool, + /// True if any WFTCompleted event in this update carries the + /// `WftChunkingV2` flag, indicating WFT chunking v2 should be used + /// instead of v1 (the legacy algorithm). + has_wft_chunking_v2: bool, } impl Debug for HistoryUpdate { @@ -94,6 +105,13 @@ pub(crate) struct HistoryPaginator { /// These are events that should be returned once pagination has finished. This only happens /// during cache misses, where we got a partial task but need to fetch history from the start. final_events: Vec, + /// True if the speculative WFT associated with this paginator carries pending update + /// messages. Passed through to `find_end_index_of_next_wft_seq` so the heartbeat + /// heuristic avoids collapsing the last WFT when an update needs its own activation. + has_pending_speculative_updates: bool, + /// True if the workflow uses WFT chunking v2. Detected from the initial + /// events and propagated to all subsequent `from_events` calls. 
+ has_wft_chunking_v2: bool, } #[derive(Clone, Debug)] @@ -130,6 +148,7 @@ impl HistoryPaginator { } else { wft.next_page_token.into() }; + let has_pending_speculative_updates = !wft.messages.is_empty(); let mut paginator = HistoryPaginator::new( wft.history, wft.previous_started_event_id, @@ -138,6 +157,7 @@ impl HistoryPaginator { wft.workflow_execution.run_id.clone(), npt, client, + has_pending_speculative_updates, ); if empty_hist && wft.legacy_query.is_none() && wft.query_requests.is_empty() { return Err(EMPTY_TASK_ERR.clone()); @@ -148,6 +168,8 @@ impl HistoryPaginator { wft.previous_started_event_id, wft.started_event_id, true, + has_pending_speculative_updates, + paginator.has_wft_chunking_v2, ) .0 } else { @@ -183,6 +205,8 @@ impl HistoryPaginator { event_queue: Default::default(), next_page_token: NextPageToken::FetchFromStart, final_events: req.original_wft.work.update.events, + has_pending_speculative_updates: !req.original_wft.work.messages.is_empty(), + has_wft_chunking_v2: req.original_wft.paginator.has_wft_chunking_v2, }; let first_update = paginator.extract_next_update().await?; req.original_wft.work.update = first_update; @@ -190,6 +214,7 @@ impl HistoryPaginator { Ok(req.original_wft) } + #[allow(clippy::too_many_arguments)] fn new( initial_history: History, previous_wft_started_id: i64, @@ -198,7 +223,9 @@ impl HistoryPaginator { run_id: String, next_page_token: impl Into, client: Arc, + has_pending_speculative_updates: bool, ) -> Self { + let has_wft_chunking_v2 = events_have_wft_chunking_v2(&initial_history.events); let next_page_token = next_page_token.into(); let (event_queue, final_events) = if matches!(next_page_token, NextPageToken::FetchFromStart) { @@ -216,6 +243,8 @@ impl HistoryPaginator { previous_wft_started_id, wft_started_event_id, id_of_last_event_in_last_extracted_update: None, + has_pending_speculative_updates, + has_wft_chunking_v2, } } @@ -261,6 +290,8 @@ impl HistoryPaginator { self.previous_wft_started_id, 
self.wft_started_event_id, true, + self.has_pending_speculative_updates, + self.has_wft_chunking_v2, ) .0); } @@ -287,6 +318,8 @@ impl HistoryPaginator { self.previous_wft_started_id, self.wft_started_event_id, no_more, + self.has_pending_speculative_updates, + self.has_wft_chunking_v2, ); // If there are potentially more events and we haven't extracted two WFTs yet, keep @@ -440,6 +473,8 @@ impl HistoryUpdate { wft_started_id: -1, has_last_wft: false, wft_count: 0, + has_pending_speculative_updates: false, + has_wft_chunking_v2: false, } } @@ -474,17 +509,44 @@ impl HistoryUpdate { previous_wft_started_id: i64, wft_started_id: i64, has_last_wft: bool, + has_pending_speculative_updates: bool, + has_wft_chunking_v2: bool, ) -> (Self, Vec) where ::IntoIter: Send + 'static, { - let mut all_events: Vec<_> = events.into_iter().collect(); + let all_events: Vec<_> = events.into_iter().collect(); + + Self::from_events_apply( + all_events, + previous_wft_started_id, + wft_started_id, + has_last_wft, + has_pending_speculative_updates, + has_wft_chunking_v2, + ) + } + + fn from_events_apply( + mut all_events: Vec, + previous_wft_started_id: i64, + wft_started_id: i64, + has_last_wft: bool, + has_pending_speculative_updates: bool, + has_wft_chunking_v2: bool, + ) -> (Self, Vec) { let mut last_end = find_end_index_of_next_wft_seq( all_events.as_slice(), previous_wft_started_id, has_last_wft, + has_pending_speculative_updates, + has_wft_chunking_v2, ); - if matches!(last_end, NextWFTSeqEndIndex::Incomplete(_)) { + + if matches!( + last_end, + NextWFTSeqEndIndex::NeedMore | NextWFTSeqEndIndex::Tail + ) { return if has_last_wft { ( Self { @@ -493,6 +555,8 @@ impl HistoryUpdate { wft_started_id, has_last_wft, wft_count: 1, + has_pending_speculative_updates, + has_wft_chunking_v2, }, vec![], ) @@ -504,34 +568,39 @@ impl HistoryUpdate { wft_started_id, has_last_wft, wft_count: 0, + has_pending_speculative_updates, + has_wft_chunking_v2, }, all_events, ) }; } + let mut wft_count = 0; 
while let NextWFTSeqEndIndex::Complete(next_end_ix) = last_end { wft_count += 1; let next_end_eid = all_events[next_end_ix].event_id; - // To save skipping all events at the front of this slice, only pass the relevant - // portion, but that means the returned index must be adjusted, hence the addition. let next_end = find_end_index_of_next_wft_seq( &all_events[next_end_ix..], next_end_eid, has_last_wft, + has_pending_speculative_updates, + has_wft_chunking_v2, ) .add(next_end_ix); - if matches!(next_end, NextWFTSeqEndIndex::Incomplete(_)) { + if matches!( + next_end, + NextWFTSeqEndIndex::NeedMore | NextWFTSeqEndIndex::Tail + ) { break; } last_end = next_end; } - // If we have the last WFT, there's no point in there being "remaining" events, because - // they must be considered part of the last sequence + let remaining_events = if all_events.is_empty() || has_last_wft { vec![] } else { - all_events.split_off(last_end.index() + 1) + all_events.split_off(last_end.end_index_in_slice(all_events.len()) + 1) }; ( @@ -541,6 +610,8 @@ impl HistoryUpdate { wft_started_id, has_last_wft, wft_count, + has_pending_speculative_updates, + has_wft_chunking_v2, }, remaining_events, ) @@ -554,6 +625,9 @@ impl HistoryUpdate { events: I, previous_wft_started_id: i64, wft_started_id: i64, + has_last_wft: bool, + has_pending_speculative_updates: bool, + has_wft_chunking_v2: bool, ) -> Self where ::IntoIter: Send + 'static, @@ -562,8 +636,10 @@ impl HistoryUpdate { events: events.into_iter().collect(), previous_wft_started_id, wft_started_id, - has_last_wft: true, + has_last_wft, wft_count: 0, + has_pending_speculative_updates, + has_wft_chunking_v2, } } @@ -576,26 +652,34 @@ impl HistoryUpdate { /// vec, indicating more pages will need to be fetched. pub(crate) fn take_next_wft_sequence(&mut self, from_wft_started_id: i64) -> NextWFT { // First, drop any events from the queue which are earlier than the passed-in id. 
- if let Some(ix_first_relevant) = self.starting_index_after_skipping(from_wft_started_id) { + if let Some(ix_first_relevant) = + starting_index_after_skipping(&self.events, from_wft_started_id) + { self.events.drain(0..ix_first_relevant); } - let next_wft_ix = - find_end_index_of_next_wft_seq(&self.events, from_wft_started_id, self.has_last_wft); - match next_wft_ix { - NextWFTSeqEndIndex::Incomplete(siz) => { - if self.has_last_wft { - if siz == 0 { - NextWFT::ReplayOver - } else { - self.build_next_wft(siz) - } - } else { - if siz != 0 { - panic!( - "HistoryUpdate was created with an incomplete WFT. This is an SDK bug." - ); - } + + let chunk = find_end_index_of_next_wft_seq( + &self.events, + from_wft_started_id, + self.has_last_wft, + self.has_pending_speculative_updates, + self.has_wft_chunking_v2, + ); + + match chunk { + NextWFTSeqEndIndex::NeedMore => NextWFT::NeedFetch, + NextWFTSeqEndIndex::Tail => { + if !self.has_last_wft { + // We don't have the full history yet; what looks like tail events may + // just be the end of the current page. Fetch more. NextWFT::NeedFetch + } else if self.events.is_empty() { + NextWFT::ReplayOver + } else { + // Remaining events are trailing matter (e.g. terminal events, WFTCompleted + // + commands after the last WFTStarted). Include them all so the caller + // can process them (e.g. to set have_seen_terminal_event). + self.build_next_wft(self.events.len() - 1) } } NextWFTSeqEndIndex::Complete(next_wft_ix) => self.build_next_wft(next_wft_ix), @@ -614,30 +698,41 @@ impl HistoryUpdate { /// not been called first. May also return an empty iterator or incomplete sequence if we are at /// the end of history. 
pub(crate) fn peek_next_wft_sequence(&self, from_wft_started_id: i64) -> &[HistoryEvent] { - let ix_first_relevant = self - .starting_index_after_skipping(from_wft_started_id) - .unwrap_or_default(); + let ix_first_relevant = + starting_index_after_skipping(&self.events, from_wft_started_id).unwrap_or_default(); + let relevant_events = &self.events[ix_first_relevant..]; if relevant_events.is_empty() { return relevant_events; } - let ix_end = - find_end_index_of_next_wft_seq(relevant_events, from_wft_started_id, self.has_last_wft) - .index(); + + let ix_end = find_end_index_of_next_wft_seq( + relevant_events, + from_wft_started_id, + self.has_last_wft, + self.has_pending_speculative_updates, + self.has_wft_chunking_v2, + ) + .end_index_in_slice(relevant_events.len()); + &relevant_events[0..=ix_end] } /// Returns true if this update has the next needed WFT sequence, false if events will need to /// be fetched in order to create a complete update with the entire next WFT sequence. pub(crate) fn can_take_next_wft_sequence(&self, from_wft_started_id: i64) -> bool { - let next_wft_ix = - find_end_index_of_next_wft_seq(&self.events, from_wft_started_id, self.has_last_wft); - if let NextWFTSeqEndIndex::Incomplete(_) = next_wft_ix - && !self.has_last_wft - { - return false; + let next_wft_ix = find_end_index_of_next_wft_seq( + &self.events, + from_wft_started_id, + self.has_last_wft, + self.has_pending_speculative_updates, + self.has_wft_chunking_v2, + ); + match next_wft_ix { + NextWFTSeqEndIndex::NeedMore => false, + NextWFTSeqEndIndex::Tail => self.has_last_wft, + NextWFTSeqEndIndex::Complete(_) => true, } - true } /// Returns the next WFT completed event attributes, if any, starting at (inclusive) the @@ -654,56 +749,74 @@ impl HistoryUpdate { _ => None, }) } - - fn starting_index_after_skipping(&self, from_wft_started_id: i64) -> Option { - self.events - .iter() - .find_position(|e| e.event_id > from_wft_started_id) - .map(|(ix, _)| ix) - } } -#[derive(Debug, Copy, 
Clone)] -enum NextWFTSeqEndIndex { - /// The next WFT sequence is completely contained within the passed-in iterator - Complete(usize), - /// The next WFT sequence is not found within the passed-in iterator, and the contained - /// value is the last index of the iterator. - Incomplete(usize), +fn starting_index_after_skipping( + events: &[HistoryEvent], + from_wft_started_id: i64, +) -> Option { + events + .iter() + .find_position(|e| e.event_id > from_wft_started_id) + .map(|(ix, _)| ix) } -impl NextWFTSeqEndIndex { - fn index(self) -> usize { - match self { - NextWFTSeqEndIndex::Complete(ix) | NextWFTSeqEndIndex::Incomplete(ix) => ix, - } - } - fn add(self, val: usize) -> Self { - match self { - NextWFTSeqEndIndex::Complete(ix) => NextWFTSeqEndIndex::Complete(ix + val), - NextWFTSeqEndIndex::Incomplete(ix) => NextWFTSeqEndIndex::Incomplete(ix + val), + +/// Returns true if any WFTCompleted event in the given events carries the +/// `WftChunkingV2` flag. +fn events_have_wft_chunking_v2(events: &[HistoryEvent]) -> bool { + let flag_value = CoreInternalFlags::WftChunkingV2 as u32; + events.iter().any(|e| { + if let Some(Attributes::WorkflowTaskCompletedEventAttributes(ref attr)) = e.attributes + && let Some(ref metadata) = attr.sdk_metadata + { + metadata.core_used_flags.contains(&flag_value) + } else { + false } - } + }) } -/// Discovers the index of the last event in next WFT sequence within the passed-in slice -/// For more on workflow task chunking, see arch_docs/workflow_task_chunking.md +/// Dispatches to v1 (legacy) or v2 chunking based on the `has_wft_chunking_v2` flag. 
fn find_end_index_of_next_wft_seq( events: &[HistoryEvent], from_event_id: i64, has_last_wft: bool, + has_pending_speculative_updates: bool, + has_wft_chunking_v2: bool, +) -> NextWFTSeqEndIndex { + if has_wft_chunking_v2 { + find_end_index_of_next_wft_seq_v2( + events, + from_event_id, + has_last_wft, + has_pending_speculative_updates, + ) + } else { + find_end_index_of_next_wft_seq_v1(events, from_event_id, has_last_wft) + } +} + +/// Legacy chunking algorithm. Used for workflows that were started before the +/// `WftChunkingV2` flag was introduced. +fn find_end_index_of_next_wft_seq_v1( + events: &[HistoryEvent], + from_event_id: i64, + has_last_wft: bool, ) -> NextWFTSeqEndIndex { if events.is_empty() { - return NextWFTSeqEndIndex::Incomplete(0); + return if has_last_wft { + NextWFTSeqEndIndex::Tail + } else { + NextWFTSeqEndIndex::NeedMore + }; } - let mut last_index = 0; + let mut last_index; let mut saw_command_or_started = false; let mut saw_command = false; let mut wft_started_event_id_to_index = vec![]; for (ix, e) in events.iter().enumerate() { last_index = ix; - // It's possible to have gotten a new history update without eviction (ex: unhandled - // command on completion), where we may need to skip events we already handled. if e.event_id <= from_event_id { continue; } @@ -723,8 +836,6 @@ fn find_end_index_of_next_wft_seq( wft_started_event_id_to_index.push((e.event_id, ix)); if let Some(next_event) = events.get(ix + 1) { let next_event_type = next_event.event_type(); - // If the next event is WFT timeout or fail, or abrupt WF execution end, that - // doesn't conclude a WFT sequence. if matches!( next_event_type, EventType::WorkflowTaskFailed @@ -733,8 +844,6 @@ fn find_end_index_of_next_wft_seq( | EventType::WorkflowExecutionTerminated | EventType::WorkflowExecutionCanceled ) { - // Since we're skipping this WFT, we don't want to include it in the vec used - // for update accepted sequencing lookups. 
wft_started_event_id_to_index.pop(); continue; } else if next_event_type == EventType::WorkflowTaskCompleted { @@ -742,31 +851,12 @@ fn find_end_index_of_next_wft_seq( if !saw_command && next_next_event.event_type() == EventType::WorkflowTaskScheduled { - // If we've never seen an interesting event and the next two events are - // a completion followed immediately again by scheduled, then this is a - // WFT heartbeat and also doesn't conclude the sequence. continue; } else { - // If we see an update accepted command after WFT completed, we want to - // conclude the WFT sequence where that update should have been - // processed. We don't need to check for any other command types, - // because the only thing that can run before an update validator is a - // signal handler - but if a signal handler ran then there would have - // been a previous signal event, and we would've already concluded the - // previous WFT sequence. - if let Some( - Attributes::WorkflowExecutionUpdateAcceptedEventAttributes( - ref attr, - ), - ) = next_next_event.attributes - { - // Find index of closest unskipped WFT started before sequencing id. - // The fact that the WFT wasn't skipped is important. If it was, we - // need to avoid stopping at that point even though that's where the - // update was sequenced. If we did, we'll fail to actually include - // the update accepted event and therefore fail to generate the - // request to run the update handler on replay. 
- if let Some(ret_ix) = wft_started_event_id_to_index + if let Some(Attributes::WorkflowExecutionUpdateAcceptedEventAttributes( + ref attr, + )) = next_next_event.attributes + && let Some(ret_ix) = wft_started_event_id_to_index .iter() .rev() .find_map(|(eid, ix)| { @@ -775,21 +865,16 @@ fn find_end_index_of_next_wft_seq( } None }) - { - return NextWFTSeqEndIndex::Complete(ret_ix); - } + { + return NextWFTSeqEndIndex::Complete(ret_ix); } return NextWFTSeqEndIndex::Complete(ix); } } else if !has_last_wft && !saw_command_or_started { - // Don't have enough events to look ahead of the WorkflowTaskCompleted. Need - // to fetch more. continue; } } } else if !has_last_wft && !saw_command_or_started { - // Don't have enough events to look ahead of the WorkflowTaskStarted. Need to fetch - // more. continue; } if saw_command_or_started { @@ -798,7 +883,360 @@ fn find_end_index_of_next_wft_seq( } } - NextWFTSeqEndIndex::Incomplete(last_index) + // Legacy: Incomplete maps to NeedMore when !has_last_wft; the caller handles + // has_last_wft by treating all remaining events as a single WFT. + if has_last_wft { + NextWFTSeqEndIndex::Tail + } else { + NextWFTSeqEndIndex::NeedMore + } +} + +#[derive(Debug, Copy, Clone)] +enum NextWFTSeqEndIndex { + /// The next Logical WFT sequence is completely contained within the passed-in slice. + /// The index corresponds to the index of the last `WorkflowTaskStarted` event. + Complete(usize), + + /// Not enough events in the slice to positively determine the next WFT boundary. + /// The caller should fetch more events before attempting to chunk again. + NeedMore, + + /// No more WFT boundaries exist in this slice. Any remaining events are trailing matter + /// after the last WFT (e.g. terminal `WorkflowExecution*` events, `WorkflowTaskCompleted` + /// with its commands). These events still need to be processed by the caller. 
+ Tail, +} + +impl NextWFTSeqEndIndex { + /// Last event index within a slice of length `slice_len` that this result refers to. + fn end_index_in_slice(self, slice_len: usize) -> usize { + match self { + NextWFTSeqEndIndex::Complete(ix) => ix, + NextWFTSeqEndIndex::NeedMore | NextWFTSeqEndIndex::Tail => slice_len.saturating_sub(1), + } + } + + fn add(self, val: usize) -> Self { + match self { + NextWFTSeqEndIndex::Complete(ix) => NextWFTSeqEndIndex::Complete(ix + val), + NextWFTSeqEndIndex::NeedMore => NextWFTSeqEndIndex::NeedMore, + NextWFTSeqEndIndex::Tail => NextWFTSeqEndIndex::Tail, + } + } +} + +/// Return the event _index_ (not ID!) of the last event of the logical workflow task starting +/// at event ID `from_event_id`. The logical WFT is guaranteed to be "complete", meaning that all +/// events required to process that logical WFT are contained in the provided slice. +/// +/// Returns one of three variants: +/// +/// - `Complete(ix)` — the WFT boundary is at the `WorkflowTaskStarted` event at index `ix`. +/// All events required to process the LWFT are present in the slice. +/// - `NeedMore` — not enough events to determine the boundary; the caller should fetch more +/// history pages before retrying. This can happen when the slice ends at a point where +/// look-ahead is required (e.g. `WFTStarted → WFTCompleted → EOS` with `!has_last_wft`). +/// - `Tail` — no more WFT boundaries exist in the remaining events. Any events still in the +/// slice are trailing matter after the last WFT (e.g. terminal `WorkflowExecution*` events, +/// `WorkflowTaskCompleted` + commands). The caller must still process these events (e.g. to +/// set `have_seen_terminal_event`). +/// +/// When `has_last_wft` is true, the slice is the full history for this update: a trailing +/// `WorkflowTaskStarted` with no following event (open task) **is** a `Complete` boundary at +/// that started event—there is no further history to page in that could change the decision. 
+/// +/// The index returned by `Complete(x)` always corresponds to the event index of a +/// `WorkflowTaskStarted` event. +/// +/// A logical WFT may span multiple real WFTs in history, in the following cases: +/// +/// - Empty Workflow Tasks sequences, like those resulting from WFT heartbeats; +/// - WFT attempts that failed or timed out. +/// +/// In both cases, the ignored wft is swallowed by the _following_ workflow task, +/// resulting in a single logical workflow task. +fn find_end_index_of_next_wft_seq_v2( + events: &[HistoryEvent], + from_event_id: i64, + has_last_wft: bool, + has_pending_speculative_updates: bool, +) -> NextWFTSeqEndIndex { + use EventType::*; + use NextWFTSeqEndIndex::*; + + if events.is_empty() { + return if has_last_wft { Tail } else { NeedMore }; + } + + // It's possible to have gotten a new history update without eviction (ex: unhandled + // command on completion), where we may need to skip events we already handled. + let mut ix = starting_index_after_skipping(events, from_event_id).unwrap_or(events.len()); + + // Set to true if we've seen any event that prevents extending the present LWFT past the next `WFTStarted` event. + let mut prevent_heartbeat = false; + + // Skip the initial `WFExecutionStarted` event, if present. + // + // Consume `WFExecutionStarted?` + if let Some(WorkflowExecutionStarted) = events.get(ix).map(|e| e.event_type()) { + ix += 1; + prevent_heartbeat = true; + } + + // We're at the begining of a LWFT. 
Any command here results from the _previous_ WFT, + // and therefore shouldn't affect chunking of the present LWFT, besides + // + // Consume `(WFTCompleted -> Command*)?` + if let Some(WorkflowTaskCompleted) = events.get(ix).map(|e| e.event_type()) { + ix += 1; // WFTCompleted + + while ix < events.len() { + if !events[ix].is_command_event() { + break; + } + + prevent_heartbeat = true; + ix += 1; // Command + } + } + + // From this point on, there should be: + // `InboundEvent* -> WFTScheduled -> WFTStarted -> WFTCompleted -> Command*` + while ix < events.len() { + // let ahead = &events[ix + 1..events.len().min(ix + 6)]; + // let ahead: Vec<_> = ahead.iter().map(|e| e.event_type()).collect(); + + let e0 = &events[ix]; + let e1 = events.get(ix + 1); + let e2 = events.get(ix + 2); + let e3 = events.get(ix + 3); + let e4 = events.get(ix + 4); + let e5 = events.get(ix + 5); + + match e0.event_type() { + // WFTStarted -> ... + EventType::WorkflowTaskStarted => { + match e1.map(|e| e.event_type()) { + // WFTStarted -> EOH + None if has_last_wft => { + // History ends on this WFTStarted. + // Conclusion is safe and replay is over after this LWFT. + return NextWFTSeqEndIndex::Complete(ix); + } + + // WFTStarted -> (unknown) + None /* !has_last_wft */ => { + // Can't conclude yet: unknown could be a WFTCompleted, WFTFailed, or WFTTimedOut event. + return NextWFTSeqEndIndex::NeedMore; + } + + // WFTStarted -> WFTCompleted -> ... + Some(EventType::WorkflowTaskCompleted) => { + match e2.map(|e| e.event_type()) { + // WFTStarted -> WFTCompleted -> EOH + None if has_last_wft => { + // There's no more event to look ahead. + // It is safe to conclude the LWFT at the current WFTStarted event. + return NextWFTSeqEndIndex::Complete(ix); + } + + // WFTStarted -> WFTCompleted -> (unknown) + None /* !has_last_wft */ => { + // Can't conclcude yet, as unknown could be a WFTScheduled or UpdateAccepted event. 
+ // Note that we are not making an exception for prevent_heartbeat=true here, + // because we'd still need to if there's an UpdateAccepted event ahead. + return NextWFTSeqEndIndex::NeedMore; + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> ... + Some(EventType::WorkflowTaskScheduled) => { + if prevent_heartbeat { + // For some reason (e.g. we saw a command preceding this WFTStarted), we know + // that we can't collapse the current WFT with the one ahead, and we've seen + // one event that can't belong to the current WFT (the WFTScheduled), so it + // is safe to conclude a Complete LWFT at the current WFTStarted event. + return NextWFTSeqEndIndex::Complete(ix); + } + + match e3.map(|e| e.event_type()) { + // WFTStarted -> WFTCompleted -> WFTScheduled -> EOH + None if has_last_wft => { + // History ends on this WFTScheduled. That's somewhat unexpected, + // but still means there can't be nothing affecting decision on the + // present LWFT, so it is safe to conclude a Complete LWFT + // at the current WFTStarted event. + return NextWFTSeqEndIndex::Complete(ix); + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> (unknown) + None /* !has_last_wft */ => { + // There might be more events ahead that would affect the conclusion, + // e.g. a `WFTScheduled -> WFTStarted` sequence that would make this + // a heartbeat. Delay the conclusion until we see more events. + return NextWFTSeqEndIndex::NeedMore; + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> ... + Some(EventType::WorkflowTaskStarted) => { + match e4.map(|e| e.event_type()) { + // WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> EOH + None if has_last_wft => { + if has_pending_speculative_updates { + // There's a pending speculative update, which necessarily affects + // the last WFTStarted event, which is the one we're looking ahead + // to. We therefore can't collapse the current WFT (WFTStarted at ix) + // with the one ahead (WFTStarted at ix + 3). 
+ return NextWFTSeqEndIndex::Complete(ix); + } else { + // We got a full noop WFT sequence. Collapse the current WFT + // (WFTStarted at ix) with the one ahead (WFTStarted at ix + 3), + // and return that as this is the final event in history. + return NextWFTSeqEndIndex::Complete(ix + 3); + } + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> (unknown) + None /* !has_last_wft */ => { + // Can't conclude yet: unknown could be a WFTCompleted, WFTFailed, or WFTTimedOut. + return NextWFTSeqEndIndex::NeedMore; + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> ... + Some(EventType::WorkflowTaskCompleted) => { + match e5.map(|e| e.event_type()) { + // WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> EOH + None if has_last_wft => { + assert!(!has_pending_speculative_updates); + + // We got a full noop WFT sequence. Collapse the current WFT + // (WFTStarted at ix) with the one ahead (WFTStarted at ix + 3), + // and return that as this is the final event in history. + return NextWFTSeqEndIndex::Complete(ix + 3); + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> (unknown) + None /* !has_last_wft */ => { + // Can't conclude yet, as unknown could be a WFTStarted, WFTFailed, or WFTTimedOut event. + return NextWFTSeqEndIndex::NeedMore; + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> UpdateAccepted -> ... + Some(EventType::WorkflowExecutionUpdateAccepted) => { + // Found an UpdateAccepted event, which must affect the WFTStarted at ix + 3. + // That means we can't collapse the current WFT (WFTStarted at ix) with the + // one ahead (WFTStarted at ix + 3). Conclude the current WFTStarted event. 
+ return NextWFTSeqEndIndex::Complete(ix); + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> + Some(_) => { + // We found a full noop WFT sequence (ix..ix+3), and we've looked + // ahead far enough to be sure that we won't need to walk back on + // previous WFTStarted events. Jump ahead to the next WFTStarted + // event, and continue the loop. + ix += 3; // WFTStarted + WFTCompleted + WFTScheduled + continue; + } + } + + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> + Some(_) => { + return NextWFTSeqEndIndex::Complete(ix); + } + } + } + + // WFTStarted -> WFTCompleted -> WFTScheduled -> + Some(_) => { + return NextWFTSeqEndIndex::Complete(ix); + } + } + } + + // WFTStarted -> WFTCompleted -> + Some(_) => { + return NextWFTSeqEndIndex::Complete(ix); + } + } + } + + // WFTStarted -> WFT(Failed|TimedOut) -> ... + Some(EventType::WorkflowTaskFailed) | Some(EventType::WorkflowTaskTimedOut) => { + // Failed WFT. Skip over it. + ix += 2; // Started + Failed/TimedOut + continue; + } + + // Workflow execution terminates after WFTStarted without WFTCompleted. + // Complete points at the WFTStarted; the terminal event is left as + // trailing matter (will be returned as `Tail` on the next call). + // `WFTStarted -> WFExecution(Terminated|TimedOut|...)` + Some(_) if e1.is_some_and(|e| e.is_final_wf_execution_event()) => { + return NextWFTSeqEndIndex::Complete(ix); + } + + // `WFTStarted -> ` + Some(_) => { + panic!( + "Unexpected event type: {:?} after WorkflowTaskStarted event, {:?}", + e0.event_type(), + events + ); + } + } + } + + // Sudden workflow execution termination. That's the end of history, + // but we still don't have a "complete" LWFT. The terminal event is trailing + // matter that the caller must still process (to set have_seen_terminal_event). 
+ // `WFExecution(Failed|TimedOut|Canceled|Terminated|TimedOut|CAN)` + _ if e0.is_final_wf_execution_event() => { + if e1.is_some() || !has_last_wft || has_pending_speculative_updates { + panic!( + "{:?} event at index {ix} is not the last event in history", + e0.event_type() + ); + } + return Tail; + } + + // Just skip over any other event type. + _ => { + if e0.is_command_event() { + // This case is theoretically impossible, unless either the workflow + // history is malformed or we hit a bug in this chunking logic. + dbg_panic!( + "Command event at index {ix} is not expected after seeing a non-command event" + ); + } + + if e0.is_wft_time_sensitive_event() { + prevent_heartbeat = true; + } + + ix += 1; + continue; + } + } + + #[allow(unreachable_code)] + { + panic!("All match arms above must diverge (return/continue/panic)"); + } + } + + // Fell off the main loop without finding a WFTStarted. + if has_last_wft { + // This is the last WFT in history. Any events consumed by the preamble (WFTCompleted + commands) + // or remaining inbound events are trailing matter. + NextWFTSeqEndIndex::Tail + } else { + // There might be a WFTStarted event ahead, but we'll need to fetch more events to find it. 
+ NextWFTSeqEndIndex::NeedMore + } } #[cfg(test)] @@ -811,16 +1249,25 @@ mod tests { }; use futures_util::TryStreamExt; use temporalio_common::protos::temporal::api::{ - common::v1::WorkflowExecution, enums::v1::WorkflowTaskFailedCause, - workflowservice::v1::GetWorkflowExecutionHistoryResponse, + common::v1::WorkflowExecution, + enums::v1::WorkflowTaskFailedCause, + history::v1::{History, history_event::Attributes}, + workflowservice::v1::{ + GetWorkflowExecutionHistoryResponse, update_activity_options_request, + }, }; impl From for HistoryUpdate { fn from(v: HistoryInfo) -> Self { + let events = v.events().to_vec(); + let has_chunking_v2 = events_have_wft_chunking_v2(&events); Self::new_from_events( - v.events().to_vec(), + events, v.previous_started_event_id(), v.workflow_task_started_event_id(), + true, + false, + has_chunking_v2, ) } } @@ -835,6 +1282,15 @@ mod tests { } } + /// Retroactively sets the `WftChunkingV2` flag on the first + /// WFTCompleted event in an already-constructed builder (for canned histories). 
+ fn maybe_set_chunking_v2(t: &mut TestHistoryBuilder, chunking_v2: bool) { + if chunking_v2 { + use crate::internal_flags::CoreInternalFlags; + t.set_flags_first_wft(&[CoreInternalFlags::WftChunkingV2], &[]); + } + } + impl NextWFT { fn unwrap_events(self) -> Vec { match self { @@ -842,6 +1298,13 @@ mod tests { o => panic!("Must be complete WFT: {o:?}"), } } + + fn is_complete(&self) -> bool { + match self { + NextWFT::WFT(_, true) => true, + _ => false, + } + } } fn next_check_peek(update: &mut HistoryUpdate, from_id: i64) -> Vec { @@ -851,9 +1314,20 @@ mod tests { seq } + fn next_check_peek2(update: &mut HistoryUpdate, from_id: i64) -> (usize, bool) { + let seq_peek = update.peek_next_wft_sequence(from_id).to_vec(); + let next = update.take_next_wft_sequence(from_id); + let is_complete = next.is_complete(); + let seq_take = next.unwrap_events(); + assert_eq!(seq_take, seq_peek); + (seq_take.len(), is_complete) + } + + #[rstest::rstest] #[test] - fn consumes_standard_wft_sequence() { - let timer_hist = canned_histories::single_timer("t"); + fn consumes_standard_wft_sequence(#[values(false, true)] chunking_v2: bool) { + let mut timer_hist = canned_histories::single_timer("t"); + maybe_set_chunking_v2(&mut timer_hist, chunking_v2); let mut update = timer_hist.as_history_update(); let seq_1 = next_check_peek(&mut update, 0); assert_eq!(seq_1.len(), 3); @@ -865,9 +1339,11 @@ mod tests { assert_eq!(seq_2.last().unwrap().event_id, 8); } + #[rstest::rstest] #[test] - fn skips_wft_failed() { - let failed_hist = canned_histories::workflow_fails_with_reset_after_timer("t", "runid"); + fn skips_wft_failed(#[values(false, true)] chunking_v2: bool) { + let mut failed_hist = canned_histories::workflow_fails_with_reset_after_timer("t", "runid"); + maybe_set_chunking_v2(&mut failed_hist, chunking_v2); let mut update = failed_hist.as_history_update(); let seq_1 = next_check_peek(&mut update, 0); assert_eq!(seq_1.len(), 3); @@ -877,9 +1353,11 @@ mod tests { 
assert_eq!(seq_2.last().unwrap().event_id, 11); } + #[rstest::rstest] #[test] - fn skips_wft_timeout() { - let failed_hist = canned_histories::wft_timeout_repro(); + fn skips_wft_timeout(#[values(false, true)] chunking_v2: bool) { + let mut failed_hist = canned_histories::wft_timeout_repro(); + maybe_set_chunking_v2(&mut failed_hist, chunking_v2); let mut update = failed_hist.as_history_update(); let seq_1 = next_check_peek(&mut update, 0); assert_eq!(seq_1.len(), 3); @@ -889,9 +1367,11 @@ mod tests { assert_eq!(seq_2.last().unwrap().event_id, 14); } + #[rstest::rstest] #[test] - fn skips_events_before_desired_wft() { - let timer_hist = canned_histories::single_timer("t"); + fn skips_events_before_desired_wft(#[values(false, true)] chunking_v2: bool) { + let mut timer_hist = canned_histories::single_timer("t"); + maybe_set_chunking_v2(&mut timer_hist, chunking_v2); let mut update = timer_hist.as_history_update(); // We haven't processed the first 3 events, but we should still only get the second sequence let seq_2 = update.take_next_wft_sequence(3).unwrap_events(); @@ -899,36 +1379,104 @@ mod tests { assert_eq!(seq_2.last().unwrap().event_id, 8); } + #[rstest::rstest] #[test] - fn history_ends_abruptly() { + fn history_ends_abruptly(#[values(false, true)] chunking_v2: bool) { let mut timer_hist = canned_histories::single_timer("t"); timer_hist.add_workflow_execution_terminated(); + maybe_set_chunking_v2(&mut timer_hist, chunking_v2); let mut update = timer_hist.as_history_update(); let seq_2 = update.take_next_wft_sequence(3).unwrap_events(); - assert_eq!(seq_2.len(), 6); - assert_eq!(seq_2.last().unwrap().event_id, 9); + if chunking_v2 { + // New algorithm: terminal event is not part of the WFTStarted LWFT. 
+ assert_eq!(seq_2.len(), 5); + assert_eq!(seq_2.last().unwrap().event_id, 8); + let seq_3 = update.take_next_wft_sequence(8).unwrap_events(); + assert_eq!(seq_3.len(), 1); + assert!(seq_3[0].is_final_wf_execution_event()); + assert_matches!(update.take_next_wft_sequence(8), NextWFT::ReplayOver); + } else { + // Legacy algorithm: terminal event included in the WFTStarted LWFT. + assert_eq!(seq_2.len(), 6); + assert_eq!(seq_2.last().unwrap().event_id, 9); + assert!(seq_2.last().unwrap().is_final_wf_execution_event()); + } } + /// Verifies that non-command terminal events (`WorkflowExecutionTerminated`, + /// `WorkflowExecutionTimedOut`) following a `WorkflowTaskStarted` are returned as + /// trailing tail events rather than being silently dropped. This is critical because + /// callers need to process them to set `have_seen_terminal_event`. + #[rstest::rstest] #[test] - fn heartbeats_skipped() { + fn terminal_events_not_dropped_after_wft_started(#[values(false, true)] chunking_v2: bool) { + // Test both non-command terminal event types that can follow WFTStarted. 
+ for add_terminal in [ + TestHistoryBuilder::add_workflow_execution_terminated as fn(&mut TestHistoryBuilder), + TestHistoryBuilder::add_workflow_execution_timed_out, + ] { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); // Sched(2), Started(3), Completed(4) + t.add_by_type(EventType::TimerStarted); // TimerStarted(5) + t.add_workflow_task_scheduled_and_started(); // Sched(6), Started(7) + add_terminal(&mut t); // terminal(8) + maybe_set_chunking_v2(&mut t, chunking_v2); + + let mut update = t.as_history_update(); + let seq_1 = update.take_next_wft_sequence(0).unwrap_events(); + assert_eq!(seq_1.last().unwrap().event_id, 3); + + if chunking_v2 { + let seq_2 = update.take_next_wft_sequence(3).unwrap_events(); + assert_eq!(seq_2.last().unwrap().event_id, 7); + assert_eq!( + seq_2.last().unwrap().event_type(), + EventType::WorkflowTaskStarted + ); + let seq_3 = update.take_next_wft_sequence(7).unwrap_events(); + assert_eq!(seq_3.len(), 1); + assert!(seq_3[0].is_final_wf_execution_event()); + assert_matches!(update.take_next_wft_sequence(7), NextWFT::ReplayOver); + } else { + let seq_2 = update.take_next_wft_sequence(3).unwrap_events(); + assert_eq!(seq_2.last().unwrap().event_id, 8); + assert!(seq_2.last().unwrap().is_final_wf_execution_event()); + } + } + } + + #[rstest::rstest] + #[test] + fn heartbeats_skipped(#[values(false, true)] chunking_v2: bool) { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); - t.add_full_wf_task(); // wft started 6 + t.add_full_wf_task(); t.add_by_type(EventType::TimerStarted); - t.add_full_wf_task(); // wft started 10 t.add_full_wf_task(); t.add_full_wf_task(); - t.add_full_wf_task(); // wft started 19 + t.add_full_wf_task(); + t.add_full_wf_task(); t.add_by_type(EventType::TimerStarted); - t.add_full_wf_task(); // wft started 23 + t.add_full_wf_task(); t.add_we_signaled("whee", vec![]); 
t.add_full_wf_task(); t.add_workflow_execution_completed(); + maybe_set_chunking_v2(&mut t, chunking_v2); let mut update = t.as_history_update(); - let seq = next_check_peek(&mut update, 0); - assert_eq!(seq.len(), 6); + if chunking_v2 { + // v2 treats WFExecutionStarted as time-sensitive, + // so the first WFT can't be collapsed with the next. + let seq = next_check_peek(&mut update, 0); + assert_eq!(seq.len(), 3); + let seq = next_check_peek(&mut update, 3); + assert_eq!(seq.len(), 3); + } else { + let seq = next_check_peek(&mut update, 0); + assert_eq!(seq.len(), 6); + } let seq = next_check_peek(&mut update, 6); assert_eq!(seq.len(), 4); let seq = next_check_peek(&mut update, 10); @@ -941,18 +1489,19 @@ mod tests { assert_eq!(seq.len(), 2); } + #[rstest::rstest] #[test] - fn heartbeat_marker_end() { + fn heartbeat_marker_end(#[values(false, true)] chunking_v2: bool) { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); t.add_full_wf_task(); t.add_local_activity_result_marker(1, "1", "done".into()); t.add_workflow_execution_completed(); + maybe_set_chunking_v2(&mut t, chunking_v2); let mut update = t.as_history_update(); let seq = next_check_peek(&mut update, 3); - // completed, sched, started assert_eq!(seq.len(), 3); let seq = next_check_peek(&mut update, 6); assert_eq!(seq.len(), 3); @@ -998,56 +1547,62 @@ mod tests { "runid".to_string(), vec![1], Arc::new(mock_client), + false, ) } #[rstest::rstest] #[tokio::test] - async fn paginator_extracts_updates(#[values(10, 11, 12, 13, 14)] chunk_size: usize) { + async fn paginator_extracts_updates( + #[values(10, 11, 12, 13, 14)] chunk_size: usize, + #[values(false, true)] chunking_v2: bool, + ) { let wft_count = 100; - let mut paginator = paginator_setup( - canned_histories::long_sequential_timers(wft_count), - chunk_size, - ); - let mut update = paginator.extract_next_update().await.unwrap(); - - let seq = 
update.take_next_wft_sequence(0).unwrap_events(); - assert_eq!(seq.len(), 3); + let mut hist = canned_histories::long_sequential_timers(wft_count); + let expected_final_eid = hist + .get_full_history_info() + .unwrap() + .into_events() + .last() + .unwrap() + .event_id; + maybe_set_chunking_v2(&mut hist, chunking_v2); - let mut last_event_id = 3; - let mut last_started_id = 3; - for i in 1..wft_count { - let seq = { - match update.take_next_wft_sequence(last_started_id) { - NextWFT::WFT(seq, _) => seq, + let mut paginator = paginator_setup(hist, chunk_size); + let mut update = paginator.extract_next_update().await.unwrap(); + let mut last_id = 0; + loop { + let seq = loop { + match update.take_next_wft_sequence(last_id) { + NextWFT::WFT(seq, _) => break seq, NextWFT::NeedFetch => { update = paginator.extract_next_update().await.unwrap(); - update - .take_next_wft_sequence(last_started_id) - .unwrap_events() } NextWFT::ReplayOver => { - assert_eq!(i, wft_count - 1); - break; + assert_eq!(last_id, expected_final_eid); + return; } } }; + assert!(!seq.is_empty()); for e in &seq { - last_event_id += 1; - assert_eq!(e.event_id, last_event_id); + assert!( + e.event_id > last_id, + "event ids must increase monotonically (last_id={last_id}, got {})", + e.event_id + ); + last_id = e.event_id; } - assert_eq!(seq.len(), 5); - last_started_id += 5; } } + #[rstest::rstest] #[tokio::test] - async fn paginator_streams() { + async fn paginator_streams(#[values(false, true)] chunking_v2: bool) { let wft_count = 10; - let paginator = StreamingHistoryPaginator::new(paginator_setup( - canned_histories::long_sequential_timers(wft_count), - 10, - )); + let mut hist = canned_histories::long_sequential_timers(wft_count); + maybe_set_chunking_v2(&mut hist, chunking_v2); + let paginator = StreamingHistoryPaginator::new(paginator_setup(hist, 10)); let everything: Vec<_> = paginator.try_collect().await.unwrap(); assert_eq!(everything.len(), (wft_count + 1) * 5); everything.iter().fold(1, 
|event_id, e| { @@ -1078,8 +1633,10 @@ mod tests { async fn needs_fetch_if_ending_in_middle_of_wft_seq( // These values test points truncation could've occurred in the middle of the heartbeat #[values(18, 19, 20, 21)] truncate_at: usize, + #[values(false, true)] chunking_v2: bool, ) { - let t = three_wfts_then_heartbeats(); + let mut t = three_wfts_then_heartbeats(); + maybe_set_chunking_v2(&mut t, chunking_v2); let mut ends_in_middle_of_seq = t.as_history_update().events; ends_in_middle_of_seq.truncate(truncate_at); // The update should contain the first three complete WFTs, ending on the 11th event which @@ -1092,6 +1649,8 @@ mod tests { .unwrap() .workflow_task_started_event_id(), false, + false, + chunking_v2, ); assert_eq!(remaining[0].event_id, 12); assert_eq!(remaining.last().unwrap().event_id, truncate_at as i64); @@ -1099,18 +1658,30 @@ mod tests { assert_eq!(seq.last().unwrap().event_id, 3); let seq = update.take_next_wft_sequence(3).unwrap_events(); assert_eq!(seq.last().unwrap().event_id, 7); - let seq = update.take_next_wft_sequence(7).unwrap_events(); - assert_eq!(seq.last().unwrap().event_id, 11); - let next = update.take_next_wft_sequence(11); - assert_matches!(next, NextWFT::NeedFetch); + if chunking_v2 { + // New algorithm: the third logical WFT ends at `WorkflowTaskStarted` (id 11), but + // the buffer has no following event — `find_end` returns NeedMore until more history + // exists. + let next = update.take_next_wft_sequence(7); + assert_matches!(next, NextWFT::NeedFetch); + } else { + // Legacy algorithm: less conservative, yields WFT3 immediately then NeedFetch for + // the next call. 
+ let seq = update.take_next_wft_sequence(7).unwrap_events(); + assert_eq!(seq.last().unwrap().event_id, 11); + let next = update.take_next_wft_sequence(11); + assert_matches!(next, NextWFT::NeedFetch); + } } #[rstest::rstest] #[tokio::test] async fn paginator_works_with_wft_over_multiple_pages( #[values(10, 11, 12, 13, 14)] chunk_size: usize, + #[values(false, true)] chunking_v2: bool, ) { - let t = three_wfts_then_heartbeats(); + let mut t = three_wfts_then_heartbeats(); + maybe_set_chunking_v2(&mut t, chunking_v2); let mut paginator = paginator_setup(t, chunk_size); let mut update = paginator.extract_next_update().await.unwrap(); let mut last_id = 0; @@ -1129,9 +1700,11 @@ mod tests { assert_eq!(last_id, 160); } + #[rstest::rstest] #[tokio::test] - async fn task_just_before_heartbeat_chain_is_taken() { - let t = three_wfts_then_heartbeats(); + async fn task_just_before_heartbeat_chain_is_taken(#[values(false, true)] chunking_v2: bool) { + let mut t = three_wfts_then_heartbeats(); + maybe_set_chunking_v2(&mut t, chunking_v2); let mut update = t.as_history_update(); let seq = update.take_next_wft_sequence(0).unwrap_events(); assert_eq!(seq.last().unwrap().event_id, 3); @@ -1149,9 +1722,11 @@ mod tests { ); } + #[rstest::rstest] #[tokio::test] - async fn handles_cache_misses() { - let timer_hist = canned_histories::single_timer("t"); + async fn handles_cache_misses(#[values(false, true)] chunking_v2: bool) { + let mut timer_hist = canned_histories::single_timer("t"); + maybe_set_chunking_v2(&mut timer_hist, chunking_v2); let partial_task = timer_hist.get_one_wft(2).unwrap(); let prev_started_wft_id = partial_task.previous_started_event_id(); let wft_started_id = partial_task.workflow_task_started_event_id(); @@ -1174,6 +1749,7 @@ mod tests { // A cache miss means we'll try to fetch from start NextPageToken::FetchFromStart, Arc::new(mock_client), + false, ); let mut update = paginator.extract_next_update().await.unwrap(); // We expect if we try to take the first 
task sequence that the first event is the first @@ -1186,8 +1762,9 @@ mod tests { assert_eq!(seq.last().unwrap().event_id, 8); } + #[rstest::rstest] #[test] - fn la_marker_chunking() { + fn la_marker_chunking(#[values(false, true)] chunking_v2: bool) { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); @@ -1202,6 +1779,7 @@ mod tests { t.add_workflow_task_scheduled_and_started(); t.add_workflow_task_timed_out(); t.add_workflow_task_scheduled_and_started(); + maybe_set_chunking_v2(&mut t, chunking_v2); let mut update = t.as_history_update(); let seq = next_check_peek(&mut update, 0); @@ -1212,9 +1790,11 @@ mod tests { assert_eq!(seq.len(), 13); } + #[rstest::rstest] #[tokio::test] - async fn handles_blank_fetch_response() { - let timer_hist = canned_histories::single_timer("t"); + async fn handles_blank_fetch_response(#[values(false, true)] chunking_v2: bool) { + let mut timer_hist = canned_histories::single_timer("t"); + maybe_set_chunking_v2(&mut timer_hist, chunking_v2); let partial_task = timer_hist.get_one_wft(2).unwrap(); let prev_started_wft_id = partial_task.previous_started_event_id(); let wft_started_id = partial_task.workflow_task_started_event_id(); @@ -1232,14 +1812,17 @@ mod tests { // A cache miss means we'll try to fetch from start NextPageToken::FetchFromStart, Arc::new(mock_client), + false, ); let err = paginator.extract_next_update().await.unwrap_err(); assert_matches!(err.code(), tonic::Code::Unknown); } + #[rstest::rstest] #[tokio::test] - async fn handles_empty_page_with_next_token() { - let timer_hist = canned_histories::single_timer("t"); + async fn handles_empty_page_with_next_token(#[values(false, true)] chunking_v2: bool) { + let mut timer_hist = canned_histories::single_timer("t"); + maybe_set_chunking_v2(&mut timer_hist, chunking_v2); let partial_task = timer_hist.get_one_wft(2).unwrap(); let prev_started_wft_id = partial_task.previous_started_event_id(); let 
wft_started_id = partial_task.workflow_task_started_event_id(); @@ -1271,6 +1854,7 @@ mod tests { // A cache miss means we'll try to fetch from start NextPageToken::FetchFromStart, Arc::new(mock_client), + false, ); let mut update = paginator.extract_next_update().await.unwrap(); let seq = update.take_next_wft_sequence(0).unwrap_events(); @@ -1283,9 +1867,13 @@ mod tests { // TODO: Test we dont re-feed pointless updates if fetching returns <= events we already // processed + #[rstest::rstest] #[tokio::test] - async fn handles_fetching_page_with_complete_wft_and_page_token_to_empty_page() { - let timer_hist = canned_histories::single_timer("t"); + async fn handles_fetching_page_with_complete_wft_and_page_token_to_empty_page( + #[values(false, true)] chunking_v2: bool, + ) { + let mut timer_hist = canned_histories::single_timer("t"); + maybe_set_chunking_v2(&mut timer_hist, chunking_v2); let workflow_task = timer_hist.get_full_history_info().unwrap(); let prev_started_wft_id = workflow_task.previous_started_event_id(); let wft_started_id = workflow_task.workflow_task_started_event_id(); @@ -1319,6 +1907,7 @@ mod tests { "runid".to_string(), NextPageToken::FetchFromStart, Arc::new(mock_client), + false, ); let mut update = paginator.extract_next_update().await.unwrap(); let seq = update.take_next_wft_sequence(0).unwrap_events(); @@ -1328,8 +1917,11 @@ mod tests { assert_matches!(update.take_next_wft_sequence(8), NextWFT::ReplayOver); } + #[rstest::rstest] #[tokio::test] - async fn extreme_pagination_doesnt_drop_wft_events_paginator() { + async fn extreme_pagination_doesnt_drop_wft_events_paginator( + #[values(false, true)] chunking_v2: bool, + ) { // 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED // 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED // 3: EVENT_TYPE_WORKFLOW_TASK_STARTED // <- previous_started_event_id @@ -1365,6 +1957,7 @@ mod tests { t.add_we_signaled("hi", vec![]); t.add_we_signaled("hi", vec![]); t.add_workflow_task_scheduled_and_started(); + 
maybe_set_chunking_v2(&mut t, chunking_v2); let mut mock_client = mock_worker_client(); @@ -1423,6 +2016,7 @@ mod tests { "runid".to_string(), vec![1], Arc::new(mock_client), + false, ); let mut update = paginator.extract_next_update().await.unwrap(); @@ -1435,8 +2029,11 @@ mod tests { assert_eq!(seq.last().unwrap().event_id, 15); } + #[rstest::rstest] #[tokio::test] - async fn finding_end_index_with_started_as_last_event() { + async fn finding_end_index_with_started_as_last_event( + #[values(false, true)] chunking_v2: bool, + ) { let wf_id = "fakeid"; let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -1444,6 +2041,7 @@ mod tests { t.add_we_signaled("hi", vec![]); t.add_workflow_task_scheduled_and_started(); + maybe_set_chunking_v2(&mut t, chunking_v2); // We need to see more after this - it's not sufficient to end on a started event when // we know there might be more @@ -1481,6 +2079,7 @@ mod tests { "runid".to_string(), NextPageToken::FetchFromStart, Arc::new(mock_client), + false, ); let mut update = paginator.extract_next_update().await.unwrap(); let seq = update.take_next_wft_sequence(0).unwrap_events(); @@ -1490,8 +2089,9 @@ mod tests { assert_eq!(seq.last().unwrap().event_id, 7); } + #[rstest::rstest] #[tokio::test] - async fn just_signal_is_complete_wft() { + async fn just_signal_is_complete_wft(#[values(false, true)] chunking_v2: bool) { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); @@ -1500,6 +2100,7 @@ mod tests { t.add_we_signaled("whatever", vec![]); t.add_full_wf_task(); t.add_workflow_execution_completed(); + maybe_set_chunking_v2(&mut t, chunking_v2); let workflow_task = t.get_full_history_info().unwrap(); let prev_started_wft_id = workflow_task.previous_started_event_id(); @@ -1513,6 +2114,7 @@ mod tests { "runid".to_string(), NextPageToken::Done, Arc::new(mock_client), + false, ); let mut update = 
paginator.extract_next_update().await.unwrap(); let seq = next_check_peek(&mut update, 0); @@ -1525,8 +2127,9 @@ mod tests { assert_eq!(seq.len(), 2); } + #[rstest::rstest] #[tokio::test] - async fn heartbeats_then_signal() { + async fn heartbeats_then_signal(#[values(false, true)] chunking_v2: bool) { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); @@ -1537,6 +2140,7 @@ mod tests { t.add_full_wf_task(); t.add_we_signaled("whatever", vec![]); t.add_workflow_task_scheduled_and_started(); + maybe_set_chunking_v2(&mut t, chunking_v2); let full_resp: GetWorkflowExecutionHistoryResponse = t.get_full_history_info().unwrap().into(); @@ -1556,6 +2160,7 @@ mod tests { "runid".to_string(), NextPageToken::Next(vec![1]), Arc::new(mock_client), + false, ); let mut update = paginator.extract_next_update().await.unwrap(); // Starting past first wft @@ -1565,8 +2170,11 @@ mod tests { assert_eq!(seq.len(), 4); } + #[rstest::rstest] #[tokio::test] - async fn cache_miss_with_only_one_wft_available_orders_properly() { + async fn cache_miss_with_only_one_wft_available_orders_properly( + #[values(false, true)] chunking_v2: bool, + ) { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); @@ -1574,6 +2182,7 @@ mod tests { t.add_full_wf_task(); t.add_by_type(EventType::TimerStarted); t.add_workflow_task_scheduled_and_started(); + maybe_set_chunking_v2(&mut t, chunking_v2); let incremental_task = hist_to_poll_resp(&t, "wfid".to_owned(), ResponseType::OneTask(3)).resp; @@ -1607,6 +2216,7 @@ mod tests { "runid".to_string(), NextPageToken::FetchFromStart, Arc::new(mock_client), + false, ); let mut update = paginator.extract_next_update().await.unwrap(); let seq = next_check_peek(&mut update, 0); @@ -1617,8 +2227,9 @@ mod tests { assert_eq!(seq.last().unwrap().event_id, 11); } + #[rstest::rstest] #[tokio::test] - async fn wft_fail_on_first_task_with_update() { + 
async fn wft_fail_on_first_task_with_update(#[values(false, true)] chunking_v2: bool) { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_workflow_task_scheduled_and_started(); @@ -1632,6 +2243,7 @@ mod tests { t.add_update_completed(accept_id); t.add_timer_fired(timer_id, "1".to_string()); t.add_full_wf_task(); + maybe_set_chunking_v2(&mut t, chunking_v2); let mut update = t.as_history_update(); let seq = next_check_peek(&mut update, 0); @@ -1643,8 +2255,9 @@ mod tests { assert_eq!(seq.len(), 7); } + #[rstest::rstest] #[test] - fn update_accepted_after_empty_wft() { + fn update_accepted_after_empty_wft(#[values(false, true)] chunking_v2: bool) { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); @@ -1654,6 +2267,7 @@ mod tests { t.add_update_completed(accept_id); t.add_timer_fired(timer_id, "1".to_string()); t.add_full_wf_task(); + maybe_set_chunking_v2(&mut t, chunking_v2); let mut update = t.as_history_update(); let seq = next_check_peek(&mut update, 0); @@ -1663,5 +2277,769 @@ mod tests { assert_eq!(seq.len(), 3); let seq = next_check_peek(&mut update, 3); assert_eq!(seq.len(), 3); + // // Heartbeat: first empty WFT collapses into the second; boundary is the second WFTStarted. + // assert_eq!(seq.len(), 6); + // assert_eq!(seq.last().unwrap().event_id, 6); + // let seq = next_check_peek(&mut update, 6); + // // Through timer command, next WFTStarted (open until following completion is visible as end index). 
+ // assert_eq!(seq.len(), 7); + // assert_eq!(seq.last().unwrap().event_id, 13); + } + + /// Builds a history with two empty WFTs followed by a WFT whose completion carries an update: + /// Event 1: WorkflowExecutionStarted + /// Event 2: WFTScheduled ─┐ + /// Event 3: WFTStarted ─┤ WFT1 (empty, no commands) + /// Event 4: WFTCompleted ─┘ + /// Event 5: WFTScheduled ─┐ + /// Event 6: WFTStarted ─┤ WFT2 (empty, no commands) + /// Event 7: WFTCompleted ─┘ + /// Event 8: WFTScheduled ─┐ + /// Event 9: WFTStarted ─┤ WFT3 (update + commands follow) + /// Event 10: WFTCompleted ─┘ + /// Event 11: UpdateAccepted (sequencing_event_id = 8) + /// Event 12: UpdateCompleted + /// Event 13: TimerStarted + /// Event 14: TimerFired + /// Event 15: WFTScheduled ─┐ + /// Event 16: WFTStarted ─┤ WFT4 + /// Event 17: WFTCompleted ─┘ + /// Event 18: WorkflowExecutionCompleted + fn build_empty_wft_then_update_history(chunking_v2: bool) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + if chunking_v2 { + t.set_use_wft_chunking_v2(); + } + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); // WFT1: events 2-4 (empty) + t.add_full_wf_task(); // WFT2: events 5-7 (empty) + t.add_full_wf_task(); // WFT3: events 8-10 + let accept_id = t.add_update_accepted("upd-1", "startWork"); // 11, seq=8 + t.add_update_completed(accept_id); // 12 + let timer_id = t.add_timer_started("1".to_string()); // 13 + t.add_timer_fired(timer_id, "1".to_string()); // 14 + t.add_full_wf_task(); // WFT4: events 15-17 (command) + t.add_workflow_execution_completed(); // 18 + t + } + + /// Empty WFT followed by a speculative WFT with Update request. + /// + /// The v1 algorithm may incorrectly collapse the speculative WFT with the preceding WFT, + /// resulting in an NDE. This test confirms that v2 correctly handles this scenario. + /// + /// The legacy, v1 algorithm's behavior is not covered by this test.
+ #[rstest::rstest] + #[test] + fn empty_wft_then_update_has_last_wft(#[values(false, true)] chunking_v2: bool) { + if !chunking_v2 { + // The legacy, v1 algorithm's behavior is not covered by this test. + return; + } + + let t = build_empty_wft_then_update_history(chunking_v2); + let all_events = t.get_full_history_info().unwrap().into_events(); + + // 3. Up to WFT1 Started — single WFT visible. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..3].to_vec(), + 0, + 3, + true, + false, + chunking_v2, + ); + + // WFEStarted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, true)); + + // ReplayOver + assert_matches!(update.take_next_wft_sequence(3), NextWFT::ReplayOver); + } + + // 4. Up to WFT1 Completed. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..4].to_vec(), + 0, + 3, + true, + false, + chunking_v2, + ); + + // Can't collapse because of WFExecutionStarted. + // WFEStarted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // WFTCompleted + assert_eq!(next_check_peek2(&mut update, 3), (1, true)); + + // ReplayOver + assert_matches!(update.take_next_wft_sequence(4), NextWFT::ReplayOver); + } + + // 6. Up to WFT2 Started. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..6].to_vec(), + 0, + 6, + true, + false, + chunking_v2, + ); + + // Can't collapse because of WFExecutionStarted. + // WFEStarted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // WFT2 is the remaining LWFT. + // WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 3), (3, true)); + + // ReplayOver + assert_matches!(update.take_next_wft_sequence(6), NextWFT::ReplayOver); + } + + // 7. Up to WFT2 Completed. 
+ { + let mut update = HistoryUpdate::new_from_events( + all_events[..7].to_vec(), + 0, + 6, + true, + false, + chunking_v2, + ); + + // Can't collapse because of WFExecutionStarted. + // WFEStarted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 3), (3, false)); + + // WFTCompleted + assert_eq!(next_check_peek2(&mut update, 7), (1, true)); + + // ReplayOver + assert_matches!(update.take_next_wft_sequence(7), NextWFT::ReplayOver); + } + + // 9. Up to WFT3 Started, no speculative Update pending. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..9].to_vec(), + 0, + 9, + true, + false, + chunking_v2, + ); + + // WFEStarted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // It is ok to collapse WFT2+WFT3 in this case, as there is no new event on WFT3. + // WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 3), (6, true)); + + // ReplayOver + assert_matches!(update.take_next_wft_sequence(9), NextWFT::ReplayOver); + } + + // 9a. Similar to 9, but WFT3 is a speculative WFT with a pending update. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..9].to_vec(), + 0, + 9, + true, + true, + chunking_v2, + ); + + // WFEStarted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 3), (3, false)); + + // Can't collapse because of speculative update affecting WFT3 + // WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 6), (3, true)); + + // ReplayOver + assert_matches!(update.take_next_wft_sequence(9), NextWFT::ReplayOver); + } + + // 10. Up to WFT3 Completed. 
+ { + let mut update = HistoryUpdate::new_from_events( + all_events[..10].to_vec(), + 0, + 6, + true, + false, + chunking_v2, + ); + + // WFEStarted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // It is ok to collapse WFT2+WFT3 in this case, as there is no new event on WFT3. + // WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 3), (6, false)); + + // WFTCompleted + assert_eq!(next_check_peek2(&mut update, 9), (1, true)); + + // ReplayOver + assert_matches!(update.take_next_wft_sequence(10), NextWFT::ReplayOver); + } + + // 11. Similar to 10, but there's an UpdateAccepted affecting WFT3. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..11].to_vec(), + 0, + 9, + true, + false, + chunking_v2, + ); + + // WFEStarted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 3), (3, false)); + + // Can't collapse because of UpdateAccepted affecting WFT3 + // WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 6), (3, false)); + + // Tail(WFTCompleted -> UpdateAccepted) + assert_eq!(next_check_peek2(&mut update, 9), (2, true)); + + // ReplayOver + assert_matches!(update.take_next_wft_sequence(11), NextWFT::ReplayOver); + } + + // 18: Full history + { + let mut update = t.as_history_update(); + + // WFEStarted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 3), (3, false)); + + // Can't collapse because of UpdateAccepted affecting WFT3 + // WFTCompleted -> WFTScheduled -> WFTStarted + assert_eq!(next_check_peek2(&mut update, 6), (3, false)); + + // Complete(WFTCompleted -> UpdateAccepted -> UpdateCompleted -> TimerStarted -> 
TimerFired -> WFTScheduled -> WFTStarted) + assert_eq!(next_check_peek2(&mut update, 9), (7, false)); + + // Complete(WFTCompleted -> WorkflowExecutionCompleted) + assert_eq!(next_check_peek2(&mut update, 16), (2, true)); + + // ReplayOver + assert_matches!(update.take_next_wft_sequence(18), NextWFT::ReplayOver); + } + } + + /// Empty WFT followed by WFT with update. + /// + /// The v1 algorithm would often return a Complete despite not being able to look + /// ahead far enough (has_last_wft=false) to take a positive decision. This test + /// confirms that v2 correctly returns NeedFetch in those cases. + /// + /// The legacy, v1 algorithm's behavior is not covered by this test. + #[rstest::rstest] + #[test] + fn empty_wft_then_update_no_last_wft(#[values(false, true)] chunking_v2: bool) { + if !chunking_v2 { + // This test encodes behavior that WFT chunking v2 specifically fixes + // (correct handling of updates after empty WFTs). The legacy algorithm has + // known buggy behavior in these scenarios — which is the entire motivation + // for this workspace's work. Skip on legacy. + return; + } + let t = build_empty_wft_then_update_history(chunking_v2); + let all_events = t.get_full_history_info().unwrap().into_events(); + + // 3. Up to WFT1 Started. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..3].to_vec(), + 0, + 3, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> (unknown) + + // Can't decide because unknown could: + // - be collapsible into WFT1 + // - contain an UpdateAccepted event pointing back to the first WFTStarted + // - contain a WFTFailed event + + assert_matches!(update.take_next_wft_sequence(0), NextWFT::NeedFetch); + } + + // 4. Up to WFT1 Completed.
+ { + let mut update = HistoryUpdate::new_from_events( + all_events[..4].to_vec(), + 0, + 3, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> (unknown) + + // Can't decide because unknown could: + // - be collapsable into WFT1 + // - contain an UpdateAccepted event pointing back to the first WFTStarted + + assert_matches!(update.take_next_wft_sequence(0), NextWFT::NeedFetch); + } + + // 4a. Up to WFT1 Completed + a follow up command + { + let mut t = TestHistoryBuilder::from_history(all_events[..4].to_vec()); + t.add_timer_started("1".to_string()); + + let events = t.get_full_history_info().unwrap().into_events().to_vec(); + let mut update = + HistoryUpdate::new_from_events(events, 0, 3, false, false, chunking_v2); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> TimerStarted -> (unknown) + + // It is safe to return LWFT ending at the first WFTStarted + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // Can't decide because there are no more WFTStarted in buffer, but unknown could contain some + assert_matches!(update.take_next_wft_sequence(3), NextWFT::NeedFetch); + } + + // 5. Up to WFT2 Scheduled. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..5].to_vec(), + 0, + 3, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> (unknown) + + // It is safe to return LWFT ending at the first WFTStarted (WFT1 follows WFExecutionStarted). + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // Can't decide further because unknown could: + // - contain a WFTFailed event + // - contain an UpdateAccepted event pointing back to the second WFTStarted + assert_matches!(update.take_next_wft_sequence(3), NextWFT::NeedFetch); + } + + // 5a. 
Up to WFT2 Scheduled + some inbound event + { + let mut t = TestHistoryBuilder::from_history(all_events[..5].to_vec()); + t.add_we_signaled("whee", vec![]); + + let events = t.get_full_history_info().unwrap().into_events().to_vec(); + let mut update = + HistoryUpdate::new_from_events(events, 0, 3, false, false, chunking_v2); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WeSignaled -> (unknown) + + // It is safe to return LWFT ending at the first WFTStarted. + // There can't be any unknown event past the WeSignaled that would affect WFT1 + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // Can't decide further because there are no more WFTStarted in buffer, but unknown could contain some + assert_matches!(update.take_next_wft_sequence(3), NextWFT::NeedFetch); + } + + // 6. Up to WFT2 Started. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..6].to_vec(), + 0, + 6, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> (unknown) + + // It is safe to return LWFT ending at the first WFTStarted (WFT1 follows WFExecutionStarted). + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // Can't decide further because unknown could: + // - contain a WFTFailed event + // - contain an UpdateAccepted event pointing back to the second WFTStarted + assert_matches!(update.take_next_wft_sequence(3), NextWFT::NeedFetch); + } + + // 6a. Up to WFT2 Started + WFTTimedOut.
+ { + let mut t = TestHistoryBuilder::from_history(all_events[..6].to_vec()); + t.add_workflow_task_timed_out(); + + let events = t.get_full_history_info().unwrap().into_events().to_vec(); + let mut update = + HistoryUpdate::new_from_events(events, 0, 3, false, false, chunking_v2); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTTimedOut -> (unknown) + + // It is safe to return LWFT ending at the first WFTStarted. + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // Can't decide further because there are no more non-failed WFTStarted in buffer; unknown could contain some + assert_matches!(update.take_next_wft_sequence(3), NextWFT::NeedFetch); + } + + // 7. Up to WFT2 Completed. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..7].to_vec(), + 0, + 6, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> (unknown) + + // It is safe to return LWFT ending at the first WFTStarted (WFT1 follows WFExecutionStarted). + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // Can't decide further because unknown could contain an UpdateAccepted event pointing + // back to the second WFTStarted. + assert_matches!(update.take_next_wft_sequence(3), NextWFT::NeedFetch); + } + + // 7a. Up to WFT2 Completed + a follow up command. + { + let mut t = TestHistoryBuilder::from_history(all_events[..7].to_vec()); + t.add_timer_started("1".to_string()); + + let events = t.get_full_history_info().unwrap().into_events().to_vec(); + let mut update = + HistoryUpdate::new_from_events(events, 0, 3, false, false, chunking_v2); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> TimerStarted -> (unknown) + + // WFT1 is forced separate (follows WFExecutionStarted).
+ assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // It is safe to return LWFT ending at the second WFTStarted. + assert_eq!(next_check_peek2(&mut update, 3), (3, false)); + + assert_matches!(update.take_next_wft_sequence(6), NextWFT::NeedFetch); + } + + // 9. Up to WFT3 Started. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..9].to_vec(), + 0, + 9, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> (unknown) + + // It is safe to return LWFT ending at the first WFTStarted (WFT1 follows WFExecutionStarted). + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // Can't decide further because unknown could: + // - allow or prevent collapsing WFT2+WFT3 + // - contain a WFTFailed event + // - contain an UpdateAccepted event pointing back to the second WFTStarted + assert_matches!(update.take_next_wft_sequence(3), NextWFT::NeedFetch); + } + + // 9a. Up to WFT3 Started + WFTTimedOut. + { + let mut t = TestHistoryBuilder::from_history(all_events[..9].to_vec()); + t.add_workflow_task_timed_out(); + + let events = t.get_full_history_info().unwrap().into_events().to_vec(); + let mut update = + HistoryUpdate::new_from_events(events, 0, 0, false, false, chunking_v2); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTTimedOut -> (unknown) + + // WFT1 is forced separate (follows WFExecutionStarted). + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // It is safe to return LWFT ending at the second WFTStarted. + assert_eq!(next_check_peek2(&mut update, 3), (3, false)); + + // Can't decide further because there are no more non-failed WFTStarted in buffer; unknown could contain some + assert_matches!(update.take_next_wft_sequence(6), NextWFT::NeedFetch); + } + + // 10. 
Up to WFT3 Completed. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..10].to_vec(), + 0, + 9, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> (unknown) + + // It is safe to return LWFT ending at the first WFTStarted (WFT1 follows WFExecutionStarted). + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + + // Can't decide further because unknown could: + // - allow or prevent collapsing WFT2+WFT3 + // - contain an UpdateAccepted event pointing back to the third WFTStarted + assert_matches!(update.take_next_wft_sequence(3), NextWFT::NeedFetch); + } + + // 11. Up to UpdateAccepted + { + let mut update = HistoryUpdate::new_from_events( + all_events[..11].to_vec(), + 0, + 9, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> UpdateAccepted -> (unknown) + + // WFT1 is forced separate (follows WFExecutionStarted). + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + // WFT2 is safe because we know it can't collapse with WFT3 (because of UpdateAccepted) + assert_eq!(next_check_peek2(&mut update, 3), (3, false)); + // WFT3 is safe because we know we can't collapse past the UpdateAccepted ahead + assert_eq!(next_check_peek2(&mut update, 6), (3, false)); + + // Can't decide further because there are no more WFTStarted in buffer; unknown could contain some; UpdateAccepted is not part of any LWFT + assert_matches!(update.take_next_wft_sequence(9), NextWFT::NeedFetch); + } + + // 13.
Up to TimerStarted + { + let mut update = HistoryUpdate::new_from_events( + all_events[..13].to_vec(), + 0, + 9, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> UpdateAccepted -> UpdateCompleted -> TimerStarted -> (unknown) + + // WFT1 is forced separate (follows WFExecutionStarted). + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + // WFT2 is safe because we know it can't collapse with WFT3 (because of UpdateAccepted) + assert_eq!(next_check_peek2(&mut update, 3), (3, false)); + // WFT3 is safe because we know we can't collapse past the UpdateAccepted ahead + assert_eq!(next_check_peek2(&mut update, 6), (3, false)); + + // Can't decide further because there are no more WFTStarted in buffer; unknown could contain some; UpdateAccepted is not part of any LWFT + assert_matches!(update.take_next_wft_sequence(9), NextWFT::NeedFetch); + } + + // 16. Up to WFT4 Started. + { + let mut update = HistoryUpdate::new_from_events( + all_events[..16].to_vec(), + 0, + 9, + false, + false, + chunking_v2, + ); + + // Buffer: + // WFEStarted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> WFTScheduled -> WFTStarted -> WFTCompleted -> UpdateAccepted -> UpdateCompleted -> TimerStarted -> TimerFired -> WFTScheduled -> WFTStarted -> (unknown) + + // WFT1 is forced separate (follows WFExecutionStarted). + assert_eq!(next_check_peek2(&mut update, 0), (3, false)); + // WFT2 is safe because we know it can't collapse with WFT3 (because of UpdateAccepted) + assert_eq!(next_check_peek2(&mut update, 3), (3, false)); + // WFT3 is safe because we know we can't collapse past the UpdateAccepted ahead + assert_eq!(next_check_peek2(&mut update, 6), (3, false)); + + // Can't decide further because WFT4 Started could be followed by a WFTFailure or noop WFT sequences.
+ assert_matches!(update.take_next_wft_sequence(9), NextWFT::NeedFetch); + } + } + + fn build_heartbeat_then_commands_history(chunking_v2: bool) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + if chunking_v2 { + t.set_use_wft_chunking_v2(); + } + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_full_wf_task(); // WFT2: has commands + let timer_id = t.add_timer_started("1".to_string()); + t.add_timer_fired(timer_id, "1".to_string()); + t.add_full_wf_task(); // WFT3 + t + } + + /// Heartbeat collapsing: empty WFT followed by WFT with commands. + /// + /// Under v1 (legacy), WFT1+WFT2 collapse into a single 6-event LWFT. + /// Under v2, WFT1 is forced separate (it follows WFExecutionStarted), + /// and WFT2 becomes its own LWFT. + #[rstest::rstest] + #[test] + fn heartbeat_collapsing(#[values(false, true)] chunking_v2: bool) { + let t = build_heartbeat_then_commands_history(chunking_v2); + + let mut update = t.as_history_update(); + if chunking_v2 { + // WFT1 alone (follows WFExecutionStarted). + let seq = next_check_peek(&mut update, 0); + assert_eq!(seq.len(), 3, "WFT1 should be separate"); + assert_eq!(seq.last().unwrap().event_id, 3); + + // WFT2 alone (was previously collapsed with WFT1 by v1). + let seq = next_check_peek(&mut update, 3); + assert_eq!(seq.len(), 3, "WFT2 is the second LWFT"); + assert_eq!(seq.last().unwrap().event_id, 6); + } else { + let seq = next_check_peek(&mut update, 0); + assert_eq!(seq.len(), 6, "WFT1+WFT2 should be collapsed under v1"); + assert_eq!(seq.last().unwrap().event_id, 6); + } + } + + /// When there are pending speculative updates, WFT chunking v2 must NOT + /// collapse the last WFT in a heartbeat chain, because the update needs + /// to be delivered in its own activation (matching the original execution). + /// Earlier (non-last) heartbeats in the chain may still be collapsed together. 
+ /// + /// To exercise this independently of the WFExecutionStarted rule (which + /// already forces WFT1 to be separate), the chain we care about runs from + /// WFT2 through WFT4. + /// + /// History: + /// Event 1: WorkflowExecutionStarted + /// Event 2: WFTScheduled ─┐ + /// Event 3: WFTStarted ─┤ WFT1 (heartbeat, empty) + /// Event 4: WFTCompleted ─┘ + /// Event 5: WFTScheduled ─┐ + /// Event 6: WFTStarted ─┤ WFT2 (heartbeat, empty) + /// Event 7: WFTCompleted ─┘ + /// Event 8: WFTScheduled ─┐ + /// Event 9: WFTStarted ─┤ WFT3 (heartbeat, empty) + /// Event 10: WFTCompleted ─┘ + /// Event 11: WFTScheduled ─┐ + /// Event 12: WFTStarted ─┘ WFT4 (current task, with pending update) + #[test] + fn heartbeat_not_collapsed_when_speculative_updates_pending() { + let chunking_v2 = true; + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); // WFT1: events 2-4 + t.add_full_wf_task(); // WFT2: events 5-7 + t.add_full_wf_task(); // WFT3: events 8-10 + t.add_workflow_task_scheduled_and_started(); // WFT4: events 11-12 + maybe_set_chunking_v2(&mut t, chunking_v2); + let all_events = t.get_full_history_info().unwrap().into_events(); + + // Without speculative updates: WFT2+WFT3+WFT4 all collapse via heartbeat coalescing. + { + let (mut update, _) = + HistoryUpdate::from_events(all_events.clone(), 0, 12, true, false, chunking_v2); + + // WFT1 alone (follows WFExecutionStarted). + let seq = next_check_peek(&mut update, 0); + assert_eq!( + seq.len(), + 3, + "WFT1 is separate (follows WFExecutionStarted)" + ); + assert_eq!(seq.last().unwrap().event_id, 3); + + // WFT2+WFT3+WFT4 collapsed. 
+ let seq = next_check_peek(&mut update, 3); + assert_eq!( + seq.len(), + 9, + "Without speculative updates: WFT2+WFT3+WFT4 collapsed" + ); + assert_eq!(seq.last().unwrap().event_id, 12); + } + + // With speculative updates: only the last heartbeat (WFT4) is uncollapsed; + // the earlier heartbeats (WFT2+WFT3) are still collapsed together. + { + let (mut update, _) = + HistoryUpdate::from_events(all_events.clone(), 0, 12, true, true, chunking_v2); + + // WFT1 alone (follows WFExecutionStarted). + let seq = next_check_peek(&mut update, 0); + assert_eq!( + seq.len(), + 3, + "WFT1 is separate (follows WFExecutionStarted)" + ); + assert_eq!(seq.last().unwrap().event_id, 3); + + // WFT2+WFT3 collapsed: intermediate heartbeats can still merge. + let seq = next_check_peek(&mut update, 3); + assert_eq!( + seq.len(), + 6, + "With speculative updates: WFT2+WFT3 still collapsed (intermediate heartbeats)" + ); + assert_eq!(seq.last().unwrap().event_id, 9); + + // WFT4 separate: holds the pending speculative update. + let seq = next_check_peek(&mut update, 9); + assert_eq!( + seq.len(), + 3, + "With speculative updates: WFT4 should be separate (3 events)" + ); + assert_eq!(seq.last().unwrap().event_id, 12); + } } } diff --git a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs index 9dbf4c35e..faf7f355f 100644 --- a/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs +++ b/crates/sdk-core/src/worker/workflow/machines/workflow_machines.rs @@ -275,6 +275,11 @@ impl WorkflowMachines { // Peek ahead to determine used flags in the first WFT. if let Some(attrs) = basics.history.peek_next_wft_completed(0) { observed_internal_flags.add_from_complete(attrs); + } else { + // There's not even a single WFT Completed event for this workflow, which implies + // that 1) this is the first WFT and 2) we're not replaying. 
That's a good time to + // set all flags that should be set but can only be set on the first WFT. + observed_internal_flags.write_all_cumulative_default_enabled(true); }; Self { last_history_from_server: basics.history, @@ -424,7 +429,7 @@ impl WorkflowMachines { // already recorded. (*self.observed_internal_flags) .borrow_mut() - .write_all_known(); + .write_all_cumulative_default_enabled(false); self.commands.iter().filter_map(|c| { if !self.machine(c.machine).is_final_state() { match &c.command { diff --git a/crates/sdk-core/tests/integ_tests/workflow_tests/replay.rs b/crates/sdk-core/tests/integ_tests/workflow_tests/replay.rs index 0a0ef19ab..036901bad 100644 --- a/crates/sdk-core/tests/integ_tests/workflow_tests/replay.rs +++ b/crates/sdk-core/tests/integ_tests/workflow_tests/replay.rs @@ -19,7 +19,8 @@ use temporalio_common::protos::{ }; use temporalio_macros::{workflow, workflow_methods}; use temporalio_sdk::{ - Worker, WorkflowContext, WorkflowContextView, WorkflowResult, interceptors::WorkerInterceptor, + SyncWorkflowContext, Worker, WorkflowContext, WorkflowContextView, WorkflowResult, + interceptors::WorkerInterceptor, }; use temporalio_sdk_core::{ PollError, prost_dur, @@ -404,6 +405,296 @@ async fn replay_ends_with_empty_wft() { assert!(task.eviction_reason().is_some()); } +/// Confirms WFT chunking v2 propagates end-to-end through Core's replay path +/// and produces one activation per real WFT, each with its own observed +/// timestamp. +/// +/// This is the integration-level analogue of the unit tests in +/// `crates/sdk-core/src/worker/workflow/history_update.rs` (which exercise the +/// chunking algorithm itself) — it confirms the chunking decisions also reach +/// lang correctly through the SDK replay machinery, with each WFT's +/// `WorkflowTaskStarted` timestamp surfacing on its own activation. If chunking +/// ever collapsed WFTs that should remain distinct, the activation count would +/// drop and/or two activations would share a timestamp. 
+#[tokio::test] +async fn wft_chunking_v2_replay_preserves_per_wft_timestamps() { + let num_timers = 3u32; + let mut t = canned_histories::long_sequential_timers(num_timers as usize); + // Builder-level opt-in: stamp the first WFTCompleted with the `WftChunkingV2` + // flag so the worker selects v2 chunking on replay. The canned history was + // built without v2 in mind, so we set the flag retroactively via the + // dedicated helper (no flags were previously set on the first WFTCompleted). + use temporalio_sdk_core::test_help::CoreInternalFlags; + t.set_flags_first_wft(&[CoreInternalFlags::WftChunkingV2], &[]); + t.set_wf_input(num_timers.as_json_payload().unwrap()); + + let mut worker = replay_sdk_worker([test_hist_to_replay(t)]); + worker.register_workflow::(); + + let collected: Arc>> = Arc::new(Mutex::new(vec![])); + worker.set_worker_interceptor(WftChunkingV2TimestampCollector { + timestamps: collected.clone(), + }); + worker.run().await.unwrap(); + + let times = collected.lock(); + assert_eq!( + times.len(), + num_timers as usize + 1, + "expected one activation per real WFT under v2 chunking \ + (InitializeWorkflow + one per TimerFired), got {}: {:?}", + times.len(), + &*times, + ); + let unique: HashSet<_> = times.iter().collect(); + assert_eq!( + times.len(), + unique.len(), + "every activation should observe its own WFTStarted timestamp under v2 \ + chunking; observed repeated timestamps would indicate a chunking collapse: \ + {:?}", + &*times, + ); +} + +struct WftChunkingV2TimestampCollector { + timestamps: Arc>>, +} + +#[async_trait::async_trait(?Send)] +impl WorkerInterceptor for WftChunkingV2TimestampCollector { + async fn on_workflow_activation( + &self, + act: &temporalio_common::protos::coresdk::workflow_activation::WorkflowActivation, + ) -> Result<(), anyhow::Error> { + // Skip eviction-only activations — they're not the "real" replay + // activations whose timestamps we care about here. 
+ if act.eviction_reason().is_some() { + return Ok(()); + } + if let Some(ts) = act.timestamp.clone() { + self.timestamps.lock().push(ts); + } + Ok(()) + } +} + +/// Seconds since UNIX_EPOCH that the LA-heartbeat replay test stamps on WFT1's +/// `WorkflowTaskStarted` event. `WftChunkingV2LaHeartbeatWf` compares its first +/// `workflow_time()` observation against this value. +const WFT1_TIMESTAMP_SECS: u64 = 1_700_000_000; +/// Seconds since UNIX_EPOCH that the LA-heartbeat replay test stamps on WFT3's +/// `WorkflowTaskStarted` event; deliberately distinct from `WFT1_TIMESTAMP_SECS`. +const WFT3_TIMESTAMP_SECS: u64 = 1_700_001_000; + +/// A workflow that runs a local activity around a `workflow_time()` observation. +/// At workflow start it records the seconds since UNIX_EPOCH of the observed +/// time, runs the LA, and then emits a `StartTimer` command **only if** that +/// observed value matches a hard-coded "expected WFT1 timestamp". +/// +/// We construct the replay history with `WorkflowTaskStarted` events placed at +/// known wall-clock times, and we expect the workflow's first observation to +/// equal WFT1's timestamp. Under WFT chunking v2 this is what the workflow +/// sees, because `WorkflowExecutionStarted` forces WFT1 to be its own LWFT. +/// Under v1 chunking, the first LWFT collapses WFT1 with the following +/// heartbeat-shaped WFTs, and the workflow's first observation lands on the +/// *last* `WorkflowTaskStarted` in the collapsed chain (a different timestamp), +/// so the conditional branch is skipped, no timer command is emitted, and +/// replay NDEs against the recorded `TimerStarted` event.
+#[workflow] +#[derive(Default)] +struct WftChunkingV2LaHeartbeatWf; + +#[workflow_methods] +impl WftChunkingV2LaHeartbeatWf { + #[run(name = "wft_chunking_v2_la_heartbeat")] + async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { + let initial = ctx + .workflow_time() + .expect("workflow time should be set"); + let initial_secs = initial + .duration_since(std::time::UNIX_EPOCH) + .expect("workflow time should be after UNIX_EPOCH") + .as_secs(); + ctx.start_local_activity( + crate::common::activity_functions::StdActivities::echo, + "hi!".to_string(), + temporalio_sdk::LocalActivityOptions::default(), + ) + .await?; + // Emit a real command iff the observed timestamp matches what v2 chunking + // (which keeps WFT1 as its own LWFT) is supposed to surface. + if initial_secs == WFT1_TIMESTAMP_SECS { + ctx.timer(Duration::from_millis(1)).await; + } + Ok(()) + } +} + +/// A workflow that records `workflow_time()` each time it receives a signal, +/// then emits a `StartTimer` command *only if the two observed times differ*. +/// +/// The conditional command makes the test sensitive to chunking decisions in +/// a way the workflow's return value cannot be: the return value is recorded +/// in `WorkflowExecutionCompletedEventAttributes` in history and is therefore +/// fixed on replay regardless of what the workflow code computed; but the +/// command sequence emitted by the workflow code is replayed and matched +/// against history's command events on every replay. If chunking ever +/// collapsed the two signal-receiving WFTs into a single LWFT, both handler +/// invocations would observe the same timestamp, the workflow would not emit +/// the timer command, and the replay would NDE against the recorded +/// `TimerStarted` event.
+#[workflow] +#[derive(Default)] +struct WftChunkingV2SignalTimeWf { + received: u32, + time1: Option, + time2: Option, +} + +#[workflow_methods] +impl WftChunkingV2SignalTimeWf { + #[run(name = "wft_chunking_v2_signal_time")] + async fn run(ctx: &mut WorkflowContext) -> WorkflowResult<()> { + ctx.wait_condition(|s| s.received >= 2).await; + let t1 = ctx.state(|s| s.time1.expect("time1 set in sig1 handler")); + let t2 = ctx.state(|s| s.time2.expect("time2 set in sig2 handler")); + if t1 != t2 { + // Under correct chunking the two signals were delivered in distinct + // LWFTs at distinct timestamps; emit a real command so the chunking + // decision becomes observable in the command stream (and therefore + // checkable against history on replay). + ctx.timer(Duration::from_millis(1)).await; + } + Ok(()) + } + + #[signal(name = "sig1")] + fn handle_sig1(&mut self, ctx: &mut SyncWorkflowContext, _input: ()) { + self.time1 = ctx.workflow_time(); + self.received += 1; + } + + #[signal(name = "sig2")] + fn handle_sig2(&mut self, ctx: &mut SyncWorkflowContext, _input: ()) { + self.time2 = ctx.workflow_time(); + self.received += 1; + } +} + +/// Confirms the "time sensitivity" fix of WFT chunking v2 from the workflow +/// author's point of view. +/// +/// The history below records a workflow that: +/// 1. Started. +/// 2. Received `sig1` in WFT2 — workflow_time observed = WFT2's WFTStarted ts. +/// 3. Received `sig2` in WFT3 — workflow_time observed = WFT3's WFTStarted ts. +/// 4. Saw the two observed times differ, emitted a `StartTimer` from WFT3. +/// 5. The timer fired in WFT4, then completed. +/// +/// Under v2 chunking this same shape of command stream is produced on replay — +/// because each signal-receiving WFT stays in its own LWFT with its own +/// timestamp, the workflow's `t1 != t2` branch fires again and re-emits the +/// timer. 
Under a hypothetical chunking collapse, the workflow would instead +/// observe `t1 == t2`, skip the timer, and NDE against the recorded +/// `TimerStarted` event in history (caught by +/// [`FailOnNondeterminismInterceptor`] that `replay_sdk_worker` installs). +#[tokio::test] +async fn wft_chunking_v2_signal_observed_times_are_per_wft() { + let mut t = TestHistoryBuilder::default(); + t.set_use_wft_chunking_v2(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); // WFT1: workflow main runs, awaits signals + t.add_we_signaled("sig1", vec![]); + t.add_full_wf_task(); // WFT2: handles sig1 (records time1) + t.add_we_signaled("sig2", vec![]); + t.add_full_wf_task(); // WFT3: handles sig2 (records time2), emits StartTimer + let timer_id = t.add_timer_started("1".to_string()); + t.add_timer_fired(timer_id, "1".to_string()); + t.add_full_wf_task(); // WFT4: timer fires, workflow completes + t.add_workflow_execution_completed(); + t.set_wf_type("wft_chunking_v2_signal_time"); + + let mut worker = replay_sdk_worker([test_hist_to_replay(t)]); + worker.register_workflow::(); + // `replay_sdk_worker` pre-installs `FailOnNondeterminismInterceptor`, which + // is what catches any mismatch between the workflow's emitted commands and + // history if chunking ever observed the wrong timestamps. + worker.run().await.unwrap(); +} + +/// Strict v1-vs-v2 differentiator using a LA-heartbeat history. +/// +/// History layout: +/// 1. `WorkflowExecutionStarted` +/// 2. WFT1 (Sched/Started/Completed) — workflow code observes time at this WFT's +/// `WorkflowTaskStarted` event under v2, then starts an LA and suspends. +/// 3. WFT2 (Sched/Started/Completed) — heartbeat WFT (long-running LA still in +/// progress; the worker completed with no commands to dodge WFT timeout). +/// 4. WFT3 (Sched/Started/Completed) — the LA completes during this WFT. +/// 5. `MarkerRecorded` — LA result marker (recorded as WFT3's command). +/// 6. 
WFT4 (Sched/Started/Completed) — workflow code resumes, observes the LA +/// result, and emits a `StartTimer` command iff the workflow's first +/// `workflow_time()` observation matched WFT1's WFTStarted timestamp. +/// 7. `TimerStarted` / `TimerFired` — the timer (the chunking-sensitive command). +/// 8. WFT5 (Sched/Started/Completed) — timer fired, workflow completes. +/// 9. `WorkflowExecutionCompleted` +/// +/// The WFTStarted events for WFT1 and WFT3 are stamped with deliberately +/// distinct wall-clock times. +/// +/// Under v2 chunking, the first LWFT is `[WFExecutionStarted, WFTScheduled, +/// WFTStarted]` covering WFT1 alone. The workflow's first `workflow_time()` +/// observation surfaces WFT1's timestamp, the conditional branch fires, and +/// the timer command is emitted — matching the recorded `TimerStarted` event. +/// Under v1 chunking, the heartbeat heuristic collapses WFT1/WFT2/WFT3 into a +/// single LWFT whose activation timestamp is the last `WorkflowTaskStarted` in +/// the chain (WFT3's). The workflow's first observation lands on the wrong +/// timestamp, the conditional branch is skipped, no timer command is emitted, +/// and the worker's `FailOnNondeterminismInterceptor` triggers on the +/// unmatched `TimerStarted` event in history. 
+#[tokio::test] +async fn wft_chunking_v2_la_heartbeat_keeps_wft1_distinct() { + let mut t = TestHistoryBuilder::default(); + t.set_use_wft_chunking_v2(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); // WFT1 (events 2-4) — workflow starts LA, awaits + t.add_full_wf_task(); // WFT2 (events 5-7) — LA-heartbeat (empty WFT) + t.add_full_wf_task(); // WFT3 (events 8-10) — LA completes + t.add_local_activity_result_marker( + 1, + "1", + temporalio_common::protos::coresdk::AsJsonPayloadExt::as_json_payload(&"hi!".to_string()) + .unwrap(), + ); // event 11 (LA result marker, recorded as WFT3's command) + t.add_full_wf_task(); // WFT4 (events 12-14) — workflow resumes, emits StartTimer + let timer_id = t.add_timer_started("1".to_string()); // event 15 + t.add_timer_fired(timer_id, "1".to_string()); // event 16 + t.add_full_wf_task(); // WFT5 (events 17-19) — timer fires, workflow completes + t.add_workflow_execution_completed(); // event 20 + t.set_wf_type("wft_chunking_v2_la_heartbeat"); + + // Stamp WFT1 and WFT3's `WorkflowTaskStarted` events with deliberately distinct + // wall-clock times so the workflow's `workflow_time()` observation can detect + // which one its activation timestamp came from. `set_current_time` in + // workflow_machines is monotonic, so we also push `WorkflowExecutionStarted` and + // the no-op WFT2 events strictly below WFT1's target time — otherwise the + // default `SystemTime::now()` event times (set when the events were appended) + // would clamp the workflow clock to the present and our modifications would be + // ignored. The events past WFT3 (LA marker, WFT4, timer, WFT5) keep their + // default wall-clock times; they are only reached after the workflow's first + // observation, so they don't affect this test's signal. 
+ let pre = std::time::UNIX_EPOCH + Duration::from_secs(WFT1_TIMESTAMP_SECS - 1); + let wft1_start = std::time::UNIX_EPOCH + Duration::from_secs(WFT1_TIMESTAMP_SECS); + let wft3_start = std::time::UNIX_EPOCH + Duration::from_secs(WFT3_TIMESTAMP_SECS); + t.modify_event(1, |e| e.event_time = Some(pre.into())); // WFExecutionStarted + t.modify_event(2, |e| e.event_time = Some(pre.into())); // WFT1.WFTScheduled + t.modify_event(3, |e| e.event_time = Some(wft1_start.into())); // WFT1.WFTStarted + t.modify_event(9, |e| e.event_time = Some(wft3_start.into())); // WFT3.WFTStarted + + let mut worker = replay_sdk_worker([test_hist_to_replay(t)]); + worker.register_workflow::(); + worker.register_activities(crate::common::activity_functions::StdActivities); + // `replay_sdk_worker` already installs `FailOnNondeterminismInterceptor`, + // which is what catches the missing `StartTimer` command if chunking is wrong. + worker.run().await.unwrap(); +} + #[derive(Default)] struct UniqueRunsCounter { runs: Arc>>,