chore: Refactoring event processors and service architecture (#2495)
This commit is contained in:
@@ -1,132 +0,0 @@
|
||||
// @flow
|
||||
import type { DocumentEvent, RevisionEvent } from "../events";
|
||||
import { Document, Backlink } from "../models";
|
||||
import { Op } from "../sequelize";
|
||||
import parseDocumentIds from "../utils/parseDocumentIds";
|
||||
import slugify from "../utils/slugify";
|
||||
|
||||
export default class Backlinks {
|
||||
async on(event: DocumentEvent | RevisionEvent) {
|
||||
switch (event.name) {
|
||||
case "documents.publish": {
|
||||
const document = await Document.findByPk(event.documentId);
|
||||
if (!document) return;
|
||||
|
||||
const linkIds = parseDocumentIds(document.text);
|
||||
|
||||
await Promise.all(
|
||||
linkIds.map(async (linkId) => {
|
||||
const linkedDocument = await Document.findByPk(linkId);
|
||||
if (!linkedDocument || linkedDocument.id === event.documentId) {
|
||||
return;
|
||||
}
|
||||
|
||||
await Backlink.findOrCreate({
|
||||
where: {
|
||||
documentId: linkedDocument.id,
|
||||
reverseDocumentId: event.documentId,
|
||||
},
|
||||
defaults: {
|
||||
userId: document.lastModifiedById,
|
||||
},
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
break;
|
||||
}
|
||||
case "documents.update": {
|
||||
const document = await Document.findByPk(event.documentId);
|
||||
if (!document) return;
|
||||
|
||||
// backlinks are only created for published documents
|
||||
if (!document.publishedAt) return;
|
||||
|
||||
const linkIds = parseDocumentIds(document.text);
|
||||
const linkedDocumentIds = [];
|
||||
|
||||
// create or find existing backlink records for referenced docs
|
||||
await Promise.all(
|
||||
linkIds.map(async (linkId) => {
|
||||
const linkedDocument = await Document.findByPk(linkId);
|
||||
if (!linkedDocument || linkedDocument.id === event.documentId) {
|
||||
return;
|
||||
}
|
||||
|
||||
await Backlink.findOrCreate({
|
||||
where: {
|
||||
documentId: linkedDocument.id,
|
||||
reverseDocumentId: event.documentId,
|
||||
},
|
||||
defaults: {
|
||||
userId: document.lastModifiedById,
|
||||
},
|
||||
});
|
||||
linkedDocumentIds.push(linkedDocument.id);
|
||||
})
|
||||
);
|
||||
|
||||
// delete any backlinks that no longer exist
|
||||
await Backlink.destroy({
|
||||
where: {
|
||||
documentId: {
|
||||
[Op.notIn]: linkedDocumentIds,
|
||||
},
|
||||
reverseDocumentId: event.documentId,
|
||||
},
|
||||
});
|
||||
break;
|
||||
}
|
||||
case "documents.title_change": {
|
||||
const document = await Document.findByPk(event.documentId);
|
||||
if (!document) return;
|
||||
|
||||
// might as well check
|
||||
const { title, previousTitle } = event.data;
|
||||
if (!previousTitle || title === previousTitle) break;
|
||||
|
||||
// update any link titles in documents that lead to this one
|
||||
const backlinks = await Backlink.findAll({
|
||||
where: {
|
||||
documentId: event.documentId,
|
||||
},
|
||||
include: [{ model: Document, as: "reverseDocument" }],
|
||||
});
|
||||
|
||||
await Promise.all(
|
||||
backlinks.map(async (backlink) => {
|
||||
const previousUrl = `/doc/${slugify(previousTitle)}-${
|
||||
document.urlId
|
||||
}`;
|
||||
|
||||
// find links in the other document that lead to this one and have
|
||||
// the old title as anchor text. Go ahead and update those to the
|
||||
// new title automatically
|
||||
backlink.reverseDocument.text = backlink.reverseDocument.text.replace(
|
||||
`[${previousTitle}](${previousUrl})`,
|
||||
`[${title}](${document.url})`
|
||||
);
|
||||
await backlink.reverseDocument.save({
|
||||
silent: true,
|
||||
hooks: false,
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
break;
|
||||
}
|
||||
case "documents.delete": {
|
||||
await Backlink.destroy({
|
||||
where: {
|
||||
[Op.or]: [
|
||||
{ reverseDocumentId: event.documentId },
|
||||
{ documentId: event.documentId },
|
||||
],
|
||||
},
|
||||
});
|
||||
break;
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,242 +0,0 @@
|
||||
/* eslint-disable flowtype/require-valid-file-annotation */
|
||||
import { Backlink } from "../models";
|
||||
import { buildDocument } from "../test/factories";
|
||||
import { flushdb } from "../test/support";
|
||||
import BacklinksService from "./backlinks";
|
||||
|
||||
const Backlinks = new BacklinksService();
|
||||
|
||||
beforeEach(() => flushdb());
|
||||
beforeEach(jest.resetAllMocks);
|
||||
|
||||
describe("documents.publish", () => {
|
||||
test("should create new backlink records", async () => {
|
||||
const otherDocument = await buildDocument();
|
||||
const document = await buildDocument({
|
||||
text: `[this is a link](${otherDocument.url})`,
|
||||
});
|
||||
|
||||
await Backlinks.on({
|
||||
name: "documents.publish",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const backlinks = await Backlink.findAll({
|
||||
where: { reverseDocumentId: document.id },
|
||||
});
|
||||
|
||||
expect(backlinks.length).toBe(1);
|
||||
});
|
||||
|
||||
test("should not fail when linked document is destroyed", async () => {
|
||||
const otherDocument = await buildDocument();
|
||||
await otherDocument.destroy();
|
||||
|
||||
const document = await buildDocument({
|
||||
version: null,
|
||||
text: `[ ] checklist item`,
|
||||
});
|
||||
|
||||
document.text = `[this is a link](${otherDocument.url})`;
|
||||
await document.save();
|
||||
|
||||
await Backlinks.on({
|
||||
name: "documents.publish",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const backlinks = await Backlink.findAll({
|
||||
where: { reverseDocumentId: document.id },
|
||||
});
|
||||
|
||||
expect(backlinks.length).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("documents.update", () => {
|
||||
test("should not fail on a document with no previous revisions", async () => {
|
||||
const otherDocument = await buildDocument();
|
||||
const document = await buildDocument({
|
||||
text: `[this is a link](${otherDocument.url})`,
|
||||
});
|
||||
|
||||
await Backlinks.on({
|
||||
name: "documents.update",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const backlinks = await Backlink.findAll({
|
||||
where: { reverseDocumentId: document.id },
|
||||
});
|
||||
|
||||
expect(backlinks.length).toBe(1);
|
||||
});
|
||||
|
||||
test("should not fail when previous revision is different document version", async () => {
|
||||
const otherDocument = await buildDocument();
|
||||
const document = await buildDocument({
|
||||
version: null,
|
||||
text: `[ ] checklist item`,
|
||||
});
|
||||
|
||||
document.text = `[this is a link](${otherDocument.url})`;
|
||||
await document.save();
|
||||
|
||||
await Backlinks.on({
|
||||
name: "documents.update",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const backlinks = await Backlink.findAll({
|
||||
where: { reverseDocumentId: document.id },
|
||||
});
|
||||
|
||||
expect(backlinks.length).toBe(1);
|
||||
});
|
||||
|
||||
test("should create new backlink records", async () => {
|
||||
const otherDocument = await buildDocument();
|
||||
const document = await buildDocument();
|
||||
|
||||
document.text = `[this is a link](${otherDocument.url})`;
|
||||
await document.save();
|
||||
|
||||
await Backlinks.on({
|
||||
name: "documents.update",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const backlinks = await Backlink.findAll({
|
||||
where: { reverseDocumentId: document.id },
|
||||
});
|
||||
|
||||
expect(backlinks.length).toBe(1);
|
||||
});
|
||||
|
||||
test("should destroy removed backlink records", async () => {
|
||||
const otherDocument = await buildDocument();
|
||||
const yetAnotherDocument = await buildDocument();
|
||||
const document = await buildDocument({
|
||||
text: `[this is a link](${otherDocument.url})
|
||||
|
||||
[this is a another link](${yetAnotherDocument.url})`,
|
||||
});
|
||||
|
||||
await Backlinks.on({
|
||||
name: "documents.publish",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
document.text = `First link is gone
|
||||
|
||||
[this is a another link](${yetAnotherDocument.url})`;
|
||||
await document.save();
|
||||
|
||||
await Backlinks.on({
|
||||
name: "documents.update",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const backlinks = await Backlink.findAll({
|
||||
where: { reverseDocumentId: document.id },
|
||||
});
|
||||
|
||||
expect(backlinks.length).toBe(1);
|
||||
expect(backlinks[0].documentId).toBe(yetAnotherDocument.id);
|
||||
});
|
||||
});
|
||||
|
||||
describe("documents.delete", () => {
|
||||
test("should destroy related backlinks", async () => {
|
||||
const otherDocument = await buildDocument();
|
||||
const document = await buildDocument();
|
||||
|
||||
document.text = `[this is a link](${otherDocument.url})`;
|
||||
await document.save();
|
||||
|
||||
await Backlinks.on({
|
||||
name: "documents.update",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
await Backlinks.on({
|
||||
name: "documents.delete",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const backlinks = await Backlink.findAll({
|
||||
where: { reverseDocumentId: document.id },
|
||||
});
|
||||
|
||||
expect(backlinks.length).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("documents.title_change", () => {
|
||||
test("should update titles in backlinked documents", async () => {
|
||||
const newTitle = "test";
|
||||
const document = await buildDocument();
|
||||
const otherDocument = await buildDocument();
|
||||
const previousTitle = otherDocument.title;
|
||||
|
||||
// create a doc with a link back
|
||||
document.text = `[${otherDocument.title}](${otherDocument.url})`;
|
||||
await document.save();
|
||||
|
||||
// ensure the backlinks are created
|
||||
await Backlinks.on({
|
||||
name: "documents.update",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
// change the title of the linked doc
|
||||
otherDocument.title = newTitle;
|
||||
await otherDocument.save();
|
||||
|
||||
// does the text get updated with the new title
|
||||
await Backlinks.on({
|
||||
name: "documents.title_change",
|
||||
documentId: otherDocument.id,
|
||||
collectionId: otherDocument.collectionId,
|
||||
teamId: otherDocument.teamId,
|
||||
actorId: otherDocument.createdById,
|
||||
data: {
|
||||
previousTitle,
|
||||
title: newTitle,
|
||||
},
|
||||
});
|
||||
await document.reload();
|
||||
|
||||
expect(document.text).toBe(`[${newTitle}](${otherDocument.url})`);
|
||||
});
|
||||
});
|
||||
@@ -1,44 +0,0 @@
|
||||
// @flow
|
||||
import events, { type Event } from "../events";
|
||||
import { Document } from "../models";
|
||||
|
||||
export default class Debouncer {
|
||||
async on(event: Event) {
|
||||
switch (event.name) {
|
||||
case "documents.update": {
|
||||
events.add(
|
||||
{
|
||||
...event,
|
||||
name: "documents.update.delayed",
|
||||
},
|
||||
{
|
||||
delay: 5 * 60 * 1000,
|
||||
removeOnComplete: true,
|
||||
}
|
||||
);
|
||||
break;
|
||||
}
|
||||
case "documents.update.delayed": {
|
||||
const document = await Document.findByPk(event.documentId);
|
||||
|
||||
// If the document has been deleted then prevent further processing
|
||||
if (!document) return;
|
||||
|
||||
// If the document has been updated since we initially queued the delayed
|
||||
// event then abort, there must be another updated event in the queue –
|
||||
// this functions as a simple distributed debounce.
|
||||
if (document.updatedAt > new Date(event.createdAt)) return;
|
||||
|
||||
events.add(
|
||||
{
|
||||
...event,
|
||||
name: "documents.update.debounced",
|
||||
},
|
||||
{ removeOnComplete: true }
|
||||
);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,42 +0,0 @@
|
||||
// @flow
|
||||
import fs from "fs";
|
||||
import os from "os";
|
||||
import File from "formidable/lib/file";
|
||||
import collectionImporter from "../commands/collectionImporter";
|
||||
import type { Event } from "../events";
|
||||
import { Attachment, User } from "../models";
|
||||
|
||||
export default class Importer {
|
||||
async on(event: Event) {
|
||||
switch (event.name) {
|
||||
case "collections.import": {
|
||||
const { type } = event.data;
|
||||
const attachment = await Attachment.findByPk(event.modelId);
|
||||
const user = await User.findByPk(event.actorId);
|
||||
|
||||
const buffer = await attachment.buffer;
|
||||
const tmpDir = os.tmpdir();
|
||||
const tmpFilePath = `${tmpDir}/upload-${event.modelId}`;
|
||||
|
||||
await fs.promises.writeFile(tmpFilePath, buffer);
|
||||
const file = new File({
|
||||
name: attachment.name,
|
||||
type: attachment.type,
|
||||
path: tmpFilePath,
|
||||
});
|
||||
|
||||
await collectionImporter({
|
||||
file,
|
||||
user,
|
||||
type,
|
||||
ip: event.ip,
|
||||
});
|
||||
|
||||
await attachment.destroy();
|
||||
|
||||
return;
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,18 +1,6 @@
|
||||
// @flow
|
||||
import debug from "debug";
|
||||
import { requireDirectory } from "../utils/fs";
|
||||
import web from "./web";
|
||||
import websockets from "./websockets";
|
||||
import worker from "./worker";
|
||||
|
||||
const log = debug("services");
|
||||
const services = {};
|
||||
|
||||
if (!process.env.SINGLE_RUN) {
|
||||
requireDirectory(__dirname).forEach(([module, name]) => {
|
||||
if (module && module.default) {
|
||||
const Service = module.default;
|
||||
services[name] = new Service();
|
||||
log(`loaded ${name} service`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export default services;
|
||||
export default { web, websockets, worker };
|
||||
|
||||
@@ -1,164 +0,0 @@
|
||||
// @flow
|
||||
import debug from "debug";
|
||||
import type { DocumentEvent, CollectionEvent, Event } from "../events";
|
||||
import mailer from "../mailer";
|
||||
import {
|
||||
View,
|
||||
Document,
|
||||
Team,
|
||||
Collection,
|
||||
User,
|
||||
NotificationSetting,
|
||||
} from "../models";
|
||||
import { Op } from "../sequelize";
|
||||
|
||||
const log = debug("services");
|
||||
|
||||
export default class Notifications {
|
||||
async on(event: Event) {
|
||||
switch (event.name) {
|
||||
case "documents.publish":
|
||||
case "documents.update.debounced":
|
||||
return this.documentUpdated(event);
|
||||
case "collections.create":
|
||||
return this.collectionCreated(event);
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
async documentUpdated(event: DocumentEvent) {
|
||||
// never send notifications when batch importing documents
|
||||
if (event.data && event.data.source === "import") return;
|
||||
|
||||
const document = await Document.findByPk(event.documentId);
|
||||
if (!document) return;
|
||||
|
||||
const { collection } = document;
|
||||
if (!collection) return;
|
||||
|
||||
const team = await Team.findByPk(document.teamId);
|
||||
if (!team) return;
|
||||
|
||||
const notificationSettings = await NotificationSetting.findAll({
|
||||
where: {
|
||||
userId: {
|
||||
[Op.ne]: document.lastModifiedById,
|
||||
},
|
||||
teamId: document.teamId,
|
||||
event:
|
||||
event.name === "documents.publish"
|
||||
? "documents.publish"
|
||||
: "documents.update",
|
||||
},
|
||||
include: [
|
||||
{
|
||||
model: User,
|
||||
required: true,
|
||||
as: "user",
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const eventName =
|
||||
event.name === "documents.publish" ? "published" : "updated";
|
||||
|
||||
for (const setting of notificationSettings) {
|
||||
// Suppress notifications for suspended users
|
||||
if (setting.user.isSuspended) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// For document updates we only want to send notifications if
|
||||
// the document has been edited by the user with this notification setting
|
||||
// This could be replaced with ability to "follow" in the future
|
||||
if (
|
||||
eventName === "updated" &&
|
||||
!document.collaboratorIds.includes(setting.userId)
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check the user has access to the collection this document is in. Just
|
||||
// because they were a collaborator once doesn't mean they still are.
|
||||
const collectionIds = await setting.user.collectionIds();
|
||||
if (!collectionIds.includes(document.collectionId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If this user has viewed the document since the last update was made
|
||||
// then we can avoid sending them a useless notification, yay.
|
||||
const view = await View.findOne({
|
||||
where: {
|
||||
userId: setting.userId,
|
||||
documentId: event.documentId,
|
||||
updatedAt: {
|
||||
[Op.gt]: document.updatedAt,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
if (view) {
|
||||
log(
|
||||
`suppressing notification to ${setting.userId} because update viewed`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
mailer.documentNotification({
|
||||
to: setting.user.email,
|
||||
eventName,
|
||||
document,
|
||||
team,
|
||||
collection,
|
||||
actor: document.updatedBy,
|
||||
unsubscribeUrl: setting.unsubscribeUrl,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async collectionCreated(event: CollectionEvent) {
|
||||
const collection = await Collection.findByPk(event.collectionId, {
|
||||
include: [
|
||||
{
|
||||
model: User,
|
||||
required: true,
|
||||
as: "user",
|
||||
},
|
||||
],
|
||||
});
|
||||
if (!collection) return;
|
||||
if (!collection.permission) return;
|
||||
|
||||
const notificationSettings = await NotificationSetting.findAll({
|
||||
where: {
|
||||
userId: {
|
||||
[Op.ne]: collection.createdById,
|
||||
},
|
||||
teamId: collection.teamId,
|
||||
event: event.name,
|
||||
},
|
||||
include: [
|
||||
{
|
||||
model: User,
|
||||
required: true,
|
||||
as: "user",
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
for (const setting of notificationSettings) {
|
||||
// Suppress notifications for suspended users
|
||||
if (setting.user.isSuspended) {
|
||||
continue;
|
||||
}
|
||||
|
||||
mailer.collectionNotification({
|
||||
to: setting.user.email,
|
||||
eventName: "created",
|
||||
collection,
|
||||
actor: collection.user,
|
||||
unsubscribeUrl: setting.unsubscribeUrl,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,164 +0,0 @@
|
||||
/* eslint-disable flowtype/require-valid-file-annotation */
|
||||
import mailer from "../mailer";
|
||||
import { View, NotificationSetting } from "../models";
|
||||
import { buildDocument, buildCollection, buildUser } from "../test/factories";
|
||||
import { flushdb } from "../test/support";
|
||||
import NotificationsService from "./notifications";
|
||||
|
||||
jest.mock("../mailer");
|
||||
|
||||
const Notifications = new NotificationsService();
|
||||
|
||||
beforeEach(() => flushdb());
|
||||
beforeEach(jest.resetAllMocks);
|
||||
|
||||
describe("documents.publish", () => {
|
||||
test("should not send a notification to author", async () => {
|
||||
const user = await buildUser();
|
||||
const document = await buildDocument({
|
||||
teamId: user.teamId,
|
||||
lastModifiedById: user.id,
|
||||
});
|
||||
|
||||
await NotificationSetting.create({
|
||||
userId: user.id,
|
||||
teamId: user.teamId,
|
||||
event: "documents.publish",
|
||||
});
|
||||
|
||||
await Notifications.on({
|
||||
name: "documents.publish",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
expect(mailer.documentNotification).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test("should send a notification to other users in team", async () => {
|
||||
const user = await buildUser();
|
||||
const document = await buildDocument({
|
||||
teamId: user.teamId,
|
||||
});
|
||||
|
||||
await NotificationSetting.create({
|
||||
userId: user.id,
|
||||
teamId: user.teamId,
|
||||
event: "documents.publish",
|
||||
});
|
||||
|
||||
await Notifications.on({
|
||||
name: "documents.publish",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
expect(mailer.documentNotification).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test("should not send a notification to users without collection access", async () => {
|
||||
const user = await buildUser();
|
||||
const collection = await buildCollection({
|
||||
teamId: user.teamId,
|
||||
permission: null,
|
||||
});
|
||||
const document = await buildDocument({
|
||||
teamId: user.teamId,
|
||||
collectionId: collection.id,
|
||||
});
|
||||
|
||||
await NotificationSetting.create({
|
||||
userId: user.id,
|
||||
teamId: user.teamId,
|
||||
event: "documents.publish",
|
||||
});
|
||||
|
||||
await Notifications.on({
|
||||
name: "documents.publish",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
expect(mailer.documentNotification).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("documents.update.debounced", () => {
|
||||
test("should send a notification to other collaborator", async () => {
|
||||
const document = await buildDocument();
|
||||
const collaborator = await buildUser({ teamId: document.teamId });
|
||||
document.collaboratorIds = [collaborator.id];
|
||||
await document.save();
|
||||
|
||||
await NotificationSetting.create({
|
||||
userId: collaborator.id,
|
||||
teamId: collaborator.teamId,
|
||||
event: "documents.update",
|
||||
});
|
||||
|
||||
await Notifications.on({
|
||||
name: "documents.update.debounced",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
expect(mailer.documentNotification).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test("should not send a notification if viewed since update", async () => {
|
||||
const document = await buildDocument();
|
||||
const collaborator = await buildUser({ teamId: document.teamId });
|
||||
document.collaboratorIds = [collaborator.id];
|
||||
await document.save();
|
||||
|
||||
await NotificationSetting.create({
|
||||
userId: collaborator.id,
|
||||
teamId: collaborator.teamId,
|
||||
event: "documents.update",
|
||||
});
|
||||
|
||||
await View.touch(document.id, collaborator.id, true);
|
||||
|
||||
await Notifications.on({
|
||||
name: "documents.update.debounced",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
expect(mailer.documentNotification).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test("should not send a notification to last editor", async () => {
|
||||
const user = await buildUser();
|
||||
const document = await buildDocument({
|
||||
teamId: user.teamId,
|
||||
lastModifiedById: user.id,
|
||||
});
|
||||
|
||||
await NotificationSetting.create({
|
||||
userId: user.id,
|
||||
teamId: user.teamId,
|
||||
event: "documents.update",
|
||||
});
|
||||
|
||||
await Notifications.on({
|
||||
name: "documents.update.debounced",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
expect(mailer.documentNotification).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -1,37 +0,0 @@
|
||||
// @flow
|
||||
import invariant from "invariant";
|
||||
import revisionCreator from "../commands/revisionCreator";
|
||||
import type { DocumentEvent, RevisionEvent } from "../events";
|
||||
import { Revision, Document, User } from "../models";
|
||||
|
||||
export default class Revisions {
|
||||
async on(event: DocumentEvent | RevisionEvent) {
|
||||
switch (event.name) {
|
||||
case "documents.publish":
|
||||
case "documents.update.debounced": {
|
||||
const document = await Document.findByPk(event.documentId);
|
||||
invariant(document, "Document should exist");
|
||||
|
||||
const previous = await Revision.findLatest(document.id);
|
||||
|
||||
// we don't create revisions if identical to previous revision, this can
|
||||
// happen if a manual revision was created from another service or user.
|
||||
if (
|
||||
previous &&
|
||||
document.text === previous.text &&
|
||||
document.title === previous.title
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
const user = await User.findByPk(event.actorId);
|
||||
invariant(user, "User should exist");
|
||||
|
||||
await revisionCreator({ user, document });
|
||||
|
||||
break;
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,61 +0,0 @@
|
||||
/* eslint-disable flowtype/require-valid-file-annotation */
|
||||
import { Revision } from "../models";
|
||||
import { buildDocument } from "../test/factories";
|
||||
import { flushdb } from "../test/support";
|
||||
import RevisionsService from "./revisions";
|
||||
|
||||
const Revisions = new RevisionsService();
|
||||
|
||||
beforeEach(() => flushdb());
|
||||
beforeEach(jest.resetAllMocks);
|
||||
|
||||
describe("documents.publish", () => {
|
||||
test("should create a revision", async () => {
|
||||
const document = await buildDocument();
|
||||
|
||||
await Revisions.on({
|
||||
name: "documents.publish",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const amount = await Revision.count({ where: { documentId: document.id } });
|
||||
expect(amount).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("documents.update.debounced", () => {
|
||||
test("should create a revision", async () => {
|
||||
const document = await buildDocument();
|
||||
|
||||
await Revisions.on({
|
||||
name: "documents.update.debounced",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const amount = await Revision.count({ where: { documentId: document.id } });
|
||||
expect(amount).toBe(1);
|
||||
});
|
||||
|
||||
test("should not create a revision if identical to previous", async () => {
|
||||
const document = await buildDocument();
|
||||
|
||||
await Revision.createFromDocument(document);
|
||||
|
||||
await Revisions.on({
|
||||
name: "documents.update.debounced",
|
||||
documentId: document.id,
|
||||
collectionId: document.collectionId,
|
||||
teamId: document.teamId,
|
||||
actorId: document.createdById,
|
||||
});
|
||||
|
||||
const amount = await Revision.count({ where: { documentId: document.id } });
|
||||
expect(amount).toBe(1);
|
||||
});
|
||||
});
|
||||
@@ -1,99 +0,0 @@
|
||||
// @flow
|
||||
import fetch from "fetch-with-proxy";
|
||||
import type { DocumentEvent, IntegrationEvent, Event } from "../events";
|
||||
import { Document, Integration, Collection, Team } from "../models";
|
||||
import { presentSlackAttachment } from "../presenters";
|
||||
|
||||
export default class Slack {
|
||||
async on(event: Event) {
|
||||
switch (event.name) {
|
||||
case "documents.publish":
|
||||
case "documents.update.debounced":
|
||||
return this.documentUpdated(event);
|
||||
case "integrations.create":
|
||||
return this.integrationCreated(event);
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
async integrationCreated(event: IntegrationEvent) {
|
||||
const integration = await Integration.findOne({
|
||||
where: {
|
||||
id: event.modelId,
|
||||
service: "slack",
|
||||
type: "post",
|
||||
},
|
||||
include: [
|
||||
{
|
||||
model: Collection,
|
||||
required: true,
|
||||
as: "collection",
|
||||
},
|
||||
],
|
||||
});
|
||||
if (!integration) return;
|
||||
|
||||
const collection = integration.collection;
|
||||
if (!collection) return;
|
||||
|
||||
await fetch(integration.settings.url, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
text: `👋 Hey there! When documents are published or updated in the *${collection.name}* collection on Outline they will be posted to this channel!`,
|
||||
attachments: [
|
||||
{
|
||||
color: collection.color,
|
||||
title: collection.name,
|
||||
title_link: `${process.env.URL}${collection.url}`,
|
||||
text: collection.description,
|
||||
},
|
||||
],
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
async documentUpdated(event: DocumentEvent) {
|
||||
// never send notifications when batch importing documents
|
||||
if (event.data && event.data.source === "import") return;
|
||||
|
||||
const document = await Document.findByPk(event.documentId);
|
||||
if (!document) return;
|
||||
|
||||
// never send notifications for draft documents
|
||||
if (!document.publishedAt) return;
|
||||
|
||||
const integration = await Integration.findOne({
|
||||
where: {
|
||||
teamId: document.teamId,
|
||||
collectionId: document.collectionId,
|
||||
service: "slack",
|
||||
type: "post",
|
||||
},
|
||||
});
|
||||
if (!integration) return;
|
||||
|
||||
const team = await Team.findByPk(document.teamId);
|
||||
|
||||
let text = `${document.updatedBy.name} updated a document`;
|
||||
|
||||
if (event.name === "documents.publish") {
|
||||
text = `${document.createdBy.name} published a new document`;
|
||||
}
|
||||
|
||||
await fetch(integration.settings.url, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
text,
|
||||
attachments: [
|
||||
presentSlackAttachment(document, document.collection, team),
|
||||
],
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
174
server/services/web.js
Normal file
174
server/services/web.js
Normal file
@@ -0,0 +1,174 @@
|
||||
// @flow
|
||||
import http from "http";
|
||||
import Koa from "koa";
|
||||
import {
|
||||
contentSecurityPolicy,
|
||||
dnsPrefetchControl,
|
||||
referrerPolicy,
|
||||
} from "koa-helmet";
|
||||
import mount from "koa-mount";
|
||||
import onerror from "koa-onerror";
|
||||
import enforceHttps from "koa-sslify";
|
||||
import api from "../api";
|
||||
import auth from "../auth";
|
||||
import emails from "../emails";
|
||||
import env from "../env";
|
||||
import routes from "../routes";
|
||||
import Sentry from "../sentry";
|
||||
|
||||
const isProduction = env.NODE_ENV === "production";
|
||||
const isTest = env.NODE_ENV === "test";
|
||||
|
||||
// Construct scripts CSP based on services in use by this installation
|
||||
const defaultSrc = ["'self'"];
|
||||
const scriptSrc = [
|
||||
"'self'",
|
||||
"'unsafe-inline'",
|
||||
"'unsafe-eval'",
|
||||
"gist.github.com",
|
||||
];
|
||||
|
||||
if (env.GOOGLE_ANALYTICS_ID) {
|
||||
scriptSrc.push("www.google-analytics.com");
|
||||
}
|
||||
if (env.CDN_URL) {
|
||||
scriptSrc.push(env.CDN_URL);
|
||||
defaultSrc.push(env.CDN_URL);
|
||||
}
|
||||
|
||||
export default function init(app: Koa = new Koa(), server?: http.Server): Koa {
|
||||
if (isProduction) {
|
||||
// Force redirect to HTTPS protocol unless explicitly disabled
|
||||
if (process.env.FORCE_HTTPS !== "false") {
|
||||
app.use(
|
||||
enforceHttps({
|
||||
trustProtoHeader: true,
|
||||
})
|
||||
);
|
||||
} else {
|
||||
console.warn("Enforced https was disabled with FORCE_HTTPS env variable");
|
||||
}
|
||||
|
||||
// trust header fields set by our proxy. eg X-Forwarded-For
|
||||
app.proxy = true;
|
||||
} else if (!isTest) {
|
||||
/* eslint-disable global-require */
|
||||
const convert = require("koa-convert");
|
||||
const webpack = require("webpack");
|
||||
const devMiddleware = require("koa-webpack-dev-middleware");
|
||||
const hotMiddleware = require("koa-webpack-hot-middleware");
|
||||
const config = require("../../webpack.config.dev");
|
||||
const compile = webpack(config);
|
||||
/* eslint-enable global-require */
|
||||
|
||||
const middleware = devMiddleware(compile, {
|
||||
// display no info to console (only warnings and errors)
|
||||
noInfo: true,
|
||||
|
||||
// display nothing to the console
|
||||
quiet: false,
|
||||
|
||||
watchOptions: {
|
||||
poll: 1000,
|
||||
ignored: ["node_modules", "flow-typed", "server", "build", "__mocks__"],
|
||||
},
|
||||
|
||||
// public path to bind the middleware to
|
||||
// use the same as in webpack
|
||||
publicPath: config.output.publicPath,
|
||||
|
||||
// options for formatting the statistics
|
||||
stats: {
|
||||
colors: true,
|
||||
},
|
||||
});
|
||||
|
||||
app.use(async (ctx, next) => {
|
||||
ctx.webpackConfig = config;
|
||||
ctx.devMiddleware = middleware;
|
||||
await next();
|
||||
});
|
||||
app.use(convert(middleware));
|
||||
app.use(
|
||||
convert(
|
||||
hotMiddleware(compile, {
|
||||
log: console.log, // eslint-disable-line
|
||||
path: "/__webpack_hmr",
|
||||
heartbeat: 10 * 1000,
|
||||
})
|
||||
)
|
||||
);
|
||||
app.use(mount("/emails", emails));
|
||||
}
|
||||
|
||||
// catch errors in one place, automatically set status and response headers
|
||||
onerror(app);
|
||||
|
||||
app.on("error", (error, ctx) => {
|
||||
// we don't need to report every time a request stops to the bug tracker
|
||||
if (error.code === "EPIPE" || error.code === "ECONNRESET") {
|
||||
console.warn("Connection error", { error });
|
||||
return;
|
||||
}
|
||||
|
||||
if (process.env.SENTRY_DSN) {
|
||||
Sentry.withScope(function (scope) {
|
||||
const requestId = ctx.headers["x-request-id"];
|
||||
if (requestId) {
|
||||
scope.setTag("request_id", requestId);
|
||||
}
|
||||
|
||||
const authType = ctx.state ? ctx.state.authType : undefined;
|
||||
if (authType) {
|
||||
scope.setTag("auth_type", authType);
|
||||
}
|
||||
|
||||
const userId =
|
||||
ctx.state && ctx.state.user ? ctx.state.user.id : undefined;
|
||||
if (userId) {
|
||||
scope.setUser({ id: userId });
|
||||
}
|
||||
|
||||
scope.addEventProcessor(function (event) {
|
||||
return Sentry.Handlers.parseRequest(event, ctx.request);
|
||||
});
|
||||
Sentry.captureException(error);
|
||||
});
|
||||
} else {
|
||||
console.error(error);
|
||||
}
|
||||
});
|
||||
|
||||
app.use(mount("/auth", auth));
|
||||
app.use(mount("/api", api));
|
||||
|
||||
// Sets common security headers by default, such as no-sniff, hsts, hide powered
|
||||
// by etc, these are applied after auth and api so they are only returned on
|
||||
// standard non-XHR accessed routes
|
||||
app.use(async (ctx, next) => {
|
||||
ctx.set("Permissions-Policy", "interest-cohort=()");
|
||||
await next();
|
||||
});
|
||||
app.use(
|
||||
contentSecurityPolicy({
|
||||
directives: {
|
||||
defaultSrc,
|
||||
scriptSrc,
|
||||
styleSrc: ["'self'", "'unsafe-inline'", "github.githubassets.com"],
|
||||
imgSrc: ["*", "data:", "blob:"],
|
||||
frameSrc: ["*"],
|
||||
connectSrc: ["*"],
|
||||
// Do not use connect-src: because self + websockets does not work in
|
||||
// Safari, ref: https://bugs.webkit.org/show_bug.cgi?id=201591
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
// Allow DNS prefetching for performance, we do not care about leaking requests
|
||||
// to our own CDN's
|
||||
app.use(dnsPrefetchControl({ allow: true }));
|
||||
app.use(referrerPolicy({ policy: "no-referrer" }));
|
||||
app.use(mount(routes));
|
||||
|
||||
return app;
|
||||
}
|
||||
@@ -1,503 +1,242 @@
|
||||
// @flow
|
||||
import { subHours } from "date-fns";
|
||||
import type { Event } from "../events";
|
||||
import { socketio } from "../main";
|
||||
import {
|
||||
Document,
|
||||
Collection,
|
||||
Group,
|
||||
CollectionGroup,
|
||||
GroupUser,
|
||||
} from "../models";
|
||||
import { Op } from "../sequelize";
|
||||
import http from "http";
|
||||
import Koa from "koa";
|
||||
import IO from "socket.io";
|
||||
import socketRedisAdapter from "socket.io-redis";
|
||||
import SocketAuth from "socketio-auth";
|
||||
import env from "../env";
|
||||
import { Document, Collection, View } from "../models";
|
||||
import policy from "../policies";
|
||||
import { websocketsQueue } from "../queues";
|
||||
import WebsocketsProcessor from "../queues/processors/websockets";
|
||||
import { client, subscriber } from "../redis";
|
||||
import Sentry from "../sentry";
|
||||
import { getUserForJWT } from "../utils/jwt";
|
||||
import * as metrics from "../utils/metrics";
|
||||
|
||||
export default class Websockets {
|
||||
async on(event: Event) {
|
||||
if (!socketio) {
|
||||
return;
|
||||
const { can } = policy;
|
||||
const websockets = new WebsocketsProcessor();
|
||||
|
||||
export default function init(app: Koa, server: http.Server) {
|
||||
const io = IO(server, {
|
||||
path: "/realtime",
|
||||
serveClient: false,
|
||||
cookie: false,
|
||||
});
|
||||
|
||||
io.adapter(
|
||||
socketRedisAdapter({
|
||||
pubClient: client,
|
||||
subClient: subscriber,
|
||||
})
|
||||
);
|
||||
|
||||
io.origins((_, callback) => {
|
||||
callback(null, true);
|
||||
});
|
||||
|
||||
io.of("/").adapter.on("error", (err) => {
|
||||
if (err.name === "MaxRetriesPerRequestError") {
|
||||
console.error(`Redis error: ${err.message}. Shutting down now.`);
|
||||
throw err;
|
||||
} else {
|
||||
console.error(`Redis error: ${err.message}`);
|
||||
}
|
||||
});
|
||||
|
||||
switch (event.name) {
|
||||
case "documents.publish":
|
||||
case "documents.restore":
|
||||
case "documents.archive":
|
||||
case "documents.unarchive": {
|
||||
const document = await Document.findByPk(event.documentId, {
|
||||
paranoid: false,
|
||||
});
|
||||
io.on("connection", (socket) => {
|
||||
metrics.increment("websockets.connected");
|
||||
metrics.gaugePerInstance(
|
||||
"websockets.count",
|
||||
socket.client.conn.server.clientsCount
|
||||
);
|
||||
|
||||
const channel = document.publishedAt
|
||||
? `collection-${document.collectionId}`
|
||||
: `user-${event.actorId}`;
|
||||
socket.on("disconnect", () => {
|
||||
metrics.increment("websockets.disconnected");
|
||||
metrics.gaugePerInstance(
|
||||
"websockets.count",
|
||||
socket.client.conn.server.clientsCount
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
return socketio.to(channel).emit("entities", {
|
||||
event: event.name,
|
||||
documentIds: [
|
||||
{
|
||||
id: document.id,
|
||||
updatedAt: document.updatedAt,
|
||||
},
|
||||
],
|
||||
collectionIds: [
|
||||
{
|
||||
id: document.collectionId,
|
||||
},
|
||||
],
|
||||
});
|
||||
SocketAuth(io, {
|
||||
authenticate: async (socket, data, callback) => {
|
||||
const { token } = data;
|
||||
|
||||
try {
|
||||
const user = await getUserForJWT(token);
|
||||
socket.client.user = user;
|
||||
|
||||
// store the mapping between socket id and user id in redis
|
||||
// so that it is accessible across multiple server nodes
|
||||
await client.hset(socket.id, "userId", user.id);
|
||||
|
||||
return callback(null, true);
|
||||
} catch (err) {
|
||||
return callback(err);
|
||||
}
|
||||
case "documents.delete": {
|
||||
const document = await Document.findByPk(event.documentId, {
|
||||
paranoid: false,
|
||||
});
|
||||
},
|
||||
postAuthenticate: async (socket, data) => {
|
||||
const { user } = socket.client;
|
||||
|
||||
if (!document.publishedAt) {
|
||||
return socketio.to(`user-${document.createdById}`).emit("entities", {
|
||||
event: event.name,
|
||||
documentIds: [
|
||||
{
|
||||
id: document.id,
|
||||
updatedAt: document.updatedAt,
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
// the rooms associated with the current team
|
||||
// and user so we can send authenticated events
|
||||
let rooms = [`team-${user.teamId}`, `user-${user.id}`];
|
||||
|
||||
return socketio
|
||||
.to(`collection-${document.collectionId}`)
|
||||
.emit("entities", {
|
||||
event: event.name,
|
||||
documentIds: [
|
||||
{
|
||||
id: document.id,
|
||||
updatedAt: document.updatedAt,
|
||||
},
|
||||
],
|
||||
collectionIds: [
|
||||
{
|
||||
id: document.collectionId,
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
case "documents.permanent_delete": {
|
||||
return socketio
|
||||
.to(`collection-${event.collectionId}`)
|
||||
.emit(event.name, {
|
||||
documentId: event.documentId,
|
||||
});
|
||||
}
|
||||
case "documents.pin":
|
||||
case "documents.unpin":
|
||||
case "documents.update": {
|
||||
const document = await Document.findByPk(event.documentId, {
|
||||
paranoid: false,
|
||||
});
|
||||
// the rooms associated with collections this user
|
||||
// has access to on connection. New collection subscriptions
|
||||
// are managed from the client as needed through the 'join' event
|
||||
const collectionIds = await user.collectionIds();
|
||||
collectionIds.forEach((collectionId) =>
|
||||
rooms.push(`collection-${collectionId}`)
|
||||
);
|
||||
|
||||
const channel = document.publishedAt
|
||||
? `collection-${document.collectionId}`
|
||||
: `user-${event.actorId}`;
|
||||
// join all of the rooms at once
|
||||
socket.join(rooms);
|
||||
|
||||
return socketio.to(channel).emit("entities", {
|
||||
event: event.name,
|
||||
documentIds: [
|
||||
{
|
||||
id: document.id,
|
||||
updatedAt: document.updatedAt,
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
case "documents.create": {
|
||||
const document = await Document.findByPk(event.documentId);
|
||||
|
||||
return socketio.to(`user-${event.actorId}`).emit("entities", {
|
||||
event: event.name,
|
||||
documentIds: [
|
||||
{
|
||||
id: document.id,
|
||||
updatedAt: document.updatedAt,
|
||||
},
|
||||
],
|
||||
collectionIds: [
|
||||
{
|
||||
id: document.collectionId,
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
case "documents.star":
|
||||
case "documents.unstar": {
|
||||
return socketio.to(`user-${event.actorId}`).emit(event.name, {
|
||||
documentId: event.documentId,
|
||||
});
|
||||
}
|
||||
case "documents.move": {
|
||||
const documents = await Document.findAll({
|
||||
where: {
|
||||
id: event.data.documentIds,
|
||||
},
|
||||
paranoid: false,
|
||||
});
|
||||
documents.forEach((document) => {
|
||||
socketio.to(`collection-${document.collectionId}`).emit("entities", {
|
||||
event: event.name,
|
||||
documentIds: [
|
||||
{
|
||||
id: document.id,
|
||||
updatedAt: document.updatedAt,
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
event.data.collectionIds.forEach((collectionId) => {
|
||||
socketio.to(`collection-${collectionId}`).emit("entities", {
|
||||
event: event.name,
|
||||
collectionIds: [{ id: collectionId }],
|
||||
});
|
||||
});
|
||||
return;
|
||||
}
|
||||
case "collections.create": {
|
||||
const collection = await Collection.findByPk(event.collectionId, {
|
||||
paranoid: false,
|
||||
});
|
||||
|
||||
socketio
|
||||
.to(
|
||||
collection.permission
|
||||
? `team-${collection.teamId}`
|
||||
: `collection-${collection.id}`
|
||||
)
|
||||
.emit("entities", {
|
||||
event: event.name,
|
||||
collectionIds: [
|
||||
{
|
||||
id: collection.id,
|
||||
updatedAt: collection.updatedAt,
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
return socketio
|
||||
.to(
|
||||
collection.permission
|
||||
? `team-${collection.teamId}`
|
||||
: `collection-${collection.id}`
|
||||
)
|
||||
.emit("join", {
|
||||
event: event.name,
|
||||
collectionId: collection.id,
|
||||
});
|
||||
}
|
||||
case "collections.update":
|
||||
case "collections.delete": {
|
||||
const collection = await Collection.findByPk(event.collectionId, {
|
||||
paranoid: false,
|
||||
});
|
||||
|
||||
return socketio.to(`team-${collection.teamId}`).emit("entities", {
|
||||
event: event.name,
|
||||
collectionIds: [
|
||||
{
|
||||
id: collection.id,
|
||||
updatedAt: collection.updatedAt,
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
case "collections.move": {
|
||||
return socketio
|
||||
.to(`collection-${event.collectionId}`)
|
||||
.emit("collections.update_index", {
|
||||
collectionId: event.collectionId,
|
||||
index: event.data.index,
|
||||
});
|
||||
}
|
||||
|
||||
case "collections.add_user": {
|
||||
// the user being added isn't yet in the websocket channel for the collection
|
||||
// so they need to be notified separately
|
||||
socketio.to(`user-${event.userId}`).emit(event.name, {
|
||||
event: event.name,
|
||||
userId: event.userId,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
|
||||
// let everyone with access to the collection know a user was added
|
||||
socketio.to(`collection-${event.collectionId}`).emit(event.name, {
|
||||
event: event.name,
|
||||
userId: event.userId,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
|
||||
// tell any user clients to connect to the websocket channel for the collection
|
||||
return socketio.to(`user-${event.userId}`).emit("join", {
|
||||
event: event.name,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
}
|
||||
case "collections.remove_user": {
|
||||
const membershipUserIds = await Collection.membershipUserIds(
|
||||
event.collectionId
|
||||
);
|
||||
|
||||
if (membershipUserIds.includes(event.userId)) {
|
||||
// Even though we just removed a user from the collection
|
||||
// the user still has access through some means
|
||||
// treat this like an add, so that the client re-syncs policies
|
||||
socketio.to(`user-${event.userId}`).emit("collections.add_user", {
|
||||
event: "collections.add_user",
|
||||
userId: event.userId,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
} else {
|
||||
// let everyone with access to the collection know a user was removed
|
||||
socketio
|
||||
.to(`collection-${event.collectionId}`)
|
||||
.emit("collections.remove_user", {
|
||||
event: event.name,
|
||||
userId: event.userId,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
|
||||
// tell any user clients to disconnect from the websocket channel for the collection
|
||||
socketio.to(`user-${event.userId}`).emit("leave", {
|
||||
event: event.name,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
case "collections.add_group": {
|
||||
const group = await Group.findByPk(event.data.groupId);
|
||||
|
||||
// the users being added are not yet in the websocket channel for the collection
|
||||
// so they need to be notified separately
|
||||
for (const groupMembership of group.groupMemberships) {
|
||||
socketio
|
||||
.to(`user-${groupMembership.userId}`)
|
||||
.emit("collections.add_user", {
|
||||
event: event.name,
|
||||
userId: groupMembership.userId,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
|
||||
// tell any user clients to connect to the websocket channel for the collection
|
||||
socketio.to(`user-${groupMembership.userId}`).emit("join", {
|
||||
event: event.name,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
case "collections.remove_group": {
|
||||
const group = await Group.findByPk(event.data.groupId);
|
||||
const membershipUserIds = await Collection.membershipUserIds(
|
||||
event.collectionId
|
||||
);
|
||||
|
||||
for (const groupMembership of group.groupMemberships) {
|
||||
if (membershipUserIds.includes(groupMembership.userId)) {
|
||||
// the user still has access through some means...
|
||||
// treat this like an add, so that the client re-syncs policies
|
||||
socketio
|
||||
.to(`user-${groupMembership.userId}`)
|
||||
.emit("collections.add_user", {
|
||||
event: event.name,
|
||||
userId: groupMembership.userId,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
} else {
|
||||
// let users in the channel know they were removed
|
||||
socketio
|
||||
.to(`user-${groupMembership.userId}`)
|
||||
.emit("collections.remove_user", {
|
||||
event: event.name,
|
||||
userId: groupMembership.userId,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
|
||||
// tell any user clients to disconnect to the websocket channel for the collection
|
||||
socketio.to(`user-${groupMembership.userId}`).emit("leave", {
|
||||
event: event.name,
|
||||
collectionId: event.collectionId,
|
||||
});
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
case "groups.create":
|
||||
case "groups.update": {
|
||||
const group = await Group.findByPk(event.modelId, {
|
||||
paranoid: false,
|
||||
});
|
||||
|
||||
return socketio.to(`team-${group.teamId}`).emit("entities", {
|
||||
event: event.name,
|
||||
groupIds: [
|
||||
{
|
||||
id: group.id,
|
||||
updatedAt: group.updatedAt,
|
||||
},
|
||||
],
|
||||
});
|
||||
}
|
||||
case "groups.add_user": {
|
||||
// do an add user for every collection that the group is a part of
|
||||
const collectionGroupMemberships = await CollectionGroup.findAll({
|
||||
where: { groupId: event.modelId },
|
||||
});
|
||||
|
||||
for (const collectionGroup of collectionGroupMemberships) {
|
||||
// the user being added isn't yet in the websocket channel for the collection
|
||||
// so they need to be notified separately
|
||||
socketio.to(`user-${event.userId}`).emit("collections.add_user", {
|
||||
event: event.name,
|
||||
userId: event.userId,
|
||||
collectionId: collectionGroup.collectionId,
|
||||
});
|
||||
|
||||
// let everyone with access to the collection know a user was added
|
||||
socketio
|
||||
.to(`collection-${collectionGroup.collectionId}`)
|
||||
.emit("collections.add_user", {
|
||||
event: event.name,
|
||||
userId: event.userId,
|
||||
collectionId: collectionGroup.collectionId,
|
||||
});
|
||||
|
||||
// tell any user clients to connect to the websocket channel for the collection
|
||||
return socketio.to(`user-${event.userId}`).emit("join", {
|
||||
event: event.name,
|
||||
collectionId: collectionGroup.collectionId,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
case "groups.remove_user": {
|
||||
const collectionGroupMemberships = await CollectionGroup.findAll({
|
||||
where: { groupId: event.modelId },
|
||||
});
|
||||
|
||||
for (const collectionGroup of collectionGroupMemberships) {
|
||||
// if the user has any memberships remaining on the collection
|
||||
// we need to emit add instead of remove
|
||||
// allow the client to request to join rooms
|
||||
socket.on("join", async (event) => {
|
||||
// user is joining a collection channel, because their permissions have
|
||||
// changed, granting them access.
|
||||
if (event.collectionId) {
|
||||
const collection = await Collection.scope({
|
||||
method: ["withMembership", event.userId],
|
||||
}).findByPk(collectionGroup.collectionId);
|
||||
method: ["withMembership", user.id],
|
||||
}).findByPk(event.collectionId);
|
||||
|
||||
if (!collection) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const hasMemberships =
|
||||
collection.memberships.length > 0 ||
|
||||
collection.collectionGroupMemberships.length > 0;
|
||||
|
||||
if (hasMemberships) {
|
||||
// the user still has access through some means...
|
||||
// treat this like an add, so that the client re-syncs policies
|
||||
socketio.to(`user-${event.userId}`).emit("collections.add_user", {
|
||||
event: event.name,
|
||||
userId: event.userId,
|
||||
collectionId: collectionGroup.collectionId,
|
||||
});
|
||||
} else {
|
||||
// let everyone with access to the collection know a user was removed
|
||||
socketio
|
||||
.to(`collection-${collectionGroup.collectionId}`)
|
||||
.emit("collections.remove_user", {
|
||||
event: event.name,
|
||||
userId: event.userId,
|
||||
collectionId: collectionGroup.collectionId,
|
||||
});
|
||||
|
||||
// tell any user clients to disconnect from the websocket channel for the collection
|
||||
socketio.to(`user-${event.userId}`).emit("leave", {
|
||||
event: event.name,
|
||||
collectionId: collectionGroup.collectionId,
|
||||
if (can(user, "read", collection)) {
|
||||
socket.join(`collection-${event.collectionId}`, () => {
|
||||
metrics.increment("websockets.collections.join");
|
||||
});
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
case "groups.delete": {
|
||||
const group = await Group.findByPk(event.modelId, {
|
||||
paranoid: false,
|
||||
});
|
||||
|
||||
socketio.to(`team-${group.teamId}`).emit("entities", {
|
||||
event: event.name,
|
||||
groupIds: [
|
||||
{
|
||||
id: group.id,
|
||||
updatedAt: group.updatedAt,
|
||||
},
|
||||
],
|
||||
});
|
||||
// user is joining a document channel, because they have navigated to
|
||||
// view a document.
|
||||
if (event.documentId) {
|
||||
const document = await Document.findByPk(event.documentId, {
|
||||
userId: user.id,
|
||||
});
|
||||
|
||||
// we the users and collection relations that were just severed as a result of the group deletion
|
||||
// since there are cascading deletes, we approximate this by looking for the recently deleted
|
||||
// items in the GroupUser and CollectionGroup tables
|
||||
const groupUsers = await GroupUser.findAll({
|
||||
paranoid: false,
|
||||
where: {
|
||||
groupId: event.modelId,
|
||||
deletedAt: {
|
||||
[Op.gt]: subHours(new Date(), 1),
|
||||
},
|
||||
},
|
||||
});
|
||||
if (can(user, "read", document)) {
|
||||
const room = `document-${event.documentId}`;
|
||||
|
||||
const collectionGroupMemberships = await CollectionGroup.findAll({
|
||||
paranoid: false,
|
||||
where: {
|
||||
groupId: event.modelId,
|
||||
deletedAt: {
|
||||
[Op.gt]: subHours(new Date(), 1),
|
||||
},
|
||||
},
|
||||
});
|
||||
await View.touch(event.documentId, user.id, event.isEditing);
|
||||
const editing = await View.findRecentlyEditingByDocument(
|
||||
event.documentId
|
||||
);
|
||||
|
||||
for (const collectionGroup of collectionGroupMemberships) {
|
||||
const membershipUserIds = await Collection.membershipUserIds(
|
||||
collectionGroup.collectionId
|
||||
socket.join(room, () => {
|
||||
metrics.increment("websockets.documents.join");
|
||||
|
||||
// let everyone else in the room know that a new user joined
|
||||
io.to(room).emit("user.join", {
|
||||
userId: user.id,
|
||||
documentId: event.documentId,
|
||||
isEditing: event.isEditing,
|
||||
});
|
||||
|
||||
// let this user know who else is already present in the room
|
||||
io.in(room).clients(async (err, sockets) => {
|
||||
if (err) {
|
||||
if (process.env.SENTRY_DSN) {
|
||||
Sentry.withScope(function (scope) {
|
||||
scope.setExtra("clients", sockets);
|
||||
Sentry.captureException(err);
|
||||
});
|
||||
} else {
|
||||
console.error(err);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// because a single user can have multiple socket connections we
|
||||
// need to make sure that only unique userIds are returned. A Map
|
||||
// makes this easy.
|
||||
let userIds = new Map();
|
||||
for (const socketId of sockets) {
|
||||
const userId = await client.hget(socketId, "userId");
|
||||
userIds.set(userId, userId);
|
||||
}
|
||||
socket.emit("document.presence", {
|
||||
documentId: event.documentId,
|
||||
userIds: Array.from(userIds.keys()),
|
||||
editingIds: editing.map((view) => view.userId),
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// allow the client to request to leave rooms
|
||||
socket.on("leave", (event) => {
|
||||
if (event.collectionId) {
|
||||
socket.leave(`collection-${event.collectionId}`, () => {
|
||||
metrics.increment("websockets.collections.leave");
|
||||
});
|
||||
}
|
||||
if (event.documentId) {
|
||||
const room = `document-${event.documentId}`;
|
||||
socket.leave(room, () => {
|
||||
metrics.increment("websockets.documents.leave");
|
||||
|
||||
io.to(room).emit("user.leave", {
|
||||
userId: user.id,
|
||||
documentId: event.documentId,
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
socket.on("disconnecting", () => {
|
||||
const rooms = Object.keys(socket.rooms);
|
||||
|
||||
rooms.forEach((room) => {
|
||||
if (room.startsWith("document-")) {
|
||||
const documentId = room.replace("document-", "");
|
||||
io.to(room).emit("user.leave", {
|
||||
userId: user.id,
|
||||
documentId,
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
socket.on("presence", async (event) => {
|
||||
metrics.increment("websockets.presence");
|
||||
|
||||
const room = `document-${event.documentId}`;
|
||||
|
||||
if (event.documentId && socket.rooms[room]) {
|
||||
const view = await View.touch(
|
||||
event.documentId,
|
||||
user.id,
|
||||
event.isEditing
|
||||
);
|
||||
view.user = user;
|
||||
|
||||
for (const groupUser of groupUsers) {
|
||||
if (membershipUserIds.includes(groupUser.userId)) {
|
||||
// the user still has access through some means...
|
||||
// treat this like an add, so that the client re-syncs policies
|
||||
socketio
|
||||
.to(`user-${groupUser.userId}`)
|
||||
.emit("collections.add_user", {
|
||||
event: event.name,
|
||||
userId: groupUser.userId,
|
||||
collectionId: collectionGroup.collectionId,
|
||||
});
|
||||
} else {
|
||||
// let everyone with access to the collection know a user was removed
|
||||
socketio
|
||||
.to(`collection-${collectionGroup.collectionId}`)
|
||||
.emit("collections.remove_user", {
|
||||
event: event.name,
|
||||
userId: groupUser.userId,
|
||||
collectionId: collectionGroup.collectionId,
|
||||
});
|
||||
|
||||
// tell any user clients to disconnect from the websocket channel for the collection
|
||||
socketio.to(`user-${groupUser.userId}`).emit("leave", {
|
||||
event: event.name,
|
||||
collectionId: collectionGroup.collectionId,
|
||||
});
|
||||
}
|
||||
}
|
||||
io.to(room).emit("user.presence", {
|
||||
userId: user.id,
|
||||
documentId: event.documentId,
|
||||
isEditing: event.isEditing,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
default:
|
||||
}
|
||||
}
|
||||
websocketsQueue.process(async function websocketEventsProcessor(job) {
|
||||
const event = job.data;
|
||||
websockets.on(event, io).catch((error) => {
|
||||
if (env.SENTRY_DSN) {
|
||||
Sentry.withScope(function (scope) {
|
||||
scope.setExtra("event", event);
|
||||
Sentry.captureException(error);
|
||||
});
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
86
server/services/worker.js
Normal file
86
server/services/worker.js
Normal file
@@ -0,0 +1,86 @@
|
||||
// @flow
|
||||
import http from "http";
|
||||
import debug from "debug";
|
||||
import Koa from "koa";
|
||||
import {
|
||||
globalEventQueue,
|
||||
processorEventQueue,
|
||||
websocketsQueue,
|
||||
emailsQueue,
|
||||
} from "../queues";
|
||||
import Backlinks from "../queues/processors/backlinks";
|
||||
import Debouncer from "../queues/processors/debouncer";
|
||||
import Emails from "../queues/processors/emails";
|
||||
import Imports from "../queues/processors/imports";
|
||||
import Notifications from "../queues/processors/notifications";
|
||||
import Revisions from "../queues/processors/revisions";
|
||||
import Slack from "../queues/processors/slack";
|
||||
import Sentry from "../sentry";
|
||||
|
||||
const log = debug("queue");
|
||||
|
||||
const EmailsProcessor = new Emails();
|
||||
|
||||
const eventProcessors = {
|
||||
backlinks: new Backlinks(),
|
||||
debouncer: new Debouncer(),
|
||||
imports: new Imports(),
|
||||
notifications: new Notifications(),
|
||||
revisions: new Revisions(),
|
||||
slack: new Slack(),
|
||||
};
|
||||
|
||||
export default function init(app: Koa, server?: http.Server) {
|
||||
// this queue processes global events and hands them off to services
|
||||
globalEventQueue.process(function (job) {
|
||||
Object.keys(eventProcessors).forEach((name) => {
|
||||
processorEventQueue.add(
|
||||
{ ...job.data, service: name },
|
||||
{ removeOnComplete: true }
|
||||
);
|
||||
});
|
||||
|
||||
websocketsQueue.add(job.data, { removeOnComplete: true });
|
||||
});
|
||||
|
||||
processorEventQueue.process(function (job) {
|
||||
const event = job.data;
|
||||
const processor = eventProcessors[event.service];
|
||||
if (!processor) {
|
||||
console.warn(
|
||||
`Received event for processor that isn't registered (${event.service})`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (processor.on) {
|
||||
log(`${event.service} processing ${event.name}`);
|
||||
|
||||
processor.on(event).catch((error) => {
|
||||
if (process.env.SENTRY_DSN) {
|
||||
Sentry.withScope(function (scope) {
|
||||
scope.setExtra("event", event);
|
||||
Sentry.captureException(error);
|
||||
});
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
emailsQueue.process(function (job) {
|
||||
const event = job.data;
|
||||
|
||||
EmailsProcessor.on(event).catch((error) => {
|
||||
if (process.env.SENTRY_DSN) {
|
||||
Sentry.withScope(function (scope) {
|
||||
scope.setExtra("event", event);
|
||||
Sentry.captureException(error);
|
||||
});
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user