From 1f3a1d4b86771b7705a5c145ccedff2b65ff0867 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Sun, 3 Jul 2022 09:00:59 +0200 Subject: [PATCH] fix: Improved websockets error handling (#3726) * fix: Add websocket client error capturing fix: Incorrect parsing of documentName will never be empty * fix: Non-present documentId in collaboration route should trigger an error response * fix: Close unhandled websocket requests --- server/index.ts | 2 +- server/logging/Logger.ts | 15 +++++++- server/services/collaboration.ts | 60 ++++++++++++++++++++++++++------ server/services/websockets.ts | 26 +++++++++++--- 4 files changed, 87 insertions(+), 16 deletions(-) diff --git a/server/index.ts b/server/index.ts index 58b29d270..d6ca0940e 100644 --- a/server/index.ts +++ b/server/index.ts @@ -101,7 +101,7 @@ async function start(id: number, disconnect: () => void) { Logger.info("lifecycle", `Starting ${name} service`); const init = services[name]; - await init(app, server); + await init(app, server, serviceNames); } server.on("error", (err) => { diff --git a/server/logging/Logger.ts b/server/logging/Logger.ts index aa13a67ac..cd3899ac9 100644 --- a/server/logging/Logger.ts +++ b/server/logging/Logger.ts @@ -1,3 +1,4 @@ +import { IncomingMessage } from "http"; import chalk from "chalk"; import { isEmpty } from "lodash"; import winston from "winston"; @@ -100,8 +101,14 @@ class Logger { * @param message A description of the error * @param error The error that occurred * @param extra Arbitrary data to be logged that will appear in prod logs + * @param request An optional request object to attach to the error */ - error(message: string, error: Error, extra?: Extra) { + error( + message: string, + error: Error, + extra?: Extra, + request?: IncomingMessage + ) { Metrics.increment("logger.error"); Tracing.setError(error); @@ -113,6 +120,12 @@ class Logger { scope.setExtra(key, extra[key]); } + if (request) { + scope.addEventProcessor(function (event) { + return Sentry.Handlers.parseRequest(event, request); + }); + } + Sentry.captureException(error); }); } diff --git a/server/services/collaboration.ts b/server/services/collaboration.ts index 306cd6098..9454e4280 100644 --- a/server/services/collaboration.ts +++ b/server/services/collaboration.ts @@ -1,15 +1,20 @@ -import http from "http"; +import http, { IncomingMessage } from "http"; +import { Duplex } from "stream"; import url from "url"; import { Server } from "@hocuspocus/server"; -import invariant from "invariant"; import Koa from "koa"; import WebSocket from "ws"; +import Logger from "@server/logging/Logger"; import AuthenticationExtension from "../collaboration/AuthenticationExtension"; import LoggerExtension from "../collaboration/LoggerExtension"; import MetricsExtension from "../collaboration/MetricsExtension"; import PersistenceExtension from "../collaboration/PersistenceExtension"; -export default function init(app: Koa, server: http.Server) { +export default function init( + app: Koa, + server: http.Server, + serviceNames: string[] +) { const path = "/collaboration"; const wss = new WebSocket.Server({ noServer: true, @@ -17,6 +22,7 @@ export default function init(app: Koa, server: http.Server) { const hocuspocus = Server.configure({ debounce: 3000, + timeout: 30000, maxDebounce: 10000, extensions: [ new AuthenticationExtension(), @@ -26,15 +32,49 @@ export default function init(app: Koa, server: http.Server) { ], }); - server.on("upgrade", function (req, socket, head) { - if (req.url && req.url.indexOf(path) > -1) { - const documentName = url.parse(req.url).pathname?.split("/").pop(); - invariant(documentName, "Document name must be provided"); + server.on("upgrade", function ( + req: IncomingMessage, + socket: Duplex, + head: Buffer + ) { + if (req.url?.startsWith(path)) { + // parse document id and close connection if not present in request + const documentId = url + .parse(req.url) + .pathname?.replace(path, "") + .split("/") + .pop(); - wss.handleUpgrade(req, socket, head, (client) => { - hocuspocus.handleConnection(client, req, documentName); - }); + if (documentId) { + wss.handleUpgrade(req, socket, head, (client) => { + // Handle websocket connection errors as soon as the client is upgraded + client.on("error", (error) => { + Logger.error( + `Websocket error`, + error, + { + documentId, + }, + req + ); + }); + + hocuspocus.handleConnection(client, req, documentId); + }); + return; + } } + + if ( + req.url?.startsWith("/realtime") && + serviceNames.includes("websockets") + ) { + // Nothing to do, the websockets service will handle this request + return; + } + + // If the collaboration service is running it will close the connection + socket.end(`HTTP/1.1 400 Bad Request\r\n`); }); server.on("shutdown", () => { diff --git a/server/services/websockets.ts b/server/services/websockets.ts index 5be7fe8e6..b2369cfd5 100644 --- a/server/services/websockets.ts +++ b/server/services/websockets.ts @@ -1,4 +1,5 @@ -import http from "http"; +import http, { IncomingMessage } from "http"; +import { Duplex } from "stream"; import invariant from "invariant"; import Koa from "koa"; import IO from "socket.io"; @@ -13,7 +14,11 @@ import { websocketQueue } from "../queues"; import WebsocketsProcessor from "../queues/processors/WebsocketsProcessor"; import Redis from "../redis"; -export default function init(app: Koa, server: http.Server) { +export default function init( + app: Koa, + server: http.Server, + serviceNames: string[] +) { const path = "/realtime"; // Websockets for events and non-collaborative documents @@ -36,11 +41,24 @@ export default function init(app: Koa, server: http.Server) { ); } - server.on("upgrade", function (req, socket, head) { - if (req.url && req.url.indexOf(path) > -1) { + server.on("upgrade", function ( + req: IncomingMessage, + socket: Duplex, + head: Buffer + ) { + if (req.url?.startsWith(path)) { invariant(ioHandleUpgrade, "Existing upgrade handler must exist"); ioHandleUpgrade(req, socket, head); + return; } + + if (serviceNames.includes("collaboration")) { + // Nothing to do, the collaboration service will handle this request + return; + } + + // If the collaboration service isn't running then we need to close the connection + socket.end(`HTTP/1.1 400 Bad Request\r\n`); }); server.on("shutdown", () => {