fix: replace hardcoded 100s sync wait with configurable health polling #270

Merged

tsivaprasad merged 2 commits into main from
PLAT-421-wait-for-sync-event-hardcoded-timeout-can-be-too-low
on Feb 19, 2026

Conversation

@tsivaprasad (Contributor) commented Feb 18, 2026

Summary

This PR replaces the fixed 100-second replication sync wait with a configurable health-based polling mechanism to better handle large databases and detect broken subscriptions early.

Changes

  • Replace single blocking spock.wait_for_sync_event() call (100s timeout) with a Go-level polling loop that uses short 10-second iterations
  • Add subscription health monitoring between poll iterations using spock.sub_show_status() — fails early on unhealthy statuses (down, disabled)
  • Add configurable timeout: per-resource Timeout field, PGEDGE_SYNC_TIMEOUT_SECONDS env var, or 1-hour default
  • Add GetSubscriptionStatus() query to fetch a specific subscription's status
  • Add spock subscription status constants (SubStatusInitializing, SubStatusReplicating, etc.) with source reference to pgEdge/spock

Testing

  1. Created a 3-node cluster.
  2. Created a database with the following config:
 "nodes": [
      {"name": "n1", "host_ids": ["host-1"]},
      {"name": "n2", "host_ids": ["host-2"]}
    ]

  3. Inserted data on n1 using the following script:
-- Drop if exists (optional)
DROP TABLE IF EXISTS employee;

-- Create table
CREATE TABLE employee (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    department VARCHAR(50),
    salary NUMERIC(10,2),
    joining_date DATE,
    email VARCHAR(150)
);

-- Insert 2000 records using generate_series
INSERT INTO employee (name, department, salary, joining_date, email)
SELECT 
    'Employee_' || gs,
    CASE 
        WHEN gs % 5 = 0 THEN 'HR'
        WHEN gs % 5 = 1 THEN 'Engineering'
        WHEN gs % 5 = 2 THEN 'Finance'
        WHEN gs % 5 = 3 THEN 'Marketing'
        ELSE 'Operations'
    END,
    ROUND((30000 + (random() * 70000))::numeric, 2),
    CURRENT_DATE - (gs || ' days')::interval,
    'employee_' || gs || '@company.com'
FROM generate_series(1, 2000) AS gs;
storefront=# SELECT COUNT(*) FROM employee;
 count 
-------
  2000
(1 row)

  4. Verified that the data was inserted and synced to n2 using the following query:
SELECT * FROM employee;

storefront=# SELECT COUNT(*) FROM employee;
 count 
-------
  2000
(1 row)

  5. Added a new node n3 using the update-database API with the following config:
 "nodes": [
      {"name": "n1", "host_ids": ["host-1"]},
      {"name": "n2", "host_ids": ["host-2"]},
      {"name": "n3", "host_ids": ["host-3"]}
    ]   

  6. Verified that the data was synced to n3 using the following query:
    SELECT * FROM employee;
storefront=# SELECT COUNT(*) FROM employee;
 count 
-------
  2000
(1 row)

Checklist

PLAT-421

@coderabbitai bot commented Feb 18, 2026

📝 Walkthrough

Replaces a fixed 100-second sync wait with a health-aware polling loop; adds per-iteration subscription health checks and context cancellation handling; introduces SQL queries and types/constants for retrieving Spock subscription status; adds an unreleased changelog entry documenting the fix.

Changes

Changelog Entry (changes/unreleased/Fixed-20260218-230320.yaml)
Adds an unreleased changelog entry describing the replacement of a fixed 100-second sync wait with a configurable health-based polling mechanism, with a timestamp.

Sync wait implementation (server/internal/database/wait_for_sync_event_resource.go)
Replaces the one-shot fixed wait with a polling loop using a 10s poll interval, adds per-iteration subscription health checks and context cancellation handling, repeatedly calls WaitForSyncEvent with per-iteration timeouts, preserves ErrNoRows as NotFound, and removes the previous overall fixed-timeout path.

