Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit b59403d

Browse files
committed
Move map predicate and base it off the min/max
This predicate still wouldn't work all that well with a fully SeriesID sorted block (so accress row groups). But let's fix that separately
1 parent 80ef214 commit b59403d

File tree

2 files changed

+41
-54
lines changed

2 files changed

+41
-54
lines changed

pkg/phlaredb/block_querier.go

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/samber/lo"
2929
"github.com/segmentio/parquet-go"
3030
"github.com/thanos-io/objstore"
31-
"golang.org/x/exp/constraints"
3231
"golang.org/x/sync/errgroup"
3332
"google.golang.org/grpc/codes"
3433

@@ -462,58 +461,6 @@ func (b *singleBlockQuerier) Bounds() (model.Time, model.Time) {
462461
return b.meta.MinTime, b.meta.MaxTime
463462
}
464463

465-
type mapPredicate[K constraints.Integer, V any] struct {
466-
min K
467-
max K
468-
m map[K]V
469-
}
470-
471-
func newMapPredicate[K constraints.Integer, V any](m map[K]V) query.Predicate {
472-
p := &mapPredicate[K, V]{
473-
m: m,
474-
}
475-
476-
first := true
477-
for k := range m {
478-
if first || p.max < k {
479-
p.max = k
480-
}
481-
if first || p.min > k {
482-
p.min = k
483-
}
484-
first = false
485-
}
486-
487-
return p
488-
}
489-
490-
func (m *mapPredicate[K, V]) KeepColumnChunk(c parquet.ColumnChunk) bool {
491-
if ci := c.ColumnIndex(); ci != nil {
492-
for i := 0; i < ci.NumPages(); i++ {
493-
min := K(ci.MinValue(i).Int64())
494-
max := K(ci.MaxValue(i).Int64())
495-
if m.max >= min && m.min <= max {
496-
return true
497-
}
498-
}
499-
return false
500-
}
501-
502-
return true
503-
}
504-
505-
func (m *mapPredicate[K, V]) KeepPage(page parquet.Page) bool {
506-
if min, max, ok := page.Bounds(); ok {
507-
return m.max >= K(min.Int64()) && m.min <= K(max.Int64())
508-
}
509-
return true
510-
}
511-
512-
func (m *mapPredicate[K, V]) KeepValue(v parquet.Value) bool {
513-
_, exists := m.m[K(v.Int64())]
514-
return exists
515-
}
516-
517464
type labelsInfo struct {
518465
fp model.Fingerprint
519466
lbs phlaremodel.Labels
@@ -993,7 +940,7 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
993940

994941
pIt := query.NewBinaryJoinIterator(
995942
0,
996-
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
943+
b.profiles.columnIter(ctx, "SeriesIndex", query.NewMapPredicate(lblsPerRef), "SeriesIndex"),
997944
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
998945
)
999946

pkg/phlaredb/query/predicates.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
pq "github.com/segmentio/parquet-go"
88
"go.uber.org/atomic"
9+
"golang.org/x/exp/constraints"
910
)
1011

1112
// Predicate is a pushdown predicate that can be applied at
@@ -254,3 +255,42 @@ func (p *InstrumentedPredicate) KeepValue(v pq.Value) bool {
254255

255256
return false
256257
}
258+
259+
type mapPredicate[K constraints.Integer, V any] struct {
260+
inbetweenPred Predicate
261+
m map[K]V
262+
}
263+
264+
func NewMapPredicate[K constraints.Integer, V any](m map[K]V) Predicate {
265+
266+
var min, max int64
267+
268+
first := true
269+
for k := range m {
270+
if first || max < int64(k) {
271+
max = int64(k)
272+
}
273+
if first || min > int64(k) {
274+
min = int64(k)
275+
}
276+
first = false
277+
}
278+
279+
return &mapPredicate[K, V]{
280+
inbetweenPred: NewIntBetweenPredicate(min, max),
281+
m: m,
282+
}
283+
}
284+
285+
func (m *mapPredicate[K, V]) KeepColumnChunk(c pq.ColumnChunk) bool {
286+
return m.inbetweenPred.KeepColumnChunk(c)
287+
}
288+
289+
func (m *mapPredicate[K, V]) KeepPage(page pq.Page) bool {
290+
return m.inbetweenPred.KeepPage(page)
291+
}
292+
293+
func (m *mapPredicate[K, V]) KeepValue(v pq.Value) bool {
294+
_, exists := m.m[K(v.Int64())]
295+
return exists
296+
}

0 commit comments

Comments
 (0)