-
Notifications
You must be signed in to change notification settings - Fork 62
Fix TestDelayedEvents/delayed_state_events_are_kept_on_server_restart to account for slow servers (de-flake)
#830
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
621d9d4
800e591
86ee737
5bfef04
6159049
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,8 @@ package tests | |
|
|
||
| import ( | ||
| "fmt" | ||
| "io" | ||
| "math" | ||
| "net/http" | ||
| "net/url" | ||
| "testing" | ||
|
|
@@ -50,7 +52,7 @@ func TestDelayedEvents(t *testing.T) { | |
| user2.MustJoinRoom(t, roomID, nil) | ||
|
|
||
| t.Run("delayed events are empty on startup", func(t *testing.T) { | ||
| matchDelayedEvents(t, user, 0) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) | ||
| }) | ||
|
|
||
| t.Run("delayed event lookups are authenticated", func(t *testing.T) { | ||
|
|
@@ -100,14 +102,14 @@ func TestDelayedEvents(t *testing.T) { | |
| } | ||
|
|
||
| countExpected = 0 | ||
| matchDelayedEvents(t, user, numEvents) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(numEvents)) | ||
|
|
||
| t.Run("cannot get delayed events of another user", func(t *testing.T) { | ||
| matchDelayedEvents(t, user2, 0) | ||
| matchDelayedEvents(t, user2, delayedEventsNumberEqual(0)) | ||
| }) | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| matchDelayedEvents(t, user, 0) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) | ||
| queryParams := url.Values{} | ||
| queryParams.Set("dir", "f") | ||
| queryParams.Set("from", token) | ||
|
|
@@ -149,7 +151,7 @@ func TestDelayedEvents(t *testing.T) { | |
| getDelayQueryParam("900"), | ||
| ) | ||
|
|
||
| matchDelayedEvents(t, user, 1) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) | ||
|
|
||
| res = getDelayedEvents(t, user) | ||
| must.MatchResponse(t, res, match.HTTPResponse{ | ||
|
|
@@ -172,7 +174,7 @@ func TestDelayedEvents(t *testing.T) { | |
| }) | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| matchDelayedEvents(t, user, 0) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) | ||
| res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
| must.MatchResponse(t, res, match.HTTPResponse{ | ||
| JSON: []match.JSON{ | ||
|
|
@@ -244,7 +246,7 @@ func TestDelayedEvents(t *testing.T) { | |
| delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id") | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| matchDelayedEvents(t, user, 1) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) | ||
| res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
| must.MatchResponse(t, res, match.HTTPResponse{ | ||
| StatusCode: 404, | ||
|
|
@@ -256,7 +258,7 @@ func TestDelayedEvents(t *testing.T) { | |
| getPathForUpdateDelayedEvent(delayID, DelayedEventActionCancel), | ||
| client.WithJSONBody(t, map[string]interface{}{}), | ||
| ) | ||
| matchDelayedEvents(t, user, 0) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
|
|
@@ -286,7 +288,7 @@ func TestDelayedEvents(t *testing.T) { | |
| delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id") | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| matchDelayedEvents(t, user, 1) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) | ||
| res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
| must.MatchResponse(t, res, match.HTTPResponse{ | ||
| StatusCode: 404, | ||
|
|
@@ -298,7 +300,7 @@ func TestDelayedEvents(t *testing.T) { | |
| getPathForUpdateDelayedEvent(delayID, DelayedEventActionSend), | ||
| client.WithJSONBody(t, map[string]interface{}{}), | ||
| ) | ||
| matchDelayedEvents(t, user, 0) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) | ||
| res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
| must.MatchResponse(t, res, match.HTTPResponse{ | ||
| JSON: []match.JSON{ | ||
|
|
@@ -328,7 +330,7 @@ func TestDelayedEvents(t *testing.T) { | |
| delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id") | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| matchDelayedEvents(t, user, 1) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) | ||
| res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
| must.MatchResponse(t, res, match.HTTPResponse{ | ||
| StatusCode: 404, | ||
|
|
@@ -342,14 +344,14 @@ func TestDelayedEvents(t *testing.T) { | |
| ) | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| matchDelayedEvents(t, user, 1) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) | ||
| res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
| must.MatchResponse(t, res, match.HTTPResponse{ | ||
| StatusCode: 404, | ||
| }) | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| matchDelayedEvents(t, user, 0) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) | ||
| res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
| must.MatchResponse(t, res, match.HTTPResponse{ | ||
| JSON: []match.JSON{ | ||
|
|
@@ -376,7 +378,7 @@ func TestDelayedEvents(t *testing.T) { | |
| }), | ||
| getDelayQueryParam("900"), | ||
| ) | ||
| matchDelayedEvents(t, user, 1) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) | ||
|
|
||
| user.MustDo( | ||
| t, | ||
|
|
@@ -386,7 +388,7 @@ func TestDelayedEvents(t *testing.T) { | |
| setterKey: "manual", | ||
| }), | ||
| ) | ||
| matchDelayedEvents(t, user, 1) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
|
|
@@ -415,7 +417,7 @@ func TestDelayedEvents(t *testing.T) { | |
| }), | ||
| getDelayQueryParam("900"), | ||
| ) | ||
| matchDelayedEvents(t, user, 1) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(1)) | ||
|
|
||
| setterExpected := "manual" | ||
| user2.MustDo( | ||
|
|
@@ -426,7 +428,7 @@ func TestDelayedEvents(t *testing.T) { | |
| setterKey: setterExpected, | ||
| }), | ||
| ) | ||
| matchDelayedEvents(t, user, 0) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) | ||
|
|
||
| time.Sleep(1 * time.Second) | ||
| res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey)) | ||
|
|
@@ -446,41 +448,89 @@ func TestDelayedEvents(t *testing.T) { | |
| stateKey1 := "1" | ||
| stateKey2 := "2" | ||
|
|
||
| numberOfDelayedEvents := 0 | ||
|
|
||
| // Send an initial delayed event that will be ready to send as soon as the server | ||
| // comes back up. | ||
| user.MustDo( | ||
| t, | ||
| "PUT", | ||
| getPathForState(roomID, eventType, stateKey1), | ||
| client.WithJSONBody(t, map[string]interface{}{}), | ||
| getDelayQueryParam("900"), | ||
| ) | ||
| beforeScheduleStateTimestamp2 := time.Now() | ||
| user.MustDo( | ||
| t, | ||
| "PUT", | ||
| getPathForState(roomID, eventType, stateKey2), | ||
| client.WithJSONBody(t, map[string]interface{}{}), | ||
| getDelayQueryParam("9900"), | ||
| ) | ||
| matchDelayedEvents(t, user, 2) | ||
| numberOfDelayedEvents++ | ||
|
|
||
| // Previously, this was naively using a single delayed event with a 10 second delay. | ||
| // But because we're stopping and starting servers here, it could take up to | ||
| // `deployment.GetConfig().SpawnHSTimeout` (defaults to 30 seconds) for the server | ||
| // to start up again so by the time the server is back up, the delayed event may | ||
| // have already been sent invalidating our assertions below (which expect some | ||
| // delayed events to still be pending and then see one of them be sent after the | ||
| // server is back up). | ||
| // | ||
| // We could account for this by setting the delayed event delay to be longer than | ||
| // `deployment.GetConfig().SpawnHSTimeout` but that would make the test suite take | ||
| // longer to run in all cases even for homeservers that are quick to restart because | ||
| // we have to wait for that large delay. | ||
| // | ||
| // We instead account for this by scheduling many delayed events at short intervals | ||
| // (we chose 10 seconds because that's what the test naively chose before). Then | ||
| // whenever the servers comes back, we can just check until it decrements by 1. | ||
| // | ||
| // We add 1 to the number of intervals to ensure that we have at least one interval | ||
| // to check against no matter how things are configured. | ||
| numberOf10SecondIntervals := int(math.Ceil(deployment.GetConfig().SpawnHSTimeout.Seconds()/10)) + 1 | ||
| for i := 0; i < numberOf10SecondIntervals; i++ { | ||
| // +1 as we want to start at 10 seconds and so we don't end up with -100ms delay | ||
| // on the first one. | ||
| delay := time.Duration(i+1)*10*time.Second - 100*time.Millisecond | ||
|
|
||
| user.MustDo( | ||
| t, | ||
| "PUT", | ||
| // Avoid clashing state keys as that would cancel previous delayed events on the | ||
| // same key (start at 2). | ||
| getPathForState(roomID, eventType, fmt.Sprintf("%d", i+2)), | ||
| client.WithJSONBody(t, map[string]interface{}{}), | ||
| getDelayQueryParam(fmt.Sprintf("%d", delay.Milliseconds())), | ||
| ) | ||
| numberOfDelayedEvents++ | ||
| } | ||
| // We expect all of the delayed events to be scheduled and not sent yet. | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(numberOfDelayedEvents)) | ||
|
|
||
| // Restart the server and wait until it's back up. | ||
| deployment.StopServer(t, hsName) | ||
| // Wait one second which will cause the first delayed event to be ready to be sent | ||
| // when the server is back up. | ||
| time.Sleep(1 * time.Second) | ||
| deployment.StartServer(t, hsName) | ||
|
|
||
| // The rest of the test assumes the second delayed event (10 second delay) still | ||
| // hasn't been sent yet. | ||
| if time.Now().Sub(beforeScheduleStateTimestamp2) > 10*time.Second { | ||
| t.Fatalf( | ||
| "Test took too long to run, cannot guarantee delayed event timings. " + | ||
| "More than 10 seconds elapsed between scheduling the delayed event and now when we're about to check for it.", | ||
| ) | ||
| } | ||
|
|
||
| matchDelayedEvents(t, user, 1) | ||
| delayedEventResponse := matchDelayedEvents(t, user, | ||
| // We should still see some delayed events left after the restart. | ||
| delayedEventsNumberGreaterThan(0), | ||
| // We should see at-least one less than we had before the restart (the first | ||
| // delayed event should have been sent). Other delayed events may have been sent | ||
| // by the time the server actually came back up. | ||
| delayedEventsNumberLessThan(numberOfDelayedEvents-1), | ||
| ) | ||
| // Capture whatever number of delayed events are remaining after the server restart. | ||
| remainingDelayedEventCount := countDelayedEvents(t, delayedEventResponse) | ||
| // Sanity check that the room state was updated correctly with the delayed events | ||
| // that were sent. | ||
| user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey1)) | ||
|
|
||
| time.Sleep(9 * time.Second) | ||
| matchDelayedEvents(t, user, 0) | ||
| // Wait until we see another delayed event being sent (ensure things resumed and are continuing). | ||
| time.Sleep(10 * time.Second) | ||
| matchDelayedEvents(t, user, | ||
| delayedEventsNumberLessThan(remainingDelayedEventCount), | ||
| ) | ||
| // Sanity check that the other delayed events also updated the room state correctly. | ||
| // | ||
| // FIXME: Ideally, we'd check specifically for the last one that was sent but it | ||
| // will be a bit of a juggle and fiddly to get this right so for now we just check | ||
| // one. | ||
| user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey2)) | ||
| }) | ||
| } | ||
|
|
@@ -512,25 +562,93 @@ func getDelayedEvents(t *testing.T, user *client.CSAPI) *http.Response { | |
| return user.MustDo(t, "GET", getPathForDelayedEvents()) | ||
| } | ||
|
|
||
| // Checks if the number of delayed events match the given number. This will | ||
| // countDelayedEvents counts the number of delayed events in the response. Assumes the | ||
| // response is well-formed. | ||
| func countDelayedEventsInternal(res *http.Response) (int, error) { | ||
| body, err := io.ReadAll(res.Body) | ||
| if err != nil { | ||
| return 0, fmt.Errorf("countDelayedEventsInternal: Failed to read response body: %s", err) | ||
| } | ||
|
|
||
| parsedBody := gjson.ParseBytes(body) | ||
| return len(parsedBody.Get("delayed_events").Array()), nil | ||
| } | ||
|
|
||
| func countDelayedEvents(t *testing.T, res *http.Response) int { | ||
| t.Helper() | ||
| count, err := countDelayedEventsInternal(res) | ||
| if err != nil { | ||
| t.Fatalf("countDelayedEvents: %s", err) | ||
| } | ||
| return count | ||
| } | ||
|
|
||
| type delayedEventsCheckOpt func(res *http.Response) error | ||
|
|
||
| // delayedEventsNumberEqual returns a check option that checks if the number of delayed events | ||
| // is equal to the given number. | ||
| func delayedEventsNumberEqual(wantNumber int) delayedEventsCheckOpt { | ||
| return func(res *http.Response) error { | ||
| _, err := should.MatchResponse(res, match.HTTPResponse{ | ||
| StatusCode: 200, | ||
| JSON: []match.JSON{ | ||
| match.JSONKeyArrayOfSize("delayed_events", wantNumber), | ||
| }, | ||
| }) | ||
|
Comment on lines
+592
to
+597
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally, the other |
||
| if err == nil { | ||
| return nil | ||
| } | ||
| return fmt.Errorf("delayedEventsNumberEqual(%d): %s", wantNumber, err) | ||
| } | ||
| } | ||
|
|
||
| // delayedEventsNumberLessThan returns a check option that checks if the number of delayed events | ||
| // is greater than the given number. | ||
| func delayedEventsNumberGreaterThan(target int) delayedEventsCheckOpt { | ||
| return func(res *http.Response) error { | ||
| count, err := countDelayedEventsInternal(res) | ||
| if err != nil { | ||
| return fmt.Errorf("delayedEventsNumberGreaterThan(%d): %s", target, err) | ||
| } | ||
| if count > target { | ||
| return nil | ||
| } | ||
| return fmt.Errorf("delayedEventsNumberGreaterThan(%d): got %d", target, count) | ||
| } | ||
| } | ||
|
|
||
| // delayedEventsNumberLessThan returns a check option that checks if the number of delayed events | ||
| // is less than the given number. | ||
| func delayedEventsNumberLessThan(target int) delayedEventsCheckOpt { | ||
| return func(res *http.Response) error { | ||
| count, err := countDelayedEventsInternal(res) | ||
| if err != nil { | ||
| return fmt.Errorf("delayedEventsNumberLessThan(%d): %s", target, err) | ||
| } | ||
| if count < target { | ||
| return nil | ||
| } | ||
| return fmt.Errorf("delayedEventsNumberLessThan(%d): got %d", target, count) | ||
| } | ||
| } | ||
|
|
||
| // matchDelayedEvents will run the given checks on the delayed events response. This will | ||
| // retry to handle replication lag. | ||
| func matchDelayedEvents(t *testing.T, user *client.CSAPI, wantNumber int) { | ||
| func matchDelayedEvents(t *testing.T, user *client.CSAPI, checks ...delayedEventsCheckOpt) *http.Response { | ||
| t.Helper() | ||
|
|
||
| // We need to retry this as replication can sometimes lag. | ||
| user.MustDo(t, "GET", getPathForDelayedEvents(), | ||
| return user.MustDo(t, "GET", getPathForDelayedEvents(), | ||
| client.WithRetryUntil( | ||
| 500*time.Millisecond, | ||
| func(res *http.Response) bool { | ||
| _, err := should.MatchResponse(res, match.HTTPResponse{ | ||
| StatusCode: 200, | ||
| JSON: []match.JSON{ | ||
| match.JSONKeyArrayOfSize("delayed_events", wantNumber), | ||
| }, | ||
| }) | ||
| if err != nil { | ||
| t.Log(err) | ||
| return false | ||
| for _, check := range checks { | ||
| err := check(res) | ||
|
|
||
| if err != nil { | ||
| t.Log(err) | ||
| return false | ||
| } | ||
| } | ||
| return true | ||
| }, | ||
|
|
@@ -553,5 +671,5 @@ func cleanupDelayedEvents(t *testing.T, user *client.CSAPI) { | |
| ) | ||
| } | ||
|
|
||
| matchDelayedEvents(t, user, 0) | ||
| matchDelayedEvents(t, user, delayedEventsNumberEqual(0)) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I needed some less than/greater than comparisons in the refactored test logic, I decided to make
matchDelayedEventslike theMustSyncUntilwhere we can pass in a bevy of check opts.