Skip to content

Commit bf74930

Browse files
fix(redis): implement atomic operations to prevent race conditions
Eliminate race conditions in Redis operations by wrapping critical sections in atomic transactions and using Lua scripts. Critical fixes: - Discovery set registration now atomic with recordStart() - Queue discovery consolidated into recordStart() transaction - Hostname metrics changed from pipeline to transaction - Failure recording fully atomic including hostname updates - Exception recording wrapped in transaction - Retry recording made atomic - Timeout recording made atomic - Throughput calculation uses Lua script for atomicity Medium priority fixes: - Add comprehensive documentation for sample methods - Clarify timestamp storage and ordering behavior This ensures data consistency under concurrent job processing without race conditions between separate Redis operations.
1 parent dedebfa commit bf74930

File tree

2 files changed

+174
-75
lines changed

2 files changed

+174
-75
lines changed

src/Actions/RecordJobStartAction.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ public function execute(
2828
return;
2929
}
3030

31-
// Mark queue as discovered for listQueues() to find it
32-
$this->queueMetricsRepository->markQueueDiscovered($connection, $queue);
33-
31+
// Queue discovery now happens atomically inside recordStart()
3432
$this->repository->recordStart(
3533
jobId: $jobId,
3634
jobClass: $jobClass,

src/Repositories/RedisJobMetricsRepository.php

Lines changed: 173 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,48 @@ public function recordStart(
2424
string $queue,
2525
Carbon $startedAt,
2626
): void {
27-
$driver = $this->redis->driver();
2827
$metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass);
2928
$jobKey = $this->redis->key('job', $jobId);
29+
$jobDiscoveryKey = $this->redis->key('discovery', 'jobs');
30+
$queueDiscoveryKey = $this->redis->key('discovery', 'queues');
3031
$ttl = $this->redis->getTtl('raw');
32+
$discoveryTtl = $this->redis->getTtl('aggregated');
3133

32-
// Register job in discovery set (push-based tracking)
33-
$this->markJobDiscovered($connection, $queue, $jobClass);
34-
35-
// Increment total queued counter
36-
$driver->incrementHashField($metricsKey, 'total_queued', 1);
37-
38-
// Store job start time
39-
$driver->setHash($jobKey, [
40-
'job_class' => $jobClass,
41-
'connection' => $connection,
42-
'queue' => $queue,
43-
'started_at' => $startedAt->timestamp,
44-
], $ttl);
45-
46-
// Ensure TTL is set on metrics key
47-
$driver->expire($metricsKey, $ttl);
34+
// Use transaction to ensure atomic registration of both discovery sets with metrics
35+
$this->redis->transaction(function ($pipe) use (
36+
$jobDiscoveryKey,
37+
$queueDiscoveryKey,
38+
$metricsKey,
39+
$jobKey,
40+
$connection,
41+
$queue,
42+
$jobClass,
43+
$startedAt,
44+
$ttl,
45+
$discoveryTtl
46+
) {
47+
// Register queue in discovery set (push-based tracking)
48+
$pipe->addToSet($queueDiscoveryKey, ["{$connection}:{$queue}"]);
49+
$pipe->expire($queueDiscoveryKey, $discoveryTtl);
50+
51+
// Register job in discovery set (push-based tracking)
52+
$pipe->addToSet($jobDiscoveryKey, ["{$connection}:{$queue}:{$jobClass}"]);
53+
$pipe->expire($jobDiscoveryKey, $discoveryTtl);
54+
55+
// Increment total queued counter
56+
$pipe->incrementHashField($metricsKey, 'total_queued', 1);
57+
58+
// Store job start time
59+
$pipe->setHash($jobKey, [
60+
'job_class' => $jobClass,
61+
'connection' => $connection,
62+
'queue' => $queue,
63+
'started_at' => $startedAt->timestamp,
64+
], $ttl);
65+
66+
// Ensure TTL is set on metrics key
67+
$pipe->expire($metricsKey, $ttl);
68+
});
4869
}
4970

