chore: Refactor worker, emails and data cleanup to task system (#3337)

* Refactor worker, all emails on task system

* fix

* lint

* fix: Remove a bunch of expect-error comments in related tests

* refactor: Move work from utils.gc into tasks

* test

* Add tracing to tasks and processors
fix: DebounceProcessor triggering on all events
Event.add -> Event.schedule
This commit is contained in:
Tom Moor
2022-04-06 16:48:28 -07:00
committed by GitHub
parent 9c766362ed
commit dbfdcd6d23
41 changed files with 729 additions and 444 deletions

View File

@@ -3,10 +3,10 @@ import { BullAdapter } from "@bull-board/api/bullAdapter";
import { KoaAdapter } from "@bull-board/koa";
import Koa from "koa";
import {
emailsQueue,
globalEventQueue,
processorEventQueue,
websocketsQueue,
websocketQueue,
taskQueue,
} from "../queues";
export default function init(app: Koa) {
@@ -15,8 +15,8 @@ export default function init(app: Koa) {
queues: [
new BullAdapter(globalEventQueue),
new BullAdapter(processorEventQueue),
new BullAdapter(emailsQueue),
new BullAdapter(websocketsQueue),
new BullAdapter(websocketQueue),
new BullAdapter(taskQueue),
],
serverAdapter,
});

View File

@@ -9,8 +9,8 @@ import Metrics from "@server/logging/metrics";
import { Document, Collection, View } from "@server/models";
import { can } from "@server/policies";
import { getUserForJWT } from "@server/utils/jwt";
import { websocketsQueue } from "../queues";
import WebsocketsProcessor from "../queues/processors/websockets";
import { websocketQueue } from "../queues";
import WebsocketsProcessor from "../queues/processors/WebsocketsProcessor";
import { client, subscriber } from "../redis";
export default function init(app: Koa, server: http.Server) {
@@ -247,9 +247,9 @@ export default function init(app: Koa, server: http.Server) {
// Handle events from event queue that should be sent to the clients down ws
const websockets = new WebsocketsProcessor();
websocketsQueue.process(async function websocketEventsProcessor(job) {
websocketQueue.process(async function websocketEventsProcessor(job) {
const event = job.data;
websockets.on(event, io).catch((error) => {
websockets.perform(event, io).catch((error) => {
Logger.error("Error processing websocket event", error, {
event,
});

View File

@@ -2,68 +2,77 @@ import Logger from "@server/logging/logger";
import {
globalEventQueue,
processorEventQueue,
websocketsQueue,
emailsQueue,
websocketQueue,
taskQueue,
} from "../queues";
import Backlinks from "../queues/processors/backlinks";
import Debouncer from "../queues/processors/debouncer";
import Emails from "../queues/processors/emails";
import Exports from "../queues/processors/exports";
import Imports from "../queues/processors/imports";
import Notifications from "../queues/processors/notifications";
import Revisions from "../queues/processors/revisions";
import Slack from "../queues/processors/slack";
const EmailsProcessor = new Emails();
const eventProcessors = {
backlinks: new Backlinks(),
debouncer: new Debouncer(),
imports: new Imports(),
exports: new Exports(),
notifications: new Notifications(),
revisions: new Revisions(),
slack: new Slack(),
};
import processors from "../queues/processors";
import tasks from "../queues/tasks";
export default function init() {
// this queue processes global events and hands them off to services
// This queue processes the global event bus
globalEventQueue.process(function (job) {
Object.keys(eventProcessors).forEach((name) => {
processorEventQueue.add({ ...job.data, service: name });
});
websocketsQueue.add(job.data);
});
processorEventQueue.process(function (job) {
const event = job.data;
const processor = eventProcessors[event.service];
if (!processor) {
Logger.warn(`Received event for processor that isn't registered`, event);
return;
// For each registered processor we check to see if it wants to handle the
// event (applicableEvents), and if so add a new queued job specifically
// for that processor.
for (const name in processors) {
const ProcessorClass = processors[name];
if (name === "WebsocketsProcessor") {
// websockets are a special case on their own queue because they must
// only be consumed by the websockets service rather than workers.
websocketQueue.add(job.data);
} else if (
ProcessorClass.applicableEvents.length === 0 ||
ProcessorClass.applicableEvents.includes(event.name)
) {
processorEventQueue.add({ event, name });
}
}
});
// Jobs for individual processors are processed here. Only applicable events
// as unapplicable events were filtered in the global event queue above.
processorEventQueue.process(function (job) {
const { event, name } = job.data;
const ProcessorClass = processors[name];
if (!ProcessorClass) {
throw new Error(
`Received event "${event.name}" for processor (${name}) that isn't registered. Check the file name matches the class name.`
);
}
if (processor.on) {
Logger.info("processor", `${event.service} processing ${event.name}`, {
const processor = new ProcessorClass();
if (processor.perform) {
Logger.info("processor", `${name} processing ${event.name}`, {
name: event.name,
modelId: event.modelId,
});
processor.on(event).catch((error: Error) => {
Logger.error(
`Error processing ${event.name} in ${event.service}`,
error,
event
);
processor.perform(event).catch((error: Error) => {
Logger.error(`Error processing ${event.name} in ${name}`, error, event);
});
}
});
emailsQueue.process(function (job) {
const event = job.data;
EmailsProcessor.on(event).catch((error) => {
Logger.error(
`Error processing ${event.name} in emails processor`,
error,
event
// Jobs for async tasks are processed here.
taskQueue.process(function (job) {
const { name, props } = job.data;
const TaskClass = tasks[name];
if (!TaskClass) {
throw new Error(
`Task "${name}" is not registered. Check the file name matches the class name.`
);
}
Logger.info("task", `${name} triggered`, props);
const task = new TaskClass();
task.perform(props).catch((error: Error) => {
Logger.error(`Error processing task in ${name}`, error, props);
});
});
}