Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ jobs:
sudo -u postgres createuser --superuser --login $USER
sudo -u postgres createdb $USER
sudo -u postgres psql -c 'ALTER SYSTEM SET max_prepared_transactions TO 1000;'
sudo -u postgres psql -c 'ALTER SYSTEM SET wal_level TO logical;'
sudo service postgresql restart
bash integration/setup.sh
sudo apt update && sudo apt install -y python3-virtualenv mold
Expand Down
3 changes: 3 additions & 0 deletions pgdog-config/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ pub struct Database {
pub read_only: Option<bool>,
/// Server lifetime.
pub server_lifetime: Option<u64>,
/// Used for resharding only.
#[serde(default)]
pub resharding_only: bool,
}

impl Database {
Expand Down
3 changes: 3 additions & 0 deletions pgdog-stats/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ pub struct Config {
pub lsn_check_delay: Duration,
/// Automatic role detection enabled.
pub role_detection: bool,
/// Used for resharding only.
pub resharding_only: bool,
}

impl Default for Config {
Expand Down Expand Up @@ -360,6 +362,7 @@ impl Default for Config {
lsn_check_timeout: Duration::from_millis(5_000),
lsn_check_delay: Duration::from_millis(5_000),
role_detection: false,
resharding_only: false,
}
}
}
5 changes: 5 additions & 0 deletions pgdog-stats/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ impl Lsn {
let low = ((lsn & 0xFFFF_FFFF) as u32) as i64;
Self { high, low, lsn }
}

/// Replication lag in bytes: how far ahead this LSN is of `other`.
/// Negative when `other` is ahead of `self`.
pub fn distance_bytes(&self, other: &Lsn) -> i64 {
    let (ours, theirs) = (self.lsn, other.lsn);
    ours - theirs
}
}

