Skip to content

Commit a2234ea

Browse files
Fix TestDelayedEvents/delayed_state_events_are_kept_on_server_restart to account for slow servers (de-flake) (#830)
Follow-up to #829 Part of element-hq/synapse#18537 ### What does this PR do? Fix `TestDelayedEvents/delayed_state_events_are_kept_on_server_restart` to account for slow servers (de-flake). Previously, this was naively using a single delayed event with a 10 second delay. But because we're stopping and starting servers here, it could take up to `deployment.GetConfig().SpawnHSTimeout` (defaults to 30 seconds) for the server to start up again so by the time the server is back up, the delayed event may have already been sent, invalidating our assertions below (which expect some delayed events to still be pending and then see one of them be sent after the server is back up). We could account for this by setting the delayed event delay to be longer than `deployment.GetConfig().SpawnHSTimeout` but that would make the test suite take longer to run in all cases even for homeservers that are quick to restart because we have to wait for that large delay. We instead account for this by scheduling many delayed events at short intervals (we chose 10 seconds because that's what the test naively chose before). Then whenever the server comes back, we can just check until the number of pending delayed events decrements by 1. ### Experiencing the flaky failure As experienced when running this test against the worker-based Synapse setup we use alongside the Synapse Pro Rust apps, element-hq/synapse-rust-apps#344 (comment). We probably experience this heavily in the private project because GitHub runners are less than half as powerful as those for public projects and that single container with a share of the 2 CPU cores available is just not powerful enough to run all 16 workers effectively. 
<details> <summary>For reference, the CI runners provided by GitHub for private projects are less than half as powerful as those for public projects.</summary> > #### Standard GitHub-hosted runners for public repositories > > Virtual machine | Processor (CPU) | Memory (RAM) | Storage (SSD) | Architecture | Workflow label > --- | --- | --- | --- | --- | --- > Linux | 4 | 16 GB | 14 GB | x64 | ubuntu-latest, ubuntu-24.04, ubuntu-22.04 > > *-- [Standard GitHub-hosted runners for public repositories](https://docs.github.com/en/actions/reference/runners/github-hosted-runners#standard-github-hosted-runners-for-public-repositories)* --- > #### Standard GitHub-hosted runners for private repositories > > Virtual Machine | Processor (CPU) | Memory (RAM) | Storage (SSD) | Architecture | Workflow label > --- | --- | --- | --- | --- | --- > Linux | 2 | 7 GB | 14 GB | x64 | ubuntu-latest, ubuntu-24.04, ubuntu-22.04 > > *-- [Standard GitHub-hosted runners for private repositories](https://docs.github.com/en/actions/reference/runners/github-hosted-runners#standard-github-hosted-runners-for-public-repositories)* </details> And for the same slow reasons, why we're also experiencing this as an occasional [flake](element-hq/synapse#18537) with `(workers, postgres)` in the public Synapse CI as well. ### Reproduction instructions I can easily reproduce this problem if I use #827 to limit the number of CPU's available for the homeserver containers to use: `COMPLEMENT_CONTAINER_CPUS=0.5`
1 parent 9ad96b4 commit a2234ea

File tree

1 file changed

+169
-51
lines changed

1 file changed

+169
-51
lines changed

tests/msc4140/delayed_event_test.go

Lines changed: 169 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package tests
22

33
import (
44
"fmt"
5+
"io"
6+
"math"
57
"net/http"
68
"net/url"
79
"testing"
@@ -50,7 +52,7 @@ func TestDelayedEvents(t *testing.T) {
5052
user2.MustJoinRoom(t, roomID, nil)
5153

5254
t.Run("delayed events are empty on startup", func(t *testing.T) {
53-
matchDelayedEvents(t, user, 0)
55+
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
5456
})
5557

5658
t.Run("delayed event lookups are authenticated", func(t *testing.T) {
@@ -100,14 +102,14 @@ func TestDelayedEvents(t *testing.T) {
100102
}
101103

102104
countExpected = 0
103-
matchDelayedEvents(t, user, numEvents)
105+
matchDelayedEvents(t, user, delayedEventsNumberEqual(numEvents))
104106

105107
t.Run("cannot get delayed events of another user", func(t *testing.T) {
106-
matchDelayedEvents(t, user2, 0)
108+
matchDelayedEvents(t, user2, delayedEventsNumberEqual(0))
107109
})
108110

109111
time.Sleep(1 * time.Second)
110-
matchDelayedEvents(t, user, 0)
112+
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
111113
queryParams := url.Values{}
112114
queryParams.Set("dir", "f")
113115
queryParams.Set("from", token)
@@ -149,7 +151,7 @@ func TestDelayedEvents(t *testing.T) {
149151
getDelayQueryParam("900"),
150152
)
151153

152-
matchDelayedEvents(t, user, 1)
154+
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
153155

154156
res = getDelayedEvents(t, user)
155157
must.MatchResponse(t, res, match.HTTPResponse{
@@ -172,7 +174,7 @@ func TestDelayedEvents(t *testing.T) {
172174
})
173175

174176
time.Sleep(1 * time.Second)
175-
matchDelayedEvents(t, user, 0)
177+
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
176178
res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey))
177179
must.MatchResponse(t, res, match.HTTPResponse{
178180
JSON: []match.JSON{
@@ -244,7 +246,7 @@ func TestDelayedEvents(t *testing.T) {
244246
delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id")
245247

246248
time.Sleep(1 * time.Second)
247-
matchDelayedEvents(t, user, 1)
249+
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
248250
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
249251
must.MatchResponse(t, res, match.HTTPResponse{
250252
StatusCode: 404,
@@ -256,7 +258,7 @@ func TestDelayedEvents(t *testing.T) {
256258
getPathForUpdateDelayedEvent(delayID, DelayedEventActionCancel),
257259
client.WithJSONBody(t, map[string]interface{}{}),
258260
)
259-
matchDelayedEvents(t, user, 0)
261+
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
260262

261263
time.Sleep(1 * time.Second)
262264
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
@@ -286,7 +288,7 @@ func TestDelayedEvents(t *testing.T) {
286288
delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id")
287289

288290
time.Sleep(1 * time.Second)
289-
matchDelayedEvents(t, user, 1)
291+
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
290292
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
291293
must.MatchResponse(t, res, match.HTTPResponse{
292294
StatusCode: 404,
@@ -298,7 +300,7 @@ func TestDelayedEvents(t *testing.T) {
298300
getPathForUpdateDelayedEvent(delayID, DelayedEventActionSend),
299301
client.WithJSONBody(t, map[string]interface{}{}),
300302
)
301-
matchDelayedEvents(t, user, 0)
303+
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
302304
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
303305
must.MatchResponse(t, res, match.HTTPResponse{
304306
JSON: []match.JSON{
@@ -328,7 +330,7 @@ func TestDelayedEvents(t *testing.T) {
328330
delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id")
329331

330332
time.Sleep(1 * time.Second)
331-
matchDelayedEvents(t, user, 1)
333+
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
332334
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
333335
must.MatchResponse(t, res, match.HTTPResponse{
334336
StatusCode: 404,
@@ -342,14 +344,14 @@ func TestDelayedEvents(t *testing.T) {
342344
)
343345

344346
time.Sleep(1 * time.Second)
345-
matchDelayedEvents(t, user, 1)
347+
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
346348
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
347349
must.MatchResponse(t, res, match.HTTPResponse{
348350
StatusCode: 404,
349351
})
350352

351353
time.Sleep(1 * time.Second)
352-
matchDelayedEvents(t, user, 0)
354+
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
353355
res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey))
354356
must.MatchResponse(t, res, match.HTTPResponse{
355357
JSON: []match.JSON{
@@ -376,7 +378,7 @@ func TestDelayedEvents(t *testing.T) {
376378
}),
377379
getDelayQueryParam("900"),
378380
)
379-
matchDelayedEvents(t, user, 1)
381+
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
380382

381383
user.MustDo(
382384
t,
@@ -386,7 +388,7 @@ func TestDelayedEvents(t *testing.T) {
386388
setterKey: "manual",
387389
}),
388390
)
389-
matchDelayedEvents(t, user, 1)
391+
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
390392

391393
time.Sleep(1 * time.Second)
392394
res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey))
@@ -415,7 +417,7 @@ func TestDelayedEvents(t *testing.T) {
415417
}),
416418
getDelayQueryParam("900"),
417419
)
418-
matchDelayedEvents(t, user, 1)
420+
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
419421

420422
setterExpected := "manual"
421423
user2.MustDo(
@@ -426,7 +428,7 @@ func TestDelayedEvents(t *testing.T) {
426428
setterKey: setterExpected,
427429
}),
428430
)
429-
matchDelayedEvents(t, user, 0)
431+
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
430432

431433
time.Sleep(1 * time.Second)
432434
res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey))
@@ -446,41 +448,89 @@ func TestDelayedEvents(t *testing.T) {
446448
stateKey1 := "1"
447449
stateKey2 := "2"
448450

451+
numberOfDelayedEvents := 0
452+
453+
// Send an initial delayed event that will be ready to send as soon as the server
454+
// comes back up.
449455
user.MustDo(
450456
t,
451457
"PUT",
452458
getPathForState(roomID, eventType, stateKey1),
453459
client.WithJSONBody(t, map[string]interface{}{}),
454460
getDelayQueryParam("900"),
455461
)
456-
beforeScheduleStateTimestamp2 := time.Now()
457-
user.MustDo(
458-
t,
459-
"PUT",
460-
getPathForState(roomID, eventType, stateKey2),
461-
client.WithJSONBody(t, map[string]interface{}{}),
462-
getDelayQueryParam("9900"),
463-
)
464-
matchDelayedEvents(t, user, 2)
462+
numberOfDelayedEvents++
463+
464+
// Previously, this was naively using a single delayed event with a 10 second delay.
465+
// But because we're stopping and starting servers here, it could take up to
466+
// `deployment.GetConfig().SpawnHSTimeout` (defaults to 30 seconds) for the server
467+
// to start up again so by the time the server is back up, the delayed event may
468+
// have already been sent invalidating our assertions below (which expect some
469+
// delayed events to still be pending and then see one of them be sent after the
470+
// server is back up).
471+
//
472+
// We could account for this by setting the delayed event delay to be longer than
473+
// `deployment.GetConfig().SpawnHSTimeout` but that would make the test suite take
474+
// longer to run in all cases even for homeservers that are quick to restart because
475+
// we have to wait for that large delay.
476+
//
477+
// We instead account for this by scheduling many delayed events at short intervals
478+
// (we chose 10 seconds because that's what the test naively chose before). Then
479+
// whenever the server comes back, we can just check until the count decrements by 1.
480+
//
481+
// We add 1 to the number of intervals to ensure that we have at least one interval
482+
// to check against no matter how things are configured.
483+
numberOf10SecondIntervals := int(math.Ceil(deployment.GetConfig().SpawnHSTimeout.Seconds()/10)) + 1
484+
for i := 0; i < numberOf10SecondIntervals; i++ {
485+
// +1 as we want to start at 10 seconds and so we don't end up with -100ms delay
486+
// on the first one.
487+
delay := time.Duration(i+1)*10*time.Second - 100*time.Millisecond
488+
489+
user.MustDo(
490+
t,
491+
"PUT",
492+
// Avoid clashing state keys as that would cancel previous delayed events on the
493+
// same key (start at 2).
494+
getPathForState(roomID, eventType, fmt.Sprintf("%d", i+2)),
495+
client.WithJSONBody(t, map[string]interface{}{}),
496+
getDelayQueryParam(fmt.Sprintf("%d", delay.Milliseconds())),
497+
)
498+
numberOfDelayedEvents++
499+
}
500+
// We expect all of the delayed events to be scheduled and not sent yet.
501+
matchDelayedEvents(t, user, delayedEventsNumberEqual(numberOfDelayedEvents))
465502

503+
// Restart the server and wait until it's back up.
466504
deployment.StopServer(t, hsName)
505+
// Wait one second which will cause the first delayed event to be ready to be sent
506+
// when the server is back up.
467507
time.Sleep(1 * time.Second)
468508
deployment.StartServer(t, hsName)
469509

470-
// The rest of the test assumes the second delayed event (10 second delay) still
471-
// hasn't been sent yet.
472-
if time.Now().Sub(beforeScheduleStateTimestamp2) > 10*time.Second {
473-
t.Fatalf(
474-
"Test took too long to run, cannot guarantee delayed event timings. " +
475-
"More than 10 seconds elapsed between scheduling the delayed event and now when we're about to check for it.",
476-
)
477-
}
478-
479-
matchDelayedEvents(t, user, 1)
510+
delayedEventResponse := matchDelayedEvents(t, user,
511+
// We should still see some delayed events left after the restart.
512+
delayedEventsNumberGreaterThan(0),
513+
// We should see at-least one less than we had before the restart (the first
514+
// delayed event should have been sent). Other delayed events may have been sent
515+
// by the time the server actually came back up.
516+
delayedEventsNumberLessThan(numberOfDelayedEvents-1),
517+
)
518+
// Capture whatever number of delayed events are remaining after the server restart.
519+
remainingDelayedEventCount := countDelayedEvents(t, delayedEventResponse)
520+
// Sanity check that the room state was updated correctly with the delayed events
521+
// that were sent.
480522
user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey1))
481523

482-
time.Sleep(9 * time.Second)
483-
matchDelayedEvents(t, user, 0)
524+
// Wait until we see another delayed event being sent (ensure things resumed and are continuing).
525+
time.Sleep(10 * time.Second)
526+
matchDelayedEvents(t, user,
527+
delayedEventsNumberLessThan(remainingDelayedEventCount),
528+
)
529+
// Sanity check that the other delayed events also updated the room state correctly.
530+
//
531+
// FIXME: Ideally, we'd check specifically for the last one that was sent but it
532+
// will be a bit of a juggle and fiddly to get this right so for now we just check
533+
// one.
484534
user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey2))
485535
})
486536
}
@@ -512,25 +562,93 @@ func getDelayedEvents(t *testing.T, user *client.CSAPI) *http.Response {
512562
return user.MustDo(t, "GET", getPathForDelayedEvents())
513563
}
514564

515-
// Checks if the number of delayed events match the given number. This will
565+
// countDelayedEvents counts the number of delayed events in the response. Assumes the
566+
// response is well-formed.
567+
func countDelayedEventsInternal(res *http.Response) (int, error) {
568+
body, err := io.ReadAll(res.Body)
569+
if err != nil {
570+
return 0, fmt.Errorf("countDelayedEventsInternal: Failed to read response body: %s", err)
571+
}
572+
573+
parsedBody := gjson.ParseBytes(body)
574+
return len(parsedBody.Get("delayed_events").Array()), nil
575+
}
576+
577+
func countDelayedEvents(t *testing.T, res *http.Response) int {
578+
t.Helper()
579+
count, err := countDelayedEventsInternal(res)
580+
if err != nil {
581+
t.Fatalf("countDelayedEvents: %s", err)
582+
}
583+
return count
584+
}
585+
586+
// delayedEventsCheckOpt is a single check run against a delayed events response.
// It returns nil when the response satisfies the check and a descriptive error
// otherwise.
type delayedEventsCheckOpt func(res *http.Response) error
587+
588+
// delayedEventsNumberEqual returns a check option that checks if the number of delayed events
589+
// is equal to the given number.
590+
func delayedEventsNumberEqual(wantNumber int) delayedEventsCheckOpt {
591+
return func(res *http.Response) error {
592+
_, err := should.MatchResponse(res, match.HTTPResponse{
593+
StatusCode: 200,
594+
JSON: []match.JSON{
595+
match.JSONKeyArrayOfSize("delayed_events", wantNumber),
596+
},
597+
})
598+
if err == nil {
599+
return nil
600+
}
601+
return fmt.Errorf("delayedEventsNumberEqual(%d): %s", wantNumber, err)
602+
}
603+
}
604+
605+
// delayedEventsNumberLessThan returns a check option that checks if the number of delayed events
606+
// is greater than the given number.
607+
func delayedEventsNumberGreaterThan(target int) delayedEventsCheckOpt {
608+
return func(res *http.Response) error {
609+
count, err := countDelayedEventsInternal(res)
610+
if err != nil {
611+
return fmt.Errorf("delayedEventsNumberGreaterThan(%d): %s", target, err)
612+
}
613+
if count > target {
614+
return nil
615+
}
616+
return fmt.Errorf("delayedEventsNumberGreaterThan(%d): got %d", target, count)
617+
}
618+
}
619+
620+
// delayedEventsNumberLessThan returns a check option that checks if the number of delayed events
621+
// is less than the given number.
622+
func delayedEventsNumberLessThan(target int) delayedEventsCheckOpt {
623+
return func(res *http.Response) error {
624+
count, err := countDelayedEventsInternal(res)
625+
if err != nil {
626+
return fmt.Errorf("delayedEventsNumberLessThan(%d): %s", target, err)
627+
}
628+
if count < target {
629+
return nil
630+
}
631+
return fmt.Errorf("delayedEventsNumberLessThan(%d): got %d", target, count)
632+
}
633+
}
634+
635+
// matchDelayedEvents will run the given checks on the delayed events response. This will
516636
// retry to handle replication lag.
517-
func matchDelayedEvents(t *testing.T, user *client.CSAPI, wantNumber int) {
637+
func matchDelayedEvents(t *testing.T, user *client.CSAPI, checks ...delayedEventsCheckOpt) *http.Response {
518638
t.Helper()
519639

520640
// We need to retry this as replication can sometimes lag.
521-
user.MustDo(t, "GET", getPathForDelayedEvents(),
641+
return user.MustDo(t, "GET", getPathForDelayedEvents(),
522642
client.WithRetryUntil(
523643
500*time.Millisecond,
524644
func(res *http.Response) bool {
525-
_, err := should.MatchResponse(res, match.HTTPResponse{
526-
StatusCode: 200,
527-
JSON: []match.JSON{
528-
match.JSONKeyArrayOfSize("delayed_events", wantNumber),
529-
},
530-
})
531-
if err != nil {
532-
t.Log(err)
533-
return false
645+
for _, check := range checks {
646+
err := check(res)
647+
648+
if err != nil {
649+
t.Log(err)
650+
return false
651+
}
534652
}
535653
return true
536654
},
@@ -553,5 +671,5 @@ func cleanupDelayedEvents(t *testing.T, user *client.CSAPI) {
553671
)
554672
}
555673

556-
matchDelayedEvents(t, user, 0)
674+
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
557675
}

0 commit comments

Comments
 (0)