fix: replace hardcoded 100s sync wait with configurable health polling (#270)
Conversation
📝 Walkthrough

Replaces a fixed 100-second sync wait with a health-aware polling loop; adds per-iteration subscription health checks and context cancellation handling; introduces SQL queries and types/constants for retrieving Spock subscription status; adds an unreleased changelog entry documenting the fix.
🚥 Pre-merge checks: ✅ 2 passed, ❌ 1 failed (1 warning)
🧹 Nitpick comments (3)
server/internal/database/wait_for_sync_event_resource.go (3)
103-145: Transient errors from the subscription health check will abort the entire sync wait.

If `GetSubscriptionStatus` fails due to a transient issue (e.g., a momentary connection hiccup), the loop exits immediately at line 120. Unlike a genuinely unhealthy subscription status (down/disabled), a transient query error doesn't necessarily mean sync cannot progress. Consider tolerating a limited number of consecutive health-check failures before giving up, or at least distinguishing connection errors from a definitive unhealthy-status response.

Sketch: tolerate transient health-check failures
```diff
+	const maxHealthCheckFailures = 3
+	healthCheckFailures := 0
+
 	for {
 		if ctx.Err() != nil {
 			return ctx.Err()
 		}
 		if time.Now().After(deadline) {
 			return fmt.Errorf("replication sync timed out after %s: provider=%s subscriber=%s lsn=%s",
 				timeout, r.ProviderNode, r.SubscriberNode, syncEvent.SyncEventLsn)
 		}
 		status, err := postgres.GetSubscriptionStatus(r.ProviderNode, r.SubscriberNode).
 			Scalar(ctx, subscriberConn)
 		if err != nil {
-			return fmt.Errorf("failed to check subscription status: %w", err)
+			healthCheckFailures++
+			if healthCheckFailures >= maxHealthCheckFailures {
+				return fmt.Errorf("failed to check subscription status after %d attempts: %w", healthCheckFailures, err)
+			}
+			// Transient failure — skip health gate this iteration
+		} else {
+			healthCheckFailures = 0
+			switch status {
+			case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
+				// Worker is running — continue waiting
+			default:
+				return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
+					status, r.ProviderNode, r.SubscriberNode)
+			}
 		}
-		switch status {
-		case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
-			// Worker is running — continue waiting
-		default:
-			return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
-				status, r.ProviderNode, r.SubscriberNode)
-		}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/database/wait_for_sync_event_resource.go` around lines 103 - 145, GetSubscriptionStatus failures currently abort the wait loop immediately; change the logic in wait_for_sync_event_resource.go around the call to postgres.GetSubscriptionStatus(r.ProviderNode, r.SubscriberNode).Scalar(ctx, subscriberConn) to tolerate transient errors by tracking a small retry counter (or timestamped failures) and only return an error after N consecutive failures or a total retry timeout, while still treating definitive unhealthy statuses (anything not in postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown) as fatal; on transient errors increment the counter, sleep/backoff briefly and continue the loop instead of returning immediately, and reset the counter when the status query succeeds.
122-128: Conservative `default` branch treats any future spock status as unhealthy.

The switch allows only `initializing`, `replicating`, and `unknown` — any new status introduced by spock (or an unexpected casing difference) would immediately fail the sync. This is a safe-by-default choice, but worth documenting the intent so future maintainers know it's deliberate.

Minor: add a comment about the design choice
```diff
 	switch status {
 	case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
 		// Worker is running — continue waiting
 	default:
+		// Any status not explicitly known to indicate a running worker is
+		// treated as unhealthy. Update the allow-list if spock adds new
+		// healthy statuses.
 		return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
 			status, r.ProviderNode, r.SubscriberNode)
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/database/wait_for_sync_event_resource.go` around lines 122 - 128, Add a clear comment above the switch that handles subscription statuses (the switch that checks postgres.SubStatusInitializing, postgres.SubStatusReplicating, and postgres.SubStatusUnknown) explaining that the conservative default branch intentionally treats any other/new/ differently-cased spock status as unhealthy and will fail the sync; state this is a deliberate safety decision so future maintainers understand why unknown statuses are not allowed and when it would be safe to relax the default behavior.
28-44: Silent fallback on invalid `PGEDGE_SYNC_TIMEOUT_SECONDS` may surprise operators.

If the env var is set but contains a non-numeric or non-positive value, the function silently falls through to the 1-hour default. An operator debugging a timeout issue might not realize their override isn't taking effect. Consider logging a warning when the env var is present but invalid.
Proposed fix
This would require adding a logger parameter or using a package-level logger. Alternatively, a minimal approach:
```diff
 func syncTimeout(resourceTimeout time.Duration) time.Duration {
 	if resourceTimeout > 0 {
 		return resourceTimeout
 	}
 	if v := os.Getenv("PGEDGE_SYNC_TIMEOUT_SECONDS"); v != "" {
 		if seconds, err := strconv.Atoi(v); err == nil && seconds > 0 {
 			return time.Duration(seconds) * time.Second
+		} else {
+			// Log or surface the invalid value so operators can diagnose misconfigurations.
+			fmt.Fprintf(os.Stderr, "WARN: invalid PGEDGE_SYNC_TIMEOUT_SECONDS=%q, using default %s\n", v, defaultSyncTimeout)
 		}
 	}
 	return defaultSyncTimeout
 }
```

Ideally, use zerolog instead of `fmt.Fprintf` to remain consistent with the codebase's structured logging. As per coding guidelines, use structured JSON logging with zerolog throughout the codebase.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/database/wait_for_sync_event_resource.go` around lines 28 - 44, syncTimeout currently silently ignores an invalid PGEDGE_SYNC_TIMEOUT_SECONDS value; update syncTimeout to surface a warning via the codebase logger when the env var is present but non-numeric or non-positive: accept a zerolog.Logger (or use the package-level logger) as a parameter to syncTimeout (or provide a thin wrapper that reads the env var and logs), and when strconv.Atoi fails or seconds <= 0 log a structured warning including the raw env value and the parsing error/context before falling back to defaultSyncTimeout; keep the existing priority order and return values unchanged.
jason-lynch left a comment:
Nice! I was able to invoke a sync failure by creating a table in the spock schema and it failed appropriately. I just have the one request to completely remove the timeout, and then I think this is good to go.
🧹 Nitpick comments (2)
server/internal/database/wait_for_sync_event_resource.go (2)
92-98: Consider logging a warning when subscription status is `"unknown"`.

`SubStatusUnknown` is treated identically to `SubStatusInitializing`/`SubStatusReplicating`, but "unknown" means the worker's state is indeterminate — not that it is definitely running. A stuck-unknown subscription (e.g., a background worker crash that never transitions to `"down"`) will silently loop until the context deadline with no visible indication of the degraded state.

♻️ Proposed differentiation of the unknown branch
```diff
 	switch status {
-	case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
-		// Worker is running — continue waiting
+	case postgres.SubStatusInitializing, postgres.SubStatusReplicating:
+		// Worker is running — continue waiting
+	case postgres.SubStatusUnknown:
+		log.Ctx(ctx).Warn().
+			Str("provider", r.ProviderNode).
+			Str("subscriber", r.SubscriberNode).
+			Msg("subscription status is unknown; sync may not make progress")
 	default:
 		return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
 			status, r.ProviderNode, r.SubscriberNode)
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/database/wait_for_sync_event_resource.go` around lines 92 - 98, The switch treats postgres.SubStatusUnknown like a healthy running state; change the branch so SubStatusUnknown is handled separately: emit a warning log (using the resource's logger, e.g., r.Logger or r.log) that includes the current status, r.ProviderNode and r.SubscriberNode, then continue waiting; keep postgres.SubStatusInitializing and postgres.SubStatusReplicating grouped as before and leave the default case returning the fmt.Errorf unchanged.
78-115: Add structured zerolog logging to the polling loop.

The polling loop emits no log output. For a sync that can span many minutes, this leaves operators with no visibility into progress, current subscription status, or why a wait is taking a long time. The coding guideline requires structured JSON logging with zerolog throughout the codebase.

At minimum, log on loop entry, on each health-check result (at `Debug` level), and on success/failure.

♻️ Proposed logging additions
```diff
+import "github.com/rs/zerolog/log"

 	for {
 		if ctx.Err() != nil {
 			return ctx.Err()
 		}
+		log.Ctx(ctx).Debug().
+			Str("provider", r.ProviderNode).
+			Str("subscriber", r.SubscriberNode).
+			Str("lsn", syncEvent.SyncEventLsn).
+			Msg("polling subscription sync status")
 		status, err := postgres.GetSubscriptionStatus(r.ProviderNode, r.SubscriberNode).
 			Scalar(ctx, subscriberConn)
 		if err != nil {
 			return fmt.Errorf("failed to check subscription status: %w", err)
 		}
+		log.Ctx(ctx).Debug().
+			Str("provider", r.ProviderNode).
+			Str("subscriber", r.SubscriberNode).
+			Str("status", status).
+			Msg("subscription status")
 		switch status {
 		case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
 			// Worker is running — continue waiting
 		default:
 			return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
 				status, r.ProviderNode, r.SubscriberNode)
 		}
 		synced, err := postgres.WaitForSyncEvent(
 			r.ProviderNode, syncEvent.SyncEventLsn, int(pollInterval.Seconds()),
 		).Scalar(ctx, subscriberConn)
 		if errors.Is(err, pgx.ErrNoRows) {
 			return resource.ErrNotFound
 		}
 		if err != nil {
 			return fmt.Errorf("failed to wait for sync event on subscriber: %w", err)
 		}
 		if synced {
+			log.Ctx(ctx).Info().
+				Str("provider", r.ProviderNode).
+				Str("subscriber", r.SubscriberNode).
+				Str("lsn", syncEvent.SyncEventLsn).
+				Msg("sync event confirmed")
 			return nil
 		}
 	}
```

As per coding guidelines: "Use structured JSON logging with zerolog throughout the codebase."

🤖 Prompt for AI Agents
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/database/wait_for_sync_event_resource.go` around lines 78 - 115, The polling loop currently has no structured logs; add zerolog structured JSON logging by obtaining the logger with zerolog.Ctx(ctx) at loop start and emit: an Info-level entry log each loop iteration (include provider=r.ProviderNode, subscriber=r.SubscriberNode, poll_interval= pollInterval.Seconds()), a Debug-level log after calling postgres.GetSubscriptionStatus with subscription_status=status and any error detail, an Error-level log and return when the subscription status is unhealthy (include status/provider/subscriber), an Error-level log when postgres.WaitForSyncEvent returns an error (include err/provider/subscriber), a Debug-level log when WaitForSyncEvent returns synced=false (include reason and poll interval), and an Info-level success log when synced==true (include provider/subscriber and synced_lsn or the syncEvent identifier); also log before returning resource.ErrNotFound when ErrNoRows is mapped. Reference functions/values: postgres.GetSubscriptionStatus, postgres.WaitForSyncEvent, r.ProviderNode, r.SubscriberNode, subscriberConn, pollInterval, and use zerolog.Ctx(ctx) for structured fields.
Summary
This PR replaces the fixed 100-second replication sync wait with a configurable health-based polling mechanism to better handle large databases and detect broken subscriptions early.
Changes
- Replaced the blocking `spock.wait_for_sync_event()` call (100s timeout) with a Go-level polling loop that uses short 10-second iterations
- Added a per-iteration subscription health check via `spock.sub_show_status()` — fails early on unhealthy statuses (`down`, `disabled`)
- Made the timeout configurable via the resource `Timeout` field, `PGEDGE_SYNC_TIMEOUT_SECONDS` env var, or 1-hour default
- Added a `GetSubscriptionStatus()` query to fetch a specific subscription's status
- Added subscription status constants (`SubStatusInitializing`, `SubStatusReplicating`, etc.) with source reference to pgEdge/spock

Testing
- `n1`:`n2` using below query:
- `n3` using `update-database` API with below config:
- `SELECT * FROM employee;`

Checklist
PLAT-421