Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { NestFactory } from '@nestjs/core';
import { publishCampaignsPack } from './campaign-execution.activities';
import { IMESSAGE_BROKER } from '../../../shared/broker/interfaces/message-broker.interface';
import {
CAMPAIGNS_PACK_TOPIC,
CampaignsPackContract,
isCampaignsPackContract,
} from '../../../shared/broker/contracts/campaigns-pack.contract';

jest.mock('@nestjs/core');
// Stub the app module so booting the activity's Nest context never pulls the
// real application graph (DB, brokers) into the unit test.
jest.mock('../../../app.module', () => ({
AppModule: { forRoot: () => ({}) },
}));
jest.mock('@temporalio/activity', () => ({
log: { info: jest.fn(), error: jest.fn(), warn: jest.fn(), debug: jest.fn() },
}));

const CORRELATION_ID = '11111111-1111-4111-8111-111111111111';
const CAMPAIGN_ID = 'camp-1';

describe('publishCampaignsPack activity', () => {
// Stable mock references: the activity caches its Nest context as a module
// singleton, so the broker resolved on the first call is reused. Reconfigure
// behaviour per test on the same `publish` fn rather than swapping it out.
const publish = jest.fn();
const appGet = jest.fn().mockReturnValue({ publish });

beforeEach(() => {
publish.mockReset().mockResolvedValue(undefined);
(NestFactory.createApplicationContext as jest.Mock).mockResolvedValue({
get: appGet,
});
});

it('publishes one schema-valid campaigns.pack message resolved via the broker token', async () => {
await publishCampaignsPack({
campaignId: CAMPAIGN_ID,
correlationId: CORRELATION_ID,
});

expect(appGet).toHaveBeenCalledWith(IMESSAGE_BROKER);
expect(publish).toHaveBeenCalledTimes(1);

const [topic, payload] = publish.mock.calls[0] as [
string,
CampaignsPackContract,
];
expect(topic).toBe(CAMPAIGNS_PACK_TOPIC);
expect(payload).toMatchObject({
campaignId: CAMPAIGN_ID,
triggeredBy: 'schedule',
correlationId: CORRELATION_ID,
});
expect(typeof payload.triggeredAt).toBe('string');
// The published payload must satisfy the landed story-1.5 contract.
expect(isCampaignsPackContract(payload)).toBe(true);
});

it('propagates a broker error so Temporal applies the activity retry policy (AC4)', async () => {
const brokerError = new Error('broker timeout');
publish.mockRejectedValueOnce(brokerError);

await expect(
publishCampaignsPack({
campaignId: CAMPAIGN_ID,
correlationId: CORRELATION_ID,
}),
).rejects.toThrow('broker timeout');
});
});
53 changes: 53 additions & 0 deletions src/modules/temporal/activities/campaign-execution.activities.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import { log } from '@temporalio/activity';
import { NestFactory } from '@nestjs/core';
import type { INestApplicationContext } from '@nestjs/common';
import { AppModule } from '../../../app.module';
import { AudienceComputationService } from '../../../shared/audience/audience-computation.service';
import { CampaignsService } from '../../campaigns/services/campaigns.service';
import { Campaign } from '../../campaigns/entities/campaign.entity';
import { CampaignExecution } from '../../campaigns/entities/campaign-execution.entity';
import { CampaignContactStatus } from '../../campaigns/entities/campaign-contact.entity';
import { runActivityInTenantDbContext } from '../tenant-activity-context';
import {
IMESSAGE_BROKER,
IMessageBroker,
} from '../../../shared/broker/interfaces/message-broker.interface';
import {
CAMPAIGNS_PACK_TOPIC,
CampaignsPackContract,
} from '../../../shared/broker/contracts/campaigns-pack.contract';

let appContext: any = null;

Expand Down Expand Up @@ -70,6 +79,11 @@ export interface GetCampaignDataInput {
campaignId: string;
}

export interface PublishCampaignsPackInput {
campaignId: string;
correlationId: string;
}

export interface UpdateExecutionProgressInput {
campaignId: string;
workflowId: string;
Expand Down Expand Up @@ -107,6 +121,8 @@ export interface CampaignExecutionActivities {

getCampaignData(input: GetCampaignDataInput): Promise<Campaign>;

publishCampaignsPack(input: PublishCampaignsPackInput): Promise<void>;

updateExecutionProgress(input: UpdateExecutionProgressInput): Promise<void>;

markBatchAsProcessed(
Expand Down Expand Up @@ -420,13 +436,50 @@ export async function updateExecutionProgress(
}
}

/**
* Switch-flip of the distributed campaign pipeline (story 4.7 / EVO-1221):
* publishes a single `campaigns.pack` message and returns once the broker
* acks, handing the heavy lifting (audience, pagination, dispatch, tracking)
* to the packer/sender/tracker workers. Replaces the legacy inline dispatch
* (`sendCampaignBatchMessages` → `CampaignMessageSenderService`).
*
* `triggeredBy` is fixed to `'schedule'`: the landed contract enum
* (`campaigns-pack.contract.ts`, story 1.5) is `schedule|manual|recurrence`
* and the packer only reads `campaignId`/`correlationId`, so the field is
* pure provenance — the Temporal trigger fires when the campaign schedule
* expires. A broker error propagates so Temporal applies the activity proxy's
* default retry policy (the broker is the fault, not the workflow).
*/
export async function publishCampaignsPack(
input: PublishCampaignsPackInput,
): Promise<void> {
const { campaignId, correlationId } = input;

log.info('Publishing campaigns.pack', { campaignId, correlationId });

const app = (await getAppContext()) as INestApplicationContext;
const broker = app.get<IMessageBroker>(IMESSAGE_BROKER);

const payload: CampaignsPackContract = {
campaignId,
triggeredAt: new Date().toISOString(),
triggeredBy: 'schedule',
correlationId,
};

await broker.publish(CAMPAIGNS_PACK_TOPIC, payload);

log.info('campaigns.pack published', { campaignId, correlationId });
}

// Export activities object for worker registration
export const campaignExecutionActivities: CampaignExecutionActivities = {
computeCampaignAudience,
createCampaignBatches,
getCampaignBatch,
updateCampaignStatus,
getCampaignData,
publishCampaignsPack,
updateExecutionProgress,
markBatchAsProcessed,
};
Loading
Loading