Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit 664563b

Browse files
committed
BinaryJoinIterator
1 parent 0884479 commit 664563b

File tree

5 files changed

+57
-239
lines changed

5 files changed

+57
-239
lines changed

pkg/phlaredb/block_querier_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,5 +190,4 @@ func TestBlockCompatability(t *testing.T) {
190190
})
191191

192192
}
193-
194193
}

pkg/phlaredb/profile_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ func (r *rowGroupOnDisk) columnIter(ctx context.Context, columnName string, pred
498498
if !found {
499499
return query.NewErrIterator(fmt.Errorf("column '%s' not found in head row group segment '%s'", columnName, r.file.Name()))
500500
}
501-
return query.NewColumnIterator(ctx, []parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias)
501+
return query.NewSyncIterator(ctx, []parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias)
502502
}
503503

504504
type seriesIDRowsRewriter struct {

pkg/phlaredb/query/iters.go

Lines changed: 53 additions & 234 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@ import (
66
"fmt"
77
"io"
88
"math"
9-
"strings"
109
"sync"
1110
"sync/atomic"
1211

1312
"github.com/grafana/dskit/multierror"
1413
"github.com/opentracing/opentracing-go"
15-
"github.com/opentracing/opentracing-go/log"
1614
"github.com/segmentio/parquet-go"
1715

1816
"github.com/grafana/phlare/pkg/iter"
@@ -466,265 +464,86 @@ type ColumnIterator struct {
466464
err error
467465
}
468466

469-
var _ Iterator = (*ColumnIterator)(nil)
470-
471467
type columnIteratorBuffer struct {
472468
rowNumbers []RowNumber
473469
values []parquet.Value
474470
err error
475471
}
476472

477-
func NewColumnIterator(ctx context.Context, rgs []parquet.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *ColumnIterator {
478-
c := &ColumnIterator{
479-
metrics: getMetricsFromContext(ctx),
480-
table: strings.ToLower(rgs[0].Schema().Name()) + "s",
481-
rgs: rgs,
482-
col: column,
483-
colName: columnName,
484-
filter: &InstrumentedPredicate{pred: filter},
485-
selectAs: selectAs,
486-
quit: make(chan struct{}),
487-
ch: make(chan *columnIteratorBuffer, 1),
488-
currN: -1,
489-
}
473+
type BinaryJoinIterator struct {
474+
left Iterator
475+
right Iterator
476+
definitionLevel int
490477

491-
go c.iterate(ctx, readSize)
492-
return c
478+
err error
479+
res *IteratorResult
493480
}
494481

495-
func (c *ColumnIterator) iterate(ctx context.Context, readSize int) {
496-
defer close(c.ch)
497-
498-
span, _ := opentracing.StartSpanFromContext(ctx, "columnIterator.iterate", opentracing.Tags{
499-
"columnIndex": c.col,
500-
"column": c.colName,
501-
})
502-
defer func() {
503-
span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load())
504-
span.SetTag("inspectedPages", c.filter.InspectedPages.Load())
505-
span.SetTag("inspectedValues", c.filter.InspectedValues.Load())
506-
span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load())
507-
span.SetTag("keptPages", c.filter.KeptPages.Load())
508-
span.SetTag("keptValues", c.filter.KeptValues.Load())
509-
span.Finish()
510-
}()
511-
512-
rn := EmptyRowNumber()
513-
buffer := make([]parquet.Value, readSize)
514-
515-
checkSkip := func(numRows int64) bool {
516-
seekTo := c.seekTo.Load()
517-
if seekTo == nil {
518-
return false
519-
}
520-
521-
seekToRN := seekTo.(RowNumber)
522-
523-
rnNext := rn
524-
rnNext.Skip(numRows)
482+
var _ Iterator = (*BinaryJoinIterator)(nil)
525483

526-
return CompareRowNumbers(0, rnNext, seekToRN) == -1
484+
func NewBinaryJoinIterator(definitionLevel int, left, right Iterator) *BinaryJoinIterator {
485+
return &BinaryJoinIterator{
486+
left: left,
487+
right: right,
488+
definitionLevel: definitionLevel,
527489
}
490+
}
528491

529-
for _, rg := range c.rgs {
530-
col := rg.ColumnChunks()[c.col]
531-
532-
if checkSkip(rg.NumRows()) {
533-
// Skip column chunk
534-
rn.Skip(rg.NumRows())
535-
continue
492+
func (bj *BinaryJoinIterator) Next() bool {
493+
for {
494+
if !bj.left.Next() {
495+
bj.err = bj.left.Err()
496+
return false
536497
}
498+
resLeft := bj.left.At()
537499

538-
if c.filter != nil {
539-
if !c.filter.KeepColumnChunk(col) {
540-
// Skip column chunk
541-
rn.Skip(rg.NumRows())
542-
continue
543-
}
500+
// now seek the right iterator to the left position
501+
if !bj.right.Seek(RowNumberWithDefinitionLevel{resLeft.RowNumber, bj.definitionLevel}) {
502+
bj.err = bj.right.Err()
503+
return false
544504
}
545-
546-
func(col parquet.ColumnChunk) {
547-
pgs := col.Pages()
548-
defer func() {
549-
if err := pgs.Close(); err != nil {
550-
span.LogKV("closing error", err)
551-
}
552-
}()
553-
for {
554-
pg, err := pgs.ReadPage()
555-
if pg == nil || err == io.EOF {
556-
break
557-
}
558-
c.metrics.pageReadsTotal.WithLabelValues(c.table, c.colName).Add(1)
559-
span.LogFields(
560-
log.String("msg", "reading page"),
561-
log.Int64("page_num_values", pg.NumValues()),
562-
log.Int64("page_size", pg.Size()),
563-
)
564-
if err != nil {
565-
return
566-
}
567-
568-
if checkSkip(pg.NumRows()) {
569-
// Skip page
570-
rn.Skip(pg.NumRows())
571-
continue
572-
}
573-
574-
if c.filter != nil {
575-
if !c.filter.KeepPage(pg) {
576-
// Skip page
577-
rn.Skip(pg.NumRows())
578-
continue
579-
}
580-
}
581-
582-
vr := pg.Values()
583-
for {
584-
count, err := vr.ReadValues(buffer)
585-
if count > 0 {
586-
587-
// Assign row numbers, filter values, and collect the results.
588-
newBuffer := columnIteratorPoolGet(readSize, 0)
589-
590-
for i := 0; i < count; i++ {
591-
592-
v := buffer[i]
593-
594-
// We have to do this for all values (even if the
595-
// value is excluded by the predicate)
596-
rn.Next(v.RepetitionLevel(), v.DefinitionLevel())
597-
598-
if c.filter != nil {
599-
if !c.filter.KeepValue(v) {
600-
continue
601-
}
602-
}
603-
604-
newBuffer.rowNumbers = append(newBuffer.rowNumbers, rn)
605-
newBuffer.values = append(newBuffer.values, v)
606-
}
607-
608-
if len(newBuffer.rowNumbers) > 0 {
609-
select {
610-
case c.ch <- newBuffer:
611-
case <-c.quit:
612-
return
613-
case <-ctx.Done():
614-
return
615-
}
616-
} else {
617-
// All values excluded, we go ahead and immediately
618-
// return the buffer to the pool.
619-
columnIteratorPoolPut(newBuffer)
620-
}
621-
}
622-
623-
// Error checks MUST occur after processing any returned data
624-
// following io.Reader behavior.
625-
if err == io.EOF {
626-
break
627-
}
628-
if err != nil {
629-
c.ch <- &columnIteratorBuffer{err: err}
630-
return
631-
}
632-
}
633-
634-
}
635-
}(col)
636-
}
637-
}
638-
639-
// At returns the current value from the iterator.
640-
func (c *ColumnIterator) At() *IteratorResult {
641-
return c.result
642-
}
643-
644-
// Next returns the next matching value from the iterator.
645-
// Returns nil when finished.
646-
func (c *ColumnIterator) Next() bool {
647-
t, v := c.next()
648-
if t.Valid() {
649-
c.result = c.makeResult(t, v)
650-
return true
651-
}
652-
653-
c.result = nil
654-
return false
655-
}
656-
657-
func (c *ColumnIterator) next() (RowNumber, parquet.Value) {
658-
// Consume current buffer until exhausted
659-
// then read another one from the channel.
660-
if c.curr != nil {
661-
for c.currN++; c.currN < len(c.curr.rowNumbers); {
662-
t := c.curr.rowNumbers[c.currN]
663-
if t.Valid() {
664-
return t, c.curr.values[c.currN]
505+
resRight := bj.right.At()
506+
507+
if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 {
508+
// we have a found an element
509+
bj.res = columnIteratorResultPoolGet()
510+
bj.res.RowNumber = resLeft.RowNumber
511+
bj.res.Append(resLeft)
512+
bj.res.Append(resRight)
513+
// columnIteratorResultPoolPut(resLeft)
514+
// columnIteratorResultPoolPut(resRight)
515+
return true
516+
} else if cmp < 0 {
517+
if !bj.left.Seek(RowNumberWithDefinitionLevel{resRight.RowNumber, bj.definitionLevel}) {
518+
bj.err = bj.left.Err()
519+
return false
665520
}
666-
}
667521

668-
// Done with this buffer
669-
columnIteratorPoolPut(c.curr)
670-
c.curr = nil
671-
}
672-
673-
if v, ok := <-c.ch; ok {
674-
if v.err != nil {
675-
c.err = v.err
676-
return EmptyRowNumber(), parquet.Value{}
522+
} else {
523+
// the right value can't be smaller than the left one because we seeked beyond it
524+
panic("not expected to happen")
677525
}
678-
// Got next buffer, guaranteed to have at least 1 element
679-
c.curr = v
680-
c.currN = 0
681-
return c.curr.rowNumbers[0], c.curr.values[0]
682526
}
683-
684-
// Failed to read from the channel, means iterator is exhausted.
685-
return EmptyRowNumber(), parquet.Value{}
686527
}
687528

688-
// SeekTo moves this iterator to the next result that is greater than
689-
// or equal to the given row number (and based on the given definition level)
690-
func (c *ColumnIterator) Seek(to RowNumberWithDefinitionLevel) bool {
691-
var at RowNumber
692-
var v parquet.Value
693-
694-
// Because iteration happens in the background, we signal the row
695-
// to skip to, and then read until we are at the right spot. The
696-
// seek is best-effort and may have no effect if the iteration
697-
// already further ahead, and there may already be older data
698-
// in the buffer.
699-
c.seekTo.Store(to.RowNumber)
700-
for at, v = c.next(); at.Valid() && CompareRowNumbers(to.DefinitionLevel, at, to.RowNumber) < 0; {
701-
at, v = c.next()
702-
}
703-
704-
if at.Valid() {
705-
c.result = c.makeResult(at, v)
706-
return true
707-
}
708-
709-
c.result = nil
710-
return false
529+
func (bj *BinaryJoinIterator) At() *IteratorResult {
530+
return bj.res
711531
}
712532

713-
func (c *ColumnIterator) makeResult(t RowNumber, v parquet.Value) *IteratorResult {
714-
r := columnIteratorResultPoolGet()
715-
r.RowNumber = t
716-
if c.selectAs != "" {
717-
r.AppendValue(c.selectAs, v)
718-
}
719-
return r
533+
func (bj *BinaryJoinIterator) Seek(to RowNumberWithDefinitionLevel) bool {
534+
bj.left.Seek(to)
535+
bj.right.Seek(to)
536+
return bj.Next()
720537
}
721538

722-
func (c *ColumnIterator) Close() error {
723-
close(c.quit)
724-
return nil
539+
func (bj *BinaryJoinIterator) Close() error {
540+
var merr multierror.MultiError
541+
merr.Add(bj.left.Close())
542+
merr.Add(bj.right.Close())
543+
return merr.Err()
725544
}
726545

727-
func (c *ColumnIterator) Err() error {
546+
func (c *BinaryJoinIterator) Err() error {
728547
return c.err
729548
}
730549

pkg/phlaredb/query/iters_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func newTestSet() []parquet.RowGroup {
9393
}
9494
}
9595

96-
func TestColumnIterator(t *testing.T) {
96+
func TestSyncIterator(t *testing.T) {
9797
for _, tc := range []struct {
9898
name string
9999
result []parquet.Value
@@ -121,7 +121,7 @@ func TestColumnIterator(t *testing.T) {
121121
buffer [][]parquet.Value
122122

123123
ctx = context.Background()
124-
i = NewColumnIterator(ctx, tc.rowGroups, 0, "id", 10, nil, "id")
124+
i = NewSyncIterator(ctx, tc.rowGroups, 0, "id", 10, nil, "id")
125125
)
126126
for i.Next() {
127127
require.Nil(t, i.Err())

pkg/phlaredb/query/predicate_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func testPredicate[T any](t *testing.T, tc predicateTestCase[T]) {
7575

7676
p := InstrumentedPredicate{pred: tc.predicate}
7777

78-
i := NewColumnIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "")
78+
i := NewSyncIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "")
7979
for i.Next() {
8080
}
8181

0 commit comments

Comments
 (0)