Skip to content

feat(evo-flow): re-wire CampaignWorkflow to publish campaigns.pack (EVO-1221)#58

Open
nickoliveira23 wants to merge 1 commit into
developfrom
feat/EVO-1221
Open

feat(evo-flow): re-wire CampaignWorkflow to publish campaigns.pack (EVO-1221)#58
nickoliveira23 wants to merge 1 commit into
developfrom
feat/EVO-1221

Conversation

@nickoliveira23

@nickoliveira23 nickoliveira23 commented Jun 11, 2026

Copy link
Copy Markdown

Summary

The "switch flip" that wires the distributed campaign pipeline end-to-end (Epic 4, story 4.7). Until now the packer → sender → tracker existed but was never triggered.

  • CampaignExecutionWorkflow no longer computes audience / creates batches / sends inline. It validates the campaign, marks it SENDING, and publishes a single campaigns.pack message via a new activity, returning in <5s.
  • New activity publishCampaignsPack({campaignId, correlationId}) injects IMESSAGE_BROKER (already @Global in AppModule.forRoot()) and publishes {campaignId, triggeredAt, triggeredBy:'schedule', correlationId} to campaigns.pack.
  • correlationId is generated in the workflow with uuid4() from @temporalio/workflow (deterministic / replay-safe) and flows through every downstream message + log.
  • The packer resolves audience + paginates → campaigns.send; the sender dispatches → campaigns.tracked; the broker-native campaign-tracker (4.6 / EVO-1220) aggregates and drives Campaign.status → Completed. The workflow waits on none of it.
  • Legacy inline dispatch (sendCampaignBatchMessagesCampaignMessageSenderService) is no longer called; its removal stays in story 5.5.

Contract note (card vs. landed contract)

The card's AC1 text says publish triggeredBy:'workflow', but the landed story-1.5 contract (campaigns-pack.contract.ts) enum is schedule|manual|recurrence (.strict()). The field is pure provenance (the packer reads only campaignId/correlationId), so this publishes triggeredBy:'schedule' (the Temporal trigger fires when the campaign schedule expires) rather than mutating the shared contract.

Code review fix (adversarial pass)

  • HIGH — orphaned CampaignExecution: removing the inline orchestration also removed the updateExecutionProgress calls that used to close the execution row, which would leave it RUNNING forever (blocking re-runs and breaking pause/stop). Fixed by closing the row at the hand-off (completed) and on failure (failed). This also makes pause/stop endpoints return "no active execution" gracefully instead of signalling a completed workflow.

Acceptance Criteria

