Add health check for background queue (#6218)

This commit is contained in:
Tom Moor
2023-11-27 08:36:07 -05:00
committed by GitHub
parent 8f53f3b28c
commit 5d70129c04

View File

@@ -1,7 +1,9 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
import Queue from "bull";
import snakeCase from "lodash/snakeCase";
import { Second } from "@shared/utils/time";
import env from "@server/env";
import Logger from "@server/logging/Logger";
import Metrics from "@server/logging/Metrics";
import Redis from "@server/storage/redis";
import ShutdownHelper, { ShutdownOrder } from "./ShutdownHelper";
@@ -11,6 +13,7 @@ export function createQueue(
defaultJobOptions?: Partial<Queue.JobOptions>
) {
const prefix = `queue.${snakeCase(name)}`;
let processedJobsSinceCheck = 0;
// Notes on reusing Redis connections for Bull:
// https://github.com/OptimalBits/bull/blob/b6d530f72a774be0fd4936ddb4ad9df3b183f4b6/PATTERNS.md#reusing-redis-connections
@@ -51,13 +54,31 @@ export function createQueue(
queue.on("failed", () => {
Metrics.increment(`${prefix}.jobs.failed`);
});
queue.on("active", () => {
processedJobsSinceCheck += 1;
});
if (env.ENVIRONMENT !== "test") {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
setInterval(async () => {
Metrics.gauge(`${prefix}.count`, await queue.count());
Metrics.gauge(`${prefix}.delayed_count`, await queue.getDelayedCount());
}, 5 * Second);
setInterval(async () => {
if (processedJobsSinceCheck > 0) {
processedJobsSinceCheck = 0;
return;
}
processedJobsSinceCheck = 0;
const waiting = await queue.getWaitingCount();
if (waiting > 50) {
Logger.fatal(
"Queue has stopped processing jobs",
new Error(`${waiting} jobs are waiting in the ${name} queue`)
);
}
}, 30 * Second);
}
ShutdownHelper.add(name, ShutdownOrder.normal, async () => {