Skip to content

Commit 6abb268

Browse files
authored
Merge pull request #53 from utopia-php/feat-workerstop
feat: add workerStop handler, graceful exits
2 parents 5cff95b + 83d6ac3 commit 6abb268

File tree

21 files changed

+426
-228
lines changed

21 files changed

+426
-228
lines changed

docker-compose.yml

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ services:
33
container_name: tests
44
build: .
55
volumes:
6-
- ./src:/usr/local/src/src
7-
- ./tests:/usr/local/src/tests
6+
- ./vendor:/usr/src/code/vendor
7+
- ./src:/usr/src/code/src
8+
- ./tests:/usr/src/code/tests
89
depends_on:
910
- swoole
1011
- swoole-amqp
@@ -16,8 +17,9 @@ services:
1617
build: ./tests/Queue/servers/Swoole/.
1718
command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php
1819
volumes:
19-
- ./src:/usr/local/src/src
20-
- ./tests:/usr/local/src/tests
20+
- ./vendor:/usr/src/code/vendor
21+
- ./src:/usr/src/code/src
22+
- ./tests:/usr/src/code/tests
2123
depends_on:
2224
- redis
2325

@@ -26,8 +28,9 @@ services:
2628
build: ./tests/Queue/servers/SwooleRedisCluster/.
2729
command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php
2830
volumes:
29-
- ./src:/usr/local/src/src
30-
- ./tests:/usr/local/src/tests
31+
- ./vendor:/usr/src/code/vendor
32+
- ./src:/usr/src/code/src
33+
- ./tests:/usr/src/code/tests
3134
depends_on:
3235
redis-cluster-0:
3336
condition: service_healthy
@@ -37,8 +40,9 @@ services:
3740
build: ./tests/Queue/servers/AMQP/.
3841
command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php
3942
volumes:
40-
- ./src:/usr/local/src/src
41-
- ./tests:/usr/local/src/tests
43+
- ./vendor:/usr/src/code/vendor
44+
- ./src:/usr/src/code/src
45+
- ./tests:/usr/src/code/tests
4246
depends_on:
4347
amqp:
4448
condition: service_healthy
@@ -48,8 +52,9 @@ services:
4852
build: ./tests/Queue/servers/Workerman/.
4953
command: php /usr/src/code/tests/Queue/servers/Workerman/worker.php start
5054
volumes:
51-
- ./src:/usr/local/src/src
52-
- ./tests:/usr/local/src/tests
55+
- ./vendor:/usr/src/code/vendor
56+
- ./src:/usr/src/code/src
57+
- ./tests:/usr/src/code/tests
5358
depends_on:
5459
- redis
5560

pint.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
{
2-
"preset": "psr12"
2+
"preset": "psr12",
3+
"rules": {
4+
"single_quote": true
5+
}
36
}

src/Queue/Adapter.php

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,4 @@ abstract public function workerStart(callable $callback): self;
4040
* @return self
4141
*/
4242
abstract public function workerStop(callable $callback): self;
43-
44-
/**
45-
* Returns the native server object from the Adapter.
46-
* @return mixed
47-
*/
48-
abstract public function getNative(): mixed;
4943
}

src/Queue/Adapter/Swoole.php

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,99 @@
22

33
namespace Utopia\Queue\Adapter;
44

5-
use Swoole\Process\Pool;
5+
use Swoole\Coroutine;
6+
use Swoole\Process;
67
use Utopia\Queue\Adapter;
78
use Utopia\Queue\Consumer;
89

910
class Swoole extends Adapter
1011
{
11-
protected Pool $pool;
12+
/** @var Process[] */
13+
protected array $workers = [];
1214

13-
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
14-
{
15-
parent::__construct($workerNum, $queue, $namespace);
15+
/** @var callable[] */
16+
protected array $onWorkerStart = [];
1617

18+
/** @var callable[] */
19+
protected array $onWorkerStop = [];
20+
21+
public function __construct(
22+
Consumer $consumer,
23+
int $workerNum,
24+
string $queue,
25+
string $namespace = 'utopia-queue',
26+
) {
27+
parent::__construct($workerNum, $queue, $namespace);
1728
$this->consumer = $consumer;
18-
$this->pool = new Pool($workerNum);
1929
}
2030

2131
public function start(): self
2232
{
23-
$this->pool->set(['enable_coroutine' => true]);
24-
$this->pool->start();
33+
for ($i = 0; $i < $this->workerNum; $i++) {
34+
$this->spawnWorker($i);
35+
}
36+
37+
Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
38+
39+
Coroutine\run(function () {
40+
Process::signal(SIGTERM, fn () => $this->stop());
41+
Process::signal(SIGINT, fn () => $this->stop());
42+
Process::signal(SIGCHLD, fn () => $this->reap());
43+
44+
while (\count($this->workers) > 0) {
45+
Coroutine::sleep(1);
46+
}
47+
});
48+
2549
return $this;
2650
}
2751

28-
public function stop(): self
52+
protected function spawnWorker(int $workerId): void
2953
{
30-
$this->pool->shutdown();
31-
return $this;
54+
$process = new Process(function () use ($workerId) {
55+
Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
56+
57+
Coroutine\run(function () use ($workerId) {
58+
Process::signal(SIGTERM, fn () => $this->consumer->close());
59+
60+
foreach ($this->onWorkerStart as $callback) {
61+
$callback((string)$workerId);
62+
}
63+
64+
foreach ($this->onWorkerStop as $callback) {
65+
$callback((string)$workerId);
66+
}
67+
});
68+
}, false, 0, false);
69+
70+
$pid = $process->start();
71+
$this->workers[$pid] = $process;
3272
}
3373

34-
public function workerStart(callable $callback): self
74+
protected function reap(): void
3575
{
36-
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
37-
call_user_func($callback, $workerId);
38-
});
76+
while (($ret = Process::wait(false)) !== false) {
77+
unset($this->workers[$ret['pid']]);
78+
}
79+
}
3980

