|
| 1 | +package stream |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "testing" |
| 6 | + |
| 7 | + "github.com/apache/kvrocks/tests/gocase/util" |
| 8 | + "github.com/redis/go-redis/v9" |
| 9 | + "github.com/stretchr/testify/require" |
| 10 | +) |
| 11 | + |
| 12 | +func TestXPendingBug(t *testing.T) { |
| 13 | + srv := util.StartServer(t, map[string]string{}) |
| 14 | + defer srv.Close() |
| 15 | + |
| 16 | + ctx := context.Background() |
| 17 | + rdb := srv.NewClient() |
| 18 | + defer func() { require.NoError(t, rdb.Close()) }() |
| 19 | + |
| 20 | + key := "stream_xpending_bug" |
| 21 | + group := "group1" |
| 22 | + |
| 23 | + // Create stream and group |
| 24 | + require.NoError(t, rdb.XGroupCreateMkStream(ctx, key, group, "0").Err()) |
| 25 | + |
| 26 | + // Add messages |
| 27 | + id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v1"}}).Result() |
| 28 | + require.NoError(t, err) |
| 29 | + id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: key, Values: map[string]interface{}{"k": "v2"}}).Result() |
| 30 | + require.NoError(t, err) |
| 31 | + |
| 32 | + // Read to make them pending |
| 33 | + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ |
| 34 | + Group: group, |
| 35 | + Consumer: "c1", |
| 36 | + Streams: []string{key, ">"}, |
| 37 | + Count: 10, |
| 38 | + }).Result() |
| 39 | + require.NoError(t, err) |
| 40 | + |
| 41 | + // XPENDING key group - + 10 |
| 42 | + // If bug exists, this becomes XPENDING key group - - 10, which returns nothing (or only 0-0 if it existed) |
| 43 | + res, err := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{ |
| 44 | + Stream: key, |
| 45 | + Group: group, |
| 46 | + Start: "-", |
| 47 | + End: "+", |
| 48 | + Count: 10, |
| 49 | + }).Result() |
| 50 | + require.NoError(t, err) |
| 51 | + |
| 52 | + // Should have 2 pending items |
| 53 | + require.Len(t, res, 2) |
| 54 | + require.Equal(t, id1, res[0].ID) |
| 55 | + require.Equal(t, id2, res[1].ID) |
| 56 | +} |
0 commit comments