Skip to content

Commit 6515582

Browse files
authored
streaming: reintroduce websocket ping/pong conn configurer (#182)
Re-add the pingPonger ConnConfigureFunc for the Subscribe endpoint that was removed at some point. This configurer periodically pings clients and cancels the request context if they fail to respond with a pong, improving connection health detection for server-side streaming endpoints. Also fix errorHandler to log write errors instead of discarding them.
1 parent 7a2873f commit 6515582

File tree

1 file changed

+83
-2
lines changed

1 file changed

+83
-2
lines changed

streaming/cmd/chatter/http.go

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ func handleHTTPServer(ctx context.Context, u *url.URL, chatterEndpoints *chatter
5555
{
5656
eh := errorHandler(logger)
5757
upgrader := &websocket.Upgrader{}
58-
chatterServer = chattersvr.New(chatterEndpoints, mux, dec, enc, eh, nil, upgrader, nil)
58+
chatterConfigurer := chattersvr.NewConnConfigurer(nil)
59+
chatterConfigurer.SubscribeFn = pingPonger(logger)
60+
chatterServer = chattersvr.New(chatterEndpoints, mux, dec, enc, eh, nil, upgrader, chatterConfigurer)
5961
if debug {
6062
servers := goahttp.Servers{
6163
chatterServer,
@@ -111,7 +113,86 @@ func handleHTTPServer(ctx context.Context, u *url.URL, chatterEndpoints *chatter
111113
func errorHandler(logger *log.Logger) func(context.Context, http.ResponseWriter, error) {
112114
return func(ctx context.Context, w http.ResponseWriter, err error) {
113115
id := ctx.Value(middleware.RequestIDKey).(string)
114-
_, _ = w.Write([]byte("[" + id + "] encoding: " + err.Error()))
116+
_, writeErr := w.Write([]byte("[" + id + "] encoding: " + err.Error()))
117+
if writeErr != nil {
118+
logger.Printf("[%s] ERROR: failed to write error response: %s", id, writeErr.Error())
119+
}
115120
logger.Printf("[%s] ERROR: %s", id, err.Error())
116121
}
117122
}
123+
124+
// pingPonger configures the websocket connection to check the health of the
125+
// connection between client and server. It periodically sends a ping message
126+
// to the client and if the client does not respond with a pong within a
127+
// specified time, it closes the websocket connection and cancels the request
128+
// context.
129+
//
130+
// NOTE: This is suitable for use only in server-side streaming endpoints
131+
// (i.e. client does NOT send any messages through the stream), because it
132+
// reads the websocket connection for pong messages from the client. If this is
133+
// used in any endpoint where the client streams, it will result in lost
134+
// messages from the client which is undesirable.
135+
func pingPonger(logger *log.Logger) goahttp.ConnConfigureFunc {
136+
pingInterval := 3 * time.Second
137+
return goahttp.ConnConfigureFunc(func(conn *websocket.Conn, cancel context.CancelFunc) *websocket.Conn {
138+
// errc is the channel read by ping-ponger to check if there were any
139+
// errors when reading messages sent by the client from the websocket.
140+
errc := make(chan error)
141+
142+
// Start a goroutine to read messages sent by the client from the
143+
// websocket connection. This will pick up any pong message sent
144+
// by the client. Send any errors to errc.
145+
go func() {
146+
for {
147+
if _, _, err := conn.ReadMessage(); err != nil {
148+
logger.Printf("error reading messages from client: %v", err)
149+
errc <- err
150+
return
151+
}
152+
}
153+
}()
154+
155+
// Start the pinger in a separate goroutine. Read any errors in the
156+
// error channel and stop the goroutine when error received. Close the
157+
// websocket connection and cancel the request when client when error
158+
// received.
159+
go func() {
160+
ticker := time.NewTicker(pingInterval)
161+
defer func() {
162+
ticker.Stop()
163+
logger.Printf("client did not respond with pong")
164+
// cancel the request context when timer expires
165+
cancel()
166+
}()
167+
168+
// Set a read deadline to read pong messages from the client.
169+
// If a client fails to send a pong before the deadline any
170+
// further connection reads will result in an error. We exit the
171+
// goroutine if connection reads error out.
172+
conn.SetReadDeadline(time.Now().Add(pingInterval * 2))
173+
174+
// set a custom pong handler
175+
pongFn := conn.PongHandler()
176+
conn.SetPongHandler(func(appData string) error {
177+
logger.Printf("client says pong")
178+
// Reset the read deadline
179+
conn.SetReadDeadline(time.Now().Add(pingInterval * 2))
180+
return pongFn(appData)
181+
})
182+
183+
for {
184+
select {
185+
case <-errc:
186+
return
187+
case <-ticker.C:
188+
// send periodic ping message
189+
if err := conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(time.Second)); err != nil {
190+
return
191+
}
192+
logger.Printf("pinged client")
193+
}
194+
}
195+
}()
196+
return conn
197+
})
198+
}

0 commit comments

Comments
 (0)