From e8635c8a0019fdaaf21d76472e3d881b511feb6e Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 17 Apr 2025 19:35:28 +1200 Subject: [PATCH 1/9] Add pool adapter --- composer.json | 1 + composer.lock | 110 +++++++++++++++++++++++++++---------- src/Queue/Adapter/Pool.php | 59 ++++++++++++++++++++ 3 files changed, 141 insertions(+), 29 deletions(-) create mode 100644 src/Queue/Adapter/Pool.php diff --git a/composer.json b/composer.json index 6252380..7de9530 100644 --- a/composer.json +++ b/composer.json @@ -29,6 +29,7 @@ "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", "utopia-php/telemetry": "0.1.*", + "utopia-php/pools": "0.8.*", "utopia-php/fetch": "0.4.*" }, "require-dev": { diff --git a/composer.lock b/composer.lock index 0b9eb3c..126551c 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "77db808700b06a060b72e56ba5d9ac91", + "content-hash": "574ad3b103f97c1668af99674784aae8", "packages": [ { "name": "brick/math", @@ -593,16 +593,16 @@ }, { "name": "open-telemetry/sdk", - "version": "1.2.2", + "version": "1.2.4", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sdk.git", - "reference": "37eec0fe47ddd627911f318f29b6cd48196be0c0" + "reference": "47fcb66ae5328c5a799195247b1dce551d85873e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/37eec0fe47ddd627911f318f29b6cd48196be0c0", - "reference": "37eec0fe47ddd627911f318f29b6cd48196be0c0", + "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/47fcb66ae5328c5a799195247b1dce551d85873e", + "reference": "47fcb66ae5328c5a799195247b1dce551d85873e", "shasum": "" }, "require": { @@ -679,7 +679,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-01-29T21:40:28+00:00" + "time": "2025-04-15T07:02:07+00:00" }, { "name": "open-telemetry/sem-conv", @@ -2037,16 +2037,16 @@ }, { "name": "tbachert/spi", - "version": "v1.0.2", + "version": "v1.0.3", "source": { "type": "git", "url": "https://github.com/Nevay/spi.git", - "reference": "2ddfaf815dafb45791a61b08170de8d583c16062" + "reference": "506a79c98e1a51522e76ee921ccb6c62d52faf3a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/Nevay/spi/zipball/2ddfaf815dafb45791a61b08170de8d583c16062", - "reference": "2ddfaf815dafb45791a61b08170de8d583c16062", + "url": "https://api.github.com/repos/Nevay/spi/zipball/506a79c98e1a51522e76ee921ccb6c62d52faf3a", + "reference": "506a79c98e1a51522e76ee921ccb6c62d52faf3a", "shasum": "" }, "require": { @@ -2083,22 +2083,22 @@ ], "support": { "issues": "https://github.com/Nevay/spi/issues", - "source": "https://github.com/Nevay/spi/tree/v1.0.2" + "source": "https://github.com/Nevay/spi/tree/v1.0.3" }, - "time": "2024-10-04T16:36:12+00:00" + "time": "2025-04-02T19:38:14+00:00" }, { "name": "utopia-php/cli", - "version": "0.15.1", + "version": "0.15.2", "source": { "type": "git", "url": "https://github.com/utopia-php/cli.git", - "reference": "d69bbe51a6a94dc4e5bcdd542b5938038b985a65" + "reference": "da00ff6b8b29a826a1794002ae43442cdf3a0f5f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/cli/zipball/d69bbe51a6a94dc4e5bcdd542b5938038b985a65", - "reference": "d69bbe51a6a94dc4e5bcdd542b5938038b985a65", + "url": "https://api.github.com/repos/utopia-php/cli/zipball/da00ff6b8b29a826a1794002ae43442cdf3a0f5f", + "reference": "da00ff6b8b29a826a1794002ae43442cdf3a0f5f", "shasum": "" }, "require": { @@ -2132,9 +2132,9 @@ ], "support": { "issues": "https://github.com/utopia-php/cli/issues", - "source": "https://github.com/utopia-php/cli/tree/0.15.1" + "source": "https://github.com/utopia-php/cli/tree/0.15.2" }, - "time": "2024-10-04T13:55:36+00:00" + "time": "2025-04-15T10:08:48+00:00" }, { "name": "utopia-php/compression", @@ -2184,16 +2184,16 @@ }, { "name": "utopia-php/fetch", - "version": "0.4.0", + "version": "0.4.1", "source": { "type": "git", "url": "https://github.com/utopia-php/fetch.git", - "reference": "46e791ff6a95864517750b9df6bbf4a17e3c9c4e" + "reference": "65095dac14037db0c822fb5e209e5bd3187a0303" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/fetch/zipball/46e791ff6a95864517750b9df6bbf4a17e3c9c4e", - "reference": "46e791ff6a95864517750b9df6bbf4a17e3c9c4e", + "url": "https://api.github.com/repos/utopia-php/fetch/zipball/65095dac14037db0c822fb5e209e5bd3187a0303", + "reference": "65095dac14037db0c822fb5e209e5bd3187a0303", "shasum": "" }, "require": { @@ -2217,9 +2217,9 @@ "description": "A simple library that provides an interface for making HTTP Requests.", "support": { "issues": "https://github.com/utopia-php/fetch/issues", - "source": "https://github.com/utopia-php/fetch/tree/0.4.0" + "source": "https://github.com/utopia-php/fetch/tree/0.4.1" }, - "time": "2025-03-11T21:06:56+00:00" + "time": "2025-04-14T07:34:27+00:00" }, { "name": "utopia-php/framework", @@ -2268,6 +2268,58 @@ }, "time": "2025-03-06T11:37:49+00:00" }, + { + "name": "utopia-php/pools", + "version": "0.8.2", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/pools.git", + "reference": "05c67aba42eb68ac65489cc1e7fc5db83db2dd4d" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/pools/zipball/05c67aba42eb68ac65489cc1e7fc5db83db2dd4d", + "reference": "05c67aba42eb68ac65489cc1e7fc5db83db2dd4d", + "shasum": "" + }, + "require": { + "php": ">=8.3", + "utopia-php/telemetry": "0.1.*" + }, + "require-dev": { + "laravel/pint": "1.*", + "phpstan/phpstan": "1.*", + "phpunit/phpunit": "11.*" + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Pools\\": "src/Pools" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Team Appwrite", + "email": "team@appwrite.io" + } + ], + "description": "A simple library to manage connection pools", + "keywords": [ + "framework", + "php", + "pools", + "utopia" + ], + "support": { + "issues": "https://github.com/utopia-php/pools/issues", + "source": "https://github.com/utopia-php/pools/tree/0.8.2" + }, + "time": "2025-04-17T02:04:54+00:00" + }, { "name": "utopia-php/telemetry", "version": "0.1.1", @@ -2694,16 +2746,16 @@ }, { "name": "phpstan/phpstan", - "version": "1.12.23", + "version": "1.12.24", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "29201e7a743a6ab36f91394eab51889a82631428" + "reference": "338b92068f58d9f8035b76aed6cf2b9e5624c025" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/29201e7a743a6ab36f91394eab51889a82631428", - "reference": "29201e7a743a6ab36f91394eab51889a82631428", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/338b92068f58d9f8035b76aed6cf2b9e5624c025", + "reference": "338b92068f58d9f8035b76aed6cf2b9e5624c025", "shasum": "" }, "require": { @@ -2748,7 +2800,7 @@ "type": "github" } ], - "time": "2025-03-23T14:57:32+00:00" + "time": "2025-04-16T13:01:53+00:00" }, { "name": "phpunit/php-code-coverage", diff --git a/src/Queue/Adapter/Pool.php b/src/Queue/Adapter/Pool.php new file mode 100644 index 0000000..85d6c81 --- /dev/null +++ b/src/Queue/Adapter/Pool.php @@ -0,0 +1,59 @@ + + */ + protected UtopiaPool $pool; + + public function __construct( + UtopiaPool $pool, + int $workerNum, + string $queue, + string $namespace = 'utopia-queue' + ) + { + parent::__construct($workerNum, $queue, $namespace); + + $this->pool = $pool; + } + + protected function delegate(string $method, array $args): mixed + { + return $this->pool->use(function (Adapter $adapter) use ($method, $args) { + return $adapter->$method(...$args); + }); + } + + public function start(): self + { + return $this->delegate(__FUNCTION__, func_get_args()); + } + + public function stop(): self + { + return $this->delegate(__FUNCTION__, func_get_args()); + } + + public function workerStart(callable $callback): self + { + return $this->delegate(__FUNCTION__, func_get_args()); + } + + public function workerStop(callable $callback): self + { + return $this->delegate(__FUNCTION__, func_get_args()); + } + + public function getNative(): mixed + { + return $this->delegate(__FUNCTION__, func_get_args()); + } +} \ No newline at end of file From 05bfd423c526827ad185f6e489f25179fe65ef88 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 17 Apr 2025 20:01:55 +1200 Subject: [PATCH 2/9] Add pool test --- tests/Queue/E2E/Adapter/PoolTest.php | 23 +++++++++++++++++++ .../E2E/Adapter/SwooleRedisClusterTest.php | 2 +- 2 files changed, 24 insertions(+), 1 deletion(-) create mode 100644 tests/Queue/E2E/Adapter/PoolTest.php diff --git a/tests/Queue/E2E/Adapter/PoolTest.php b/tests/Queue/E2E/Adapter/PoolTest.php new file mode 100644 index 0000000..966fa8c --- /dev/null +++ b/tests/Queue/E2E/Adapter/PoolTest.php @@ -0,0 +1,23 @@ + Date: Thu, 17 Apr 2025 20:02:06 +1200 Subject: [PATCH 3/9] Fix syntax --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 94354a4..565cb52 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM composer as composer +FROM composer AS composer WORKDIR /usr/local/src/ From 62097dfe43d6f2854522c1dc3af47ca667deb531 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 17 Apr 2025 20:28:31 +1200 Subject: [PATCH 4/9] Pool broker --- src/Queue/Broker/Pool.php | 57 ++++++++++++++++++++++++++++ tests/Queue/E2E/Adapter/PoolTest.php | 9 ++++- 2 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 src/Queue/Broker/Pool.php diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php new file mode 100644 index 0000000..85536c5 --- /dev/null +++ b/src/Queue/Broker/Pool.php @@ -0,0 +1,57 @@ +publisherPool->use(function (Publisher $adapter) use ($method, $args) { + return $adapter->$method(...$args); + }); + } + + protected function delegateConsumer(string $method, array $args): mixed + { + return $this->consumerPool->use(function (Consumer $adapter) use ($method, $args) { + return $adapter->$method(...$args); + }); + } + + public function enqueue(Queue $queue, array $payload): bool + { + return $this->delegatePublish(__FUNCTION__, \func_get_args()); + } + + public function retry(Queue $queue, ?int $limit = null): void + { + $this->delegatePublish(__FUNCTION__, \func_get_args()); + } + + public function getQueueSize(Queue $queue, bool $failedJobs = false): int + { + return $this->delegatePublish(__FUNCTION__, \func_get_args()); + } + + public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + $this->delegateConsumer(__FUNCTION__, \func_get_args()); + } + + public function close(): void + { + $this->delegateConsumer(__FUNCTION__, \func_get_args()); + } +} \ No newline at end of file diff --git a/tests/Queue/E2E/Adapter/PoolTest.php b/tests/Queue/E2E/Adapter/PoolTest.php index 966fa8c..8421a56 100644 --- a/tests/Queue/E2E/Adapter/PoolTest.php +++ b/tests/Queue/E2E/Adapter/PoolTest.php @@ -3,6 +3,8 @@ namespace Tests\E2E\Adapter; use Tests\E2E\Adapter\Base; +use Utopia\Pools\Pool as UtopiaPool; +use Utopia\Queue\Broker\Pool; use Utopia\Queue\Broker\Redis as RedisBroker; use Utopia\Queue\Connection\Redis; use Utopia\Queue\Publisher; @@ -12,8 +14,11 @@ class PoolTest extends Base { protected function getPublisher(): Publisher { - $connection = new Redis('redis', 6379); - return new RedisBroker($connection); + $pool = new UtopiaPool('redis', 1, function () { + return new RedisBroker(new Redis('redis', 6379)); + }); + + return new Pool($pool, $pool); } protected function getQueue(): Queue From 80cc1fb1053950f977f0d027bb4a7f7c91c1e794 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 17 Apr 2025 20:43:02 +1200 Subject: [PATCH 5/9] Update constructor --- src/Queue/Broker/Pool.php | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php index 85536c5..aed5eee 100644 --- a/src/Queue/Broker/Pool.php +++ b/src/Queue/Broker/Pool.php @@ -10,26 +10,12 @@ readonly class Pool implements Publisher, Consumer { public function __construct( - private UtopiaPool $publisherPool, - private UtopiaPool $consumerPool, + private ?UtopiaPool $publisher = null, + private ?UtopiaPool $consumer = null, ) { } - protected function delegatePublish(string $method, array $args): mixed - { - return $this->publisherPool->use(function (Publisher $adapter) use ($method, $args) { - return $adapter->$method(...$args); - }); - } - - protected function delegateConsumer(string $method, array $args): mixed - { - return $this->consumerPool->use(function (Consumer $adapter) use ($method, $args) { - return $adapter->$method(...$args); - }); - } - public function enqueue(Queue $queue, array $payload): bool { return $this->delegatePublish(__FUNCTION__, \func_get_args()); @@ -54,4 +40,18 @@ public function close(): void { $this->delegateConsumer(__FUNCTION__, \func_get_args()); } + + protected function delegatePublish(string $method, array $args): mixed + { + return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) { + return $adapter->$method(...$args); + }); + } + + protected function delegateConsumer(string $method, array $args): mixed + { + return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) { + return $adapter->$method(...$args); + }); + } } \ No newline at end of file From b192e36f646d3f5da80f70e3a69e0065b23a3612 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 17 Apr 2025 23:55:59 +1200 Subject: [PATCH 6/9] Remove top level --- src/Queue/Adapter/Pool.php | 59 ------------------- src/Queue/Broker/Pool.php | 5 +- tests/Queue/E2E/Adapter/PoolTest.php | 1 - .../E2E/Adapter/SwooleRedisClusterTest.php | 1 - 4 files changed, 2 insertions(+), 64 deletions(-) delete mode 100644 src/Queue/Adapter/Pool.php diff --git a/src/Queue/Adapter/Pool.php b/src/Queue/Adapter/Pool.php deleted file mode 100644 index 85d6c81..0000000 --- a/src/Queue/Adapter/Pool.php +++ /dev/null @@ -1,59 +0,0 @@ - - */ - protected UtopiaPool $pool; - - public function __construct( - UtopiaPool $pool, - int $workerNum, - string $queue, - string $namespace = 'utopia-queue' - ) - { - parent::__construct($workerNum, $queue, $namespace); - - $this->pool = $pool; - } - - protected function delegate(string $method, array $args): mixed - { - return $this->pool->use(function (Adapter $adapter) use ($method, $args) { - return $adapter->$method(...$args); - }); - } - - public function start(): self - { - return $this->delegate(__FUNCTION__, func_get_args()); - } - - public function stop(): self - { - return $this->delegate(__FUNCTION__, func_get_args()); - } - - public function workerStart(callable $callback): self - { - return $this->delegate(__FUNCTION__, func_get_args()); - } - - public function workerStop(callable $callback): self - { - return $this->delegate(__FUNCTION__, func_get_args()); - } - - public function getNative(): mixed - { - return $this->delegate(__FUNCTION__, func_get_args()); - } -} \ No newline at end of file diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php index aed5eee..8fcf5f0 100644 --- a/src/Queue/Broker/Pool.php +++ b/src/Queue/Broker/Pool.php @@ -12,8 +12,7 @@ public function __construct( private ?UtopiaPool $publisher = null, private ?UtopiaPool $consumer = null, - ) - { + ) { } public function enqueue(Queue $queue, array $payload): bool @@ -54,4 +53,4 @@ protected function delegateConsumer(string $method, array $args): mixed return $adapter->$method(...$args); }); } -} \ No newline at end of file +} diff --git a/tests/Queue/E2E/Adapter/PoolTest.php b/tests/Queue/E2E/Adapter/PoolTest.php index 8421a56..8c02d2b 100644 --- a/tests/Queue/E2E/Adapter/PoolTest.php +++ b/tests/Queue/E2E/Adapter/PoolTest.php @@ -2,7 +2,6 @@ namespace Tests\E2E\Adapter; -use Tests\E2E\Adapter\Base; use Utopia\Pools\Pool as UtopiaPool; use Utopia\Queue\Broker\Pool; use Utopia\Queue\Broker\Redis as RedisBroker; diff --git a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php index 9f7a964..b1a744c 100644 --- a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php +++ b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php @@ -2,7 +2,6 @@ namespace Tests\E2E\Adapter; -use Tests\E2E\Adapter\Base; use Utopia\Queue\Broker\Redis; use Utopia\Queue\Connection\RedisCluster; use Utopia\Queue\Publisher; From 13ad11e65cb29323c25a31a4bf93c4c1113709c0 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 17 Apr 2025 23:58:10 +1200 Subject: [PATCH 7/9] Add pool test to CI --- .github/workflows/tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 6391332..ed5bb80 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,6 +16,7 @@ jobs: adapter: [ AMQP, + Pool, SwooleRedisCluster, Swoole, Workerman, From c3f3476450824a17c9d175df264b55a13de745ad Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 18 Apr 2025 00:11:16 +1200 Subject: [PATCH 8/9] Fix namespace --- tests/Queue/E2E/Adapter/AMQPTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php index dac71bd..de36ea5 100644 --- a/tests/Queue/E2E/Adapter/AMQPTest.php +++ b/tests/Queue/E2E/Adapter/AMQPTest.php @@ -1,6 +1,6 @@ Date: Fri, 18 Apr 2025 00:12:01 +1200 Subject: [PATCH 9/9] Format --- tests/Queue/E2E/Adapter/AMQPTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php index de36ea5..e8557a3 100644 --- a/tests/Queue/E2E/Adapter/AMQPTest.php +++ b/tests/Queue/E2E/Adapter/AMQPTest.php @@ -2,7 +2,6 @@ namespace Tests\E2E\Adapter; -use Tests\E2E\Adapter\Base; use Utopia\Queue\Broker\AMQP; use Utopia\Queue\Publisher; use Utopia\Queue\Queue;