Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit 7d21d11

Browse files
committed
Make use of the BinaryJoinIterator and remove JoinIterator
1 parent 208bdc0 commit 7d21d11

File tree

3 files changed

+16
-162
lines changed

3 files changed

+16
-162
lines changed

pkg/phlaredb/block_querier.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -988,26 +988,26 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
988988
}
989989

990990
var (
991-
buf [][]parquet.Value
992-
joinIters []query.Iterator
991+
buf [][]parquet.Value
992+
)
993+
994+
pIt := query.NewBinaryJoinIterator(
995+
0,
996+
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
997+
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
993998
)
994999

9951000
if b.meta.Version >= 2 {
996-
joinIters = []query.Iterator{
997-
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
998-
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
1001+
pIt = query.NewBinaryJoinIterator(
1002+
0,
1003+
pIt,
9991004
b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
1000-
}
1005+
)
10011006
buf = make([][]parquet.Value, 3)
10021007
} else {
1003-
joinIters = []query.Iterator{
1004-
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
1005-
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
1006-
}
10071008
buf = make([][]parquet.Value, 2)
10081009
}
10091010

1010-
pIt := query.NewJoinIterator(0, joinIters, nil)
10111011
iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef))
10121012
defer pIt.Close()
10131013

pkg/phlaredb/head_queriers.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,13 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *
4848
start = model.Time(params.Start)
4949
end = model.Time(params.End)
5050
)
51-
pIt := query.NewJoinIterator(
52-
0,
53-
[]query.Iterator{
51+
pIt := query.NewBinaryJoinIterator(0,
52+
query.NewBinaryJoinIterator(
53+
0,
5454
rowIter,
5555
q.rowGroup().columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(start.UnixNano(), end.UnixNano()), "TimeNanos"),
56-
q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
57-
},
58-
nil,
56+
),
57+
q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
5958
)
6059
defer pIt.Close()
6160

pkg/phlaredb/query/iters.go

Lines changed: 0 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -487,151 +487,6 @@ type JoinIterator struct {
487487
result *IteratorResult
488488
}
489489

490-
var _ Iterator = (*JoinIterator)(nil)
491-
492-
func NewJoinIterator(definitionLevel int, iters []Iterator, pred GroupPredicate) *JoinIterator {
493-
j := JoinIterator{
494-
definitionLevel: definitionLevel,
495-
iters: iters,
496-
peeks: make([]*IteratorResult, len(iters)),
497-
pred: pred,
498-
}
499-
return &j
500-
}
501-
502-
func (j *JoinIterator) At() *IteratorResult {
503-
return j.result
504-
}
505-
506-
func (j *JoinIterator) Next() bool {
507-
// Here is the algorithm for joins: On each pass of the iterators
508-
// we remember which ones are pointing at the earliest rows. If all
509-
// are the lowest (and therefore pointing at the same thing) then
510-
// there is a successful join and return the result.
511-
// Else we progress the iterators and try again.
512-
// There is an optimization here in that we can seek to the highest
513-
// row seen. It's impossible to have joins before that row.
514-
for {
515-
lowestRowNumber := MaxRowNumber()
516-
highestRowNumber := EmptyRowNumber()
517-
lowestIters := make([]int, 0, len(j.iters))
518-
519-
for iterNum := range j.iters {
520-
res := j.peek(iterNum)
521-
522-
if res == nil {
523-
// Iterator exhausted, no more joins possible
524-
j.result = nil
525-
return false
526-
}
527-
528-
c := CompareRowNumbers(j.definitionLevel, res.RowNumber, lowestRowNumber)
529-
switch c {
530-
case -1:
531-
// New lowest, reset
532-
lowestIters = lowestIters[:0]
533-
lowestRowNumber = res.RowNumber
534-
fallthrough
535-
536-
case 0:
537-
// Same, append
538-
lowestIters = append(lowestIters, iterNum)
539-
}
540-
541-
if CompareRowNumbers(j.definitionLevel, res.RowNumber, highestRowNumber) == 1 {
542-
// New high water mark
543-
highestRowNumber = res.RowNumber
544-
}
545-
}
546-
547-
// All iterators pointing at same row?
548-
if len(lowestIters) == len(j.iters) {
549-
// Get the data
550-
result := j.collect(lowestRowNumber)
551-
552-
// Keep group?
553-
if j.pred == nil || j.pred.KeepGroup(result) {
554-
// Yes
555-
j.result = result
556-
return true
557-
}
558-
}
559-
560-
// Skip all iterators to the highest row seen, it's impossible
561-
// to find matches before that.
562-
j.seekAll(RowNumberWithDefinitionLevel{RowNumber: highestRowNumber, DefinitionLevel: j.definitionLevel})
563-
}
564-
}
565-
566-
func (j *JoinIterator) Seek(to RowNumberWithDefinitionLevel) bool {
567-
j.seekAll(to)
568-
return j.Next()
569-
}
570-
571-
func (j *JoinIterator) seekAll(to RowNumberWithDefinitionLevel) {
572-
to.RowNumber = TruncateRowNumber(to)
573-
for iterNum, iter := range j.iters {
574-
if j.peeks[iterNum] == nil || CompareRowNumbers(to.DefinitionLevel, j.peeks[iterNum].RowNumber, to.RowNumber) == -1 {
575-
iteratorResultPoolPut(j.peeks[iterNum])
576-
if iter.Seek(to) {
577-
j.peeks[iterNum] = iter.At()
578-
} else {
579-
j.peeks[iterNum] = nil
580-
}
581-
}
582-
}
583-
}
584-
585-
func (j *JoinIterator) peek(iterNum int) *IteratorResult {
586-
if j.peeks[iterNum] == nil {
587-
if j.iters[iterNum].Next() {
588-
j.peeks[iterNum] = j.iters[iterNum].At()
589-
}
590-
}
591-
return j.peeks[iterNum]
592-
}
593-
594-
// Collect data from the given iterators until they point at
595-
// the next row (according to the configured definition level)
596-
// or are exhausted.
597-
func (j *JoinIterator) collect(rowNumber RowNumber) *IteratorResult {
598-
result := iteratorResultPoolGet()
599-
result.RowNumber = rowNumber
600-
601-
for i := range j.iters {
602-
for j.peeks[i] != nil && CompareRowNumbers(j.definitionLevel, j.peeks[i].RowNumber, rowNumber) == 0 {
603-
604-
result.Append(j.peeks[i])
605-
606-
iteratorResultPoolPut(j.peeks[i])
607-
608-
if j.iters[i].Next() {
609-
j.peeks[i] = j.iters[i].At()
610-
} else {
611-
j.peeks[i] = nil
612-
}
613-
}
614-
}
615-
return result
616-
}
617-
618-
func (j *JoinIterator) Close() error {
619-
var merr multierror.MultiError
620-
for _, i := range j.iters {
621-
merr.Add(i.Close())
622-
}
623-
return merr.Err()
624-
}
625-
626-
func (j *JoinIterator) Err() error {
627-
for _, i := range j.iters {
628-
if err := i.Err(); err != nil {
629-
return err
630-
}
631-
}
632-
return nil
633-
}
634-
635490
// UnionIterator produces all results for all given iterators. When iterators
636491
// align to the same row, based on the configured definition level, then the results
637492
// are returned together. Else the next matching iterator is returned.

0 commit comments

Comments
 (0)