11package query
22
33import (
4+ "bytes"
45 "context"
6+ "fmt"
57 "math"
68 "os"
79 "testing"
810
11+ "github.com/prometheus/client_golang/prometheus"
12+ "github.com/prometheus/client_golang/prometheus/testutil"
913 "github.com/segmentio/parquet-go"
1014 "github.com/stretchr/testify/require"
1115)
@@ -339,14 +343,17 @@ func createFileWith[T any](t testing.TB, rows []T, rowGroups int) *parquet.File
339343 require .NoError (t , err )
340344 t .Logf ("Created temp file %s" , f .Name ())
341345
342- half := len (rows ) / rowGroups
346+ perRG := len (rows ) / rowGroups
343347
344348 w := parquet.NewGenericWriter [T ](f )
345- _ , err = w .Write (rows [0 :half ])
346- require .NoError (t , err )
347- require .NoError (t , w .Flush ())
349+ for i := 0 ; i < (rowGroups - 1 ); i ++ {
350+ _ , err = w .Write (rows [0 :perRG ])
351+ require .NoError (t , err )
352+ require .NoError (t , w .Flush ())
353+ rows = rows [perRG :]
354+ }
348355
349- _ , err = w .Write (rows [ half :] )
356+ _ , err = w .Write (rows )
350357 require .NoError (t , err )
351358 require .NoError (t , w .Flush ())
352359
@@ -368,22 +375,30 @@ func TestBinaryJoinIterator(t *testing.T) {
368375 for _ , tc := range []struct {
369376 name string
370377 seriesPredicate Predicate
378+ seriesPageReads int
371379 timePredicate Predicate
380+ timePageReads int
372381 expectedResultCount int
373382 }{
374383 {
375384 name : "no predicate" ,
376385 expectedResultCount : rowCount , // expect everything
386+ seriesPageReads : 10 ,
387+ timePageReads : 10 ,
377388 },
378389 {
379390 name : "one series ID" ,
380- expectedResultCount : rowCount / 8 , // expect an eigth of the rows
391+ expectedResultCount : rowCount / 8 , // expect an eight of the rows
381392 seriesPredicate : NewMapPredicate (map [int64 ]struct {}{0 : {}}),
393+ seriesPageReads : 10 ,
394+ timePageReads : 10 ,
382395 },
383396 {
384397 name : "two series IDs" ,
385- expectedResultCount : rowCount / 8 * 2 , // expect two eigth of the rows
398+ expectedResultCount : rowCount / 8 * 2 , // expect two eights of the rows
386399 seriesPredicate : NewMapPredicate (map [int64 ]struct {}{0 : {}, 1 : {}}),
400+ seriesPageReads : 10 ,
401+ timePageReads : 10 ,
387402 },
388403 {
389404 name : "missing series" ,
@@ -394,20 +409,35 @@ func TestBinaryJoinIterator(t *testing.T) {
394409 name : "first two time stamps each" ,
395410 expectedResultCount : 2 * 8 , // expect two profiles for each series
396411 timePredicate : NewIntBetweenPredicate (0 , 1000 ),
412+ seriesPageReads : 1 ,
413+ timePageReads : 1 ,
397414 },
398415 {
399416 name : "time before results" ,
400417 expectedResultCount : 0 ,
401418 timePredicate : NewIntBetweenPredicate (- 10 , - 1 ),
419+ seriesPageReads : 1 ,
420+ timePageReads : 0 ,
402421 },
403422 {
404423 name : "time after results" ,
405424 expectedResultCount : 0 ,
406425 timePredicate : NewIntBetweenPredicate (200000 , 20001000 ),
407426 seriesPredicate : NewMapPredicate (map [int64 ]struct {}{0 : {}, 1 : {}}),
427+ seriesPageReads : 1 ,
428+ timePageReads : 0 ,
408429 },
409430 } {
410431 t .Run (tc .name , func (t * testing.T ) {
432+ ctx , cancel := context .WithCancel (context .Background ())
433+ defer cancel ()
434+
435+ reg := prometheus .NewRegistry ()
436+ metrics := NewMetrics (reg )
437+ metrics .pageReadsTotal .WithLabelValues ("ts" , "SeriesId" ).Add (0 )
438+ metrics .pageReadsTotal .WithLabelValues ("ts" , "TimeNanos" ).Add (0 )
439+ ctx = AddMetricsToContext (ctx , metrics )
440+
411441 seriesIt := NewSyncIterator (ctx , pf .RowGroups (), 0 , "SeriesId" , 1000 , tc .seriesPredicate , "SeriesId" )
412442 timeIt := NewSyncIterator (ctx , pf .RowGroups (), 1 , "TimeNanos" , 1000 , tc .timePredicate , "TimeNanos" )
413443
@@ -416,18 +446,25 @@ func TestBinaryJoinIterator(t *testing.T) {
416446 seriesIt ,
417447 timeIt ,
418448 )
419- defer func () {
420- require .NoError (t , it .Close ())
421- }()
422449
423450 results := 0
424451 for it .Next () {
425452 results ++
426453 }
427454 require .NoError (t , it .Err ())
428455
456+ require .NoError (t , it .Close ())
457+
429458 require .Equal (t , tc .expectedResultCount , results )
430459
460+ require .NoError (t , testutil .GatherAndCompare (reg , bytes .NewReader ([]byte (fmt .Sprintf (
461+ `
462+ # HELP pyroscopedb_page_reads_total Total number of pages read while querying
463+ # TYPE pyroscopedb_page_reads_total counter
464+ pyroscopedb_page_reads_total{column="SeriesId",table="ts"} %d
465+ pyroscopedb_page_reads_total{column="TimeNanos",table="ts"} %d
466+ ` , tc .seriesPageReads , tc .timePageReads ))), "pyroscopedb_page_reads_total" ))
467+
431468 })
432469 }
433470}
0 commit comments