Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion be/src/runtime_filter/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress
RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len));

if (len > 0) {
DCHECK(data != nullptr);
if (data == nullptr) {
return Status::InternalError(
"data is nullptr after serialization with len > 0, filter: {}", debug_string());
}
merge_filter_callback->cntl_->request_attachment().append(data, len);
}

Expand Down
13 changes: 11 additions & 2 deletions be/src/runtime_filter/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ class RuntimeFilter {
auto in_filter = request->mutable_in_filter();
RETURN_IF_ERROR(_to_protobuf(in_filter));
} else if (real_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) {
DCHECK(data != nullptr);
if (data == nullptr) {
return Status::InternalError(
"data is nullptr for bloom filter serialization, filter_id: {}",
_wrapper->filter_id());
}
RETURN_IF_ERROR(_to_protobuf(request->mutable_bloom_filter(), (char**)data, len));
} else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER ||
real_runtime_filter_type == RuntimeFilterType::MIN_FILTER ||
Expand All @@ -89,7 +93,12 @@ class RuntimeFilter {
RuntimeFilter(const TRuntimeFilterDesc* desc)
: _has_remote_target(desc->has_remote_targets),
_runtime_filter_type(get_runtime_filter_type(desc)) {
DCHECK_NE(desc->has_remote_targets, desc->has_local_targets);
if (desc->has_remote_targets == desc->has_local_targets) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"has_remote_targets ({}) should not equal has_local_targets ({}), "
"filter_id: {}",
desc->has_remote_targets, desc->has_local_targets, desc->filter_id);
}
}

virtual Status _init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options);
Expand Down
21 changes: 15 additions & 6 deletions be/src/runtime_filter/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
_wrapper->minmax_func()->get_min(), min_literal));
min_pred->add_child(probe_ctx->root());
min_pred->add_child(min_literal);
DCHECK(null_aware == false) << "only min predicate do not support null aware";
if (null_aware) {
return Status::InternalError("only min predicate do not support null aware");
}
container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
Expand All @@ -131,7 +133,9 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
_wrapper->minmax_func()->get_max(), max_literal));
max_pred->add_child(probe_ctx->root());
max_pred->add_child(max_literal);
DCHECK(null_aware == false) << "only max predicate do not support null aware";
if (null_aware) {
return Status::InternalError("only max predicate do not support null aware");
}
container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
Expand Down Expand Up @@ -200,22 +204,27 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node);
bitmap_pred->set_filter(_wrapper->bitmap_filter_func());
bitmap_pred->add_child(probe_ctx->root());
DCHECK(null_aware == false) << "bitmap predicate do not support null aware";
if (null_aware) {
return Status::InternalError("bitmap predicate do not support null aware");
}
auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
node, bitmap_pred, 0, null_aware, _wrapper->filter_id());
container.push_back(wrapper);
break;
}
default:
DCHECK(false);
break;
return Status::InternalError("unknown runtime filter type: {}", int(real_filter_type));
}
return Status::OK();
}

void RuntimeFilterConsumer::collect_realtime_profile(RuntimeProfile* parent_operator_profile) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
DCHECK(parent_operator_profile != nullptr);
if (parent_operator_profile == nullptr) {
LOG(WARNING) << "parent_operator_profile is nullptr in "
"RuntimeFilterConsumer::collect_realtime_profile";
return;
}
int filter_id = -1;
{
// since debug_string will read from RuntimeFilter::_wrapper
Expand Down
11 changes: 9 additions & 2 deletions be/src/runtime_filter/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
filter_id);
}
*local_merge_filters = &iter->second;
DCHECK(iter->second.merger);
if (!iter->second.merger) {
return Status::InternalError("local merge context merger is nullptr for filter_id: {}",
filter_id);
}
return Status::OK();
}

