Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/part_export.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ SETTINGS allow_experimental_export_merge_tree_part = 1
[, setting_name = value, ...]
```

## Syntax with table function

```sql
ALTER TABLE [database.]table_name
EXPORT PART 'part_name'
TO TABLE FUNCTION s3(s3_conn, filename='table_function', partition_strategy...)
SETTINGS allow_experimental_export_merge_tree_part = 1
[, setting_name = value, ...]
```

### Parameters

- **`table_name`**: The source MergeTree table containing the part to export
Expand All @@ -34,6 +44,8 @@ Source and destination tables must be 100% compatible:
1. **Identical schemas** - same columns, types, and order
2. **Matching partition keys** - partition expressions must be identical

In case a table function is used as the destination, the schema can be omitted and it will be inferred from the source table.

## Settings

### `allow_experimental_export_merge_tree_part` (Required)
Expand Down Expand Up @@ -83,6 +95,20 @@ ALTER TABLE mt_table EXPORT PART '2021_2_2_0' TO TABLE s3_table
SETTINGS allow_experimental_export_merge_tree_part = 1;
```

### Table function export

```sql
-- Create source and destination tables
CREATE TABLE mt_table (id UInt64, year UInt16)
ENGINE = MergeTree() PARTITION BY year ORDER BY tuple();

-- Insert and export
INSERT INTO mt_table VALUES (1, 2020), (2, 2020), (3, 2021);

ALTER TABLE mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION s3(s3_conn, filename='table_function', format=Parquet, partition_strategy='hive') PARTITION BY year
SETTINGS allow_experimental_export_merge_tree_part = 1;
```

## Monitoring

### Active Exports
Expand Down
4 changes: 3 additions & 1 deletion src/Interpreters/InterpreterAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,9 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
case ASTAlterCommand::EXPORT_PART:
{
required_access.emplace_back(AccessType::ALTER_EXPORT_PART, database, table);
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
/// For table functions, access control is handled by the table function itself
if (!command.to_table_function)
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
break;
}
case ASTAlterCommand::EXPORT_PARTITION:
Expand Down
25 changes: 22 additions & 3 deletions src/Parsers/ASTAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ ASTPtr ASTAlterCommand::clone() const
res->sql_security = res->children.emplace_back(sql_security->clone()).get();
if (rename_to)
res->rename_to = res->children.emplace_back(rename_to->clone()).get();
if (to_table_function)
res->to_table_function = res->children.emplace_back(to_table_function->clone()).get();
if (partition_by_expr)
res->partition_by_expr = res->children.emplace_back(partition_by_expr->clone()).get();

return res;
}
Expand Down Expand Up @@ -367,11 +371,24 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
{
case DataDestinationType::TABLE:
ostr << "TABLE ";
if (!to_database.empty())
if (to_table_function)
{
ostr << "FUNCTION ";
to_table_function->format(ostr, settings, state, frame);
if (partition_by_expr)
{
ostr << " PARTITION BY ";
partition_by_expr->format(ostr, settings, state, frame);
}
}
else
{
ostr << backQuoteIfNeed(to_database) << ".";
if (!to_database.empty())
{
ostr << backQuoteIfNeed(to_database) << ".";
}
ostr << backQuoteIfNeed(to_table);
}
ostr << backQuoteIfNeed(to_table);
return;
default:
break;
Expand Down Expand Up @@ -584,6 +601,8 @@ void ASTAlterCommand::forEachPointerToChild(std::function<void(void**)> f)
f(reinterpret_cast<void **>(&select));
f(reinterpret_cast<void **>(&sql_security));
f(reinterpret_cast<void **>(&rename_to));
f(reinterpret_cast<void **>(&to_table_function));
f(reinterpret_cast<void **>(&partition_by_expr));
}


Expand Down
3 changes: 3 additions & 0 deletions src/Parsers/ASTAlterQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ class ASTAlterCommand : public IAST
/// MOVE PARTITION partition TO TABLE db.table
String to_database;
String to_table;
/// EXPORT PART/PARTITION to TABLE FUNCTION (e.g., s3())
IAST * to_table_function = nullptr;
IAST * partition_by_expr = nullptr;

String snapshot_name;
IAST * snapshot_desc;
Expand Down
36 changes: 33 additions & 3 deletions src/Parsers/ParserAlterQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_unfreeze(Keyword::UNFREEZE);
ParserKeyword s_unlock_snapshot(Keyword::UNLOCK_SNAPSHOT);
ParserKeyword s_partition(Keyword::PARTITION);
ParserKeyword s_partition_by(Keyword::PARTITION_BY);

ParserKeyword s_first(Keyword::FIRST);
ParserKeyword s_after(Keyword::AFTER);
Expand All @@ -107,6 +108,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_to_volume(Keyword::TO_VOLUME);
ParserKeyword s_to_table(Keyword::TO_TABLE);
ParserKeyword s_to_shard(Keyword::TO_SHARD);
ParserKeyword s_function(Keyword::FUNCTION);

