Skip to content

Commit 112e62a

Browse files
committed
feat: impl metrics config
1 parent b9ce88f commit 112e62a

File tree

7 files changed

+644
-16
lines changed

7 files changed

+644
-16
lines changed

src/iceberg/metrics_config.cc

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,235 @@
1919

2020
#include "iceberg/metrics_config.h"
2121

22+
#include <charconv>
2223
#include <string>
2324
#include <unordered_map>
2425

2526
#include "iceberg/result.h"
2627
#include "iceberg/schema.h"
28+
#include "iceberg/sort_order.h"
29+
#include "iceberg/table.h"
2730
#include "iceberg/table_properties.h"
31+
#include "iceberg/util/checked_cast.h"
32+
#include "iceberg/util/type_util.h"
2833

2934
namespace iceberg {
3035

36+
namespace {
37+
38+
constexpr std::string_view kNoneName = "none";
39+
constexpr std::string_view kCountsName = "counts";
40+
constexpr std::string_view kFullName = "full";
41+
constexpr std::string_view kTruncatePrefix = "truncate(";
42+
constexpr int32_t kDefaultTruncateLength = 16;
43+
const std::shared_ptr<MetricsMode> kDefaultMetricsMode =
44+
std::make_shared<TruncateMetricsMode>(kDefaultTruncateLength);
45+
46+
std::shared_ptr<MetricsMode> SortedColumnDefaultMode(
47+
std::shared_ptr<MetricsMode> default_mode) {
48+
if (default_mode->kind() == MetricsMode::Kind::kNone ||
49+
default_mode->kind() == MetricsMode::Kind::kCounts) {
50+
return kDefaultMetricsMode;
51+
} else {
52+
return std::move(default_mode);
53+
}
54+
}
55+
56+
int32_t MaxInferredColumns(const TableProperties& properties) {
57+
int32_t max_inferred_columns =
58+
properties.Get(TableProperties::kMetricsMaxInferredColumnDefaults);
59+
if (max_inferred_columns < 0) {
60+
// fallback to default
61+
return TableProperties::kMetricsMaxInferredColumnDefaults.value();
62+
}
63+
return max_inferred_columns;
64+
}
65+
66+
Result<std::shared_ptr<MetricsMode>> ParseMode(const std::string& mode,
67+
std::shared_ptr<MetricsMode> fallback) {
68+
if (auto metrics_mode = MetricsMode::FromString(mode); metrics_mode.has_value()) {
69+
return std::move(metrics_mode.value());
70+
}
71+
return std::move(fallback);
72+
}
73+
74+
} // namespace
75+
76+
const std::shared_ptr<MetricsMode>& MetricsMode::None() {
77+
static const std::shared_ptr<MetricsMode> none = std::make_shared<NoneMetricsMode>();
78+
return none;
79+
}
80+
81+
const std::shared_ptr<MetricsMode>& MetricsMode::Counts() {
82+
static const std::shared_ptr<MetricsMode> counts =
83+
std::make_shared<CountsMetricsMode>();
84+
return counts;
85+
}
86+
87+
const std::shared_ptr<MetricsMode>& MetricsMode::Full() {
88+
static const std::shared_ptr<MetricsMode> full = std::make_shared<FullMetricsMode>();
89+
return full;
90+
}
91+
92+
const std::shared_ptr<MetricsMode>& MetricsMode::Truncate() {
93+
return kDefaultMetricsMode;
94+
}
95+
96+
Result<std::shared_ptr<MetricsMode>> MetricsMode::FromString(const std::string& mode) {
97+
if (StringUtils::EqualsIgnoreCase(mode, kNoneName)) {
98+
return MetricsMode::None();
99+
} else if (StringUtils::EqualsIgnoreCase(mode, kCountsName)) {
100+
return MetricsMode::Counts();
101+
} else if (StringUtils::EqualsIgnoreCase(mode, kFullName)) {
102+
return MetricsMode::Full();
103+
}
104+
105+
if (mode.starts_with(kTruncatePrefix) && mode.ends_with(")")) {
106+
int32_t length;
107+
auto [ptr, ec] = std::from_chars(mode.data() + 9 /* "truncate(" length */,
108+
mode.data() + mode.size() - 1, length);
109+
if (ec != std::errc{}) {
110+
return InvalidArgument("Invalid truncate mode: {}", mode);
111+
}
112+
if (length == kDefaultTruncateLength) {
113+
return kDefaultMetricsMode;
114+
}
115+
return TruncateMetricsMode::Make(length);
116+
}
117+
return InvalidArgument("Invalid metrics mode: {}", mode);
118+
}
119+
120+
std::string NoneMetricsMode::ToString() const { return std::string(kNoneName); }
121+
std::string CountsMetricsMode::ToString() const { return std::string(kCountsName); }
122+
std::string FullMetricsMode::ToString() const { return std::string(kFullName); }
123+
std::string TruncateMetricsMode::ToString() const {
124+
return std::format("truncate({})", length_);
125+
}
126+
127+
Result<std::shared_ptr<MetricsMode>> TruncateMetricsMode::Make(int32_t length) {
128+
ICEBERG_PRECHECK(length > 0, "Truncate length should be positive.");
129+
return std::make_shared<TruncateMetricsMode>(length);
130+
}
131+
132+
MetricsConfig::MetricsConfig(
133+
std::unordered_map<std::string, std::shared_ptr<MetricsMode>> column_modes,
134+
std::shared_ptr<MetricsMode> default_mode)
135+
: column_modes_(std::move(column_modes)), default_mode_(std::move(default_mode)) {}
136+
137+
std::shared_ptr<MetricsConfig> MetricsConfig::Default() {
138+
static auto default_config = std::make_shared<MetricsConfig>(
139+
std::unordered_map<std::string, std::shared_ptr<MetricsMode>>{},
140+
kDefaultMetricsMode);
141+
return default_config;
142+
}
143+
144+
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::Make(std::shared_ptr<Table> table) {
145+
ICEBERG_PRECHECK(table != nullptr, "table cannot be null");
146+
ICEBERG_ASSIGN_OR_RAISE(auto schema, table->schema());
147+
148+
auto sort_order = table->sort_order();
149+
return MakeInternal(
150+
table->properties(), *schema,
151+
sort_order.has_value() ? *sort_order.value() : *SortOrder::Unsorted());
152+
}
153+
154+
Result<std::shared_ptr<MetricsConfig>> MetricsConfig::MakeInternal(
155+
const TableProperties& props, const Schema& schema, const SortOrder& order) {
156+
std::unordered_map<std::string, std::shared_ptr<MetricsMode>> column_modes;
157+
158+
std::shared_ptr<MetricsMode> default_mode = kDefaultMetricsMode;
159+
if (props.configs().contains(TableProperties::kDefaultWriteMetricsMode.key())) {
160+
std::string configured_metrics_mode =
161+
props.Get(TableProperties::kDefaultWriteMetricsMode);
162+
ICEBERG_ASSIGN_OR_RAISE(default_mode,
163+
ParseMode(configured_metrics_mode, kDefaultMetricsMode));
164+
} else {
165+
int32_t max_inferred_columns = MaxInferredColumns(props);
166+
GetProjectedIdsVisitor visitor(true);
167+
ICEBERG_RETURN_UNEXPECTED(
168+
visitor.Visit(internal::checked_cast<const StructType&>(schema)));
169+
int32_t projected_columns = visitor.Finish().size();
170+
if (max_inferred_columns < projected_columns) {
171+
ICEBERG_ASSIGN_OR_RAISE(auto limit_field_ids,
172+
LimitFieldIds(schema, max_inferred_columns));
173+
for (auto id : limit_field_ids) {
174+
ICEBERG_ASSIGN_OR_RAISE(auto column_name, schema.FindColumnNameById(id));
175+
ICEBERG_CHECK(column_name.has_value(), "Field id {} not found in schema", id);
176+
column_modes[std::string(column_name.value())] = kDefaultMetricsMode;
177+
}
178+
// All other columns don't use metrics
179+
default_mode = MetricsMode::None();
180+
}
181+
}
182+
183+
// First set sorted column with sorted column default (can be overridden by user)
184+
auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode);
185+
auto sorted_columns = SortOrder::OrderPreservingSortedColumns(schema, order);
186+
for (const auto& sc : sorted_columns) {
187+
column_modes[std::string(sc)] = sorted_col_default_mode;
188+
}
189+
190+
// Handle user overrides of defaults
191+
for (const auto& prop : props.configs()) {
192+
if (prop.first.starts_with(TableProperties::kMetricModeColumnConfPrefix)) {
193+
std::string column_alias =
194+
prop.first.substr(TableProperties::kMetricModeColumnConfPrefix.size());
195+
ICEBERG_ASSIGN_OR_RAISE(auto mode, ParseMode(prop.second, default_mode));
196+
column_modes[std::move(column_alias)] = mode;
197+
}
198+
}
199+
200+
return std::make_shared<MetricsConfig>(std::move(column_modes),
201+
std::move(default_mode));
202+
}
203+
204+
Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& schema,
205+
int32_t limit) {
206+
class Visitor {
207+
public:
208+
explicit Visitor(int32_t limit) : limit_(limit) {}
209+
210+
Status Visit(const std::shared_ptr<Type>& type) {
211+
if (type->is_nested()) {
212+
return Visit(internal::checked_cast<const NestedType&>(*type));
213+
}
214+
return {};
215+
}
216+
217+
Status Visit(const NestedType& type) {
218+
for (auto& field : type.fields()) {
219+
if (!ShouldContinue()) {
220+
break;
221+
}
222+
if (field.type()->is_primitive()) {
223+
ids_.insert(field.field_id());
224+
}
225+
}
226+
227+
for (auto& field : type.fields()) {
228+
if (ShouldContinue()) {
229+
ICEBERG_RETURN_UNEXPECTED(Visit(field.type()));
230+
}
231+
}
232+
return {};
233+
}
234+
235+
std::unordered_set<int32_t> Finish() { return ids_; }
236+
237+
private:
238+
bool ShouldContinue() { return ids_.size() < limit_; }
239+
240+
private:
241+
std::unordered_set<int32_t> ids_;
242+
int32_t limit_;
243+
};
244+
245+
Visitor visitor(limit);
246+
ICEBERG_RETURN_UNEXPECTED(
247+
visitor.Visit(internal::checked_cast<const NestedType&>(schema)));
248+
return visitor.Finish();
249+
}
250+
31251
Status MetricsConfig::VerifyReferencedColumns(
32252
const std::unordered_map<std::string, std::string>& updates, const Schema& schema) {
33253
for (const auto& [key, value] : updates) {
@@ -47,4 +267,12 @@ Status MetricsConfig::VerifyReferencedColumns(
47267
return {};
48268
}
49269

270+
std::shared_ptr<MetricsMode> MetricsConfig::ColumnMode(
271+
const std::string& column_name) const {
272+
if (auto it = column_modes_.find(column_name); it != column_modes_.end()) {
273+
return it->second;
274+
}
275+
return default_mode_;
276+
}
277+
50278
} // namespace iceberg

src/iceberg/metrics_config.h

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,117 @@
2424

2525
#include <string>
2626
#include <unordered_map>
27+
#include <unordered_set>
2728

2829
#include "iceberg/iceberg_export.h"
2930
#include "iceberg/result.h"
3031
#include "iceberg/type_fwd.h"
32+
#include "iceberg/util/formattable.h"
3133

3234
namespace iceberg {
3335

36+
class ICEBERG_EXPORT MetricsMode : public util::Formattable {
37+
public:
38+
enum class Kind : uint8_t {
39+
kNone,
40+
kCounts,
41+
kTruncate,
42+
kFull,
43+
};
44+
45+
static Result<std::shared_ptr<MetricsMode>> FromString(const std::string& mode);
46+
47+
static const std::shared_ptr<MetricsMode>& None();
48+
static const std::shared_ptr<MetricsMode>& Counts();
49+
static const std::shared_ptr<MetricsMode>& Truncate();
50+
static const std::shared_ptr<MetricsMode>& Full();
51+
52+
/// \brief Return the kind of this metrics mode.
53+
virtual Kind kind() const = 0;
54+
55+
std::string ToString() const override = 0;
56+
};
57+
58+
class ICEBERG_EXPORT NoneMetricsMode : public MetricsMode {
59+
public:
60+
constexpr Kind kind() const override { return Kind::kNone; }
61+
62+
std::string ToString() const override;
63+
};
64+
65+
class ICEBERG_EXPORT CountsMetricsMode : public MetricsMode {
66+
public:
67+
constexpr Kind kind() const override { return Kind::kCounts; }
68+
69+
std::string ToString() const override;
70+
};
71+
72+
class ICEBERG_EXPORT TruncateMetricsMode : public MetricsMode {
73+
public:
74+
explicit TruncateMetricsMode(int32_t length) : length_(length) {}
75+
76+
constexpr Kind kind() const override { return Kind::kTruncate; }
77+
78+
std::string ToString() const override;
79+
80+
static Result<std::shared_ptr<MetricsMode>> Make(int32_t length);
81+
82+
private:
83+
const int32_t length_;
84+
};
85+
86+
class ICEBERG_EXPORT FullMetricsMode : public MetricsMode {
87+
public:
88+
constexpr Kind kind() const override { return Kind::kFull; }
89+
90+
std::string ToString() const override;
91+
};
92+
3493
/// \brief Configuration utilities for table metrics
3594
class ICEBERG_EXPORT MetricsConfig {
3695
public:
96+
MetricsConfig(
97+
std::unordered_map<std::string, std::shared_ptr<MetricsMode>> column_modes,
98+
std::shared_ptr<MetricsMode> default_mode);
99+
100+
/// \brief Get the default metrics config.
101+
static std::shared_ptr<MetricsConfig> Default();
102+
103+
/// \brief Creates a metrics config from a table.
104+
static Result<std::shared_ptr<MetricsConfig>> Make(std::shared_ptr<Table> table);
105+
106+
/// \brief Get `limit` num of primitive field ids from schema
107+
static Result<std::unordered_set<int32_t>> LimitFieldIds(const Schema& schema,
108+
int32_t limit);
109+
37110
/// \brief Verify that all referenced columns are valid
38111
/// \param updates The updates to verify
39112
/// \param schema The schema to verify against
40113
/// \return OK if all referenced columns are valid
41114
static Status VerifyReferencedColumns(
42115
const std::unordered_map<std::string, std::string>& updates, const Schema& schema);
116+
117+
/// \brief Get the metrics mode for a specific column
118+
/// \param column_name The full name of the column
119+
/// \return The metrics mode for the column
120+
std::shared_ptr<MetricsMode> ColumnMode(const std::string& column_name) const;
121+
122+
private:
123+
/// \brief Generate a MetricsConfig for all columns based on overrides, schema, and sort
124+
/// order.
125+
///
126+
/// \param props will be read for metrics overrides (write.metadata.metrics.column.*)
127+
/// and default(write.metadata.metrics.default)
128+
/// \param schema table schema
129+
/// \param order sort order columns, will be promoted to truncate(16)
130+
/// \return metrics configuration
131+
static Result<std::shared_ptr<MetricsConfig>> MakeInternal(const TableProperties& props,
132+
const Schema& schema,
133+
const SortOrder& order);
134+
135+
private:
136+
std::unordered_map<std::string, std::shared_ptr<MetricsMode>> column_modes_;
137+
std::shared_ptr<MetricsMode> default_mode_;
43138
};
44139

45140
} // namespace iceberg

src/iceberg/sort_order.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,18 @@ Result<std::unique_ptr<SortOrder>> SortOrder::Make(int32_t sort_id,
132132
return std::unique_ptr<SortOrder>(new SortOrder(sort_id, std::move(fields)));
133133
}
134134

135+
std::unordered_set<std::string_view> SortOrder::OrderPreservingSortedColumns(
136+
const Schema& schema, const SortOrder& order) {
137+
return order.fields() | std::views::filter([&schema](const SortField& field) {
138+
return field.transform()->PreservesOrder();
139+
}) |
140+
std::views::transform([&schema](const SortField& field) {
141+
return schema.FindColumnNameById(field.source_id())
142+
.value_or(std::nullopt)
143+
.value_or("");
144+
}) |
145+
std::views::filter([](std::string_view name) { return !name.empty(); }) |
146+
std::ranges::to<std::unordered_set<std::string_view>>();
147+
}
148+
135149
} // namespace iceberg

0 commit comments

Comments
 (0)