# AC (card) Evidence Status
1 dispatch returns <5s, msg in campaigns.pack with the payload publishCampaignsPack (activities) + workflow STEP 3; spec asserts topic+schema ✓ (⚠ triggeredBy:'schedule' vs card's 'workflow' — contract-driven)
2 full pipeline, 10 contacts → Completed workflow publishes pack; tracker (4.6, Done) completes ⚠ live E2E pending (needs broker+Temporal stack)
3 same correlationId across pack/send/tracked + logs uuid4() in workflow; packer/sender propagate (4.1/4.6)
4 broker error → activity propagates → Temporal default retry activity does not swallow; proxy retry policy; spec test
5 tests no longer reference legacy service legacy not called; new activity spec; no ref to CampaignMessageSenderService

Security

  • No new external surface, auth, or input. Broker token resolved via DI; payload is the typed/strict story-1.5 contract. Tenant scoping unchanged (activity DB writes still go through runActivityInTenantDbContext).

Test plan

  • evo-flow: npm run typecheck — clean
  • evo-flow: npx jest campaign-execution.activities — 2/2 (schema-valid publish + broker-error propagation/AC4)
  • evo-flow: npx eslint <changed files> — activity +0 vs baseline, workflow reduced 19→7 (pre-existing error.message idiom), spec 0
  • evo-flow: npx jest campaign temporal — 174/175 (1 failure is pre-existing on develop: campaigns.controller.spec.ts stop backward-compat test, unrelated)
  • Live E2E smoke (packer+sender+tracker+api → 10-contact campaign → Completed) not run here — requires the full broker+Temporal stack.

Note: evo-flow CI runs Sourcery only (no tsc/jest/lint), so green checks are self-reported — validation above is local.

Changed Files

  • src/modules/temporal/activities/campaign-execution.activities.ts — new publishCampaignsPack activity + registration
  • src/modules/temporal/workflows/campaign-execution.workflow.ts — re-wire to hand-off; close execution row at hand-off/failure
  • src/modules/temporal/activities/campaign-execution.activities.spec.ts — new unit tests

Known limitations / follow-ups

  • Workflow re-wire itself is not unit-tested (@temporalio/testing is not a dependency; importing the workflow in plain jest throws on top-level proxyActivities()). Activity is covered.
  • Pause/stop of an in-flight distributed send is not possible via the workflow anymore — that is story 4.8 (campaigns.control, EVO-1222), which this unblocks.
  • triggeredBy is fixed to 'schedule' even for manual starts (provenance only).

Linked Issue

  • EVO-1221

Summary by Sourcery

Wire the campaign execution workflow to hand off dispatch to the distributed campaign pipeline via a new broker-backed activity and close executions at hand-off or on failure to avoid orphaned runs.

New Features:

  • Introduce a publishCampaignsPack activity that emits a campaigns.pack message using the existing message broker contract as the entrypoint to the distributed campaign pipeline.

Bug Fixes:

  • Ensure CampaignExecution records are marked completed or failed at hand-off or on error so executions no longer remain stuck in a RUNNING state.

Enhancements:

  • Simplify CampaignExecutionWorkflow by removing inline audience computation, batching, and message sending in favor of a fast hand-off with correlation IDs for end-to-end tracing.

Tests:

  • Add unit tests for publishCampaignsPack to validate the emitted message schema, topic, and error propagation behavior.

…VO-1221)

Switch-flip of the distributed campaign pipeline. CampaignExecutionWorkflow
no longer computes audience / batches / sends inline; it now validates the
campaign, marks it SENDING, and publishes a single `campaigns.pack` message
via a new `publishCampaignsPack` activity, returning in <5s. The packer →
sender → campaign-tracker pipeline (4.1/4.3/4.5/4.6) takes over and drives
Campaign.status to Completed broker-native (4.6 is not Temporal-signal based).

- New activity `publishCampaignsPack({campaignId, correlationId})` publishes
  `{campaignId, triggeredAt, triggeredBy:'schedule', correlationId}` to
  `campaigns.pack`. `triggeredBy:'schedule'` matches the landed story-1.5
  contract enum (schedule|manual|recurrence); the card's literal 'workflow'
  is not a valid contract value and the packer reads only campaignId +
  correlationId. Broker errors propagate so Temporal applies the activity
  proxy's default retry policy.
- correlationId generated in the workflow via `uuid4()` (replay-safe) and
  propagated through every downstream message/log.
- Close the CampaignExecution row at hand-off (completed) and on failure
  (failed) so it stops counting as an active execution; otherwise it stays
  RUNNING forever and blocks re-runs and breaks pause/stop.
- Legacy inline dispatch (sendCampaignBatchMessages / CampaignMessageSenderService)
  is no longer called; its removal stays in story 5.5.
- Unit tests for the new activity (schema-valid publish + error propagation).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@sourcery-ai

sourcery-ai Bot commented Jun 11, 2026

Copy link
Copy Markdown

Reviewer's Guide

Rewires the CampaignExecutionWorkflow to stop performing inline audience computation and batch sending, and instead hand off campaign dispatch to the distributed broker-based pipeline by publishing a single campaigns.pack message via a new activity, while ensuring execution rows are properly closed and the new behavior is covered by unit tests.

