@@ -25,6 +25,7 @@ import (
 	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
 	"github.com/cortexproject/cortex/integration/e2ecortex"
 	"github.com/cortexproject/cortex/pkg/storage/bucket"
+	cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
 	"github.com/cortexproject/cortex/pkg/util/log"
@@ -180,7 +181,7 @@ func TestParquetFuzz(t *testing.T) {
 		labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
 }
 
-func TestParquetProjectionPushdown(t *testing.T) {
+func TestParquetProjectionPushdownFuzz(t *testing.T) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)
 	defer s.Close()
@@ -247,14 +248,12 @@ func TestParquetProjectionPushdown(t *testing.T) {
 	statusCodes := []string{"200", "400", "404", "500", "502"}
 	methods := []string{"GET", "POST", "PUT", "DELETE"}
 	now := time.Now()
-	// Make sure query time is old enough to not overlap with ingesters
-	// With query-ingesters-within=2h, queries with maxT < now-2h won't hit ingesters
-	// Using 24h-48h ago ensures no ingester overlap, allowing projection to be enabled
-	start := now.Add(-time.Hour * 48)
-	end := now.Add(-time.Hour * 24)
+	// Make sure query time is old enough to not overlap with ingesters.
+	start := now.Add(-time.Hour * 72)
+	end := now.Add(-time.Hour * 48)
 
 	// Create series with multiple labels
-	for i := 0; i < numSeries; i++ {
+	for i := range numSeries {
 		lbls = append(lbls, labels.FromStrings(
 			labels.MetricName, "http_requests_total",
 			"job", "api-server",
@@ -276,16 +275,17 @@ func TestParquetProjectionPushdown(t *testing.T) {
 
 	storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
 	require.NoError(t, err)
-	bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
+	bkt := storage.GetBucket()
+	userBucket := bucket.NewUserBucketClient("user-1", bkt, nil)
 
-	err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
+	err = block.Upload(ctx, log.Logger, userBucket, filepath.Join(dir, id.String()), metadata.NoneFunc)
 	require.NoError(t, err)
 
 	// Wait until we convert the blocks to parquet AND bucket index is updated
-	cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
+	cortex_testutil.Poll(t, 300*time.Second, true, func() interface{} {
 		// Check if parquet marker exists
 		markerFound := false
-		err := bkt.Iter(context.Background(), "", func(name string) error {
+		err := userBucket.Iter(context.Background(), "", func(name string) error {
 			if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
 				markerFound = true
 			}
@@ -304,6 +304,7 @@ func TestParquetProjectionPushdown(t *testing.T) {
 		// Verify the block is in the bucket index with parquet metadata
 		for _, b := range idx.Blocks {
 			if b.ID == id && b.Parquet != nil {
+				require.True(t, b.Parquet.Version == cortex_parquet.CurrentVersion)
 				return true
 			}
 		}
@@ -364,7 +365,7 @@ func TestParquetProjectionPushdown(t *testing.T) {
 	// Verify projection worked: series should only have the expected labels
 	for _, sample := range vector {
 		actualLabels := make(map[string]struct{})
-		for _, label := range sample.Metric {
+		for label := range sample.Metric {
 			actualLabels[string(label)] = struct{}{}
 		}
 
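
Aside: a minimal sketch (assuming the standard github.com/prometheus/common/model package, with made-up metric contents) of why the single-variable range in the last hunk collects label names: model.Metric is a map from label names to label values, so a two-variable range yields the values instead of the names.

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func main() {
	// Example metric; the labels here are illustrative only.
	m := model.Metric{"__name__": "http_requests_total", "job": "api-server"}

	actualLabels := make(map[string]struct{})
	for label := range m { // ranging a map yields its keys, i.e. the label names
		actualLabels[string(label)] = struct{}{}
	}
	fmt.Println(actualLabels) // map[__name__:{} job:{}]
}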