Postgres subscription queries & types (server/internal/postgres/create_db.go, server/internal/postgres/info.go)
Adds a public query function GetSubscriptionStatus(providerNode, subscriberNode string) Query[string]; introduces subscription status constants (initializing, replicating, unknown, disabled, down), a SubscriptionStatus struct with metadata fields, and GetSubscriptionStatuses() Query[SubscriptionStatus] returning JSON-encoded spock.sub_show_status results.

Poem

🐰 I hopped in code where waits would stall,
Polling beats a hundred-second wall.
I check each health, then hop once more,
Syncs hum softly, bugs explore.
A tiny rabbit cheers the chore! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 33.33%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Title check: ✅ Passed. The title accurately and concisely summarizes the main change: replacing a hardcoded 100-second synchronization wait with a configurable health-based polling mechanism.
  • Description check: ✅ Passed. The PR description includes all required sections (Summary, Changes, Testing, and Checklist) with appropriate detail and documentation of the testing steps performed.

@coderabbitai bot left a comment

🧹 Nitpick comments (3)
server/internal/database/wait_for_sync_event_resource.go (3)

103-145: Transient errors from the subscription health check will abort the entire sync wait.

If GetSubscriptionStatus fails due to a transient issue (e.g., momentary connection hiccup), the loop exits immediately at line 120. Unlike a genuinely unhealthy subscription status (down/disabled), a transient query error doesn't necessarily mean sync cannot progress. Consider tolerating a limited number of consecutive health-check failures before giving up, or at least distinguishing connection errors from a definitive unhealthy-status response.

Sketch: tolerate transient health-check failures
+	const maxHealthCheckFailures = 3
+	healthCheckFailures := 0
+
 	for {
 		if ctx.Err() != nil {
 			return ctx.Err()
 		}
 
 		if time.Now().After(deadline) {
 			return fmt.Errorf("replication sync timed out after %s: provider=%s subscriber=%s lsn=%s",
 				timeout, r.ProviderNode, r.SubscriberNode, syncEvent.SyncEventLsn)
 		}
 
 		status, err := postgres.GetSubscriptionStatus(r.ProviderNode, r.SubscriberNode).
 			Scalar(ctx, subscriberConn)
 		if err != nil {
-			return fmt.Errorf("failed to check subscription status: %w", err)
+			healthCheckFailures++
+			if healthCheckFailures >= maxHealthCheckFailures {
+				return fmt.Errorf("failed to check subscription status after %d attempts: %w", healthCheckFailures, err)
+			}
+			// Transient failure — skip health gate this iteration
+		} else {
+			healthCheckFailures = 0
+			switch status {
+			case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
+				// Worker is running — continue waiting
+			default:
+				return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
+					status, r.ProviderNode, r.SubscriberNode)
+			}
 		}
-		switch status {
-		case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
-			// Worker is running — continue waiting
-		default:
-			return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
-				status, r.ProviderNode, r.SubscriberNode)
-		}

122-128: Conservative default branch treats any future spock status as unhealthy.

The switch allows only initializing, replicating, and unknown — any new status introduced by spock (or an unexpected casing difference) would immediately fail the sync. This is a safe-by-default choice, but worth documenting the intent so future maintainers know it's deliberate.

Minor: add a comment about the design choice
 		switch status {
 		case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
 			// Worker is running — continue waiting
 		default:
+			// Any status not explicitly known to indicate a running worker is
+			// treated as unhealthy. Update the allow-list if spock adds new
+			// healthy statuses.
 			return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
 				status, r.ProviderNode, r.SubscriberNode)
 		}

28-44: Silent fallback on invalid PGEDGE_SYNC_TIMEOUT_SECONDS may surprise operators.

If the env var is set but contains a non-numeric or non-positive value, the function silently falls through to the 1-hour default. An operator debugging a timeout issue might not realize their override isn't taking effect. Consider logging a warning when the env var is present but invalid.

Proposed fix

This would require adding a logger parameter or using a package-level logger. Alternatively, a minimal approach:

 func syncTimeout(resourceTimeout time.Duration) time.Duration {
 	if resourceTimeout > 0 {
 		return resourceTimeout
 	}
 	if v := os.Getenv("PGEDGE_SYNC_TIMEOUT_SECONDS"); v != "" {
 		if seconds, err := strconv.Atoi(v); err == nil && seconds > 0 {
 			return time.Duration(seconds) * time.Second
+		} else {
+			// Log or surface the invalid value so operators can diagnose misconfigurations.
+			fmt.Fprintf(os.Stderr, "WARN: invalid PGEDGE_SYNC_TIMEOUT_SECONDS=%q, using default %s\n", v, defaultSyncTimeout)
 		}
 	}
 	return defaultSyncTimeout
 }

