chore: Add APM tracing around queues

This commit is contained in:
Tom Moor
2022-04-06 21:59:52 -07:00
parent f10cfbbd9e
commit 9b5df51625

View File

@@ -1,4 +1,5 @@
import Logger from "@server/logging/logger";
import { APM } from "@server/logging/tracing";
import {
globalEventQueue,
processorEventQueue,
@@ -10,7 +11,12 @@ import tasks from "../queues/tasks";
export default function init() {
// This queue processes the global event bus
globalEventQueue.process(function (job) {
globalEventQueue.process(
APM.traceFunction({
serviceName: "worker",
spanName: "processGlobalEvent",
isRoot: true,
})(function (job) {
const event = job.data;
// For each registered processor we check to see if it wants to handle the
@@ -30,11 +36,17 @@ export default function init() {
processorEventQueue.add({ event, name });
}
}
});
})
);
// Jobs for individual processors are processed here. Only applicable events
// as unapplicable events were filtered in the global event queue above.
processorEventQueue.process(function (job) {
processorEventQueue.process(
APM.traceFunction({
serviceName: "worker",
spanName: "processEvent",
isRoot: true,
})(function (job) {
const { event, name } = job.data;
const ProcessorClass = processors[name];
@@ -52,13 +64,23 @@ export default function init() {
modelId: event.modelId,
});
processor.perform(event).catch((error: Error) => {
Logger.error(`Error processing ${event.name} in ${name}`, error, event);
Logger.error(
`Error processing ${event.name} in ${name}`,
error,
event
);
});
}
});
})
);
// Jobs for async tasks are processed here.
taskQueue.process(function (job) {
taskQueue.process(
APM.traceFunction({
serviceName: "worker",
spanName: "processTask",
isRoot: true,
})(function (job) {
const { name, props } = job.data;
const TaskClass = tasks[name];
@@ -74,5 +96,6 @@ export default function init() {
task.perform(props).catch((error: Error) => {
Logger.error(`Error processing task in ${name}`, error, props);
});
});
})
);
}