From bcf536cb3b9b4fe7c068e6be531fffda144cac7e Mon Sep 17 00:00:00 2001
From: wudi
Date: Mon, 9 Feb 2026 10:42:19 +0800
Subject: [PATCH] [Improve](streaming job) support postgres partition table
 sync (#60560)

### What problem does this PR solve?

Related PR: #59461

To support synchronizing PostgreSQL partitioned tables, create the PUBLICATION with `publish_via_partition_root = true` on PostgreSQL 13+, where the option is available; on older servers the publication falls back to the plain `CREATE PUBLICATION` form.
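For illustration, when the publication autocreate mode is `ALL_TABLES` and no publication exists yet, `initPublication()` now issues one of the following statements (`dbz_publication` stands in for the configured publication name):

```sql
-- PostgreSQL 13+: publish partition changes under the root partitioned table
CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish_via_partition_root = true);

-- Older servers: the option does not exist, so the plain form is used
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
```

With `publish_via_partition_root = true`, changes on individual partitions are published under the root table's name, so the downstream job can treat the partitioned table as one table.

---
 .../streaming/StreamingMultiTblTask.java      |   2 +-
 .../PostgresReplicationConnection.java        | 932 ++++++++++++++++++
 .../reader/postgres/PostgresSourceReader.java |  12 +-
 .../test_streaming_postgres_job_partition.out |  10 +
 ...st_streaming_postgres_job_partition.groovy | 174 ++++
 .../test_streaming_postgres_job_priv.groovy   |   4 +-
 6 files changed, 1129 insertions(+), 5 deletions(-)
 create mode 100644 fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
 create mode 100644 regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out
 create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 0bdd62628648db..3061cb80c58c9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -348,7 +348,7 @@ public String getTimeoutReason() {
                 log.warn("Failed to get task timeout reason, response: {}", response);
             }
         } catch (ExecutionException | InterruptedException ex) {
-            log.error("Send get fail reason request failed: ", ex);
+            log.error("Send get task fail reason request failed: ", ex);
         }
         return "";
     }
diff --git a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
new file mode 100644
index 00000000000000..ac372bbb8cb24e
--- /dev/null
+++ b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
@@ -0,0 +1,932 @@
+/*
+ * Copyright Debezium Authors.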
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.postgresql.connection; + +import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.spi.SlotCreationResult; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.JdbcConnectionException; +import io.debezium.relational.RelationalTableFilters; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; +import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; +import org.postgresql.core.ServerVersion; +import org.postgresql.replication.PGReplicationStream; +import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; +import org.postgresql.util.PSQLException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; +import java.time.Duration; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.lang.Math.toIntExact; + +/** + * Copied from Flink Cdc 3.5.0 + * + *
<p>
Line 192~199: add publish_via_partition_root for partition table. + */ +public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection { + + private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class); + + private final String slotName; + private final String publicationName; + private final RelationalTableFilters tableFilter; + private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode; + private final PostgresConnectorConfig.LogicalDecoder plugin; + private final boolean dropSlotOnClose; + private final PostgresConnectorConfig connectorConfig; + private final Duration statusUpdateInterval; + private final MessageDecoder messageDecoder; + private final PostgresConnection jdbcConnection; + private final TypeRegistry typeRegistry; + private final Properties streamParams; + + private Lsn defaultStartingPos; + private SlotCreationResult slotCreationInfo; + private boolean hasInitedSlot; + + private Lsn endingPos; + + /** + * Creates a new replication connection with the given params. + * + * @param config the JDBC configuration for the connection; may not be null + * @param slotName the name of the DB slot for logical replication; may not be null + * @param publicationName the name of the DB publication for logical replication; may not be + * null + * @param tableFilter the tables to watch of the DB publication for logical replication; may not + * be null + * @param publicationAutocreateMode the mode for publication autocreation; may not be null + * @param plugin decoder matching the server side plug-in used for streaming changes; may not be + * null + * @param dropSlotOnClose whether the replication slot should be dropped once the connection is + * closed + * @param statusUpdateInterval the interval at which the replication connection should + * periodically send status + * @param doSnapshot whether the connector is doing snapshot + * @param jdbcConnection general PostgreSQL JDBC connection + * @param typeRegistry registry with PostgreSQL types + * @param streamParams additional parameters to pass to the replication stream + * @param schema the schema; must not be null + *
<p>
updates to the server + */ + private PostgresReplicationConnection( + PostgresConnectorConfig config, + String slotName, + String publicationName, + RelationalTableFilters tableFilter, + PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode, + PostgresConnectorConfig.LogicalDecoder plugin, + boolean dropSlotOnClose, + boolean doSnapshot, + Duration statusUpdateInterval, + PostgresConnection jdbcConnection, + TypeRegistry typeRegistry, + Properties streamParams, + PostgresSchema schema) { + super( + addDefaultSettings(config.getJdbcConfig()), + PostgresConnection.FACTORY, + null, + null, + "\"", + "\""); + + this.connectorConfig = config; + this.slotName = slotName; + this.publicationName = publicationName; + this.tableFilter = tableFilter; + this.publicationAutocreateMode = publicationAutocreateMode; + this.plugin = plugin; + this.dropSlotOnClose = dropSlotOnClose; + this.statusUpdateInterval = statusUpdateInterval; + this.messageDecoder = + plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection); + this.jdbcConnection = jdbcConnection; + this.typeRegistry = typeRegistry; + this.streamParams = streamParams; + this.slotCreationInfo = null; + this.hasInitedSlot = false; + } + + private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) { + // first copy the parent's default settings... + // then set some additional replication specific settings + return JdbcConfiguration.adapt( + PostgresConnection.addDefaultSettings( + configuration, PostgresConnection.CONNECTION_STREAMING) + .edit() + .with("replication", "database") + .with( + "preferQueryMode", + "simple") // replication protocol only supports simple query mode + .build()); + } + + private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException { + try (PostgresConnection connection = + new PostgresConnection( + connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) { + return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName()); + } + } + + protected void initPublication() { + String createPublicationStmt; + String tableFilterString = null; + if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) { + LOGGER.info("Initializing PgOutput logical decoder publication"); + try { + // Unless the autocommit is disabled the SELECT publication query will stay running + Connection conn = pgConnection(); + conn.setAutoCommit(false); + + String selectPublication = + String.format( + "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", + publicationName); + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(selectPublication)) { + if (rs.next()) { + Long count = rs.getLong(1); + // Close eagerly as the transaction might stay running + if (count == 0L) { + LOGGER.info( + "Creating new publication '{}' for plugin '{}'", + publicationName, + plugin); + switch (publicationAutocreateMode) { + case DISABLED: + throw new ConnectException( + "Publication autocreation is disabled, please create one and restart the connector."); + case ALL_TABLES: + boolean supportPartitionRoot = ((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13); + createPublicationStmt = supportPartitionRoot + ? 
String.format(
+                                                    "CREATE PUBLICATION %s FOR ALL TABLES WITH (publish_via_partition_root = true);",
+                                                    publicationName)
+                                            : String.format(
+                                                    "CREATE PUBLICATION %s FOR ALL TABLES;",
+                                                    publicationName);
+                                    LOGGER.info(
+                                            "Creating Publication with statement '{}'",
+                                            createPublicationStmt);
+                                    // Publication doesn't exist, create it.
+                                    stmt.execute(createPublicationStmt);
+                                    break;
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, false);
+                                    break;
+                            }
+                        } else {
+                            switch (publicationAutocreateMode) {
+                                case FILTERED:
+                                    createOrUpdatePublicationModeFilterted(
+                                            tableFilterString, stmt, true);
+                                    break;
+                                default:
+                                    LOGGER.trace(
+                                            "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server "
+                                                    + "and will be used by the plugin",
+                                            publicationName,
+                                            plugin,
+                                            database());
+                            }
+                        }
+                    }
+                }
+                conn.commit();
+                conn.setAutoCommit(true);
+            } catch (SQLException e) {
+                throw new JdbcConnectionException(e);
+            }
+        }
+    }
+
+    private void createOrUpdatePublicationModeFilterted(
+            String tableFilterString, Statement stmt, boolean isUpdate) {
+        String createOrUpdatePublicationStmt;
+        try {
+            Set<TableId> tablesToCapture = determineCapturedTables();
+            tableFilterString =
+                    tablesToCapture.stream()
+                            .map(TableId::toDoubleQuotedString)
+                            .collect(Collectors.joining(", "));
+            if (tableFilterString.isEmpty()) {
+                throw new DebeziumException(
+                        String.format(
+                                "No table filters found for filtered publication %s",
+                                publicationName));
+            }
+            createOrUpdatePublicationStmt =
+                    isUpdate
+                            ? String.format(
+                                    "ALTER PUBLICATION %s SET TABLE %s;",
+                                    publicationName, tableFilterString)
+                            : String.format(
+                                    "CREATE PUBLICATION %s FOR TABLE %s;",
+                                    publicationName, tableFilterString);
+            LOGGER.info(
+                    isUpdate
+                            ? "Updating Publication with statement '{}'"
+                            : "Creating Publication with statement '{}'",
+                    createOrUpdatePublicationStmt);
+            stmt.execute(createOrUpdatePublicationStmt);
+        } catch (Exception e) {
+            throw new ConnectException(
+                    String.format(
+                            "Unable to %s filtered publication %s for %s",
+                            isUpdate ? "update" : "create", publicationName, tableFilterString),
+                    e);
+        }
+    }
+
+    private Set<TableId> determineCapturedTables() throws Exception {
+        Set<TableId> allTableIds = jdbcConnection.getAllTableIds(connectorConfig.databaseName());
+
+        Set<TableId> capturedTables = new HashSet<>();
+
+        for (TableId tableId : allTableIds) {
+            if (tableFilter.dataCollectionFilter().isIncluded(tableId)) {
+                LOGGER.trace("Adding table {} to the list of captured tables", tableId);
+                capturedTables.add(tableId);
+            } else {
+                LOGGER.trace(
+                        "Ignoring table {} as it's not included in the filter configuration",
+                        tableId);
+            }
+        }
+
+        return capturedTables.stream()
+                .sorted()
+                .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+
+    protected void initReplicationSlot() throws SQLException, InterruptedException {
+        ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
+
+        boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
+        try {
+            // there's no info for this plugin and slot so create a new slot
+            if (shouldCreateSlot) {
+                this.createReplicationSlot();
+            }
+
+            // replication connection does not support parsing of SQL statements so we need to
+            // create
+            // the connection without executing on connect statements - see JDBC opt
+            // preferQueryMode=simple
+            pgConnection();
+            final String identifySystemStatement = "IDENTIFY_SYSTEM";
+            LOGGER.debug(
+                    "running '{}' to validate replication connection", identifySystemStatement);
+            final Lsn xlogStart =
+                    queryAndMap(
+                            identifySystemStatement,
+                            rs -> {
+                                if (!rs.next()) {
+                                    throw new IllegalStateException(
+                                            "The DB connection is not a valid replication connection");
+                                }
+                                String xlogpos = rs.getString("xlogpos");
+                                LOGGER.debug("received latest xlogpos '{}'", xlogpos);
+                                return Lsn.valueOf(xlogpos);
+                            });
+
+            if (slotCreationInfo != null) {
+                this.defaultStartingPos = slotCreationInfo.startLsn();
+            } else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
+                // this is a new slot or we weren't able to read a valid flush LSN pos, so we always
+                // start from the xlog pos that was reported
+                this.defaultStartingPos = xlogStart;
+            } else {
+                Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
+                this.defaultStartingPos =
+                        latestFlushedLsn.compareTo(xlogStart) < 0 ? latestFlushedLsn : xlogStart;
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("found previous flushed LSN '{}'", latestFlushedLsn);
+                }
+            }
+            hasInitedSlot = true;
+        } catch (SQLException e) {
+            throw new JdbcConnectionException(e);
+        }
+    }
+
+    // Temporary replication slots is a new feature of PostgreSQL 10
+    private boolean useTemporarySlot() throws SQLException {
+        // Temporary replication slots cannot be used due to connection restart
+        // when finding WAL position
+        // return dropSlotOnClose && pgConnection().haveMinimumServerVersion(ServerVersion.v10);
+        return false;
+    }
+
+    /**
+     * creating a replication connection and starting to stream involves a few steps: 1. we create
+     * the connection and ensure that a. the slot exists b. the slot isn't currently being used 2.
+     * we query to get our potential start position in the slot (lsn) 3. we try and start streaming,
+     * depending on our options (such as in wal2json) this may fail, which can result in the
+     * connection being killed and we need to start the process over if we are using a temporary
+     * slot 4. actually start the streamer
+     *
+     * 
<p>
This method takes care of all of these and this method queries for a default starting
+     * position If you know where you are starting from you should call {@link #startStreaming(Lsn,
+     * WalPositionLocator)}, this method delegates to that method
+     *
+     * @return
+     * @throws SQLException
+     * @throws InterruptedException
+     */
+    @Override
+    public ReplicationStream startStreaming(WalPositionLocator walPosition)
+            throws SQLException, InterruptedException {
+        return startStreaming(null, walPosition);
+    }
+
+    @Override
+    public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPosition)
+            throws SQLException, InterruptedException {
+        initConnection();
+
+        connect();
+        if (offset == null || !offset.isValid()) {
+            offset = defaultStartingPos;
+        }
+        Lsn lsn = offset;
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("starting streaming from LSN '{}'", lsn);
+        }
+
+        final int maxRetries = connectorConfig.maxRetries();
+        final Duration delay = connectorConfig.retryDelay();
+        int tryCount = 0;
+        while (true) {
+            try {
+                return createReplicationStream(lsn, walPosition);
+            } catch (Exception e) {
+                String message = "Failed to start replication stream at " + lsn;
+                if (++tryCount > maxRetries) {
+                    if (e.getMessage().matches(".*replication slot .* is active.*")) {
+                        message +=
+                                "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
+                    }
+                    throw new DebeziumException(message, e);
+                } else {
+                    LOGGER.warn(
+                            message + ", waiting for {} ms and retrying, attempt number {} over {}",
+                            delay,
+                            tryCount,
+                            maxRetries);
+                    final Metronome metronome = Metronome.sleeper(delay, Clock.SYSTEM);
+                    metronome.pause();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void initConnection() throws SQLException, InterruptedException {
+        // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+        // For pgoutput specifically, the publication must be created before the slot.
+        initPublication();
+        if (!hasInitedSlot) {
+            initReplicationSlot();
+        }
+    }
+
+    @Override
+    public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
+        // note that some of these options are only supported in Postgres 9.4+, additionally
+        // the options are not yet exported by the jdbc api wrapper, therefore, we just do
+        // this ourselves but eventually this should be moved back to the jdbc API
+        // see https://github.com/pgjdbc/pgjdbc/issues/1305
+
+        LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin);
+        String tempPart = "";
+        // Exported snapshots are supported in Postgres 9.4+
+        boolean canExportSnapshot = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
+        if ((dropSlotOnClose) && !canExportSnapshot) {
+            LOGGER.warn(
+                    "A slot marked as temporary or with an exported snapshot was created, "
+                            + "but not on a supported version of Postgres, ignoring!");
+        }
+        if (useTemporarySlot()) {
+            tempPart = "TEMPORARY";
+        }
+
+        // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+        // For pgoutput specifically, the publication must be created prior to the slot.
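+        // initPublication() first checks pg_publication, so calling it from both here and
+        // initConnection() is safe: CREATE PUBLICATION (with publish_via_partition_root = true
+        // on PostgreSQL 13+) is only issued when the publication does not already exist.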
+ initPublication(); + + try (Statement stmt = pgConnection().createStatement()) { + String createCommand = + String.format( + "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s", + slotName, tempPart, plugin.getPostgresPluginName()); + LOGGER.info("Creating replication slot with command {}", createCommand); + stmt.execute(createCommand); + // when we are in Postgres 9.4+, we can parse the slot creation info, + // otherwise, it returns nothing + if (canExportSnapshot) { + this.slotCreationInfo = parseSlotCreation(stmt.getResultSet()); + } + + return Optional.ofNullable(slotCreationInfo); + } + } + + protected BaseConnection pgConnection() throws SQLException { + return (BaseConnection) connection(false); + } + + private SlotCreationResult parseSlotCreation(ResultSet rs) { + try { + if (rs.next()) { + String slotName = rs.getString("slot_name"); + String startPoint = rs.getString("consistent_point"); + String snapName = rs.getString("snapshot_name"); + String pluginName = rs.getString("output_plugin"); + + return new SlotCreationResult(slotName, startPoint, snapName, pluginName); + } else { + throw new ConnectException("No replication slot found"); + } + } catch (SQLException ex) { + throw new ConnectException("Unable to parse create_replication_slot response", ex); + } + } + + private ReplicationStream createReplicationStream( + final Lsn startLsn, WalPositionLocator walPosition) + throws SQLException, InterruptedException { + PGReplicationStream s; + + try { + try { + s = + startPgReplicationStream( + startLsn, + plugin.forceRds() + ? messageDecoder::optionsWithoutMetadata + : messageDecoder::optionsWithMetadata); + messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true); + } catch (PSQLException e) { + LOGGER.debug( + "Could not register for streaming, retrying without optional options", e); + + // re-init the slot after a failed start of slot, as this + // may have closed the slot + if (useTemporarySlot()) { + initReplicationSlot(); + } + + s = + startPgReplicationStream( + startLsn, + plugin.forceRds() + ? messageDecoder::optionsWithoutMetadata + : messageDecoder::optionsWithMetadata); + messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true); + } + } catch (PSQLException e) { + if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) { + // It is possible we are connecting to an old wal2json plug-in + LOGGER.warn( + "Could not register for streaming with metadata in messages, falling back to messages without metadata"); + + // re-init the slot after a failed start of slot, as this + // may have closed the slot + if (useTemporarySlot()) { + initReplicationSlot(); + } + + s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata); + messageDecoder.setContainsMetadata(false); + } else if (e.getMessage() + .matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) { + LOGGER.error("Cannot rewind to last processed WAL position", e); + throw new ConnectException( + "The offset to start reading from has been removed from the database write-ahead log. 
Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0."); + } else { + throw e; + } + } + + final PGReplicationStream stream = s; + + return new ReplicationStream() { + + private static final int CHECK_WARNINGS_AFTER_COUNT = 100; + private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT; + private ExecutorService keepAliveExecutor = null; + private AtomicBoolean keepAliveRunning; + private final Metronome metronome = + Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM); + + // make sure this is volatile since multiple threads may be interested in this value + private volatile Lsn lastReceivedLsn; + + @Override + public void read(ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { + processWarnings(false); + ByteBuffer read = stream.read(); + final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace( + "Streaming requested from LSN {}, received LSN {}", + startLsn, + lastReceiveLsn); + if (reachEnd(lastReceivedLsn)) { + lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace("Received message at LSN {}", lastReceivedLsn); + processor.process(new ReplicationMessage.NoopMessage(null, null)); + return; + } + if (messageDecoder.shouldMessageBeSkipped( + read, lastReceiveLsn, startLsn, walPosition)) { + return; + } + deserializeMessages(read, processor); + } + + @Override + public boolean readPending(ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { + processWarnings(false); + ByteBuffer read = stream.readPending(); + final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace( + "Streaming requested from LSN {}, received LSN {}", + startLsn, + lastReceiveLsn); + + if (reachEnd(lastReceiveLsn)) { + lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace("Received message at LSN {}", lastReceivedLsn); + processor.process(new ReplicationMessage.NoopMessage(null, null)); + return true; + } + + if (read == null) { + return false; + } + + if (messageDecoder.shouldMessageBeSkipped( + read, lastReceiveLsn, startLsn, walPosition)) { + return true; + } + + deserializeMessages(read, processor); + + return true; + } + + private void deserializeMessages( + ByteBuffer buffer, ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { + lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace("Received message at LSN {}", lastReceivedLsn); + messageDecoder.processMessage(buffer, processor, typeRegistry); + } + + @Override + public void close() throws SQLException { + processWarnings(true); + stream.close(); + } + + @Override + public void flushLsn(Lsn lsn) throws SQLException { + doFlushLsn(lsn); + } + + private void doFlushLsn(Lsn lsn) throws SQLException { + stream.setFlushedLSN(lsn.asLogSequenceNumber()); + stream.setAppliedLSN(lsn.asLogSequenceNumber()); + + stream.forceUpdateStatus(); + } + + @Override + public Lsn lastReceivedLsn() { + return lastReceivedLsn; + } + + @Override + public void startKeepAlive(ExecutorService service) { + if (keepAliveExecutor == null) { + keepAliveExecutor = service; + keepAliveRunning = new AtomicBoolean(true); + keepAliveExecutor.submit( + () -> { + while (keepAliveRunning.get()) { + try { + LOGGER.trace( + "Forcing status update with replication stream"); + stream.forceUpdateStatus(); + metronome.pause(); + } catch (Exception exp) { + throw new RuntimeException( + "received unexpected exception will perform keep alive", + exp); + } + } + 
});
+                }
+            }
+
+            @Override
+            public void stopKeepAlive() {
+                if (keepAliveExecutor != null) {
+                    keepAliveRunning.set(false);
+                    keepAliveExecutor.shutdownNow();
+                    keepAliveExecutor = null;
+                }
+            }
+
+            private void processWarnings(final boolean forced) throws SQLException {
+                if (--warningCheckCounter == 0 || forced) {
+                    warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+                    for (SQLWarning w = connection().getWarnings();
+                            w != null;
+                            w = w.getNextWarning()) {
+                        LOGGER.debug(
+                                "Server-side message: '{}', state = {}, code = {}",
+                                w.getMessage(),
+                                w.getSQLState(),
+                                w.getErrorCode());
+                    }
+                    connection().clearWarnings();
+                }
+            }
+
+            @Override
+            public Lsn startLsn() {
+                return startLsn;
+            }
+
+            private boolean reachEnd(Lsn receivedLsn) {
+                if (receivedLsn == null) {
+                    return false;
+                }
+                return endingPos != null
+                        && (!endingPos.isNonStopping())
+                        && endingPos.compareTo(receivedLsn) < 0;
+            }
+        };
+    }
+
+    public void setEndingPos(Lsn endingPos) {
+        this.endingPos = endingPos;
+    }
+
+    private PGReplicationStream startPgReplicationStream(
+            final Lsn lsn,
+            BiFunction<
+                            ChainedLogicalStreamBuilder,
+                            Function<Integer, Boolean>,
+                            ChainedLogicalStreamBuilder>
+                    configurator)
+            throws SQLException {
+        assert lsn != null;
+        ChainedLogicalStreamBuilder streamBuilder =
+                pgConnection()
+                        .getReplicationAPI()
+                        .replicationStream()
+                        .logical()
+                        .withSlotName("\"" + slotName + "\"")
+                        .withStartPosition(lsn.asLogSequenceNumber())
+                        .withSlotOptions(streamParams);
+        streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion);
+
+        if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
+            streamBuilder.withStatusInterval(
+                    toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
+        }
+
+        PGReplicationStream stream = streamBuilder.start();
+
+        // TODO DBZ-508 get rid of this
+        // Needed by tests when connections are opened and closed in a fast sequence
+        try {
+            Thread.sleep(10);
+        } catch (Exception e) {
+        }
+        stream.forceUpdateStatus();
+        return stream;
+    }
+
+    private Boolean hasMinimumVersion(int version) {
+        try {
+            return pgConnection().haveMinimumServerVersion(version);
+        } catch (SQLException e) {
+            throw new DebeziumException(e);
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        close(true);
+    }
+
+    public synchronized void close(boolean dropSlot) {
+        try {
+            LOGGER.debug("Closing message decoder");
+            messageDecoder.close();
+        } catch (Throwable e) {
+            LOGGER.error("Unexpected error while closing message decoder", e);
+        }
+
+        try {
+            LOGGER.debug("Closing replication connection");
+            super.close();
+        } catch (Throwable e) {
+            LOGGER.error("Unexpected error while closing Postgres connection", e);
+        }
+        if (dropSlotOnClose && dropSlot) {
+            // we're dropping the replication slot via a regular - i.e. 
not a replication - + // connection + try (PostgresConnection connection = + new PostgresConnection( + connectorConfig.getJdbcConfig(), + PostgresConnection.CONNECTION_DROP_SLOT)) { + connection.dropReplicationSlot(slotName); + } catch (Throwable e) { + LOGGER.error("Unexpected error while dropping replication slot", e); + } + } + } + + @Override + public void reconnect() throws SQLException { + close(false); + // Don't re-execute initial commands on reconnection + connection(false); + } + + protected static class ReplicationConnectionBuilder implements Builder { + + private final PostgresConnectorConfig config; + private String slotName = DEFAULT_SLOT_NAME; + private String publicationName = DEFAULT_PUBLICATION_NAME; + private RelationalTableFilters tableFilter; + private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode = + PostgresConnectorConfig.AutoCreateMode.ALL_TABLES; + private PostgresConnectorConfig.LogicalDecoder plugin = + PostgresConnectorConfig.LogicalDecoder.DECODERBUFS; + private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE; + private Duration statusUpdateIntervalVal; + private boolean doSnapshot; + private TypeRegistry typeRegistry; + private PostgresSchema schema; + private Properties slotStreamParams = new Properties(); + private PostgresConnection jdbcConnection; + + protected ReplicationConnectionBuilder(PostgresConnectorConfig config) { + assert config != null; + this.config = config; + } + + @Override + public ReplicationConnectionBuilder withSlot(final String slotName) { + assert slotName != null; + this.slotName = slotName; + return this; + } + + @Override + public Builder withPublication(String publicationName) { + assert publicationName != null; + this.publicationName = publicationName; + return this; + } + + @Override + public Builder withTableFilter(RelationalTableFilters tableFilter) { + assert tableFilter != null; + this.tableFilter = tableFilter; + return this; + } + + @Override + public Builder withPublicationAutocreateMode( + PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) { + assert publicationName != null; + this.publicationAutocreateMode = publicationAutocreateMode; + return this; + } + + @Override + public ReplicationConnectionBuilder withPlugin( + final PostgresConnectorConfig.LogicalDecoder plugin) { + assert plugin != null; + this.plugin = plugin; + return this; + } + + @Override + public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) { + this.dropSlotOnClose = dropSlotOnClose; + return this; + } + + @Override + public ReplicationConnectionBuilder streamParams(final String slotStreamParams) { + if (slotStreamParams != null && !slotStreamParams.isEmpty()) { + this.slotStreamParams = new Properties(); + String[] paramsWithValues = slotStreamParams.split(";"); + for (String paramsWithValue : paramsWithValues) { + String[] paramAndValue = paramsWithValue.split("="); + if (paramAndValue.length == 2) { + this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]); + } else { + LOGGER.warn( + "The following STREAM_PARAMS value is invalid: {}", + paramsWithValue); + } + } + } + return this; + } + + @Override + public ReplicationConnectionBuilder statusUpdateInterval( + final Duration statusUpdateInterval) { + this.statusUpdateIntervalVal = statusUpdateInterval; + return this; + } + + @Override + public Builder doSnapshot(boolean doSnapshot) { + this.doSnapshot = doSnapshot; + return this; + } + + @Override + public Builder jdbcMetadataConnection(PostgresConnection 
jdbcConnection) { + this.jdbcConnection = jdbcConnection; + return this; + } + + @Override + public ReplicationConnection build() { + assert plugin != null : "Decoding plugin name is not set"; + return new PostgresReplicationConnection( + config, + slotName, + publicationName, + tableFilter, + publicationAutocreateMode, + plugin, + dropSlotOnClose, + doSnapshot, + statusUpdateIntervalVal, + jdbcConnection, + typeRegistry, + slotStreamParams, + schema); + } + + @Override + public Builder withTypeRegistry(TypeRegistry typeRegistry) { + this.typeRegistry = typeRegistry; + return this; + } + + @Override + public Builder withSchema(PostgresSchema schema) { + this.schema = schema; + return this; + } + } +} \ No newline at end of file diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index c7effe7a967431..5a9fa095941ace 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -158,16 +158,19 @@ private PostgresSourceConfig generatePostgresConfig( String hostname = props.getProperty("PGHOST"); String port = props.getProperty("PGPORT"); - String database = props.getProperty("PGDBNAME"); + String databaseFromUrl = props.getProperty("PGDBNAME"); Preconditions.checkNotNull(hostname, "host is required"); Preconditions.checkNotNull(port, "port is required"); - Preconditions.checkNotNull(database, "database is required"); configFactory.hostname(hostname); configFactory.port(Integer.parseInt(port)); configFactory.username(cdcConfig.get(DataSourceConfigKeys.USER)); configFactory.password(cdcConfig.get(DataSourceConfigKeys.PASSWORD)); - configFactory.database(database); + + String database = cdcConfig.get(DataSourceConfigKeys.DATABASE); + String finalDatabase = StringUtils.isNotEmpty(database) ? database : databaseFromUrl; + Preconditions.checkNotNull(finalDatabase, "database is required"); + configFactory.database(finalDatabase); String schema = cdcConfig.get(DataSourceConfigKeys.SCHEMA); Preconditions.checkNotNull(schema, "schema is required"); @@ -219,6 +222,9 @@ private PostgresSourceConfig generatePostgresConfig( configFactory.heartbeatInterval( Duration.ofMillis(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS)); + // support scan partition table + configFactory.setIncludePartitionedTables(true); + // subtaskId use pg create slot in snapshot phase, slotname is slot_name_subtaskId return configFactory.create(subtaskId); } diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out new file mode 100644 index 00000000000000..331d1fe20407ac --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out @@ -0,0 +1,10 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_orders_partition_snapshot -- +1 1001 2024-01-10 +2 1002 2024-02-05 + +-- !select_orders_partition_binlog_all -- +2 2002 2024-02-05 +3 1003 2024-01-20 +4 1004 2024-03-15 + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy new file mode 100644 index 00000000000000..2ae0ac1f6db121 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_postgres_job_partition", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_partition_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_orders" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // 1. create postgres partition table and insert snapshot data + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}""" + + sql """ + CREATE TABLE ${pgSchema}.${table1} ( + id BIGINT, + user_id BIGINT, + order_date DATE, + PRIMARY KEY (id, order_date) + ) PARTITION BY RANGE (order_date) + """ + + // create two partitions: 2024-01, 2024-02 + sql """CREATE TABLE ${table1}_p202401 PARTITION OF ${pgSchema}.${table1} + FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')""" + sql """CREATE TABLE ${table1}_p202402 PARTITION OF ${pgSchema}.${table1} + FOR VALUES FROM ('2024-02-01') TO ('2024-03-01')""" + + // make snapshot data, insert into two partitions + sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date) VALUES (1, 1001, DATE '2024-01-10');""" + sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date) VALUES (2, 1002, DATE '2024-02-05');""" + } + + // 2. 
create streaming job + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + // wait snapshot data sync completed + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") + where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0) + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + // 3. check snapshot data + qt_select_orders_partition_snapshot """ + SELECT id, user_id, order_date + FROM ${table1} + ORDER BY id + """ + + // 4. mock insert, update, delete and create new partition + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """SET search_path TO ${pgSchema}""" + + // insert + sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date) + VALUES (3, 1003, DATE '2024-01-20');""" + + // update + sql """UPDATE ${pgSchema}.${table1} + SET user_id = 2002 + WHERE id = 2 AND order_date = DATE '2024-02-05';""" + + // delete + sql """DELETE FROM ${pgSchema}.${table1} + WHERE id = 1 AND order_date = DATE '2024-01-10';""" + + // create new partition and insert data + sql """CREATE TABLE ${table1}_p202403 PARTITION OF ${pgSchema}.${table1} + FOR VALUES FROM ('2024-03-01') TO ('2024-04-01')""" + + sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date) + VALUES (4, 1004, DATE '2024-03-15');""" + } + + // wait for all incremental data + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") + where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + jobSuccendCount.size() == 1 && '3' <= jobSuccendCount.get(0).get(0) + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + def jobInfo = sql """ + select loadStatistic, status from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfo: " + jobInfo) + assert jobInfo.get(0).get(1) == "RUNNING" + + // check binlog data + qt_select_orders_partition_binlog_all """ + SELECT id, user_id, order_date + FROM ${table1} + ORDER BY id + """ + + sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """ + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy index 9c0cd6a464c8ca..5747438b717cb0 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy @@ -134,8 +134,10 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern ) // mock incremental into - connect("${newPgUser}", "${newPgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age) VALUES ('Doris',18);""" + def xminResult = sql """SELECT xmin, xmax , * FROM ${pgDB}.${pgSchema}.${tableName} WHERE name = 'Doris';""" + log.info("xminResult: " + xminResult) } Awaitility.await().atMost(300, SECONDS)