Skip to content

Commit 7f5b48a

Browse files
fix(metrics): ensure throughput and duration use same time window
Fix race condition where throughput_per_minute and avg_duration_ms were calculated from different time windows, causing mismatched metrics. Changes: - Add getAverageDurationInWindow() method to calculate duration from same window as throughput - Use 60-second window for both throughput and avg_duration calculations - Store duration/memory/CPU samples with unique "jobId:value" format to prevent overwrites - Calculate weighted average duration across all jobs in the queue - Separate lifetime metrics (failure_rate) from windowed metrics (throughput, avg_duration) Technical details: - Atomic Lua script calculates average duration from windowed samples - Prevents race condition where jobs with identical values overwrote each other - Ensures metric consistency by using the same Redis ZRANGEBYSCORE window
1 parent bc33282 commit 7f5b48a

File tree

3 files changed

+97
-26
lines changed

3 files changed

+97
-26
lines changed

src/Actions/CalculateQueueMetricsAction.php

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,34 @@ public function execute(string $connection, string $queue): void
4040
return;
4141
}
4242

43-
// Aggregate metrics across all job classes
43+
// Aggregate metrics across all job classes for CURRENT window (last 60 seconds)
44+
// This ensures throughput and avg_duration are calculated from the same time window
45+
$windowSeconds = 60;
46+
$throughputPerMinute = 0;
47+
$totalDurationMs = 0.0;
48+
$totalJobsInWindow = 0;
49+
50+
// Aggregate lifetime metrics for failure rate
4451
$totalProcessed = 0;
4552
$totalFailed = 0;
46-
$totalDurationMs = 0.0;
4753
$lastProcessedAt = null;
4854

