From 1d3d96eaa84ea1fd7cd7479c4d7fed65fb54b363 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 19 Feb 2026 19:54:37 -0800 Subject: [PATCH 1/2] fix: replication stream broke due to parser changes --- pgdog-config/src/database.rs | 3 + pgdog-stats/src/pool.rs | 3 + pgdog-stats/src/replication.rs | 5 + pgdog/src/backend/pool/config.rs | 1 + pgdog/src/backend/pool/lb/mod.rs | 6 +- .../logical/publisher/publisher_impl.rs | 65 ++++--- .../replication/logical/publisher/slot.rs | 163 +++++++++++++++++- .../replication/logical/subscriber/context.rs | 46 ++++- pgdog/src/net/messages/data_row.rs | 8 + .../replication/logical/tuple_data.rs | 2 +- 10 files changed, 276 insertions(+), 26 deletions(-) diff --git a/pgdog-config/src/database.rs b/pgdog-config/src/database.rs index 2bb483c0f..a8683d528 100644 --- a/pgdog-config/src/database.rs +++ b/pgdog-config/src/database.rs @@ -123,6 +123,9 @@ pub struct Database { pub read_only: Option, /// Server lifetime. pub server_lifetime: Option, + /// Used for resharding only. + #[serde(default)] + pub resharding_only: bool, } impl Database { diff --git a/pgdog-stats/src/pool.rs b/pgdog-stats/src/pool.rs index 03fb5ad45..922ff35d6 100644 --- a/pgdog-stats/src/pool.rs +++ b/pgdog-stats/src/pool.rs @@ -325,6 +325,8 @@ pub struct Config { pub lsn_check_delay: Duration, /// Automatic role detection enabled. pub role_detection: bool, + /// Used for resharding only. + pub resharding_only: bool, } impl Default for Config { @@ -360,6 +362,7 @@ impl Default for Config { lsn_check_timeout: Duration::from_millis(5_000), lsn_check_delay: Duration::from_millis(5_000), role_detection: false, + resharding_only: false, } } } diff --git a/pgdog-stats/src/replication.rs b/pgdog-stats/src/replication.rs index 2bd271655..8601d2cd6 100644 --- a/pgdog-stats/src/replication.rs +++ b/pgdog-stats/src/replication.rs @@ -21,6 +21,11 @@ impl Lsn { let low = ((lsn & 0xFFFF_FFFF) as u32) as i64; Self { high, low, lsn } } + + /// Replication lag in bytes. 
+ pub fn distance_bytes(&self, other: &Lsn) -> i64 { + self.lsn - other.lsn + } } impl FromDataType for Lsn { diff --git a/pgdog/src/backend/pool/config.rs b/pgdog/src/backend/pool/config.rs index 33c3300ec..3d3292207 100644 --- a/pgdog/src/backend/pool/config.rs +++ b/pgdog/src/backend/pool/config.rs @@ -143,6 +143,7 @@ impl Config { lsn_check_timeout: Duration::from_millis(general.lsn_check_timeout), lsn_check_delay: Duration::from_millis(general.lsn_check_delay), role_detection: database.role == Role::Auto, + resharding_only: database.resharding_only, ..Default::default() }, } diff --git a/pgdog/src/backend/pool/lb/mod.rs b/pgdog/src/backend/pool/lb/mod.rs index 8e030384e..f6f24662a 100644 --- a/pgdog/src/backend/pool/lb/mod.rs +++ b/pgdog/src/backend/pool/lb/mod.rs @@ -256,7 +256,11 @@ impl LoadBalancer { use LoadBalancingStrategy::*; use ReadWriteSplit::*; - let mut candidates: Vec<&Target> = self.targets.iter().collect(); + let mut candidates: Vec<&Target> = self + .targets + .iter() + .filter(|target| !target.pool.config().resharding_only) // Don't let reads on resharding-only replicas. + .collect(); let primary_reads = match self.rw_split { IncludePrimary => true, diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index d20aff42d..79c97c9ad 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::time::Duration; use pgdog_config::QueryParserEngine; -use tokio::{select, spawn}; +use tokio::{select, spawn, time::interval}; use tracing::{debug, error, info}; use super::super::{publisher::Table, Error}; @@ -121,6 +121,8 @@ impl Publisher { .ok_or(Error::NoReplicationSlot(number))?; stream.set_current_lsn(slot.lsn().lsn); + let mut check_lag = interval(Duration::from_secs(1)); + // Replicate in parallel. 
let handle = spawn(async move { slot.start_replication().await?; @@ -128,16 +130,23 @@ impl Publisher { loop { select! { + // This is cancellation-safe. replication_data = slot.replicate(Duration::MAX) => { + let replication_data = replication_data?; + match replication_data { - Ok(Some(ReplicationData::CopyData(data))) => { - let lsn = if let Some(ReplicationMeta::KeepAlive(ka)) = data.replication_meta() { - // If the LSN hasn't moved, we reached the end of the stream. - // If Postgres is getting requesting reply, provide our LSN now. - if !stream.set_current_lsn(ka.wal_end) || ka.reply() { + Some(ReplicationData::CopyData(data)) => { + let lsn = if let Some(ReplicationMeta::KeepAlive(ka)) = + data.replication_meta() + { + if ka.reply() { slot.status_update(stream.status_update()).await?; } - debug!("origin at lsn {} [{}]", Lsn::from_i64(ka.wal_end), slot.server()?.addr()); + debug!( + "origin at lsn {} [{}]", + Lsn::from_i64(ka.wal_end), + slot.server()?.addr() + ); ka.wal_end } else { if let Some(status_update) = stream.handle(data).await? 
{ @@ -147,16 +156,23 @@ impl Publisher { }; progress.update(stream.bytes_sharded(), lsn); } - Ok(Some(ReplicationData::CopyDone)) => (), - Ok(None) => { + Some(ReplicationData::CopyDone) => (), + None => { slot.drop_slot().await?; break; } - Err(err) => { - return Err(err); - } } } + + _ = check_lag.tick() => { + let lag = slot.replication_lag().await?; + + info!( + "replication lag at {} bytes [{}]", + lag, + slot.server()?.addr() + ); + } } } @@ -195,16 +211,25 @@ impl Publisher { Table::load(&self.publication, &mut primary, self.query_parser_engine).await?; let include_primary = !shard.has_replicas(); - let replicas = shard - .pools_with_roles() + let resharding_only = shard + .pools() .into_iter() - .filter(|(r, _)| match *r { - Role::Replica => true, - Role::Primary => include_primary, - Role::Auto => false, - }) - .map(|(_, p)| p) + .filter(|pool| pool.config().resharding_only) .collect::>(); + let replicas = if resharding_only.is_empty() { + shard + .pools_with_roles() + .into_iter() + .filter(|(r, _)| match *r { + Role::Replica => true, + Role::Primary => include_primary, + Role::Auto => false, + }) + .map(|(_, p)| p) + .collect::>() + } else { + resharding_only + }; let manager = ParallelSyncManager::new(tables, replicas, dest)?; let tables = manager.run().await?; diff --git a/pgdog/src/backend/replication/logical/publisher/slot.rs b/pgdog/src/backend/replication/logical/publisher/slot.rs index 7269de953..afbe51832 100644 --- a/pgdog/src/backend/replication/logical/publisher/slot.rs +++ b/pgdog/src/backend/replication/logical/publisher/slot.rs @@ -2,8 +2,8 @@ use super::super::Error; use crate::{ backend::{self, pool::Address, ConnectReason, Server, ServerOptions}, net::{ - replication::StatusUpdate, CopyData, CopyDone, DataRow, ErrorResponse, Format, FromBytes, - Protocol, Query, ToBytes, + replication::{StatusUpdate, XLogData}, + CopyData, CopyDone, DataRow, ErrorResponse, Format, FromBytes, Protocol, Query, ToBytes, }, util::random_string, }; @@ -46,6 
+46,7 @@ pub struct ReplicationSlot { dropped: bool, server: Option, kind: SlotKind, + server_meta: Option, } impl ReplicationSlot { @@ -62,6 +63,7 @@ impl ReplicationSlot { dropped: false, server: None, kind: SlotKind::Replication, + server_meta: None, } } @@ -78,6 +80,7 @@ impl ReplicationSlot { dropped: true, // Temporary. server: None, kind: SlotKind::DataSync, + server_meta: None, } } @@ -95,6 +98,37 @@ impl ReplicationSlot { Ok(()) } + /// Get or create a separate server connection for meta commands. + pub async fn server_meta(&mut self) -> Result<&mut Server, Error> { + if let Some(ref mut server) = self.server_meta { + Ok(server) + } else { + self.server_meta = Some( + Server::connect( + &self.address, + ServerOptions::default(), + ConnectReason::Replication, + ) + .await?, + ); + Ok(self.server_meta.as_mut().unwrap()) + } + } + + /// Replication lag in bytes for this slot. + pub async fn replication_lag(&mut self) -> Result { + let query = format!( + "SELECT pg_current_wal_lsn() - confirmed_flush_lsn \ + FROM pg_replication_slots \ + WHERE slot_name = '{}'", + self.name + ); + let mut lag: Vec = self.server_meta().await?.fetch_all(&query).await?; + + lag.pop() + .ok_or(Error::MissingReplicationSlot(self.name.clone())) + } + pub fn server(&mut self) -> Result<&mut Server, Error> { self.server.as_mut().ok_or(Error::NotConnected) } @@ -294,14 +328,28 @@ impl ReplicationSlot { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ReplicationData { CopyData(CopyData), CopyDone, } +impl ReplicationData { + pub fn xlog_data(&self) -> Option { + if let Self::CopyData(copy_data) = self { + copy_data.xlog_data() + } else { + None + } + } +} + #[cfg(test)] mod test { + use tokio::spawn; + + use crate::{backend::server::test::test_server, net::replication::xlog_data::XLogPayload}; + use super::*; #[test] @@ -312,4 +360,113 @@ mod test { let lsn = lsn.to_string(); assert_eq!(lsn, original); } + + #[tokio::test] + async fn test_real_lsn() { + let result: Vec = 
test_server() + .await + .fetch_all("SELECT pg_current_wal_lsn()") + .await + .unwrap(); + let lsn = Lsn::from_str(&result[0]).unwrap(); + let lsn_2 = Lsn::from_i64(lsn.lsn); + assert_eq!(lsn.to_string(), result[0]); + assert_eq!(lsn, lsn_2); + } + + #[tokio::test] + async fn test_slot_replication() { + use tokio::sync::mpsc::*; + crate::logger(); + + let mut server = test_server().await; + + server + .execute("CREATE TABLE IF NOT EXISTS public.test_slot_replication(id BIGINT)") + .await + .unwrap(); + let _ = server + .execute("DROP PUBLICATION test_slot_replication") + .await; + let _ = server + .execute(format!( + "SELECT pg_drop_replication_slot('test_slot_replication')" + )) + .await; + server + .execute( + "CREATE PUBLICATION test_slot_replication FOR TABLE public.test_slot_replication", + ) + .await + .unwrap(); + + let addr = server.addr(); + + let mut slot = ReplicationSlot::replication( + "test_slot_replication", + addr, + Some("test_slot_replication".into()), + ); + let _ = slot.create_slot().await.unwrap(); + slot.connect().await.unwrap(); + + let (tx, mut rx) = channel(16); + + let handle = spawn(async move { + slot.start_replication().await?; + server + .execute("INSERT INTO test_slot_replication (id) VALUES (1)") + .await?; + + loop { + let message = slot.replicate(Duration::MAX).await?; + tx.send(message.clone()).await.unwrap(); + + if let Some(message) = message { + match message.clone() { + ReplicationData::CopyData(copy_data) => match copy_data.xlog_data() { + Some(xlog_data) => { + if let Some(XLogPayload::Commit(_)) = xlog_data.payload() { + slot.stop_replication().await?; + } + } + _ => (), + }, + ReplicationData::CopyDone => (), + } + } else { + break; + } + } + + slot.drop_slot().await?; + + Ok::<(), Error>(()) + }); + + let mut got_row = false; + + while let Some(message) = rx.recv().await { + let payload = message + .and_then(|message| message.xlog_data()) + .and_then(|payload| payload.payload()); + if let Some(payload) = payload { + match 
payload { + XLogPayload::Relation(relation) => { + assert_eq!(relation.name, "test_slot_replication") + } + XLogPayload::Insert(insert) => { + assert_eq!(insert.column(0).unwrap().as_str().unwrap(), "1") + } + XLogPayload::Begin(_) => (), + XLogPayload::Commit(_) => got_row = true, + _ => panic!("{:#?}", payload), + } + } + } + + assert!(got_row); + + handle.await.unwrap().unwrap(); + } } diff --git a/pgdog/src/backend/replication/logical/subscriber/context.rs b/pgdog/src/backend/replication/logical/subscriber/context.rs index 405f60e0c..c373bd196 100644 --- a/pgdog/src/backend/replication/logical/subscriber/context.rs +++ b/pgdog/src/backend/replication/logical/subscriber/context.rs @@ -4,7 +4,9 @@ use super::super::Error; use crate::{ backend::Cluster, frontend::{ - client::Sticky, router::parser::Shard, ClientRequest, Command, Router, RouterContext, + client::Sticky, + router::parser::{AstContext, Cache, Shard}, + BufferedQuery, ClientRequest, Command, PreparedStatements, Router, RouterContext, }, net::{replication::TupleData, Bind, Parameters, Parse}, }; @@ -14,6 +16,7 @@ pub struct StreamContext<'a> { request: ClientRequest, cluster: &'a Cluster, bind: Bind, + parse: Parse, } impl<'a> StreamContext<'a> { @@ -26,6 +29,7 @@ impl<'a> StreamContext<'a> { request, cluster, bind, + parse: stmt.clone(), } } @@ -51,6 +55,16 @@ impl<'a> StreamContext<'a> { lazy_static! { static ref PARAMS: Parameters = Parameters::default(); } + + let ast_context = AstContext::from_cluster(&self.cluster, &PARAMS); + + let ast = Cache::get().query( + &BufferedQuery::Prepared(self.parse.clone()), + &ast_context, + &mut PreparedStatements::default(), + )?; + self.request.ast = Some(ast); + Ok(RouterContext::new( &self.request, self.cluster, @@ -60,3 +74,33 @@ impl<'a> StreamContext<'a> { )?) 
} } + +#[cfg(test)] +mod test { + use bytes::Bytes; + + use crate::{ + config::config, + net::replication::logical::tuple_data::{Column, Identifier}, + }; + + use super::*; + + #[test] + fn test_stream_context() { + let cluster = Cluster::new_test(&config()); + let tuple = TupleData { + columns: vec![Column { + identifier: Identifier::Format(pgdog_postgres_types::Format::Text), + len: 1, + data: Bytes::from("1"), + }], + }; + let parse = Parse::new_anonymous("INSERT INTO sharded (customer_id) VALUES ($1)"); + + let shard = StreamContext::new(&cluster, &tuple, &parse) + .shard() + .unwrap(); + assert!(matches!(shard, Shard::Direct(_))); + } +} diff --git a/pgdog/src/net/messages/data_row.rs b/pgdog/src/net/messages/data_row.rs index dd63a0ffb..733199093 100644 --- a/pgdog/src/net/messages/data_row.rs +++ b/pgdog/src/net/messages/data_row.rs @@ -6,6 +6,7 @@ use super::{ code, prelude::*, Datum, Double, Float, Format, FromDataType, Numeric, RowDescription, }; pub use pgdog_postgres_types::{Data, ToDataRowColumn}; +use pgdog_stats::Lsn; /// DataRow message. 
#[derive(Debug, Clone, Hash, Eq, PartialEq)] @@ -291,3 +292,10 @@ mod test { assert_eq!(dr.get::(1, Format::Text).unwrap(), "c"); } } + +impl From for Lsn { + fn from(value: DataRow) -> Self { + let value = value.get::(0, Format::Text); + value.unwrap_or_default() + } +} diff --git a/pgdog/src/net/messages/replication/logical/tuple_data.rs b/pgdog/src/net/messages/replication/logical/tuple_data.rs index 71b1bcfc5..724e2474a 100644 --- a/pgdog/src/net/messages/replication/logical/tuple_data.rs +++ b/pgdog/src/net/messages/replication/logical/tuple_data.rs @@ -7,7 +7,7 @@ use super::super::super::bind::Format; use super::super::super::prelude::*; use super::string::unescape; -#[derive(Clone)] +#[derive(Clone, Default)] pub struct TupleData { pub columns: Vec, } From cc21a5d21868e4d62bd6c841fde7235a984966b3 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 19 Feb 2026 20:00:00 -0800 Subject: [PATCH 2/2] ok then --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d0f5635de..ff7998bb1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,6 +62,7 @@ jobs: sudo -u postgres createuser --superuser --login $USER sudo -u postgres createdb $USER sudo -u postgres psql -c 'ALTER SYSTEM SET max_prepared_transactions TO 1000;' + sudo -u postgres psql -c 'ALTER SYSTEM SET wal_level TO logical;' sudo service postgresql restart bash integration/setup.sh sudo apt update && sudo apt install -y python3-virtualenv mold