diff --git a/crates/sdk-core/src/core_tests/workflow_tasks.rs b/crates/sdk-core/src/core_tests/workflow_tasks.rs index 8fd2cfdc1..d06402a23 100644 --- a/crates/sdk-core/src/core_tests/workflow_tasks.rs +++ b/crates/sdk-core/src/core_tests/workflow_tasks.rs @@ -1159,6 +1159,52 @@ async fn new_server_work_while_eviction_outstanding_doesnt_overwrite_activation( ); } +#[rstest] +#[case::timed_out(TestHistoryBuilder::add_workflow_execution_timed_out)] +#[case::terminated(TestHistoryBuilder::add_workflow_execution_terminated)] +#[tokio::test] +async fn external_terminal_events_request_cache_eviction( + #[case] add_terminal_event: fn(&mut TestHistoryBuilder), +) { + let wfid = "fake_wf_id"; + let mut t = canned_histories::single_timer("1"); + add_terminal_event(&mut t); + + let mut mh = + MockPollCfg::from_resp_batches(wfid, t, [ResponseType::AllHistory], mock_worker_client()); + mh.num_expected_completions = Some(TimesRange::from(0)); + let mut mock = build_mock_pollers(mh); + mock.make_wft_stream_interminable(); + mock.worker_cfg(|wc| wc.max_cached_workflows = 1); + let core = mock_worker(mock); + + let activation = core.poll_workflow_activation().await.unwrap(); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( + activation.run_id, + vec![ + start_timer_cmd(1, Duration::from_secs(1)), + CompleteWorkflowExecution { result: None }.into(), + ], + )) + .await + .unwrap(); + + let eviction = time::timeout(Duration::from_secs(1), core.poll_workflow_activation()) + .await + .expect("timed out waiting for eviction") + .expect("expected eviction activation"); + assert_matches!( + eviction.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::RemoveFromCache(eviction)), + }] if eviction.reason == EvictionReason::WorkflowExecutionEnding as i32 + ); + core.complete_workflow_activation(WorkflowActivationCompletion::empty(eviction.run_id)) + .await + .unwrap(); + core.shutdown().await; +} + #[tokio::test] async fn buffered_work_drained_on_shutdown() { let wfid = "fake_wf_id"; diff --git a/crates/sdk-core/src/worker/workflow/workflow_stream.rs b/crates/sdk-core/src/worker/workflow/workflow_stream.rs index 0c86f3fe5..d92737b85 100644 --- a/crates/sdk-core/src/worker/workflow/workflow_stream.rs +++ b/crates/sdk-core/src/worker/workflow/workflow_stream.rs @@ -397,11 +397,14 @@ impl WFStream { { // Attempt to produce the next activation if needed res = rh.check_more_activations(); - // If there's no more work and we reported workflow completion to server, evict. - if res.is_none() - && rh.workflow_is_finished() - && matches!(report.wft_report_status, WFTReportStatus::Reported { .. }) - { + // If there's no more work and the workflow has ended, evict. Workflows can end either + // because lang produced a terminal command we successfully reported to server, or + // because replay observed a server-side terminal event like timeout or termination. + let should_evict_finished_workflow = rh.workflow_is_finished() + && matches!(report.wft_report_status, WFTReportStatus::Reported { .. }); + let should_evict_terminal_history = rh.have_seen_terminal_event() + && matches!(report.wft_report_status, WFTReportStatus::NotReported); + if res.is_none() && (should_evict_finished_workflow || should_evict_terminal_history) { res = rh .request_eviction(RequestEvictMsg { run_id: run_id.to_string(),