Ideally, use zerolog instead of fmt.Fprintf to remain consistent with the codebase's structured logging. As per coding guidelines, use structured JSON logging with zerolog throughout the codebase.


@jason-lynch (Member) left a comment

Nice! I was able to invoke a sync failure by creating a table in the spock schema and it failed appropriately. I just have the one request to completely remove the timeout, and then I think this is good to go.

@coderabbitai bot left a comment

🧹 Nitpick comments (2)
server/internal/database/wait_for_sync_event_resource.go (2)

92-98: Consider logging a warning when subscription status is "unknown".

SubStatusUnknown is treated identically to SubStatusInitializing/SubStatusReplicating, but "unknown" means the worker's state is indeterminate — not that it is definitely running. A stuck-unknown subscription (e.g., a background worker crash that never transitions to "down") will silently loop until the context deadline with no visible indication of the degraded state.

♻️ Proposed differentiation of the unknown branch
     switch status {
-    case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
-        // Worker is running — continue waiting
+    case postgres.SubStatusInitializing, postgres.SubStatusReplicating:
+        // Worker is running — continue waiting
+    case postgres.SubStatusUnknown:
+        log.Ctx(ctx).Warn().
+            Str("provider", r.ProviderNode).
+            Str("subscriber", r.SubscriberNode).
+            Msg("subscription status is unknown; sync may not make progress")
     default:
         return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
             status, r.ProviderNode, r.SubscriberNode)
     }

78-115: Add structured zerolog logging to the polling loop.

The polling loop emits no log output. For a sync that can span many minutes, this leaves operators with no visibility into progress, current subscription status, or why a wait is taking a long time. The coding guideline requires structured JSON logging with zerolog throughout the codebase.

At minimum, log on loop entry, on each health-check result (at Debug level), and on success/failure.

♻️ Proposed logging additions
+import "github.com/rs/zerolog/log"

 for {
     if ctx.Err() != nil {
         return ctx.Err()
     }

+    log.Ctx(ctx).Debug().
+        Str("provider", r.ProviderNode).
+        Str("subscriber", r.SubscriberNode).
+        Str("lsn", syncEvent.SyncEventLsn).
+        Msg("polling subscription sync status")

     status, err := postgres.GetSubscriptionStatus(r.ProviderNode, r.SubscriberNode).
         Scalar(ctx, subscriberConn)
     if err != nil {
         return fmt.Errorf("failed to check subscription status: %w", err)
     }
+    log.Ctx(ctx).Debug().
+        Str("provider", r.ProviderNode).
+        Str("subscriber", r.SubscriberNode).
+        Str("status", status).
+        Msg("subscription status")
     switch status {
     case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
         // Worker is running — continue waiting
     default:
         return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
             status, r.ProviderNode, r.SubscriberNode)
     }

     synced, err := postgres.WaitForSyncEvent(
         r.ProviderNode, syncEvent.SyncEventLsn, int(pollInterval.Seconds()),
     ).Scalar(ctx, subscriberConn)
     if errors.Is(err, pgx.ErrNoRows) {
         return resource.ErrNotFound
     }
     if err != nil {
         return fmt.Errorf("failed to wait for sync event on subscriber: %w", err)
     }
     if synced {
+        log.Ctx(ctx).Info().
+            Str("provider", r.ProviderNode).
+            Str("subscriber", r.SubscriberNode).
+            Str("lsn", syncEvent.SyncEventLsn).
+            Msg("sync event confirmed")
         return nil
     }
 }

As per coding guidelines: "Use structured JSON logging with zerolog throughout the codebase."


@tsivaprasad tsivaprasad merged commit 43a1517 into main Feb 19, 2026
3 checks passed
@tsivaprasad tsivaprasad deleted the PLAT-421-wait-for-sync-event-hardcoded-timeout-can-be-too-low branch February 19, 2026 18:50