5071
public function recordCompletion(
@@ -132,35 +153,52 @@ public function recordFailure(
132153
Carbon $failedAt,
133154
?string $hostname = null,
134155
): void {
135-
$driver = $this->redis->driver();
136156
$metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass);
157+
$jobKey = $this->redis->key('job', $jobId);
137158
$ttl = $this->redis->getTtl('raw');
138159

139-
$driver->pipeline(function ($pipe) use ($metricsKey, $exception, $failedAt, $ttl) {
140-
$pipe->incrementHashField($metricsKey, 'total_failed', 1);
141-
$pipe->setHash($metricsKey, [
142-
'last_failed_at' => $failedAt->timestamp,
143-
'last_exception' => substr($exception, 0, 1000),
144-
]);
145-
// Refresh TTL on metrics key
146-
$pipe->expire($metricsKey, $ttl);
147-
});
148-
149-
// Store hostname-scoped metrics if hostname is provided
150160
if ($hostname !== null) {
151-
$this->recordHostnameMetrics(
152-
$hostname,
153-
$connection,
154-
$queue,
155-
$jobClass,
156-
0.0,
157-
false,
158-
$failedAt
159-
);
161+
// Include hostname metrics in the same transaction
162+
$serverKey = $this->redis->key('server_jobs', $hostname, $connection, $queue, $jobClass);
163+
164+
$this->redis->transaction(function ($pipe) use (
165+
$metricsKey,
166+
$serverKey,
167+
$jobKey,
168+
$exception,
169+
$failedAt,
170+
$ttl
171+
) {
172+
// Job-level failure metrics
173+
$pipe->incrementHashField($metricsKey, 'total_failed', 1);
174+
$pipe->setHash($metricsKey, [
175+
'last_failed_at' => $failedAt->timestamp,
176+
'last_exception' => substr($exception, 0, 1000),
177+
]);
178+
$pipe->expire($metricsKey, $ttl);
179+
180+
// Hostname-level failure metrics
181+
$pipe->incrementHashField($serverKey, 'total_failed', 1);
182+
$pipe->setHash($serverKey, ['last_updated_at' => $failedAt->timestamp]);
183+
$pipe->expire($serverKey, $ttl);
184+
185+
// Clean up job tracking key
186+
$pipe->delete($jobKey);
187+
});
188+
} else {
189+
// Transaction without hostname metrics
190+
$this->redis->transaction(function ($pipe) use ($metricsKey, $jobKey, $exception, $failedAt, $ttl) {
191+
$pipe->incrementHashField($metricsKey, 'total_failed', 1);
192+
$pipe->setHash($metricsKey, [
193+
'last_failed_at' => $failedAt->timestamp,
194+
'last_exception' => substr($exception, 0, 1000),
195+
]);
196+
$pipe->expire($metricsKey, $ttl);
197+
198+
// Clean up job tracking key
199+
$pipe->delete($jobKey);
200+
});
160201
}
161-
162-
// Clean up job tracking key
163-
$driver->delete($this->redis->key('job', $jobId));
164202
}
165203

