impl #1324

1 change: 0 additions & 1 deletion src/Storages/ColumnsDescription.cpp
@@ -851,7 +851,6 @@ std::optional<ColumnDefault> ColumnsDescription::getDefault(const String & colum
return {};
}


bool ColumnsDescription::hasCompressionCodec(const String & column_name) const
{
const auto it = columns.get<1>().find(column_name);
35 changes: 35 additions & 0 deletions src/Storages/MergeTree/ExportPartTask.cpp
@@ -4,17 +4,20 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Core/Settings.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/Exception.h>
#include <Common/ProfileEventsScope.h>
#include <Storages/MergeTree/ExportList.h>
#include <Formats/FormatFactory.h>
#include <Databases/enableAllExperimentalSettings.h>

namespace ProfileEvents
{
@@ -58,7 +61,11 @@ bool ExportPartTask::executeStep()

const auto & metadata_snapshot = manifest.metadata_snapshot;

// Read only physical columns from the part
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();

// But we want all columns (including aliases) in the output
NamesAndTypesList all_columns = metadata_snapshot->getColumns().getAll();

MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;

@@ -146,6 +153,34 @@ bool ExportPartTask::executeStep()
local_context,
getLogger("ExportPartition"));

// Add expression step to compute alias and other default columns for export
// This materializes virtual columns (like ALIAS) so they can be written to output
const auto & current_header = plan_for_part.getCurrentHeader();

// Enable all experimental settings for default expressions
// (same pattern as in IMergeTreeReader::evaluateMissingDefaults)
auto context_for_defaults = Context::createCopy(local_context);
enableAllExperimentalSettings(context_for_defaults);

auto defaults_dag = evaluateMissingDefaults(
*current_header,
all_columns,
metadata_snapshot->getColumns(),
Comment on lines +165 to +168


P2: Align export output columns with destination physical schema

This step builds defaults/reordering from all_columns, which includes ALIAS columns from the source. The export sink’s header comes from the destination’s getSampleBlock() (physical-only; see StorageInMemoryMetadata::getSampleBlock()), so if the destination table also defines an ALIAS column (same schema as source), the pipeline will output that alias column and StorageObjectStorageSink::consume() will throw because Block::cloneWithColumns requires the column count to match the physical header. This means EXPORT PART fails for tables that include aliases on the destination. Consider filtering to the destination’s physical columns (or using its sample block) when building the output DAG.


context_for_defaults);

if (defaults_dag)
{
// Ensure columns are in the correct order matching all_columns
defaults_dag->removeUnusedActions(all_columns.getNames(), false);
defaults_dag->addMaterializingOutputActions(/*materialize_sparse=*/ false);

auto expression_step = std::make_unique<ExpressionStep>(
current_header,
std::move(*defaults_dag));
expression_step->setStepDescription("Compute alias and default expressions for export");
plan_for_part.addStep(std::move(expression_step));
}

ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, "");

QueryPlanOptimizationSettings optimization_settings(local_context);
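A minimal sketch of the fix the review comment above suggests, assuming a dest_metadata_snapshot handle to the destination table's metadata is available at this point (it is not part of the diff). The idea is to restrict the output DAG to the destination's physical columns so the plan header matches the sink's sample block; every call below already appears in the diff itself:

// Hypothetical variant of the defaults_dag block, following the review comment.
// dest_metadata_snapshot is an assumed name; everything else mirrors the diff.
if (defaults_dag)
{
    // Emit only columns the destination actually stores (what its
    // getSampleBlock() exposes), so an ALIAS column defined on the
    // destination never reaches StorageObjectStorageSink::consume().
    Names output_columns = dest_metadata_snapshot->getColumns().getNamesOfPhysical();
    defaults_dag->removeUnusedActions(output_columns, false);
    defaults_dag->addMaterializingOutputActions(/*materialize_sparse=*/ false);

    auto expression_step = std::make_unique<ExpressionStep>(
        current_header,
        std::move(*defaults_dag));
    expression_step->setStepDescription("Compute alias and default expressions for export");
    plan_for_part.addStep(std::move(expression_step));
}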
6 changes: 5 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.cpp
@@ -6242,7 +6242,11 @@ void MergeTreeData::exportPartToTable(
auto source_metadata_ptr = getInMemoryMetadataPtr();
auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();

if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
const auto & source_columns = source_metadata_ptr->getColumns();

const auto & destination_columns = destination_metadata_ptr->getColumns();

if (destination_columns.getAll().sizeOfDifference(source_columns.getAll()))
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");

if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))
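To see why the comparison moved from getAllPhysical() to getAll(): in the ALIAS test below, arr_1 is not a physical column on the source but is physical on the destination, so the old physical-only check rejected a structurally compatible pair. A standalone illustration under assumed setup (the lists are built by hand here; in the real code they come from the metadata snapshots):

#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>

using namespace DB;

void structureCheckExample()
{
    auto u32 = std::make_shared<DataTypeUInt32>();
    auto u64 = std::make_shared<DataTypeUInt64>();
    auto arr_u64 = std::make_shared<DataTypeArray>(u64);

    /// Source table: arr_1 is declared as ALIAS arr[1], so it is absent
    /// from the physical column set but present in getAll().
    NamesAndTypesList source_physical{{"a", u32}, {"arr", arr_u64}};
    NamesAndTypesList source_all{{"a", u32}, {"arr", arr_u64}, {"arr_1", u64}};

    /// Destination stores arr_1 as a plain physical column, so its
    /// getAll() and getAllPhysical() coincide.
    NamesAndTypesList dest_columns{{"a", u32}, {"arr", arr_u64}, {"arr_1", u64}};

    size_t old_check = dest_columns.sizeOfDifference(source_physical); /// 1 -> INCOMPATIBLE_COLUMNS
    size_t new_check = dest_columns.sizeOfDifference(source_all);      /// 0 -> export proceeds
}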
@@ -42,3 +42,17 @@
---- Count rows in big_table and big_destination_max_rows
4194304
4194304
---- Test ALIAS columns export
---- Verify ALIAS column data in source table (arr_1 computed from arr[1])
1 [1,2,3] 1
1 [10,20,30] 10
---- Verify ALIAS column data exported to S3 (should match source)
1 [1,2,3] 1
1 [10,20,30] 10
---- Test MATERIALIZED columns export
---- Verify MATERIALIZED column data in source table (arr_1 computed from arr[1])
1 [1,2,3] 1
1 [10,20,30] 10
---- Verify MATERIALIZED column data exported to S3 (should match source)
1 [1,2,3] 1
1 [10,20,30] 10
@@ -18,12 +18,17 @@ mt_table_roundtrip="mt_table_roundtrip_${RANDOM}"
big_table="big_table_${RANDOM}"
big_destination_max_bytes="big_destination_max_bytes_${RANDOM}"
big_destination_max_rows="big_destination_max_rows_${RANDOM}"
mt_table_tf="mt_table_tf_${RANDOM}"
mt_alias="mt_alias_${RANDOM}"
mt_materialized="mt_materialized_${RANDOM}"
s3_alias_export="s3_alias_export_${RANDOM}"
s3_materialized_export="s3_materialized_export_${RANDOM}"

query() {
$CLICKHOUSE_CLIENT --query "$1"
}

query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function"
query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function, $mt_alias, $mt_materialized, $s3_alias_export, $s3_materialized_export"

query "CREATE TABLE $mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()"
query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year"
@@ -114,4 +119,40 @@ echo "---- Count rows in big_table and big_destination_max_rows"
query "SELECT COUNT() from $big_table"
query "SELECT COUNT() from $big_destination_max_rows"

query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function, $big_table, $big_destination_max_bytes, $big_destination_max_rows"
echo "---- Test ALIAS columns export"
query "CREATE TABLE $mt_alias (a UInt32, arr Array(UInt64), arr_1 UInt64 ALIAS arr[1]) ENGINE = MergeTree() PARTITION BY a ORDER BY (a, arr[1]) SETTINGS index_granularity = 1"
query "CREATE TABLE $s3_alias_export (a UInt32, arr Array(UInt64), arr_1 UInt64) ENGINE = S3(s3_conn, filename='$s3_alias_export', format=Parquet, partition_strategy='hive') PARTITION BY a"

query "INSERT INTO $mt_alias VALUES (1, [1, 2, 3]), (1, [10, 20, 30])"

alias_part=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$mt_alias' AND partition_id = '1' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')

query "ALTER TABLE $mt_alias EXPORT PART '$alias_part' TO TABLE $s3_alias_export SETTINGS allow_experimental_export_merge_tree_part = 1"

sleep 3

echo "---- Verify ALIAS column data in source table (arr_1 computed from arr[1])"
query "SELECT a, arr, arr_1 FROM $mt_alias ORDER BY arr"

echo "---- Verify ALIAS column data exported to S3 (should match source)"
query "SELECT a, arr, arr_1 FROM $s3_alias_export ORDER BY arr"

echo "---- Test MATERIALIZED columns export"
query "CREATE TABLE $mt_materialized (a UInt32, arr Array(UInt64), arr_1 UInt64 MATERIALIZED arr[1]) ENGINE = MergeTree() PARTITION BY a ORDER BY (a, arr_1) SETTINGS index_granularity = 1"
query "CREATE TABLE $s3_materialized_export (a UInt32, arr Array(UInt64), arr_1 UInt64) ENGINE = S3(s3_conn, filename='$s3_materialized_export', format=Parquet, partition_strategy='hive') PARTITION BY a"

query "INSERT INTO $mt_materialized VALUES (1, [1, 2, 3]), (1, [10, 20, 30])"

materialized_part=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$mt_materialized' AND partition_id = '1' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')

query "ALTER TABLE $mt_materialized EXPORT PART '$materialized_part' TO TABLE $s3_materialized_export SETTINGS allow_experimental_export_merge_tree_part = 1"

sleep 3

echo "---- Verify MATERIALIZED column data in source table (arr_1 computed from arr[1])"
query "SELECT a, arr, arr_1 FROM $mt_materialized ORDER BY arr"

echo "---- Verify MATERIALIZED column data exported to S3 (should match source)"
query "SELECT a, arr, arr_1 FROM $s3_materialized_export ORDER BY arr"

query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function, $big_table, $big_destination_max_bytes, $big_destination_max_rows, $mt_alias, $mt_materialized, $s3_alias_export, $s3_materialized_export"