@@ -201,7 +201,7 @@ func TestColumnIteratorExitEarly(t *testing.T) {
201201 rows = append (rows , T {i })
202202 }
203203
204- pf := createFileWith (t , rows )
204+ pf := createFileWith (t , rows , 2 )
205205 idx , _ := GetColumnIndexByPath (pf , "A" )
206206 readSize := 1000
207207
@@ -299,15 +299,47 @@ func createTestFile(t testing.TB, count int) *parquet.File {
299299 rows = append (rows , T {i })
300300 }
301301
302- pf := createFileWith (t , rows )
302+ pf := createFileWith (t , rows , 2 )
303303 return pf
304304}
305305
306- func createFileWith [T any ](t testing.TB , rows []T ) * parquet.File {
306+ func createProfileLikeFile (t testing.TB , count int ) * parquet.File {
307+ type T struct {
308+ SeriesID uint32
309+ TimeNanos int64
310+ }
311+
312+ // every row group is ordered by serieID and then time nanos
313+ // time is always increasing between rowgroups
314+
315+ rowGroups := 10
316+ series := 8
317+
318+ rows := make ([]T , count )
319+ for i := range rows {
320+
321+ rowsPerRowGroup := count / rowGroups
322+ seriesPerRowGroup := rowsPerRowGroup / series
323+ rowGroupNum := i / rowsPerRowGroup
324+
325+ seriesID := uint32 (i % (count / rowGroups ) / (rowsPerRowGroup / series ))
326+ rows [i ] = T {
327+ SeriesID : seriesID ,
328+ TimeNanos : int64 (i % seriesPerRowGroup + rowGroupNum * seriesPerRowGroup ) * 1000 ,
329+ }
330+
331+ }
332+
333+ return createFileWith [T ](t , rows , rowGroups )
334+
335+ }
336+
337+ func createFileWith [T any ](t testing.TB , rows []T , rowGroups int ) * parquet.File {
307338 f , err := os .CreateTemp (t .TempDir (), "data.parquet" )
308339 require .NoError (t , err )
340+ t .Logf ("Created temp file %s" , f .Name ())
309341
310- half := len (rows ) / 2
342+ half := len (rows ) / rowGroups
311343
312344 w := parquet.NewGenericWriter [T ](f )
313345 _ , err = w .Write (rows [0 :half ])
@@ -328,3 +360,69 @@ func createFileWith[T any](t testing.TB, rows []T) *parquet.File {
328360
329361 return pf
330362}
363+
364+ func TestBinaryJoinIterator (t * testing.T ) {
365+ rowCount := 1600
366+ pf := createProfileLikeFile (t , rowCount )
367+
368+ for _ , tc := range []struct {
369+ name string
370+ seriesPredicate Predicate
371+ timePredicate Predicate
372+ expectedResultCount int
373+ }{
374+ {
375+ name : "no predicate" ,
376+ expectedResultCount : rowCount , // expect everything
377+ },
378+ {
379+ name : "one series ID" ,
380+ expectedResultCount : rowCount / 8 , // expect an eigth of the rows
381+ seriesPredicate : NewMapPredicate (map [int64 ]struct {}{0 : {}}),
382+ },
383+ {
384+ name : "two series IDs" ,
385+ expectedResultCount : rowCount / 8 * 2 , // expect two eigth of the rows
386+ seriesPredicate : NewMapPredicate (map [int64 ]struct {}{0 : {}, 1 : {}}),
387+ },
388+ {
389+ name : "first two time stamps each" ,
390+ expectedResultCount : 2 * 8 , // expect an eigth of the rows
391+ timePredicate : NewIntBetweenPredicate (0 , 1000 ),
392+ },
393+ {
394+ name : "time before results" ,
395+ expectedResultCount : 0 , // expect an eigth of the rows
396+ timePredicate : NewIntBetweenPredicate (- 10 , - 1 ),
397+ },
398+ {
399+ name : "time after results" ,
400+ expectedResultCount : 0 , // expect an eigth of the rows
401+ timePredicate : NewIntBetweenPredicate (200000 , 20001000 ),
402+ seriesPredicate : NewMapPredicate (map [int64 ]struct {}{0 : {}, 1 : {}}),
403+ },
404+ } {
405+ t .Run (tc .name , func (t * testing.T ) {
406+ seriesIt := NewSyncIterator (ctx , pf .RowGroups (), 0 , "SeriesId" , 1000 , tc .seriesPredicate , "SeriesId" )
407+ timeIt := NewSyncIterator (ctx , pf .RowGroups (), 1 , "TimeNanos" , 1000 , tc .timePredicate , "TimeNanos" )
408+
409+ it := NewBinaryJoinIterator (
410+ 0 ,
411+ seriesIt ,
412+ timeIt ,
413+ )
414+ defer func () {
415+ require .NoError (t , it .Close ())
416+ }()
417+
418+ results := 0
419+ for it .Next () {
420+ results ++
421+ }
422+ require .NoError (t , it .Err ())
423+
424+ require .Equal (t , tc .expectedResultCount , results )
425+
426+ })
427+ }
428+ }
0 commit comments