81+
public function stop(): self
82+
{
83+
foreach ($this->workers as $pid => $process) {
84+
Process::kill($pid, SIGTERM);
85+
}
4086
return $this;
4187
}
4288

43-
public function workerStop(callable $callback): self
89+
public function workerStart(callable $callback): self
4490
{
45-
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
46-
call_user_func($callback, $workerId);
47-
});
48-
91+
$this->onWorkerStart[] = $callback;
4992
return $this;
5093
}
5194

52-
public function getNative(): Pool
95+
public function workerStop(callable $callback): self
5396
{
54-
return $this->pool;
97+
$this->onWorkerStop[] = $callback;
98+
return $this;
5599
}
56100
}

src/Queue/Adapter/Workerman.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,4 @@ public function workerStop(callable $callback): self
4747

4848
return $this;
4949
}
50-
51-
public function getNative(): Worker
52-
{
53-
return $this->worker;
54-
}
5550
}

src/Queue/Broker/AMQP.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
111111
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
112112

113113
// 2. Declare the working queue and configure the DLX for receiving rejected messages.
114-
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
114+
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ['x-dead-letter-exchange' => "{$queue->namespace}.failed"])));
115115
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);
116116

117117
// 3. Declare the dead-letter-queue and bind it to the DLX.
@@ -131,6 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
131131

132132
public function close(): void
133133
{
134+
$this->channel?->stopConsume();
134135
$this->channel?->getConnection()?->close();
135136
}
136137

@@ -161,7 +162,7 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
161162
{
162163
$queueName = $queue->name;
163164
if ($failedJobs) {
164-
$queueName = $queueName . ".failed";
165+
$queueName = $queueName . '.failed';
165166
}
166167

167168
$client = new Client();

src/Queue/Broker/Pool.php

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,36 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
3030
return $this->delegatePublish(__FUNCTION__, \func_get_args());
3131
}
3232

33-
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
34-
{
33+
public function consume(
34+
Queue $queue,
35+
callable $messageCallback,
36+
callable $successCallback,
37+
callable $errorCallback,
38+
): void {
3539
$this->delegateConsumer(__FUNCTION__, \func_get_args());
3640
}
3741

3842
public function close(): void
3943
{
40-
$this->delegateConsumer(__FUNCTION__, \func_get_args());
44+
// TODO: Implement closing all connections in the pool
4145
}
4246

4347
protected function delegatePublish(string $method, array $args): mixed
4448
{
45-
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
49+
return $this->publisher?->use(function (Publisher $adapter) use (
50+
$method,
51+
$args,
52+
) {
4653
return $adapter->$method(...$args);
4754
});
4855
}
4956

5057
protected function delegateConsumer(string $method, array $args): mixed
5158
{
52-
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
59+
return $this->consumer?->use(function (Consumer $adapter) use (
60+
$method,
61+
$args,
62+
) {
5363
return $adapter->$method(...$args);
5464
});
5565
}

src/Queue/Broker/Redis.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
class Redis implements Publisher, Consumer
1212
{
13+
private const int POP_TIMEOUT = 2;
14+
1315
private bool $closed = false;
1416

1517
public function __construct(private readonly Connection $connection)
@@ -22,7 +24,15 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
2224
/**
2325
* Waiting for next Job.
2426
*/
25-
$nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", 5);
27+
try {
28+
$nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT);
29+
} catch (\RedisException $e) {
30+
if ($this->closed) {
31+
break;
32+
}
33+
34+
throw $e;
35+
}
2636

2737
if (!$nextMessage) {
2838
continue;
@@ -115,7 +125,7 @@ public function retry(Queue $queue, ?int $limit = null): void
115125
$processed = 0;
116126

117127
while (true) {
118-
$pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", 5);
128+
$pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT);
119129

120130
// No more jobs to retry
121131
if ($pid === false) {

src/Queue/Connection.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ public function setArray(string $key, array $value): bool;
2525
public function increment(string $key): int;
2626
public function decrement(string $key): int;
2727
public function ping(): bool;
28+
public function close(): void;
2829
}

src/Queue/Connection/Redis.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ public function ping(): bool
169169
}
170170
}
171171

172+
public function close(): void
173+
{
174+
$this->redis?->close();
175+
$this->redis = null;
176+
}
177+
172178
protected function getRedis(): \Redis
173179
{
174180
if ($this->redis) {

0 commit comments

Comments
 (0)