Skip to content

Commit a0d976b

Browse files
Merge pull request #2 from gophpeek/fix/race-conditions-atomic-operations
Fix race conditions and implement queue metrics aggregation
2 parents 50f6fdb + 004f443 commit a0d976b

File tree

9 files changed

+611
-97
lines changed

9 files changed

+611
-97
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PHPeek\LaravelQueueMetrics\Actions;
6+
7+
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\JobMetricsRepository;
8+
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\QueueMetricsRepository;
9+
10+
/**
11+
* Calculate aggregated queue-level metrics from job-level metrics.
12+
*
13+
* This action aggregates metrics across all job classes for a given queue,
14+
* calculating weighted averages and totals for throughput, duration, and failure rates.
15+
*/
16+
final readonly class CalculateQueueMetricsAction
17+
{
18+
public function __construct(
19+
private JobMetricsRepository $jobRepository,
20+
private QueueMetricsRepository $queueRepository,
21+
) {}
22+
23+
/**
24+
* Calculate and store aggregated metrics for a specific queue.
25+
*/
26+
public function execute(string $connection, string $queue): void
27+
{
28+
// Get all jobs for this queue
29+
$allJobs = $this->jobRepository->listJobs();
30+
$queueJobs = array_filter($allJobs, fn ($job) => $job['connection'] === $connection && $job['queue'] === $queue);
31+
32+
if (empty($queueJobs)) {
33+
// No jobs found for this queue - record zero metrics
34+
$this->queueRepository->recordSnapshot($connection, $queue, [
35+
'throughput_per_minute' => 0.0,
36+
'avg_duration' => 0.0,
37+
'failure_rate' => 0.0,
38+
]);
39+
40+
return;
41+
}
42+
43+
// Aggregate metrics across all job classes
44+
$totalProcessed = 0;
45+
$totalFailed = 0;
46+
$totalDurationMs = 0.0;
47+
$lastProcessedAt = null;
48+
49+
foreach ($queueJobs as $job) {
50+
$jobClass = $job['jobClass'];
51+
$metrics = $this->jobRepository->getMetrics($jobClass, $connection, $queue);
52+
53+
$totalProcessed += is_int($metrics['total_processed']) ? $metrics['total_processed'] : 0;
54+
$totalFailed += is_int($metrics['total_failed']) ? $metrics['total_failed'] : 0;
55+
$totalDurationMs += is_float($metrics['total_duration_ms']) || is_int($metrics['total_duration_ms'])
56+
? (float) $metrics['total_duration_ms']
57+
: 0.0;
58+
59+
if ($metrics['last_processed_at'] instanceof \Carbon\Carbon) {
60+
if ($lastProcessedAt === null || $metrics['last_processed_at']->greaterThan($lastProcessedAt)) {
61+
$lastProcessedAt = $metrics['last_processed_at'];
62+
}
63+
}
64+
}
65+
66+
// Calculate aggregated metrics
67+
$avgDuration = $totalProcessed > 0 ? $totalDurationMs / $totalProcessed : 0.0;
68+
$failureRate = ($totalProcessed + $totalFailed) > 0
69+
? ($totalFailed / ($totalProcessed + $totalFailed)) * 100.0
70+
: 0.0;
71+
72+
// Calculate throughput per minute (jobs completed in last 60 seconds)
73+
$throughputPerMinute = 0.0;
74+
foreach ($queueJobs as $job) {
75+
$jobClass = $job['jobClass'];
76+
$throughputPerMinute += $this->jobRepository->getThroughput(
77+
$jobClass,
78+
$connection,
79+
$queue,
80+
60 // last 60 seconds
81+
);
82+
}
83+
84+
// Store aggregated metrics
85+
$this->queueRepository->recordSnapshot($connection, $queue, [
86+
'throughput_per_minute' => $throughputPerMinute,
87+
'avg_duration' => $avgDuration,
88+
'failure_rate' => $failureRate,
89+
'total_processed' => $totalProcessed,
90+
'total_failed' => $totalFailed,
91+
'last_processed_at' => $lastProcessedAt?->timestamp,
92+
]);
93+
}
94+
95+
/**
96+
* Calculate metrics for all discovered queues.
97+
*/
98+
public function executeForAllQueues(): int
99+
{
100+
$queues = $this->queueRepository->listQueues();
101+
$count = 0;
102+
103+
foreach ($queues as $queue) {
104+
$this->execute($queue['connection'], $queue['queue']);
105+
$count++;
106+
}
107+
108+
return $count;
109+
}
110+
}

src/Actions/RecordJobStartAction.php

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use Carbon\Carbon;
88
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\JobMetricsRepository;
9-
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\QueueMetricsRepository;
109

