Throttle email notifications upon updating document frequently (#4026)
* feat: add needed columns for throttling notifs * feat: update model * feat: deliver only one notif in a 12 hour window * fix: address review comments * prevent retry if notification update fails * fix type compatibility instead of circumventing it * add index for emailedAt * fix: add metadata attr to EmailProps * chore: decouple metadata from EmailProps * chore: add test * chore: revert sending metadata in props
This commit is contained in:
@@ -1,15 +1,18 @@
|
|||||||
import mailer from "@server/emails/mailer";
|
import mailer from "@server/emails/mailer";
|
||||||
import Logger from "@server/logging/Logger";
|
import Logger from "@server/logging/Logger";
|
||||||
import Metrics from "@server/logging/metrics";
|
import Metrics from "@server/logging/metrics";
|
||||||
|
import Notification from "@server/models/Notification";
|
||||||
import { taskQueue } from "@server/queues";
|
import { taskQueue } from "@server/queues";
|
||||||
import { TaskPriority } from "@server/queues/tasks/BaseTask";
|
import { TaskPriority } from "@server/queues/tasks/BaseTask";
|
||||||
|
import { NotificationMetadata } from "@server/types";
|
||||||
|
|
||||||
interface EmailProps {
|
interface EmailProps {
|
||||||
to: string;
|
to: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export default abstract class BaseEmail<T extends EmailProps, S = any> {
|
export default abstract class BaseEmail<T extends EmailProps, S = unknown> {
|
||||||
private props: T;
|
private props: T;
|
||||||
|
private metadata?: NotificationMetadata;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedule this email type to be sent asyncronously by a worker.
|
* Schedule this email type to be sent asyncronously by a worker.
|
||||||
@@ -17,7 +20,7 @@ export default abstract class BaseEmail<T extends EmailProps, S = any> {
|
|||||||
* @param props Properties to be used in the email template
|
* @param props Properties to be used in the email template
|
||||||
* @returns A promise that resolves once the email is placed on the task queue
|
* @returns A promise that resolves once the email is placed on the task queue
|
||||||
*/
|
*/
|
||||||
public static schedule<T>(props: T) {
|
public static schedule<T>(props: T, metadata?: NotificationMetadata) {
|
||||||
const templateName = this.name;
|
const templateName = this.name;
|
||||||
|
|
||||||
Metrics.increment("email.scheduled", {
|
Metrics.increment("email.scheduled", {
|
||||||
@@ -31,6 +34,7 @@ export default abstract class BaseEmail<T extends EmailProps, S = any> {
|
|||||||
name: "EmailTask",
|
name: "EmailTask",
|
||||||
props: {
|
props: {
|
||||||
templateName,
|
templateName,
|
||||||
|
...metadata,
|
||||||
props,
|
props,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -45,8 +49,9 @@ export default abstract class BaseEmail<T extends EmailProps, S = any> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
constructor(props: T) {
|
constructor(props: T, metadata?: NotificationMetadata) {
|
||||||
this.props = props;
|
this.props = props;
|
||||||
|
this.metadata = metadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -86,6 +91,23 @@ export default abstract class BaseEmail<T extends EmailProps, S = any> {
|
|||||||
});
|
});
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.metadata?.notificationId) {
|
||||||
|
try {
|
||||||
|
await Notification.update(
|
||||||
|
{
|
||||||
|
emailedAt: new Date(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
where: {
|
||||||
|
id: this.metadata.notificationId,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
} catch (err) {
|
||||||
|
Logger.error(`Failed to update notification`, err, this.metadata);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -0,0 +1,120 @@
|
|||||||
|
"use strict";
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
async up(queryInterface, Sequelize) {
|
||||||
|
const transaction = await queryInterface.sequelize.transaction();
|
||||||
|
|
||||||
|
try {
|
||||||
|
await queryInterface.addColumn(
|
||||||
|
"notifications",
|
||||||
|
"viewedAt",
|
||||||
|
{
|
||||||
|
type: Sequelize.DATE,
|
||||||
|
allowNull: true,
|
||||||
|
},
|
||||||
|
{ transaction }
|
||||||
|
);
|
||||||
|
|
||||||
|
await queryInterface.addColumn(
|
||||||
|
"notifications",
|
||||||
|
"emailedAt",
|
||||||
|
{
|
||||||
|
type: Sequelize.DATE,
|
||||||
|
allowNull: true,
|
||||||
|
},
|
||||||
|
{ transaction }
|
||||||
|
);
|
||||||
|
|
||||||
|
await queryInterface.addIndex("notifications", ["emailedAt"], {
|
||||||
|
name: "notifications_emailed_at",
|
||||||
|
transaction,
|
||||||
|
});
|
||||||
|
|
||||||
|
await queryInterface.addColumn(
|
||||||
|
"notifications",
|
||||||
|
"teamId",
|
||||||
|
{
|
||||||
|
type: Sequelize.UUID,
|
||||||
|
references: {
|
||||||
|
model: "teams",
|
||||||
|
key: "id",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{ transaction }
|
||||||
|
);
|
||||||
|
|
||||||
|
await queryInterface.addColumn(
|
||||||
|
"notifications",
|
||||||
|
"documentId",
|
||||||
|
{
|
||||||
|
type: Sequelize.UUID,
|
||||||
|
allowNull: true,
|
||||||
|
references: {
|
||||||
|
model: "documents",
|
||||||
|
key: "id",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{ transaction }
|
||||||
|
);
|
||||||
|
|
||||||
|
await queryInterface.changeColumn(
|
||||||
|
"notifications",
|
||||||
|
"actorId",
|
||||||
|
{
|
||||||
|
type: Sequelize.UUID,
|
||||||
|
allowNull: true,
|
||||||
|
},
|
||||||
|
{ transaction }
|
||||||
|
);
|
||||||
|
|
||||||
|
await queryInterface.removeColumn("notifications", "email", {
|
||||||
|
transaction,
|
||||||
|
});
|
||||||
|
|
||||||
|
await transaction.commit();
|
||||||
|
} catch (err) {
|
||||||
|
await transaction.rollback();
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
async down(queryInterface, Sequelize) {
|
||||||
|
const transaction = await queryInterface.sequelize.transaction();
|
||||||
|
|
||||||
|
try {
|
||||||
|
await queryInterface.removeColumn("notifications", "viewedAt", {
|
||||||
|
transaction,
|
||||||
|
});
|
||||||
|
await queryInterface.removeColumn("notifications", "emailedAt", {
|
||||||
|
transaction,
|
||||||
|
});
|
||||||
|
await queryInterface.removeColumn("notifications", "teamId", {
|
||||||
|
transaction,
|
||||||
|
});
|
||||||
|
await queryInterface.removeColumn("notifications", "documentId", {
|
||||||
|
transaction,
|
||||||
|
});
|
||||||
|
await queryInterface.changeColumn(
|
||||||
|
"notifications",
|
||||||
|
"actorId",
|
||||||
|
{
|
||||||
|
type: Sequelize.UUID,
|
||||||
|
allowNull: false,
|
||||||
|
},
|
||||||
|
{ transaction }
|
||||||
|
);
|
||||||
|
await queryInterface.addColumn(
|
||||||
|
"notifications",
|
||||||
|
"email",
|
||||||
|
{
|
||||||
|
type: Sequelize.BOOLEAN,
|
||||||
|
},
|
||||||
|
{ transaction }
|
||||||
|
);
|
||||||
|
await transaction.commit();
|
||||||
|
} catch (err) {
|
||||||
|
await transaction.rollback();
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
@@ -9,7 +9,10 @@ import {
|
|||||||
BelongsTo,
|
BelongsTo,
|
||||||
DataType,
|
DataType,
|
||||||
Default,
|
Default,
|
||||||
|
AllowNull,
|
||||||
} from "sequelize-typescript";
|
} from "sequelize-typescript";
|
||||||
|
import Document from "./Document";
|
||||||
|
import Team from "./Team";
|
||||||
import User from "./User";
|
import User from "./User";
|
||||||
import Fix from "./decorators/Fix";
|
import Fix from "./decorators/Fix";
|
||||||
|
|
||||||
@@ -26,15 +29,20 @@ class Notification extends Model {
|
|||||||
@Column(DataType.UUID)
|
@Column(DataType.UUID)
|
||||||
id: string;
|
id: string;
|
||||||
|
|
||||||
|
@AllowNull
|
||||||
|
@Column
|
||||||
|
emailedAt: Date;
|
||||||
|
|
||||||
|
@AllowNull
|
||||||
|
@Column
|
||||||
|
viewedAt: Date;
|
||||||
|
|
||||||
@CreatedAt
|
@CreatedAt
|
||||||
createdAt: Date;
|
createdAt: Date;
|
||||||
|
|
||||||
@Column
|
@Column
|
||||||
event: string;
|
event: string;
|
||||||
|
|
||||||
@Column
|
|
||||||
email: boolean;
|
|
||||||
|
|
||||||
// associations
|
// associations
|
||||||
|
|
||||||
@BelongsTo(() => User, "userId")
|
@BelongsTo(() => User, "userId")
|
||||||
@@ -47,9 +55,25 @@ class Notification extends Model {
|
|||||||
@BelongsTo(() => User, "actorId")
|
@BelongsTo(() => User, "actorId")
|
||||||
actor: User;
|
actor: User;
|
||||||
|
|
||||||
|
@AllowNull
|
||||||
@ForeignKey(() => User)
|
@ForeignKey(() => User)
|
||||||
@Column(DataType.UUID)
|
@Column(DataType.UUID)
|
||||||
actorId: string;
|
actorId: string;
|
||||||
|
|
||||||
|
@BelongsTo(() => Document, "documentId")
|
||||||
|
document: Document;
|
||||||
|
|
||||||
|
@AllowNull
|
||||||
|
@ForeignKey(() => Document)
|
||||||
|
@Column(DataType.UUID)
|
||||||
|
documentId: string;
|
||||||
|
|
||||||
|
@BelongsTo(() => Team, "teamId")
|
||||||
|
team: Team;
|
||||||
|
|
||||||
|
@ForeignKey(() => Team)
|
||||||
|
@Column(DataType.UUID)
|
||||||
|
teamId: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export default Notification;
|
export default Notification;
|
||||||
|
|||||||
@@ -1,5 +1,11 @@
|
|||||||
import DocumentNotificationEmail from "@server/emails/templates/DocumentNotificationEmail";
|
import DocumentNotificationEmail from "@server/emails/templates/DocumentNotificationEmail";
|
||||||
import { View, NotificationSetting, Subscription, Event } from "@server/models";
|
import {
|
||||||
|
View,
|
||||||
|
NotificationSetting,
|
||||||
|
Subscription,
|
||||||
|
Event,
|
||||||
|
Notification,
|
||||||
|
} from "@server/models";
|
||||||
import {
|
import {
|
||||||
buildDocument,
|
buildDocument,
|
||||||
buildCollection,
|
buildCollection,
|
||||||
@@ -74,6 +80,48 @@ describe("documents.publish", () => {
|
|||||||
expect(DocumentNotificationEmail.schedule).toHaveBeenCalled();
|
expect(DocumentNotificationEmail.schedule).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("should send only one notification in a 12-hour window", async () => {
|
||||||
|
const user = await buildUser();
|
||||||
|
const document = await buildDocument({
|
||||||
|
teamId: user.teamId,
|
||||||
|
createdById: user.id,
|
||||||
|
lastModifiedById: user.id,
|
||||||
|
});
|
||||||
|
|
||||||
|
const recipient = await buildUser({
|
||||||
|
teamId: user.teamId,
|
||||||
|
});
|
||||||
|
|
||||||
|
await NotificationSetting.create({
|
||||||
|
userId: recipient.id,
|
||||||
|
teamId: recipient.teamId,
|
||||||
|
event: "documents.publish",
|
||||||
|
});
|
||||||
|
|
||||||
|
await Notification.create({
|
||||||
|
actorId: user.id,
|
||||||
|
userId: recipient.id,
|
||||||
|
documentId: document.id,
|
||||||
|
teamId: recipient.teamId,
|
||||||
|
event: "documents.publish",
|
||||||
|
emailedAt: new Date(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const processor = new NotificationsProcessor();
|
||||||
|
await processor.perform({
|
||||||
|
name: "documents.publish",
|
||||||
|
documentId: document.id,
|
||||||
|
collectionId: document.collectionId,
|
||||||
|
teamId: document.teamId,
|
||||||
|
actorId: document.createdById,
|
||||||
|
data: {
|
||||||
|
title: document.title,
|
||||||
|
},
|
||||||
|
ip,
|
||||||
|
});
|
||||||
|
expect(DocumentNotificationEmail.schedule).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
test("should not send a notification to users without collection access", async () => {
|
test("should not send a notification to users without collection access", async () => {
|
||||||
const user = await buildUser();
|
const user = await buildUser();
|
||||||
const collection = await buildCollection({
|
const collection = await buildCollection({
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import { subHours } from "date-fns";
|
||||||
import { uniqBy } from "lodash";
|
import { uniqBy } from "lodash";
|
||||||
import { Op } from "sequelize";
|
import { Op } from "sequelize";
|
||||||
import subscriptionCreator from "@server/commands/subscriptionCreator";
|
import subscriptionCreator from "@server/commands/subscriptionCreator";
|
||||||
@@ -13,6 +14,7 @@ import {
|
|||||||
User,
|
User,
|
||||||
NotificationSetting,
|
NotificationSetting,
|
||||||
Subscription,
|
Subscription,
|
||||||
|
Notification,
|
||||||
} from "@server/models";
|
} from "@server/models";
|
||||||
import {
|
import {
|
||||||
CollectionEvent,
|
CollectionEvent,
|
||||||
@@ -72,16 +74,26 @@ export default class NotificationsProcessor extends BaseProcessor {
|
|||||||
const notify = await this.shouldNotify(document, recipient.user);
|
const notify = await this.shouldNotify(document, recipient.user);
|
||||||
|
|
||||||
if (notify) {
|
if (notify) {
|
||||||
await DocumentNotificationEmail.schedule({
|
const notification = await Notification.create({
|
||||||
to: recipient.user.email,
|
event: event.name,
|
||||||
eventName:
|
userId: recipient.user.id,
|
||||||
event.name === "documents.publish" ? "published" : "updated",
|
actorId: document.updatedBy.id,
|
||||||
|
teamId: team.id,
|
||||||
documentId: document.id,
|
documentId: document.id,
|
||||||
teamUrl: team.url,
|
|
||||||
actorName: document.updatedBy.name,
|
|
||||||
collectionName: collection.name,
|
|
||||||
unsubscribeUrl: recipient.unsubscribeUrl,
|
|
||||||
});
|
});
|
||||||
|
await DocumentNotificationEmail.schedule(
|
||||||
|
{
|
||||||
|
to: recipient.user.email,
|
||||||
|
eventName:
|
||||||
|
event.name === "documents.publish" ? "published" : "updated",
|
||||||
|
documentId: document.id,
|
||||||
|
teamUrl: team.url,
|
||||||
|
actorName: document.updatedBy.name,
|
||||||
|
collectionName: collection.name,
|
||||||
|
unsubscribeUrl: recipient.unsubscribeUrl,
|
||||||
|
},
|
||||||
|
{ notificationId: notification.id }
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -237,6 +249,23 @@ export default class NotificationsProcessor extends BaseProcessor {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deliver only a single notification in a 12 hour window
|
||||||
|
const notification = await Notification.findOne({
|
||||||
|
order: [["createdAt", "DESC"]],
|
||||||
|
where: {
|
||||||
|
userId: user.id,
|
||||||
|
documentId: document.id,
|
||||||
|
emailedAt: {
|
||||||
|
[Op.not]: null,
|
||||||
|
[Op.gte]: subHours(new Date(), 12),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (notification) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// If this recipient has viewed the document since the last update was made
|
// If this recipient has viewed the document since the last update was made
|
||||||
// then we can avoid sending them a useless notification, yay.
|
// then we can avoid sending them a useless notification, yay.
|
||||||
const view = await View.findOne({
|
const view = await View.findOne({
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ type Props = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
export default class EmailTask extends BaseTask<Props> {
|
export default class EmailTask extends BaseTask<Props> {
|
||||||
public async perform({ templateName, props }: Props) {
|
public async perform({ templateName, props, ...metadata }: Props) {
|
||||||
const EmailClass = emails[templateName];
|
const EmailClass = emails[templateName];
|
||||||
if (!EmailClass) {
|
if (!EmailClass) {
|
||||||
throw new Error(
|
throw new Error(
|
||||||
@@ -15,7 +15,7 @@ export default class EmailTask extends BaseTask<Props> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const email = new EmailClass(props);
|
const email = new EmailClass(props, metadata);
|
||||||
return email.send();
|
return email.send();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -305,3 +305,7 @@ export type Event =
|
|||||||
| UserEvent
|
| UserEvent
|
||||||
| ViewEvent
|
| ViewEvent
|
||||||
| WebhookSubscriptionEvent;
|
| WebhookSubscriptionEvent;
|
||||||
|
|
||||||
|
export type NotificationMetadata = {
|
||||||
|
notificationId?: string;
|
||||||
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user