Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.

Commit d34f635

Browse files
committed
Fix closing and cancellation
A call to Next/Seek after a close should also yield a context cancelled.
1 parent 706aa29 commit d34f635

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

pkg/phlaredb/query/iters.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,8 @@ type SyncIterator struct {
774774
filter *InstrumentedPredicate
775775

776776
// Status
777+
ctx context.Context
778+
cancel func()
777779
span opentracing.Span
778780
metrics *Metrics
779781
curr RowNumber
@@ -831,13 +833,17 @@ func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, co
831833
rn.Skip(rg.NumRows())
832834
}
833835

834-
span, _ := opentracing.StartSpanFromContext(ctx, "syncIterator", opentracing.Tags{
836+
span, ctx := opentracing.StartSpanFromContext(ctx, "syncIterator", opentracing.Tags{
835837
"columnIndex": column,
836838
"column": columnName,
837839
})
838840

841+
ctx, cancel := context.WithCancel(ctx)
842+
839843
return &SyncIterator{
840844
table: strings.ToLower(rgs[0].Schema().Name()) + "s",
845+
ctx: ctx,
846+
cancel: cancel,
841847
metrics: getMetricsFromContext(ctx),
842848
span: span,
843849
column: column,
@@ -1040,6 +1046,14 @@ func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bo
10401046
// when being called multiple times and throwing away the results like in SeekTo().
10411047
func (c *SyncIterator) next() (RowNumber, *parquet.Value, error) {
10421048
for {
1049+
1050+
// return if context is cancelled
1051+
select {
1052+
case <-c.ctx.Done():
1053+
return EmptyRowNumber(), nil, c.ctx.Err()
1054+
default:
1055+
}
1056+
10431057
if c.currRowGroup == nil {
10441058
rg, min, max := c.popRowGroup()
10451059
if rg == nil {
@@ -1159,6 +1173,7 @@ func (c *SyncIterator) setPage(pg parquet.Page) {
11591173
}
11601174

11611175
func (c *SyncIterator) closeCurrRowGroup() {
1176+
c.cancel()
11621177
if c.currPages != nil {
11631178
c.currPages.Close()
11641179
}

pkg/phlaredb/query/iters_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ func TestColumnIteratorExitEarly(t *testing.T) {
247247
iter := NewSyncIterator(context.TODO(), pf.RowGroups(), idx, "", readSize, nil, "A")
248248
iter.Close()
249249
count, err := readIter(iter)
250-
require.NoError(t, err)
250+
require.ErrorContains(t, err, "context canceled")
251251
require.Equal(t, 0, count)
252252
})
253253

@@ -262,7 +262,7 @@ func TestColumnIteratorExitEarly(t *testing.T) {
262262

263263
// Read again = should close early
264264
res2, err := readIter(iter)
265-
require.NoError(t, err)
265+
require.ErrorContains(t, err, "context canceled")
266266
require.Less(t, readSize+res2, count)
267267
})
268268
}

0 commit comments

Comments
 (0)