Skip to content

Commit b6b98db

Browse files
authored
feat: impl bitmap_construct_agg (#19053)
* feat: impl group_bitmap_count & group_bitmap
* chore: codefmt
* chore: group_bitmap was renamed to bitmap_construct_agg
* chore: rename bitmap_union -> bitmap_or_agg, bitmap_intersect -> bitmap_and_agg, add bitmap_xor_agg
1 parent f081da6 commit b6b98db

File tree

3 files changed

+325
-1
lines changed

3 files changed

+325
-1
lines changed

src/query/functions/src/aggregates/aggregate_bitmap.rs

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ use databend_common_exception::ErrorCode;
2626
use databend_common_exception::Result;
2727
use databend_common_expression::types::decimal::DecimalType;
2828
use databend_common_expression::types::i256;
29+
use databend_common_expression::types::number::Number;
2930
use databend_common_expression::types::Bitmap;
3031
use databend_common_expression::types::MutableBitmap;
3132
use databend_common_expression::types::*;
3233
use databend_common_expression::with_decimal_mapped_type;
3334
use databend_common_expression::with_number_mapped_type;
35+
use databend_common_expression::with_unsigned_integer_mapped_type;
3436
use databend_common_expression::AggrStateRegistry;
3537
use databend_common_expression::AggrStateType;
3638
use databend_common_expression::BlockEntry;
@@ -40,6 +42,7 @@ use databend_common_expression::Scalar;
4042
use databend_common_expression::StateSerdeItem;
4143
use databend_common_io::prelude::BinaryWrite;
4244
use databend_common_io::HybridBitmap;
45+
use num_traits::AsPrimitive;
4346

4447
use super::assert_arguments;
4548
use super::assert_params;
@@ -193,6 +196,19 @@ impl BitmapAggState {
193196
Self { rb: None }
194197
}
195198

199+
fn insert(&mut self, value: u64) {
200+
match &mut self.rb {
201+
Some(rb) => {
202+
rb.insert(value);
203+
}
204+
None => {
205+
let mut rb = HybridBitmap::new();
206+
rb.insert(value);
207+
self.rb = Some(rb);
208+
}
209+
}
210+
}
211+
196212
fn add<OP: BitmapOperate>(&mut self, other: HybridBitmap) {
197213
match &mut self.rb {
198214
Some(v) => {
@@ -382,6 +398,197 @@ impl<OP, AGG> fmt::Display for AggregateBitmapFunction<OP, AGG> {
382398
}
383399
}
384400

401+
/// Aggregate function that inserts the numeric values of its single argument
/// into a bitmap state.
///
/// `NUM` is the native number type of the input column and `AGG` selects how
/// the final result is produced from the state.
#[derive(Clone)]
struct AggregateGroupBitmapFunction<NUM, AGG> {
    /// Name written out by the `fmt::Display` implementation.
    display_name: String,
    /// Ties the otherwise unused type parameters to the struct.
    _p: PhantomData<(NUM, AGG)>,
}
406+
407+
impl<NUM, AGG> AggregateGroupBitmapFunction<NUM, AGG>
408+
where
409+
NUM: Number + AsPrimitive<u64>,
410+
AGG: BitmapAggResult,
411+
{
412+
fn try_create(display_name: &str) -> Result<Arc<dyn AggregateFunction>> {
413+
let func = AggregateGroupBitmapFunction::<NUM, AGG> {
414+
display_name: display_name.to_string(),
415+
_p: PhantomData,
416+
};
417+
Ok(Arc::new(func))
418+
}
419+
}
420+
421+
impl<NUM, AGG> AggregateFunction for AggregateGroupBitmapFunction<NUM, AGG>
422+
where
423+
NUM: Number + AsPrimitive<u64>,
424+
AGG: BitmapAggResult,
425+
{
426+
fn name(&self) -> &str {
427+
"AggregateGroupBitmapFunction"
428+
}
429+
430+
fn return_type(&self) -> Result<DataType> {
431+
AGG::return_type()
432+
}
433+
434+
fn init_state(&self, place: AggrState) {
435+
place.write(BitmapAggState::new);
436+
}
437+
438+
fn register_state(&self, registry: &mut AggrStateRegistry) {
439+
registry.register(AggrStateType::Custom(Layout::new::<BitmapAggState>()));
440+
}
441+
442+
fn accumulate(
443+
&self,
444+
place: AggrState,
445+
entries: ProjectedBlock,
446+
validity: Option<&Bitmap>,
447+
_input_rows: usize,
448+
) -> Result<()> {
449+
let view = entries[0].downcast::<NumberType<NUM>>().unwrap();
450+
if view.len() == 0 {
451+
return Ok(());
452+
}
453+
let state = place.get::<BitmapAggState>();
454+
455+
if let Some(validity) = validity {
456+
if validity.null_count() == view.len() {
457+
return Ok(());
458+
}
459+
for (value, valid) in view.iter().zip(validity.iter()) {
460+
if valid {
461+
state.insert(value.as_());
462+
}
463+
}
464+
} else {
465+
for value in view.iter() {
466+
state.insert(value.as_());
467+
}
468+
}
469+
Ok(())
470+
}
471+
472+
fn accumulate_keys(
473+
&self,
474+
places: &[StateAddr],
475+
loc: &[AggrStateLoc],
476+
entries: ProjectedBlock,
477+
_input_rows: usize,
478+
) -> Result<()> {
479+
let view = entries[0].downcast::<NumberType<NUM>>().unwrap();
480+
for (value, addr) in view.iter().zip(places.iter().cloned()) {
481+
let state = AggrState::new(addr, loc).get::<BitmapAggState>();
482+
state.insert(value.as_());
483+
}
484+
Ok(())
485+
}
486+
487+
fn accumulate_row(&self, place: AggrState, entries: ProjectedBlock, row: usize) -> Result<()> {
488+
let view = entries[0].downcast::<NumberType<NUM>>().unwrap();
489+
if let Some(value) = view.index(row) {
490+
let state = place.get::<BitmapAggState>();
491+
state.insert(value.as_());
492+
}
493+
Ok(())
494+
}
495+
496+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
497+
vec![StateSerdeItem::Binary(None)]
498+
}
499+
500+
fn batch_serialize(
501+
&self,
502+
places: &[StateAddr],
503+
loc: &[AggrStateLoc],
504+
builders: &mut [ColumnBuilder],
505+
) -> Result<()> {
506+
let binary_builder = builders[0].as_binary_mut().unwrap();
507+
for place in places {
508+
let state = AggrState::new(*place, loc).get::<BitmapAggState>();
509+
let flag: u8 = if state.rb.is_some() { 1 } else { 0 };
510+
binary_builder.data.write_scalar(&flag)?;
511+
if let Some(rb) = &state.rb {
512+
rb.serialize_into(&mut binary_builder.data)?;
513+
}
514+
binary_builder.commit_row();
515+
}
516+
Ok(())
517+
}
518+
519+
fn batch_merge(
520+
&self,
521+
places: &[StateAddr],
522+
loc: &[AggrStateLoc],
523+
state: &BlockEntry,
524+
filter: Option<&Bitmap>,
525+
) -> Result<()> {
526+
let view = state.downcast::<UnaryType<BinaryType>>().unwrap();
527+
let iter = places.iter().zip(view.iter());
528+
529+
if let Some(filter) = filter {
530+
for (place, mut data) in iter.zip(filter.iter()).filter_map(|(v, b)| b.then_some(v)) {
531+
let state = AggrState::new(*place, loc).get::<BitmapAggState>();
532+
let flag = data[0];
533+
data.consume(1);
534+
if flag == 1 {
535+
let rb = deserialize_bitmap(data)?;
536+
state.add::<BitmapOrOp>(rb);
537+
}
538+
}
539+
} else {
540+
for (place, mut data) in iter {
541+
let state = AggrState::new(*place, loc).get::<BitmapAggState>();
542+
let flag = data[0];
543+
data.consume(1);
544+
if flag == 1 {
545+
let rb = deserialize_bitmap(data)?;
546+
state.add::<BitmapOrOp>(rb);
547+
}
548+
}
549+
}
550+
Ok(())
551+
}
552+
553+
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
554+
let state = place.get::<BitmapAggState>();
555+
let other = rhs.get::<BitmapAggState>();
556+
557+
if let Some(rb) = other.rb.take() {
558+
state.add::<BitmapOrOp>(rb);
559+
}
560+
Ok(())
561+
}
562+
563+
fn merge_result(
564+
&self,
565+
place: AggrState,
566+
_read_only: bool,
567+
builder: &mut ColumnBuilder,
568+
) -> Result<()> {
569+
AGG::merge_result(place, builder)
570+
}
571+
572+
fn need_manual_drop_state(&self) -> bool {
573+
true
574+
}
575+
576+
unsafe fn drop_state(&self, place: AggrState) {
577+
let state = place.get::<BitmapAggState>();
578+
std::ptr::drop_in_place(state);
579+
}
580+
}
581+
582+
impl<NUM, AGG> fmt::Display for AggregateGroupBitmapFunction<NUM, AGG>
583+
where
584+
NUM: Number + AsPrimitive<u64>,
585+
AGG: BitmapAggResult,
586+
{
587+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
588+
write!(f, "{}", self.display_name)
589+
}
590+
}
591+
385592
struct AggregateBitmapIntersectCountFunction<T>
386593
where T: ValueType
387594
{
@@ -665,6 +872,47 @@ pub fn try_create_aggregate_bitmap_intersect_count_function(
665872
})
666873
}
667874

