diff --git a/docs/en/engines/table-engines/mergetree-family/part_export.md b/docs/en/engines/table-engines/mergetree-family/part_export.md index 1ef6c437585c..09f59e37075a 100644 --- a/docs/en/engines/table-engines/mergetree-family/part_export.md +++ b/docs/en/engines/table-engines/mergetree-family/part_export.md @@ -60,6 +60,24 @@ Source and destination tables must be 100% compatible: - **Default**: `0` - **Description**: Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit. This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. Using this might break idempotency, use it with care. +### export_merge_tree_part_throw_on_pending_mutations + +- **Type**: `bool` +- **Default**: `true` +- **Description**: If set to true, throws if pending mutations exist for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affect part/partition X, all other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables. + +### export_merge_tree_part_throw_on_pending_patch_parts + +- **Type**: `bool` +- **Default**: `true` +- **Description**: If set to true, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affect part/partition X, all other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables. + +### export_merge_tree_part_allow_outdated_parts + +- **Type**: `bool` +- **Default**: `false` +- **Description**: Allows outdated parts to be exported. By default, only `ACTIVE` parts can be exported. + ## Examples ### Basic Export to S3 diff --git a/docs/en/engines/table-engines/mergetree-family/partition_export.md b/docs/en/engines/table-engines/mergetree-family/partition_export.md index 1b91cf9bdeb9..d91f226dbbf6 100644 --- a/docs/en/engines/table-engines/mergetree-family/partition_export.md +++ b/docs/en/engines/table-engines/mergetree-family/partition_export.md @@ -70,6 +70,18 @@ TO TABLE [destination_database.]destination_table - `error` - Throw an error if the file already exists - `overwrite` - Overwrite the file +### export_merge_tree_part_throw_on_pending_mutations + +- **Type**: `bool` +- **Default**: `true` +- **Description**: If set to true, throws if pending mutations exist for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affect part/partition X, all other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables. + +### export_merge_tree_part_throw_on_pending_patch_parts + +- **Type**: `bool` +- **Default**: `true` +- **Description**: If set to true, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affect part/partition X, all other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. 
Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables. + ## Examples ### Basic Export to S3 diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index eb5b64f2f328..e1e4d26a3e11 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -648,6 +648,7 @@ M(1002, UNKNOWN_EXCEPTION) \ M(1003, SSH_EXCEPTION) \ M(1004, STARTUP_SCRIPTS_ERROR) \ + M(1005, PENDING_MUTATIONS_NOT_ALLOWED) \ /* See END */ #ifdef APPLY_FOR_EXTERNAL_ERROR_CODES @@ -664,7 +665,7 @@ namespace ErrorCodes APPLY_FOR_ERROR_CODES(M) #undef M - constexpr ErrorCode END = 1004; + constexpr ErrorCode END = 1005; ErrorPairHolder values[END + 1]{}; struct ErrorCodesNames diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 9977550b19de..25939ad2f9d3 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6920,6 +6920,15 @@ This is not a hard limit, and it highly depends on the output format granularity DECLARE(UInt64, export_merge_tree_part_max_rows_per_file, 0, R"( Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit. This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. +)", 0) \ + DECLARE(Bool, export_merge_tree_part_throw_on_pending_mutations, true, R"( +Throw an error if there are pending mutations when exporting a merge tree part. +)", 0) \ + DECLARE(Bool, export_merge_tree_part_throw_on_pending_patch_parts, true, R"( +Throw an error if there are pending patch parts when exporting a merge tree part. +)", 0) \ + DECLARE(Bool, export_merge_tree_part_allow_outdated_parts, false, R"( +Allow exporting parts in the outdated state. )", 0) \ DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"( Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions.
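Taken together, the documentation and the setting declarations above define three gates on part exports: pending mutations throw by default, pending patch parts throw by default, and outdated parts are rejected unless explicitly allowed. A minimal usage sketch of the throw/skip pair (hedged: the table names events and events_s3 and the part name are illustrative, not taken from this change; the statement syntax follows the PR's own tests):

-- Default behavior: fails with PENDING_MUTATIONS_NOT_ALLOWED while the part still has unapplied mutations or patch parts.
ALTER TABLE events EXPORT PART '2020_1_1_0' TO TABLE events_s3
SETTINGS allow_experimental_export_merge_tree_part = 1;

-- Opting out of both checks: the part is exported as it exists on disk, i.e. without the pending mutations or patch parts applied to the exported rows.
ALTER TABLE events EXPORT PART '2020_1_1_0' TO TABLE events_s3
SETTINGS allow_experimental_export_merge_tree_part = 1,
    export_merge_tree_part_throw_on_pending_mutations = false,
    export_merge_tree_part_throw_on_pending_patch_parts = false;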
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 3d57025ddd76..032a0f3a2dbd 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -64,6 +64,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, {"cluster_table_function_split_granularity", "file", "file", "New setting."}, {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, + {"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."}, + {"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."}, + {"export_merge_tree_part_allow_outdated_parts", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index a71cf6ae0e45..eb5597eddd60 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -30,6 +30,14 @@ namespace context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy))); context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file); context_copy->setSetting("export_merge_tree_part_max_rows_per_file", manifest.max_rows_per_file); + + /// always allow exporting outdated parts because the parts have been validated when the query was processed + context_copy->setSetting("export_merge_tree_part_allow_outdated_parts", true); + + /// always skip pending mutations and patch parts because we already validated the parts during query processing + context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false); + context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false); + return context_copy; } } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index e90a6e3ffc0b..93685a0bd82e 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -217,6 +217,9 @@ namespace Setting extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; extern const SettingsBool output_format_parallel_formatting; extern const SettingsBool output_format_parquet_parallel_encoding; + extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations; + extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts; + extern const SettingsBool export_merge_tree_part_allow_outdated_parts; } namespace MergeTreeSetting @@ -336,6 +339,7 @@ namespace ErrorCodes extern const int TOO_LARGE_LIGHTWEIGHT_UPDATES; extern const int UNKNOWN_TABLE; extern const int FILE_ALREADY_EXISTS; + extern const int PENDING_MUTATIONS_NOT_ALLOWED; } static void checkSuspiciousIndices(const ASTFunction * index_function) @@ -6254,6 +6258,45 @@ void MergeTreeData::exportPartToTable( throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No such data part '{}' to export in table '{}'", part_name, getStorageID().getFullTableName()); + const bool allow_outdated_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_allow_outdated_parts]; + + if (part->getState() == MergeTreeDataPartState::Outdated && !allow_outdated_parts) + throw Exception( + 
ErrorCodes::BAD_ARGUMENTS, + "Part {} is in the outdated state and cannot be exported. Set `export_merge_tree_part_allow_outdated_parts` to true to allow exporting outdated parts", + part_name); + + const bool throw_on_pending_mutations = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_mutations]; + const bool throw_on_pending_patch_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_patch_parts]; + + MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params + { + .metadata_version = source_metadata_ptr->getMetadataVersion(), + .min_part_metadata_version = part->getMetadataVersion(), + .need_data_mutations = throw_on_pending_mutations, + .need_alter_mutations = throw_on_pending_mutations || throw_on_pending_patch_parts, + .need_patch_parts = throw_on_pending_patch_parts, + }; + + const auto mutations_snapshot = getMutationsSnapshot(mutations_snapshot_params); + + const auto alter_conversions = getAlterConversionsForPart(part, mutations_snapshot, query_context); + + /// Re-check `throw_on_pending_mutations` because the mutations snapshot might have been filled due to `throw_on_pending_patch_parts` + if (throw_on_pending_mutations && alter_conversions->hasMutations()) + { + throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED, + "Part {} cannot be exported because there are pending mutations. Either wait for the mutations to be applied or set `export_merge_tree_part_throw_on_pending_mutations` to false", + part_name); + } + + if (alter_conversions->hasPatches()) + { + throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED, + "Part {} cannot be exported because there are pending patch parts. Either wait for the patch parts to be applied or set `export_merge_tree_part_throw_on_pending_patch_parts` to false", + part_name); + } + { const auto format_settings = getFormatSettings(query_context); MergeTreePartExportManifest manifest( diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 0e6b54f06f86..748fa7eb0cf8 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -202,6 +202,8 @@ namespace Setting extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file; extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; + extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations; + extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts; } namespace MergeTreeSetting @@ -305,6 +307,7 @@ namespace ErrorCodes extern const int CANNOT_FORGET_PARTITION; extern const int TIMEOUT_EXCEEDED; extern const int INVALID_SETTING_VALUE; + extern const int PENDING_MUTATIONS_NOT_ALLOWED; } namespace ServerSetting @@ -8192,18 +8195,54 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & ops.emplace_back(zkutil::makeCreateRequest(partition_exports_path, "", zkutil::CreateMode::Persistent)); - auto data_parts_lock = lockParts(); + DataPartsVector parts; - const auto parts = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, &data_parts_lock); + { + auto data_parts_lock = lockParts(); + parts = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, &data_parts_lock); + } if (parts.empty()) { throw Exception(ErrorCodes::BAD_ARGUMENTS, "Partition {} 
doesn't exist", partition_id); } + const bool throw_on_pending_mutations = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_mutations]; + const bool throw_on_pending_patch_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_patch_parts]; + + MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params + { + .metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(), + .min_part_metadata_version = MergeTreeData::getMinMetadataVersion(parts), + .need_data_mutations = throw_on_pending_mutations, + .need_alter_mutations = throw_on_pending_mutations || throw_on_pending_patch_parts, + .need_patch_parts = throw_on_pending_patch_parts, + }; + + const auto mutations_snapshot = getMutationsSnapshot(mutations_snapshot_params); + std::vector part_names; for (const auto & part : parts) { + const auto alter_conversions = getAlterConversionsForPart(part, mutations_snapshot, query_context); + + /// re-check `throw_on_pending_mutations` because `pending_mutations` might have been filled due to `throw_on_pending_patch_parts` + if (alter_conversions->hasMutations() && throw_on_pending_mutations) + { + throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED, + "Partition {} can not be exported because the part {} has pending mutations. Either wait for the mutations to be applied or set `export_merge_tree_part_throw_on_pending_mutations` to false", + partition_id, + part->name); + } + + if (alter_conversions->hasPatches()) + { + throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED, + "Partition {} can not be exported because the part {} has pending patch parts. Either wait for the patch parts to be applied or set `export_merge_tree_part_throw_on_pending_patch_parts` to false", + partition_id, + part->name); + } + part_names.push_back(part->name); } diff --git a/tests/integration/test_export_merge_tree_part_to_object_storage/test.py b/tests/integration/test_export_merge_tree_part_to_object_storage/test.py index ce6b23bf4231..71c9108470a4 100644 --- a/tests/integration/test_export_merge_tree_part_to_object_storage/test.py +++ b/tests/integration/test_export_merge_tree_part_to_object_storage/test.py @@ -30,7 +30,7 @@ def create_s3_table(node, s3_table): def create_tables_and_insert_data(node, mt_table, s3_table): - node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()") + node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple() SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1") node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)") create_s3_table(node, s3_table) @@ -129,3 +129,146 @@ def test_add_column_during_export(cluster): time.sleep(5) assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation" assert "Unknown expression identifier `id2`" in node.query_and_get_error(f"SELECT id2 FROM {s3_table}"), "Column id2 is present in the exported data" + + +def test_pending_mutations_throw_before_export(cluster): + """Test that pending mutations before export throw an error with default settings.""" + node = cluster.instances["node1"] + + mt_table = "pending_mutations_throw_mt_table" + s3_table = "pending_mutations_throw_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table) + + node.query(f"SYSTEM STOP MERGES {mt_table}") + + node.query(f"ALTER 
TABLE {mt_table} UPDATE id = id + 100 WHERE year = 2020") + + mutations = node.query(f"SELECT count() FROM system.mutations WHERE table = '{mt_table}' AND is_done = 0") + assert mutations.strip() != '0', "Mutation should be pending" + + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table} SETTINGS export_merge_tree_part_throw_on_pending_mutations=true" + ) + + assert "PENDING_MUTATIONS_NOT_ALLOWED" in error, f"Expected error about pending mutations, got: {error}" + + +def test_pending_mutations_skip_before_export(cluster): + """Test that pending mutations before export are skipped with throw_on_pending_mutations=false.""" + node = cluster.instances["node1"] + + mt_table = "pending_mutations_skip_mt_table" + s3_table = "pending_mutations_skip_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table) + + node.query(f"SYSTEM STOP MERGES {mt_table}") + + node.query(f"ALTER TABLE {mt_table} UPDATE id = id + 100 WHERE year = 2020") + + mutations = node.query(f"SELECT count() FROM system.mutations WHERE table = '{mt_table}' AND is_done = 0") + assert mutations.strip() != '0', "Mutation should be pending" + + node.query( + f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_throw_on_pending_mutations=false" + ) + + time.sleep(5) + + result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id") + assert "101" not in result and "102" not in result and "103" not in result, \ + "Export should contain original data before mutation" + assert "1\n2\n3" in result, "Export should contain original data" + + +def test_data_mutations_after_export_started(cluster): + """Test that mutations applied after export starts don't affect the exported data.""" + node = cluster.instances["node1"] + + mt_table = "mutations_after_export_mt_table" + s3_table = "mutations_after_export_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table) + + # Block traffic to MinIO to delay export + minio_ip = cluster.minio_ip + minio_port = cluster.minio_port + + with PartitionManager() as pm: + pm_rule_reject_responses = { + "destination": node.ip_address, + "source_port": minio_port, + "action": "REJECT --reject-with tcp-reset", + } + pm._add_rule(pm_rule_reject_responses) + + pm_rule_reject_requests = { + "destination": minio_ip, + "destination_port": minio_port, + "action": "REJECT --reject-with tcp-reset", + } + pm._add_rule(pm_rule_reject_requests) + + node.query( + f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_throw_on_pending_mutations=true" + ) + + node.query(f"ALTER TABLE {mt_table} UPDATE id = id + 100 WHERE year = 2020") + + time.sleep(5) + + result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id") + assert "1\n2\n3" in result, "Export should contain original data before mutation" + assert "101" not in result, "Export should not contain mutated data" + + +def test_pending_patch_parts_throw_before_export(cluster): + """Test that pending patch parts before export throw an error with default settings.""" + node = cluster.instances["node1"] + + mt_table = "pending_patches_throw_mt_table" + s3_table = "pending_patches_throw_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table) + + node.query(f"SYSTEM STOP MERGES {mt_table}") + + node.query(f"UPDATE {mt_table} SET id = id + 100 WHERE year = 2020") + + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PART 
'2020_1_1_0' TO TABLE {s3_table}" + ) + + node.query(f"DROP TABLE {mt_table}") + + assert "PENDING_MUTATIONS_NOT_ALLOWED" in error or "pending patch parts" in error.lower(), \ + f"Expected error about pending patch parts, got: {error}" + + +def test_pending_patch_parts_skip_before_export(cluster): + """Test that pending patch parts before export are skipped with throw_on_pending_patch_parts=false.""" + node = cluster.instances["node1"] + + mt_table = "pending_patches_skip_mt_table" + s3_table = "pending_patches_skip_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table) + + node.query(f"SYSTEM STOP MERGES {mt_table}") + + node.query(f"UPDATE {mt_table} SET id = id + 100 WHERE year = 2020") + + node.query( + f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_throw_on_pending_patch_parts=false" + ) + + time.sleep(5) + + result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id") + assert "1\n2\n3" in result, "Export should contain original data before patch" + + node.query(f"DROP TABLE {mt_table}") diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index a4cb0807d6ee..83deae5c1d73 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -113,7 +113,7 @@ def create_s3_table(node, s3_table): def create_tables_and_insert_data(node, mt_table, s3_table, replica_name): - node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{mt_table}', '{replica_name}') PARTITION BY year ORDER BY tuple()") + node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{mt_table}', '{replica_name}') PARTITION BY year ORDER BY tuple() SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1") node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)") create_s3_table(node, s3_table) @@ -720,30 +720,235 @@ def test_multiple_exports_within_a_single_query(cluster): """ ) == "COMPLETED\n", "Export should be marked as COMPLETED" -# def test_source_mutations_during_export_snapshot(cluster): -# node = cluster.instances["replica1"] -# mt_table = "mutations_snapshot_mt_table" -# s3_table = "mutations_snapshot_s3_table" +def test_pending_mutations_throw_before_export_partition(cluster): + """Test that pending mutations before export partition throw an error.""" + node = cluster.instances["replica1"] + + mt_table = "pending_mutations_throw_partition_mt_table" + s3_table = "pending_mutations_throw_partition_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table, "replica1") + + node.query(f"SYSTEM STOP MERGES {mt_table}") + + node.query(f"ALTER TABLE {mt_table} UPDATE id = id + 100 WHERE year = 2020") + + mutations = node.query(f"SELECT count() FROM system.mutations WHERE table = '{mt_table}' AND is_done = 0") + assert mutations.strip() != '0', "Mutation should be pending" + + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_throw_on_pending_mutations=true" + ) + + assert "PENDING_MUTATIONS_NOT_ALLOWED" in error, f"Expected error about pending mutations, got: {error}" + + +def 
test_pending_mutations_skip_before_export_partition(cluster): + """Test that pending mutations before export partition are skipped with throw_on_pending_mutations=false.""" + node = cluster.instances["replica1"] + + mt_table = "pending_mutations_skip_partition_mt_table" + s3_table = "pending_mutations_skip_partition_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table, "replica1") + + node.query(f"SYSTEM STOP MERGES {mt_table}") + + node.query(f"ALTER TABLE {mt_table} UPDATE id = id + 100 WHERE year = 2020") + + mutations = node.query(f"SELECT count() FROM system.mutations WHERE table = '{mt_table}' AND is_done = 0") + assert mutations.strip() != '0', "Mutation should be pending" + + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_throw_on_pending_mutations=false" + ) + + wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED") + + result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id") + assert "101" not in result and "102" not in result and "103" not in result, \ + "Export should contain original data before mutation" + assert "1\n2\n3" in result, "Export should contain original data" + + +def test_pending_patch_parts_throw_before_export_partition(cluster): + """Test that pending patch parts before export partition throw an error with default settings.""" + node = cluster.instances["replica1"] + + mt_table = "pending_patches_throw_partition_mt_table" + s3_table = "pending_patches_throw_partition_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table, "replica1") + + node.query(f"SYSTEM STOP MERGES {mt_table}") + + node.query(f"UPDATE {mt_table} SET id = id + 100 WHERE year = 2020") + + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}" + ) + + node.query(f"DROP TABLE {mt_table}") + + assert "PENDING_MUTATIONS_NOT_ALLOWED" in error or "pending patch parts" in error.lower(), \ + f"Expected error about pending patch parts, got: {error}" + + +def test_pending_patch_parts_skip_before_export_partition(cluster): + """Test that pending patch parts before export partition are skipped with throw_on_pending_patch_parts=false.""" + node = cluster.instances["replica1"] + + mt_table = "pending_patches_skip_partition_mt_table" + s3_table = "pending_patches_skip_partition_s3_table" -# create_tables_and_insert_data(node, mt_table, s3_table, "replica1") + create_tables_and_insert_data(node, mt_table, s3_table, "replica1") + + node.query(f"SYSTEM STOP MERGES {mt_table}") -# # Ensure export sees a consistent snapshot at start time even if we mutate the source later -# with PartitionManager() as pm: -# pm.add_network_delay(node, delay_ms=5000) + node.query(f"UPDATE {mt_table} SET id = id + 100 WHERE year = 2020") -# # Start export of 2020 -# node.query( -# f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table};" -# ) + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_throw_on_pending_patch_parts=false" + ) + + wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED") -# # Mutate the source after export started (delete the same partition) -# node.query(f"ALTER TABLE {mt_table} DROP COLUMN id") + result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id") + assert "1\n2\n3" in result, "Export should contain original data before patch" -# # assert the mutation has been applied AND the data 
has not been exported yet -# assert node.query(f"SELECT count() FROM {mt_table} WHERE year = 2020") == '0\n', "Mutation has not been applied" -# assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '0\n', "Data has been exported" + node.query(f"DROP TABLE {mt_table}") + + +def test_mutations_after_export_partition_started(cluster): + """Test that mutations applied after export partition starts don't affect the exported data.""" + node = cluster.instances["replica1"] + + mt_table = "mutations_after_export_partition_mt_table" + s3_table = "mutations_after_export_partition_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table, "replica1") + + # Block traffic to MinIO to delay export + minio_ip = cluster.minio_ip + minio_port = cluster.minio_port + + with PartitionManager() as pm: + pm_rule_reject_responses = { + "destination": node.ip_address, + "source_port": minio_port, + "action": "REJECT --reject-with tcp-reset", + } + pm._add_rule(pm_rule_reject_responses) + + pm_rule_reject_requests = { + "destination": minio_ip, + "destination_port": minio_port, + "action": "REJECT --reject-with tcp-reset", + } + pm._add_rule(pm_rule_reject_requests) + + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_throw_on_pending_mutations=true" + ) + + # Wait for export to start + wait_for_export_to_start(node, mt_table, s3_table, "2020") + + node.query(f"ALTER TABLE {mt_table} UPDATE id = id + 100 WHERE year = 2020") + + wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED") + + result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id") + assert "1\n2\n3" in result, "Export should contain original data before mutation" + assert "101" not in result, "Export should not contain mutated data" + + +def test_patch_parts_after_export_partition_started(cluster): + """Test that patch parts created after export partition starts don't affect the exported data.""" + node = cluster.instances["replica1"] + + mt_table = "patches_after_export_partition_mt_table" + s3_table = "patches_after_export_partition_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table, "replica1") + + # Block traffic to MinIO to delay export + minio_ip = cluster.minio_ip + minio_port = cluster.minio_port + + with PartitionManager() as pm: + pm_rule_reject_responses = { + "destination": node.ip_address, + "source_port": minio_port, + "action": "REJECT --reject-with tcp-reset", + } + pm._add_rule(pm_rule_reject_responses) + + pm_rule_reject_requests = { + "destination": minio_ip, + "destination_port": minio_port, + "action": "REJECT --reject-with tcp-reset", + } + pm._add_rule(pm_rule_reject_requests) + + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}" + ) + + # Wait for export to start + wait_for_export_to_start(node, mt_table, s3_table, "2020") + + node.query(f"UPDATE {mt_table} SET id = id + 100 WHERE year = 2020") + + wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED") + + result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id") + assert "1\n2\n3" in result, "Export should contain original data before patch" + assert "101" not in result, "Export should not contain patched data" + + node.query(f"DROP TABLE {mt_table}") + + +def test_mutation_in_partition_clause(cluster): + """Test that mutations limited to specific partitions using IN PARTITION clause + allow exports of unaffected partitions to succeed.""" + node 
= cluster.instances["replica1"] + + mt_table = "mutation_in_partition_clause_mt_table" + s3_table = "mutation_in_partition_clause_s3_table" + + create_tables_and_insert_data(node, mt_table, s3_table, "replica1") + + node.query(f"SYSTEM STOP MERGES {mt_table}") + + # Issue a mutation that uses IN PARTITION to limit it to partition 2020 + node.query(f"ALTER TABLE {mt_table} UPDATE id = id + 100 IN PARTITION '2020' WHERE year = 2020") + + # Verify mutation is pending for 2020 + mutations = node.query( + f"SELECT count() FROM system.mutations WHERE table = '{mt_table}' AND is_done = 0" + ) + assert mutations.strip() != '0', "Mutation should be pending" + + # Export of 2020 should fail (it has pending mutations) + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_throw_on_pending_mutations=true" + ) + assert "PENDING_MUTATIONS_NOT_ALLOWED" in error, f"Expected error about pending mutations for partition 2020, got: {error}" + + # Export of 2021 should succeed (no mutations affecting it) + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2021' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_throw_on_pending_mutations=true" + ) + + wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED") -# # Wait for export to finish and then verify destination still reflects the original snapshot (3 rows) -# time.sleep(5) -# assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '3\n', "Export did not preserve snapshot at start time after source mutation" + result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2021 ORDER BY id") + assert "4" in result, "Export of partition 2021 should contain original data" diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql index a61c066e8789..828ed41df92b 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql @@ -1,6 +1,6 @@ -- Tags: no-parallel, no-fasttest -DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table; +DROP TABLE IF EXISTS 03572_outdated_mt_table, 03572_invalid_schema_table, 03572_s3_outdated_table; CREATE TABLE 03572_mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); @@ -19,4 +19,17 @@ CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_ ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NOT_IMPLEMENTED} -DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table; +-- Test export_merge_tree_part_allow_outdated_parts setting +CREATE TABLE 03572_outdated_mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); +CREATE TABLE 03572_s3_outdated_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_s3_outdated_table', format='Parquet', partition_strategy='hive') PARTITION BY year; + +INSERT INTO 03572_outdated_mt_table VALUES (2, 2020); +INSERT INTO 03572_outdated_mt_table VALUES (3, 2020); + +-- Optimize to merge parts, making old parts outdated +OPTIMIZE TABLE 03572_outdated_mt_table FINAL; + +ALTER TABLE 03572_outdated_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_s3_outdated_table +SETTINGS 
allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_allow_outdated_parts = false; -- {serverError BAD_ARGUMENTS} + +DROP TABLE IF EXISTS 03572_mt_table, 03572_outdated_mt_table, 03572_invalid_schema_table, 03572_s3_outdated_table;
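The stateless test above exercises only the rejection path for outdated parts. A hedged sketch of the opt-in counterpart, reusing the same tables (it assumes, as the test does, that the first INSERT produced part 2020_1_1_0, which OPTIMIZE ... FINAL left in the Outdated state):

-- Illustrative, not part of this change: export the pre-merge Outdated part by opting in.
ALTER TABLE 03572_outdated_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_s3_outdated_table
SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_allow_outdated_parts = true;

With the setting enabled, the export should be accepted rather than rejected with BAD_ARGUMENTS.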