Expand Down Expand Up @@ -354,7 +357,11 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
int64_t merge_time,
PUniqueId query_id,
int execution_timeout) {
DCHECK_GT(cnt_val.targetv2_info.size(), 0);
if (cnt_val.targetv2_info.empty()) {
return Status::InternalError(
"_send_rf_to_target called with empty targetv2_info, filter: {}",
cnt_val.merger ? cnt_val.merger->debug_string() : "unknown");
}

if (cnt_val.done) {
return Status::InternalError("Runtime filter has been sent",
Expand Down
21 changes: 17 additions & 4 deletions be/src/runtime_filter/runtime_filter_producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table
RETURN_IF_ERROR(do_merge());
}
} else {
DCHECK(_is_broadcast_join);
if (!_is_broadcast_join) {
return Status::InternalError(
"Expected broadcast join for non-build hash table path in publish, filter: {}",
debug_string());
}
}

// wrapper may moved to rf merger, release wrapper here to make sure thread safe
Expand Down Expand Up @@ -148,7 +152,10 @@ void RuntimeFilterProducer::latch_dependency(
if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
return;
}
DCHECK(dependency != nullptr);
if (dependency == nullptr) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"dependency is nullptr in latch_dependency, filter: {}", debug_string());
}
_dependency = dependency;
_dependency->add();
}
Expand All @@ -158,7 +165,10 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt
if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
return Status::OK();
}
DCHECK(_dependency != nullptr);
if (_dependency == nullptr) {
return Status::InternalError("_dependency is nullptr in send_size, filter: {}",
debug_string());
}
set_state(State::WAITING_FOR_SYNCED_SIZE);

// two case we need do local merge:
Expand Down Expand Up @@ -239,7 +249,10 @@ void RuntimeFilterProducer::set_synced_size(uint64_t global_size) {
}

_synced_size = global_size;
DCHECK(_dependency != nullptr);
if (_dependency == nullptr) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"_dependency is nullptr in set_synced_size, filter: {}", debug_string());
}
_dependency->sub();
}

Expand Down
27 changes: 22 additions & 5 deletions be/src/runtime_filter/runtime_filter_producer_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ Status RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size
for (int i = 0; i < _producers.size(); i++) {
auto filter = _producers[i];
int result_column_id = _filter_expr_contexts[i]->get_last_result_column_id();
DCHECK_NE(result_column_id, -1);
if (result_column_id == -1) {
return Status::InternalError(
"runtime filter producer _insert got invalid result_column_id -1");
}
const auto& column = block->get_by_position(result_column_id).column;
RETURN_IF_ERROR(filter->insert(column, start));
}
Expand Down Expand Up @@ -108,12 +111,25 @@ Status RuntimeFilterProducerHelper::build(

for (const auto& filter : _producers) {
if (use_shared_table) {
DCHECK(_is_broadcast_join);
if (!_is_broadcast_join) {
return Status::InternalError(
"use_shared_table is true but _is_broadcast_join is false");
}
if (_should_build_hash_table) {
DCHECK(!runtime_filters.contains(filter->wrapper()->filter_id()));
if (runtime_filters.contains(filter->wrapper()->filter_id())) {
return Status::InternalError(
"runtime_filters already contains filter_id {} when building hash "
"table",
filter->wrapper()->filter_id());
}
runtime_filters[filter->wrapper()->filter_id()] = filter->wrapper();
} else {
DCHECK(runtime_filters.contains(filter->wrapper()->filter_id()));
if (!runtime_filters.contains(filter->wrapper()->filter_id())) {
return Status::InternalError(
"runtime_filters does not contain filter_id {} when not building "
"hash table",
filter->wrapper()->filter_id());
}
filter->set_wrapper(runtime_filters[filter->wrapper()->filter_id()]);
}
}
Expand Down Expand Up @@ -147,8 +163,9 @@ Status RuntimeFilterProducerHelper::skip_process(RuntimeState* state) {

void RuntimeFilterProducerHelper::collect_realtime_profile(
RuntimeProfile* parent_operator_profile) {
DCHECK(parent_operator_profile != nullptr);
if (parent_operator_profile == nullptr) {
LOG(WARNING) << "parent_operator_profile is nullptr in "
"RuntimeFilterProducerHelper::collect_realtime_profile";
return;
}

Expand Down
7 changes: 6 additions & 1 deletion be/src/runtime_filter/runtime_filter_producer_helper_cross.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ class RuntimeFilterProducerHelperCross : public RuntimeFilterProducerHelper {
for (const auto& vexpr_ctx : _filter_expr_contexts) {
int result_column_id = -1;
RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id));
DCHECK_NE(result_column_id, -1) << vexpr_ctx->root()->debug_string();
if (result_column_id == -1) {
return Status::InternalError(
"runtime filter cross join _process_block got invalid result_column_id "
"-1, expr: {}",
vexpr_ctx->root()->debug_string());
}
block->get_by_position(result_column_id).column =
block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
Expand Down
Loading
Loading