From 963475d2b086e5e794eaf02310705dba6c164a74 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Sun, 10 Apr 2022 17:50:42 -0700 Subject: [PATCH] fix: Queue retry behavior (#3359) * fix: Queue retry behavior * Add default options for task queue --- server/logging/logger.ts | 5 ++- server/logging/tracing.ts | 28 +++++++++++- server/queues/index.ts | 28 ++++++++++-- server/services/worker.ts | 91 +++++++++++++++++++++++++++------------ server/utils/queue.ts | 6 ++- 5 files changed, 124 insertions(+), 34 deletions(-) diff --git a/server/logging/logger.ts b/server/logging/logger.ts index 00133aec3..2fce9c034 100644 --- a/server/logging/logger.ts +++ b/server/logging/logger.ts @@ -3,6 +3,7 @@ import winston from "winston"; import env from "@server/env"; import Metrics from "@server/logging/metrics"; import Sentry from "@server/logging/sentry"; +import * as Tracing from "./tracing"; const isProduction = env.NODE_ENV === "production"; type LogCategory = @@ -10,8 +11,9 @@ type LogCategory = | "hocuspocus" | "http" | "commands" - | "processor" + | "worker" | "task" + | "processor" | "email" | "queue" | "database" @@ -98,6 +100,7 @@ class Logger { */ error(message: string, error: Error, extra?: Extra) { Metrics.increment("logger.error"); + Tracing.setError(error); if (process.env.SENTRY_DSN) { Sentry.withScope(function (scope) { diff --git a/server/logging/tracing.ts b/server/logging/tracing.ts index b2874783f..fc5e8537a 100644 --- a/server/logging/tracing.ts +++ b/server/logging/tracing.ts @@ -1,4 +1,4 @@ -import { init, tracer } from "@theo.gravity/datadog-apm"; +import { init, tracer, addTags, markAsError } from "@theo.gravity/datadog-apm"; export * as APM from "@theo.gravity/datadog-apm"; @@ -18,4 +18,30 @@ if (process.env.DD_API_KEY) { ); } +/** + * Change the resource of the active APM span. This method wraps addTags to allow + * safe use in environments where APM is disabled. + * + * @param name The name of the resource + */ +export function setResource(name: string) { + if (tracer) { + addTags({ + "resource.name": `${name}`, + }); + } +} + +/** + * Mark the current active span as an error. This method wraps addTags to allow + * safe use in environments where APM is disabled. + * + * @param error The error to add + */ +export function setError(error: Error) { + if (tracer) { + markAsError(error); + } +} + export default tracer; diff --git a/server/queues/index.ts b/server/queues/index.ts index 2ec47e83b..05dba1401 100644 --- a/server/queues/index.ts +++ b/server/queues/index.ts @@ -1,9 +1,29 @@ import { createQueue } from "@server/utils/queue"; -export const globalEventQueue = createQueue("globalEvents"); +export const globalEventQueue = createQueue("globalEvents", { + attempts: 5, + backoff: { + type: "exponential", + delay: 1000, + }, +}); -export const processorEventQueue = createQueue("processorEvents"); +export const processorEventQueue = createQueue("processorEvents", { + attempts: 5, + backoff: { + type: "exponential", + delay: 10 * 1000, + }, +}); -export const websocketQueue = createQueue("websockets"); +export const websocketQueue = createQueue("websockets", { + timeout: 10 * 1000, +}); -export const taskQueue = createQueue("tasks"); +export const taskQueue = createQueue("tasks", { + attempts: 5, + backoff: { + type: "exponential", + delay: 10 * 1000, + }, +}); diff --git a/server/services/worker.ts b/server/services/worker.ts index 570b39ffb..a21383a2e 100644 --- a/server/services/worker.ts +++ b/server/services/worker.ts @@ -1,4 +1,5 @@ import Logger from "@server/logging/logger"; +import * as Tracing from "@server/logging/tracing"; import { APM } from "@server/logging/tracing"; import { globalEventQueue, @@ -14,10 +15,19 @@ export default function init() { globalEventQueue.process( APM.traceFunction({ serviceName: "worker", - spanName: "processGlobalEvent", + spanName: "process", isRoot: true, - })(function (job) { + })(async function (job) { const event = job.data; + let err; + + Tracing.setResource(`Event.${event.name}`); + + Logger.info("worker", `Processing ${event.name}`, { + name: event.name, + modelId: event.modelId, + attempt: job.attemptsMade, + }); // For each registered processor we check to see if it wants to handle the // event (applicableEvents), and if so add a new queued job specifically @@ -25,16 +35,35 @@ export default function init() { for (const name in processors) { const ProcessorClass = processors[name]; - if (name === "WebsocketsProcessor") { - // websockets are a special case on their own queue because they must - // only be consumed by the websockets service rather than workers. - websocketQueue.add(job.data); - } else if ( - ProcessorClass.applicableEvents.length === 0 || - ProcessorClass.applicableEvents.includes(event.name) - ) { - processorEventQueue.add({ event, name }); + if (!ProcessorClass) { + throw new Error( + `Received event "${event.name}" for processor (${name}) that isn't registered. Check the file name matches the class name.` + ); } + + try { + if (name === "WebsocketsProcessor") { + // websockets are a special case on their own queue because they must + // only be consumed by the websockets service rather than workers. + await websocketQueue.add(job.data); + } else if ( + ProcessorClass.applicableEvents.length === 0 || + ProcessorClass.applicableEvents.includes(event.name) + ) { + await processorEventQueue.add({ event, name }); + } + } catch (error) { + Logger.error( + `Error adding ${event.name} to ${name} queue`, + error, + event + ); + err = error; + } + } + + if (err) { + throw err; } }) ); @@ -44,12 +73,14 @@ export default function init() { processorEventQueue.process( APM.traceFunction({ serviceName: "worker", - spanName: "processEvent", + spanName: "process", isRoot: true, - })(function (job) { + })(async function (job) { const { event, name } = job.data; const ProcessorClass = processors[name]; + Tracing.setResource(`Processor.${name}`); + if (!ProcessorClass) { throw new Error( `Received event "${event.name}" for processor (${name}) that isn't registered. Check the file name matches the class name.` @@ -59,17 +90,17 @@ export default function init() { const processor = new ProcessorClass(); if (processor.perform) { - Logger.info("processor", `${name} processing ${event.name}`, { + Logger.info("worker", `${name} running ${event.name}`, { name: event.name, modelId: event.modelId, }); - processor.perform(event).catch((error: Error) => { - Logger.error( - `Error processing ${event.name} in ${name}`, - error, - event - ); - }); + + try { + await processor.perform(event); + } catch (err) { + Logger.error(`Error processing ${event.name} in ${name}`, err, event); + throw err; + } } }) ); @@ -78,24 +109,30 @@ export default function init() { taskQueue.process( APM.traceFunction({ serviceName: "worker", - spanName: "processTask", + spanName: "process", isRoot: true, - })(function (job) { + })(async function (job) { const { name, props } = job.data; const TaskClass = tasks[name]; + Tracing.setResource(`Task.${name}`); + if (!TaskClass) { throw new Error( `Task "${name}" is not registered. Check the file name matches the class name.` ); } - Logger.info("task", `${name} triggered`, props); + Logger.info("worker", `${name} running`, props); const task = new TaskClass(); - task.perform(props).catch((error: Error) => { - Logger.error(`Error processing task in ${name}`, error, props); - }); + + try { + await task.perform(props); + } catch (err) { + Logger.error(`Error processing task in ${name}`, err, props); + throw err; + } }) ); } diff --git a/server/utils/queue.ts b/server/utils/queue.ts index cefb0cf5b..c8870156a 100644 --- a/server/utils/queue.ts +++ b/server/utils/queue.ts @@ -4,7 +4,10 @@ import { snakeCase } from "lodash"; import Metrics from "@server/logging/metrics"; import { client, subscriber } from "../redis"; -export function createQueue(name: string) { +export function createQueue( + name: string, + defaultJobOptions?: Partial +) { const prefix = `queue.${snakeCase(name)}`; const queue = new Queue(name, { createClient(type) { @@ -23,6 +26,7 @@ export function createQueue(name: string) { defaultJobOptions: { removeOnComplete: true, removeOnFail: true, + ...defaultJobOptions, }, }); queue.on("stalled", () => {