fix: Queue health monitor should only run on worker processes (#6228)
@@ -18,6 +18,7 @@ import {
   IsBoolean,
   MaxLength,
 } from "class-validator";
+import uniq from "lodash/uniq";
 import { languages } from "@shared/i18n";
 import { CannotUseWithout } from "@server/utils/validators";
 import Deprecated from "./models/decorators/Deprecated";
@@ -226,16 +227,20 @@ export class Environment {
   public DEFAULT_LANGUAGE = process.env.DEFAULT_LANGUAGE ?? "en_US";
 
   /**
-   * A comma separated list of which services should be enabled on this
-   * instance – defaults to all.
+   * A comma list of which services should be enabled on this instance – defaults to all.
    *
    * If a services flag is passed it takes priority over the environment variable
    * for example: --services=web,worker
    */
-  public SERVICES =
-    getArg("services") ??
-    process.env.SERVICES ??
-    "collaboration,websockets,worker,web";
+  public SERVICES = uniq(
+    (
+      getArg("services") ??
+      process.env.SERVICES ??
+      "collaboration,websockets,worker,web"
+    )
+      .split(",")
+      .map((service) => service.toLowerCase().trim())
+  );
 
   /**
    * Auto-redirect to https in production. The default is true but you may set
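With this change SERVICES is normalized once in env.ts: the raw value (CLI flag, environment variable, or default) is split on commas, lower-cased, trimmed, and de-duplicated, so downstream code can treat it as a string[]. A minimal sketch of that normalization, using a hypothetical raw value purely for illustration:

  import uniq from "lodash/uniq";

  // Hypothetical input, e.g. SERVICES="Web, worker,worker"
  const raw = "Web, worker,worker";
  const services = uniq(
    raw.split(",").map((service) => service.toLowerCase().trim())
  );
  // services => ["web", "worker"]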
@@ -10,7 +10,6 @@ import Koa from "koa";
 import helmet from "koa-helmet";
 import logger from "koa-logger";
 import Router from "koa-router";
-import uniq from "lodash/uniq";
 import { AddressInfo } from "net";
 import stoppable from "stoppable";
 import throng from "throng";
@@ -27,17 +26,11 @@ import { checkConnection, sequelize } from "./storage/database";
 import RedisAdapter from "./storage/redis";
 import Metrics from "./logging/Metrics";
 
-// The default is to run all services to make development and OSS installations
-// easier to deal with. Separate services are only needed at scale.
-const serviceNames = uniq(
-  env.SERVICES.split(",").map((service) => service.trim())
-);
-
 // The number of processes to run, defaults to the number of CPU's available
 // for the web service, and 1 for collaboration during the beta period.
 let processCount = env.WEB_CONCURRENCY;
 
-if (serviceNames.includes("collaboration")) {
+if (env.SERVICES.includes("collaboration")) {
   if (processCount !== 1) {
     Logger.info(
       "lifecycle",
@@ -114,14 +107,14 @@ async function start(id: number, disconnect: () => void) {
   app.use(router.routes());
 
   // loop through requested services at startup
-  for (const name of serviceNames) {
+  for (const name of env.SERVICES) {
     if (!Object.keys(services).includes(name)) {
       throw new Error(`Unknown service ${name}`);
     }
 
     Logger.info("lifecycle", `Starting ${name} service`);
     const init = services[name];
-    await init(app, server, serviceNames);
+    await init(app, server, env.SERVICES);
   }
 
   server.on("error", (err) => {
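The loop above implies the shape of the services map: each value is an async init function that receives the Koa app, the HTTP server, and the full list of enabled services (now env.SERVICES directly). A rough sketch of that assumed shape, with the type name and map contents invented for illustration and not part of this diff:

  import Koa from "koa";
  import { Server } from "http";

  // Assumed signature, inferred from the call site `await init(app, server, env.SERVICES)`.
  type ServiceInit = (
    app: Koa,
    server: Server,
    serviceNames: string[]
  ) => Promise<void>;

  // For example: { web: initWeb, worker: initWorker, websockets: initWebsockets, ... }
  declare const services: Record<string, ServiceInit>;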
server/queues/HealthMonitor.ts (new file, 40 lines)
@@ -0,0 +1,40 @@
+import { Queue } from "bull";
+import { Second } from "@shared/utils/time";
+import Logger from "@server/logging/Logger";
+
+/* eslint-disable @typescript-eslint/no-misused-promises */
+export default class HealthMonitor {
+  /**
+   * Starts a health monitor for the given queue. If the queue stops processing jobs then the
+   * process is exited.
+   *
+   * @param queue The queue to monitor
+   */
+  public static start(queue: Queue) {
+    let processedJobsSinceCheck = 0;
+
+    queue.on("active", () => {
+      processedJobsSinceCheck += 1;
+    });
+
+    setInterval(async () => {
+      if (processedJobsSinceCheck > 0) {
+        processedJobsSinceCheck = 0;
+        return;
+      }
+
+      processedJobsSinceCheck = 0;
+      const waiting = await queue.getWaitingCount();
+      if (waiting > 50) {
+        Logger.fatal(
+          "Queue has stopped processing jobs",
+          new Error(`Jobs are waiting in the ${queue.name} queue`),
+          {
+            queue: queue.name,
+            waiting,
+          }
+        );
+      }
+    }, 30 * Second);
+  }
+}
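A brief usage sketch, assumed for illustration and mirroring the worker service init further down: the monitor attaches to a Bull queue created via createQueue and logs a fatal error (per the doc comment above, the process is exited) when jobs keep waiting while none become active over a 30-second window.

  // Assumed wiring, for illustration only; the real calls are added in the worker init hunk below.
  import HealthMonitor from "@server/queues/HealthMonitor";
  import { createQueue } from "@server/queues/queue";

  const exampleQueue = createQueue("example"); // hypothetical queue name
  HealthMonitor.start(exampleQueue);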
@@ -1,4 +1,4 @@
-import { createQueue } from "@server/utils/queue";
+import { createQueue } from "@server/queues/queue";
 
 export const globalEventQueue = createQueue("globalEvents", {
   attempts: 5,
@@ -3,17 +3,15 @@ import Queue from "bull";
 import snakeCase from "lodash/snakeCase";
 import { Second } from "@shared/utils/time";
 import env from "@server/env";
-import Logger from "@server/logging/Logger";
 import Metrics from "@server/logging/Metrics";
 import Redis from "@server/storage/redis";
-import ShutdownHelper, { ShutdownOrder } from "./ShutdownHelper";
+import ShutdownHelper, { ShutdownOrder } from "@server/utils/ShutdownHelper";
 
 export function createQueue(
   name: string,
   defaultJobOptions?: Partial<Queue.JobOptions>
 ) {
   const prefix = `queue.${snakeCase(name)}`;
-  let processedJobsSinceCheck = 0;
 
   // Notes on reusing Redis connections for Bull:
   // https://github.com/OptimalBits/bull/blob/b6d530f72a774be0fd4936ddb4ad9df3b183f4b6/PATTERNS.md#reusing-redis-connections
@@ -54,31 +52,12 @@ export function createQueue(
   queue.on("failed", () => {
     Metrics.increment(`${prefix}.jobs.failed`);
   });
-  queue.on("active", () => {
-    processedJobsSinceCheck += 1;
-  });
 
   if (env.ENVIRONMENT !== "test") {
     setInterval(async () => {
       Metrics.gauge(`${prefix}.count`, await queue.count());
       Metrics.gauge(`${prefix}.delayed_count`, await queue.getDelayedCount());
     }, 5 * Second);
-
-    setInterval(async () => {
-      if (processedJobsSinceCheck > 0) {
-        processedJobsSinceCheck = 0;
-        return;
-      }
-
-      processedJobsSinceCheck = 0;
-      const waiting = await queue.getWaitingCount();
-      if (waiting > 50) {
-        Logger.fatal(
-          "Queue has stopped processing jobs",
-          new Error(`${waiting} jobs are waiting in the ${name} queue`)
-        );
-      }
-    }, 30 * Second);
   }
 
   ShutdownHelper.add(name, ShutdownOrder.normal, async () => {
@@ -1,6 +1,7 @@
 import Logger from "@server/logging/Logger";
 import { setResource } from "@server/logging/tracer";
 import { traceFunction } from "@server/logging/tracing";
+import HealthMonitor from "@server/queues/HealthMonitor";
 import { initI18n } from "@server/utils/i18n";
 import {
   globalEventQueue,
@@ -152,4 +153,8 @@ export default function init() {
     .catch((err) => {
       Logger.fatal("Error starting taskQueue", err);
     });
+
+  HealthMonitor.start(globalEventQueue);
+  HealthMonitor.start(processorEventQueue);
+  HealthMonitor.start(websocketQueue);
 }
@@ -42,7 +42,7 @@ export default class RedisAdapter extends Redis {
    */
   const connectionNamePrefix = env.isDevelopment ? process.pid : "outline";
   const connectionName =
-    `${connectionNamePrefix}:${env.SERVICES.replace(/,/g, "-")}` +
+    `${connectionNamePrefix}:${env.SERVICES.join("-")}` +
     (connectionNameSuffix ? `:${connectionNameSuffix}` : "");
 
   if (!url || !url.startsWith("ioredis://")) {
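Because env.SERVICES is now an array rather than a raw string, the Redis connection name joins its elements instead of replacing commas. A small sketch with assumed values, for illustration only:

  // Assumed: env.SERVICES = ["websockets", "worker"], production prefix "outline", no suffix.
  const connectionName = `outline:${["websockets", "worker"].join("-")}`;
  // => "outline:websockets-worker"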