Skip to content

Commit d28ff48

Browse files
dereuromarkclaude
andcommitted
Add queue job lifecycle events for monitoring integration
Add events at key lifecycle points to enable integration with monitoring tools like Sentry: - Queue.Job.created: Fired when a job is added to the queue (producer) - Queue.Job.started: Fired when a worker begins processing a job (consumer) - Queue.Job.completed: Fired when a job finishes successfully - Queue.Job.failed: Fired on every job failure (not just when exhausted) These events provide the necessary hooks for implementing Sentry's queue monitoring feature or similar monitoring tools. The job entity included in each event provides all data needed for tracing (job ID, task name, payload, attempts, timestamps). Refs: LordSimal/cakephp-sentry#17 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent cb2221a commit d28ff48

File tree

5 files changed

+226
-2
lines changed

5 files changed

+226
-2
lines changed

docs/sections/misc.md

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,64 @@ This includes also failed ones if not filtered further using `where()` condition
5151

5252
## Events
5353
The Queue plugin dispatches events to allow you to hook into the queue processing lifecycle.
54+
These events are useful for monitoring, logging, and integrating with external services like Sentry.
55+
56+
### Queue.Job.created
57+
This event is triggered when a new job is added to the queue (producer side).
58+
59+
```php
60+
use Cake\Event\EventInterface;
61+
use Cake\Event\EventManager;
62+
63+
EventManager::instance()->on('Queue.Job.created', function (EventInterface $event) {
64+
$job = $event->getData('job');
65+
// Track job creation for monitoring
66+
});
67+
```
68+
69+
Event data:
70+
- `job`: The `QueuedJob` entity that was created
71+
72+
### Queue.Job.started
73+
This event is triggered when a worker begins processing a job (consumer side).
74+
75+
```php
76+
EventManager::instance()->on('Queue.Job.started', function (EventInterface $event) {
77+
$job = $event->getData('job');
78+
// Start tracing/monitoring span
79+
});
80+
```
81+
82+
Event data:
83+
- `job`: The `QueuedJob` entity being processed
84+
85+
### Queue.Job.completed
86+
This event is triggered when a job finishes successfully.
87+
88+
```php
89+
EventManager::instance()->on('Queue.Job.completed', function (EventInterface $event) {
90+
$job = $event->getData('job');
91+
// Mark trace as successful
92+
});
93+
```
94+
95+
Event data:
96+
- `job`: The `QueuedJob` entity that completed
97+
98+
### Queue.Job.failed
99+
This event is triggered when a job fails (on every failure attempt).
100+
101+
```php
102+
EventManager::instance()->on('Queue.Job.failed', function (EventInterface $event) {
103+
$job = $event->getData('job');
104+
$failureMessage = $event->getData('failureMessage');
105+
// Mark trace as failed, log error
106+
});
107+
```
108+
109+
Event data:
110+
- `job`: The `QueuedJob` entity that failed
111+
- `failureMessage`: The error message from the failure
54112

