fix: Queue retry behavior (#3359)

* fix: Queue retry behavior

* Add default options for task queue
This commit is contained in:
Tom Moor
2022-04-10 17:50:42 -07:00
committed by GitHub
parent cfa71762c2
commit 963475d2b0
5 changed files with 124 additions and 34 deletions

View File

@@ -3,6 +3,7 @@ import winston from "winston";
import env from "@server/env";
import Metrics from "@server/logging/metrics";
import Sentry from "@server/logging/sentry";
import * as Tracing from "./tracing";
const isProduction = env.NODE_ENV === "production";
type LogCategory =
@@ -10,8 +11,9 @@ type LogCategory =
| "hocuspocus"
| "http"
| "commands"
| "processor"
| "worker"
| "task"
| "processor"
| "email"
| "queue"
| "database"
@@ -98,6 +100,7 @@ class Logger {
*/
error(message: string, error: Error, extra?: Extra) {
Metrics.increment("logger.error");
Tracing.setError(error);
if (process.env.SENTRY_DSN) {
Sentry.withScope(function (scope) {

View File

@@ -1,4 +1,4 @@
import { init, tracer } from "@theo.gravity/datadog-apm";
import { init, tracer, addTags, markAsError } from "@theo.gravity/datadog-apm";
export * as APM from "@theo.gravity/datadog-apm";
@@ -18,4 +18,30 @@ if (process.env.DD_API_KEY) {
);
}
/**
* Change the resource of the active APM span. This method wraps addTags to allow
* safe use in environments where APM is disabled.
*
* @param name The name of the resource
*/
export function setResource(name: string) {
if (tracer) {
addTags({
"resource.name": `${name}`,
});
}
}
/**
* Mark the current active span as an error. This method wraps addTags to allow
* safe use in environments where APM is disabled.
*
* @param error The error to add
*/
export function setError(error: Error) {
if (tracer) {
markAsError(error);
}
}
export default tracer;

View File

@@ -1,9 +1,29 @@
import { createQueue } from "@server/utils/queue";
export const globalEventQueue = createQueue("globalEvents");
export const globalEventQueue = createQueue("globalEvents", {
attempts: 5,
backoff: {
type: "exponential",
delay: 1000,
},
});
export const processorEventQueue = createQueue("processorEvents");
export const processorEventQueue = createQueue("processorEvents", {
attempts: 5,
backoff: {
type: "exponential",
delay: 10 * 1000,
},
});
export const websocketQueue = createQueue("websockets");
export const websocketQueue = createQueue("websockets", {
timeout: 10 * 1000,
});
export const taskQueue = createQueue("tasks");
export const taskQueue = createQueue("tasks", {
attempts: 5,
backoff: {
type: "exponential",
delay: 10 * 1000,
},
});

View File

@@ -1,4 +1,5 @@
import Logger from "@server/logging/logger";
import * as Tracing from "@server/logging/tracing";
import { APM } from "@server/logging/tracing";
import {
globalEventQueue,
@@ -14,10 +15,19 @@ export default function init() {
globalEventQueue.process(
APM.traceFunction({
serviceName: "worker",
spanName: "processGlobalEvent",
spanName: "process",
isRoot: true,
})(function (job) {
})(async function (job) {
const event = job.data;
let err;
Tracing.setResource(`Event.${event.name}`);
Logger.info("worker", `Processing ${event.name}`, {
name: event.name,
modelId: event.modelId,
attempt: job.attemptsMade,
});
// For each registered processor we check to see if it wants to handle the
// event (applicableEvents), and if so add a new queued job specifically
@@ -25,16 +35,35 @@ export default function init() {
for (const name in processors) {
const ProcessorClass = processors[name];
if (name === "WebsocketsProcessor") {
// websockets are a special case on their own queue because they must
// only be consumed by the websockets service rather than workers.
websocketQueue.add(job.data);
} else if (
ProcessorClass.applicableEvents.length === 0 ||
ProcessorClass.applicableEvents.includes(event.name)
) {
processorEventQueue.add({ event, name });
if (!ProcessorClass) {
throw new Error(
`Received event "${event.name}" for processor (${name}) that isn't registered. Check the file name matches the class name.`
);
}
try {
if (name === "WebsocketsProcessor") {
// websockets are a special case on their own queue because they must
// only be consumed by the websockets service rather than workers.
await websocketQueue.add(job.data);
} else if (
ProcessorClass.applicableEvents.length === 0 ||
ProcessorClass.applicableEvents.includes(event.name)
) {
await processorEventQueue.add({ event, name });
}
} catch (error) {
Logger.error(
`Error adding ${event.name} to ${name} queue`,
error,
event
);
err = error;
}
}
if (err) {
throw err;
}
})
);
@@ -44,12 +73,14 @@ export default function init() {
processorEventQueue.process(
APM.traceFunction({
serviceName: "worker",
spanName: "processEvent",
spanName: "process",
isRoot: true,
})(function (job) {
})(async function (job) {
const { event, name } = job.data;
const ProcessorClass = processors[name];
Tracing.setResource(`Processor.${name}`);
if (!ProcessorClass) {
throw new Error(
`Received event "${event.name}" for processor (${name}) that isn't registered. Check the file name matches the class name.`
@@ -59,17 +90,17 @@ export default function init() {
const processor = new ProcessorClass();
if (processor.perform) {
Logger.info("processor", `${name} processing ${event.name}`, {
Logger.info("worker", `${name} running ${event.name}`, {
name: event.name,
modelId: event.modelId,
});
processor.perform(event).catch((error: Error) => {
Logger.error(
`Error processing ${event.name} in ${name}`,
error,
event
);
});
try {
await processor.perform(event);
} catch (err) {
Logger.error(`Error processing ${event.name} in ${name}`, err, event);
throw err;
}
}
})
);
@@ -78,24 +109,30 @@ export default function init() {
taskQueue.process(
APM.traceFunction({
serviceName: "worker",
spanName: "processTask",
spanName: "process",
isRoot: true,
})(function (job) {
})(async function (job) {
const { name, props } = job.data;
const TaskClass = tasks[name];
Tracing.setResource(`Task.${name}`);
if (!TaskClass) {
throw new Error(
`Task "${name}" is not registered. Check the file name matches the class name.`
);
}
Logger.info("task", `${name} triggered`, props);
Logger.info("worker", `${name} running`, props);
const task = new TaskClass();
task.perform(props).catch((error: Error) => {
Logger.error(`Error processing task in ${name}`, error, props);
});
try {
await task.perform(props);
} catch (err) {
Logger.error(`Error processing task in ${name}`, err, props);
throw err;
}
})
);
}

View File

@@ -4,7 +4,10 @@ import { snakeCase } from "lodash";
import Metrics from "@server/logging/metrics";
import { client, subscriber } from "../redis";
export function createQueue(name: string) {
export function createQueue(
name: string,
defaultJobOptions?: Partial<Queue.JobOptions>
) {
const prefix = `queue.${snakeCase(name)}`;
const queue = new Queue(name, {
createClient(type) {
@@ -23,6 +26,7 @@ export function createQueue(name: string) {
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
...defaultJobOptions,
},
});
queue.on("stalled", () => {