From a48e3e47c3012e6969398736f706dc2ce79e4938 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 30 May 2025 21:39:26 +1200 Subject: [PATCH 1/2] Add timeout overrides --- src/Queue/Broker/AMQP.php | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 0f78354..9d89618 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -42,6 +42,9 @@ public function __construct( private readonly ?string $password = null, private readonly string $vhost = '/', private readonly int $heartbeat = 0, + private readonly float $connectTimeout = 3.0, + private readonly float $readWriteTimeout = 3.0, + ) { } @@ -94,7 +97,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $amqpMessage->nack(requeue: true); $errorCallback($message ?? null, $e); } catch (\Throwable $th) { - $amqpMessage->nack(requeue: false); + $amqpMessage->nack(); $errorCallback($message ?? null, $th); } }; @@ -129,9 +132,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe public function close(): void { - if ($this->channel) { - $this->channel->getConnection()?->close(); - } + $this->channel?->getConnection()?->close(); } public function enqueue(Queue $queue, array $payload): bool @@ -184,7 +185,16 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int private function withChannel(callable $callback): void { $createChannel = function (): AMQPChannel { - $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost, heartbeat: $this->heartbeat); + $connection = new AMQPStreamConnection( + $this->host, + $this->port, + $this->user, + $this->password, + $this->vhost, + connection_timeout: $this->connectTimeout, + read_write_timeout: $this->readWriteTimeout, + heartbeat: $this->heartbeat, + ); if (is_callable($this->connectionConfigHook)) { call_user_func($this->connectionConfigHook, $connection); } @@ -201,7 +211,7 @@ private function withChannel(callable $callback): void try { $callback($this->channel); - } catch (\Throwable $th) { + } catch (\Throwable) { // createChannel() might throw, in that case set the channel to `null` first. $this->channel = null; // try creating a new connection once, if this still fails, throw the error From fe8d6dda60037478411b6eb693825d61957a25a1 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 30 May 2025 21:40:08 +1200 Subject: [PATCH 2/2] Format --- src/Queue/Broker/AMQP.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 9d89618..6bf1c6c 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -44,7 +44,6 @@ public function __construct( private readonly int $heartbeat = 0, private readonly float $connectTimeout = 3.0, private readonly float $readWriteTimeout = 3.0, - ) { }