Skip to content

Commit e6ef17f

Browse files
committed
Add state machine to ReconnectingWebSocket
1 parent da35ab7 commit e6ef17f

File tree

2 files changed

+97
-34
lines changed

2 files changed

+97
-34
lines changed

src/websocket/reconnectingWebSocket.ts

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,24 @@ import type {
1717
UnidirectionalStream,
1818
} from "./eventStreamConnection";
1919

20+
/**
21+
* Connection states for the ReconnectingWebSocket state machine.
22+
*/
23+
export enum ConnectionState {
24+
/** Initial state, ready to connect */
25+
IDLE = "IDLE",
26+
/** Actively running connect() - WS factory in progress */
27+
CONNECTING = "CONNECTING",
28+
/** Socket is open and working */
29+
CONNECTED = "CONNECTED",
30+
/** Waiting for backoff timer before attempting reconnection */
31+
AWAITING_RETRY = "AWAITING_RETRY",
32+
/** Temporarily paused - user must call reconnect() to resume */
33+
DISCONNECTED = "DISCONNECTED",
34+
/** Permanently closed - cannot be reused */
35+
DISPOSED = "DISPOSED",
36+
}
37+
2038
export type SocketFactory<TData> = () => Promise<UnidirectionalStream<TData>>;
2139

2240
export interface ReconnectingWebSocketOptions {
@@ -49,10 +67,8 @@ export class ReconnectingWebSocket<
4967
#lastRoute = "unknown"; // Cached route for logging when socket is closed
5068
#backoffMs: number;
5169
#reconnectTimeoutId: NodeJS.Timeout | null = null;
52-
#isDisconnected = false; // Temporary pause, can be resumed via reconnect()
53-
#isDisposed = false; // Permanent disposal, cannot be resumed
54-
#isConnecting = false;
55-
#pendingReconnect = false;
70+
#state: ConnectionState = ConnectionState.IDLE;
71+
#pendingReconnect = false; // Queue reconnect during CONNECTING state
5672
#certRefreshAttempted = false; // Tracks if cert refresh was already attempted this connection cycle
5773
readonly #onDispose?: () => void;
5874

@@ -97,11 +113,10 @@ export class ReconnectingWebSocket<
97113
}
98114

