documentation/components/adapters/postgresql.md (72 additions, 3 deletions)
@@ -33,15 +33,16 @@ This adapter provides:

### Extractors

-Two extraction strategies optimized for different use cases:
+Three extraction strategies optimized for different use cases:

- **Server-Side Cursor**: True streaming extraction using PostgreSQL DECLARE CURSOR for maximum memory efficiency
- **LIMIT/OFFSET Pagination**: Simple pagination suitable for smaller datasets
- **Keyset (Cursor) Pagination**: Efficient pagination for large datasets with consistent performance

-Both extractors support:
+All extractors support:

- Raw SQL strings or Query Builder objects
-- Configurable page sizes
+- Configurable batch/page sizes
- Maximum row limits
- Custom schema definitions

@@ -54,6 +55,73 @@ A flexible loader supporting:
- **DELETE**: Delete rows by primary key
- **UPSERT**: ON CONFLICT handling for insert-or-update operations
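A minimal loader sketch in a pipeline. The `to_pgsql_table` helper name is taken from the benchmark imports in this PR, but its exact signature is an assumption here, as are `data_frame` and `from_array` from the core DSL:

```php
use function Flow\ETL\Adapter\PostgreSql\to_pgsql_table;
use function Flow\ETL\DSL\{data_frame, from_array};

// Hypothetical usage: the to_pgsql_table() argument shape is assumed.
data_frame()
    ->read(from_array([
        ['id' => 1, 'name' => 'Alice'],
        ['id' => 2, 'name' => 'Bob'],
    ]))
    ->write(to_pgsql_table($client, 'users'))
    ->run();
```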

## Extractor - Server-Side Cursor

The `from_pgsql_cursor` extractor uses PostgreSQL's native server-side cursors via `DECLARE CURSOR` + `FETCH`.
This is the **only way** to achieve true low-memory streaming with PHP's ext-pgsql, as the extension
has no unbuffered query mode.

> **Note:** This extractor automatically manages transactions. Cursors require a transaction context,
> which is auto-started if not already in one.

### Basic Usage

```php
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};

$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));

data_frame()
->read(from_pgsql_cursor(
$client,
"SELECT id, name, email FROM users",
fetchSize: 1000
))
->write(to_output())
->run();
```

### With Query Builder

```php
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
use function Flow\PostgreSql\DSL\{col, select, star, table};

data_frame()
->read(from_pgsql_cursor(
$client,
select(star())->from(table('large_table')),
fetchSize: 500
))
->write(to_output())
->run();
```

### With Parameters

```php
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;

data_frame()
->read(from_pgsql_cursor(
$client,
"SELECT * FROM orders WHERE status = $1 AND created_at > $2",
parameters: ['pending', '2024-01-01'],
fetchSize: 1000
))
->write(to_output())
->run();
```
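The DSL function also accepts a row cap. Per the `from_pgsql_cursor` signature added in this PR, `maximum` stops extraction after the given number of rows:

```php
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;

// Cap extraction at 10k rows regardless of the result set size.
data_frame()
    ->read(from_pgsql_cursor(
        $client,
        "SELECT * FROM events",
        fetchSize: 1000,
        maximum: 10_000
    ))
    ->write(to_output())
    ->run();
```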

### When to Use Each Extractor

| Extractor | Best For | Memory | ORDER BY Required |
|---------------------------|-------------------------------------|------------------------|-------------------|
| `from_pgsql_cursor` | Very large datasets, true streaming | Lowest (server-side) | No |
| `from_pgsql_key_set` | Large datasets with indexed keys | Medium (page buffered) | Auto-generated |
| `from_pgsql_limit_offset` | Small-medium datasets | Medium (page buffered) | Yes |
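For comparison, a keyset extraction sketch. The `pgsql_pagination_key_set` and `pgsql_pagination_key_asc` helper names appear in this PR's benchmark imports, but the exact key-set argument shape shown here is inferred, not confirmed:

```php
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_set};

// Sketch only: key-set construction is inferred from the benchmark
// imports and the function reference in this document.
data_frame()
    ->read(from_pgsql_key_set(
        $client,
        "SELECT id, name, email FROM users",
        pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
        pageSize: 1000
    ))
    ->write(to_output())
    ->run();
```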

## Extractor - LIMIT/OFFSET Pagination

The `from_pgsql_limit_offset` extractor uses traditional LIMIT/OFFSET pagination. This is simple to use but may have
@@ -195,6 +263,7 @@ data_frame()

| Function | Description |
|---------------------------------------------------------------------|---------------------------------------|
| `from_pgsql_cursor($client, $query, $parameters, $fetchSize, $max)` | Extract using server-side cursor |
| `from_pgsql_limit_offset($client, $query, $pageSize, $maximum)` | Extract using LIMIT/OFFSET pagination |
| `from_pgsql_key_set($client, $query, $keySet, $pageSize, $maximum)` | Extract using keyset pagination |

@@ -0,0 +1,135 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\PostgreSql;

use function Flow\ETL\DSL\array_to_rows;
use function Flow\PostgreSql\DSL\{close_cursor, declare_cursor, fetch};
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Extractor\Signal;
use Flow\ETL\{Extractor, FlowContext, Schema};
use Flow\PostgreSql\Client\Client;
use Flow\PostgreSql\QueryBuilder\SqlQuery;

/**
* PostgreSQL extractor using server-side cursors for memory-efficient extraction.
*
* Uses DECLARE CURSOR + FETCH to stream data from PostgreSQL without loading
* the entire result set into memory. This is the only way to achieve true
* low memory extraction with PHP's ext-pgsql.
*
* Note: Requires a transaction context (auto-started if not in one).
*/
final class PostgreSqlCursorExtractor implements Extractor
{
private ?string $cursorName = null;

private int $fetchSize = 1000;

private ?int $maximum = null;

private ?Schema $schema = null;

/**
* @param array<int, mixed> $parameters
*/
public function __construct(
private readonly Client $client,
private readonly string|SqlQuery $query,
private readonly array $parameters = [],
) {
}

public function extract(FlowContext $context) : \Generator
{
$cursorName = $this->cursorName ?? 'flow_cursor_' . \bin2hex(\random_bytes(8));

$ownTransaction = $this->client->getTransactionNestingLevel() === 0;

if ($ownTransaction) {
$this->client->beginTransaction();
}

try {
$this->client->execute(
declare_cursor($cursorName, $this->query),
$this->parameters
);

$totalFetched = 0;

while (true) {
$cursor = $this->client->cursor(fetch($cursorName)->forward($this->fetchSize));
$hasRows = false;

foreach ($cursor->iterate() as $row) {
$hasRows = true;
$signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema);

if ($signal === Signal::STOP) {
$cursor->free();

return;
}

$totalFetched++;

if ($this->maximum !== null && $totalFetched >= $this->maximum) {
$cursor->free();

return;
}
}

$cursor->free();

if (!$hasRows) {
break;
}
}
} finally {
$this->client->execute(close_cursor($cursorName));

if ($ownTransaction) {
$this->client->commit();
}
}
}

public function withCursorName(string $cursorName) : self
{
$this->cursorName = $cursorName;

return $this;
}

public function withFetchSize(int $fetchSize) : self
{
if ($fetchSize <= 0) {
throw new InvalidArgumentException('Fetch size must be greater than 0, got ' . $fetchSize);
}

$this->fetchSize = $fetchSize;

return $this;
}

public function withMaximum(int $maximum) : self
{
if ($maximum <= 0) {
throw new InvalidArgumentException('Maximum must be greater than 0, got ' . $maximum);
}

$this->maximum = $maximum;

return $this;
}

public function withSchema(Schema $schema) : self
{
$this->schema = $schema;

return $this;
}
}
@@ -10,6 +10,38 @@
use Flow\PostgreSql\Client\Client;
use Flow\PostgreSql\QueryBuilder\SqlQuery;

/**
* Create a PostgreSQL cursor extractor using server-side cursors for memory-efficient extraction.
*
* Uses DECLARE CURSOR + FETCH to stream data without loading entire result set into memory.
* This is the only way to achieve true low memory extraction with PHP's ext-pgsql.
*
* Note: Requires a transaction context (auto-started if not in one).
*
* @param Client $client PostgreSQL client
* @param SqlQuery|string $query SQL query to execute (wrapped in DECLARE CURSOR)
* @param array<int, mixed> $parameters Positional parameters for the query
* @param int $fetchSize Number of rows to fetch per batch (default: 1000)
* @param null|int $maximum Maximum number of rows to extract (null for unlimited)
*/
#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::EXTRACTOR)]
function from_pgsql_cursor(
Client $client,
string|SqlQuery $query,
array $parameters = [],
int $fetchSize = 1000,
?int $maximum = null,
) : PostgreSqlCursorExtractor {
$extractor = (new PostgreSqlCursorExtractor($client, $query, $parameters))
->withFetchSize($fetchSize);

if ($maximum !== null) {
$extractor->withMaximum($maximum);
}

return $extractor;
}

#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::EXTRACTOR)]
function from_pgsql_limit_offset(
Client $client,
@@ -4,7 +4,7 @@

namespace Flow\ETL\Adapter\PostgreSql\Tests\Benchmark;

-use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, from_pgsql_limit_offset, pgsql_pagination_key_asc, pgsql_pagination_key_set, to_pgsql_table};
+use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_cursor, from_pgsql_key_set, from_pgsql_limit_offset, pgsql_pagination_key_asc, pgsql_pagination_key_set, to_pgsql_table};
use function Flow\ETL\DSL\{config, df, flow_context};
use function Flow\PostgreSql\DSL\{asc, col, column, create, data_type_double_precision, data_type_integer, data_type_jsonb, data_type_text, data_type_timestamptz, data_type_uuid, drop, pgsql_client, pgsql_connection_dsn, pgsql_mapper, select, star, table};
use Flow\ETL\Tests\Double\FakeStaticOrdersExtractor;
@@ -50,6 +50,18 @@ public function __destruct()
$this->client->close();
}

public function bench_extract_10k_cursor() : void
{
$context = flow_context(config());

foreach (from_pgsql_cursor(
$this->client,
select(star())->from(table(self::TABLE_NAME)),
fetchSize: 1000
)->extract($context) as $rows) {
}
}

public function bench_extract_10k_keyset() : void
{
$context = flow_context(config());