From ffc86f9092507799b63d939e8444311757b0239f Mon Sep 17 00:00:00 2001 From: tobi <31960611+tsmethurst@users.noreply.github.com> Date: Fri, 18 Oct 2024 15:43:09 +0200 Subject: [PATCH] [bugfix] Fix occasionally streaming empty messages (#3456) --- internal/api/client/streaming/stream.go | 62 ++++++++++++++++--------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/internal/api/client/streaming/stream.go b/internal/api/client/streaming/stream.go index 900df4383..eba040cba 100644 --- a/internal/api/client/streaming/stream.go +++ b/internal/api/client/streaming/stream.go @@ -35,6 +35,8 @@ "github.com/gorilla/websocket" ) +var pingMsg = []byte("ping!") + // StreamGETHandler swagger:operation GET /api/v1/streaming streamGet // // Initiate a websocket connection for live streaming of statuses and notifications. @@ -389,40 +391,54 @@ func (m *Module) writeToWSConn( ) { for { // Wrap context with timeout to send a ping. - pingctx, cncl := context.WithTimeout(ctx, ping) + pingCtx, cncl := context.WithTimeout(ctx, ping) - // Block on receipt of msg. - msg, ok := stream.Recv(pingctx) + // Block and wait for + // one of the following: + // + // - receipt of msg + // - timeout of pingCtx + // - stream closed. + msg, gotMsg := stream.Recv(pingCtx) - // Check if cancel because ping. - pinged := (pingctx.Err() != nil) + // If ping context has timed + // out, we should send a ping. + // + // In any case cancel pingCtx + // as we're done with it. + shouldPing := (pingCtx.Err() != nil) cncl() switch { - case !ok && pinged: - // The ping context timed out! - l.Trace("writing websocket ping") - // Wrapped context time-out, send a keep-alive "ping". - if err := wsConn.WriteControl(websocket.PingMessage, nil, time.Time{}); err != nil { + // We have a message to stream. + case gotMsg: + l.Tracef("writing websocket message: %+v", msg) + if err := wsConn.WriteJSON(msg); err != nil { + // If there's an error writing then drop the + // connection, as client may have disappeared + // suddenly; they can reconnect if necessary. + l.Debugf("error writing websocket message: %v", err) + break + } + + // We have no message but we + // need to send a keep-alive ping. + case shouldPing: + l.Trace("writing websocket ping") + if err := wsConn.WriteControl(websocket.PingMessage, pingMsg, time.Time{}); err != nil { + // If there's an error writing then drop the + // connection, as client may have disappeared + // suddenly; they can reconnect if necessary. l.Debugf("error writing websocket ping: %v", err) break } - case !ok: - // Stream was - // closed. + // We have no message and we shouldn't + // send a ping; this means the stream + // has been closed from the client's end. + default: return } - - l.Trace("writing websocket message: %+v", msg) - - // Received a new message from the processor. - if err := wsConn.WriteJSON(msg); err != nil { - l.Debugf("error writing websocket message: %v", err) - break - } } - - l.Debug("finished websocket write") }