impl FromDataType for Lsn {
Expand Down
1 change: 1 addition & 0 deletions pgdog/src/backend/pool/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl Config {
lsn_check_timeout: Duration::from_millis(general.lsn_check_timeout),
lsn_check_delay: Duration::from_millis(general.lsn_check_delay),
role_detection: database.role == Role::Auto,
resharding_only: database.resharding_only,
..Default::default()
},
}
Expand Down
6 changes: 5 additions & 1 deletion pgdog/src/backend/pool/lb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,11 @@ impl LoadBalancer {
use LoadBalancingStrategy::*;
use ReadWriteSplit::*;

let mut candidates: Vec<&Target> = self.targets.iter().collect();
let mut candidates: Vec<&Target> = self
.targets
.iter()
.filter(|target| !target.pool.config().resharding_only) // Don't let reads on resharding-only replicas.
.collect();

let primary_reads = match self.rw_split {
IncludePrimary => true,
Expand Down
65 changes: 45 additions & 20 deletions pgdog/src/backend/replication/logical/publisher/publisher_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::time::Duration;

use pgdog_config::QueryParserEngine;
use tokio::{select, spawn};
use tokio::{select, spawn, time::interval};
use tracing::{debug, error, info};

use super::super::{publisher::Table, Error};
Expand Down Expand Up @@ -121,23 +121,32 @@ impl Publisher {
.ok_or(Error::NoReplicationSlot(number))?;
stream.set_current_lsn(slot.lsn().lsn);

let mut check_lag = interval(Duration::from_secs(1));

// Replicate in parallel.
let handle = spawn(async move {
slot.start_replication().await?;
let progress = Progress::new_stream();

loop {
select! {
// This is cancellation-safe.
replication_data = slot.replicate(Duration::MAX) => {
let replication_data = replication_data?;

match replication_data {
Ok(Some(ReplicationData::CopyData(data))) => {
let lsn = if let Some(ReplicationMeta::KeepAlive(ka)) = data.replication_meta() {
// If the LSN hasn't moved, we reached the end of the stream.
// If Postgres is requesting a reply, provide our LSN now.
if !stream.set_current_lsn(ka.wal_end) || ka.reply() {
Some(ReplicationData::CopyData(data)) => {
let lsn = if let Some(ReplicationMeta::KeepAlive(ka)) =
data.replication_meta()
{
if ka.reply() {
slot.status_update(stream.status_update()).await?;
}
debug!("origin at lsn {} [{}]", Lsn::from_i64(ka.wal_end), slot.server()?.addr());
debug!(
"origin at lsn {} [{}]",
Lsn::from_i64(ka.wal_end),
slot.server()?.addr()
);
ka.wal_end
} else {
if let Some(status_update) = stream.handle(data).await? {
Expand All @@ -147,16 +156,23 @@ impl Publisher {
};
progress.update(stream.bytes_sharded(), lsn);
}
Ok(Some(ReplicationData::CopyDone)) => (),
Ok(None) => {
Some(ReplicationData::CopyDone) => (),
None => {
slot.drop_slot().await?;
break;
}
Err(err) => {
return Err(err);
}
}
}

_ = check_lag.tick() => {
let lag = slot.replication_lag().await?;

info!(
"replication lag at {} bytes [{}]",
lag,
slot.server()?.addr()
);
}
}
}

Expand Down Expand Up @@ -195,16 +211,25 @@ impl Publisher {
Table::load(&self.publication, &mut primary, self.query_parser_engine).await?;

let include_primary = !shard.has_replicas();
let replicas = shard
.pools_with_roles()
let resharding_only = shard
.pools()
.into_iter()
.filter(|(r, _)| match *r {
Role::Replica => true,
Role::Primary => include_primary,
Role::Auto => false,
})
.map(|(_, p)| p)
.filter(|pool| pool.config().resharding_only)
.collect::<Vec<_>>();
let replicas = if resharding_only.is_empty() {
shard
.pools_with_roles()
.into_iter()
.filter(|(r, _)| match *r {
Role::Replica => true,
Role::Primary => include_primary,
Role::Auto => false,
})
.map(|(_, p)| p)
.collect::<Vec<_>>()
} else {
resharding_only
};

let manager = ParallelSyncManager::new(tables, replicas, dest)?;
let tables = manager.run().await?;
Expand Down
163 changes: 160 additions & 3 deletions pgdog/src/backend/replication/logical/publisher/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use super::super::Error;
use crate::{
backend::{self, pool::Address, ConnectReason, Server, ServerOptions},
net::{
replication::StatusUpdate, CopyData, CopyDone, DataRow, ErrorResponse, Format, FromBytes,
Protocol, Query, ToBytes,
replication::{StatusUpdate, XLogData},
CopyData, CopyDone, DataRow, ErrorResponse, Format, FromBytes, Protocol, Query, ToBytes,
},
util::random_string,
};
Expand Down Expand Up @@ -46,6 +46,7 @@ pub struct ReplicationSlot {
dropped: bool,
server: Option<Server>,
kind: SlotKind,
server_meta: Option<Server>,
}

impl ReplicationSlot {
Expand All @@ -62,6 +63,7 @@ impl ReplicationSlot {
dropped: false,
server: None,
kind: SlotKind::Replication,
server_meta: None,
}
}

Expand All @@ -78,6 +80,7 @@ impl ReplicationSlot {
dropped: true, // Temporary.
server: None,
kind: SlotKind::DataSync,
server_meta: None,
}
}

Expand All @@ -95,6 +98,37 @@ impl ReplicationSlot {
Ok(())
}

/// Get or create a separate server connection for meta commands.
///
/// Lazily opened on first use and cached for subsequent calls; the
/// primary connection is occupied by the replication stream, so regular
/// SQL goes over this one.
pub async fn server_meta(&mut self) -> Result<&mut Server, Error> {
    if self.server_meta.is_none() {
        let server = Server::connect(
            &self.address,
            ServerOptions::default(),
            ConnectReason::Replication,
        )
        .await?;
        self.server_meta = Some(server);
    }

    Ok(self
        .server_meta
        .as_mut()
        .expect("server_meta was just populated"))
}

/// Replication lag in bytes for this slot.
pub async fn replication_lag(&mut self) -> Result<i64, Error> {
let query = format!(
"SELECT pg_current_wal_lsn() - confirmed_flush_lsn \
FROM pg_replication_slots \
WHERE slot_name = '{}'",
self.name
);
let mut lag: Vec<i64> = self.server_meta().await?.fetch_all(&query).await?;

lag.pop()
.ok_or(Error::MissingReplicationSlot(self.name.clone()))
}

pub fn server(&mut self) -> Result<&mut Server, Error> {
self.server.as_mut().ok_or(Error::NotConnected)
}
Expand Down Expand Up @@ -294,14 +328,28 @@ impl ReplicationSlot {
}
}

#[derive(Debug)]
/// A message received from the logical replication stream.
#[derive(Debug, Clone)]
pub enum ReplicationData {
    /// A CopyData frame from the replication protocol.
    CopyData(CopyData),
    /// The server finished the COPY stream.
    CopyDone,
}

impl ReplicationData {
    /// Extract the XLogData message, if this is a CopyData frame that
    /// contains one; `None` otherwise.
    pub fn xlog_data(&self) -> Option<XLogData> {
        match self {
            Self::CopyData(copy_data) => copy_data.xlog_data(),
            Self::CopyDone => None,
        }
    }
}

#[cfg(test)]
mod test {
use tokio::spawn;

use crate::{backend::server::test::test_server, net::replication::xlog_data::XLogPayload};

use super::*;

#[test]
Expand All @@ -312,4 +360,113 @@ mod test {
let lsn = lsn.to_string();
assert_eq!(lsn, original);
}

#[tokio::test]
async fn test_real_lsn() {
    // Ask Postgres for its current WAL position and make sure our Lsn
    // type round-trips it: text -> Lsn -> i64 -> Lsn.
    let rows: Vec<String> = test_server()
        .await
        .fetch_all("SELECT pg_current_wal_lsn()")
        .await
        .unwrap();
    let parsed = Lsn::from_str(&rows[0]).unwrap();
    let round_tripped = Lsn::from_i64(parsed.lsn);
    assert_eq!(parsed.to_string(), rows[0]);
    assert_eq!(parsed, round_tripped);
}

#[tokio::test]
async fn test_slot_replication() {
    use tokio::sync::mpsc::*;
    crate::logger();

    let mut server = test_server().await;

    // Set up a table and publication to replicate; tear down leftovers
    // from previous runs first (errors ignored: the objects may not exist).
    server
        .execute("CREATE TABLE IF NOT EXISTS public.test_slot_replication(id BIGINT)")
        .await
        .unwrap();
    let _ = server
        .execute("DROP PUBLICATION test_slot_replication")
        .await;
    // The slot name must be a quoted string literal; unquoted it's parsed
    // as a column reference and the cleanup silently fails every time.
    let _ = server
        .execute("SELECT pg_drop_replication_slot('test_slot_replication')")
        .await;
    server
        .execute(
            "CREATE PUBLICATION test_slot_replication FOR TABLE public.test_slot_replication",
        )
        .await
        .unwrap();

    let addr = server.addr();

    let mut slot = ReplicationSlot::replication(
        "test_slot_replication",
        addr,
        Some("test_slot_replication".into()),
    );
    let _ = slot.create_slot().await.unwrap();
    slot.connect().await.unwrap();

    let (tx, mut rx) = channel(16);

    // Producer: stream replication messages, forwarding each one to the
    // assertion loop below. Request a stop once the commit is seen.
    let handle = spawn(async move {
        slot.start_replication().await?;
        server
            .execute("INSERT INTO test_slot_replication (id) VALUES (1)")
            .await?;

        loop {
            let message = slot.replicate(Duration::MAX).await?;
            tx.send(message.clone()).await.unwrap();

            if let Some(message) = message {
                if let ReplicationData::CopyData(copy_data) = message {
                    if let Some(xlog_data) = copy_data.xlog_data() {
                        // Commit marks the end of the transaction we care
                        // about; ask the server to stop streaming.
                        if let Some(XLogPayload::Commit(_)) = xlog_data.payload() {
                            slot.stop_replication().await?;
                        }
                    }
                }
            } else {
                // End of stream.
                break;
            }
        }

        slot.drop_slot().await?;

        Ok::<(), Error>(())
    });

    // Consumer: verify we observe the relation, the inserted row and the
    // commit for the transaction above.
    let mut got_row = false;

    while let Some(message) = rx.recv().await {
        let payload = message
            .and_then(|message| message.xlog_data())
            .and_then(|payload| payload.payload());
        if let Some(payload) = payload {
            match payload {
                XLogPayload::Relation(relation) => {
                    assert_eq!(relation.name, "test_slot_replication")
                }
                XLogPayload::Insert(insert) => {
                    assert_eq!(insert.column(0).unwrap().as_str().unwrap(), "1")
                }
                XLogPayload::Begin(_) => (),
                XLogPayload::Commit(_) => got_row = true,
                _ => panic!("{:#?}", payload),
            }
        }
    }

    assert!(got_row);

    handle.await.unwrap().unwrap();
}
}
Loading
Loading