Webhook / Integration Event bus (#499)
* First bash at an event bus for webhooks and integrations * Refactoring * poc * Revert too wide ranging changes Move to two-queues
This commit is contained in:
46
server/events.js
Normal file
46
server/events.js
Normal file
@@ -0,0 +1,46 @@
|
||||
// @flow
|
||||
import Queue from 'bull';
|
||||
import debug from 'debug';
|
||||
import services from '../services';
|
||||
import Document from './models/Document';
|
||||
import Collection from './models/Collection';
|
||||
|
||||
type DocumentEvent = {
|
||||
name: 'documents.create',
|
||||
model: Document,
|
||||
};
|
||||
|
||||
type CollectionEvent = {
|
||||
name: 'collections.create',
|
||||
model: Collection,
|
||||
};
|
||||
|
||||
export type Event = DocumentEvent | CollectionEvent;
|
||||
|
||||
const log = debug('events');
|
||||
const globalEventsQueue = new Queue('global events', process.env.REDIS_URL);
|
||||
const serviceEventsQueue = new Queue('service events', process.env.REDIS_URL);
|
||||
|
||||
// this queue processes global events and hands them off to service hooks
|
||||
globalEventsQueue.process(async function(job) {
|
||||
const names = Object.keys(services);
|
||||
names.forEach(name => {
|
||||
const service = services[name];
|
||||
if (service.on) {
|
||||
serviceEventsQueue.add({ service: name, ...job.data });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// this queue processes an individual event for a specific service
|
||||
serviceEventsQueue.process(async function(job) {
|
||||
const event = job.data;
|
||||
const service = services[event.service];
|
||||
|
||||
if (service.on) {
|
||||
log(`Triggering ${event.name} for ${service.name}`);
|
||||
service.on(event);
|
||||
}
|
||||
});
|
||||
|
||||
export default globalEventsQueue;
|
||||
@@ -1,11 +1,12 @@
|
||||
// @flow
|
||||
import _ from 'lodash';
|
||||
import slug from 'slug';
|
||||
import randomstring from 'randomstring';
|
||||
import { DataTypes, sequelize } from '../sequelize';
|
||||
import { asyncLock } from '../redis';
|
||||
import events from '../events';
|
||||
import Document from './Document';
|
||||
import Event from './Event';
|
||||
import _ from 'lodash';
|
||||
|
||||
// $FlowIssue invalid flow-typed
|
||||
slug.defaults.mode = 'rfc3986';
|
||||
@@ -93,6 +94,20 @@ Collection.associate = models => {
|
||||
});
|
||||
};
|
||||
|
||||
// Hooks
|
||||
|
||||
Collection.addHook('afterCreate', model =>
|
||||
events.add({ name: 'collections.create', model })
|
||||
);
|
||||
|
||||
Collection.addHook('afterDestroy', model =>
|
||||
events.add({ name: 'collections.delete', model })
|
||||
);
|
||||
|
||||
Collection.addHook('afterUpdate', model =>
|
||||
events.add({ name: 'collections.update', model })
|
||||
);
|
||||
|
||||
// Instance methods
|
||||
|
||||
Collection.prototype.getUrl = function() {
|
||||
|
||||
@@ -7,6 +7,7 @@ import Plain from 'slate-plain-serializer';
|
||||
|
||||
import isUUID from 'validator/lib/isUUID';
|
||||
import { DataTypes, sequelize } from '../sequelize';
|
||||
import events from '../events';
|
||||
import parseTitle from '../../shared/utils/parseTitle';
|
||||
import Revision from './Revision';
|
||||
|
||||
@@ -204,6 +205,20 @@ Document.searchForUser = async (
|
||||
return _.sortBy(documents, doc => ids.indexOf(doc.id));
|
||||
};
|
||||
|
||||
// Hooks
|
||||
|
||||
Document.addHook('afterCreate', model =>
|
||||
events.add({ name: 'documents.create', model })
|
||||
);
|
||||
|
||||
Document.addHook('afterDestroy', model =>
|
||||
events.add({ name: 'documents.delete', model })
|
||||
);
|
||||
|
||||
Document.addHook('afterUpdate', model =>
|
||||
events.add({ name: 'documents.update', model })
|
||||
);
|
||||
|
||||
// Instance methods
|
||||
|
||||
Document.prototype.getSummary = function() {
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
// @flow
|
||||
const Slack = {
|
||||
id: 'slack',
|
||||
name: 'Slack',
|
||||
};
|
||||
|
||||
export default Slack;
|
||||
Reference in New Issue
Block a user