875+
fn try_create_bitmap_construct_function_impl<AGG: BitmapAggResult>(
876+
display_name: &str,
877+
params: Vec<Scalar>,
878+
argument_types: Vec<DataType>,
879+
) -> Result<Arc<dyn AggregateFunction>> {
880+
assert_params(display_name, params.len(), 0)?;
881+
assert_unary_arguments(display_name, argument_types.len())?;
882+
let arg_type = argument_types[0].remove_nullable();
883+
match &arg_type {
884+
DataType::Number(num_type) => {
885+
let num_type = *num_type;
886+
with_unsigned_integer_mapped_type!(|NUM_TYPE| match num_type {
887+
NumberDataType::NUM_TYPE => {
888+
AggregateGroupBitmapFunction::<NUM_TYPE, AGG>::try_create(display_name)
889+
}
890+
_ => Err(ErrorCode::BadDataValueType(format!(
891+
"{} does not support type '{:?}', expect unsigned integer",
892+
display_name, arg_type
893+
))),
894+
})
895+
}
896+
_ => Err(ErrorCode::BadDataValueType(format!(
897+
"{} does not support type '{:?}', expect unsigned integer",
898+
display_name, arg_type
899+
))),
900+
}
901+
}
902+
903+
pub fn try_create_bitmap_construct_agg_function(
904+
display_name: &str,
905+
params: Vec<Scalar>,
906+
argument_types: Vec<DataType>,
907+
_sort_descs: Vec<AggregateFunctionSortDesc>,
908+
) -> Result<Arc<dyn AggregateFunction>> {
909+
try_create_bitmap_construct_function_impl::<BitmapRawResult>(
910+
display_name,
911+
params,
912+
argument_types,
913+
)
914+
}
915+
668916
fn extract_params<T: AccessType>(
669917
display_name: &str,
670918
val_type: DataType,
@@ -741,6 +989,17 @@ pub fn aggregate_bitmap_xor_count_function_desc() -> AggregateFunctionDescriptio
741989
)
742990
}
743991

