From 9b5df516254b4f56292287774ac78511242b7323 Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Wed, 6 Apr 2022 21:59:52 -0700 Subject: [PATCH] chore: Add APM tracing around queues --- server/services/worker.ts | 125 ++++++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 51 deletions(-) diff --git a/server/services/worker.ts b/server/services/worker.ts index 3473bf0f9..570b39ffb 100644 --- a/server/services/worker.ts +++ b/server/services/worker.ts @@ -1,4 +1,5 @@ import Logger from "@server/logging/logger"; +import { APM } from "@server/logging/tracing"; import { globalEventQueue, processorEventQueue, @@ -10,69 +11,91 @@ import tasks from "../queues/tasks"; export default function init() { // This queue processes the global event bus - globalEventQueue.process(function (job) { - const event = job.data; + globalEventQueue.process( + APM.traceFunction({ + serviceName: "worker", + spanName: "processGlobalEvent", + isRoot: true, + })(function (job) { + const event = job.data; - // For each registered processor we check to see if it wants to handle the - // event (applicableEvents), and if so add a new queued job specifically - // for that processor. - for (const name in processors) { - const ProcessorClass = processors[name]; + // For each registered processor we check to see if it wants to handle the + // event (applicableEvents), and if so add a new queued job specifically + // for that processor. + for (const name in processors) { + const ProcessorClass = processors[name]; - if (name === "WebsocketsProcessor") { - // websockets are a special case on their own queue because they must - // only be consumed by the websockets service rather than workers. - websocketQueue.add(job.data); - } else if ( - ProcessorClass.applicableEvents.length === 0 || - ProcessorClass.applicableEvents.includes(event.name) - ) { - processorEventQueue.add({ event, name }); + if (name === "WebsocketsProcessor") { + // websockets are a special case on their own queue because they must + // only be consumed by the websockets service rather than workers. + websocketQueue.add(job.data); + } else if ( + ProcessorClass.applicableEvents.length === 0 || + ProcessorClass.applicableEvents.includes(event.name) + ) { + processorEventQueue.add({ event, name }); + } } - } - }); + }) + ); // Jobs for individual processors are processed here. Only applicable events // as unapplicable events were filtered in the global event queue above. - processorEventQueue.process(function (job) { - const { event, name } = job.data; - const ProcessorClass = processors[name]; + processorEventQueue.process( + APM.traceFunction({ + serviceName: "worker", + spanName: "processEvent", + isRoot: true, + })(function (job) { + const { event, name } = job.data; + const ProcessorClass = processors[name]; - if (!ProcessorClass) { - throw new Error( - `Received event "${event.name}" for processor (${name}) that isn't registered. Check the file name matches the class name.` - ); - } + if (!ProcessorClass) { + throw new Error( + `Received event "${event.name}" for processor (${name}) that isn't registered. Check the file name matches the class name.` + ); + } - const processor = new ProcessorClass(); + const processor = new ProcessorClass(); - if (processor.perform) { - Logger.info("processor", `${name} processing ${event.name}`, { - name: event.name, - modelId: event.modelId, - }); - processor.perform(event).catch((error: Error) => { - Logger.error(`Error processing ${event.name} in ${name}`, error, event); - }); - } - }); + if (processor.perform) { + Logger.info("processor", `${name} processing ${event.name}`, { + name: event.name, + modelId: event.modelId, + }); + processor.perform(event).catch((error: Error) => { + Logger.error( + `Error processing ${event.name} in ${name}`, + error, + event + ); + }); + } + }) + ); // Jobs for async tasks are processed here. - taskQueue.process(function (job) { - const { name, props } = job.data; - const TaskClass = tasks[name]; + taskQueue.process( + APM.traceFunction({ + serviceName: "worker", + spanName: "processTask", + isRoot: true, + })(function (job) { + const { name, props } = job.data; + const TaskClass = tasks[name]; - if (!TaskClass) { - throw new Error( - `Task "${name}" is not registered. Check the file name matches the class name.` - ); - } + if (!TaskClass) { + throw new Error( + `Task "${name}" is not registered. Check the file name matches the class name.` + ); + } - Logger.info("task", `${name} triggered`, props); + Logger.info("task", `${name} triggered`, props); - const task = new TaskClass(); - task.perform(props).catch((error: Error) => { - Logger.error(`Error processing task in ${name}`, error, props); - }); - }); + const task = new TaskClass(); + task.perform(props).catch((error: Error) => { + Logger.error(`Error processing task in ${name}`, error, props); + }); + }) + ); }