1110
/**
1211
* Record when a job starts processing.
@@ -15,7 +14,6 @@
1514
{
1615
public function __construct(
1716
private JobMetricsRepository $repository,
18-
private QueueMetricsRepository $queueMetricsRepository,
1917
) {}
2018

2119
public function execute(
@@ -28,9 +26,7 @@ public function execute(
2826
return;
2927
}
3028

31-
// Mark queue as discovered for listQueues() to find it
32-
$this->queueMetricsRepository->markQueueDiscovered($connection, $queue);
33-
29+
// Queue discovery now happens atomically inside recordStart()
3430
$this->repository->recordStart(
3531
jobId: $jobId,
3632
jobClass: $jobClass,
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PHPeek\LaravelQueueMetrics\Console;
6+
7+
use Illuminate\Console\Command;
8+
use PHPeek\LaravelQueueMetrics\Actions\CalculateQueueMetricsAction;
9+
10+
/**
11+
* Calculate and update queue-level aggregated metrics.
12+
*
13+
* This command aggregates job-level metrics into queue-level metrics including
14+
* throughput_per_minute, avg_duration, and failure_rate. These metrics are
15+
* essential for auto-scaling calculations and queue health monitoring.
16+
*/
17+
final class CalculateQueueMetricsCommand extends Command
18+
{
19+
protected $signature = 'queue-metrics:calculate
20+
{--connection= : Calculate metrics only for this connection}
21+
{--queue= : Calculate metrics only for this queue (requires --connection)}';
22+
23+
protected $description = 'Calculate aggregated queue-level metrics from job metrics';
24+
25+
public function __construct(
26+
private readonly CalculateQueueMetricsAction $action,
27+
) {
28+
parent::__construct();
29+
}
30+
31+
public function handle(): int
32+
{
33+
$connection = $this->option('connection');
34+
$queue = $this->option('queue');
35+
36+
// Validate options
37+
if ($queue !== null && $connection === null) {
38+
$this->error('The --queue option requires --connection to be specified');
39+
40+
return self::FAILURE;
41+
}
42+
43+
try {
44+
if ($connection !== null && $queue !== null) {
45+
// Calculate for specific queue
46+
$connectionStr = is_string($connection) ? $connection : '';
47+
$queueStr = is_string($queue) ? $queue : '';
48+
$this->info("Calculating metrics for {$connectionStr}:{$queueStr}...");
49+
$this->action->execute($connectionStr, $queueStr);
50+
$this->info('✓ Metrics calculated successfully');
51+
52+
return self::SUCCESS;
53+
}
54+
55+
// Calculate for all queues
56+
$this->info('Calculating metrics for all queues...');
57+
$count = $this->action->executeForAllQueues();
58+
$this->info("✓ Metrics calculated for {$count} queue(s)");
59+
60+
return self::SUCCESS;
61+
} catch (\Exception $e) {
62+
$this->error("Failed to calculate queue metrics: {$e->getMessage()}");
63+
64+
return self::FAILURE;
65+
}
66+
}
67+
}

src/LaravelQueueMetricsServiceProvider.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use PHPeek\LaravelQueueMetrics\Commands\CleanupStaleWorkersCommand;
2525
use PHPeek\LaravelQueueMetrics\Config\QueueMetricsConfig;
2626
use PHPeek\LaravelQueueMetrics\Config\StorageConfig;
27+
use PHPeek\LaravelQueueMetrics\Console\CalculateQueueMetricsCommand;
2728
use PHPeek\LaravelQueueMetrics\Console\DetectStaleWorkersCommand;
2829
use PHPeek\LaravelQueueMetrics\Console\RecordTrendDataCommand;
2930
use PHPeek\LaravelQueueMetrics\Contracts\QueueInspector;
@@ -69,6 +70,7 @@ public function configurePackage(Package $package): void
6970
->hasRoute('api')
7071
->hasMigration('2024_01_01_000001_create_queue_metrics_storage_tables')
7172
->hasCommand(CalculateBaselinesCommand::class)
73+
->hasCommand(CalculateQueueMetricsCommand::class)
7274
->hasCommand(CleanupStaleWorkersCommand::class)
7375
->hasCommand(DetectStaleWorkersCommand::class)
7476
->hasCommand(RecordTrendDataCommand::class);
@@ -199,6 +201,11 @@ protected function registerScheduledTasks(): void
199201
// Schedule adaptive baseline calculation
200202
$this->scheduleAdaptiveBaselineCalculation($scheduler);
201203

204+
// Schedule queue metrics calculation (aggregate job metrics into queue metrics)
205+
$scheduler->command('queue-metrics:calculate')
206+
->everyMinute()
207+
->withoutOverlapping();
208+
202209
// Schedule trend data recording (every minute for real-time trends)
203210
$scheduler->command('queue-metrics:record-trends')
204211
->everyMinute()

0 commit comments

Comments
 (0)