992+
pub fn aggregate_bitmap_xor_function_desc() -> AggregateFunctionDescription {
993+
let features = super::AggregateFunctionFeatures {
994+
is_decomposable: true,
995+
..Default::default()
996+
};
997+
AggregateFunctionDescription::creator_with_features(
998+
Box::new(try_create_aggregate_bitmap_function::<BITMAP_XOR, BITMAP_AGG_RAW>),
999+
features,
1000+
)
1001+
}
1002+
7441003
pub fn aggregate_bitmap_union_function_desc() -> AggregateFunctionDescription {
7451004
let features = super::AggregateFunctionFeatures {
7461005
is_decomposable: true,
@@ -773,3 +1032,15 @@ pub fn aggregate_bitmap_intersect_count_function_desc() -> AggregateFunctionDesc
7731032
features,
7741033
)
7751034
}
1035+
1036+
pub fn aggregate_bitmap_construct_agg_function_desc() -> AggregateFunctionDescription {
1037+
let features = super::AggregateFunctionFeatures {
1038+
returns_default_when_only_null: true,
1039+
is_decomposable: true,
1040+
..Default::default()
1041+
};
1042+
AggregateFunctionDescription::creator_with_features(
1043+
Box::new(try_create_bitmap_construct_agg_function),
1044+
features,
1045+
)
1046+
}

