A Rust implementation of a PostgreSQL logical replication client that connects to a database, creates replication slots, and displays changes in real-time. This project now uses the pg-walstream library for robust PostgreSQL logical replication protocol handling.
Originally based on the C++ implementation from https://github.com/fkfk000/replication_checker, this version leverages modern Rust async capabilities and the pg-walstream library for production-ready replication streaming.
- Built on pg-walstream: Leverages the robust pg-walstream library for protocol handling
- Full Protocol Support: Implements PostgreSQL logical replication protocol versions 1-4
- Streaming Transactions: Support for streaming large transactions (protocol v2+)
- Automatic Retry Logic: Built-in connection management with exponential backoff
- Thread-Safe LSN Tracking: Atomic LSN feedback for proper WAL management
- Zero-Copy Operations: Efficient buffer management using the bytes crate
- Real-time Change Display: Shows INSERT, UPDATE, DELETE, TRUNCATE operations as they happen
- Graceful Shutdown: Proper cleanup with Ctrl+C signal handling
- Comprehensive Logging: Uses tracing for structured logging and debugging
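The retry feature above is described only as "exponential backoff". As a rough illustration of that policy, here is a std-only sketch. `backoff_delay`, the 500 ms base, and the 30 s cap are hypothetical choices for the example, not pg-walstream's actual settings or API.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`, capped at `max`.
/// Hypothetical helper for illustration; not pg-walstream's API.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 1 << attempt, saturating to u32::MAX once the shift would overflow.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    // saturating_mul avoids a panic if base * factor exceeds Duration's range.
    base.saturating_mul(factor).min(max)
}

fn main() {
    let base = Duration::from_millis(500); // hypothetical starting delay
    let max = Duration::from_secs(30); // hypothetical cap
    for attempt in 0..8 {
        println!("retry {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
}
```

Many retry schemes also add random jitter so that several clients do not reconnect in lockstep; it is left out here to keep the sketch deterministic.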
- PostgreSQL server version 14+ with logical replication enabled (wal_level = logical)
- A publication created on the source database
- Rust 1.70+ with Cargo
- Enable logical replication in your PostgreSQL configuration:
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
-- Restart PostgreSQL server after these changes
- Create a publication for the tables you want to replicate:
CREATE PUBLICATION my_publication FOR TABLE table1, table2;
-- Or for all tables:
CREATE PUBLICATION my_publication FOR ALL TABLES;
- Create a user with replication privileges:
CREATE USER replicator WITH REPLICATION LOGIN PASSWORD 'password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replicator;
GRANT USAGE ON SCHEMA public TO replicator;
Important: PostgreSQL must be version 14 or higher. The pg-walstream library requires PostgreSQL 14+ for full protocol support.
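The version floor follows from which pgoutput protocol versions each server release accepts. Per the PostgreSQL documentation, protocol version 1 arrived in PostgreSQL 10, version 2 (streaming of in-progress transactions) in 14, version 3 (two-phase commit) in 15, and version 4 (parallel streaming) in 16. The sketch below encodes that table; `max_proto_version` and `supports_streaming` are hypothetical helpers, not part of pg-walstream.

```rust
/// Highest pgoutput `proto_version` a server of the given major version accepts,
/// per the PostgreSQL docs. Hypothetical helper for illustration.
fn max_proto_version(server_major: u32) -> Option<u32> {
    match server_major {
        0..=9 => None,      // no pgoutput / publications before PostgreSQL 10
        10..=13 => Some(1), // base protocol
        14 => Some(2),      // adds streaming of large in-progress transactions
        15 => Some(3),      // adds two-phase commit (prepared transactions)
        _ => Some(4),       // 16+: adds parallel streaming (assumes no newer version later)
    }
}

/// Streaming transactions need protocol version 2 or later.
fn supports_streaming(server_major: u32) -> bool {
    max_proto_version(server_major).map_or(false, |v| v >= 2)
}

fn main() {
    for major in [13, 14, 15, 16, 17] {
        println!(
            "PostgreSQL {major}: proto_version <= {:?}, streaming: {}",
            max_proto_version(major),
            supports_streaming(major)
        );
    }
}
```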
git clone https://github.com/isdaniel/replication_checker_rs.git
cd replication_checker_rs
cargo build --release
Or build a Docker image:
docker build -t pg_replica_rs .
The pg-walstream library (used internally) requires libpq development libraries:
Ubuntu/Debian:
sudo apt-get install libpq-dev clang libclang-dev
CentOS/RHEL/Fedora:
sudo yum install postgresql-devel
# or
sudo dnf install postgresql-devel
macOS:
brew install postgresql
The application uses environment variables for configuration and automatically handles connection management through pg-walstream:
# Set required environment variables
export DB_CONNECTION_STRING="postgresql://username:password@host:port/database?replication=database"
export slot_name="my_slot"
export pub_name="my_publication"
# Run the application
./target/release/pg_replica_rs
# Example with full connection string
export DB_CONNECTION_STRING="postgresql://postgres:test.123@127.0.0.1:5432/postgres?replication=database"
export slot_name="cdc_slot1"
export pub_name="cdc_pub"
./target/release/pg_replica_rs
# Using a .env file for configuration
set -a; source .env; set +a
./target/release/pg_replica_rs
The DB_CONNECTION_STRING is expected to include the replication=database parameter, as in the formats below; if it is missing, the application adds it automatically.
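A minimal sketch of that normalization, assuming a URI-style connection string like the ones in this section. `ensure_replication_param` is a hypothetical name; the application's actual logic may differ, and key/value connection strings (`host=... dbname=...`) are not handled here. Special characters in the password are assumed to be percent-encoded, as URIs require anyway.

```rust
/// Hypothetical helper: make sure a libpq connection URI carries a `replication`
/// parameter, appending `replication=database` when none is present.
fn ensure_replication_param(conn: &str) -> String {
    // Query string = everything after the first '?', if any.
    let query = conn.split_once('?').map(|(_, q)| q).unwrap_or("");
    let already_set = query
        .split('&')
        .any(|pair| pair.split('=').next() == Some("replication"));

    if already_set {
        conn.to_string()
    } else if conn.ends_with('?') {
        format!("{conn}replication=database")
    } else if conn.contains('?') {
        format!("{conn}&replication=database")
    } else {
        format!("{conn}?replication=database")
    }
}

fn main() {
    let conn = "postgresql://replicator:secret@localhost:5432/mydb?sslmode=require";
    println!("{}", ensure_replication_param(conn));
}
```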
# Basic format (replication parameter required)
DB_CONNECTION_STRING="postgresql://username:password@host:port/database?replication=database"
# With SSL settings
DB_CONNECTION_STRING="postgresql://username:password@host:port/database?replication=database&sslmode=require"
# With connection timeout
DB_CONNECTION_STRING="postgresql://username:password@host:port/database?replication=database&connect_timeout=10"
# Examples
DB_CONNECTION_STRING="postgresql://replicator:secret@localhost:5432/mydb?replication=database"
DB_CONNECTION_STRING="postgresql://postgres:test.123@127.0.0.1:5432/postgres?replication=database&sslmode=prefer"
Running with Docker:
# Basic run with environment variables
docker run \
-e DB_CONNECTION_STRING="postgresql://postgres:secret@host.docker.internal:5432/mydb?replication=database" \
-e slot_name=my_slot \
-e pub_name=my_pub \
pg_replica_rs
# With custom logging configuration
docker run \
-e DB_CONNECTION_STRING="postgresql://postgres:secret@host.docker.internal:5432/mydb?replication=database" \
-e slot_name=my_slot \
-e pub_name=my_pub \
-e LOG_OUTPUT=all \
-e LOG_JSON_FORMAT=true \
-v $(pwd)/logs:/app/logs \
pg_replica_rs
Required Environment Variables:
DB_CONNECTION_STRING: PostgreSQL connection string with the replication=database parameter (required)
Replication Configuration:
slot_name: Name of the replication slot to create/use (default: "sub")
pub_name: Name of the publication to subscribe to (default: "pub")
Logging Configuration:
LOG_OUTPUT: Where to send logs (console, file, or all; default: console)
LOG_DIRECTORY: Directory for log files (default: "./logs")
LOG_FILE_PREFIX: Prefix for log file names (default: "replication")
LOG_ROTATION: Log rotation policy (never, hourly, daily, or weekly; default: daily)
LOG_CONSOLE_LEVEL: Console log level (default: info)
LOG_FILE_LEVEL: File log level (default: debug)
LOG_JSON_FORMAT: Enable JSON format for file logs (true/false; default: false)
LOG_ANSI_ENABLED: Enable colors in console output (true/false; default: true)
Legacy Logging Control:
RUST_LOG: Traditional Rust log level control (overrides other settings if used)
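The variables and defaults listed above map naturally onto a small settings struct. This sketch assumes that unrecognized values fall back to the documented default and that only the literal `true`/`false` are meaningful for boolean flags. `LogSettings`, `LogOutput`, `LogRotation`, and `from_lookup` are hypothetical names, not the contents of `logging.rs`, and the `RUST_LOG` override is not modeled.

```rust
/// Where log records go (LOG_OUTPUT).
#[derive(Debug, Clone, Copy, PartialEq)]
enum LogOutput {
    Console,
    File,
    All,
}

/// How often log files roll over (LOG_ROTATION).
#[derive(Debug, Clone, Copy, PartialEq)]
enum LogRotation {
    Never,
    Hourly,
    Daily,
    Weekly,
}

/// Hypothetical settings struct mirroring the documented variables and defaults.
#[derive(Debug)]
struct LogSettings {
    output: LogOutput,
    directory: String,
    file_prefix: String,
    rotation: LogRotation,
    console_level: String,
    file_level: String,
    json: bool,
    ansi: bool,
}

impl LogSettings {
    /// Build settings from any key lookup (for example the process environment).
    fn from_lookup(get: impl Fn(&str) -> Option<String>) -> Self {
        // String value, or the documented default when unset.
        let text = |key: &str, default: &str| get(key).unwrap_or_else(|| default.to_string());
        // Boolean flag: only "true" (any case) counts as true; unset keeps the default.
        let flag = |key: &str, default: bool| match get(key) {
            Some(v) => v.eq_ignore_ascii_case("true"),
            None => default,
        };
        let output = match text("LOG_OUTPUT", "console").to_ascii_lowercase().as_str() {
            "file" => LogOutput::File,
            "all" => LogOutput::All,
            _ => LogOutput::Console, // assumption: unknown values fall back to the default
        };
        let rotation = match text("LOG_ROTATION", "daily").to_ascii_lowercase().as_str() {
            "never" => LogRotation::Never,
            "hourly" => LogRotation::Hourly,
            "weekly" => LogRotation::Weekly,
            _ => LogRotation::Daily,
        };
        LogSettings {
            output,
            directory: text("LOG_DIRECTORY", "./logs"),
            file_prefix: text("LOG_FILE_PREFIX", "replication"),
            rotation,
            console_level: text("LOG_CONSOLE_LEVEL", "info"),
            file_level: text("LOG_FILE_LEVEL", "debug"),
            json: flag("LOG_JSON_FORMAT", false),
            ansi: flag("LOG_ANSI_ENABLED", true),
        }
    }
}

fn main() {
    let settings = LogSettings::from_lookup(|k| std::env::var(k).ok());
    println!("{settings:?}");
}
```

Taking the lookup as a closure lets the parsing be exercised with a fixed map instead of the real process environment.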
Log destinations, levels, and formats can be combined, for example:
# Console only with info level
LOG_OUTPUT=console LOG_CONSOLE_LEVEL=info ./target/release/pg_replica_rs ...
# File only with JSON format for analysis
LOG_OUTPUT=file LOG_JSON_FORMAT=true LOG_FILE_LEVEL=debug ./target/release/pg_replica_rs ...
# Both console and file with different levels
LOG_OUTPUT=all LOG_CONSOLE_LEVEL=info LOG_FILE_LEVEL=debug ./target/release/pg_replica_rs ...
# Custom log directory and file prefix
LOG_OUTPUT=file LOG_DIRECTORY=/var/log/postgres LOG_FILE_PREFIX=replication ./target/release/pg_replica_rs ...
When running, you'll see structured output like:
2024-12-26T10:30:00.123Z INFO pg_replica_rs: Logging initialized: output=Console, json=false
2024-12-26T10:30:00.124Z INFO pg_replica_rs: Slot name: my_slot
2024-12-26T10:30:00.124Z INFO pg_replica_rs: Publication name: my_publication
2024-12-26T10:30:00.125Z INFO pg_replica_rs: Using connection string with replication enabled
2024-12-26T10:30:00.126Z INFO pg_replica_rs: Creating logical replication stream
2024-12-26T10:30:00.127Z INFO pg_replica_rs: Starting replication stream from latest position
2024-12-26T10:30:00.128Z INFO pg_replica_rs: Processing replication events (Press Ctrl+C to stop)...
2024-12-26T10:30:01.200Z INFO pg_replica_rs: Event: ChangeEvent { event_type: Begin { transaction_id: 12345, commit_timestamp: 2024-12-26T10:30:01Z }, lsn: Some(Lsn(0x1A2B3C00)), metadata: None }
2024-12-26T10:30:01.201Z INFO pg_replica_rs: Event: ChangeEvent { event_type: Insert { schema: "public", table: "users", relation_oid: 16384, data: {"id": "100", "name": "John Doe", "email": "john@example.com"} }, lsn: Some(Lsn(0x1A2B3C4D)), metadata: None }
2024-12-26T10:30:01.203Z INFO pg_replica_rs: Event: ChangeEvent { event_type: Commit { commit_timestamp: 2024-12-26T10:30:01Z }, lsn: Some(Lsn(0x1A2B3C5D)), metadata: None }
2024-12-26T10:30:02.300Z INFO pg_replica_rs: Event: ChangeEvent { event_type: Begin { transaction_id: 12346, commit_timestamp: 2024-12-26T10:30:02Z }, lsn: Some(Lsn(0x1A2B3C6D)), metadata: None }
2024-12-26T10:30:02.301Z INFO pg_replica_rs: Event: ChangeEvent { event_type: Update { schema: "public", table: "users", relation_oid: 16384, old_data: Some({"id": "100"}), new_data: {"id": "100", "name": "John Smith", "email": "john.smith@example.com"}, replica_identity: Default, key_columns: ["id"] }, lsn: Some(Lsn(0x1A2B3C7D)), metadata: None }
2024-12-26T10:30:02.302Z INFO pg_replica_rs: Event: ChangeEvent { event_type: Commit { commit_timestamp: 2024-12-26T10:30:02Z }, lsn: Some(Lsn(0x1A2B3C8D)), metadata: None }
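The `lsn` fields in this output are 64-bit WAL positions. `Lsn(0x1A2B3C4D)` in the debug output and `0/1A2B3C4D` in the JSON log below denote the same value: PostgreSQL writes an LSN as its high and low 32-bit halves in hexadecimal, separated by a slash. A std-only sketch of the conversion (`format_lsn` and `parse_lsn` are hypothetical helper names):

```rust
/// Format a WAL position as PostgreSQL prints it: high and low 32 bits in hex, "X/Y".
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Parse the "X/Y" text form back into a 64-bit position.
fn parse_lsn(text: &str) -> Option<u64> {
    let (hi, lo) = text.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

fn main() {
    // Values taken from the sample log output above.
    println!("{}", format_lsn(0x1A2B3C00)); // 0/1A2B3C00
    println!("{:?}", parse_lsn("0/1A2B3C4D"));
}
```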
When LOG_JSON_FORMAT=true is set, structured JSON logs are generated:
{
"timestamp": "2024-09-07T10:30:01.201Z",
"level": "INFO",
"fields": {
"event_type": "table_operation",
"operation": "INSERT",
"table_schema": "public",
"table_name": "users",
"data": {
"id": "100",
"name": "John Doe",
"email": "john@example.com"
},
"message": "Table operation: INSERT on public.users"
},
"target": "pg_replica_rs"
}
{
"timestamp": "2024-09-07T10:30:01.203Z",
"level": "INFO",
"fields": {
"event_type": "transaction_commit",
"flags": 0,
"commit_lsn": "0/1A2B3C4D",
"end_lsn": "0/1A2B3C5D",
"commit_time": "2024-09-07T10:30:01.203Z",
"message": "Transaction committed with details"
},
"target": "pg_replica_rs"
}
The implementation consists of several well-organized modules:
- main.rs: Application entry point with clap-based argument parsing and async runtime setup
- server.rs: Main replication server that manages PostgreSQL connection and message processing
- parser.rs: Protocol message parser for PostgreSQL logical replication messages
- types.rs: Data structures for relations, tuples, and replication messages
- utils.rs: Utility functions for connection management, byte manipulation, and PostgreSQL integration
- logging.rs: Advanced logging configuration with support for console, file, and JSON output
- errors.rs: Comprehensive error handling with detailed error types
- buffer.rs: Efficient buffer reading and writing for PostgreSQL protocol messages
- Connection Management: Safe wrapper around libpq connections with proper resource cleanup
- Protocol Parsing: Parser for the PostgreSQL logical replication protocol (versions 1-4)
- Message Processing: Handlers for all logical replication message types (BEGIN, COMMIT, INSERT, UPDATE, DELETE, TRUNCATE, streaming transactions)
- Feedback System: Implements the feedback protocol to acknowledge processed WAL positions
- Logging System: Structured logging with JSON support for monitoring and analytics
- CLI Interface: Modern command-line interface with proper help and validation
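The feedback system corresponds to the Standby Status Update message of PostgreSQL's streaming replication protocol: the byte `r`, three big-endian Int64 positions (written, flushed, applied), an Int64 client clock in microseconds since 2000-01-01, and a one-byte reply-requested flag, all sent inside a CopyData message. The sketch below only builds that 34-byte payload. pg-walstream handles this internally; `standby_status_update` and `pg_now_micros` are hypothetical helpers, not its API.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Seconds from the Unix epoch (1970-01-01) to the PostgreSQL epoch (2000-01-01).
const PG_EPOCH_OFFSET_SECS: i64 = 946_684_800;

/// Build a Standby Status Update ('r') payload. It is sent wrapped in a CopyData
/// message; all integers are big-endian (network byte order).
fn standby_status_update(written: u64, flushed: u64, applied: u64, clock_us: i64, reply: bool) -> Vec<u8> {
    let mut buf = Vec::with_capacity(34);
    buf.push(b'r');
    buf.extend_from_slice(&written.to_be_bytes()); // last WAL byte + 1 received and written
    buf.extend_from_slice(&flushed.to_be_bytes()); // last WAL byte + 1 flushed
    buf.extend_from_slice(&applied.to_be_bytes()); // last WAL byte + 1 applied
    buf.extend_from_slice(&clock_us.to_be_bytes()); // client clock, microseconds since 2000-01-01
    buf.push(u8::from(reply)); // 1 = ask the server to reply immediately
    buf
}

/// Current time in microseconds since the PostgreSQL epoch.
fn pg_now_micros() -> i64 {
    let since_unix = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before 1970");
    since_unix.as_micros() as i64 - PG_EPOCH_OFFSET_SECS * 1_000_000
}

fn main() {
    // A consumer that has fully processed everything up to `lsn` commonly
    // reports that position in all three fields.
    let lsn: u64 = 0x1A2B3C5D;
    let msg = standby_status_update(lsn, lsn, lsn, pg_now_micros(), false);
    println!("{} bytes, tag {:?}", msg.len(), msg[0] as char);
}
```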
The pg-walstream library (and this application) supports all PostgreSQL logical replication message types:
- BEGIN - Transaction start with XID and timestamp
- COMMIT - Transaction commit with LSN tracking and commit timestamp
- ORIGIN - Replication origin tracking
- RELATION - Table schema information with column metadata
- TYPE - Data type definitions
- INSERT - Row insertions with full data
- UPDATE - Row updates with old/new values (based on replica identity)
- DELETE - Row deletions with key columns
- TRUNCATE - Table truncation with cascade and restart identity options
- MESSAGE - Generic logical decoding messages
- STREAM_START - Streaming transaction start
- STREAM_STOP - Streaming transaction segment end
- STREAM_COMMIT - Streaming transaction commit
- STREAM_ABORT - Streaming transaction abort
- BEGIN_PREPARE - Prepared transaction start
- PREPARE - Transaction prepare
- COMMIT_PREPARED - Commit prepared transaction
- ROLLBACK_PREPARED - Rollback prepared transaction
- STREAM_PREPARE - Stream prepare message
- Keep-alive Messages - Connection health monitoring with automatic feedback
- LSN Feedback - Automatic acknowledgment of processed WAL positions
- Replica Identity Support - Handles DEFAULT, NOTHING, FULL, and INDEX modes
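At the wire level, each logical replication message listed above starts with a one-byte tag, as given in the PostgreSQL documentation's logical replication message formats. Keep-alive messages travel one layer out, as `k` messages alongside the `w` (XLogData) messages that carry these payloads. A hypothetical lookup from tag to the names used in the list (`message_kind` is not pg-walstream's API):

```rust
/// Map the first byte of a pgoutput message to its name.
/// Hypothetical helper; tags follow the PostgreSQL message-format docs.
fn message_kind(tag: u8) -> Option<&'static str> {
    Some(match tag {
        b'B' => "BEGIN",
        b'C' => "COMMIT",
        b'O' => "ORIGIN",
        b'R' => "RELATION",
        b'Y' => "TYPE",
        b'I' => "INSERT",
        b'U' => "UPDATE",
        b'D' => "DELETE",
        b'T' => "TRUNCATE",
        b'M' => "MESSAGE",
        b'S' => "STREAM_START",
        b'E' => "STREAM_STOP",
        b'c' => "STREAM_COMMIT",
        b'A' => "STREAM_ABORT",
        b'b' => "BEGIN_PREPARE",
        b'P' => "PREPARE",
        b'K' => "COMMIT_PREPARED",
        b'r' => "ROLLBACK_PREPARED",
        b'p' => "STREAM_PREPARE",
        _ => return None, // unknown or newer message type
    })
}

fn main() {
    for &tag in b"BIUCz" {
        println!("{:?} -> {:?}", tag as char, message_kind(tag));
    }
}
```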
- Flexible Output: Choose between console, file, or both
- JSON Logging: Structured logs for monitoring and analytics integration
- Log Rotation: Never, hourly, daily, or weekly rotation policies
- Configurable Levels: Different log levels for console vs. file output
- Structured Events: Dedicated macros for transaction and table operation logging
- Modern Interface: Built with clap for better help and validation
- Flexible Parameters: Support for all PostgreSQL connection parameters
- Environment Integration: Easy configuration via environment variables
- Currently displays changes in human-readable format (console) or structured JSON (file logging)
- Values are displayed as text; binary data types are shown as raw data
- Basic replication slot management (creates slot, manual cleanup required on exit)
- Minimal error recovery (will exit on critical errors, but with detailed error context)
- "Connection failed": Check your PostgreSQL server is running and accessible
- "Permission denied": Ensure your user has REPLICATION privilege
- "Replication slot creation failed": The slot might already exist, or you lack privileges
- "libpq not found": Install PostgreSQL development libraries
- "linking failed": Ensure libpq is in your library path
- No data received: Check that your publication includes the tables being modified
- "Unknown relation": The replication stream may be out of sync; restart the application
- Log files not created: Check LOG_DIRECTORY permissions and path validity
- High disk usage: Check LOG_ROTATION settings and ensure log files are being rotated
- Slow startup: Database connection might be slow; check network connectivity and timeouts
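When slot creation fails, an invalid name is another possible cause besides an existing slot or missing privileges: PostgreSQL accepts only lower-case letters, digits, and underscores in slot names, up to 63 bytes in a default build (NAMEDATALEN - 1). A hypothetical pre-flight check (`validate_slot_name` is not part of this application):

```rust
/// Check a replication slot name against PostgreSQL's naming rules:
/// non-empty, at most 63 bytes (NAMEDATALEN - 1 in a default build),
/// and only lower-case ASCII letters, digits, and underscores.
fn validate_slot_name(name: &str) -> Result<(), String> {
    if name.is_empty() {
        return Err("slot name is empty".into());
    }
    if name.len() > 63 {
        return Err(format!("slot name is {} bytes; the limit is 63", name.len()));
    }
    if let Some(bad) = name
        .chars()
        .find(|c| !(c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '_'))
    {
        return Err(format!("invalid character {bad:?} in slot name"));
    }
    Ok(())
}

fn main() {
    for name in ["cdc_slot1", "My-Slot"] {
        println!("{name}: {:?}", validate_slot_name(name));
    }
}
```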
# Wrong: Using old environment variable format
RUST_LOG=debug ./target/release/pg_replica_rs ...
# Right: Using new logging configuration
LOG_OUTPUT=all LOG_CONSOLE_LEVEL=info LOG_FILE_LEVEL=debug ./target/release/pg_replica_rs ...
# Wrong: Forgetting to set publication and slot names
./target/release/pg_replica_rs user postgres ...
# Right: Setting required environment variables
export slot_name="my_slot"
export pub_name="my_publication"
./target/release/pg_replica_rs user postgres ...
- Start a PostgreSQL instance with logical replication:
-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
-- Restart PostgreSQL after this change
-- Create test database and user
CREATE DATABASE test_replication;
CREATE USER repl_user WITH REPLICATION LOGIN PASSWORD 'repl_pass';
GRANT CONNECT ON DATABASE test_replication TO repl_user;
- Create test table and publication:
\c test_replication
CREATE TABLE test_table (
  id SERIAL PRIMARY KEY,
  name VARCHAR(100),
  created_at TIMESTAMP DEFAULT NOW()
);
-- Grant read access after the table exists: GRANT ... ON ALL TABLES covers only existing tables in the current database
GRANT SELECT ON ALL TABLES IN SCHEMA public TO repl_user;
CREATE PUBLICATION test_pub FOR ALL TABLES;
- Run the replication checker:
export slot_name="test_slot"
export pub_name="test_pub"
export LOG_OUTPUT=console
export LOG_CONSOLE_LEVEL=debug
export DB_CONNECTION_STRING="postgresql://repl_user:repl_pass@localhost:5432/test_replication?replication=database"
./target/release/pg_replica_rs
- Generate test data in another terminal:
-- Connect to the test database
\c test_replication
-- Insert some data
INSERT INTO test_table (name) VALUES ('Alice'), ('Bob'), ('Charlie');
-- Update data
UPDATE test_table SET name = 'Alice Smith' WHERE id = 1;
-- Delete data
DELETE FROM test_table WHERE id = 3;
-- Truncate table
TRUNCATE test_table;
Check that you see output similar to:
INFO pg_replica_rs: BEGIN: Xid 12345
INFO pg_replica_rs: table public.test_table: INSERT: id: 1 name: Alice ...
INFO pg_replica_rs: COMMIT: flags: 0, lsn: 0/1A2B3C4D ...
The project uses the following key dependencies:
- libpq-sys (0.8): Low-level PostgreSQL libpq bindings
- tokio (1.47.1): Async runtime with full features
- tracing (0.1): Structured logging and tracing
- tracing-subscriber (0.3): Log formatting and filtering with chrono and JSON support
- tracing-appender (0.2.3): Log file rotation and management
- chrono (0.4): DateTime handling with serde support
- thiserror (2.0.12): Ergonomic error handling
- anyhow (1.0): Error context and chaining
This project is licensed under the same terms as the original C++ implementation.