chore: Improve graceful server shutdown (#4625)
* chore: Improve graceful server shutdown * Replace node timers with custom promise timeout
This commit is contained in:
@@ -25,6 +25,7 @@ import {
|
||||
} from "./utils/startup";
|
||||
import { checkUpdates } from "./utils/updates";
|
||||
import onerror from "./onerror";
|
||||
import ShutdownHelper, { ShutdownOrder } from "./utils/ShutdownHelper";
|
||||
|
||||
// The default is to run all services to make development and OSS installations
|
||||
// easier to deal with. Separate services are only needed at scale.
|
||||
@@ -71,7 +72,8 @@ async function start(id: number, disconnect: () => void) {
|
||||
const server = stoppable(
|
||||
useHTTPS
|
||||
? https.createServer(ssl, app.callback())
|
||||
: http.createServer(app.callback())
|
||||
: http.createServer(app.callback()),
|
||||
ShutdownHelper.connectionGraceTimeout
|
||||
);
|
||||
const router = new Router();
|
||||
|
||||
@@ -120,14 +122,26 @@ async function start(id: number, disconnect: () => void) {
|
||||
server.listen(normalizedPortFlag || env.PORT || "3000");
|
||||
server.setTimeout(env.REQUEST_TIMEOUT);
|
||||
|
||||
process.once("SIGTERM", shutdown);
|
||||
process.once("SIGINT", shutdown);
|
||||
ShutdownHelper.add("server", ShutdownOrder.last, () => {
|
||||
return new Promise((resolve, reject) => {
|
||||
// Calling stop prevents new connections from being accepted and waits for
|
||||
// existing connections to close for the grace period before forcefully
|
||||
// closing them.
|
||||
server.stop((err, gracefully) => {
|
||||
disconnect();
|
||||
|
||||
function shutdown() {
|
||||
Logger.info("lifecycle", "Stopping server");
|
||||
server.emit("shutdown");
|
||||
server.stop(disconnect);
|
||||
}
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(gracefully);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
// Handle shutdown signals
|
||||
process.once("SIGTERM", () => ShutdownHelper.execute());
|
||||
process.once("SIGINT", () => ShutdownHelper.execute());
|
||||
}
|
||||
|
||||
throng({
|
||||
|
||||
@@ -28,7 +28,9 @@ class Logger {
|
||||
output: winston.Logger;
|
||||
|
||||
constructor() {
|
||||
this.output = winston.createLogger();
|
||||
this.output = winston.createLogger({
|
||||
level: isProduction ? "info" : "debug",
|
||||
});
|
||||
this.output.add(
|
||||
new winston.transports.Console({
|
||||
format: isProduction
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import ddMetrics from "datadog-metrics";
|
||||
import env from "@server/env";
|
||||
import ShutdownHelper, { ShutdownOrder } from "@server/utils/ShutdownHelper";
|
||||
|
||||
class Metrics {
|
||||
enabled = !!env.DD_API_KEY;
|
||||
@@ -14,6 +15,8 @@ class Metrics {
|
||||
prefix: "outline.",
|
||||
defaultTags: [`env:${process.env.DD_ENV ?? env.ENVIRONMENT}`],
|
||||
});
|
||||
|
||||
ShutdownHelper.add("metrics", ShutdownOrder.last, () => this.flush());
|
||||
}
|
||||
|
||||
gauge(key: string, value: number, tags?: string[]): void {
|
||||
@@ -42,6 +45,16 @@ class Metrics {
|
||||
|
||||
return ddMetrics.increment(key);
|
||||
}
|
||||
|
||||
flush(): Promise<void> {
|
||||
if (!this.enabled) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
ddMetrics.flush(resolve, reject);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export default new Metrics();
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import { Context, Next } from "koa";
|
||||
import { defaults } from "lodash";
|
||||
import RateLimiter from "@server/RateLimiter";
|
||||
import env from "@server/env";
|
||||
import { RateLimitExceededError } from "@server/errors";
|
||||
import Metrics from "@server/logging/Metrics";
|
||||
import Redis from "@server/redis";
|
||||
import RateLimiter from "@server/utils/RateLimiter";
|
||||
|
||||
/**
|
||||
* Middleware that limits the number of requests that are allowed within a given
|
||||
|
||||
@@ -10,7 +10,6 @@ import {
|
||||
FileOperationType,
|
||||
} from "@shared/types";
|
||||
import { colorPalette } from "@shared/utils/collections";
|
||||
import { RateLimiterStrategy } from "@server/RateLimiter";
|
||||
import collectionExporter from "@server/commands/collectionExporter";
|
||||
import teamUpdater from "@server/commands/teamUpdater";
|
||||
import { sequelize } from "@server/database/sequelize";
|
||||
@@ -42,6 +41,7 @@ import {
|
||||
presentCollectionGroupMembership,
|
||||
presentFileOperation,
|
||||
} from "@server/presenters";
|
||||
import { RateLimiterStrategy } from "@server/utils/RateLimiter";
|
||||
import { collectionIndexing } from "@server/utils/indexing";
|
||||
import removeIndexCollision from "@server/utils/removeIndexCollision";
|
||||
import {
|
||||
|
||||
@@ -7,7 +7,6 @@ import { Op, ScopeOptions, WhereOptions } from "sequelize";
|
||||
import { TeamPreference } from "@shared/types";
|
||||
import { subtractDate } from "@shared/utils/date";
|
||||
import { bytesToHumanReadable } from "@shared/utils/files";
|
||||
import { RateLimiterStrategy } from "@server/RateLimiter";
|
||||
import documentCreator from "@server/commands/documentCreator";
|
||||
import documentImporter from "@server/commands/documentImporter";
|
||||
import documentLoader from "@server/commands/documentLoader";
|
||||
@@ -44,6 +43,7 @@ import {
|
||||
presentPolicies,
|
||||
} from "@server/presenters";
|
||||
import { APIContext } from "@server/types";
|
||||
import { RateLimiterStrategy } from "@server/utils/RateLimiter";
|
||||
import { getTeamFromContext } from "@server/utils/passport";
|
||||
import slugify from "@server/utils/slugify";
|
||||
import { assertPresent } from "@server/validation";
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import invariant from "invariant";
|
||||
import Router from "koa-router";
|
||||
import { RateLimiterStrategy } from "@server/RateLimiter";
|
||||
import teamCreator from "@server/commands/teamCreator";
|
||||
import teamUpdater from "@server/commands/teamUpdater";
|
||||
import { sequelize } from "@server/database/sequelize";
|
||||
@@ -9,6 +8,7 @@ import { rateLimiter } from "@server/middlewares/rateLimiter";
|
||||
import { Event, Team, TeamDomain, User } from "@server/models";
|
||||
import { authorize } from "@server/policies";
|
||||
import { presentTeam, presentPolicies } from "@server/presenters";
|
||||
import { RateLimiterStrategy } from "@server/utils/RateLimiter";
|
||||
import { assertUuid } from "@server/validation";
|
||||
|
||||
const router = new Router();
|
||||
|
||||
@@ -4,7 +4,6 @@ import { has } from "lodash";
|
||||
import { Op, WhereOptions } from "sequelize";
|
||||
import { UserPreference } from "@shared/types";
|
||||
import { UserValidation } from "@shared/validations";
|
||||
import { RateLimiterStrategy } from "@server/RateLimiter";
|
||||
import userDemoter from "@server/commands/userDemoter";
|
||||
import userDestroyer from "@server/commands/userDestroyer";
|
||||
import userInviter from "@server/commands/userInviter";
|
||||
@@ -25,6 +24,7 @@ import { Event, User, Team } from "@server/models";
|
||||
import { UserFlag, UserRole } from "@server/models/User";
|
||||
import { can, authorize } from "@server/policies";
|
||||
import { presentUser, presentPolicies } from "@server/presenters";
|
||||
import { RateLimiterStrategy } from "@server/utils/RateLimiter";
|
||||
import {
|
||||
assertIn,
|
||||
assertSort,
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import Router from "koa-router";
|
||||
import { RateLimiterStrategy } from "@server/RateLimiter";
|
||||
import auth from "@server/middlewares/authentication";
|
||||
import { rateLimiter } from "@server/middlewares/rateLimiter";
|
||||
import { View, Document, Event } from "@server/models";
|
||||
import { authorize } from "@server/policies";
|
||||
import { presentView } from "@server/presenters";
|
||||
import { RateLimiterStrategy } from "@server/utils/RateLimiter";
|
||||
import { assertUuid } from "@server/validation";
|
||||
|
||||
const router = new Router();
|
||||
|
||||
@@ -2,7 +2,6 @@ import Router from "koa-router";
|
||||
import { find } from "lodash";
|
||||
import { Client } from "@shared/types";
|
||||
import { parseDomain } from "@shared/utils/domains";
|
||||
import { RateLimiterStrategy } from "@server/RateLimiter";
|
||||
import InviteAcceptedEmail from "@server/emails/templates/InviteAcceptedEmail";
|
||||
import SigninEmail from "@server/emails/templates/SigninEmail";
|
||||
import WelcomeEmail from "@server/emails/templates/WelcomeEmail";
|
||||
@@ -10,6 +9,7 @@ import env from "@server/env";
|
||||
import { AuthorizationError } from "@server/errors";
|
||||
import { rateLimiter } from "@server/middlewares/rateLimiter";
|
||||
import { User, Team } from "@server/models";
|
||||
import { RateLimiterStrategy } from "@server/utils/RateLimiter";
|
||||
import { signIn } from "@server/utils/authentication";
|
||||
import { getUserForEmailSigninToken } from "@server/utils/jwt";
|
||||
import { assertEmail, assertPresent } from "@server/validation";
|
||||
|
||||
@@ -6,6 +6,7 @@ import Koa from "koa";
|
||||
import WebSocket from "ws";
|
||||
import { DocumentValidation } from "@shared/validations";
|
||||
import Logger from "@server/logging/Logger";
|
||||
import ShutdownHelper, { ShutdownOrder } from "@server/utils/ShutdownHelper";
|
||||
import AuthenticationExtension from "../collaboration/AuthenticationExtension";
|
||||
import LoggerExtension from "../collaboration/LoggerExtension";
|
||||
import MetricsExtension from "../collaboration/MetricsExtension";
|
||||
@@ -79,7 +80,7 @@ export default function init(
|
||||
socket.end(`HTTP/1.1 400 Bad Request\r\n`);
|
||||
});
|
||||
|
||||
server.on("shutdown", () => {
|
||||
return hocuspocus.destroy();
|
||||
});
|
||||
ShutdownHelper.add("collaboration", ShutdownOrder.normal, () =>
|
||||
hocuspocus.destroy()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import * as Tracing from "@server/logging/tracer";
|
||||
import { traceFunction } from "@server/logging/tracing";
|
||||
import { Document, Collection, View, User } from "@server/models";
|
||||
import { can } from "@server/policies";
|
||||
import ShutdownHelper, { ShutdownOrder } from "@server/utils/ShutdownHelper";
|
||||
import { getUserForJWT } from "@server/utils/jwt";
|
||||
import { websocketQueue } from "../queues";
|
||||
import WebsocketsProcessor from "../queues/processors/WebsocketsProcessor";
|
||||
@@ -72,7 +73,7 @@ export default function init(
|
||||
socket.end(`HTTP/1.1 400 Bad Request\r\n`);
|
||||
});
|
||||
|
||||
server.on("shutdown", () => {
|
||||
ShutdownHelper.add("websockets", ShutdownOrder.normal, async () => {
|
||||
Metrics.gaugePerInstance("websockets.count", 0);
|
||||
});
|
||||
|
||||
|
||||
96
server/utils/ShutdownHelper.ts
Normal file
96
server/utils/ShutdownHelper.ts
Normal file
@@ -0,0 +1,96 @@
|
||||
import { groupBy } from "lodash";
|
||||
import Logger from "@server/logging/Logger";
|
||||
import { timeout } from "./timers";
|
||||
|
||||
export enum ShutdownOrder {
|
||||
first = 0,
|
||||
normal = 1,
|
||||
last = 2,
|
||||
}
|
||||
|
||||
type Handler = {
|
||||
name: string;
|
||||
order: ShutdownOrder;
|
||||
callback: () => Promise<unknown>;
|
||||
};
|
||||
|
||||
export default class ShutdownHelper {
|
||||
/**
|
||||
* The amount of time to wait for connections to close before forcefully
|
||||
* closing them. This allows for regular HTTP requests to complete but
|
||||
* prevents long running requests from blocking shutdown.
|
||||
*/
|
||||
public static readonly connectionGraceTimeout = 5 * 1000;
|
||||
|
||||
/**
|
||||
* The maximum amount of time to wait for ongoing work to finish before
|
||||
* force quitting the process. In the event of a force quit, the process
|
||||
* will exit with a non-zero exit code.
|
||||
*/
|
||||
public static readonly forceQuitTimeout = 60 * 1000;
|
||||
|
||||
/** Whether the server is currently shutting down */
|
||||
private static isShuttingDown = false;
|
||||
|
||||
/** List of shutdown handlers to execute */
|
||||
private static handlers: Handler[] = [];
|
||||
|
||||
/**
|
||||
* Add a shutdown handler to be executed when the process is exiting
|
||||
*
|
||||
* @param name The name of the handler
|
||||
* @param callback The callback to execute
|
||||
*/
|
||||
public static add(
|
||||
name: string,
|
||||
order: ShutdownOrder,
|
||||
callback: () => Promise<unknown>
|
||||
) {
|
||||
this.handlers.push({ name, order, callback });
|
||||
}
|
||||
|
||||
/**
|
||||
* Exit the process after all shutdown handlers have completed
|
||||
*/
|
||||
public static async execute() {
|
||||
if (this.isShuttingDown) {
|
||||
return;
|
||||
}
|
||||
this.isShuttingDown = true;
|
||||
|
||||
// Start the shutdown timer
|
||||
void timeout(this.forceQuitTimeout).then(() => {
|
||||
Logger.info("lifecycle", "Force quitting");
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
// Group handlers by order
|
||||
const shutdownGroups = groupBy(this.handlers, "order");
|
||||
const orderedKeys = Object.keys(shutdownGroups).sort();
|
||||
|
||||
// Execute handlers in order
|
||||
for (const key of orderedKeys) {
|
||||
Logger.debug("lifecycle", `Running shutdown group ${key}`);
|
||||
const handlers = shutdownGroups[key];
|
||||
|
||||
await Promise.allSettled(
|
||||
handlers.map(async (handler) => {
|
||||
Logger.debug("lifecycle", `Running shutdown handler ${handler.name}`);
|
||||
|
||||
await handler.callback().catch((error) => {
|
||||
Logger.error(
|
||||
`Error inside shutdown handler ${handler.name}`,
|
||||
error,
|
||||
{
|
||||
name: handler.name,
|
||||
}
|
||||
);
|
||||
});
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
Logger.info("lifecycle", "Gracefully quitting");
|
||||
process.exit(0);
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@ import { snakeCase } from "lodash";
|
||||
import env from "@server/env";
|
||||
import Metrics from "@server/logging/Metrics";
|
||||
import Redis from "../redis";
|
||||
import ShutdownHelper, { ShutdownOrder } from "./ShutdownHelper";
|
||||
|
||||
export function createQueue(
|
||||
name: string,
|
||||
@@ -57,5 +58,9 @@ export function createQueue(
|
||||
}, 5 * 1000);
|
||||
}
|
||||
|
||||
ShutdownHelper.add(name, ShutdownOrder.normal, async () => {
|
||||
await queue.close();
|
||||
});
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
8
server/utils/timers.ts
Normal file
8
server/utils/timers.ts
Normal file
@@ -0,0 +1,8 @@
|
||||
/**
|
||||
* Returns a promise that resolves after a specified number of milliseconds.
|
||||
*
|
||||
* @param [delay=1] The number of milliseconds to wait before fulfilling the promise.
|
||||
*/
|
||||
export function timeout(ms = 1) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
Reference in New Issue
Block a user