diff --git a/server/env.ts b/server/env.ts index 6eb8d19a0..015a51dc1 100644 --- a/server/env.ts +++ b/server/env.ts @@ -18,6 +18,7 @@ import { IsBoolean, MaxLength, } from "class-validator"; +import uniq from "lodash/uniq"; import { languages } from "@shared/i18n"; import { CannotUseWithout } from "@server/utils/validators"; import Deprecated from "./models/decorators/Deprecated"; @@ -226,16 +227,20 @@ export class Environment { public DEFAULT_LANGUAGE = process.env.DEFAULT_LANGUAGE ?? "en_US"; /** - * A comma separated list of which services should be enabled on this - * instance – defaults to all. + * A comma list of which services should be enabled on this instance – defaults to all. * * If a services flag is passed it takes priority over the environment variable * for example: --services=web,worker */ - public SERVICES = - getArg("services") ?? - process.env.SERVICES ?? - "collaboration,websockets,worker,web"; + public SERVICES = uniq( + ( + getArg("services") ?? + process.env.SERVICES ?? + "collaboration,websockets,worker,web" + ) + .split(",") + .map((service) => service.toLowerCase().trim()) + ); /** * Auto-redirect to https in production. The default is true but you may set diff --git a/server/index.ts b/server/index.ts index 542fc547a..2678e4ee5 100644 --- a/server/index.ts +++ b/server/index.ts @@ -10,7 +10,6 @@ import Koa from "koa"; import helmet from "koa-helmet"; import logger from "koa-logger"; import Router from "koa-router"; -import uniq from "lodash/uniq"; import { AddressInfo } from "net"; import stoppable from "stoppable"; import throng from "throng"; @@ -27,17 +26,11 @@ import { checkConnection, sequelize } from "./storage/database"; import RedisAdapter from "./storage/redis"; import Metrics from "./logging/Metrics"; -// The default is to run all services to make development and OSS installations -// easier to deal with. Separate services are only needed at scale. -const serviceNames = uniq( - env.SERVICES.split(",").map((service) => service.trim()) -); - // The number of processes to run, defaults to the number of CPU's available // for the web service, and 1 for collaboration during the beta period. let processCount = env.WEB_CONCURRENCY; -if (serviceNames.includes("collaboration")) { +if (env.SERVICES.includes("collaboration")) { if (processCount !== 1) { Logger.info( "lifecycle", @@ -114,14 +107,14 @@ async function start(id: number, disconnect: () => void) { app.use(router.routes()); // loop through requested services at startup - for (const name of serviceNames) { + for (const name of env.SERVICES) { if (!Object.keys(services).includes(name)) { throw new Error(`Unknown service ${name}`); } Logger.info("lifecycle", `Starting ${name} service`); const init = services[name]; - await init(app, server, serviceNames); + await init(app, server, env.SERVICES); } server.on("error", (err) => { diff --git a/server/queues/HealthMonitor.ts b/server/queues/HealthMonitor.ts new file mode 100644 index 000000000..f4c5aad3d --- /dev/null +++ b/server/queues/HealthMonitor.ts @@ -0,0 +1,40 @@ +import { Queue } from "bull"; +import { Second } from "@shared/utils/time"; +import Logger from "@server/logging/Logger"; + +/* eslint-disable @typescript-eslint/no-misused-promises */ +export default class HealthMonitor { + /** + * Starts a health monitor for the given queue. If the queue stops processing jobs then the + * process is exit. + * + * @param queue The queue to monitor + */ + public static start(queue: Queue) { + let processedJobsSinceCheck = 0; + + queue.on("active", () => { + processedJobsSinceCheck += 1; + }); + + setInterval(async () => { + if (processedJobsSinceCheck > 0) { + processedJobsSinceCheck = 0; + return; + } + + processedJobsSinceCheck = 0; + const waiting = await queue.getWaitingCount(); + if (waiting > 50) { + Logger.fatal( + "Queue has stopped processing jobs", + new Error(`Jobs are waiting in the ${queue.name} queue`), + { + queue: queue.name, + waiting, + } + ); + } + }, 30 * Second); + } +} diff --git a/server/queues/index.ts b/server/queues/index.ts index 05dba1401..3506d4575 100644 --- a/server/queues/index.ts +++ b/server/queues/index.ts @@ -1,4 +1,4 @@ -import { createQueue } from "@server/utils/queue"; +import { createQueue } from "@server/queues/queue"; export const globalEventQueue = createQueue("globalEvents", { attempts: 5, diff --git a/server/utils/queue.ts b/server/queues/queue.ts similarity index 74% rename from server/utils/queue.ts rename to server/queues/queue.ts index 1073dab1c..b56aba6fd 100644 --- a/server/utils/queue.ts +++ b/server/queues/queue.ts @@ -3,17 +3,15 @@ import Queue from "bull"; import snakeCase from "lodash/snakeCase"; import { Second } from "@shared/utils/time"; import env from "@server/env"; -import Logger from "@server/logging/Logger"; import Metrics from "@server/logging/Metrics"; import Redis from "@server/storage/redis"; -import ShutdownHelper, { ShutdownOrder } from "./ShutdownHelper"; +import ShutdownHelper, { ShutdownOrder } from "@server/utils/ShutdownHelper"; export function createQueue( name: string, defaultJobOptions?: Partial ) { const prefix = `queue.${snakeCase(name)}`; - let processedJobsSinceCheck = 0; // Notes on reusing Redis connections for Bull: // https://github.com/OptimalBits/bull/blob/b6d530f72a774be0fd4936ddb4ad9df3b183f4b6/PATTERNS.md#reusing-redis-connections @@ -54,31 +52,12 @@ export function createQueue( queue.on("failed", () => { Metrics.increment(`${prefix}.jobs.failed`); }); - queue.on("active", () => { - processedJobsSinceCheck += 1; - }); if (env.ENVIRONMENT !== "test") { setInterval(async () => { Metrics.gauge(`${prefix}.count`, await queue.count()); Metrics.gauge(`${prefix}.delayed_count`, await queue.getDelayedCount()); }, 5 * Second); - - setInterval(async () => { - if (processedJobsSinceCheck > 0) { - processedJobsSinceCheck = 0; - return; - } - - processedJobsSinceCheck = 0; - const waiting = await queue.getWaitingCount(); - if (waiting > 50) { - Logger.fatal( - "Queue has stopped processing jobs", - new Error(`${waiting} jobs are waiting in the ${name} queue`) - ); - } - }, 30 * Second); } ShutdownHelper.add(name, ShutdownOrder.normal, async () => { diff --git a/server/services/worker.ts b/server/services/worker.ts index 60521336a..ad4a2277c 100644 --- a/server/services/worker.ts +++ b/server/services/worker.ts @@ -1,6 +1,7 @@ import Logger from "@server/logging/Logger"; import { setResource } from "@server/logging/tracer"; import { traceFunction } from "@server/logging/tracing"; +import HealthMonitor from "@server/queues/HealthMonitor"; import { initI18n } from "@server/utils/i18n"; import { globalEventQueue, @@ -152,4 +153,8 @@ export default function init() { .catch((err) => { Logger.fatal("Error starting taskQueue", err); }); + + HealthMonitor.start(globalEventQueue); + HealthMonitor.start(processorEventQueue); + HealthMonitor.start(websocketQueue); } diff --git a/server/storage/redis.ts b/server/storage/redis.ts index 900025361..d721c2ab7 100644 --- a/server/storage/redis.ts +++ b/server/storage/redis.ts @@ -42,7 +42,7 @@ export default class RedisAdapter extends Redis { */ const connectionNamePrefix = env.isDevelopment ? process.pid : "outline"; const connectionName = - `${connectionNamePrefix}:${env.SERVICES.replace(/,/g, "-")}` + + `${connectionNamePrefix}:${env.SERVICES.join("-")}` + (connectionNameSuffix ? `:${connectionNameSuffix}` : ""); if (!url || !url.startsWith("ioredis://")) {