18 changes: 18 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/part_export.md
@@ -60,6 +60,24 @@ Source and destination tables must be 100% compatible:
- **Default**: `0`
- **Description**: Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit. This is not a hard limit; it depends heavily on the output format granularity and the input source chunk size. Using this might break idempotency, so use it with care.

### export_merge_tree_part_throw_on_pending_mutations

- **Type**: `bool`
- **Default**: `true`
- **Description**: If set to true, throws if pending mutations exist for the given part. Note that, by default, mutations are applied to all parts, so even if a mutation would in practice only affect part/partition X, exporting any other part/partition will also throw. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.
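
For illustration, a minimal sketch of opting out of this check (the table names `events` and `events_s3` are hypothetical; the `EXPORT PART` syntax matches the examples below):

```sql
-- Assuming `events` is a MergeTree table and `events_s3` is a compatible S3-backed destination table.
-- With the check disabled, the export proceeds and reflects the part as currently stored on disk,
-- ignoring mutations that have not yet been applied to it.
ALTER TABLE events EXPORT PART '2020_1_1_0' TO TABLE events_s3
SETTINGS export_merge_tree_part_throw_on_pending_mutations = false;
```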

### export_merge_tree_part_throw_on_pending_patch_parts

- **Type**: `bool`
- **Default**: `true`
- **Description**: If set to true, throws if pending patch parts exist for the given part. Note that, by default, mutations are applied to all parts, so even if a mutation would in practice only affect part/partition X, exporting any other part/partition will also throw. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.
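
A sketch of how pending patch parts trip this check, assuming lightweight updates are enabled on a hypothetical `events` table:

```sql
-- The lightweight UPDATE leaves patch parts behind until they are materialized.
UPDATE events SET id = id + 100 WHERE year = 2020;

-- With the default settings this export throws PENDING_MUTATIONS_NOT_ALLOWED
-- until the patch parts are applied or the setting is set to false.
ALTER TABLE events EXPORT PART '2020_1_1_0' TO TABLE events_s3;
```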

### export_merge_tree_part_allow_outdated_parts

- **Type**: `bool`
- **Default**: `false`
- **Description**: Allows outdated parts to be exported. By default, only `ACTIVE` parts can be exported.
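
A hedged sketch (hypothetical table names) of exporting a part that is no longer active:

```sql
-- By default only ACTIVE parts can be exported; an Outdated part (for example, one that has
-- already been merged away but not yet removed) requires this setting.
ALTER TABLE events EXPORT PART '2020_1_1_0' TO TABLE events_s3
SETTINGS export_merge_tree_part_allow_outdated_parts = true;
```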

## Examples

### Basic Export to S3
12 changes: 12 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/partition_export.md
@@ -70,6 +70,18 @@ TO TABLE [destination_database.]destination_table
- `error` - Throw an error if the file already exists
- `overwrite` - Overwrite the file

### export_merge_tree_part_throw_on_pending_mutations

- **Type**: `bool`
- **Default**: `true`
- **Description**: If set to true, throws if pending mutations exist for the given part. Note that, by default, mutations are applied to all parts, so even if a mutation would in practice only affect part/partition X, exporting any other part/partition will also throw. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_throw_on_pending_patch_parts

- **Type**: `bool`
- **Default**: `true`
- **Description**: If set to true, throws if pending patch parts exist for the given part. Note that, by default, mutations are applied to all parts, so even if a mutation would in practice only affect part/partition X, exporting any other part/partition will also throw. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.
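
A sketch of exporting a whole partition while opting out of both checks (hypothetical table names; the `EXPORT PARTITION ID` form is assumed here, see the syntax section above for the authoritative form):

```sql
-- Export every active part of partition 2020, ignoring pending mutations and patch parts.
ALTER TABLE events EXPORT PARTITION ID '2020' TO TABLE events_s3
SETTINGS export_merge_tree_part_throw_on_pending_mutations = false,
         export_merge_tree_part_throw_on_pending_patch_parts = false;
```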

## Examples

### Basic Export to S3
3 changes: 2 additions & 1 deletion src/Common/ErrorCodes.cpp
@@ -648,6 +648,7 @@
M(1002, UNKNOWN_EXCEPTION) \
M(1003, SSH_EXCEPTION) \
M(1004, STARTUP_SCRIPTS_ERROR) \
M(1005, PENDING_MUTATIONS_NOT_ALLOWED) \
/* See END */

#ifdef APPLY_FOR_EXTERNAL_ERROR_CODES
@@ -664,7 +665,7 @@ namespace ErrorCodes
APPLY_FOR_ERROR_CODES(M)
#undef M

constexpr ErrorCode END = 1004;
constexpr ErrorCode END = 1005;
ErrorPairHolder values[END + 1]{};

struct ErrorCodesNames
9 changes: 9 additions & 0 deletions src/Core/Settings.cpp
@@ -6920,6 +6920,15 @@ This is not a hard limit, and it highly depends on the output format granularity
DECLARE(UInt64, export_merge_tree_part_max_rows_per_file, 0, R"(
Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit.
This is not a hard limit, and it highly depends on the output format granularity and input source chunk size.
)", 0) \
DECLARE(Bool, export_merge_tree_part_throw_on_pending_mutations, true, R"(
Throw an error if there are pending mutations when exporting a merge tree part.
)", 0) \
DECLARE(Bool, export_merge_tree_part_throw_on_pending_patch_parts, true, R"(
Throw an error if there are pending patch parts when exporting a merge tree part.
)", 0) \
DECLARE(Bool, export_merge_tree_part_allow_outdated_parts, false, R"(
Allow exporting parts in the outdated state.
)", 0) \
DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"(
Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions.
3 changes: 3 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -64,6 +64,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"cluster_table_function_split_granularity", "file", "file", "New setting."},
{"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
{"export_merge_tree_part_allow_outdated_parts", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
8 changes: 8 additions & 0 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -30,6 +30,14 @@ namespace
context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy)));
context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file);
context_copy->setSetting("export_merge_tree_part_max_rows_per_file", manifest.max_rows_per_file);

/// always allow exporting outdated parts because the parts have been validated when the query was processed
context_copy->setSetting("export_merge_tree_part_allow_outdated_parts", true);

/// always skip pending mutations and patch parts because we already validated the parts during query processing
context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false);
context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false);

return context_copy;
}
}
43 changes: 43 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.cpp
@@ -217,6 +217,9 @@ namespace Setting
extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy;
extern const SettingsBool output_format_parallel_formatting;
extern const SettingsBool output_format_parquet_parallel_encoding;
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
extern const SettingsBool export_merge_tree_part_allow_outdated_parts;
}

namespace MergeTreeSetting
@@ -336,6 +339,7 @@ namespace ErrorCodes
extern const int TOO_LARGE_LIGHTWEIGHT_UPDATES;
extern const int UNKNOWN_TABLE;
extern const int FILE_ALREADY_EXISTS;
extern const int PENDING_MUTATIONS_NOT_ALLOWED;
}

static void checkSuspiciousIndices(const ASTFunction * index_function)
@@ -6254,6 +6258,45 @@ void MergeTreeData::exportPartToTable(
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No such data part '{}' to export in table '{}'",
part_name, getStorageID().getFullTableName());

const bool allow_outdated_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_allow_outdated_parts];

if (part->getState() == MergeTreeDataPartState::Outdated && !allow_outdated_parts)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Part {} is in the outdated state and cannot be exported. Set `export_merge_tree_part_allow_outdated_parts` to true to allow exporting outdated parts",
part_name);

const bool throw_on_pending_mutations = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_mutations];
const bool throw_on_pending_patch_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_patch_parts];

MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params
{
.metadata_version = source_metadata_ptr->getMetadataVersion(),
.min_part_metadata_version = part->getMetadataVersion(),
.need_data_mutations = throw_on_pending_mutations,
.need_alter_mutations = throw_on_pending_mutations || throw_on_pending_patch_parts,
.need_patch_parts = throw_on_pending_patch_parts,
};

Reviewer (on `.need_data_mutations = throw_on_pending_mutations`): Does ClickHouse throw some exception when this flag is set? I made only a quick check, but it looks like only some parts are skipped.

Author:

> Does ClickHouse throw some exception when this flag is set?

It should not, at least in this context. The code you are reviewing processes the export query request, not the code that actually exports the data. The mutations_snapshot_params is used to create mutations_snapshot and alter_conversions. Neither is used for anything but checking whether there are pending mutations within this function.

> but looks like only some parts are skipped.

I did not understand this, can you elaborate?

Reviewer: Unclean: why are the "need_something" flags filled with values from "throw_on_some_condition"? When I see "throw_on_some_condition", I expect the setting to cause an exception, but here it changes other behavior.

Author: TL;DR: to throw, I need to know whether it exists; to know whether it exists, I need to set this to true.

The need_something flags dictate whether the mutation snapshot contains the mutations / patch parts. If I hard-code them to false, the snapshot will never include information about pending mutations / patch parts. In that case, I will not be able to check whether they exist and will not be able to throw.

Reviewer: I see that it throws an exception. I mean that the setting does something more than only throw an exception.

Author: What else does it do in this case? I don't think so.

Reviewer: I am talking about naming, not about code logic :)

Reviewer: The setting export_merge_tree_part_throw_on_pending_mutations changes the value of the need_data_mutations field in mutations_snapshot_params, and need_data_mutations changes behavior. So the setting name is confusing; something like "export_merge_tree_part_do_not_export_on_pending_mutations" (I'm not good at naming) would be cleaner.

Author:

> need_data_mutations changes behavior.

Not in this case. As I said here, in this code snippet the only effect of setting need_data_mutations is that the snapshot and alter_conversions objects will contain information about mutations. And if they do, we throw, because the only reasons for that object to have mutations are: 1. there are pending mutations; 2. the user set throw = true.

const auto mutations_snapshot = getMutationsSnapshot(mutations_snapshot_params);

const auto alter_conversions = getAlterConversionsForPart(part, mutations_snapshot, query_context);

/// re-check `throw_on_pending_mutations` because `pending_mutations` might have been filled due to `throw_on_pending_patch_parts`
if (throw_on_pending_mutations && alter_conversions->hasMutations())
{
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
"Part {} can not be exported because there are pending mutations. Either wait for the mutations to be applied or set `export_merge_tree_part_throw_on_pending_mutations` to false",
part_name);
}

if (alter_conversions->hasPatches())
{
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
"Part {} can not be exported because there are pending patch parts. Either wait for the patch parts to be applied or set `export_merge_tree_part_throw_on_pending_patch_parts` to false",
part_name);
}

Reviewer (on `if (alter_conversions->hasPatches())`): Need to check export_merge_tree_part_throw_on_pending_patch_parts here?

Author: No, because of #1294 (comment).

Author: I can add the check if you think it is clearer, but it is not needed at all.

{
const auto format_settings = getFormatSettings(query_context);
MergeTreePartExportManifest manifest(
43 changes: 41 additions & 2 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -202,6 +202,8 @@ namespace Setting
extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy;
extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
}

namespace MergeTreeSetting
@@ -305,6 +307,7 @@ namespace ErrorCodes
extern const int CANNOT_FORGET_PARTITION;
extern const int TIMEOUT_EXCEEDED;
extern const int INVALID_SETTING_VALUE;
extern const int PENDING_MUTATIONS_NOT_ALLOWED;
}

namespace ServerSetting
@@ -8192,18 +8195,54 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &

ops.emplace_back(zkutil::makeCreateRequest(partition_exports_path, "", zkutil::CreateMode::Persistent));

auto data_parts_lock = lockParts();
DataPartsVector parts;

const auto parts = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, &data_parts_lock);
{
auto data_parts_lock = lockParts();
parts = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, &data_parts_lock);
}

Author (on `auto data_parts_lock = lockParts();`): Either getMutationsSnapshot or getAlterConversionsForPart grabs a lock. Putting this in a scope avoids deadlocks.

if (parts.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Partition {} doesn't exist", partition_id);
}

const bool throw_on_pending_mutations = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_mutations];
const bool throw_on_pending_patch_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_patch_parts];

MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params
{
.metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(),
.min_part_metadata_version = MergeTreeData::getMinMetadataVersion(parts),
.need_data_mutations = throw_on_pending_mutations,
.need_alter_mutations = throw_on_pending_mutations || throw_on_pending_patch_parts,
.need_patch_parts = throw_on_pending_patch_parts,
};

const auto mutations_snapshot = getMutationsSnapshot(mutations_snapshot_params);

std::vector<String> part_names;
for (const auto & part : parts)
{
const auto alter_conversions = getAlterConversionsForPart(part, mutations_snapshot, query_context);

/// re-check `throw_on_pending_mutations` because `pending_mutations` might have been filled due to `throw_on_pending_patch_parts`
if (alter_conversions->hasMutations() && throw_on_pending_mutations)
{
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
"Partition {} can not be exported because the part {} has pending mutations. Either wait for the mutations to be applied or set `export_merge_tree_part_throw_on_pending_mutations` to false",
partition_id,
part->name);
}

if (alter_conversions->hasPatches())
{
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
"Partition {} can not be exported because the part {} has pending patch parts. Either wait for the patch parts to be applied or set `export_merge_tree_part_throw_on_pending_patch_parts` to false",
partition_id,
part->name);
}

part_names.push_back(part->name);
}

@@ -30,7 +30,7 @@ def create_s3_table(node, s3_table):


def create_tables_and_insert_data(node, mt_table, s3_table):
node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()")
node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple() SETTINGS enable_block_number_column = 1, enable_block_offset_column = 1")
node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)")

create_s3_table(node, s3_table)
@@ -129,3 +129,146 @@ def test_add_column_during_export(cluster):
time.sleep(5)
assert node.query(f"SELECT count() FROM {s3_table} WHERE id >= 0") == '3\n', "Export did not preserve snapshot at start time after source mutation"
assert "Unknown expression identifier `id2`" in node.query_and_get_error(f"SELECT id2 FROM {s3_table}"), "Column id2 is present in the exported data"


def test_pending_mutations_throw_before_export(cluster):
"""Test that pending mutations before export throw an error with default settings."""
node = cluster.instances["node1"]

mt_table = "pending_mutations_throw_mt_table"
s3_table = "pending_mutations_throw_s3_table"

create_tables_and_insert_data(node, mt_table, s3_table)

node.query(f"SYSTEM STOP MERGES {mt_table}")

node.query(f"ALTER TABLE {mt_table} UPDATE id = id + 100 WHERE year = 2020")

mutations = node.query(f"SELECT count() FROM system.mutations WHERE table = '{mt_table}' AND is_done = 0")
assert mutations.strip() != '0', "Mutation should be pending"

error = node.query_and_get_error(
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table} SETTINGS export_merge_tree_part_throw_on_pending_mutations=true"
)

assert "PENDING_MUTATIONS_NOT_ALLOWED" in error, f"Expected error about pending mutations, got: {error}"


def test_pending_mutations_skip_before_export(cluster):
"""Test that pending mutations before export are skipped with throw_on_pending_mutations=false."""
node = cluster.instances["node1"]

mt_table = "pending_mutations_skip_mt_table"
s3_table = "pending_mutations_skip_s3_table"

create_tables_and_insert_data(node, mt_table, s3_table)

node.query(f"SYSTEM STOP MERGES {mt_table}")

node.query(f"ALTER TABLE {mt_table} UPDATE id = id + 100 WHERE year = 2020")

mutations = node.query(f"SELECT count() FROM system.mutations WHERE table = '{mt_table}' AND is_done = 0")
assert mutations.strip() != '0', "Mutation should be pending"

node.query(
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table} "
f"SETTINGS export_merge_tree_part_throw_on_pending_mutations=false"
)

time.sleep(5)

result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id")
assert "101" not in result and "102" not in result and "103" not in result, \
"Export should contain original data before mutation"
assert "1\n2\n3" in result, "Export should contain original data"


def test_data_mutations_after_export_started(cluster):
"""Test that mutations applied after export starts don't affect the exported data."""
node = cluster.instances["node1"]

mt_table = "mutations_after_export_mt_table"
s3_table = "mutations_after_export_s3_table"

create_tables_and_insert_data(node, mt_table, s3_table)

# Block traffic to MinIO to delay export
minio_ip = cluster.minio_ip
minio_port = cluster.minio_port

with PartitionManager() as pm:
pm_rule_reject_responses = {
"destination": node.ip_address,
"source_port": minio_port,
"action": "REJECT --reject-with tcp-reset",
}
pm._add_rule(pm_rule_reject_responses)

pm_rule_reject_requests = {
"destination": minio_ip,
"destination_port": minio_port,
"action": "REJECT --reject-with tcp-reset",
}
pm._add_rule(pm_rule_reject_requests)

node.query(
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table} "
f"SETTINGS export_merge_tree_part_throw_on_pending_mutations=true"
)

node.query(f"ALTER TABLE {mt_table} UPDATE id = id + 100 WHERE year = 2020")

time.sleep(5)

result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id")
assert "1\n2\n3" in result, "Export should contain original data before mutation"
assert "101" not in result, "Export should not contain mutated data"


def test_pending_patch_parts_throw_before_export(cluster):
"""Test that pending patch parts before export throw an error with default settings."""
node = cluster.instances["node1"]

mt_table = "pending_patches_throw_mt_table"
s3_table = "pending_patches_throw_s3_table"

create_tables_and_insert_data(node, mt_table, s3_table)

node.query(f"SYSTEM STOP MERGES {mt_table}")

node.query(f"UPDATE {mt_table} SET id = id + 100 WHERE year = 2020")

error = node.query_and_get_error(
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table}"
)

node.query(f"DROP TABLE {mt_table}")

assert "PENDING_MUTATIONS_NOT_ALLOWED" in error or "pending patch parts" in error.lower(), \
f"Expected error about pending patch parts, got: {error}"


def test_pending_patch_parts_skip_before_export(cluster):
"""Test that pending patch parts before export are skipped with throw_on_pending_patch_parts=false."""
node = cluster.instances["node1"]

mt_table = "pending_patches_skip_mt_table"
s3_table = "pending_patches_skip_s3_table"

create_tables_and_insert_data(node, mt_table, s3_table)

node.query(f"SYSTEM STOP MERGES {mt_table}")

node.query(f"UPDATE {mt_table} SET id = id + 100 WHERE year = 2020")

node.query(
f"ALTER TABLE {mt_table} EXPORT PART '2020_1_1_0' TO TABLE {s3_table} "
f"SETTINGS export_merge_tree_part_throw_on_pending_patch_parts=false"
)

time.sleep(5)

result = node.query(f"SELECT id FROM {s3_table} WHERE year = 2020 ORDER BY id")
assert "1\n2\n3" in result, "Export should contain original data before patch"

node.query(f"DROP TABLE {mt_table}")