99115
/**
100-
* Returns true if the socket is temporarily disconnected and not attempting to reconnect.
101-
* Use reconnect() to resume.
116+
* Returns the current connection state.
102117
*/
103-
get isDisconnected(): boolean {
104-
return this.#isDisconnected;
118+
get state(): string {
119+
return this.#state;
105120
}
106121

107122
/**
@@ -136,22 +151,22 @@ export class ReconnectingWebSocket<
136151
* Resumes the socket if previously disconnected via disconnect().
137152
*/
138153
reconnect(): void {
139-
if (this.#isDisconnected) {
140-
this.#isDisconnected = false;
141-
this.#backoffMs = this.#options.initialBackoffMs;
142-
this.#certRefreshAttempted = false; // User-initiated reconnect, allow retry
154+
if (this.#state === ConnectionState.DISPOSED) {
155+
return;
143156
}
144157

145-
if (this.#isDisposed) {
146-
return;
158+
if (this.#state === ConnectionState.DISCONNECTED) {
159+
this.#state = ConnectionState.IDLE;
160+
this.#backoffMs = this.#options.initialBackoffMs;
161+
this.#certRefreshAttempted = false; // User-initiated reconnect, allow retry
147162
}
148163

149164
if (this.#reconnectTimeoutId !== null) {
150165
clearTimeout(this.#reconnectTimeoutId);
151166
this.#reconnectTimeoutId = null;
152167
}
153168

154-
if (this.#isConnecting) {
169+
if (this.#state === ConnectionState.CONNECTING) {
155170
this.#pendingReconnect = true;
156171
return;
157172
}
@@ -164,16 +179,19 @@ export class ReconnectingWebSocket<
164179
* Temporarily disconnect the socket. Can be resumed via reconnect().
165180
*/
166181
disconnect(code?: number, reason?: string): void {
167-
if (this.#isDisposed || this.#isDisconnected) {
182+
if (
183+
this.#state === ConnectionState.DISPOSED ||
184+
this.#state === ConnectionState.DISCONNECTED
185+
) {
168186
return;
169187
}
170188

171-
this.#isDisconnected = true;
189+
this.#state = ConnectionState.DISCONNECTED;
172190
this.clearCurrentSocket(code, reason);
173191
}
174192

175193
close(code?: number, reason?: string): void {
176-
if (this.#isDisposed) {
194+
if (this.#state === ConnectionState.DISPOSED) {
177195
return;
178196
}
179197

@@ -190,11 +208,16 @@ export class ReconnectingWebSocket<
190208
}
191209

192210
private async connect(): Promise<void> {
193-
if (this.#isDisposed || this.#isDisconnected || this.#isConnecting) {
211+
// Only allow connecting from IDLE, CONNECTED (reconnect), or AWAITING_RETRY states
212+
if (
213+
this.#state === ConnectionState.DISPOSED ||
214+
this.#state === ConnectionState.DISCONNECTED ||
215+
this.#state === ConnectionState.CONNECTING
216+
) {
194217
return;
195218
}
196219

197-
this.#isConnecting = true;
220+
this.#state = ConnectionState.CONNECTING;
198221
try {
199222
// Close any existing socket before creating a new one
200223
if (this.#currentSocket) {
@@ -207,18 +230,20 @@ export class ReconnectingWebSocket<
207230

208231
const socket = await this.#socketFactory();
209232

210-
// Check if disconnected/disposed while waiting for factory
211-
if (this.#isDisposed || this.#isDisconnected) {
233+
// Check if state changed while waiting for factory (e.g., disconnect/dispose called)
234+
if (this.#state !== ConnectionState.CONNECTING) {
212235
socket.close(WebSocketCloseCode.NORMAL, "Cancelled during connection");
213236
return;
214237
}
215238

216239
this.#currentSocket = socket;
217240
this.#lastRoute = this.#route;
241+
this.#state = ConnectionState.CONNECTED;
218242

219243
socket.addEventListener("open", (event) => {
244+
// Reset backoff on successful connection
220245
this.#backoffMs = this.#options.initialBackoffMs;
221-
this.#certRefreshAttempted = false; // Reset on successful connection
246+
this.#certRefreshAttempted = false;
222247
this.executeHandlers("open", event);
223248
});
224249

@@ -236,7 +261,10 @@ export class ReconnectingWebSocket<
236261
});
237262

238263
socket.addEventListener("close", (event) => {
239-
if (this.#isDisposed || this.#isDisconnected) {
264+
if (
265+
this.#state === ConnectionState.DISPOSED ||
266+
this.#state === ConnectionState.DISCONNECTED
267+
) {
240268
return;
241269
}
242270

@@ -259,8 +287,6 @@ export class ReconnectingWebSocket<
259287
} catch (error) {
260288
await this.handleConnectionError(error);
261289
} finally {
262-
this.#isConnecting = false;
263-
264290
if (this.#pendingReconnect) {
265291
this.#pendingReconnect = false;
266292
this.reconnect();
@@ -270,13 +296,15 @@ export class ReconnectingWebSocket<
270296

271297
private scheduleReconnect(): void {
272298
if (
273-
this.#isDisposed ||
274-
this.#isDisconnected ||
275-
this.#reconnectTimeoutId !== null
299+
this.#state === ConnectionState.DISPOSED ||
300+
this.#state === ConnectionState.DISCONNECTED ||
301+
this.#state === ConnectionState.AWAITING_RETRY
276302
) {
277303
return;
278304
}
279305

306+
this.#state = ConnectionState.AWAITING_RETRY;
307+
280308
const jitter =
281309
this.#backoffMs * this.#options.jitterFactor * (Math.random() * 2 - 1);
282310
const delayMs = Math.max(0, this.#backoffMs + jitter);
@@ -361,7 +389,10 @@ export class ReconnectingWebSocket<
361389
* otherwise schedules a reconnect.
362390
*/
363391
private async handleConnectionError(error: unknown): Promise<void> {
364-
if (this.#isDisposed || this.#isDisconnected) {
392+
if (
393+
this.#state === ConnectionState.DISPOSED ||
394+
this.#state === ConnectionState.DISCONNECTED
395+
) {
365396
return;
366397
}
367398

@@ -403,11 +434,11 @@ export class ReconnectingWebSocket<
403434
}
404435

405436
private dispose(code?: number, reason?: string): void {
406-
if (this.#isDisposed) {
437+
if (this.#state === ConnectionState.DISPOSED) {
407438
return;
408439
}
409440

410-
this.#isDisposed = true;
441+
this.#state = ConnectionState.DISPOSED;
411442
this.clearCurrentSocket(code, reason);
412443

413444
for (const set of Object.values(this.#eventHandlers)) {

test/unit/websocket/reconnectingWebSocket.test.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
22

33
import { WebSocketCloseCode, HttpStatusCode } from "@/websocket/codes";
44
import {
5+
ConnectionState,
56
ReconnectingWebSocket,
67
type SocketFactory,
78
} from "@/websocket/reconnectingWebSocket";
@@ -27,13 +28,17 @@ describe("ReconnectingWebSocket", () => {
2728
const { ws, sockets } = await createReconnectingWebSocket();
2829

2930
sockets[0].fireOpen();
31+
expect(ws.state).toBe(ConnectionState.CONNECTED);
32+
3033
sockets[0].fireClose({
3134
code: WebSocketCloseCode.ABNORMAL,
3235
reason: "Network error",
3336
});
37+
expect(ws.state).toBe(ConnectionState.AWAITING_RETRY);
3438

3539
await vi.advanceTimersByTimeAsync(300);
3640
expect(sockets).toHaveLength(2);
41+
expect(ws.state).toBe(ConnectionState.CONNECTED);
3742

3843
ws.close();
3944
});
@@ -65,7 +70,10 @@ describe("ReconnectingWebSocket", () => {
6570
const { ws, sockets } = await createReconnectingWebSocket();
6671

6772
sockets[0].fireOpen();
73+
expect(ws.state).toBe(ConnectionState.CONNECTED);
74+
6875
sockets[0].fireClose({ code, reason: "Unrecoverable" });
76+
expect(ws.state).toBe(ConnectionState.DISCONNECTED);
6977

7078
await vi.advanceTimersByTimeAsync(10000);
7179
expect(sockets).toHaveLength(1);
@@ -97,7 +105,7 @@ describe("ReconnectingWebSocket", () => {
97105
);
98106

99107
// Should be disconnected after unrecoverable HTTP error
100-
expect(ws.isDisconnected).toBe(true);
108+
expect(ws.state).toBe(ConnectionState.DISCONNECTED);
101109

102110
// Should not retry after unrecoverable HTTP error
103111
await vi.advanceTimersByTimeAsync(10000);
@@ -120,6 +128,8 @@ describe("ReconnectingWebSocket", () => {
120128
sockets[0].fireError(
121129
new Error(`Unexpected server response: ${statusCode}`),
122130
);
131+
expect(ws.state).toBe(ConnectionState.DISCONNECTED);
132+
123133
sockets[0].fireClose({
124134
code: WebSocketCloseCode.ABNORMAL,
125135
reason: "Connection failed",
@@ -178,11 +188,13 @@ describe("ReconnectingWebSocket", () => {
178188
await createBlockingReconnectingWebSocket();
179189

180190
ws.reconnect();
191+
expect(ws.state).toBe(ConnectionState.CONNECTING);
181192
ws.reconnect(); // queued
182193
expect(sockets).toHaveLength(2);
183194

184195
// This should cancel the queued request
185196
ws.disconnect();
197+
expect(ws.state).toBe(ConnectionState.DISCONNECTED);
186198
failConnection(new Error("No base URL"));
187199
await Promise.resolve();
188200

@@ -199,10 +211,12 @@ describe("ReconnectingWebSocket", () => {
199211

200212
// Start reconnect (will block on factory promise)
201213
ws.reconnect();
214+
expect(ws.state).toBe(ConnectionState.CONNECTING);
202215
expect(sockets).toHaveLength(2);
203216

204217
// Disconnect while factory is still pending
205218
ws.disconnect();
219+
expect(ws.state).toBe(ConnectionState.DISCONNECTED);
206220

207221
completeConnection();
208222
await Promise.resolve();
@@ -273,6 +287,7 @@ describe("ReconnectingWebSocket", () => {
273287
it("preserves event handlers after suspend() and reconnect()", async () => {
274288
const { ws, sockets } = await createReconnectingWebSocket();
275289
sockets[0].fireOpen();
290+
expect(ws.state).toBe(ConnectionState.CONNECTED);
276291

277292
const handler = vi.fn();
278293
ws.addEventListener("message", handler);
@@ -281,12 +296,14 @@ describe("ReconnectingWebSocket", () => {
281296

282297
// Suspend the socket
283298
ws.disconnect();
299+
expect(ws.state).toBe(ConnectionState.DISCONNECTED);
284300

285301
// Reconnect (async operation)
286302
ws.reconnect();
287303
await Promise.resolve(); // Wait for async connect()
288304
expect(sockets).toHaveLength(2);
289305
sockets[1].fireOpen();
306+
expect(ws.state).toBe(ConnectionState.CONNECTED);
290307

291308
// Handler should still work after suspend/reconnect
292309
sockets[1].fireMessage({ test: 2 });
@@ -360,19 +377,26 @@ describe("ReconnectingWebSocket", () => {
360377
);
361378

362379
sockets[0].fireOpen();
380+
expect(ws.state).toBe(ConnectionState.CONNECTED);
381+
363382
sockets[0].fireClose({
364383
code: WebSocketCloseCode.PROTOCOL_ERROR,
365384
reason: "Protocol error",
366385
});
367386

368387
// Should suspend, not dispose - allows recovery when credentials change
388+
expect(ws.state).toBe(ConnectionState.DISCONNECTED);
369389
expect(disposeCount).toBe(0);
370390

371391
// Should be able to reconnect after suspension
372392
ws.reconnect();
393+
await Promise.resolve();
373394
expect(sockets).toHaveLength(2);
395+
sockets[1].fireOpen();
396+
expect(ws.state).toBe(ConnectionState.CONNECTED);
374397

375398
ws.close();
399+
expect(ws.state).toBe(ConnectionState.DISPOSED);
376400
});
377401

378402
it("does not call onDispose callback during reconnection", async () => {
@@ -398,6 +422,7 @@ describe("ReconnectingWebSocket", () => {
398422
const { ws, sockets, setFactoryError } =
399423
await createReconnectingWebSocketWithErrorControl();
400424
sockets[0].fireOpen();
425+
expect(ws.state).toBe(ConnectionState.CONNECTED);
401426

402427
// Trigger reconnect that will fail with 403
403428
setFactoryError(
@@ -407,6 +432,7 @@ describe("ReconnectingWebSocket", () => {
407432
await Promise.resolve();
408433

409434
// Socket should be suspended - no automatic reconnection
435+
expect(ws.state).toBe(ConnectionState.DISCONNECTED);
410436
await vi.advanceTimersByTimeAsync(10000);
411437
expect(sockets).toHaveLength(1);
412438

@@ -415,17 +441,23 @@ describe("ReconnectingWebSocket", () => {
415441
ws.reconnect();
416442
await Promise.resolve();
417443
expect(sockets).toHaveLength(2);
444+
sockets[1].fireOpen();
445+
expect(ws.state).toBe(ConnectionState.CONNECTED);
418446

419447
ws.close();
448+
expect(ws.state).toBe(ConnectionState.DISPOSED);
420449
});
421450

422451
it("reconnect() does nothing after close()", async () => {
423452
const { ws, sockets } = await createReconnectingWebSocket();
424453
sockets[0].fireOpen();
454+
expect(ws.state).toBe(ConnectionState.CONNECTED);
425455

426456
ws.close();
427-
ws.reconnect();
457+
expect(ws.state).toBe(ConnectionState.DISPOSED);
428458

459+
ws.reconnect();
460+
expect(ws.state).toBe(ConnectionState.DISPOSED);
429461
expect(sockets).toHaveLength(1);
430462
});
431463
});

0 commit comments

Comments
 (0)