From 53f4b322f71eadda5204a7d37a1bb153b795d895 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Mon, 5 Jan 2026 10:16:55 +0800 Subject: [PATCH 1/5] feat: extend table scan to support v2 deletes --- example/demo_example.cc | 1 + src/iceberg/table.cc | 2 +- src/iceberg/table_scan.cc | 400 ++++++++++++++++------ src/iceberg/table_scan.h | 347 +++++++++++++------ src/iceberg/test/file_scan_task_test.cc | 6 +- src/iceberg/util/snapshot_util.cc | 31 +- src/iceberg/util/snapshot_util_internal.h | 8 + 7 files changed, 564 insertions(+), 231 deletions(-) diff --git a/example/demo_example.cc b/example/demo_example.cc index ab011feec..6869aa37e 100644 --- a/example/demo_example.cc +++ b/example/demo_example.cc @@ -22,6 +22,7 @@ #include "iceberg/arrow/arrow_file_io.h" #include "iceberg/avro/avro_register.h" #include "iceberg/catalog/memory/in_memory_catalog.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/parquet/parquet_register.h" #include "iceberg/table.h" #include "iceberg/table_scan.h" diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index ee3ce594a..f2e6d3202 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -141,7 +141,7 @@ const std::shared_ptr& Table::metadata() const { return metadata_ const std::shared_ptr& Table::catalog() const { return catalog_; } Result> Table::NewScan() const { - return std::make_unique(metadata_, io_); + return TableScanBuilder::Make(metadata_, io_); } Result> Table::NewTransaction() { diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 700cab1f5..8b26b5f46 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -20,22 +20,40 @@ #include "iceberg/table_scan.h" #include -#include -#include "iceberg/arrow_c_data.h" +#include "iceberg/expression/expression.h" #include "iceberg/file_reader.h" #include "iceberg/manifest/manifest_entry.h" -#include "iceberg/manifest/manifest_list.h" -#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_group.h" 
+#include "iceberg/result.h" #include "iceberg/schema.h" -#include "iceberg/schema_field.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" #include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/timepoint.h" namespace iceberg { namespace { + +const std::vector kScanColumns = { + "snapshot_id", "file_path", "file_ordinal", "file_format", + "block_size_in_bytes", "file_size_in_bytes", "record_count", "partition", + "key_metadata", "split_offsets", "sort_order_id", +}; + +const std::vector kStatsColumns = { + "value_counts", "null_value_counts", "nan_value_counts", + "lower_bounds", "upper_bounds", "column_sizes", +}; + +const std::vector kScanColumnsWithStats = [] { + auto cols = kScanColumns; + cols.insert(cols.end(), kStatsColumns.begin(), kStatsColumns.end()); + return cols; +}(); + /// \brief Private data structure to hold the Reader and error state struct ReaderStreamPrivateData { std::unique_ptr reader; @@ -135,6 +153,31 @@ Result MakeArrowArrayStream(std::unique_ptr reader) { } // namespace +namespace internal { + +Status TableScanContext::Validate() const { + if (!columns_to_keep_stats.empty() && !return_column_stats) { + return InvalidArgument( + "Cannot select columns to keep stats when column stats are not returned"); + } + if (projected_schema != nullptr && !selected_columns.empty()) { + return InvalidArgument( + "Cannot set projection schema and selected columns at the same time"); + } + if (snapshot_id.has_value() && + (from_snapshot_id.has_value() || to_snapshot_id.has_value())) { + return InvalidArgument("Cannot mix snapshot scan and incremental scan"); + } + if (min_rows_requested.has_value() && min_rows_requested.value() < 0) { + return InvalidArgument("Min rows requested cannot be negative"); + } + return {}; +} + +} // namespace internal + +ScanTask::~ScanTask() = default; + // FileScanTask implementation FileScanTask::FileScanTask(std::shared_ptr data_file, @@ -142,22 +185,10 
@@ FileScanTask::FileScanTask(std::shared_ptr data_file, std::shared_ptr residual_filter) : data_file_(std::move(data_file)), delete_files_(std::move(delete_files)), - residual_filter_(std::move(residual_filter)) {} - -const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } - -const std::vector>& FileScanTask::delete_files() const { - return delete_files_; -} - -const std::shared_ptr& FileScanTask::residual_filter() const { - return residual_filter_; + residual_filter_(std::move(residual_filter)) { + ICEBERG_DCHECK(data_file_ != nullptr, "Data file cannot be null for FileScanTask"); } -bool FileScanTask::has_deletes() const { return !delete_files_.empty(); } - -bool FileScanTask::has_residual_filter() const { return residual_filter_ != nullptr; } - int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; } int32_t FileScanTask::files_count() const { return 1; } @@ -165,17 +196,16 @@ int32_t FileScanTask::files_count() const { return 1; } int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; } Result FileScanTask::ToArrow( - const std::shared_ptr& io, const std::shared_ptr& projected_schema, - const std::shared_ptr& filter) const { - if (has_deletes()) { + const std::shared_ptr& io, std::shared_ptr projected_schema) const { + if (!delete_files_.empty()) { return NotSupported("Reading data files with delete files is not yet supported."); } const ReaderOptions options{.path = data_file_->file_path, .length = data_file_->file_size_in_bytes, .io = io, - .projection = projected_schema, - .filter = filter}; + .projection = std::move(projected_schema), + .filter = residual_filter_}; ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open(data_file_->file_format, options)); @@ -183,138 +213,286 @@ Result FileScanTask::ToArrow( return MakeArrowArrayStream(std::move(reader)); } +Result> TableScanBuilder::Make( + std::shared_ptr metadata, std::shared_ptr io) { + ICEBERG_PRECHECK(metadata != 
nullptr, "Table metadata cannot be null"); + ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null"); + return std::unique_ptr( + new TableScanBuilder(std::move(metadata), std::move(io))); +} + TableScanBuilder::TableScanBuilder(std::shared_ptr table_metadata, std::shared_ptr file_io) - : file_io_(std::move(file_io)) { - context_.table_metadata = std::move(table_metadata); -} + : metadata_(std::move(table_metadata)), io_(std::move(file_io)) {} -TableScanBuilder& TableScanBuilder::WithColumnNames( - std::vector column_names) { - column_names_ = std::move(column_names); +TableScanBuilder& TableScanBuilder::Option(std::string key, std::string value) { + context_.options[std::move(key)] = std::move(value); return *this; } -TableScanBuilder& TableScanBuilder::WithProjectedSchema(std::shared_ptr schema) { +TableScanBuilder& TableScanBuilder::Project(std::shared_ptr schema) { context_.projected_schema = std::move(schema); return *this; } -TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) { - snapshot_id_ = snapshot_id; +TableScanBuilder& TableScanBuilder::CaseSensitive(bool case_sensitive) { + context_.case_sensitive = case_sensitive; + return *this; +} + +TableScanBuilder& TableScanBuilder::IncludeColumnStats() { + context_.return_column_stats = true; + return *this; +} + +TableScanBuilder& TableScanBuilder::IncludeColumnStats( + const std::vector& requested_columns) { + context_.return_column_stats = true; + context_.columns_to_keep_stats.clear(); + context_.columns_to_keep_stats.reserve(requested_columns.size()); + + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_ref, ResolveSnapshotSchema()); + const auto& schema = schema_ref.get(); + for (const auto& column_name : requested_columns) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field, schema->FindFieldByName(column_name)); + if (field.has_value()) { + context_.columns_to_keep_stats.insert(field.value().get().field_id()); + } + } + + return *this; +} + +TableScanBuilder& TableScanBuilder::Select(const 
std::vector& column_names) { + context_.selected_columns = column_names; return *this; } -TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr filter) { +TableScanBuilder& TableScanBuilder::Filter(std::shared_ptr filter) { context_.filter = std::move(filter); return *this; } -TableScanBuilder& TableScanBuilder::WithCaseSensitive(bool case_sensitive) { - context_.case_sensitive = case_sensitive; +TableScanBuilder& TableScanBuilder::IgnoreResiduals() { + context_.ignore_residuals = true; return *this; } -TableScanBuilder& TableScanBuilder::WithOption(std::string property, std::string value) { - context_.options[std::move(property)] = std::move(value); +TableScanBuilder& TableScanBuilder::MinRowsRequested(int64_t num_rows) { + context_.min_rows_requested = num_rows; return *this; } -TableScanBuilder& TableScanBuilder::WithLimit(std::optional limit) { - context_.limit = limit; +TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) { + ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(), + "Cannot override snapshot, already set snapshot id={}", + context_.snapshot_id.value()); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id)); + context_.snapshot_id = snapshot_id; return *this; } -Result> TableScanBuilder::Build() { - const auto& table_metadata = context_.table_metadata; - auto snapshot_id = snapshot_id_ ? 
snapshot_id_ : table_metadata->current_snapshot_id; - if (!snapshot_id) { - return InvalidArgument("No snapshot ID specified for table {}", - table_metadata->table_uuid); +TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) { + if (ref == SnapshotRef::kMainBranch) { + snapshot_schema_ = nullptr; + context_.snapshot_id.reset(); + return *this; + } + + ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(), + "Cannot override ref, already set snapshot id={}", + context_.snapshot_id.value()); + auto iter = metadata_->refs.find(ref); + if (iter != metadata_->refs.end()) { + ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref); + int64_t snapshot_id = iter->second->snapshot_id; + ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id)); + context_.snapshot_id = snapshot_id; + } else { + return AddError(InvalidArgument("Cannot find ref {}", ref)); } - ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_metadata->SnapshotById(*snapshot_id)); - if (!context_.projected_schema) { - const auto& snapshot = context_.snapshot; - auto schema_id = table_metadata->current_schema_id; - ICEBERG_ASSIGN_OR_RAISE(auto schema, table_metadata->SchemaById(schema_id)); + return *this; +} + +TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto time_point_ms, + TimePointMsFromUnixMs(timestamp_millis)); + ICEBERG_BUILDER_ASSIGN_OR_RETURN( + auto snapshot_id, SnapshotUtil::SnapshotIdAsOfTime(*metadata_, time_point_ms)); + return UseSnapshot(snapshot_id); +} - if (column_names_.empty()) { - context_.projected_schema = schema; +TableScanBuilder& TableScanBuilder::FromSnapshotInclusive( + [[maybe_unused]] int64_t from_snapshot_id) { + return AddError(NotImplemented("Incremental scan is not implemented")); +} + +TableScanBuilder& TableScanBuilder::FromSnapshotInclusive( + [[maybe_unused]] const std::string& ref) { + return AddError(NotImplemented("Incremental scan is not 
implemented")); +} + +TableScanBuilder& TableScanBuilder::FromSnapshotExclusive( + [[maybe_unused]] int64_t from_snapshot_id) { + return AddError(NotImplemented("Incremental scan is not implemented")); +} + +TableScanBuilder& TableScanBuilder::FromSnapshotExclusive( + [[maybe_unused]] const std::string& ref) { + return AddError(NotImplemented("Incremental scan is not implemented")); +} + +TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) { + return AddError(NotImplemented("Incremental scan is not implemented")); +} + +TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) { + return AddError(NotImplemented("Incremental scan is not implemented")); +} + +TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) { + context_.branch = branch; + return *this; +} + +Result>> +TableScanBuilder::ResolveSnapshotSchema() { + if (snapshot_schema_ == nullptr) { + if (context_.snapshot_id.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, + metadata_->SnapshotById(*context_.snapshot_id)); + int32_t schema_id = snapshot->schema_id.value_or(Schema::kInitialSchemaId); + ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->SchemaById(schema_id)); } else { - // TODO(gty404): collect touched columns from filter expression - std::vector projected_fields; - projected_fields.reserve(column_names_.size()); - for (const auto& column_name : column_names_) { - // TODO(gty404): support case-insensitive column names - auto field_opt = schema->GetFieldByName(column_name); - if (!field_opt) { - return InvalidArgument("Column {} not found in schema '{}'", column_name, - schema_id); - } - projected_fields.emplace_back(field_opt.value()->get()); - } - context_.projected_schema = - std::make_shared(std::move(projected_fields), schema->schema_id()); + ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->Schema()); } - } else if (!column_names_.empty()) { - return InvalidArgument( - "Cannot specify column names 
when a projected schema is provided"); } + ICEBERG_CHECK(snapshot_schema_ != nullptr, "Snapshot schema is null"); + return snapshot_schema_; +} - return std::make_unique(std::move(context_), file_io_); +bool TableScanBuilder::IsIncrementalScan() const { + return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value(); } -TableScan::TableScan(TableScanContext context, std::shared_ptr file_io) - : context_(std::move(context)), file_io_(std::move(file_io)) {} +Result> TableScanBuilder::Build() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + ICEBERG_RETURN_UNEXPECTED(context_.Validate()); -const std::shared_ptr& TableScan::snapshot() const { return context_.snapshot; } + if (IsIncrementalScan()) { + return NotImplemented("Incremental scan is not yet implemented"); + } -const std::shared_ptr& TableScan::projection() const { - return context_.projected_schema; + ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema()); + return DataTableScan::Make(metadata_, schema.get(), io_, std::move(context_)); } -const TableScanContext& TableScan::context() const { return context_; } +TableScan::TableScan(std::shared_ptr metadata, + std::shared_ptr schema, std::shared_ptr file_io, + internal::TableScanContext context) + : metadata_(std::move(metadata)), + schema_(std::move(schema)), + io_(std::move(file_io)), + context_(std::move(context)) {} + +TableScan::~TableScan() = default; + +const std::shared_ptr& TableScan::metadata() const { return metadata_; } -const std::shared_ptr& TableScan::io() const { return file_io_; } +Result> TableScan::snapshot() const { + auto snapshot_id = context_.snapshot_id ? 
context_.snapshot_id.value() + : metadata_->current_snapshot_id; + return metadata_->SnapshotById(snapshot_id); +} + +Result> TableScan::schema() const { + return ResolveProjectedSchema(); +} -DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr file_io) - : TableScan(std::move(context), std::move(file_io)) {} +const internal::TableScanContext& TableScan::context() const { return context_; } + +const std::shared_ptr& TableScan::io() const { return io_; } + +const std::shared_ptr& TableScan::filter() const { + const static std::shared_ptr true_expr = True::Instance(); + if (!context_.filter) { + return true_expr; + } + return context_.filter; +} + +bool TableScan::is_case_sensitive() const { return context_.case_sensitive; } + +Result>> +TableScan::ResolveProjectedSchema() const { + if (projected_schema_ != nullptr) { + return projected_schema_; + } + + if (!context_.selected_columns.empty()) { + /// TODO(gangwu): port Java BaseScan.lazyColumnProjection to collect field ids + /// from selected column names and bound references in the filter, and then create + /// projected schema based on the collected field ids. + return NotImplemented( + "Selecting columns by name to create projected schema is not yet implemented"); + } else if (context_.projected_schema != nullptr) { + projected_schema_ = context_.projected_schema; + } else { + projected_schema_ = schema_; + } + + return projected_schema_; +} + +const std::vector& TableScan::ScanColumns() const { + return context_.return_column_stats ? 
kScanColumnsWithStats : kScanColumns; +} + +Result> DataTableScan::Make( + std::shared_ptr metadata, std::shared_ptr schema, + std::shared_ptr io, internal::TableScanContext context) { + ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null"); + ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null"); + ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null"); + return std::unique_ptr(new DataTableScan( + std::move(metadata), std::move(schema), std::move(io), std::move(context))); +} + +DataTableScan::DataTableScan(std::shared_ptr metadata, + std::shared_ptr schema, std::shared_ptr io, + internal::TableScanContext context) + : TableScan(std::move(metadata), std::move(schema), std::move(io), + std::move(context)) {} Result>> DataTableScan::PlanFiles() const { - ICEBERG_ASSIGN_OR_RAISE( - auto manifest_list_reader, - ManifestListReader::Make(context_.snapshot->manifest_list, file_io_)); - ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); - - std::vector> tasks; - ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, context_.table_metadata->PartitionSpec()); - - // Get the table schema - ICEBERG_ASSIGN_OR_RAISE(auto current_schema, context_.table_metadata->Schema()); - - for (const auto& manifest_file : manifest_files) { - ICEBERG_ASSIGN_OR_RAISE( - auto manifest_reader, - ManifestReader::Make(manifest_file, file_io_, current_schema, partition_spec)); - ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries()); - - // TODO(gty404): filter manifests using partition spec and filter expression - - for (auto& manifest_entry : manifests) { - const auto& data_file = manifest_entry.data_file; - switch (data_file->content) { - case DataFile::Content::kData: - tasks.emplace_back(std::make_shared(manifest_entry.data_file)); - break; - case DataFile::Content::kPositionDeletes: - case DataFile::Content::kEqualityDeletes: - return NotSupported("Equality/Position deletes are not supported in data scan"); - } - } + 
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot()); + if (!snapshot) { + return std::vector>{}; } - return tasks; + TableMetadataCache metadata_cache(metadata_.get()); + ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById()); + + SnapshotCache snapshot_cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto data_manifests, snapshot_cache.DataManifests(io_)); + ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests, snapshot_cache.DeleteManifests(io_)); + + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_group, + ManifestGroup::Make(io_, schema_, specs_by_id, + {data_manifests.begin(), data_manifests.end()}, + {delete_manifests.begin(), delete_manifests.end()})); + (*manifest_group) + .CaseSensitive(context_.case_sensitive) + .Select(ScanColumns()) + .FilterData(filter()) + .IgnoreDeleted() + .ColumnsToKeepStats(context_.columns_to_keep_stats); + if (context_.ignore_residuals) { + manifest_group->IgnoreResiduals(); + } + return manifest_group->PlanFiles(); } } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 4f2ddfde2..0df7e9bc5 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -19,19 +19,30 @@ #pragma once +#include +#include +#include #include +#include +#include #include #include "iceberg/arrow_c_data.h" -#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" #include "iceberg/type_fwd.h" +#include "iceberg/util/error_collector.h" namespace iceberg { /// \brief An abstract scan task. class ICEBERG_EXPORT ScanTask { public: - virtual ~ScanTask() = default; + enum class Kind : uint8_t { + kFileScanTask, + }; + + /// \brief The kind of scan task. + virtual Kind kind() const = 0; /// \brief The number of bytes that should be read by this scan task. virtual int64_t size_bytes() const = 0; @@ -41,6 +52,8 @@ class ICEBERG_EXPORT ScanTask { /// \brief The number of rows that should be read by this scan task. 
virtual int64_t estimated_row_count() const = 0; + + virtual ~ScanTask(); }; /// \brief Task representing a data file and its corresponding delete files. @@ -50,175 +63,293 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { /// /// \param data_file The data file to read. /// \param delete_files Delete files that apply to this data file. - /// \param residual_filter Optional residual filter to apply after reading. + /// \param filter Optional residual filter to apply after reading. explicit FileScanTask(std::shared_ptr data_file, std::vector> delete_files = {}, - std::shared_ptr residual_filter = nullptr); + std::shared_ptr filter = nullptr); /// \brief The data file that should be read by this scan task. - const std::shared_ptr& data_file() const; + const std::shared_ptr& data_file() const { return data_file_; } /// \brief Delete files that apply to this data file. - const std::vector>& delete_files() const; + const std::vector>& delete_files() const { + return delete_files_; + } /// \brief Residual filter to apply after reading. - const std::shared_ptr& residual_filter() const; - - /// \brief Check if any deletes need to be applied. - bool has_deletes() const; - - /// \brief Check if a residual filter needs to be applied. - bool has_residual_filter() const; + const std::shared_ptr& residual_filter() const { return residual_filter_; } + Kind kind() const override { return Kind::kFileScanTask; } int64_t size_bytes() const override; int32_t files_count() const override; int64_t estimated_row_count() const override; - /** - * \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task. - * - * \param io The FileIO instance for accessing the file data. - * \param projected_schema The projected schema for reading the data. - * \param filter Optional filter expression to apply during reading. - * \return A Result containing an ArrowArrayStream, or an error on failure. 
- */ + /// TODO(gangwu): move it to iceberg/data/task_scanner.h + /// + /// \brief Returns a C-ABI compatible ArrowArrayStream to read the data for this task. + /// + /// \param io The FileIO instance for accessing the file data. + /// \param projected_schema The projected schema for reading the data. + /// \return A Result containing an ArrowArrayStream, or an error on failure. Result ToArrow(const std::shared_ptr& io, - const std::shared_ptr& projected_schema, - const std::shared_ptr& filter) const; + std::shared_ptr projected_schema) const; private: - /// \brief Data file metadata. std::shared_ptr data_file_; - /// \brief Delete files that apply to this data file. std::vector> delete_files_; - /// \brief Residual filter to apply after reading. std::shared_ptr residual_filter_; }; -/// \brief Scan context holding snapshot and scan-specific metadata. +namespace internal { + +// Internal table scan context used by different scan implementations. struct TableScanContext { - /// \brief Table metadata. - std::shared_ptr table_metadata; - /// \brief Snapshot to scan. - std::shared_ptr snapshot; - /// \brief Projected schema. - std::shared_ptr projected_schema; - /// \brief Filter expression to apply. + std::optional snapshot_id; std::shared_ptr filter; - /// \brief Whether the scan is case-sensitive. - bool case_sensitive = false; - /// \brief Additional options for the scan. + bool ignore_residuals{false}; + bool case_sensitive{true}; + bool return_column_stats{false}; + std::unordered_set columns_to_keep_stats; + std::vector selected_columns; + std::shared_ptr projected_schema; std::unordered_map options; - /// \brief Optional limit on the number of rows to scan. - std::optional limit; + bool from_snapshot_id_inclusive{false}; + std::optional from_snapshot_id; + std::optional to_snapshot_id; + std::string branch{}; + std::optional min_rows_requested; + + // Validate the context parameters to see if they have conflicts. 
+ [[nodiscard]] Status Validate() const; }; +} // namespace internal + /// \brief Builder class for creating TableScan instances. -class ICEBERG_EXPORT TableScanBuilder { +class ICEBERG_EXPORT TableScanBuilder : public ErrorCollector { public: /// \brief Constructs a TableScanBuilder for the given table. - /// \param table_metadata The metadata of the table to scan. - /// \param file_io The FileIO instance for reading manifests and data files. - explicit TableScanBuilder(std::shared_ptr table_metadata, - std::shared_ptr file_io); - - /// \brief Sets the snapshot ID to scan. - /// \param snapshot_id The ID of the snapshot. - /// \return Reference to the builder. - TableScanBuilder& WithSnapshotId(int64_t snapshot_id); - - /// \brief Selects columns to include in the scan. - /// \param column_names A list of column names. If empty, all columns will be selected. - /// \return Reference to the builder. - TableScanBuilder& WithColumnNames(std::vector column_names); - - /// \brief Sets the schema to use for the scan. - /// \param schema The schema to use. - /// \return Reference to the builder. - TableScanBuilder& WithProjectedSchema(std::shared_ptr schema); - - /// \brief Applies a filter expression to the scan. - /// \param filter Filter expression to use. - /// \return Reference to the builder. - TableScanBuilder& WithFilter(std::shared_ptr filter); - - /// \brief Sets whether the scan should be case-sensitive. - /// \param case_sensitive Whether the scan is case-sensitive. - /// /return Reference to the builder. - TableScanBuilder& WithCaseSensitive(bool case_sensitive); - - /// \brief Sets an option for the scan. - /// \param property The name of the option. - /// \param value The value of the option. - /// \return Reference to the builder. - TableScanBuilder& WithOption(std::string property, std::string value); - - /// \brief Sets an optional limit on the number of rows to scan. - /// \param limit Optional limit on the number of rows. 
- /// \return Reference to the builder. - TableScanBuilder& WithLimit(std::optional limit); + /// \param metadata Current table metadata. + /// \param io FileIO instance for reading manifest files. + static Result> Make( + std::shared_ptr metadata, std::shared_ptr io); + + /// \brief Update property that will override the table's behavior + /// based on the incoming pair. Unknown properties will be ignored. + /// \param key name of the table property to be overridden + /// \param value value to override with + TableScanBuilder& Option(std::string key, std::string value); + + /// \brief Set the projected schema. + /// \param schema a projection schema + TableScanBuilder& Project(std::shared_ptr schema); + + /// \brief If data columns are selected via Select(), controls whether + /// the match to the schema will be done with case sensitivity. Default is true. + /// \param case_sensitive whether the scan is case-sensitive + TableScanBuilder& CaseSensitive(bool case_sensitive); + + /// \brief Request this scan to load the column stats with each data file. + /// + /// Column stats include: value count, null value count, lower bounds, and upper bounds. + TableScanBuilder& IncludeColumnStats(); + + /// \brief Request this scan to load the column stats for the specific columns with each + /// data file. + /// + /// Column stats include: value count, null value count, lower bounds, and upper bounds. + /// + /// \param requested_columns column names for which to keep the stats. + TableScanBuilder& IncludeColumnStats(const std::vector& requested_columns); + + /// \brief Request this scan to read the given data columns. + /// + /// This produces an expected schema that includes all fields that are either selected + /// or used by this scan's filter expression. + /// + /// \param column_names column names from the table's schema + TableScanBuilder& Select(const std::vector& column_names); + + /// \brief Set the expression to filter data. 
+ /// \param filter a filter expression + TableScanBuilder& Filter(std::shared_ptr filter); + + /// \brief Request data filtering to files but not to rows in those files. + TableScanBuilder& IgnoreResiduals(); + + /// \brief Request this scan to return at least the given number of rows. + /// + /// This is used as a hint and is entirely optional in order to not have to return more + /// rows than necessary. This may return fewer rows if the scan does not contain that + /// many, or it may return more than requested. + /// + /// \param num_rows The minimum number of rows requested + TableScanBuilder& MinRowsRequested(int64_t num_rows); + + /// \brief Request this scan to use the given snapshot by ID. + /// \param snapshot_id a snapshot ID + /// \note InvalidArgument will be returned if the snapshot cannot be found + TableScanBuilder& UseSnapshot(int64_t snapshot_id); + + /// \brief Request this scan to use the given reference. + /// \param ref reference + /// \note InvalidArgument will be returned if a reference with the given name + /// could not be found + TableScanBuilder& UseRef(const std::string& ref); + + /// \brief Request this scan to use the most recent snapshot as of the given time + /// in milliseconds on the branch in the scan or main if no branch is set. + /// \param timestamp_millis a timestamp in milliseconds. + /// \note InvalidArgument will be returned if the snapshot cannot be found or time + /// travel is attempted on a tag + TableScanBuilder& AsOfTime(int64_t timestamp_millis); + + /// \brief Instructs this scan to look for changes starting from a particular snapshot + /// (inclusive). + /// + /// If the start snapshot is not configured, it defaults to the oldest ancestor of the + /// end snapshot (inclusive). 
+ /// + /// \param from_snapshot_id the start snapshot ID (inclusive) + /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of + /// the end snapshot + TableScanBuilder& FromSnapshotInclusive(int64_t from_snapshot_id); + + /// \brief Instructs this scan to look for changes starting from a particular snapshot + /// (inclusive). + /// + /// If the start snapshot is not configured, it defaults to the oldest ancestor of the + /// end snapshot (inclusive). + /// + /// \param ref the start ref name that points to a particular snapshot ID (inclusive) + /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of + /// the end snapshot + TableScanBuilder& FromSnapshotInclusive(const std::string& ref); + + /// \brief Instructs this scan to look for changes starting from a particular snapshot + /// (exclusive). + /// + /// If the start snapshot is not configured, it defaults to the oldest ancestor of the + /// end snapshot (inclusive). + /// + /// \param from_snapshot_id the start snapshot ID (exclusive) + /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of + /// the end snapshot + TableScanBuilder& FromSnapshotExclusive(int64_t from_snapshot_id); + + /// \brief Instructs this scan to look for changes starting from a particular snapshot + /// (exclusive). + /// + /// If the start snapshot is not configured, it defaults to the oldest ancestor of the + /// end snapshot (inclusive). + /// + /// \param ref the start ref name that points to a particular snapshot ID (exclusive) + /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of + /// the end snapshot + TableScanBuilder& FromSnapshotExclusive(const std::string& ref); + + /// \brief Instructs this scan to look for changes up to a particular snapshot + /// (inclusive). + /// + /// If the end snapshot is not configured, it defaults to the current table snapshot + /// (inclusive). 
+ /// + /// \param to_snapshot_id the end snapshot ID (inclusive) + TableScanBuilder& ToSnapshot(int64_t to_snapshot_id); + + /// \brief Instructs this scan to look for changes up to a particular snapshot ref + /// (inclusive). + /// + /// If the end snapshot is not configured, it defaults to the current table snapshot + /// (inclusive). + /// + /// \param ref the end snapshot Ref (inclusive) + TableScanBuilder& ToSnapshot(const std::string& ref); + + /// \brief Use the specified branch + /// \param branch the branch name + TableScanBuilder& UseBranch(const std::string& branch); /// \brief Builds and returns a TableScan instance. /// \return A Result containing the TableScan or an error. Result> Build(); private: - /// \brief the file I/O instance for reading manifests and data files. - std::shared_ptr file_io_; - /// \brief column names to project in the scan. - std::vector column_names_; - /// \brief snapshot ID to scan, if specified. - std::optional snapshot_id_; - /// \brief Context for the scan, including snapshot, schema, and filter. - TableScanContext context_; + TableScanBuilder(std::shared_ptr metadata, std::shared_ptr io); + + // Return the schema bound to the specified snapshot. + Result>> ResolveSnapshotSchema(); + + // Return whether current configuration indicates an incremental scan mode. + bool IsIncrementalScan() const; + + std::shared_ptr metadata_; + std::shared_ptr io_; + internal::TableScanContext context_; + std::shared_ptr snapshot_schema_; }; /// \brief Represents a configured scan operation on a table. class ICEBERG_EXPORT TableScan { public: - virtual ~TableScan() = default; + virtual ~TableScan(); - /// \brief Constructs a TableScan with the given context and file I/O. - /// \param context Scan context including snapshot, schema, and filter. - /// \param file_io File I/O instance for reading manifests and data files. - TableScan(TableScanContext context, std::shared_ptr file_io); + /// \brief Returns the table metadata being scanned. 
+ const std::shared_ptr& metadata() const; - /// \brief Returns the snapshot being scanned. - /// \return A shared pointer to the snapshot. - const std::shared_ptr& snapshot() const; + /// \brief Returns the snapshot to scan. + Result> snapshot() const; /// \brief Returns the projected schema for the scan. - /// \return A shared pointer to the projected schema. - const std::shared_ptr& projection() const; + Result> schema() const; /// \brief Returns the scan context. - /// \return A reference to the TableScanContext. - const TableScanContext& context() const; + const internal::TableScanContext& context() const; - /// \brief Returns the file I/O instance used for reading manifests and data files. - /// \return A shared pointer to the FileIO instance. + /// \brief Returns the file I/O instance used for reading files. const std::shared_ptr& io() const; + /// \brief Returns this scan's filter expression. + const std::shared_ptr& filter() const; + + /// \brief Returns whether this scan is case-sensitive. + bool is_case_sensitive() const; + /// \brief Plans the scan tasks by resolving manifests and data files. /// \return A Result containing scan tasks or an error. virtual Result>> PlanFiles() const = 0; protected: - /// \brief context for the scan, including snapshot, schema, and filter. - const TableScanContext context_; - /// \brief File I/O instance for reading manifests and data files. - std::shared_ptr file_io_; + TableScan(std::shared_ptr metadata, std::shared_ptr schema, + std::shared_ptr io, internal::TableScanContext context); + + Result>> ResolveProjectedSchema() + const; + + virtual const std::vector& ScanColumns() const; + + const std::shared_ptr metadata_; + const std::shared_ptr schema_; + const std::shared_ptr io_; + const internal::TableScanContext context_; + mutable std::shared_ptr projected_schema_; }; /// \brief A scan that reads data files and applies delete files to filter rows. 
class ICEBERG_EXPORT DataTableScan : public TableScan { public: - /// \brief Constructs a DataScan with the given context and file I/O. - DataTableScan(TableScanContext context, std::shared_ptr file_io); + /// \brief Constructs a DataTableScan instance. + static Result> Make( + std::shared_ptr metadata, std::shared_ptr schema, + std::shared_ptr io, internal::TableScanContext context); /// \brief Plans the scan tasks by resolving manifests and data files. /// \return A Result containing scan tasks or an error. Result>> PlanFiles() const override; + + protected: + DataTableScan(std::shared_ptr metadata, std::shared_ptr schema, + std::shared_ptr io, internal::TableScanContext context); }; } // namespace iceberg diff --git a/src/iceberg/test/file_scan_task_test.cc b/src/iceberg/test/file_scan_task_test.cc index b72528507..ba0c41b37 100644 --- a/src/iceberg/test/file_scan_task_test.cc +++ b/src/iceberg/test/file_scan_task_test.cc @@ -137,7 +137,7 @@ TEST_F(FileScanTaskTest, ReadFullSchema) { FileScanTask task(data_file); - auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr); + auto stream_result = task.ToArrow(file_io_, projected_schema); ASSERT_THAT(stream_result, IsOk()); auto stream = std::move(stream_result.value()); @@ -156,7 +156,7 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) { FileScanTask task(data_file); - auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr); + auto stream_result = task.ToArrow(file_io_, projected_schema); ASSERT_THAT(stream_result, IsOk()); auto stream = std::move(stream_result.value()); @@ -175,7 +175,7 @@ TEST_F(FileScanTaskTest, ReadEmptyFile) { FileScanTask task(data_file); - auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr); + auto stream_result = task.ToArrow(file_io_, projected_schema); ASSERT_THAT(stream_result, IsOk()); auto stream = std::move(stream_result.value()); diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index 
e76426f3c..2ec36478a 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -228,9 +228,30 @@ Result> SnapshotUtil::SnapshotAfter(const Table& table snapshot_id); } +namespace { + +std::optional OptionalSnapshotIdAsOfTimeImpl(const TableMetadata& metadata, + TimePointMs timestamp_ms) { + std::optional snapshot_id = std::nullopt; + for (const auto& log_entry : metadata.snapshot_log) { + if (log_entry.timestamp_ms <= timestamp_ms) { + snapshot_id = log_entry.snapshot_id; + } + } + return snapshot_id; +} + +} // namespace + Result SnapshotUtil::SnapshotIdAsOfTime(const Table& table, TimePointMs timestamp_ms) { - auto snapshot_id = OptionalSnapshotIdAsOfTime(table, timestamp_ms); + ICEBERG_PRECHECK(table.metadata() != nullptr, "Table metadata is null"); + return SnapshotIdAsOfTime(*table.metadata(), timestamp_ms); +} + +Result SnapshotUtil::SnapshotIdAsOfTime(const TableMetadata& metadata, + TimePointMs timestamp_ms) { + auto snapshot_id = OptionalSnapshotIdAsOfTimeImpl(metadata, timestamp_ms); ICEBERG_CHECK(snapshot_id.has_value(), "Cannot find a snapshot older than {}", FormatTimePointMs(timestamp_ms)); return snapshot_id.value(); @@ -238,13 +259,7 @@ Result SnapshotUtil::SnapshotIdAsOfTime(const Table& table, std::optional SnapshotUtil::OptionalSnapshotIdAsOfTime( const Table& table, TimePointMs timestamp_ms) { - std::optional snapshot_id = std::nullopt; - for (const auto& log_entry : table.history()) { - if (log_entry.timestamp_ms <= timestamp_ms) { - snapshot_id = log_entry.snapshot_id; - } - } - return snapshot_id; + return OptionalSnapshotIdAsOfTimeImpl(*table.metadata(), timestamp_ms); } Result> SnapshotUtil::SchemaFor(const Table& table, diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index e0d8830ff..2b11168ed 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -170,6 +170,14 @@ class ICEBERG_EXPORT SnapshotUtil { /// 
\return The snapshot ID static Result SnapshotIdAsOfTime(const Table& table, TimePointMs timestamp_ms); + /// \brief Returns the ID of the most recent snapshot for the table as of the timestamp. + /// + /// \param metadata The table metadata + /// \param timestamp_ms The timestamp in millis since the Unix epoch + /// \return The snapshot ID + static Result SnapshotIdAsOfTime(const TableMetadata& metadata, + TimePointMs timestamp_ms); + /// \brief Returns the ID of the most recent snapshot for the table as of the timestamp, /// or nullopt if not found. /// From f2541ae3a9d9a1da807bd66030eb6bab2f129889 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 6 Jan 2026 16:13:50 +0800 Subject: [PATCH 2/5] Update src/iceberg/table_scan.h Co-authored-by: Guotao Yu --- src/iceberg/table_scan.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 0df7e9bc5..43b5d7d5b 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -177,7 +177,7 @@ class ICEBERG_EXPORT TableScanBuilder : public ErrorCollector { /// \brief Request data filtering to files but not to rows in those files. TableScanBuilder& IgnoreResiduals(); - /// \brief Request this can to return at least the given number of rows. + /// \brief Request this scan to return at least the given number of rows. /// /// This is used as a hint and is entirely optional in order to not have to return more /// rows than necessary. 
This may return fewer rows if the scan does not contain that From 7e5f0606c61ae8cada2f52d1ba3b0b3d193c1a12 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 6 Jan 2026 16:14:12 +0800 Subject: [PATCH 3/5] Update src/iceberg/table_scan.cc Co-authored-by: Guotao Yu --- src/iceberg/table_scan.cc | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 8b26b5f46..7919c2d89 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -303,14 +303,11 @@ TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) { "Cannot override ref, already set snapshot id={}", context_.snapshot_id.value()); auto iter = metadata_->refs.find(ref); - if (iter != metadata_->refs.end()) { - ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref); - int32_t snapshot_id = iter->second->snapshot_id; - ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id)); - context_.snapshot_id = snapshot_id; - } else { - return AddError(InvalidArgument("Cannot find ref {}", ref)); - } + ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref {}", ref); + ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref); + int32_t snapshot_id = iter->second->snapshot_id; + ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(snapshot_id)); + context_.snapshot_id = snapshot_id; return *this; } From 5b7f14060dff6ebcbd5c784b8ecdd4bc5e4c37c2 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 6 Jan 2026 16:31:19 +0800 Subject: [PATCH 4/5] address feedback --- src/iceberg/table_scan.cc | 42 ++++++++++++--------------------------- src/iceberg/table_scan.h | 38 ++++++++--------------------------- 2 files changed, 21 insertions(+), 59 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 7919c2d89..617918d83 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -156,21 +156,15 @@ Result 
MakeArrowArrayStream(std::unique_ptr reader) { namespace internal { Status TableScanContext::Validate() const { - if (!columns_to_keep_stats.empty() && !return_column_stats) { - return InvalidArgument( - "Cannot select columns to keep stats when column stats are not returned"); - } - if (projected_schema != nullptr && !selected_columns.empty()) { - return InvalidArgument( - "Cannot set projection schema and selected columns at the same time"); - } - if (snapshot_id.has_value() && - (from_snapshot_id.has_value() || to_snapshot_id.has_value())) { - return InvalidArgument("Cannot mix snapshot scan and incremental scan"); - } - if (min_rows_requested.has_value() && min_rows_requested.value() < 0) { - return InvalidArgument("Min rows requested cannot be negative"); - } + ICEBERG_CHECK(columns_to_keep_stats.empty() || return_column_stats, + "Cannot select columns to keep stats when column stats are not returned"); + ICEBERG_CHECK(projected_schema == nullptr || selected_columns.empty(), + "Cannot set projection schema and selected columns at the same time"); + ICEBERG_CHECK(!snapshot_id.has_value() || + (!from_snapshot_id.has_value() && !to_snapshot_id.has_value()), + "Cannot mix snapshot scan and incremental scan"); + ICEBERG_CHECK(!min_rows_requested.has_value() || min_rows_requested.value() >= 0, + "Min rows requested cannot be negative"); return {}; } @@ -320,23 +314,13 @@ TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) { return UseSnapshot(snapshot_id); } -TableScanBuilder& TableScanBuilder::FromSnapshotInclusive( - [[maybe_unused]] int64_t from_snapshot_id) { - return AddError(NotImplemented("Incremental scan is not implemented")); -} - -TableScanBuilder& TableScanBuilder::FromSnapshotInclusive( - [[maybe_unused]] const std::string& ref) { - return AddError(NotImplemented("Incremental scan is not implemented")); -} - -TableScanBuilder& TableScanBuilder::FromSnapshotExclusive( - [[maybe_unused]] int64_t from_snapshot_id) { +TableScanBuilder& 
TableScanBuilder::FromSnapshot( + [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) { return AddError(NotImplemented("Incremental scan is not implemented")); } -TableScanBuilder& TableScanBuilder::FromSnapshotExclusive( - [[maybe_unused]] const std::string& ref) { +TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref, + [[maybe_unused]] bool inclusive) { return AddError(NotImplemented("Incremental scan is not implemented")); } diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 43b5d7d5b..cc26830f4 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -204,49 +204,27 @@ class ICEBERG_EXPORT TableScanBuilder : public ErrorCollector { /// travel is attempted on a tag TableScanBuilder& AsOfTime(int64_t timestamp_millis); - /// \brief Instructs this scan to look for changes starting from a particular snapshot - /// (inclusive). - /// - /// If the start snapshot is not configured, it defaults to the oldest ancestor of the - /// end snapshot (inclusive). - /// - /// \param from_snapshot_id the start snapshot ID (inclusive) - /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of - /// the end snapshot - TableScanBuilder& FromSnapshotInclusive(int64_t from_snapshot_id); - - /// \brief Instructs this scan to look for changes starting from a particular snapshot - /// (inclusive). - /// - /// If the start snapshot is not configured, it defaults to the oldest ancestor of the - /// end snapshot (inclusive). - /// - /// \param ref the start ref name that points to a particular snapshot ID (inclusive) - /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of - /// the end snapshot - TableScanBuilder& FromSnapshotInclusive(const std::string& ref); - - /// \brief Instructs this scan to look for changes starting from a particular snapshot - /// (exclusive). 
+ /// \brief Instructs this scan to look for changes starting from a particular snapshot. /// /// If the start snapshot is not configured, it defaults to the oldest ancestor of the /// end snapshot (inclusive). /// - /// \param from_snapshot_id the start snapshot ID (exclusive) + /// \param from_snapshot_id the start snapshot ID + /// \param inclusive whether the start snapshot is inclusive, default is false /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of /// the end snapshot - TableScanBuilder& FromSnapshotExclusive(int64_t from_snapshot_id); + TableScanBuilder& FromSnapshot(int64_t from_snapshot_id, bool inclusive = false); - /// \brief Instructs this scan to look for changes starting from a particular snapshot - /// (exclusive). + /// \brief Instructs this scan to look for changes starting from a particular snapshot. /// /// If the start snapshot is not configured, it defaults to the oldest ancestor of the /// end snapshot (inclusive). /// - /// \param ref the start ref name that points to a particular snapshot ID (exclusive) + /// \param ref the start ref name that points to a particular snapshot ID + /// \param inclusive whether the start snapshot is inclusive, default is false /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of /// the end snapshot - TableScanBuilder& FromSnapshotExclusive(const std::string& ref); + TableScanBuilder& FromSnapshot(const std::string& ref, bool inclusive = false); /// \brief Instructs this scan to look for changes up to a particular snapshot /// (inclusive). 
From 0f9afd70ff0f2a41fb992181ff55699c085eb049 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 6 Jan 2026 17:47:15 +0800 Subject: [PATCH 5/5] Update src/iceberg/table_scan.cc Co-authored-by: Zehua Zou --- src/iceberg/table_scan.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 617918d83..124102705 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -464,8 +464,7 @@ Result>> DataTableScan::PlanFiles() co ManifestGroup::Make(io_, schema_, specs_by_id, {data_manifests.begin(), data_manifests.end()}, {delete_manifests.begin(), delete_manifests.end()})); - (*manifest_group) - .CaseSensitive(context_.case_sensitive) + manifest_group->CaseSensitive(context_.case_sensitive) .Select(ScanColumns()) .FilterData(filter()) .IgnoreDeleted()