From 2fabd54d14a087706faa4d9a9f88e6bd467e1b5b Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Tue, 23 Dec 2025 20:36:02 +0100 Subject: [PATCH] feature: cursor based postgresql adapter - added dsl and builder to cover cursors to postgresql library - added cursor based extractor --- .../components/adapters/postgresql.md | 75 ++++++- .../PostgreSql/PostgreSqlCursorExtractor.php | 135 +++++++++++++ .../Flow/ETL/Adapter/PostgreSql/functions.php | 32 +++ .../Benchmark/PostgreSqlExtractorBench.php | 14 +- ...stgreSqlCursorExtractorIntegrationTest.php | 187 ++++++++++++++++++ ...stgreSqlKeySetExtractorIntegrationTest.php | 84 ++++---- ...SqlLimitOffsetExtractorIntegrationTest.php | 68 +++---- .../Unit/PostgreSqlCursorExtractorTest.php | 98 +++++++++ .../src/Flow/PostgreSql/DSL/functions.php | 74 ++++++- .../Cursor/CloseCursorBuilder.php | 36 ++++ .../Cursor/CloseCursorFinalStep.php | 11 ++ .../QueryBuilder/Cursor/CursorOption.php | 30 +++ .../Cursor/DeclareCursorBuilder.php | 92 +++++++++ .../Cursor/DeclareCursorFinalStep.php | 11 ++ .../Cursor/DeclareCursorOptionsStep.php | 29 +++ .../Cursor/FetchCursorBuilder.php | 78 ++++++++ .../Cursor/FetchCursorFinalStep.php | 11 ++ web/landing/resources/dsl.json | 2 +- 18 files changed, 970 insertions(+), 97 deletions(-) create mode 100644 src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/PostgreSqlCursorExtractor.php create mode 100644 src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlCursorExtractorIntegrationTest.php create mode 100644 src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Unit/PostgreSqlCursorExtractorTest.php create mode 100644 src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/CloseCursorBuilder.php create mode 100644 src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/CloseCursorFinalStep.php create mode 100644 src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/CursorOption.php create mode 100644 src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/DeclareCursorBuilder.php create mode 100644 src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/DeclareCursorFinalStep.php create mode 100644 src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/DeclareCursorOptionsStep.php create mode 100644 src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/FetchCursorBuilder.php create mode 100644 src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/FetchCursorFinalStep.php diff --git a/documentation/components/adapters/postgresql.md b/documentation/components/adapters/postgresql.md index 3b5108a3f..372795856 100644 --- a/documentation/components/adapters/postgresql.md +++ b/documentation/components/adapters/postgresql.md @@ -33,15 +33,16 @@ This adapter provides: ### Extractors -Two extraction strategies optimized for different use cases: +Three extraction strategies optimized for different use cases: +- **Server-Side Cursor**: True streaming extraction using PostgreSQL DECLARE CURSOR for maximum memory efficiency - **LIMIT/OFFSET Pagination**: Simple pagination suitable for smaller datasets - **Keyset (Cursor) Pagination**: Efficient pagination for large datasets with consistent performance -Both extractors support: +All extractors support: - Raw SQL strings or Query Builder objects -- Configurable page sizes +- Configurable batch/page sizes - Maximum row limits - Custom schema definitions @@ -54,6 +55,73 @@ A flexible loader supporting: - **DELETE**: Delete rows by primary key - **UPSERT**: ON CONFLICT handling for insert-or-update operations +## Extractor - Server-Side Cursor + +The `from_pgsql_cursor` extractor uses PostgreSQL's native server-side cursors via `DECLARE CURSOR` + `FETCH`. +This is the **only way** to achieve true low-memory streaming with PHP's ext-pgsql, as the extension +has no unbuffered query mode. + +> **Note:** This extractor automatically manages transactions. Cursors require a transaction context, +> which is auto-started if not already in one. + +### Basic Usage + +```php +use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor; +use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn}; + +$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database')); + +data_frame() + ->read(from_pgsql_cursor( + $client, + "SELECT id, name, email FROM users", + fetchSize: 1000 + )) + ->write(to_output()) + ->run(); +``` + +### With Query Builder + +```php +use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor; +use function Flow\PostgreSql\DSL\{col, select, star, table}; + +data_frame() + ->read(from_pgsql_cursor( + $client, + select(star())->from(table('large_table')), + fetchSize: 500 + )) + ->write(to_output()) + ->run(); +``` + +### With Parameters + +```php +use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor; + +data_frame() + ->read(from_pgsql_cursor( + $client, + "SELECT * FROM orders WHERE status = $1 AND created_at > $2", + parameters: ['pending', '2024-01-01'], + fetchSize: 1000 + )) + ->write(to_output()) + ->run(); +``` + +### When to Use Each Extractor + +| Extractor | Best For | Memory | ORDER BY Required | +|---------------------------|-------------------------------------|------------------------|-------------------| +| `from_pgsql_cursor` | Very large datasets, true streaming | Lowest (server-side) | No | +| `from_pgsql_key_set` | Large datasets with indexed keys | Medium (page buffered) | Auto-generated | +| `from_pgsql_limit_offset` | Small-medium datasets | Medium (page buffered) | Yes | + ## Extractor - LIMIT/OFFSET Pagination The `from_pgsql_limit_offset` extractor uses traditional LIMIT/OFFSET pagination. This is simple to use but may have @@ -195,6 +263,7 @@ data_frame() | Function | Description | |---------------------------------------------------------------------|---------------------------------------| +| `from_pgsql_cursor($client, $query, $parameters, $fetchSize, $max)` | Extract using server-side cursor | | `from_pgsql_limit_offset($client, $query, $pageSize, $maximum)` | Extract using LIMIT/OFFSET pagination | | `from_pgsql_key_set($client, $query, $keySet, $pageSize, $maximum)` | Extract using keyset pagination | diff --git a/src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/PostgreSqlCursorExtractor.php b/src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/PostgreSqlCursorExtractor.php new file mode 100644 index 000000000..2cb9f29bd --- /dev/null +++ b/src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/PostgreSqlCursorExtractor.php @@ -0,0 +1,135 @@ + $parameters + */ + public function __construct( + private readonly Client $client, + private readonly string|SqlQuery $query, + private readonly array $parameters = [], + ) { + } + + public function extract(FlowContext $context) : \Generator + { + $cursorName = $this->cursorName ?? 'flow_cursor_' . \bin2hex(\random_bytes(8)); + + $ownTransaction = $this->client->getTransactionNestingLevel() === 0; + + if ($ownTransaction) { + $this->client->beginTransaction(); + } + + try { + $this->client->execute( + declare_cursor($cursorName, $this->query), + $this->parameters + ); + + $totalFetched = 0; + + while (true) { + $cursor = $this->client->cursor(fetch($cursorName)->forward($this->fetchSize)); + $hasRows = false; + + foreach ($cursor->iterate() as $row) { + $hasRows = true; + $signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema); + + if ($signal === Signal::STOP) { + $cursor->free(); + + return; + } + + $totalFetched++; + + if ($this->maximum !== null && $totalFetched >= $this->maximum) { + $cursor->free(); + + return; + } + } + + $cursor->free(); + + if (!$hasRows) { + break; + } + } + } finally { + $this->client->execute(close_cursor($cursorName)); + + if ($ownTransaction) { + $this->client->commit(); + } + } + } + + public function withCursorName(string $cursorName) : self + { + $this->cursorName = $cursorName; + + return $this; + } + + public function withFetchSize(int $fetchSize) : self + { + if ($fetchSize <= 0) { + throw new InvalidArgumentException('Fetch size must be greater than 0, got ' . $fetchSize); + } + + $this->fetchSize = $fetchSize; + + return $this; + } + + public function withMaximum(int $maximum) : self + { + if ($maximum <= 0) { + throw new InvalidArgumentException('Maximum must be greater than 0, got ' . $maximum); + } + + $this->maximum = $maximum; + + return $this; + } + + public function withSchema(Schema $schema) : self + { + $this->schema = $schema; + + return $this; + } +} diff --git a/src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/functions.php b/src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/functions.php index 39771553a..67816a2b8 100644 --- a/src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/functions.php +++ b/src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/functions.php @@ -10,6 +10,38 @@ use Flow\PostgreSql\Client\Client; use Flow\PostgreSql\QueryBuilder\SqlQuery; +/** + * Create a PostgreSQL cursor extractor using server-side cursors for memory-efficient extraction. + * + * Uses DECLARE CURSOR + FETCH to stream data without loading entire result set into memory. + * This is the only way to achieve true low memory extraction with PHP's ext-pgsql. + * + * Note: Requires a transaction context (auto-started if not in one). + * + * @param Client $client PostgreSQL client + * @param SqlQuery|string $query SQL query to execute (wrapped in DECLARE CURSOR) + * @param array $parameters Positional parameters for the query + * @param int $fetchSize Number of rows to fetch per batch (default: 1000) + * @param null|int $maximum Maximum number of rows to extract (null for unlimited) + */ +#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::EXTRACTOR)] +function from_pgsql_cursor( + Client $client, + string|SqlQuery $query, + array $parameters = [], + int $fetchSize = 1000, + ?int $maximum = null, +) : PostgreSqlCursorExtractor { + $extractor = (new PostgreSqlCursorExtractor($client, $query, $parameters)) + ->withFetchSize($fetchSize); + + if ($maximum !== null) { + $extractor->withMaximum($maximum); + } + + return $extractor; +} + #[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::EXTRACTOR)] function from_pgsql_limit_offset( Client $client, diff --git a/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Benchmark/PostgreSqlExtractorBench.php b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Benchmark/PostgreSqlExtractorBench.php index 27bbd1ee5..b80050dad 100644 --- a/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Benchmark/PostgreSqlExtractorBench.php +++ b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Benchmark/PostgreSqlExtractorBench.php @@ -4,7 +4,7 @@ namespace Flow\ETL\Adapter\PostgreSql\Tests\Benchmark; -use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, from_pgsql_limit_offset, pgsql_pagination_key_asc, pgsql_pagination_key_set, to_pgsql_table}; +use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_cursor, from_pgsql_key_set, from_pgsql_limit_offset, pgsql_pagination_key_asc, pgsql_pagination_key_set, to_pgsql_table}; use function Flow\ETL\DSL\{config, df, flow_context}; use function Flow\PostgreSql\DSL\{asc, col, column, create, data_type_double_precision, data_type_integer, data_type_jsonb, data_type_text, data_type_timestamptz, data_type_uuid, drop, pgsql_client, pgsql_connection_dsn, pgsql_mapper, select, star, table}; use Flow\ETL\Tests\Double\FakeStaticOrdersExtractor; @@ -50,6 +50,18 @@ public function __destruct() $this->client->close(); } + public function bench_extract_10k_cursor() : void + { + $context = flow_context(config()); + + foreach (from_pgsql_cursor( + $this->client, + select(star())->from(table(self::TABLE_NAME)), + fetchSize: 1000 + )->extract($context) as $rows) { + } + } + public function bench_extract_10k_keyset() : void { $context = flow_context(config()); diff --git a/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlCursorExtractorIntegrationTest.php b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlCursorExtractorIntegrationTest.php new file mode 100644 index 000000000..3a4be96f1 --- /dev/null +++ b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlCursorExtractorIntegrationTest.php @@ -0,0 +1,187 @@ +client->execute( + drop()->table($this->tableName)->ifExists()->cascade() + ); + + $this->client->execute( + create()->table($this->tableName) + ->column(column('id', data_type_integer())->primaryKey()) + ->column(column('name', data_type_text())) + ); + + $this->insertTestData(25); + } + + protected function tearDown() : void + { + if (isset($this->client)) { + $this->client->execute( + drop()->table($this->tableName)->ifExists()->cascade() + ); + } + + parent::tearDown(); + } + + public function test_extracts_all_rows_with_cursor() : void + { + $rows = df() + ->read(from_pgsql_cursor( + $this->client, + select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))), + fetchSize: 5 + )) + ->fetch() + ->toArray(); + + self::assertCount(25, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(25, $rows[24]['id']); + self::assertSame(\range(1, 25), \array_column($rows, 'id')); + } + + public function test_extracts_all_rows_without_order_by() : void + { + $rows = df() + ->read(from_pgsql_cursor( + $this->client, + select(col('id'), col('name'))->from(table($this->tableName)), + fetchSize: 5 + )) + ->fetch() + ->toArray(); + + self::assertCount(25, $rows); + } + + public function test_extracts_limited_rows_with_maximum() : void + { + $rows = df() + ->read(from_pgsql_cursor( + $this->client, + select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))), + fetchSize: 5, + maximum: 12 + )) + ->fetch() + ->toArray(); + + self::assertCount(12, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(12, $rows[11]['id']); + self::assertSame(\range(1, 12), \array_column($rows, 'id')); + } + + public function test_extracts_with_custom_cursor_name() : void + { + $rows = df() + ->read(from_pgsql_cursor( + $this->client, + select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))), + fetchSize: 5 + )->withCursorName('my_custom_cursor')) + ->fetch() + ->toArray(); + + self::assertCount(25, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(25, $rows[24]['id']); + self::assertSame(\range(1, 25), \array_column($rows, 'id')); + } + + public function test_extracts_with_custom_fetch_size() : void + { + $rows = df() + ->read(from_pgsql_cursor( + $this->client, + select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))), + fetchSize: 3 + )) + ->fetch() + ->toArray(); + + self::assertCount(25, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(25, $rows[24]['id']); + self::assertSame(\range(1, 25), \array_column($rows, 'id')); + } + + public function test_extracts_with_parameterized_query() : void + { + $rows = df() + ->read(from_pgsql_cursor( + $this->client, + 'SELECT id, name FROM ' . $this->tableName . ' WHERE id > $1 ORDER BY id', + parameters: [10], + fetchSize: 5 + )) + ->fetch() + ->toArray(); + + self::assertCount(15, $rows); + self::assertSame(11, $rows[0]['id']); + self::assertSame(25, $rows[14]['id']); + self::assertSame(\range(11, 25), \array_column($rows, 'id')); + } + + public function test_extracts_with_raw_sql() : void + { + $rows = df() + ->read(from_pgsql_cursor( + $this->client, + 'SELECT id, name FROM ' . $this->tableName . ' ORDER BY id', + fetchSize: 5 + )) + ->fetch() + ->toArray(); + + self::assertCount(25, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(25, $rows[24]['id']); + self::assertSame(\range(1, 25), \array_column($rows, 'id')); + } + + public function test_returns_empty_for_empty_table() : void + { + $this->client->execute(delete()->from($this->tableName)); + + $rows = df() + ->read(from_pgsql_cursor( + $this->client, + select(star())->from(table($this->tableName)) + )) + ->fetch() + ->toArray(); + + self::assertSame([], $rows); + } + + private function insertTestData(int $count) : void + { + $insert = insert()->into($this->tableName)->columns('id', 'name'); + + for ($i = 1; $i <= $count; $i++) { + $insert = $insert->values(literal($i), literal(\sprintf('User_%02d', $i))); + } + + $this->client->execute($insert); + } +} diff --git a/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlKeySetExtractorIntegrationTest.php b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlKeySetExtractorIntegrationTest.php index 5528e6b23..1de9e2610 100644 --- a/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlKeySetExtractorIntegrationTest.php +++ b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlKeySetExtractorIntegrationTest.php @@ -27,14 +27,7 @@ protected function setUp() : void ->column(column('name', data_type_text())) ); - $this->client->execute( - insert()->into($this->tableName)->columns('id', 'name') - ->values(literal(1), literal('Alice')) - ->values(literal(2), literal('Bob')) - ->values(literal(3), literal('Charlie')) - ->values(literal(4), literal('David')) - ->values(literal(5), literal('Eve')) - ); + $this->insertTestData(25); } protected function tearDown() : void @@ -55,21 +48,15 @@ public function test_extracts_all_rows_with_keyset_pagination() : void $this->client, select(col('id'), col('name'))->from(table($this->tableName)), pgsql_pagination_key_set(pgsql_pagination_key_asc('id')), - pageSize: 2 + pageSize: 5 )) ->fetch() ->toArray(); - self::assertSame( - [ - ['id' => 1, 'name' => 'Alice'], - ['id' => 2, 'name' => 'Bob'], - ['id' => 3, 'name' => 'Charlie'], - ['id' => 4, 'name' => 'David'], - ['id' => 5, 'name' => 'Eve'], - ], - $rows - ); + self::assertCount(25, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(25, $rows[24]['id']); + self::assertSame(\range(1, 25), \array_column($rows, 'id')); } public function test_extracts_limited_rows_with_maximum() : void @@ -79,20 +66,16 @@ public function test_extracts_limited_rows_with_maximum() : void $this->client, select(col('id'), col('name'))->from(table($this->tableName)), pgsql_pagination_key_set(pgsql_pagination_key_asc('id')), - pageSize: 2, - maximum: 3 + pageSize: 5, + maximum: 12 )) ->fetch() ->toArray(); - self::assertSame( - [ - ['id' => 1, 'name' => 'Alice'], - ['id' => 2, 'name' => 'Bob'], - ['id' => 3, 'name' => 'Charlie'], - ], - $rows - ); + self::assertCount(12, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(12, $rows[11]['id']); + self::assertSame(\range(1, 12), \array_column($rows, 'id')); } public function test_extracts_with_descending_order() : void @@ -102,21 +85,15 @@ public function test_extracts_with_descending_order() : void $this->client, select(col('id'), col('name'))->from(table($this->tableName)), pgsql_pagination_key_set(pgsql_pagination_key_desc('id')), - pageSize: 2 + pageSize: 5 )) ->fetch() ->toArray(); - self::assertSame( - [ - ['id' => 5, 'name' => 'Eve'], - ['id' => 4, 'name' => 'David'], - ['id' => 3, 'name' => 'Charlie'], - ['id' => 2, 'name' => 'Bob'], - ['id' => 1, 'name' => 'Alice'], - ], - $rows - ); + self::assertCount(25, $rows); + self::assertSame(25, $rows[0]['id']); + self::assertSame(1, $rows[24]['id']); + self::assertSame(\range(25, 1), \array_column($rows, 'id')); } public function test_extracts_with_raw_sql() : void @@ -126,21 +103,15 @@ public function test_extracts_with_raw_sql() : void $this->client, 'SELECT id, name FROM ' . $this->tableName, pgsql_pagination_key_set(pgsql_pagination_key_asc('id')), - pageSize: 10 + pageSize: 5 )) ->fetch() ->toArray(); - self::assertSame( - [ - ['id' => 1, 'name' => 'Alice'], - ['id' => 2, 'name' => 'Bob'], - ['id' => 3, 'name' => 'Charlie'], - ['id' => 4, 'name' => 'David'], - ['id' => 5, 'name' => 'Eve'], - ], - $rows - ); + self::assertCount(25, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(25, $rows[24]['id']); + self::assertSame(\range(1, 25), \array_column($rows, 'id')); } public function test_returns_empty_for_empty_table() : void @@ -159,4 +130,15 @@ public function test_returns_empty_for_empty_table() : void self::assertSame([], $rows); } + + private function insertTestData(int $count) : void + { + $insert = insert()->into($this->tableName)->columns('id', 'name'); + + for ($i = 1; $i <= $count; $i++) { + $insert = $insert->values(literal($i), literal(\sprintf('User_%02d', $i))); + } + + $this->client->execute($insert); + } } diff --git a/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlLimitOffsetExtractorIntegrationTest.php b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlLimitOffsetExtractorIntegrationTest.php index 37cb011c2..e29cb0995 100644 --- a/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlLimitOffsetExtractorIntegrationTest.php +++ b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Integration/PostgreSqlLimitOffsetExtractorIntegrationTest.php @@ -27,14 +27,7 @@ protected function setUp() : void ->column(column('name', data_type_text())) ); - $this->client->execute( - insert()->into($this->tableName)->columns('id', 'name') - ->values(literal(1), literal('Alice')) - ->values(literal(2), literal('Bob')) - ->values(literal(3), literal('Charlie')) - ->values(literal(4), literal('David')) - ->values(literal(5), literal('Eve')) - ); + $this->insertTestData(25); } protected function tearDown() : void @@ -54,21 +47,15 @@ public function test_extracts_all_rows_with_pagination() : void ->read(from_pgsql_limit_offset( $this->client, select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))), - pageSize: 2 + pageSize: 5 )) ->fetch() ->toArray(); - self::assertSame( - [ - ['id' => 1, 'name' => 'Alice'], - ['id' => 2, 'name' => 'Bob'], - ['id' => 3, 'name' => 'Charlie'], - ['id' => 4, 'name' => 'David'], - ['id' => 5, 'name' => 'Eve'], - ], - $rows - ); + self::assertCount(25, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(25, $rows[24]['id']); + self::assertSame(\range(1, 25), \array_column($rows, 'id')); } public function test_extracts_limited_rows_with_maximum() : void @@ -77,20 +64,16 @@ public function test_extracts_limited_rows_with_maximum() : void ->read(from_pgsql_limit_offset( $this->client, select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))), - pageSize: 2, - maximum: 3 + pageSize: 5, + maximum: 12 )) ->fetch() ->toArray(); - self::assertSame( - [ - ['id' => 1, 'name' => 'Alice'], - ['id' => 2, 'name' => 'Bob'], - ['id' => 3, 'name' => 'Charlie'], - ], - $rows - ); + self::assertCount(12, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(12, $rows[11]['id']); + self::assertSame(\range(1, 12), \array_column($rows, 'id')); } public function test_extracts_with_raw_sql() : void @@ -99,21 +82,15 @@ public function test_extracts_with_raw_sql() : void ->read(from_pgsql_limit_offset( $this->client, 'SELECT id, name FROM ' . $this->tableName . ' ORDER BY id', - pageSize: 10 + pageSize: 5 )) ->fetch() ->toArray(); - self::assertSame( - [ - ['id' => 1, 'name' => 'Alice'], - ['id' => 2, 'name' => 'Bob'], - ['id' => 3, 'name' => 'Charlie'], - ['id' => 4, 'name' => 'David'], - ['id' => 5, 'name' => 'Eve'], - ], - $rows - ); + self::assertCount(25, $rows); + self::assertSame(1, $rows[0]['id']); + self::assertSame(25, $rows[24]['id']); + self::assertSame(\range(1, 25), \array_column($rows, 'id')); } public function test_returns_empty_for_empty_table() : void @@ -131,4 +108,15 @@ public function test_returns_empty_for_empty_table() : void self::assertSame([], $rows); } + + private function insertTestData(int $count) : void + { + $insert = insert()->into($this->tableName)->columns('id', 'name'); + + for ($i = 1; $i <= $count; $i++) { + $insert = $insert->values(literal($i), literal(\sprintf('User_%02d', $i))); + } + + $this->client->execute($insert); + } } diff --git a/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Unit/PostgreSqlCursorExtractorTest.php b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Unit/PostgreSqlCursorExtractorTest.php new file mode 100644 index 000000000..2c15d6e59 --- /dev/null +++ b/src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Unit/PostgreSqlCursorExtractorTest.php @@ -0,0 +1,98 @@ +createClientMock(); + $extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users'); + + $result = $extractor->withFetchSize(500); + + self::assertSame($extractor, $result); + } + + public function test_with_fetch_size_validates_positive_value() : void + { + $client = $this->createClientMock(); + $extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users'); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Fetch size must be greater than 0, got 0'); + + $extractor->withFetchSize(0); + } + + public function test_with_fetch_size_validates_positive_value_negative() : void + { + $client = $this->createClientMock(); + $extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users'); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Fetch size must be greater than 0, got -10'); + + $extractor->withFetchSize(-10); + } + + public function test_with_maximum_returns_self() : void + { + $client = $this->createClientMock(); + $extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users'); + + $result = $extractor->withMaximum(100); + + self::assertSame($extractor, $result); + } + + public function test_with_maximum_validates_positive_value() : void + { + $client = $this->createClientMock(); + $extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users'); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Maximum must be greater than 0, got 0'); + + $extractor->withMaximum(0); + } + + public function test_with_maximum_validates_positive_value_negative() : void + { + $client = $this->createClientMock(); + $extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users'); + + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Maximum must be greater than 0, got -5'); + + $extractor->withMaximum(-5); + } + + public function test_with_schema_returns_self() : void + { + $client = $this->createClientMock(); + $extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users'); + + $schema = new Schema(); + $result = $extractor->withSchema($schema); + + self::assertSame($extractor, $result); + } + + /** + * @return Client&MockObject + */ + private function createClientMock() : Client + { + return $this->createMock(Client::class); + } +} diff --git a/src/lib/postgresql/src/Flow/PostgreSql/DSL/functions.php b/src/lib/postgresql/src/Flow/PostgreSql/DSL/functions.php index c156f2ba9..358a1a0e8 100644 --- a/src/lib/postgresql/src/Flow/PostgreSql/DSL/functions.php +++ b/src/lib/postgresql/src/Flow/PostgreSql/DSL/functions.php @@ -57,6 +57,13 @@ RawCondition, SimilarTo }; +use Flow\PostgreSql\QueryBuilder\Cursor\{ + CloseCursorBuilder, + CloseCursorFinalStep, + DeclareCursorBuilder, + DeclareCursorOptionsStep, + FetchCursorBuilder +}; use Flow\PostgreSql\QueryBuilder\Delete\{DeleteBuilder, DeleteFromStep}; use Flow\PostgreSql\QueryBuilder\Exception\InvalidExpressionException; use Flow\PostgreSql\QueryBuilder\Expression\{ @@ -86,7 +93,7 @@ use Flow\PostgreSql\QueryBuilder\Factory\CopyFactory; use Flow\PostgreSql\QueryBuilder\Insert\{BulkInsert, InsertBuilder, InsertIntoStep}; use Flow\PostgreSql\QueryBuilder\Merge\{MergeBuilder, MergeUsingStep}; -use Flow\PostgreSql\QueryBuilder\QualifiedIdentifier; +use Flow\PostgreSql\QueryBuilder\{QualifiedIdentifier, SqlQuery}; use Flow\PostgreSql\QueryBuilder\Schema\{ColumnDefinition, DataType, ReferentialAction}; use Flow\PostgreSql\QueryBuilder\Schema\Constraint\{CheckConstraint, ForeignKeyConstraint, PrimaryKeyConstraint, UniqueConstraint}; use Flow\PostgreSql\QueryBuilder\Schema\Function\{ @@ -2069,6 +2076,71 @@ function rollback_prepared(string $transactionId) : PreparedTransactionFinalStep return PreparedTransactionBuilder::rollbackPrepared($transactionId); } +/** + * Declare a server-side cursor for a query. + * + * Cursors must be declared within a transaction and provide memory-efficient + * iteration over large result sets via FETCH commands. + * + * Example with query builder: + * declare_cursor('my_cursor', select(star())->from(table('users')))->noScroll() + * Produces: DECLARE my_cursor NO SCROLL CURSOR FOR SELECT * FROM users + * + * Example with raw SQL: + * declare_cursor('my_cursor', 'SELECT * FROM users WHERE active = true')->withHold() + * Produces: DECLARE my_cursor NO SCROLL CURSOR WITH HOLD FOR SELECT * FROM users WHERE active = true + * + * @param string $cursorName Unique cursor name + * @param SelectFinalStep|SqlQuery|string $query Query to iterate over + */ +#[DocumentationDSL(module: Module::PG_QUERY, type: DSLType::HELPER)] +function declare_cursor(string $cursorName, SelectFinalStep|string|SqlQuery $query) : DeclareCursorOptionsStep +{ + if ($query instanceof SelectFinalStep) { + return DeclareCursorBuilder::create($cursorName, $query); + } + + return DeclareCursorBuilder::createFromSql($cursorName, $query); +} + +/** + * Fetch rows from a cursor. + * + * Example: fetch('my_cursor')->forward(100) + * Produces: FETCH FORWARD 100 my_cursor + * + * Example: fetch('my_cursor')->all() + * Produces: FETCH ALL my_cursor + * + * @param string $cursorName Cursor to fetch from + */ +#[DocumentationDSL(module: Module::PG_QUERY, type: DSLType::HELPER)] +function fetch(string $cursorName) : FetchCursorBuilder +{ + return FetchCursorBuilder::create($cursorName); +} + +/** + * Close a cursor. + * + * Example: close_cursor('my_cursor') + * Produces: CLOSE my_cursor + * + * Example: close_cursor() - closes all cursors + * Produces: CLOSE ALL + * + * @param null|string $cursorName Cursor to close, or null to close all + */ +#[DocumentationDSL(module: Module::PG_QUERY, type: DSLType::HELPER)] +function close_cursor(?string $cursorName = null) : CloseCursorFinalStep +{ + if ($cursorName === null) { + return CloseCursorBuilder::closeAll(); + } + + return CloseCursorBuilder::close($cursorName); +} + /** * Create a column definition for CREATE TABLE. * diff --git a/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/CloseCursorBuilder.php b/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/CloseCursorBuilder.php new file mode 100644 index 000000000..2cd8e1478 --- /dev/null +++ b/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/CloseCursorBuilder.php @@ -0,0 +1,36 @@ +setPortalname($this->cursorName); + + return $stmt; + } +} diff --git a/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/CloseCursorFinalStep.php b/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/CloseCursorFinalStep.php new file mode 100644 index 000000000..2e38dae52 --- /dev/null +++ b/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/CloseCursorFinalStep.php @@ -0,0 +1,11 @@ +setSelectStmt($query->toAst()); + + return new self($cursorName, $node); + } + + public static function createFromSql(string $cursorName, string|SqlQuery $query) : DeclareCursorOptionsStep + { + $sql = $query instanceof SqlQuery ? $query->toSql() : $query; + $parser = new Parser(); + $parsed = $parser->parse($sql); + $rawStmts = $parsed->raw()->getStmts(); + + if (\count($rawStmts) === 0) { + throw new \InvalidArgumentException('Query cannot be empty'); + } + + $firstStmt = $rawStmts[0]; + $stmtNode = $firstStmt->getStmt(); + + if ($stmtNode === null) { + throw new \InvalidArgumentException('Invalid query: no statement found'); + } + + return new self($cursorName, $stmtNode); + } + + public function binary() : DeclareCursorOptionsStep + { + $this->options |= CursorOption::BINARY; + + return $this; + } + + public function noScroll() : DeclareCursorOptionsStep + { + $this->options |= CursorOption::NO_SCROLL; + $this->options &= ~CursorOption::SCROLL; + + return $this; + } + + public function scroll() : DeclareCursorOptionsStep + { + $this->options |= CursorOption::SCROLL; + $this->options &= ~CursorOption::NO_SCROLL; + + return $this; + } + + public function toAst() : DeclareCursorStmt + { + $stmt = new DeclareCursorStmt(); + $stmt->setPortalname($this->cursorName); + $stmt->setOptions($this->options); + $stmt->setQuery($this->queryNode); + + return $stmt; + } + + public function withHold() : DeclareCursorOptionsStep + { + $this->options |= CursorOption::HOLD; + + return $this; + } +} diff --git a/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/DeclareCursorFinalStep.php b/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/DeclareCursorFinalStep.php new file mode 100644 index 000000000..e1a9372bd --- /dev/null +++ b/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/DeclareCursorFinalStep.php @@ -0,0 +1,11 @@ +count = 0; + $this->direction = FetchDirection::FETCH_FORWARD; + + return $this; + } + + public function backward(int $count = 1) : self + { + $this->count = $count; + $this->direction = FetchDirection::FETCH_BACKWARD; + + return $this; + } + + public function forward(int $count = 1) : self + { + $this->count = $count; + $this->direction = FetchDirection::FETCH_FORWARD; + + return $this; + } + + public function next() : self + { + $this->count = 1; + $this->direction = FetchDirection::FETCH_FORWARD; + + return $this; + } + + public function prior() : self + { + $this->count = 1; + $this->direction = FetchDirection::FETCH_BACKWARD; + + return $this; + } + + public function toAst() : FetchStmt + { + $stmt = new FetchStmt(); + $stmt->setPortalname($this->cursorName); + $stmt->setDirection($this->direction); + $stmt->setHowMany($this->count); + $stmt->setIsmove(false); + + return $stmt; + } +} diff --git a/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/FetchCursorFinalStep.php b/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/FetchCursorFinalStep.php new file mode 100644 index 000000000..8b6d04dad --- /dev/null +++ b/src/lib/postgresql/src/Flow/PostgreSql/QueryBuilder/Cursor/FetchCursorFinalStep.php @@ -0,0 +1,11 @@ +