@@ -21,14 +21,15 @@ import (
2121 "time"
2222
2323 "github.com/alecthomas/kingpin/v2"
24- "github.com/pkg/errors"
2524 config_util "github.com/prometheus/common/config"
2625 "github.com/prometheus/common/model"
26+ "github.com/prometheus/prometheus/model/histogram"
2727 "github.com/prometheus/prometheus/model/labels"
2828 "github.com/prometheus/prometheus/model/value"
29- "github.com/prometheus/prometheus/prompb"
3029 "github.com/prometheus/prometheus/promql/parser"
30+ "github.com/prometheus/prometheus/storage"
3131 "github.com/prometheus/prometheus/storage/remote"
32+ "github.com/prometheus/prometheus/tsdb/chunkenc"
3233 log "github.com/sirupsen/logrus"
3334
3435 "github.com/grafana/mimir/pkg/mimirtool/backfill"
@@ -105,66 +106,59 @@ func (s *setTenantIDTransport) RoundTrip(req *http.Request) (*http.Response, err
105106}
106107
107108type timeSeriesIterator struct {
108- posSeries int
109- posSample int
110- ts []* prompb.TimeSeries
111-
112- // labels slice is reused across samples within a series
113- labels labels.Labels
114- labelsSeriesPos int
109+ ss storage.SeriesSet
110+ it chunkenc.Iterator
111+ ts int64
112+ v float64
113+ h * histogram.Histogram
114+ fh * histogram.FloatHistogram
115115}
116116
117- func newTimeSeriesIterator (ts [] * prompb. TimeSeries ) * timeSeriesIterator {
117+ func newTimeSeriesIterator (seriesSet storage. SeriesSet ) * timeSeriesIterator {
118118 return & timeSeriesIterator {
119- posSeries : 0 ,
120- posSample : - 1 ,
121-
122- // ensure we are not pointing to a valid slice position
123- labelsSeriesPos : - 1 ,
124-
125- ts : ts ,
119+ ss : seriesSet ,
120+ it : chunkenc .NewNopIterator (),
126121 }
127-
128122}
129123
130124func (i * timeSeriesIterator ) Next () error {
131- if i .posSeries >= len (i .ts ) {
132- return io .EOF
133- }
134-
135- i .posSample ++
136-
137- if i .posSample >= len (i .ts [i .posSeries ].Samples ) {
138- i .posSample = - 1
139- i .posSeries ++
140- return i .Next ()
125+ // Find non empty series iterator.
126+ var vt chunkenc.ValueType
127+ for vt = i .it .Next (); vt == chunkenc .ValNone ; vt = i .it .Next () {
128+ if ! i .ss .Next () {
129+ err := i .ss .Err ()
130+ if err != nil {
131+ return err
132+ }
133+ return io .EOF
134+ }
135+ i .it = i .ss .At ().Iterator (i .it )
136+ }
137+ switch vt {
138+ case chunkenc .ValFloat :
139+ i .ts , i .v = i .it .At ()
140+ i .h = nil
141+ i .fh = nil
142+ case chunkenc .ValHistogram :
143+ i .ts , i .h = i .it .AtHistogram (nil )
144+ i .v = i .h .Sum
145+ i .fh = nil
146+ case chunkenc .ValFloatHistogram :
147+ i .ts , i .fh = i .it .AtFloatHistogram (nil )
148+ i .v = i .fh .Sum
149+ i .h = nil
150+ default :
151+ panic ("unreachable" )
141152 }
142-
143153 return nil
144154}
145155
146156func (i * timeSeriesIterator ) Labels () (l labels.Labels ) {
147- // if it's the same label as previously return it
148- if i .posSeries == i .labelsSeriesPos {
149- return i .labels
150- }
151-
152- series := i .ts [i .posSeries ]
153- builder := labels .NewScratchBuilder (len (series .Labels ))
154- for posLabel := range series .Labels {
155- builder .Add (series .Labels [posLabel ].Name , series .Labels [posLabel ].Value )
156- }
157- i .labels = builder .Labels ()
158- i .labelsSeriesPos = i .posSeries
159- return i .labels
157+ return i .ss .At ().Labels ()
160158}
161159
162- func (i * timeSeriesIterator ) Sample () (ts int64 , v float64 ) {
163- series := i .ts [i .posSeries ]
164- sample := series .Samples [i .posSample ]
165-
166- //sample.GetValue()
167- return sample .GetTimestamp (), sample .GetValue ()
160+ func (i * timeSeriesIterator ) Sample () (ts int64 , v float64 , h * histogram.Histogram , fh * histogram.FloatHistogram ) {
161+ return i .ts , i .v , i .h , i .fh
168162}
169163
170164// this is adapted from Go 1.15 for older versions
@@ -231,7 +225,7 @@ func (c *RemoteReadCommand) readClient() (remote.ReadClient, error) {
231225}
232226
233227// prepare() validates the input and prepares the client to query remote read endpoints
234- func (c * RemoteReadCommand ) prepare () (query func (context.Context ) ([] * prompb. TimeSeries , error ), from , to time.Time , err error ) {
228+ func (c * RemoteReadCommand ) prepare () (query func (context.Context ) (storage. SeriesSet , error ), from , to time.Time , err error ) {
235229 from , err = time .Parse (time .RFC3339 , c .from )
236230 if err != nil {
237231 return nil , time.Time {}, time.Time {}, fmt .Errorf ("error parsing from: '%s' value: %w" , c .from , err )
@@ -262,14 +256,14 @@ func (c *RemoteReadCommand) prepare() (query func(context.Context) ([]*prompb.Ti
262256 return nil , time.Time {}, time.Time {}, err
263257 }
264258
265- return func (ctx context.Context ) ([] * prompb. TimeSeries , error ) {
259+ return func (ctx context.Context ) (storage. SeriesSet , error ) {
266260 log .Infof ("Querying time from=%s to=%s with selector=%s" , from .Format (time .RFC3339 ), to .Format (time .RFC3339 ), c .selector )
267- resp , err := readClient .Read (ctx , pbQuery )
261+ resp , err := readClient .Read (ctx , pbQuery , false )
268262 if err != nil {
269263 return nil , err
270264 }
271265
272- return resp . Timeseries , nil
266+ return resp , nil
273267
274268 }, from , to , nil
275269}
@@ -285,23 +279,39 @@ func (c *RemoteReadCommand) dump(_ *kingpin.ParseContext) error {
285279 return err
286280 }
287281
288- iterator := newTimeSeriesIterator (timeseries )
289- for {
290- err := iterator .Next ()
291- if err != nil {
292- if errors .Is (err , io .EOF ) {
293- break
282+ var it chunkenc.Iterator
283+ for timeseries .Next () {
284+ s := timeseries .At ()
285+
286+ l := s .Labels ().String ()
287+ it := s .Iterator (it )
288+ for vt := it .Next (); vt != chunkenc .ValNone ; vt = it .Next () {
289+ switch vt {
290+ case chunkenc .ValFloat :
291+ ts , v := it .At ()
292+ comment := ""
293+ if value .IsStaleNaN (v ) {
294+ comment = " # StaleNaN"
295+ }
296+ fmt .Printf ("%s %g %d%s\n " , l , v , ts , comment )
297+ case chunkenc .ValHistogram :
298+ ts , h := it .AtHistogram (nil )
299+ comment := ""
300+ if value .IsStaleNaN (h .Sum ) {
301+ comment = " # StaleNaN"
302+ }
303+ fmt .Printf ("%s %s %d%s\n " , l , h .String (), ts , comment )
304+ case chunkenc .ValFloatHistogram :
305+ ts , h := it .AtFloatHistogram (nil )
306+ comment := ""
307+ if value .IsStaleNaN (h .Sum ) {
308+ comment = " # StaleNaN"
309+ }
310+ fmt .Printf ("%s %s %d%s\n " , l , h .String (), ts , comment )
311+ default :
312+ panic ("unreachable" )
294313 }
295- return err
296- }
297-
298- l := iterator .Labels ()
299- ts , v := iterator .Sample ()
300- comment := ""
301- if value .IsStaleNaN (v ) {
302- comment = " # StaleNaN"
303314 }
304- fmt .Printf ("%s %g %d%s\n " , l , v , ts , comment )
305315 }
306316
307317 return nil
@@ -331,32 +341,44 @@ func (c *RemoteReadCommand) stats(_ *kingpin.ParseContext) error {
331341 Series : make (map [string ]struct {}),
332342 }
333343
334- iterator := newTimeSeriesIterator (timeseries )
335- for {
336- err := iterator .Next ()
337- if err != nil {
338- if errors .Is (err , io .EOF ) {
339- break
344+ var it chunkenc.Iterator
345+ for timeseries .Next () {
346+ s := timeseries .At ()
347+ it := s .Iterator (it )
348+ for vt := it .Next (); vt != chunkenc .ValNone ; vt = it .Next () {
349+ num .Samples ++
350+ num .Series [s .Labels ().String ()] = struct {}{}
351+
352+ var ts int64
353+ var v float64
354+ switch vt {
355+ case chunkenc .ValFloat :
356+ ts , v = it .At ()
357+ case chunkenc .ValHistogram :
358+ var h * histogram.Histogram
359+ ts , h = it .AtHistogram (nil )
360+ v = h .Sum
361+ case chunkenc .ValFloatHistogram :
362+ var h * histogram.FloatHistogram
363+ ts , h = it .AtFloatHistogram (nil )
364+ v = h .Sum
365+ default :
366+ panic ("unreachable" )
340367 }
341- return err
342- }
343- num .Samples ++
344- num .Series [iterator .Labels ().String ()] = struct {}{}
345-
346- ts , v := iterator .Sample ()
347368
348- if int64 (num .MaxT ) < ts {
349- num .MaxT = model .Time (ts )
350- }
351- if num .MinT == 0 || int64 (num .MinT ) > ts {
352- num .MinT = model .Time (ts )
353- }
369+ if int64 (num .MaxT ) < ts {
370+ num .MaxT = model .Time (ts )
371+ }
372+ if num .MinT == 0 || int64 (num .MinT ) > ts {
373+ num .MinT = model .Time (ts )
374+ }
354375
355- if math .IsNaN (v ) {
356- num .NaNValues ++
357- }
358- if value .IsStaleNaN (v ) {
359- num .StaleNaNValues ++
376+ if math .IsNaN (v ) {
377+ num .NaNValues ++
378+ }
379+ if value .IsStaleNaN (v ) {
380+ num .StaleNaNValues ++
381+ }
360382 }
361383 }
362384
0 commit comments