ParserKeyword s_delete(Keyword::DELETE);
ParserKeyword s_update(Keyword::UPDATE);
Expand Down Expand Up @@ -176,6 +178,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ASTPtr command_rename_to;
ASTPtr command_sql_security;
ASTPtr command_snapshot_desc;
ASTPtr export_table_function;
ASTPtr export_table_function_partition_by_expr;

if (with_round_bracket)
{
Expand Down Expand Up @@ -550,9 +554,31 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
}

if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = DataDestinationType::TABLE;
if (s_function.ignore(pos, expected))
{
ParserFunction table_function_parser(/*allow_function_parameters=*/true, /*is_table_function=*/true);

if (!table_function_parser.parse(pos, export_table_function, expected))
{
return false;
}

if (s_partition_by.ignore(pos, expected))
{
if (!parser_exp_elem.parse(pos, export_table_function_partition_by_expr, expected))
return false;
}

command->to_table_function = export_table_function.get();
command->partition_by_expr = export_table_function_partition_by_expr.get();
command->move_destination_type = DataDestinationType::TABLE;
}
else
{
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = DataDestinationType::TABLE;
}
}
else if (s_export_partition.ignore(pos, expected))
{
Expand Down Expand Up @@ -1061,6 +1087,10 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->rename_to = command->children.emplace_back(std::move(command_rename_to)).get();
if (command_snapshot_desc)
command->snapshot_desc = command->children.emplace_back(std::move(command_snapshot_desc)).get();
if (export_table_function)
command->to_table_function = command->children.emplace_back(std::move(export_table_function)).get();
if (export_table_function_partition_by_expr)
command->partition_by_expr = command->children.emplace_back(std::move(export_table_function_partition_by_expr)).get();

return true;
}
Expand Down
13 changes: 3 additions & 10 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,12 @@ bool ExportPartTask::executeStep()
block_with_partition_values = manifest.data_part->minmax_idx->getBlock(storage);
}

auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, local_context);
if (!destination_storage)
{
std::lock_guard inner_lock(storage.export_manifests_mutex);

const auto destination_storage_id_name = manifest.destination_storage_id.getNameForLogs();
storage.export_manifests.erase(manifest);
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name);
}
const auto & destination_storage = manifest.destination_storage_ptr;
const auto destination_storage_id = destination_storage->getStorageID();

Comment on lines +72 to 74

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Revalidate destination table before export task runs

The export task now uses the captured destination_storage_ptr directly without rechecking whether the destination table still exists or is dropped/detached. If the destination table is dropped after the ALTER EXPORT is queued but before this background task executes, destination_storage->import(...) will still run and can write data for a table that no longer exists (or has been recreated with a different definition). Previously the task re-resolved the destination from the catalog and aborted on UNKNOWN_TABLE; consider re-looking it up or at least checking is_dropped/locking before proceeding.

Useful? React with 👍 / 👎.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting one. This is kind of an existing problem, not exclusive to this PR.

Plain object storage tables don't have state, and there is no trigger / event for when these tables are dropped. At the same time, it is safe (in terms of thread and memory safety) to write to S3 even after the table has been dropped.

Being that said, I think we have a few options:

  1. Document it and leave it as is, meaning that if a table gets deleted after the export process has started, it will continue writing to the S3 bucket.
  2. Somehow trigger an event when the table gets deleted, and cancel exports. Might leak abstractions
  3. Constantly check in the pipeline if the table remains alive

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In any case, I don't think this is a major issue. We probably have bigger fish to fry

auto exports_list_entry = storage.getContext()->getExportsList().insert(
getStorageID(),
manifest.destination_storage_id,
destination_storage_id,
manifest.data_part->getBytesOnDisk(),
manifest.data_part->name,
std::vector<std::string>{},
Expand Down
60 changes: 53 additions & 7 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <Storages/MergeTree/ExportList.h>
#include <Access/AccessControl.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <Analyzer/QueryTreeBuilder.h>
#include <Analyzer/Utils.h>
Expand Down Expand Up @@ -6212,9 +6213,44 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP

const auto part_name = command.partition->as<ASTLiteral &>().value.safeGet<String>();

const auto database_name = query_context->resolveDatabase(command.to_database);
if (!command.to_table_function)
{
const auto database_name = query_context->resolveDatabase(command.to_database);
exportPartToTable(part_name, StorageID{database_name, command.to_table}, generateSnowflakeIDString(), query_context);

return;
}

auto table_function_ast = command.to_table_function;
auto table_function_ptr = TableFunctionFactory::instance().get(command.to_table_function, query_context);

if (table_function_ptr->needStructureHint())
{
auto source_metadata_ptr = getInMemoryMetadataPtr();
ColumnsDescription structure_hint = source_metadata_ptr->getColumns();
table_function_ptr->setStructureHint(structure_hint);
}

if (command.partition_by_expr)
{
table_function_ptr->setPartitionBy(command.partition_by_expr);
}

auto dest_storage = table_function_ptr->execute(
table_function_ast,
query_context,
table_function_ptr->getName(),
/* cached_columns */ {},
/* use_global_context */ false,
/* is_insert_query */ true);

if (!dest_storage)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to reconstruct destination storage");
}

