From eeb8008927e81330299652a4addec6be2ca4b2bb Mon Sep 17 00:00:00 2001 From: Tom Moor Date: Sun, 1 May 2022 21:06:07 -0700 Subject: [PATCH] chore: Refactor collection export to match import (#3483) * chore: Refactor collection export to use FileOperations processor and task * Tweak options --- server/commands/collectionExporter.ts | 59 +++++--- server/queues/processors/ExportsProcessor.ts | 126 ----------------- .../processors/FileOperationsProcessor.ts | 12 ++ server/queues/tasks/ExportMarkdownZipTask.ts | 129 ++++++++++++++++++ server/queues/tasks/ImportTask.ts | 10 +- server/routes/api/collections.ts | 24 ++-- server/test/factories.ts | 8 +- server/types.ts | 21 +-- server/utils/zip.ts | 2 +- 9 files changed, 210 insertions(+), 181 deletions(-) delete mode 100644 server/queues/processors/ExportsProcessor.ts create mode 100644 server/queues/tasks/ExportMarkdownZipTask.ts diff --git a/server/commands/collectionExporter.ts b/server/commands/collectionExporter.ts index 53a2eb86f..de00418fc 100644 --- a/server/commands/collectionExporter.ts +++ b/server/commands/collectionExporter.ts @@ -1,5 +1,11 @@ +import { Transaction } from "sequelize"; import { APM } from "@server/logging/tracing"; import { Collection, Event, Team, User, FileOperation } from "@server/models"; +import { + FileOperationType, + FileOperationState, + FileOperationFormat, +} from "@server/models/FileOperation"; import { getAWSKeyForFileOp } from "@server/utils/s3"; async function collectionExporter({ @@ -7,34 +13,49 @@ async function collectionExporter({ team, user, ip, + transaction, }: { collection?: Collection; team: Team; user: User; ip: string; + transaction: Transaction; }) { const collectionId = collection?.id; const key = getAWSKeyForFileOp(user.teamId, collection?.name || team.name); - const fileOperation = await FileOperation.create({ - type: "export", - state: "creating", - key, - url: null, - size: 0, - collectionId, - userId: user.id, - teamId: user.teamId, - }); + const fileOperation = 
await FileOperation.create( + { + type: FileOperationType.Export, + state: FileOperationState.Creating, + format: FileOperationFormat.MarkdownZip, + key, + url: null, + size: 0, + collectionId, + userId: user.id, + teamId: user.teamId, + }, + { + transaction, + } + ); - // Event is consumed on worker in queues/processors/exports - await Event.create({ - name: collection ? "collections.export" : "collections.export_all", - collectionId, - teamId: user.teamId, - actorId: user.id, - modelId: fileOperation.id, - ip, - }); + await Event.create( + { + name: "fileOperations.create", + teamId: user.teamId, + actorId: user.id, + modelId: fileOperation.id, + collectionId, + ip, + data: { + type: FileOperationType.Export, + }, + }, + { + transaction, + } + ); fileOperation.user = user; diff --git a/server/queues/processors/ExportsProcessor.ts b/server/queues/processors/ExportsProcessor.ts deleted file mode 100644 index d81a1c773..000000000 --- a/server/queues/processors/ExportsProcessor.ts +++ /dev/null @@ -1,126 +0,0 @@ -import fs from "fs"; -import invariant from "invariant"; -import ExportFailureEmail from "@server/emails/templates/ExportFailureEmail"; -import ExportSuccessEmail from "@server/emails/templates/ExportSuccessEmail"; -import Logger from "@server/logging/logger"; -import { FileOperation, Collection, Event, Team, User } from "@server/models"; -import { FileOperationState } from "@server/models/FileOperation"; -import { Event as TEvent } from "@server/types"; -import { uploadToS3FromBuffer } from "@server/utils/s3"; -import { archiveCollections } from "@server/utils/zip"; -import BaseProcessor from "./BaseProcessor"; - -export default class ExportsProcessor extends BaseProcessor { - static applicableEvents: TEvent["name"][] = [ - "collections.export", - "collections.export_all", - ]; - - async perform(event: TEvent) { - switch (event.name) { - case "collections.export": - case "collections.export_all": { - const { actorId, teamId } = event; - const team = await 
Team.findByPk(teamId); - invariant(team, "team operation not found"); - - const user = await User.findByPk(actorId); - invariant(user, "user operation not found"); - - const fileOperation = await FileOperation.findByPk(event.modelId); - invariant(fileOperation, "fileOperation not found"); - - const collectionIds = - "collectionId" in event && event.collectionId - ? event.collectionId - : await user.collectionIds(); - - const collections = await Collection.findAll({ - where: { - id: collectionIds, - }, - }); - - this.updateFileOperation(fileOperation, actorId, teamId, { - state: FileOperationState.Creating, - }); - // heavy lifting of creating the zip file - Logger.info( - "processor", - `Archiving collections for file operation ${fileOperation.id}` - ); - const filePath = await archiveCollections(collections); - let url; - let state = FileOperationState.Creating; - - try { - // @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call. - const readBuffer = await fs.promises.readFile(filePath); - // @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call. 
- const stat = await fs.promises.stat(filePath); - this.updateFileOperation(fileOperation, actorId, teamId, { - state: FileOperationState.Uploading, - size: stat.size, - }); - Logger.info( - "processor", - `Uploading archive for file operation ${fileOperation.id}` - ); - url = await uploadToS3FromBuffer( - readBuffer, - "application/zip", - fileOperation.key, - "private" - ); - Logger.info( - "processor", - `Upload complete for file operation ${fileOperation.id}` - ); - state = FileOperationState.Complete; - } catch (error) { - Logger.error("Error exporting collection data", error, { - fileOperationId: fileOperation.id, - }); - state = FileOperationState.Error; - url = undefined; - } finally { - this.updateFileOperation(fileOperation, actorId, teamId, { - state, - url, - }); - - if (state === FileOperationState.Error) { - await ExportFailureEmail.schedule({ - to: user.email, - teamUrl: team.url, - }); - } else { - await ExportSuccessEmail.schedule({ - to: user.email, - id: fileOperation.id, - teamUrl: team.url, - }); - } - } - - break; - } - default: - } - } - - async updateFileOperation( - fileOperation: FileOperation, - actorId: string, - teamId: string, - data: Partial - ) { - await fileOperation.update(data); - await Event.schedule({ - name: "fileOperations.update", - teamId, - actorId, - modelId: fileOperation.id, - }); - } -} diff --git a/server/queues/processors/FileOperationsProcessor.ts b/server/queues/processors/FileOperationsProcessor.ts index fffd98cee..933f85750 100644 --- a/server/queues/processors/FileOperationsProcessor.ts +++ b/server/queues/processors/FileOperationsProcessor.ts @@ -5,6 +5,7 @@ import { FileOperationType, } from "@server/models/FileOperation"; import { Event as TEvent, FileOperationEvent } from "@server/types"; +import ExportMarkdownZipTask from "../tasks/ExportMarkdownZipTask"; import ImportMarkdownZipTask from "../tasks/ImportMarkdownZipTask"; import ImportNotionTask from "../tasks/ImportNotionTask"; import BaseProcessor from 
"./BaseProcessor"; @@ -36,5 +37,16 @@ export default class FileOperationsProcessor extends BaseProcessor { default: } } + + if (fileOperation.type === FileOperationType.Export) { + switch (fileOperation.format) { + case FileOperationFormat.MarkdownZip: + await ExportMarkdownZipTask.schedule({ + fileOperationId: event.modelId, + }); + break; + default: + } + } } } diff --git a/server/queues/tasks/ExportMarkdownZipTask.ts b/server/queues/tasks/ExportMarkdownZipTask.ts new file mode 100644 index 000000000..bce34f3ed --- /dev/null +++ b/server/queues/tasks/ExportMarkdownZipTask.ts @@ -0,0 +1,129 @@ +import fs from "fs"; +import invariant from "invariant"; +import { truncate } from "lodash"; +import ExportFailureEmail from "@server/emails/templates/ExportFailureEmail"; +import ExportSuccessEmail from "@server/emails/templates/ExportSuccessEmail"; +import Logger from "@server/logging/logger"; +import { Collection, Event, FileOperation, Team, User } from "@server/models"; +import { FileOperationState } from "@server/models/FileOperation"; +import { uploadToS3FromBuffer } from "@server/utils/s3"; +import { archiveCollections } from "@server/utils/zip"; +import BaseTask, { TaskPriority } from "./BaseTask"; + +type Props = { + fileOperationId: string; +}; + +export default class ExportMarkdownZipTask extends BaseTask { + /** + * Runs the export task. + * + * @param props The props + */ + public async perform({ fileOperationId }: Props) { + const fileOperation = await FileOperation.findByPk(fileOperationId); + invariant(fileOperation, "fileOperation not found"); + + const [team, user] = await Promise.all([ + Team.findByPk(fileOperation.teamId), + User.findByPk(fileOperation.userId), + ]); + invariant(team, "team operation not found"); + invariant(user, "user operation not found"); + + const collectionIds = fileOperation.collectionId + ? 
[fileOperation.collectionId] + : await user.collectionIds(); + + const collections = await Collection.findAll({ + where: { + id: collectionIds, + }, + }); + + try { + Logger.info("task", `ExportTask processing data for ${fileOperationId}`); + + await this.updateFileOperation( + fileOperation, + FileOperationState.Creating + ); + + const filePath = await archiveCollections(collections); + + Logger.info("task", `ExportTask uploading data for ${fileOperationId}`); + + await this.updateFileOperation( + fileOperation, + FileOperationState.Uploading + ); + + const fileBuffer = await fs.promises.readFile(filePath); + const url = await uploadToS3FromBuffer( + fileBuffer, + "application/zip", + fileOperation.key, + "private" + ); + + await this.updateFileOperation( + fileOperation, + FileOperationState.Complete, + undefined, + url + ); + + await ExportSuccessEmail.schedule({ + to: user.email, + id: fileOperation.id, + teamUrl: team.url, + }); + } catch (error) { + await this.updateFileOperation( + fileOperation, + FileOperationState.Error, + error + ); + await ExportFailureEmail.schedule({ + to: user.email, + teamUrl: team.url, + }); + throw error; + } + } + + /** + * Update the state of the underlying FileOperation in the database and send + * an event to the client. + * + * @param fileOperation The FileOperation to update + */ + private async updateFileOperation( + fileOperation: FileOperation, + state: FileOperationState, + error?: Error, + url?: string + ) { + await fileOperation.update({ + state, + url, + error: error ? truncate(error.message, { length: 255 }) : undefined, + }); + await Event.schedule({ + name: "fileOperations.update", + modelId: fileOperation.id, + teamId: fileOperation.teamId, + actorId: fileOperation.userId, + }); + } + + /** + * Job options such as priority and retry strategy, as defined by Bull. 
+ */ + public get options() { + return { + priority: TaskPriority.Background, + attempts: 2, + }; + } +} diff --git a/server/queues/tasks/ImportTask.ts b/server/queues/tasks/ImportTask.ts index 4417c3940..2a7f97c4e 100644 --- a/server/queues/tasks/ImportTask.ts +++ b/server/queues/tasks/ImportTask.ts @@ -4,7 +4,7 @@ import attachmentCreator from "@server/commands/attachmentCreator"; import documentCreator from "@server/commands/documentCreator"; import { sequelize } from "@server/database/sequelize"; import { ValidationError } from "@server/errors"; -import logger from "@server/logging/logger"; +import Logger from "@server/logging/logger"; import { User, Event, @@ -83,10 +83,10 @@ export default abstract class ImportTask extends BaseTask { invariant(fileOperation, "fileOperation not found"); try { - logger.info("task", `ImportTask fetching data for ${fileOperationId}`); + Logger.info("task", `ImportTask fetching data for ${fileOperationId}`); const data = await this.fetchData(fileOperation); - logger.info("task", `ImportTask parsing data for ${fileOperationId}`); + Logger.info("task", `ImportTask parsing data for ${fileOperationId}`); const parsed = await this.parseData(data, fileOperation); if (parsed.collections.length === 0) { @@ -103,13 +103,13 @@ export default abstract class ImportTask extends BaseTask { let result; try { - logger.info( + Logger.info( "task", `ImportTask persisting data for ${fileOperationId}` ); result = await this.persistData(parsed, fileOperation); } catch (error) { - logger.error( + Logger.error( `ImportTask failed to persist data for ${fileOperationId}`, error ); diff --git a/server/routes/api/collections.ts b/server/routes/api/collections.ts index 33ff0b5ab..b4688355b 100644 --- a/server/routes/api/collections.ts +++ b/server/routes/api/collections.ts @@ -495,11 +495,14 @@ router.post("collections.export", auth(), async (ctx) => { }).findByPk(id); authorize(user, "read", collection); - const fileOperation = await collectionExporter({ - 
collection, - user, - team, - ip: ctx.request.ip, + const fileOperation = await sequelize.transaction(async (transaction) => { + return collectionExporter({ + collection, + user, + team, + ip: ctx.request.ip, + transaction, + }); }); ctx.body = { @@ -515,10 +518,13 @@ router.post("collections.export_all", auth(), async (ctx) => { const team = await Team.findByPk(user.teamId); authorize(user, "export", team); - const fileOperation = await collectionExporter({ - user, - team, - ip: ctx.request.ip, + const fileOperation = await sequelize.transaction(async (transaction) => { + return collectionExporter({ + user, + team, + ip: ctx.request.ip, + transaction, + }); }); ctx.body = { diff --git a/server/test/factories.ts b/server/test/factories.ts index 447e16c0b..ba3cde030 100644 --- a/server/test/factories.ts +++ b/server/test/factories.ts @@ -15,6 +15,10 @@ import { AuthenticationProvider, FileOperation, } from "@server/models"; +import { + FileOperationState, + FileOperationType, +} from "@server/models/FileOperation"; let count = 1; @@ -319,11 +323,11 @@ export async function buildFileOperation( } return FileOperation.create({ - state: "creating", + state: FileOperationState.Creating, + type: FileOperationType.Export, size: 0, key: "uploads/key/to/file.zip", collectionId: null, - type: "export", url: "https://www.urltos3file.com/file.zip", ...overrides, }); diff --git a/server/types.ts b/server/types.ts index 3eb4e97ee..47fb4f2a7 100644 --- a/server/types.ts +++ b/server/types.ts @@ -104,21 +104,6 @@ export type RevisionEvent = { teamId: string; }; -export type CollectionExportEvent = { - name: "collections.export"; - teamId: string; - actorId: string; - collectionId: string; - modelId: string; -}; - -export type CollectionExportAllEvent = { - name: "collections.export_all"; - teamId: string; - actorId: string; - modelId: string; -}; - export type FileOperationEvent = { name: | "fileOperations.create" @@ -139,7 +124,7 @@ export type FileOperationEvent = { export type 
CollectionEvent = | { - name: "collections.create" // eslint-disable-line + name: "collections.create" // eslint-disable-line | "collections.update" | "collections.delete"; collectionId: string; @@ -189,8 +174,7 @@ export type CollectionEvent = sharingChanged: boolean; }; ip: string; - } - | CollectionExportEvent; + }; export type GroupEvent = | { @@ -257,7 +241,6 @@ export type Event = | PinEvent | StarEvent | CollectionEvent - | CollectionExportAllEvent | FileOperationEvent | IntegrationEvent | GroupEvent diff --git a/server/utils/zip.ts b/server/utils/zip.ts index d4fac85c9..dc69e0b68 100644 --- a/server/utils/zip.ts +++ b/server/utils/zip.ts @@ -135,7 +135,7 @@ function safeAddFileToArchive( * @param zip JSZip object * @returns pathname of the temporary file where the zip was written to disk */ -async function archiveToPath(zip: JSZip) { +async function archiveToPath(zip: JSZip): Promise { return new Promise((resolve, reject) => { tmp.file( {