55113
### Queue.Job.maxAttemptsExhausted
56114
This event is triggered when a job has failed and exhausted all of its configured retry attempts.
@@ -81,10 +139,21 @@ EventManager::instance()->on('Queue.Job.maxAttemptsExhausted', function (EventIn
81139
});
82140
```
83141

84-
The event data contains:
142+
Event data:
85143
- `job`: The `QueuedJob` entity that failed
86144
- `failureMessage`: The error message from the last failure
87145

146+
### Monitoring Integration (Sentry, etc.)
147+
148+
These events enable integration with monitoring tools like Sentry's queue monitoring feature.
149+
The job entity provides all necessary data for tracing:
150+
151+
- `$job->id` - Message identifier (`messaging.message.id`)
152+
- `$job->job_task` - Queue/topic name (`messaging.destination.name`)
153+
- `$job->data` - Payload for calculating message size (`messaging.message.body.size`)
154+
- `$job->attempts` - Retry count (`messaging.message.retry.count`)
155+
- `$job->created`, `$job->notbefore`, `$job->fetched` - For calculating receive latency (`messaging.message.receive.latency`)
156+
88157
## Notes
89158

90159
`<TaskName>` is the complete class name without the Task suffix (e.g. Example or PluginName.Example).

src/Model/Table/QueuedJobsTable.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
use ArrayObject;
77
use Cake\Core\Configure;
88
use Cake\Core\Plugin;
9+
use Cake\Event\Event;
910
use Cake\Event\EventInterface;
11+
use Cake\Event\EventManager;
1012
use Cake\I18n\DateTime;
1113
use Cake\ORM\Query\SelectQuery;
1214
use Cake\ORM\Table;
@@ -216,8 +218,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array
216218
}
217219

218220
$queuedJob = $this->newEntity($queuedJob);
221+
$queuedJob = $this->saveOrFail($queuedJob);
219222

220-
return $this->saveOrFail($queuedJob);
223+
$event = new Event('Queue.Job.created', $this, [
224+
'job' => $queuedJob,
225+
]);
226+
EventManager::instance()->dispatch($event);
227+
228+
return $queuedJob;
221229
}
222230

223231
/**

src/Queue/Processor.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,11 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
213213
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false);
214214
$taskName = $queuedJob->job_task;
215215

216+
$event = new Event('Queue.Job.started', $this, [
217+
'job' => $queuedJob,
218+
]);
219+
EventManager::instance()->dispatch($event);
220+
216221
$return = $failureMessage = null;
217222
try {
218223
$this->time = time();
@@ -242,6 +247,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
242247
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
243248
$this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.');
244249

250+
$event = new Event('Queue.Job.failed', $this, [
251+
'job' => $queuedJob,
252+
'failureMessage' => $failureMessage,
253+
]);
254+
EventManager::instance()->dispatch($event);
255+
245256
// Dispatch event when job has exhausted all retries
246257
if ($failedStatus === 'aborted') {
247258
$event = new Event('Queue.Job.maxAttemptsExhausted', $this, [
@@ -255,6 +266,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
255266
}
256267

257268
$this->QueuedJobs->markJobDone($queuedJob);
269+
270+
$event = new Event('Queue.Job.completed', $this, [
271+
'job' => $queuedJob,
272+
]);
273+
EventManager::instance()->dispatch($event);
274+
258275
$this->io->out('Job Finished.');
259276
$this->currentJob = null;
260277
}

tests/TestCase/Model/Table/QueuedJobsTableTest.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
use Cake\Core\Configure;
1313
use Cake\Datasource\ConnectionManager;
14+
use Cake\Event\EventList;
15+
use Cake\Event\EventManager;
1416
use Cake\I18n\DateTime;
1517
use Cake\ORM\TableRegistry;
1618
use Cake\TestSuite\TestCase;
@@ -752,6 +754,23 @@ public function testGetStats() {
752754
$this->assertWithinRange(7200, (int)$queuedJob->fetchdelay, 1);
753755
}
754756

757+
/**
758+
* Test that Queue.Job.created event is fired when a job is created
759+
*
760+
* @return void
761+
*/
762+
public function testJobCreatedEvent() {
763+
// Set up event tracking
764+
$eventList = new EventList();
765+
EventManager::instance()->setEventList($eventList);
766+
767+
// Create a job
768+
$job = $this->QueuedJobs->createJob('Queue.Example', ['test' => 'data']);
769+
770+
// Check that the created event was dispatched
771+
$this->assertEventFired('Queue.Job.created');
772+
}
773+
755774
/**
756775
* Helper method for skipping tests that need a real connection.
757776
*

tests/TestCase/Queue/ProcessorTest.php

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,117 @@ public function testWorkerTimeoutHandlingIntegration() {
286286
}
287287
}
288288

289+
/**
290+
* Test that Queue.Job.started event is fired when job begins processing
291+
*
292+
* @return void
293+
*/
294+
public function testJobStartedEvent() {
295+
// Set up event tracking
296+
$eventList = new EventList();
297+
EventManager::instance()->setEventList($eventList);
298+
299+
// Create a job
300+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
301+
$job = $QueuedJobs->createJob('Queue.Example', [], ['priority' => 1]);
302+
303+
// Create processor with mock task
304+
$out = new ConsoleOutput();
305+
$err = new ConsoleOutput();
306+
$processor = $this->getMockBuilder(Processor::class)
307+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
308+
->onlyMethods(['loadTask'])
309+
->getMock();
310+
311+
// Create a mock task that succeeds (run method is void, so no return)
312+
$mockTask = $this->getMockBuilder(\Queue\Queue\Task\ExampleTask::class)
313+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
314+
->onlyMethods(['run'])
315+
->getMock();
316+
317+
$processor->method('loadTask')->willReturn($mockTask);
318+
319+
// Run the job
320+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
321+
322+
// Check that the started event was dispatched
323+
$this->assertEventFired('Queue.Job.started');
324+
}
325+
326+
/**
327+
* Test that Queue.Job.completed event is fired when job finishes successfully
328+
*
329+
* @return void
330+
*/
331+
public function testJobCompletedEvent() {
332+
// Set up event tracking
333+
$eventList = new EventList();
334+
EventManager::instance()->setEventList($eventList);
335+
336+
// Create a job
337+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
338+
$job = $QueuedJobs->createJob('Queue.Example', [], ['priority' => 1]);
339+
340+
// Create processor with mock task
341+
$out = new ConsoleOutput();
342+
$err = new ConsoleOutput();
343+
$processor = $this->getMockBuilder(Processor::class)
344+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
345+
->onlyMethods(['loadTask'])
346+
->getMock();
347+
348+
// Create a mock task that succeeds (run method is void, so no return)
349+
$mockTask = $this->getMockBuilder(\Queue\Queue\Task\ExampleTask::class)
350+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
351+
->onlyMethods(['run'])
352+
->getMock();
353+
354+
$processor->method('loadTask')->willReturn($mockTask);
355+
356+
// Run the job
357+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
358+
359+
// Check that the completed event was dispatched
360+
$this->assertEventFired('Queue.Job.completed');
361+
}
362+
363+
/**
364+
* Test that Queue.Job.failed event is fired when job fails
365+
*
366+
* @return void
367+
*/
368+
public function testJobFailedEvent() {
369+
// Set up event tracking
370+
$eventList = new EventList();
371+
EventManager::instance()->setEventList($eventList);
372+
373+
// Create a job that will fail
374+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
375+
$job = $QueuedJobs->createJob('Queue.RetryExample', [], ['priority' => 1]);
376+
377+
// Create processor with mock task that fails
378+
$out = new ConsoleOutput();
379+
$err = new ConsoleOutput();
380+
$processor = $this->getMockBuilder(Processor::class)
381+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
382+
->onlyMethods(['loadTask'])
383+
->getMock();
384+
385+
$mockTask = $this->getMockBuilder(RetryExampleTask::class)
386+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
387+
->onlyMethods(['run'])
388+
->getMock();
389+
$mockTask->method('run')->willThrowException(new RuntimeException('Task failed'));
390+
391+
$processor->method('loadTask')->willReturn($mockTask);
392+
393+
// Run the job (it will fail)
394+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
395+
396+
// Check that the failed event was dispatched
397+
$this->assertEventFired('Queue.Job.failed');
398+
}
399+
289400
/**
290401
* Test setPhpTimeout with new config names
291402
*

0 commit comments

Comments
 (0)