src/query/functions/src/aggregates/aggregator.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ use super::aggregate_array_moving::aggregate_array_moving_avg_function_desc;
2121
use super::aggregate_array_moving::aggregate_array_moving_sum_function_desc;
2222
use super::aggregate_avg::aggregate_avg_function_desc;
2323
use super::aggregate_bitmap::aggregate_bitmap_and_count_function_desc;
24+
use super::aggregate_bitmap::aggregate_bitmap_construct_agg_function_desc;
2425
use super::aggregate_bitmap::aggregate_bitmap_intersect_count_function_desc;
2526
use super::aggregate_bitmap::aggregate_bitmap_intersect_function_desc;
2627
use super::aggregate_bitmap::aggregate_bitmap_not_count_function_desc;
2728
use super::aggregate_bitmap::aggregate_bitmap_or_count_function_desc;
2829
use super::aggregate_bitmap::aggregate_bitmap_union_function_desc;
2930
use super::aggregate_bitmap::aggregate_bitmap_xor_count_function_desc;
31+
use super::aggregate_bitmap::aggregate_bitmap_xor_function_desc;
3032
use super::aggregate_boolean::aggregate_boolean_function_desc;
3133
use super::aggregate_covariance::aggregate_covariance_population_desc;
3234
use super::aggregate_covariance::aggregate_covariance_sample_desc;
@@ -141,6 +143,10 @@ impl Aggregators {
141143

142144
factory.register("range_bound", aggregate_range_bound_function_desc());
143145

146+
factory.register_multi_names(
147+
&["bitmap_construct_agg", "group_bitmap"],
148+
aggregate_bitmap_construct_agg_function_desc,
149+
);
144150
factory.register(
145151
"bitmap_and_count",
146152
aggregate_bitmap_and_count_function_desc(),
@@ -149,16 +155,24 @@ impl Aggregators {
149155
"bitmap_not_count",
150156
aggregate_bitmap_not_count_function_desc(),
151157
);
158+
factory.register_multi_names(
159+
&["bitmap_union", "bitmap_or_agg"],
160+
aggregate_bitmap_union_function_desc,
161+
);
152162
factory.register("bitmap_or_count", aggregate_bitmap_or_count_function_desc());
153163
factory.register(
154164
"bitmap_xor_count",
155165
aggregate_bitmap_xor_count_function_desc(),
156166
);
157-
factory.register("bitmap_union", aggregate_bitmap_union_function_desc());
158167
factory.register(
159168
"bitmap_intersect",
160169
aggregate_bitmap_intersect_function_desc(),
161170
);
171+
factory.register_multi_names(
172+
&["bitmap_intersect", "bitmap_and_agg"],
173+
aggregate_bitmap_intersect_function_desc,
174+
);
175+
factory.register("bitmap_xor_agg", aggregate_bitmap_xor_function_desc());
162176
factory.register(
163177
"intersect_count",
164178
aggregate_bitmap_intersect_count_function_desc(),

0 commit comments

Comments
 (0)