Sequence diagram for CampaignExecutionWorkflow hand-off to distributed pipeline

sequenceDiagram
  actor Scheduler
  participant CampaignExecutionWorkflow
  participant CampaignExecutionActivities
  participant IMessageBroker
  participant PackerWorker
  participant SenderWorker
  participant CampaignTracker

  Scheduler->>CampaignExecutionWorkflow: start CampaignExecutionWorkflow
  CampaignExecutionWorkflow->>CampaignExecutionActivities: getCampaignData
  CampaignExecutionActivities-->>CampaignExecutionWorkflow: Campaign
  CampaignExecutionWorkflow->>CampaignExecutionActivities: updateCampaignStatus (SENDING)
  CampaignExecutionWorkflow->>CampaignExecutionActivities: publishCampaignsPack(campaignId, correlationId)
  CampaignExecutionActivities->>IMessageBroker: publish(CAMPAIGNS_PACK_TOPIC, CampaignsPackContract)
  IMessageBroker-->>PackerWorker: CAMPAIGNS_PACK_TOPIC
  PackerWorker-->>IMessageBroker: campaigns.send
  IMessageBroker-->>SenderWorker: campaigns.send
  SenderWorker-->>IMessageBroker: campaigns.tracked
  IMessageBroker-->>CampaignTracker: campaigns.tracked
  CampaignExecutionWorkflow->>CampaignExecutionActivities: updateExecutionProgress(completed)
  CampaignExecutionWorkflow-->>Scheduler: return CampaignExecutionState (status sending)
Loading

File-Level Changes

Change Details Files
Workflow now validates the campaign, marks it SENDING, publishes a campaigns.pack message, and immediately returns instead of orchestrating audience computation and batch sending inline.
  • Removed inline audience computation, batch creation, and per-batch message sending logic from the workflow
  • Introduced a deterministic correlationId generated via uuid4() and threaded through logs and the publish call
  • Simplified pause/resume/cancel signal handlers to only annotate returned state without controlling a long-running send
  • Updated success and failure paths to treat the broker hand-off as the workflow boundary, including new logging messages
src/modules/temporal/workflows/campaign-execution.workflow.ts
Campaign execution activities gained a publishCampaignsPack activity that publishes a typed campaigns.pack message through the shared broker and is registered for worker use.
  • Defined PublishCampaignsPackInput and added publishCampaignsPack to the CampaignExecutionActivities interface
  • Implemented publishCampaignsPack to resolve IMessageBroker via IMESSAGE_BROKER from the Nest app context and publish a CampaignsPackContract payload with triggeredBy fixed to 'schedule'
  • Registered publishCampaignsPack in the exported campaignExecutionActivities object
src/modules/temporal/activities/campaign-execution.activities.ts
Unit tests were added to validate the new publishCampaignsPack activity’s broker interaction, schema compliance, and error propagation behavior.
  • Mocked NestFactory and AppModule to avoid bringing up the full application graph in tests
  • Asserted that a single schema-valid campaigns.pack message is published on the correct topic with the expected fields
  • Asserted that broker publish errors are propagated so Temporal can apply its retry policy
src/modules/temporal/activities/campaign-execution.activities.spec.ts

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've left some high level feedback:

  • In CampaignExecutionWorkflow, the success path no longer updates state.status to 'completed' or sets state.completedAt, even though updateExecutionProgress is called with status: 'completed'; consider aligning the in-memory CampaignExecutionState with the persisted status so API consumers of the workflow result don’t see a stale 'sending' state.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `CampaignExecutionWorkflow`, the success path no longer updates `state.status` to `'completed'` or sets `state.completedAt`, even though `updateExecutionProgress` is called with `status: 'completed'`; consider aligning the in-memory `CampaignExecutionState` with the persisted status so API consumers of the workflow result don’t see a stale `'sending'` state.

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant