diff --git a/tests/msc4140/delayed_event_test.go b/tests/msc4140/delayed_event_test.go index 42856120..3e59cf20 100644 --- a/tests/msc4140/delayed_event_test.go +++ b/tests/msc4140/delayed_event_test.go @@ -2,6 +2,8 @@ package tests import ( "fmt" + "io" + "math" "net/http" "net/url" "testing" @@ -50,7 +52,7 @@ func TestDelayedEvents(t *testing.T) { user2.MustJoinRoom(t, roomID, nil) t.Run("delayed events are empty on startup", func(t *testing.T) { - matchDelayedEvents(t, user, 0) + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) }) t.Run("delayed event lookups are authenticated", func(t *testing.T) { @@ -100,14 +102,14 @@ func TestDelayedEvents(t *testing.T) { } countExpected = 0 - matchDelayedEvents(t, user, numEvents) + matchDelayedEvents(t, user, delayedEventsNumberEqual(numEvents)) t.Run("cannot get delayed events of another user", func(t *testing.T) { - matchDelayedEvents(t, user2, 0) + matchDelayedEvents(t, user2, delayedEventsNumberEqual(0)) }) time.Sleep(1 * time.Second) - matchDelayedEvents(t, user, 0) + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) queryParams := url.Values{} queryParams.Set("dir", "f") queryParams.Set("from", token) @@ -149,7 +151,7 @@ func TestDelayedEvents(t *testing.T) { getDelayQueryParam("900"), ) - matchDelayedEvents(t, user, 1) + matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) res = getDelayedEvents(t, user) must.MatchResponse(t, res, match.HTTPResponse{ @@ -172,7 +174,7 @@ func TestDelayedEvents(t *testing.T) { }) time.Sleep(1 * time.Second) - matchDelayedEvents(t, user, 0) + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ JSON: []match.JSON{ @@ -244,7 +246,7 @@ func TestDelayedEvents(t *testing.T) { delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id") time.Sleep(1 * time.Second) - matchDelayedEvents(t, user, 1) + matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, @@ -256,7 +258,7 @@ func TestDelayedEvents(t *testing.T) { getPathForUpdateDelayedEvent(delayID, DelayedEventActionCancel), client.WithJSONBody(t, map[string]interface{}{}), ) - matchDelayedEvents(t, user, 0) + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) time.Sleep(1 * time.Second) res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) @@ -286,7 +288,7 @@ func TestDelayedEvents(t *testing.T) { delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id") time.Sleep(1 * time.Second) - matchDelayedEvents(t, user, 1) + matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, @@ -298,7 +300,7 @@ func TestDelayedEvents(t *testing.T) { getPathForUpdateDelayedEvent(delayID, DelayedEventActionSend), client.WithJSONBody(t, map[string]interface{}{}), ) - matchDelayedEvents(t, user, 0) + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ JSON: []match.JSON{ @@ -328,7 +330,7 @@ func TestDelayedEvents(t *testing.T) { delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id") time.Sleep(1 * time.Second) - matchDelayedEvents(t, user, 1) + matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, @@ -342,14 +344,14 @@ func TestDelayedEvents(t *testing.T) { ) time.Sleep(1 * time.Second) - matchDelayedEvents(t, user, 1) + matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ StatusCode: 404, }) time.Sleep(1 * time.Second) - matchDelayedEvents(t, user, 0) + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) must.MatchResponse(t, res, match.HTTPResponse{ JSON: []match.JSON{ @@ -376,7 +378,7 @@ func TestDelayedEvents(t *testing.T) { }), getDelayQueryParam("900"), ) - matchDelayedEvents(t, user, 1) + matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) user.MustDo( t, @@ -386,7 +388,7 @@ func TestDelayedEvents(t *testing.T) { setterKey: "manual", }), ) - matchDelayedEvents(t, user, 1) + matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) time.Sleep(1 * time.Second) res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) @@ -415,7 +417,7 @@ func TestDelayedEvents(t *testing.T) { }), getDelayQueryParam("900"), ) - matchDelayedEvents(t, user, 1) + matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) setterExpected := "manual" user2.MustDo( @@ -426,7 +428,7 @@ func TestDelayedEvents(t *testing.T) { setterKey: setterExpected, }), ) - matchDelayedEvents(t, user, 0) + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) time.Sleep(1 * time.Second) res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) @@ -446,6 +448,10 @@ func TestDelayedEvents(t *testing.T) { stateKey1 := "1" stateKey2 := "2" + numberOfDelayedEvents := 0 + + // Send an initial delayed event that will be ready to send as soon as the server + // comes back up. user.MustDo( t, "PUT", @@ -453,34 +459,78 @@ func TestDelayedEvents(t *testing.T) { client.WithJSONBody(t, map[string]interface{}{}), getDelayQueryParam("900"), ) - beforeScheduleStateTimestamp2 := time.Now() - user.MustDo( - t, - "PUT", - getPathForState(roomID, eventType, stateKey2), - client.WithJSONBody(t, map[string]interface{}{}), - getDelayQueryParam("9900"), - ) - matchDelayedEvents(t, user, 2) + numberOfDelayedEvents++ + + // Previously, this was naively using a single delayed event with a 10 second delay. + // But because we're stopping and starting servers here, it could take up to + // `deployment.GetConfig().SpawnHSTimeout` (defaults to 30 seconds) for the server + // to start up again so by the time the server is back up, the delayed event may + // have already been sent invalidating our assertions below (which expect some + // delayed events to still be pending and then see one of them be sent after the + // server is back up). + // + // We could account for this by setting the delayed event delay to be longer than + // `deployment.GetConfig().SpawnHSTimeout` but that would make the test suite take + // longer to run in all cases even for homeservers that are quick to restart because + // we have to wait for that large delay. + // + // We instead account for this by scheduling many delayed events at short intervals + // (we chose 10 seconds because that's what the test naively chose before). Then + // whenever the servers comes back, we can just check until it decrements by 1. + // + // We add 1 to the number of intervals to ensure that we have at least one interval + // to check against no matter how things are configured. + numberOf10SecondIntervals := int(math.Ceil(deployment.GetConfig().SpawnHSTimeout.Seconds()/10)) + 1 + for i := 0; i < numberOf10SecondIntervals; i++ { + // +1 as we want to start at 10 seconds and so we don't end up with -100ms delay + // on the first one. + delay := time.Duration(i+1)*10*time.Second - 100*time.Millisecond + + user.MustDo( + t, + "PUT", + // Avoid clashing state keys as that would cancel previous delayed events on the + // same key (start at 2). + getPathForState(roomID, eventType, fmt.Sprintf("%d", i+2)), + client.WithJSONBody(t, map[string]interface{}{}), + getDelayQueryParam(fmt.Sprintf("%d", delay.Milliseconds())), + ) + numberOfDelayedEvents++ + } + // We expect all of the delayed events to be scheduled and not sent yet. + matchDelayedEvents(t, user, delayedEventsNumberEqual(numberOfDelayedEvents)) + // Restart the server and wait until it's back up. deployment.StopServer(t, hsName) + // Wait one second which will cause the first delayed event to be ready to be sent + // when the server is back up. time.Sleep(1 * time.Second) deployment.StartServer(t, hsName) - // The rest of the test assumes the second delayed event (10 second delay) still - // hasn't been sent yet. - if time.Now().Sub(beforeScheduleStateTimestamp2) > 10*time.Second { - t.Fatalf( - "Test took too long to run, cannot guarantee delayed event timings. " + - "More than 10 seconds elapsed between scheduling the delayed event and now when we're about to check for it.", - ) - } - - matchDelayedEvents(t, user, 1) + delayedEventResponse := matchDelayedEvents(t, user, + // We should still see some delayed events left after the restart. + delayedEventsNumberGreaterThan(0), + // We should see at-least one less than we had before the restart (the first + // delayed event should have been sent). Other delayed events may have been sent + // by the time the server actually came back up. + delayedEventsNumberLessThan(numberOfDelayedEvents-1), + ) + // Capture whatever number of delayed events are remaining after the server restart. + remainingDelayedEventCount := countDelayedEvents(t, delayedEventResponse) + // Sanity check that the room state was updated correctly with the delayed events + // that were sent. user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey1)) - time.Sleep(9 * time.Second) - matchDelayedEvents(t, user, 0) + // Wait until we see another delayed event being sent (ensure things resumed and are continuing). + time.Sleep(10 * time.Second) + matchDelayedEvents(t, user, + delayedEventsNumberLessThan(remainingDelayedEventCount), + ) + // Sanity check that the other delayed events also updated the room state correctly. + // + // FIXME: Ideally, we'd check specifically for the last one that was sent but it + // will be a bit of a juggle and fiddly to get this right so for now we just check + // one. user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey2)) }) } @@ -512,25 +562,93 @@ func getDelayedEvents(t *testing.T, user *client.CSAPI) *http.Response { return user.MustDo(t, "GET", getPathForDelayedEvents()) } -// Checks if the number of delayed events match the given number. This will +// countDelayedEvents counts the number of delayed events in the response. Assumes the +// response is well-formed. +func countDelayedEventsInternal(res *http.Response) (int, error) { + body, err := io.ReadAll(res.Body) + if err != nil { + return 0, fmt.Errorf("countDelayedEventsInternal: Failed to read response body: %s", err) + } + + parsedBody := gjson.ParseBytes(body) + return len(parsedBody.Get("delayed_events").Array()), nil +} + +func countDelayedEvents(t *testing.T, res *http.Response) int { + t.Helper() + count, err := countDelayedEventsInternal(res) + if err != nil { + t.Fatalf("countDelayedEvents: %s", err) + } + return count +} + +type delayedEventsCheckOpt func(res *http.Response) error + +// delayedEventsNumberEqual returns a check option that checks if the number of delayed events +// is equal to the given number. +func delayedEventsNumberEqual(wantNumber int) delayedEventsCheckOpt { + return func(res *http.Response) error { + _, err := should.MatchResponse(res, match.HTTPResponse{ + StatusCode: 200, + JSON: []match.JSON{ + match.JSONKeyArrayOfSize("delayed_events", wantNumber), + }, + }) + if err == nil { + return nil + } + return fmt.Errorf("delayedEventsNumberEqual(%d): %s", wantNumber, err) + } +} + +// delayedEventsNumberLessThan returns a check option that checks if the number of delayed events +// is greater than the given number. +func delayedEventsNumberGreaterThan(target int) delayedEventsCheckOpt { + return func(res *http.Response) error { + count, err := countDelayedEventsInternal(res) + if err != nil { + return fmt.Errorf("delayedEventsNumberGreaterThan(%d): %s", target, err) + } + if count > target { + return nil + } + return fmt.Errorf("delayedEventsNumberGreaterThan(%d): got %d", target, count) + } +} + +// delayedEventsNumberLessThan returns a check option that checks if the number of delayed events +// is less than the given number. +func delayedEventsNumberLessThan(target int) delayedEventsCheckOpt { + return func(res *http.Response) error { + count, err := countDelayedEventsInternal(res) + if err != nil { + return fmt.Errorf("delayedEventsNumberLessThan(%d): %s", target, err) + } + if count < target { + return nil + } + return fmt.Errorf("delayedEventsNumberLessThan(%d): got %d", target, count) + } +} + +// matchDelayedEvents will run the given checks on the delayed events response. This will // retry to handle replication lag. -func matchDelayedEvents(t *testing.T, user *client.CSAPI, wantNumber int) { +func matchDelayedEvents(t *testing.T, user *client.CSAPI, checks ...delayedEventsCheckOpt) *http.Response { t.Helper() // We need to retry this as replication can sometimes lag. - user.MustDo(t, "GET", getPathForDelayedEvents(), + return user.MustDo(t, "GET", getPathForDelayedEvents(), client.WithRetryUntil( 500*time.Millisecond, func(res *http.Response) bool { - _, err := should.MatchResponse(res, match.HTTPResponse{ - StatusCode: 200, - JSON: []match.JSON{ - match.JSONKeyArrayOfSize("delayed_events", wantNumber), - }, - }) - if err != nil { - t.Log(err) - return false + for _, check := range checks { + err := check(res) + + if err != nil { + t.Log(err) + return false + } } return true }, @@ -553,5 +671,5 @@ func cleanupDelayedEvents(t *testing.T, user *client.CSAPI) { ) } - matchDelayedEvents(t, user, 0) + matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) }