diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 0bdd62628648db..3061cb80c58c9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -348,7 +348,7 @@ public String getTimeoutReason() {
log.warn("Failed to get task timeout reason, response: {}", response);
}
} catch (ExecutionException | InterruptedException ex) {
- log.error("Send get fail reason request failed: ", ex);
+            log.error("Failed to send get task fail reason request: ", ex);
}
return "";
}
diff --git a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
new file mode 100644
index 00000000000000..ac372bbb8cb24e
--- /dev/null
+++ b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
@@ -0,0 +1,932 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.spi.SlotCreationResult;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.JdbcConnectionException;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Metronome;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
+import org.postgresql.replication.PGReplicationStream;
+import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
+import org.postgresql.util.PSQLException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.toIntExact;
+
+/**
+ * Copied from Flink Cdc 3.5.0
+ *
+ *
+ * Lines 192~199: add publish_via_partition_root for partitioned tables.
+ */
+public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
+
+ private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
+
+ private final String slotName;
+ private final String publicationName;
+ private final RelationalTableFilters tableFilter;
+ private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
+ private final PostgresConnectorConfig.LogicalDecoder plugin;
+ private final boolean dropSlotOnClose;
+ private final PostgresConnectorConfig connectorConfig;
+ private final Duration statusUpdateInterval;
+ private final MessageDecoder messageDecoder;
+ private final PostgresConnection jdbcConnection;
+ private final TypeRegistry typeRegistry;
+ private final Properties streamParams;
+
+ private Lsn defaultStartingPos;
+ private SlotCreationResult slotCreationInfo;
+ private boolean hasInitedSlot;
+
+ private Lsn endingPos;
+
+ /**
+ * Creates a new replication connection with the given params.
+ *
+ * @param config the JDBC configuration for the connection; may not be null
+ * @param slotName the name of the DB slot for logical replication; may not be null
+ * @param publicationName the name of the DB publication for logical replication; may not be
+ * null
+ * @param tableFilter the tables to watch of the DB publication for logical replication; may not
+ * be null
+ * @param publicationAutocreateMode the mode for publication autocreation; may not be null
+ * @param plugin decoder matching the server side plug-in used for streaming changes; may not be
+ * null
+ * @param dropSlotOnClose whether the replication slot should be dropped once the connection is
+ * closed
+ * @param statusUpdateInterval the interval at which the replication connection should
+ *     periodically send status updates to the server
+ * @param doSnapshot whether the connector is doing snapshot
+ * @param jdbcConnection general PostgreSQL JDBC connection
+ * @param typeRegistry registry with PostgreSQL types
+ * @param streamParams additional parameters to pass to the replication stream
+ * @param schema the schema; must not be null
+ *
+ */
+ private PostgresReplicationConnection(
+ PostgresConnectorConfig config,
+ String slotName,
+ String publicationName,
+ RelationalTableFilters tableFilter,
+ PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
+ PostgresConnectorConfig.LogicalDecoder plugin,
+ boolean dropSlotOnClose,
+ boolean doSnapshot,
+ Duration statusUpdateInterval,
+ PostgresConnection jdbcConnection,
+ TypeRegistry typeRegistry,
+ Properties streamParams,
+ PostgresSchema schema) {
+ super(
+ addDefaultSettings(config.getJdbcConfig()),
+ PostgresConnection.FACTORY,
+ null,
+ null,
+ "\"",
+ "\"");
+
+ this.connectorConfig = config;
+ this.slotName = slotName;
+ this.publicationName = publicationName;
+ this.tableFilter = tableFilter;
+ this.publicationAutocreateMode = publicationAutocreateMode;
+ this.plugin = plugin;
+ this.dropSlotOnClose = dropSlotOnClose;
+ this.statusUpdateInterval = statusUpdateInterval;
+ this.messageDecoder =
+ plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection);
+ this.jdbcConnection = jdbcConnection;
+ this.typeRegistry = typeRegistry;
+ this.streamParams = streamParams;
+ this.slotCreationInfo = null;
+ this.hasInitedSlot = false;
+ }
+
+ private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
+ // first copy the parent's default settings...
+ // then set some additional replication specific settings
+ return JdbcConfiguration.adapt(
+ PostgresConnection.addDefaultSettings(
+ configuration, PostgresConnection.CONNECTION_STREAMING)
+ .edit()
+ .with("replication", "database")
+ .with(
+ "preferQueryMode",
+ "simple") // replication protocol only supports simple query mode
+ .build());
+ }
+
+ private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {
+ try (PostgresConnection connection =
+ new PostgresConnection(
+ connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) {
+ return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName());
+ }
+ }
+
+ protected void initPublication() {
+ String createPublicationStmt;
+ String tableFilterString = null;
+ if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
+ LOGGER.info("Initializing PgOutput logical decoder publication");
+ try {
+ // Unless the autocommit is disabled the SELECT publication query will stay running
+ Connection conn = pgConnection();
+ conn.setAutoCommit(false);
+
+ String selectPublication =
+ String.format(
+ "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'",
+ publicationName);
+ try (Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(selectPublication)) {
+ if (rs.next()) {
+ Long count = rs.getLong(1);
+ // Close eagerly as the transaction might stay running
+ if (count == 0L) {
+ LOGGER.info(
+ "Creating new publication '{}' for plugin '{}'",
+ publicationName,
+ plugin);
+ switch (publicationAutocreateMode) {
+ case DISABLED:
+ throw new ConnectException(
+ "Publication autocreation is disabled, please create one and restart the connector.");
+ case ALL_TABLES:
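+                                    // PostgreSQL 13+ supports publish_via_partition_root; with it enabled,
+                                    // changes on leaf partitions are published using the identity of the
+                                    // root partitioned table, so a partitioned table is captured as one table.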
+ boolean supportPartitionRoot = ((BaseConnection) conn).haveMinimumServerVersion(ServerVersion.v13);
+ createPublicationStmt = supportPartitionRoot
+ ? String.format(
+ "CREATE PUBLICATION %s FOR ALL TABLES WITH (publish_via_partition_root = true);",
+ publicationName)
+ : String.format(
+ "CREATE PUBLICATION %s FOR ALL TABLES;",
+ publicationName);
+ LOGGER.info(
+ "Creating Publication with statement '{}'",
+ createPublicationStmt);
+ // Publication doesn't exist, create it.
+ stmt.execute(createPublicationStmt);
+ break;
+ case FILTERED:
+ createOrUpdatePublicationModeFilterted(
+ tableFilterString, stmt, false);
+ break;
+ }
+ } else {
+ switch (publicationAutocreateMode) {
+ case FILTERED:
+ createOrUpdatePublicationModeFilterted(
+ tableFilterString, stmt, true);
+ break;
+ default:
+ LOGGER.trace(
+ "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server "
+ + "and will be used by the plugin",
+ publicationName,
+ plugin,
+ database());
+ }
+ }
+ }
+ }
+ conn.commit();
+ conn.setAutoCommit(true);
+ } catch (SQLException e) {
+ throw new JdbcConnectionException(e);
+ }
+ }
+ }
+
+ private void createOrUpdatePublicationModeFilterted(
+ String tableFilterString, Statement stmt, boolean isUpdate) {
+ String createOrUpdatePublicationStmt;
+ try {
+            Set<TableId> tablesToCapture = determineCapturedTables();
+ tableFilterString =
+ tablesToCapture.stream()
+ .map(TableId::toDoubleQuotedString)
+ .collect(Collectors.joining(", "));
+ if (tableFilterString.isEmpty()) {
+ throw new DebeziumException(
+ String.format(
+ "No table filters found for filtered publication %s",
+ publicationName));
+ }
+ createOrUpdatePublicationStmt =
+ isUpdate
+ ? String.format(
+ "ALTER PUBLICATION %s SET TABLE %s;",
+ publicationName, tableFilterString)
+ : String.format(
+ "CREATE PUBLICATION %s FOR TABLE %s;",
+ publicationName, tableFilterString);
+ LOGGER.info(
+ isUpdate
+ ? "Updating Publication with statement '{}'"
+ : "Creating Publication with statement '{}'",
+ createOrUpdatePublicationStmt);
+ stmt.execute(createOrUpdatePublicationStmt);
+ } catch (Exception e) {
+ throw new ConnectException(
+ String.format(
+ "Unable to %s filtered publication %s for %s",
+ isUpdate ? "update" : "create", publicationName, tableFilterString),
+ e);
+ }
+ }
+
+    private Set<TableId> determineCapturedTables() throws Exception {
+        Set<TableId> allTableIds = jdbcConnection.getAllTableIds(connectorConfig.databaseName());
+
+        Set<TableId> capturedTables = new HashSet<>();
+
+ for (TableId tableId : allTableIds) {
+ if (tableFilter.dataCollectionFilter().isIncluded(tableId)) {
+ LOGGER.trace("Adding table {} to the list of captured tables", tableId);
+ capturedTables.add(tableId);
+ } else {
+ LOGGER.trace(
+ "Ignoring table {} as it's not included in the filter configuration",
+ tableId);
+ }
+ }
+
+ return capturedTables.stream()
+ .sorted()
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ protected void initReplicationSlot() throws SQLException, InterruptedException {
+ ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
+
+ boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
+ try {
+ // there's no info for this plugin and slot so create a new slot
+ if (shouldCreateSlot) {
+ this.createReplicationSlot();
+ }
+
+ // replication connection does not support parsing of SQL statements so we need to
+ // create
+ // the connection without executing on connect statements - see JDBC opt
+ // preferQueryMode=simple
+ pgConnection();
+ final String identifySystemStatement = "IDENTIFY_SYSTEM";
+ LOGGER.debug(
+ "running '{}' to validate replication connection", identifySystemStatement);
+ final Lsn xlogStart =
+ queryAndMap(
+ identifySystemStatement,
+ rs -> {
+ if (!rs.next()) {
+ throw new IllegalStateException(
+ "The DB connection is not a valid replication connection");
+ }
+ String xlogpos = rs.getString("xlogpos");
+ LOGGER.debug("received latest xlogpos '{}'", xlogpos);
+ return Lsn.valueOf(xlogpos);
+ });
+
+ if (slotCreationInfo != null) {
+ this.defaultStartingPos = slotCreationInfo.startLsn();
+ } else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
+ // this is a new slot or we weren't able to read a valid flush LSN pos, so we always
+ // start from the xlog pos that was reported
+ this.defaultStartingPos = xlogStart;
+ } else {
+ Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
+ this.defaultStartingPos =
+ latestFlushedLsn.compareTo(xlogStart) < 0 ? latestFlushedLsn : xlogStart;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("found previous flushed LSN '{}'", latestFlushedLsn);
+ }
+ }
+ hasInitedSlot = true;
+ } catch (SQLException e) {
+ throw new JdbcConnectionException(e);
+ }
+ }
+
+    // Temporary replication slots are a new feature of PostgreSQL 10
+ private boolean useTemporarySlot() throws SQLException {
+ // Temporary replication slots cannot be used due to connection restart
+ // when finding WAL position
+ // return dropSlotOnClose && pgConnection().haveMinimumServerVersion(ServerVersion.v10);
+ return false;
+ }
+
+ /**
+ * creating a replication connection and starting to stream involves a few steps: 1. we create
+ * the connection and ensure that a. the slot exists b. the slot isn't currently being used 2.
+ * we query to get our potential start position in the slot (lsn) 3. we try and start streaming,
+ * depending on our options (such as in wal2json) this may fail, which can result in the
+ * connection being killed and we need to start the process over if we are using a temporary
+ * slot 4. actually start the streamer
+ *
+ * This method takes care of all of these and this method queries for a default starting
+ * position If you know where you are starting from you should call {@link #startStreaming(Lsn,
+ * WalPositionLocator)}, this method delegates to that method
+ *
+ * @return
+ * @throws SQLException
+ * @throws InterruptedException
+ */
+ @Override
+ public ReplicationStream startStreaming(WalPositionLocator walPosition)
+ throws SQLException, InterruptedException {
+ return startStreaming(null, walPosition);
+ }
+
+ @Override
+ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPosition)
+ throws SQLException, InterruptedException {
+ initConnection();
+
+ connect();
+ if (offset == null || !offset.isValid()) {
+ offset = defaultStartingPos;
+ }
+ Lsn lsn = offset;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("starting streaming from LSN '{}'", lsn);
+ }
+
+ final int maxRetries = connectorConfig.maxRetries();
+ final Duration delay = connectorConfig.retryDelay();
+ int tryCount = 0;
+ while (true) {
+ try {
+ return createReplicationStream(lsn, walPosition);
+ } catch (Exception e) {
+ String message = "Failed to start replication stream at " + lsn;
+ if (++tryCount > maxRetries) {
+ if (e.getMessage().matches(".*replication slot .* is active.*")) {
+ message +=
+ "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
+ }
+ throw new DebeziumException(message, e);
+ } else {
+ LOGGER.warn(
+ message + ", waiting for {} ms and retrying, attempt number {} over {}",
+ delay,
+ tryCount,
+ maxRetries);
+ final Metronome metronome = Metronome.sleeper(delay, Clock.SYSTEM);
+ metronome.pause();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void initConnection() throws SQLException, InterruptedException {
+ // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+ // For pgoutput specifically, the publication must be created before the slot.
+ initPublication();
+ if (!hasInitedSlot) {
+ initReplicationSlot();
+ }
+ }
+
+ @Override
+    public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
+ // note that some of these options are only supported in Postgres 9.4+, additionally
+ // the options are not yet exported by the jdbc api wrapper, therefore, we just do
+ // this ourselves but eventually this should be moved back to the jdbc API
+ // see https://github.com/pgjdbc/pgjdbc/issues/1305
+
+ LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin);
+ String tempPart = "";
+ // Exported snapshots are supported in Postgres 9.4+
+ boolean canExportSnapshot = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
+ if ((dropSlotOnClose) && !canExportSnapshot) {
+ LOGGER.warn(
+ "A slot marked as temporary or with an exported snapshot was created, "
+ + "but not on a supported version of Postgres, ignoring!");
+ }
+ if (useTemporarySlot()) {
+ tempPart = "TEMPORARY";
+ }
+
+ // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
+ // For pgoutput specifically, the publication must be created prior to the slot.
+ initPublication();
+
+ try (Statement stmt = pgConnection().createStatement()) {
+ String createCommand =
+ String.format(
+ "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
+ slotName, tempPart, plugin.getPostgresPluginName());
+ LOGGER.info("Creating replication slot with command {}", createCommand);
+ stmt.execute(createCommand);
+ // when we are in Postgres 9.4+, we can parse the slot creation info,
+ // otherwise, it returns nothing
+ if (canExportSnapshot) {
+ this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
+ }
+
+ return Optional.ofNullable(slotCreationInfo);
+ }
+ }
+
+ protected BaseConnection pgConnection() throws SQLException {
+ return (BaseConnection) connection(false);
+ }
+
+ private SlotCreationResult parseSlotCreation(ResultSet rs) {
+ try {
+ if (rs.next()) {
+ String slotName = rs.getString("slot_name");
+ String startPoint = rs.getString("consistent_point");
+ String snapName = rs.getString("snapshot_name");
+ String pluginName = rs.getString("output_plugin");
+
+ return new SlotCreationResult(slotName, startPoint, snapName, pluginName);
+ } else {
+ throw new ConnectException("No replication slot found");
+ }
+ } catch (SQLException ex) {
+ throw new ConnectException("Unable to parse create_replication_slot response", ex);
+ }
+ }
+
+ private ReplicationStream createReplicationStream(
+ final Lsn startLsn, WalPositionLocator walPosition)
+ throws SQLException, InterruptedException {
+ PGReplicationStream s;
+
+ try {
+ try {
+ s =
+ startPgReplicationStream(
+ startLsn,
+ plugin.forceRds()
+ ? messageDecoder::optionsWithoutMetadata
+ : messageDecoder::optionsWithMetadata);
+ messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
+ } catch (PSQLException e) {
+ LOGGER.debug(
+ "Could not register for streaming, retrying without optional options", e);
+
+ // re-init the slot after a failed start of slot, as this
+ // may have closed the slot
+ if (useTemporarySlot()) {
+ initReplicationSlot();
+ }
+
+ s =
+ startPgReplicationStream(
+ startLsn,
+ plugin.forceRds()
+ ? messageDecoder::optionsWithoutMetadata
+ : messageDecoder::optionsWithMetadata);
+ messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
+ }
+ } catch (PSQLException e) {
+ if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
+ // It is possible we are connecting to an old wal2json plug-in
+ LOGGER.warn(
+ "Could not register for streaming with metadata in messages, falling back to messages without metadata");
+
+ // re-init the slot after a failed start of slot, as this
+ // may have closed the slot
+ if (useTemporarySlot()) {
+ initReplicationSlot();
+ }
+
+ s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata);
+ messageDecoder.setContainsMetadata(false);
+ } else if (e.getMessage()
+ .matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
+ LOGGER.error("Cannot rewind to last processed WAL position", e);
+ throw new ConnectException(
+ "The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
+ } else {
+ throw e;
+ }
+ }
+
+ final PGReplicationStream stream = s;
+
+ return new ReplicationStream() {
+
+ private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
+ private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+ private ExecutorService keepAliveExecutor = null;
+ private AtomicBoolean keepAliveRunning;
+ private final Metronome metronome =
+ Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
+
+ // make sure this is volatile since multiple threads may be interested in this value
+ private volatile Lsn lastReceivedLsn;
+
+ @Override
+ public void read(ReplicationMessageProcessor processor)
+ throws SQLException, InterruptedException {
+ processWarnings(false);
+ ByteBuffer read = stream.read();
+ final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace(
+ "Streaming requested from LSN {}, received LSN {}",
+ startLsn,
+ lastReceiveLsn);
+ if (reachEnd(lastReceivedLsn)) {
+ lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
+ processor.process(new ReplicationMessage.NoopMessage(null, null));
+ return;
+ }
+ if (messageDecoder.shouldMessageBeSkipped(
+ read, lastReceiveLsn, startLsn, walPosition)) {
+ return;
+ }
+ deserializeMessages(read, processor);
+ }
+
+ @Override
+ public boolean readPending(ReplicationMessageProcessor processor)
+ throws SQLException, InterruptedException {
+ processWarnings(false);
+ ByteBuffer read = stream.readPending();
+ final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace(
+ "Streaming requested from LSN {}, received LSN {}",
+ startLsn,
+ lastReceiveLsn);
+
+ if (reachEnd(lastReceiveLsn)) {
+ lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
+ processor.process(new ReplicationMessage.NoopMessage(null, null));
+ return true;
+ }
+
+ if (read == null) {
+ return false;
+ }
+
+ if (messageDecoder.shouldMessageBeSkipped(
+ read, lastReceiveLsn, startLsn, walPosition)) {
+ return true;
+ }
+
+ deserializeMessages(read, processor);
+
+ return true;
+ }
+
+ private void deserializeMessages(
+ ByteBuffer buffer, ReplicationMessageProcessor processor)
+ throws SQLException, InterruptedException {
+ lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
+ LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
+ messageDecoder.processMessage(buffer, processor, typeRegistry);
+ }
+
+ @Override
+ public void close() throws SQLException {
+ processWarnings(true);
+ stream.close();
+ }
+
+ @Override
+ public void flushLsn(Lsn lsn) throws SQLException {
+ doFlushLsn(lsn);
+ }
+
+ private void doFlushLsn(Lsn lsn) throws SQLException {
+ stream.setFlushedLSN(lsn.asLogSequenceNumber());
+ stream.setAppliedLSN(lsn.asLogSequenceNumber());
+
+ stream.forceUpdateStatus();
+ }
+
+ @Override
+ public Lsn lastReceivedLsn() {
+ return lastReceivedLsn;
+ }
+
+ @Override
+ public void startKeepAlive(ExecutorService service) {
+ if (keepAliveExecutor == null) {
+ keepAliveExecutor = service;
+ keepAliveRunning = new AtomicBoolean(true);
+ keepAliveExecutor.submit(
+ () -> {
+ while (keepAliveRunning.get()) {
+ try {
+ LOGGER.trace(
+ "Forcing status update with replication stream");
+ stream.forceUpdateStatus();
+ metronome.pause();
+ } catch (Exception exp) {
+ throw new RuntimeException(
+                                            "received unexpected exception while performing keep alive",
+ exp);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void stopKeepAlive() {
+ if (keepAliveExecutor != null) {
+ keepAliveRunning.set(false);
+ keepAliveExecutor.shutdownNow();
+ keepAliveExecutor = null;
+ }
+ }
+
+ private void processWarnings(final boolean forced) throws SQLException {
+ if (--warningCheckCounter == 0 || forced) {
+ warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
+ for (SQLWarning w = connection().getWarnings();
+ w != null;
+ w = w.getNextWarning()) {
+ LOGGER.debug(
+ "Server-side message: '{}', state = {}, code = {}",
+ w.getMessage(),
+ w.getSQLState(),
+ w.getErrorCode());
+ }
+ connection().clearWarnings();
+ }
+ }
+
+ @Override
+ public Lsn startLsn() {
+ return startLsn;
+ }
+
+ private boolean reachEnd(Lsn receivedLsn) {
+ if (receivedLsn == null) {
+ return false;
+ }
+ return endingPos != null
+ && (!endingPos.isNonStopping())
+ && endingPos.compareTo(receivedLsn) < 0;
+ }
+ };
+ }
+
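+    // endingPos bounds the replication stream: once a received LSN passes it, read()/readPending()
+    // emit Noop messages instead of decoding further changes.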
+ public void setEndingPos(Lsn endingPos) {
+ this.endingPos = endingPos;
+ }
+
+ private PGReplicationStream startPgReplicationStream(
+ final Lsn lsn,
+ BiFunction<
+ ChainedLogicalStreamBuilder,
+                            Function<Integer, Boolean>,
+ ChainedLogicalStreamBuilder>
+ configurator)
+ throws SQLException {
+ assert lsn != null;
+ ChainedLogicalStreamBuilder streamBuilder =
+ pgConnection()
+ .getReplicationAPI()
+ .replicationStream()
+ .logical()
+ .withSlotName("\"" + slotName + "\"")
+ .withStartPosition(lsn.asLogSequenceNumber())
+ .withSlotOptions(streamParams);
+ streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion);
+
+ if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
+ streamBuilder.withStatusInterval(
+ toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
+ }
+
+ PGReplicationStream stream = streamBuilder.start();
+
+ // TODO DBZ-508 get rid of this
+ // Needed by tests when connections are opened and closed in a fast sequence
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+ }
+ stream.forceUpdateStatus();
+ return stream;
+ }
+
+ private Boolean hasMinimumVersion(int version) {
+ try {
+ return pgConnection().haveMinimumServerVersion(version);
+ } catch (SQLException e) {
+ throw new DebeziumException(e);
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ close(true);
+ }
+
+ public synchronized void close(boolean dropSlot) {
+ try {
+ LOGGER.debug("Closing message decoder");
+ messageDecoder.close();
+ } catch (Throwable e) {
+ LOGGER.error("Unexpected error while closing message decoder", e);
+ }
+
+ try {
+ LOGGER.debug("Closing replication connection");
+ super.close();
+ } catch (Throwable e) {
+ LOGGER.error("Unexpected error while closing Postgres connection", e);
+ }
+ if (dropSlotOnClose && dropSlot) {
+ // we're dropping the replication slot via a regular - i.e. not a replication -
+ // connection
+ try (PostgresConnection connection =
+ new PostgresConnection(
+ connectorConfig.getJdbcConfig(),
+ PostgresConnection.CONNECTION_DROP_SLOT)) {
+ connection.dropReplicationSlot(slotName);
+ } catch (Throwable e) {
+ LOGGER.error("Unexpected error while dropping replication slot", e);
+ }
+ }
+ }
+
+ @Override
+ public void reconnect() throws SQLException {
+ close(false);
+ // Don't re-execute initial commands on reconnection
+ connection(false);
+ }
+
+ protected static class ReplicationConnectionBuilder implements Builder {
+
+ private final PostgresConnectorConfig config;
+ private String slotName = DEFAULT_SLOT_NAME;
+ private String publicationName = DEFAULT_PUBLICATION_NAME;
+ private RelationalTableFilters tableFilter;
+ private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode =
+ PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
+ private PostgresConnectorConfig.LogicalDecoder plugin =
+ PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
+ private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
+ private Duration statusUpdateIntervalVal;
+ private boolean doSnapshot;
+ private TypeRegistry typeRegistry;
+ private PostgresSchema schema;
+ private Properties slotStreamParams = new Properties();
+ private PostgresConnection jdbcConnection;
+
+ protected ReplicationConnectionBuilder(PostgresConnectorConfig config) {
+ assert config != null;
+ this.config = config;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder withSlot(final String slotName) {
+ assert slotName != null;
+ this.slotName = slotName;
+ return this;
+ }
+
+ @Override
+ public Builder withPublication(String publicationName) {
+ assert publicationName != null;
+ this.publicationName = publicationName;
+ return this;
+ }
+
+ @Override
+ public Builder withTableFilter(RelationalTableFilters tableFilter) {
+ assert tableFilter != null;
+ this.tableFilter = tableFilter;
+ return this;
+ }
+
+ @Override
+ public Builder withPublicationAutocreateMode(
+ PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) {
+ assert publicationName != null;
+ this.publicationAutocreateMode = publicationAutocreateMode;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder withPlugin(
+ final PostgresConnectorConfig.LogicalDecoder plugin) {
+ assert plugin != null;
+ this.plugin = plugin;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) {
+ this.dropSlotOnClose = dropSlotOnClose;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder streamParams(final String slotStreamParams) {
+ if (slotStreamParams != null && !slotStreamParams.isEmpty()) {
+ this.slotStreamParams = new Properties();
+ String[] paramsWithValues = slotStreamParams.split(";");
+ for (String paramsWithValue : paramsWithValues) {
+ String[] paramAndValue = paramsWithValue.split("=");
+ if (paramAndValue.length == 2) {
+ this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]);
+ } else {
+ LOGGER.warn(
+ "The following STREAM_PARAMS value is invalid: {}",
+ paramsWithValue);
+ }
+ }
+ }
+ return this;
+ }
+
+ @Override
+ public ReplicationConnectionBuilder statusUpdateInterval(
+ final Duration statusUpdateInterval) {
+ this.statusUpdateIntervalVal = statusUpdateInterval;
+ return this;
+ }
+
+ @Override
+ public Builder doSnapshot(boolean doSnapshot) {
+ this.doSnapshot = doSnapshot;
+ return this;
+ }
+
+ @Override
+ public Builder jdbcMetadataConnection(PostgresConnection jdbcConnection) {
+ this.jdbcConnection = jdbcConnection;
+ return this;
+ }
+
+ @Override
+ public ReplicationConnection build() {
+ assert plugin != null : "Decoding plugin name is not set";
+ return new PostgresReplicationConnection(
+ config,
+ slotName,
+ publicationName,
+ tableFilter,
+ publicationAutocreateMode,
+ plugin,
+ dropSlotOnClose,
+ doSnapshot,
+ statusUpdateIntervalVal,
+ jdbcConnection,
+ typeRegistry,
+ slotStreamParams,
+ schema);
+ }
+
+ @Override
+ public Builder withTypeRegistry(TypeRegistry typeRegistry) {
+ this.typeRegistry = typeRegistry;
+ return this;
+ }
+
+ @Override
+ public Builder withSchema(PostgresSchema schema) {
+ this.schema = schema;
+ return this;
+ }
+ }
+}
\ No newline at end of file
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index c7effe7a967431..5a9fa095941ace 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -158,16 +158,19 @@ private PostgresSourceConfig generatePostgresConfig(
String hostname = props.getProperty("PGHOST");
String port = props.getProperty("PGPORT");
- String database = props.getProperty("PGDBNAME");
+ String databaseFromUrl = props.getProperty("PGDBNAME");
Preconditions.checkNotNull(hostname, "host is required");
Preconditions.checkNotNull(port, "port is required");
- Preconditions.checkNotNull(database, "database is required");
configFactory.hostname(hostname);
configFactory.port(Integer.parseInt(port));
configFactory.username(cdcConfig.get(DataSourceConfigKeys.USER));
configFactory.password(cdcConfig.get(DataSourceConfigKeys.PASSWORD));
- configFactory.database(database);
+
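+        // Prefer the database explicitly configured on the job; fall back to the
+        // database parsed from the JDBC URL (PGDBNAME), which may be absent when
+        // the URL carries no database path.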
+ String database = cdcConfig.get(DataSourceConfigKeys.DATABASE);
+ String finalDatabase = StringUtils.isNotEmpty(database) ? database : databaseFromUrl;
+ Preconditions.checkNotNull(finalDatabase, "database is required");
+ configFactory.database(finalDatabase);
String schema = cdcConfig.get(DataSourceConfigKeys.SCHEMA);
Preconditions.checkNotNull(schema, "schema is required");
@@ -219,6 +222,9 @@ private PostgresSourceConfig generatePostgresConfig(
configFactory.heartbeatInterval(
Duration.ofMillis(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
+        // Include partitioned (parent) tables in table discovery so they are captured as well
+ configFactory.setIncludePartitionedTables(true);
+
// subtaskId use pg create slot in snapshot phase, slotname is slot_name_subtaskId
return configFactory.create(subtaskId);
}
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out
new file mode 100644
index 00000000000000..331d1fe20407ac
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_orders_partition_snapshot --
+1 1001 2024-01-10
+2 1002 2024-02-05
+
+-- !select_orders_partition_binlog_all --
+2 2002 2024-02-05
+3 1003 2024-01-20
+4 1004 2024-03-15
+
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
new file mode 100644
index 00000000000000..2ae0ac1f6db121
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_partition.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_partition", "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_postgres_job_partition_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_pg_orders"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ sql """drop table if exists ${currentDb}.${table1} force"""
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // 1. create postgres partition table and insert snapshot data
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}"""
+
+ sql """
+ CREATE TABLE ${pgSchema}.${table1} (
+ id BIGINT,
+ user_id BIGINT,
+ order_date DATE,
+ PRIMARY KEY (id, order_date)
+ ) PARTITION BY RANGE (order_date)
+ """
+
+ // create two partitions: 2024-01, 2024-02
+ sql """CREATE TABLE ${table1}_p202401 PARTITION OF ${pgSchema}.${table1}
+ FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')"""
+ sql """CREATE TABLE ${table1}_p202402 PARTITION OF ${pgSchema}.${table1}
+ FOR VALUES FROM ('2024-02-01') TO ('2024-03-01')"""
+
+ // make snapshot data, insert into two partitions
+ sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date) VALUES (1, 1001, DATE '2024-01-10');"""
+ sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date) VALUES (2, 1002, DATE '2024-02-05');"""
+ }
+
+ // 2. create streaming job
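+        // "offset" = "initial" is expected to snapshot the existing rows first and then
+        // continue with incremental (WAL) changes.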
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1}",
+ "offset" = "initial"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // wait snapshot data sync completed
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+                            def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert")
+                            where Name = '${jobName}' and ExecuteType='STREAMING' """
+                            log.info("jobSucceedCount: " + jobSucceedCount)
+                            jobSucceedCount.size() == 1 && '2' <= jobSucceedCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ // 3. check snapshot data
+ qt_select_orders_partition_snapshot """
+ SELECT id, user_id, order_date
+ FROM ${table1}
+ ORDER BY id
+ """
+
+ // 4. mock insert, update, delete and create new partition
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ sql """SET search_path TO ${pgSchema}"""
+
+ // insert
+ sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
+ VALUES (3, 1003, DATE '2024-01-20');"""
+
+ // update
+ sql """UPDATE ${pgSchema}.${table1}
+ SET user_id = 2002
+ WHERE id = 2 AND order_date = DATE '2024-02-05';"""
+
+ // delete
+ sql """DELETE FROM ${pgSchema}.${table1}
+ WHERE id = 1 AND order_date = DATE '2024-01-10';"""
+
+ // create new partition and insert data
+ sql """CREATE TABLE ${table1}_p202403 PARTITION OF ${pgSchema}.${table1}
+ FOR VALUES FROM ('2024-03-01') TO ('2024-04-01')"""
+
+ sql """INSERT INTO ${pgSchema}.${table1} (id, user_id, order_date)
+ VALUES (4, 1004, DATE '2024-03-15');"""
+ }
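+        // The row written to the newly created partition (id = 4) should be captured without
+        // recreating the publication, since changes are published via the partition root.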
+
+ // wait for all incremental data
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+                            def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert")
+                            where Name = '${jobName}' and ExecuteType='STREAMING' """
+                            log.info("jobSucceedCount: " + jobSucceedCount)
+                            jobSucceedCount.size() == 1 && '3' <= jobSucceedCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobInfo = sql """
+ select loadStatistic, status from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("jobInfo: " + jobInfo)
+ assert jobInfo.get(0).get(1) == "RUNNING"
+
+ // check binlog data
+ qt_select_orders_partition_binlog_all """
+ SELECT id, user_id, order_date
+ FROM ${table1}
+ ORDER BY id
+ """
+
+ sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
+
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 9c0cd6a464c8ca..5747438b717cb0 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -134,8 +134,10 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
)
// mock incremental into
- connect("${newPgUser}", "${newPgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age) VALUES ('Doris',18);"""
+            def xminResult = sql """SELECT xmin, xmax, * FROM ${pgDB}.${pgSchema}.${tableName} WHERE name = 'Doris';"""
+ log.info("xminResult: " + xminResult)
}
Awaitility.await().atMost(300, SECONDS)