exportPartToTable(part_name, StorageID{database_name, command.to_table}, generateSnowflakeIDString(), query_context);
exportPartToTable(part_name, dest_storage, generateSnowflakeIDString(), query_context);
return;
}

void MergeTreeData::exportPartToTable(
Expand All @@ -6231,6 +6267,16 @@ void MergeTreeData::exportPartToTable(
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Exporting to the same table is not allowed");
}

exportPartToTable(part_name, dest_storage, transaction_id, query_context, completion_callback);
}

void MergeTreeData::exportPartToTable(
const std::string & part_name,
const StoragePtr & dest_storage,
const String & transaction_id,
ContextPtr query_context,
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback)
{
if (!dest_storage->supportsImport())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Destination storage {} does not support MergeTree parts or uses unsupported partitioning", dest_storage->getName());

Expand All @@ -6257,7 +6303,7 @@ void MergeTreeData::exportPartToTable(
{
const auto format_settings = getFormatSettings(query_context);
MergeTreePartExportManifest manifest(
dest_storage->getStorageID(),
dest_storage,
part,
transaction_id,
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
Expand All @@ -6269,8 +6315,7 @@ void MergeTreeData::exportPartToTable(

if (!export_manifests.emplace(std::move(manifest)).second)
{
throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'",
part_name, dest_storage->getStorageID().getFullTableName());
throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported", part_name);
}
}

Expand Down Expand Up @@ -8703,8 +8748,9 @@ std::vector<MergeTreeExportStatus> MergeTreeData::getExportsStatus() const

status.source_database = source_database;
status.source_table = source_table;
status.destination_database = manifest.destination_storage_id.database_name;
status.destination_table = manifest.destination_storage_id.table_name;
const auto destination_storage_id = manifest.destination_storage_ptr->getStorageID();
status.destination_database = destination_storage_id.database_name;
status.destination_table = destination_storage_id.table_name;
status.create_time = manifest.create_time;
status.part_name = manifest.data_part->name;

Expand Down
7 changes: 7 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,13 @@ class MergeTreeData : public IStorage, public WithMutableContext

void exportPartToTable(const PartitionCommand & command, ContextPtr query_context);

void exportPartToTable(
const std::string & part_name,
const StoragePtr & destination_storage,
const String & transaction_id,
ContextPtr query_context,
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback = {});

void exportPartToTable(
const std::string & part_name,
const StorageID & destination_storage_id,
Expand Down
17 changes: 5 additions & 12 deletions src/Storages/MergeTree/MergeTreePartExportManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <QueryPipeline/QueryPipeline.h>
#include <optional>
#include <Core/Settings.h>
#include <Storages/IStorage.h>

namespace DB
{
Expand Down Expand Up @@ -43,14 +44,14 @@ struct MergeTreePartExportManifest
};

MergeTreePartExportManifest(
const StorageID & destination_storage_id_,
const StoragePtr destination_storage_ptr_,
const DataPartPtr & data_part_,
const String & transaction_id_,
FileAlreadyExistsPolicy file_already_exists_policy_,
const Settings & settings_,
const StorageMetadataPtr & metadata_snapshot_,
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
: destination_storage_id(destination_storage_id_),
: destination_storage_ptr(destination_storage_ptr_),
data_part(data_part_),
transaction_id(transaction_id_),
file_already_exists_policy(file_already_exists_policy_),
Expand All @@ -59,7 +60,7 @@ struct MergeTreePartExportManifest
completion_callback(completion_callback_),
create_time(time(nullptr)) {}

StorageID destination_storage_id;
StoragePtr destination_storage_ptr;
DataPartPtr data_part;
/// Used for killing the export.
String transaction_id;
Expand All @@ -78,20 +79,12 @@ struct MergeTreePartExportManifest

bool operator<(const MergeTreePartExportManifest & rhs) const
{
// Lexicographic comparison: first compare destination storage, then part name
auto lhs_storage = destination_storage_id.getQualifiedName();
auto rhs_storage = rhs.destination_storage_id.getQualifiedName();

if (lhs_storage != rhs_storage)
return lhs_storage < rhs_storage;

return data_part->name < rhs.data_part->name;
}
Comment on lines 80 to 83

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Include destination in export manifest ordering

Because MergeTreeData::exportPartToTable stores manifests in a std::set and relies on the comparator for uniqueness, comparing only data_part->name makes all exports of the same part equivalent even if the destination differs. This means concurrent exports of the same part to two different tables or table functions will now collide and the second will fail with “already being exported,” which is a regression from the previous per-destination behavior. Consider including the destination (e.g., storage ID or pointer) in operator</operator== so the set only deduplicates identical part+destination pairs.

Useful? React with 👍 / 👎.


bool operator==(const MergeTreePartExportManifest & rhs) const
{
return destination_storage_id.getQualifiedName() == rhs.destination_storage_id.getQualifiedName()
&& data_part->name == rhs.data_part->name;
return data_part->name == rhs.data_part->name;
}
};

Expand Down
Loading
Loading