166204
/**
@@ -178,7 +216,8 @@ private function recordHostnameMetrics(
178216
$serverKey = $this->redis->key('server_jobs', $hostname, $connection, $queue, $jobClass);
179217
$ttl = $this->redis->getTtl('raw');
180218

181-
$this->redis->driver()->pipeline(function ($pipe) use ($serverKey, $durationMs, $success, $timestamp, $ttl) {
219+
// Use transaction instead of pipeline to ensure atomicity
220+
$this->redis->transaction(function ($pipe) use ($serverKey, $durationMs, $success, $timestamp, $ttl) {
182221
if ($success) {
183222
$pipe->incrementHashField($serverKey, 'total_processed', 1);
184223
$pipe->incrementHashField($serverKey, 'total_duration_ms', $durationMs);
@@ -256,7 +295,18 @@ public function getMetrics(
256295
}
257296

258297
/**
259-
* @return array<int, float>
298+
* Get duration samples for job performance analysis.
299+
*
300+
* Returns the most recent duration measurements, with timestamps stored as
301+
* sorted set scores in Redis. Samples are stored in timestamp order, so
302+
* negative indices retrieve the most recent samples first.
303+
*
304+
* Sample Limit Behavior:
305+
* - If fewer than $limit samples exist, returns all available samples
306+
* - Samples are returned in chronological order (oldest to newest)
307+
* - Default limit of 1000 provides sufficient data for statistical analysis
308+
*
309+
* @return array<int, float> Duration values in milliseconds, chronologically ordered
260310
*/
261311
public function getDurationSamples(
262312
string $jobClass,
@@ -268,14 +318,25 @@ public function getDurationSamples(
268318
$driver = $this->redis->driver();
269319

270320
// Get most recent samples (reverse order, so use negative indices)
321+
// Timestamps are used as sorted set scores for time-based querying
271322
/** @var array<string> */
272323
$samples = $driver->getSortedSetByRank($key, -$limit, -1);
273324

274325
return array_map('floatval', array_reverse($samples));
275326
}
276327

277328
/**
278-
* @return array<int, float>
329+
* Get memory usage samples for resource analysis.
330+
*
331+
* Returns the most recent memory measurements, with timestamps stored as
332+
* sorted set scores in Redis. Samples are stored in timestamp order.
333+
*
334+
* Sample Limit Behavior:
335+
* - If fewer than $limit samples exist, returns all available samples
336+
* - Samples are returned in chronological order (oldest to newest)
337+
* - Timestamps in sorted set scores enable time-based filtering
338+
*
339+
* @return array<int, float> Memory values in megabytes, chronologically ordered
279340
*/
280341
public function getMemorySamples(
281342
string $jobClass,
@@ -286,14 +347,25 @@ public function getMemorySamples(
286347
$key = $this->redis->key('memory', $connection, $queue, $jobClass);
287348
$driver = $this->redis->driver();
288349

350+
// Timestamps are used as sorted set scores for time-based querying
289351
/** @var array<string> */
290352
$samples = $driver->getSortedSetByRank($key, -$limit, -1);
291353

292354
return array_map('floatval', array_reverse($samples));
293355
}
294356

295357
/**
296-
* @return array<int, float>
358+
* Get CPU time samples for performance analysis.
359+
*
360+
* Returns the most recent CPU time measurements, with timestamps stored as
361+
* sorted set scores in Redis. Samples are stored in timestamp order.
362+
*
363+
* Sample Limit Behavior:
364+
* - If fewer than $limit samples exist, returns all available samples
365+
* - Samples are returned in chronological order (oldest to newest)
366+
* - Timestamps in sorted set scores enable time-based filtering
367+
*
368+
* @return array<int, float> CPU time values in milliseconds, chronologically ordered
297369
*/
298370
public function getCpuTimeSamples(
299371
string $jobClass,
@@ -304,6 +376,7 @@ public function getCpuTimeSamples(
304376
$key = $this->redis->key('cpu', $connection, $queue, $jobClass);
305377
$driver = $this->redis->driver();
306378

379+
// Timestamps are used as sorted set scores for time-based querying
307380
/** @var array<string> */
308381
$samples = $driver->getSortedSetByRank($key, -$limit, -1);
309382

@@ -319,10 +392,18 @@ public function getThroughput(
319392
$key = $this->redis->key('durations', $connection, $queue, $jobClass);
320393
$driver = $this->redis->driver();
321394

322-
$cutoff = Carbon::now()->subSeconds($windowSeconds)->timestamp;
323-
324-
// Count samples within time window
325-
return $driver->countSortedSetByScore($key, (string) $cutoff, '+inf');
395+
// Use Lua script to atomically calculate cutoff and count items
396+
// This prevents race conditions where items could be added between
397+
// calculating the cutoff timestamp and executing the count
398+
$script = <<<'LUA'
399+
local key = KEYS[1]
400+
local windowSeconds = tonumber(ARGV[1])
401+
local cutoff = redis.call('TIME')[1] - windowSeconds
402+
return redis.call('ZCOUNT', key, cutoff, '+inf')
403+
LUA;
404+
405+
/** @var int */
406+
return $driver->eval($script, 1, $key, $windowSeconds);
326407
}
327408

328409
public function recordQueuedAt(
@@ -347,21 +428,30 @@ public function recordRetryRequested(
347428
Carbon $retryRequestedAt,
348429
int $attemptNumber,
349430
): void {
350-
$driver = $this->redis->driver();
351431
$metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass);
352432
$retryKey = $this->redis->key('retries', $connection, $queue, $jobClass);
353433
$ttl = $this->redis->getTtl('raw');
434+
$retryData = json_encode(['job_id' => $jobId, 'attempt' => $attemptNumber], JSON_THROW_ON_ERROR);
354435

355-
// Increment retry counter
356-
$driver->incrementHashField($metricsKey, 'total_retries', 1);
436+
// Use transaction to ensure atomic retry recording
437+
$this->redis->transaction(function ($pipe) use (
438+
$metricsKey,
439+
$retryKey,
440+
$retryData,
441+
$retryRequestedAt,
442+
$ttl
443+
) {
444+
// Increment retry counter
445+
$pipe->incrementHashField($metricsKey, 'total_retries', 1);
357446

358-
// Store retry event for pattern analysis
359-
$driver->addToSortedSet($retryKey, [
360-
json_encode(['job_id' => $jobId, 'attempt' => $attemptNumber], JSON_THROW_ON_ERROR) => (int) $retryRequestedAt->timestamp,
361-
], $ttl);
447+
// Store retry event for pattern analysis
448+
$pipe->addToSortedSet($retryKey, [
449+
$retryData => (int) $retryRequestedAt->timestamp,
450+
], $ttl);
362451

363-
// Refresh TTL on metrics key
364-
$driver->expire($metricsKey, $ttl);
452+
// Refresh TTL on metrics key
453+
$pipe->expire($metricsKey, $ttl);
454+
});
365455
}
366456

367457
public function recordTimeout(
@@ -371,16 +461,18 @@ public function recordTimeout(
371461
string $queue,
372462
Carbon $timedOutAt,
373463
): void {
374-
$driver = $this->redis->driver();
375464
$metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass);
376465
$ttl = $this->redis->getTtl('raw');
377466

378-
// Increment timeout counter
379-
$driver->incrementHashField($metricsKey, 'total_timeouts', 1);
380-
$driver->setHash($metricsKey, ['last_timeout_at' => $timedOutAt->timestamp]);
467+
// Use transaction to ensure atomic timeout recording
468+
$this->redis->transaction(function ($pipe) use ($metricsKey, $timedOutAt, $ttl) {
469+
// Increment timeout counter
470+
$pipe->incrementHashField($metricsKey, 'total_timeouts', 1);
471+
$pipe->setHash($metricsKey, ['last_timeout_at' => $timedOutAt->timestamp]);
381472

382-
// Refresh TTL on metrics key
383-
$driver->expire($metricsKey, $ttl);
473+
// Refresh TTL on metrics key
474+
$pipe->expire($metricsKey, $ttl);
475+
});
384476
}
385477

386478
public function recordException(
@@ -392,20 +484,29 @@ public function recordException(
392484
string $exceptionMessage,
393485
Carbon $occurredAt,
394486
): void {
395-
$driver = $this->redis->driver();
396487
$metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass);
397488
$exceptionsKey = $this->redis->key('exceptions', $connection, $queue, $jobClass);
398489
$ttl = $this->redis->getTtl('raw');
490+
$aggregatedTtl = $this->redis->getTtl('aggregated');
399491

400-
// Increment exception counter
401-
$driver->incrementHashField($metricsKey, 'total_exceptions', 1);
492+
// Use transaction to ensure atomic exception recording
493+
$this->redis->transaction(function ($pipe) use (
494+
$metricsKey,
495+
$exceptionsKey,
496+
$exceptionClass,
497+
$ttl,
498+
$aggregatedTtl
499+
) {
500+
// Increment exception counter
501+
$pipe->incrementHashField($metricsKey, 'total_exceptions', 1);
402502

403-
// Track exception types
404-
$driver->incrementHashField($exceptionsKey, $exceptionClass, 1);
405-
$driver->expire($exceptionsKey, $this->redis->getTtl('aggregated'));
503+
// Track exception types
504+
$pipe->incrementHashField($exceptionsKey, $exceptionClass, 1);
505+
$pipe->expire($exceptionsKey, $aggregatedTtl);
406506

407-
// Refresh TTL on metrics key
408-
$driver->expire($metricsKey, $ttl);
507+
// Refresh TTL on metrics key
508+
$pipe->expire($metricsKey, $ttl);
509+
});
409510
}
410511

411512
public function cleanup(int $olderThanSeconds): int

0 commit comments

Comments
 (0)