4955
foreach ($queueJobs as $job) {
5056
$jobClass = $job['jobClass'];
57+
58+
// Get current window metrics (last 60 seconds)
59+
$jobThroughput = $this->jobRepository->getThroughput($jobClass, $connection, $queue, $windowSeconds);
60+
$jobAvgDuration = $this->jobRepository->getAverageDurationInWindow($jobClass, $connection, $queue, $windowSeconds);
61+
62+
$throughputPerMinute += $jobThroughput;
63+
$totalDurationMs += ($jobAvgDuration * $jobThroughput); // Weighted by job count
64+
$totalJobsInWindow += $jobThroughput;
65+
66+
// Get lifetime metrics for failure rate and last_processed_at
5167
$metrics = $this->jobRepository->getMetrics($jobClass, $connection, $queue);
5268

5369
$totalProcessed += is_int($metrics['total_processed']) ? $metrics['total_processed'] : 0;
5470
$totalFailed += is_int($metrics['total_failed']) ? $metrics['total_failed'] : 0;
55-
$totalDurationMs += is_float($metrics['total_duration_ms']) || is_int($metrics['total_duration_ms'])
56-
? (float) $metrics['total_duration_ms']
57-
: 0.0;
5871

5972
if ($metrics['last_processed_at'] instanceof \Carbon\Carbon) {
6073
if ($lastProcessedAt === null || $metrics['last_processed_at']->greaterThan($lastProcessedAt)) {
@@ -63,24 +76,14 @@ public function execute(string $connection, string $queue): void
6376
}
6477
}
6578

66-
// Calculate aggregated metrics
67-
$avgDuration = $totalProcessed > 0 ? $totalDurationMs / $totalProcessed : 0.0;
79+
// Calculate aggregated metrics for current window
80+
$avgDuration = $totalJobsInWindow > 0 ? $totalDurationMs / $totalJobsInWindow : 0.0;
81+
82+
// Calculate failure rate from lifetime totals
6883
$failureRate = ($totalProcessed + $totalFailed) > 0
6984
? ($totalFailed / ($totalProcessed + $totalFailed)) * 100.0
7085
: 0.0;
7186

72-
// Calculate throughput per minute (jobs completed in last 60 seconds)
73-
$throughputPerMinute = 0.0;
74-
foreach ($queueJobs as $job) {
75-
$jobClass = $job['jobClass'];
76-
$throughputPerMinute += $this->jobRepository->getThroughput(
77-
$jobClass,
78-
$connection,
79-
$queue,
80-
60 // last 60 seconds
81-
);
82-
}
83-
8487
// Store aggregated metrics
8588
$this->queueRepository->recordSnapshot($connection, $queue, [
8689
'throughput_per_minute' => $throughputPerMinute,

src/Repositories/Contracts/JobMetricsRepository.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,18 @@ public function getThroughput(
114114
int $windowSeconds,
115115
): int;
116116

117+
/**
118+
* Get average duration for jobs completed within a specific time window.
119+
*
120+
* @return float Average duration in milliseconds, 0.0 if no jobs in window
121+
*/
122+
public function getAverageDurationInWindow(
123+
string $jobClass,
124+
string $connection,
125+
string $queue,
126+
int $windowSeconds,
127+
): float;
128+
117129
/**
118130
* Record when a job is queued for time-to-start tracking.
119131
*/

src/Repositories/RedisJobMetricsRepository.php

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,27 +95,33 @@ public function recordCompletion(
9595
$memoryMb,
9696
$cpuTimeMs,
9797
$completedAt,
98-
$ttl
98+
$ttl,
99+
$jobId
99100
) {
100101
$pipe->incrementHashField($metricsKey, 'total_processed', 1);
101102
$pipe->incrementHashField($metricsKey, 'total_duration_ms', $durationMs);
102103
$pipe->incrementHashField($metricsKey, 'total_memory_mb', $memoryMb);
103104
$pipe->incrementHashField($metricsKey, 'total_cpu_time_ms', $cpuTimeMs);
104105
$pipe->setHash($metricsKey, ['last_processed_at' => $completedAt->timestamp]);
105106

106-
// Store duration sample (sorted set with timestamp as score)
107+
// Store samples in sorted sets with timestamp as score
108+
// Use a unique member format: "jobId:value" to ensure each job gets a separate entry
109+
// This allows multiple jobs with the same duration/memory/cpu to be stored
110+
$durationMember = $jobId . ':' . $durationMs;
107111
/** @var array<string, int> $durationSample */
108-
$durationSample = [(string) $durationMs => (int) $completedAt->timestamp];
112+
$durationSample = [$durationMember => (int) $completedAt->timestamp];
109113
$pipe->addToSortedSet($durationKey, $durationSample, $ttl);
110114

111-
// Store memory sample
115+
// Store memory sample with unique member
116+
$memoryMember = $jobId . ':' . $memoryMb;
112117
/** @var array<string, int> $memorySample */
113-
$memorySample = [(string) $memoryMb => (int) $completedAt->timestamp];
118+
$memorySample = [$memoryMember => (int) $completedAt->timestamp];
114119
$pipe->addToSortedSet($memoryKey, $memorySample, $ttl);
115120

116-
// Store CPU time sample
121+
// Store CPU time sample with unique member
122+
$cpuMember = $jobId . ':' . $cpuTimeMs;
117123
/** @var array<string, int> $cpuSample */
118-
$cpuSample = [(string) $cpuTimeMs => (int) $completedAt->timestamp];
124+
$cpuSample = [$cpuMember => (int) $completedAt->timestamp];
119125
$pipe->addToSortedSet($cpuKey, $cpuSample, $ttl);
120126

121127
// Refresh TTL on metrics key
@@ -406,6 +412,56 @@ public function getThroughput(
406412
return $driver->eval($script, 1, $key, $windowSeconds);
407413
}
408414

415+
public function getAverageDurationInWindow(
416+
string $jobClass,
417+
string $connection,
418+
string $queue,
419+
int $windowSeconds,
420+
): float {
421+
$key = $this->redis->key('durations', $connection, $queue, $jobClass);
422+
$driver = $this->redis->driver();
423+
424+
// Use Lua script to atomically get samples within window and calculate average
425+
// This ensures consistency between throughput and average duration calculations
426+
$script = <<<'LUA'
427+
local key = KEYS[1]
428+
local windowSeconds = tonumber(ARGV[1])
429+
local cutoff = redis.call('TIME')[1] - windowSeconds
430+
431+
-- Get all members in the window (members are "jobId:duration")
432+
local samples = redis.call('ZRANGEBYSCORE', key, cutoff, '+inf')
433+
434+
if #samples == 0 then
435+
return 0
436+
end
437+
438+
-- Parse members to extract duration values and calculate average
439+
-- Each member is formatted as "jobId:duration"
440+
local sum = 0
441+
local count = 0
442+
for i = 1, #samples do
443+
local member = samples[i]
444+
local colonPos = string.find(member, ":")
445+
if colonPos then
446+
local duration = string.sub(member, colonPos + 1)
447+
sum = sum + tonumber(duration)
448+
count = count + 1
449+
end
450+
end
451+
452+
if count == 0 then
453+
return 0
454+
end
455+
456+
return sum / count
457+
LUA;
458+
459+
/** @var float */
460+
$result = $driver->eval($script, 1, $key, $windowSeconds);
461+
462+
return (float) $result;
463+
}
464+
409465
public function recordQueuedAt(
410466
string $jobClass,
411467
string $connection,

0 commit comments

Comments
 (0)