From 5d70129c04afa267369383078d12cfeca9c883b6 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Mon, 27 Nov 2023 08:36:07 -0500 Subject: [PATCH] Add health check for background queue (#6218) --- server/utils/queue.ts | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/server/utils/queue.ts b/server/utils/queue.ts index ef10d52a2..1073dab1c 100644 --- a/server/utils/queue.ts +++ b/server/utils/queue.ts @@ -1,7 +1,9 @@ +/* eslint-disable @typescript-eslint/no-misused-promises */ import Queue from "bull"; import snakeCase from "lodash/snakeCase"; import { Second } from "@shared/utils/time"; import env from "@server/env"; +import Logger from "@server/logging/Logger"; import Metrics from "@server/logging/Metrics"; import Redis from "@server/storage/redis"; import ShutdownHelper, { ShutdownOrder } from "./ShutdownHelper"; @@ -11,6 +13,7 @@ export function createQueue( defaultJobOptions?: Partial ) { const prefix = `queue.${snakeCase(name)}`; + let processedJobsSinceCheck = 0; // Notes on reusing Redis connections for Bull: // https://github.com/OptimalBits/bull/blob/b6d530f72a774be0fd4936ddb4ad9df3b183f4b6/PATTERNS.md#reusing-redis-connections @@ -51,13 +54,31 @@ export function createQueue( queue.on("failed", () => { Metrics.increment(`${prefix}.jobs.failed`); }); + queue.on("active", () => { + processedJobsSinceCheck += 1; + }); if (env.ENVIRONMENT !== "test") { - // eslint-disable-next-line @typescript-eslint/no-misused-promises setInterval(async () => { Metrics.gauge(`${prefix}.count`, await queue.count()); Metrics.gauge(`${prefix}.delayed_count`, await queue.getDelayedCount()); }, 5 * Second); + + setInterval(async () => { + if (processedJobsSinceCheck > 0) { + processedJobsSinceCheck = 0; + return; + } + + processedJobsSinceCheck = 0; + const waiting = await queue.getWaitingCount(); + if (waiting > 50) { + Logger.fatal( + "Queue has stopped processing jobs", + new Error(`${waiting} jobs are waiting in the ${name} queue`) + ); + } + }, 30 * Second); } ShutdownHelper.add(name, ShutdownOrder.normal, async () => {