feat(evo-flow): re-wire CampaignWorkflow to publish campaigns.pack (EVO-1221)#58
Open
nickoliveira23 wants to merge 1 commit into
Open
feat(evo-flow): re-wire CampaignWorkflow to publish campaigns.pack (EVO-1221)#58nickoliveira23 wants to merge 1 commit into
nickoliveira23 wants to merge 1 commit into
Conversation
…VO-1221)
Switch-flip of the distributed campaign pipeline. CampaignExecutionWorkflow
no longer computes audience / batches / sends inline; it now validates the
campaign, marks it SENDING, and publishes a single `campaigns.pack` message
via a new `publishCampaignsPack` activity, returning in <5s. The packer →
sender → campaign-tracker pipeline (4.1/4.3/4.5/4.6) takes over and drives
Campaign.status to Completed broker-native (4.6 is not Temporal-signal based).
- New activity `publishCampaignsPack({campaignId, correlationId})` publishes
`{campaignId, triggeredAt, triggeredBy:'schedule', correlationId}` to
`campaigns.pack`. `triggeredBy:'schedule'` matches the landed story-1.5
contract enum (schedule|manual|recurrence); the card's literal 'workflow'
is not a valid contract value and the packer reads only campaignId +
correlationId. Broker errors propagate so Temporal applies the activity
proxy's default retry policy.
- correlationId generated in the workflow via `uuid4()` (replay-safe) and
propagated through every downstream message/log.
- Close the CampaignExecution row at hand-off (completed) and on failure
(failed) so it stops counting as an active execution; otherwise it stays
RUNNING forever and blocks re-runs and breaks pause/stop.
- Legacy inline dispatch (sendCampaignBatchMessages / CampaignMessageSenderService)
is no longer called; its removal stays in story 5.5.
- Unit tests for the new activity (schema-valid publish + error propagation).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Reviewer's GuideRewires the CampaignExecutionWorkflow to stop performing inline audience computation and batch sending, and instead hand off campaign dispatch to the distributed broker-based pipeline by publishing a single campaigns.pack message via a new activity, while ensuring execution rows are properly closed and the new behavior is covered by unit tests. Sequence diagram for CampaignExecutionWorkflow hand-off to distributed pipelinesequenceDiagram
actor Scheduler
participant CampaignExecutionWorkflow
participant CampaignExecutionActivities
participant IMessageBroker
participant PackerWorker
participant SenderWorker
participant CampaignTracker
Scheduler->>CampaignExecutionWorkflow: start CampaignExecutionWorkflow
CampaignExecutionWorkflow->>CampaignExecutionActivities: getCampaignData
CampaignExecutionActivities-->>CampaignExecutionWorkflow: Campaign
CampaignExecutionWorkflow->>CampaignExecutionActivities: updateCampaignStatus (SENDING)
CampaignExecutionWorkflow->>CampaignExecutionActivities: publishCampaignsPack(campaignId, correlationId)
CampaignExecutionActivities->>IMessageBroker: publish(CAMPAIGNS_PACK_TOPIC, CampaignsPackContract)
IMessageBroker-->>PackerWorker: CAMPAIGNS_PACK_TOPIC
PackerWorker-->>IMessageBroker: campaigns.send
IMessageBroker-->>SenderWorker: campaigns.send
SenderWorker-->>IMessageBroker: campaigns.tracked
IMessageBroker-->>CampaignTracker: campaigns.tracked
CampaignExecutionWorkflow->>CampaignExecutionActivities: updateExecutionProgress(completed)
CampaignExecutionWorkflow-->>Scheduler: return CampaignExecutionState (status sending)
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've left some high level feedback:
- In
CampaignExecutionWorkflow, the success path no longer updatesstate.statusto'completed'or setsstate.completedAt, even thoughupdateExecutionProgressis called withstatus: 'completed'; consider aligning the in-memoryCampaignExecutionStatewith the persisted status so API consumers of the workflow result don’t see a stale'sending'state.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `CampaignExecutionWorkflow`, the success path no longer updates `state.status` to `'completed'` or sets `state.completedAt`, even though `updateExecutionProgress` is called with `status: 'completed'`; consider aligning the in-memory `CampaignExecutionState` with the persisted status so API consumers of the workflow result don’t see a stale `'sending'` state.Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
The "switch flip" that wires the distributed campaign pipeline end-to-end (Epic 4, story 4.7). Until now the packer → sender → tracker existed but was never triggered.
CampaignExecutionWorkflowno longer computes audience / creates batches / sends inline. It validates the campaign, marks itSENDING, and publishes a singlecampaigns.packmessage via a new activity, returning in <5s.publishCampaignsPack({campaignId, correlationId})injectsIMESSAGE_BROKER(already@GlobalinAppModule.forRoot()) and publishes{campaignId, triggeredAt, triggeredBy:'schedule', correlationId}tocampaigns.pack.correlationIdis generated in the workflow withuuid4()from@temporalio/workflow(deterministic / replay-safe) and flows through every downstream message + log.campaigns.send; the sender dispatches →campaigns.tracked; the broker-nativecampaign-tracker(4.6 / EVO-1220) aggregates and drivesCampaign.status → Completed. The workflow waits on none of it.sendCampaignBatchMessages→CampaignMessageSenderService) is no longer called; its removal stays in story 5.5.Contract note (card vs. landed contract)
The card's AC1 text says publish
triggeredBy:'workflow', but the landed story-1.5 contract (campaigns-pack.contract.ts) enum isschedule|manual|recurrence(.strict()). The field is pure provenance (the packer reads onlycampaignId/correlationId), so this publishestriggeredBy:'schedule'(the Temporal trigger fires when the campaign schedule expires) rather than mutating the shared contract.Code review fix (adversarial pass)
CampaignExecution: removing the inline orchestration also removed theupdateExecutionProgresscalls that used to close the execution row, which would leave itRUNNINGforever (blocking re-runs and breaking pause/stop). Fixed by closing the row at the hand-off (completed) and on failure (failed). This also makes pause/stop endpoints return "no active execution" gracefully instead of signalling a completed workflow.Acceptance Criteria
campaigns.packwith the payloadpublishCampaignsPack(activities) + workflow STEP 3; spec asserts topic+schematriggeredBy:'schedule'vs card's'workflow'— contract-driven)CompletedcorrelationIdacross pack/send/tracked + logsuuid4()in workflow; packer/sender propagate (4.1/4.6)CampaignMessageSenderServiceSecurity
runActivityInTenantDbContext).Test plan
evo-flow: npm run typecheck— cleanevo-flow: npx jest campaign-execution.activities— 2/2 (schema-valid publish + broker-error propagation/AC4)evo-flow: npx eslint <changed files>— activity +0 vs baseline, workflow reduced 19→7 (pre-existingerror.messageidiom), spec 0evo-flow: npx jest campaign temporal— 174/175 (1 failure is pre-existing ondevelop:campaigns.controller.spec.tsstopbackward-compat test, unrelated)Completed) not run here — requires the full broker+Temporal stack.Changed Files
src/modules/temporal/activities/campaign-execution.activities.ts— newpublishCampaignsPackactivity + registrationsrc/modules/temporal/workflows/campaign-execution.workflow.ts— re-wire to hand-off; close execution row at hand-off/failuresrc/modules/temporal/activities/campaign-execution.activities.spec.ts— new unit testsKnown limitations / follow-ups
@temporalio/testingis not a dependency; importing the workflow in plain jest throws on top-levelproxyActivities()). Activity is covered.campaigns.control, EVO-1222), which this unblocks.triggeredByis fixed to'schedule'even for manual starts (provenance only).Linked Issue
Summary by Sourcery
Wire the campaign execution workflow to hand off dispatch to the distributed campaign pipeline via a new broker-backed activity and close executions at hand-off or on failure to avoid orphaned runs.
New Features:
Bug Fixes:
Enhancements:
Tests: