chore: Refactor collection export to match import (#3483)
* chore: Refactor collection export to use FileOperations processor and task
* Tweak options
@@ -1,5 +1,11 @@
+import { Transaction } from "sequelize";
 import { APM } from "@server/logging/tracing";
 import { Collection, Event, Team, User, FileOperation } from "@server/models";
+import {
+  FileOperationType,
+  FileOperationState,
+  FileOperationFormat,
+} from "@server/models/FileOperation";
 import { getAWSKeyForFileOp } from "@server/utils/s3";

 async function collectionExporter({
@@ -7,34 +13,49 @@ async function collectionExporter({
   team,
   user,
   ip,
+  transaction,
 }: {
   collection?: Collection;
   team: Team;
   user: User;
   ip: string;
+  transaction: Transaction;
 }) {
   const collectionId = collection?.id;
   const key = getAWSKeyForFileOp(user.teamId, collection?.name || team.name);
-  const fileOperation = await FileOperation.create({
-    type: "export",
-    state: "creating",
-    key,
-    url: null,
-    size: 0,
-    collectionId,
-    userId: user.id,
-    teamId: user.teamId,
-  });
+  const fileOperation = await FileOperation.create(
+    {
+      type: FileOperationType.Export,
+      state: FileOperationState.Creating,
+      format: FileOperationFormat.MarkdownZip,
+      key,
+      url: null,
+      size: 0,
+      collectionId,
+      userId: user.id,
+      teamId: user.teamId,
+    },
+    {
+      transaction,
+    }
+  );

-  // Event is consumed on worker in queues/processors/exports
-  await Event.create({
-    name: collection ? "collections.export" : "collections.export_all",
-    collectionId,
-    teamId: user.teamId,
-    actorId: user.id,
-    modelId: fileOperation.id,
-    ip,
-  });
+  await Event.create(
+    {
+      name: "fileOperations.create",
+      teamId: user.teamId,
+      actorId: user.id,
+      modelId: fileOperation.id,
+      collectionId,
+      ip,
+      data: {
+        type: FileOperationType.Import,
+      },
+    },
+    {
+      transaction,
+    }
+  );

   fileOperation.user = user;

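For orientation, the enum members used above line up with the string literals they replace: `"export"` for `FileOperationType.Export` and `"creating"` for `FileOperationState.Creating`. A sketch of what the enums in `@server/models/FileOperation` might look like; only those two values are confirmed by this diff, the rest are assumptions:

```ts
// Sketch only: values other than "export" and "creating" are assumptions
// inferred from the member names used elsewhere in this diff.
export enum FileOperationType {
  Import = "import",
  Export = "export",
}

export enum FileOperationState {
  Creating = "creating",
  Uploading = "uploading",
  Complete = "complete",
  Error = "error",
}

export enum FileOperationFormat {
  MarkdownZip = "markdown-zip", // assumed value
}
```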
server/queues/processors/exports.ts (deleted)
@@ -1,126 +0,0 @@
-import fs from "fs";
-import invariant from "invariant";
-import ExportFailureEmail from "@server/emails/templates/ExportFailureEmail";
-import ExportSuccessEmail from "@server/emails/templates/ExportSuccessEmail";
-import Logger from "@server/logging/logger";
-import { FileOperation, Collection, Event, Team, User } from "@server/models";
-import { FileOperationState } from "@server/models/FileOperation";
-import { Event as TEvent } from "@server/types";
-import { uploadToS3FromBuffer } from "@server/utils/s3";
-import { archiveCollections } from "@server/utils/zip";
-import BaseProcessor from "./BaseProcessor";
-
-export default class ExportsProcessor extends BaseProcessor {
-  static applicableEvents: TEvent["name"][] = [
-    "collections.export",
-    "collections.export_all",
-  ];
-
-  async perform(event: TEvent) {
-    switch (event.name) {
-      case "collections.export":
-      case "collections.export_all": {
-        const { actorId, teamId } = event;
-        const team = await Team.findByPk(teamId);
-        invariant(team, "team operation not found");
-
-        const user = await User.findByPk(actorId);
-        invariant(user, "user operation not found");
-
-        const fileOperation = await FileOperation.findByPk(event.modelId);
-        invariant(fileOperation, "fileOperation not found");
-
-        const collectionIds =
-          "collectionId" in event && event.collectionId
-            ? event.collectionId
-            : await user.collectionIds();
-
-        const collections = await Collection.findAll({
-          where: {
-            id: collectionIds,
-          },
-        });
-
-        this.updateFileOperation(fileOperation, actorId, teamId, {
-          state: FileOperationState.Creating,
-        });
-        // heavy lifting of creating the zip file
-        Logger.info(
-          "processor",
-          `Archiving collections for file operation ${fileOperation.id}`
-        );
-        const filePath = await archiveCollections(collections);
-        let url;
-        let state = FileOperationState.Creating;
-
-        try {
-          // @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call.
-          const readBuffer = await fs.promises.readFile(filePath);
-          // @ts-expect-error ts-migrate(2769) FIXME: No overload matches this call.
-          const stat = await fs.promises.stat(filePath);
-          this.updateFileOperation(fileOperation, actorId, teamId, {
-            state: FileOperationState.Uploading,
-            size: stat.size,
-          });
-          Logger.info(
-            "processor",
-            `Uploading archive for file operation ${fileOperation.id}`
-          );
-          url = await uploadToS3FromBuffer(
-            readBuffer,
-            "application/zip",
-            fileOperation.key,
-            "private"
-          );
-          Logger.info(
-            "processor",
-            `Upload complete for file operation ${fileOperation.id}`
-          );
-          state = FileOperationState.Complete;
-        } catch (error) {
-          Logger.error("Error exporting collection data", error, {
-            fileOperationId: fileOperation.id,
-          });
-          state = FileOperationState.Error;
-          url = undefined;
-        } finally {
-          this.updateFileOperation(fileOperation, actorId, teamId, {
-            state,
-            url,
-          });
-
-          if (state === FileOperationState.Error) {
-            await ExportFailureEmail.schedule({
-              to: user.email,
-              teamUrl: team.url,
-            });
-          } else {
-            await ExportSuccessEmail.schedule({
-              to: user.email,
-              id: fileOperation.id,
-              teamUrl: team.url,
-            });
-          }
-        }
-
-        break;
-      }
-      default:
-    }
-  }
-
-  async updateFileOperation(
-    fileOperation: FileOperation,
-    actorId: string,
-    teamId: string,
-    data: Partial<FileOperation>
-  ) {
-    await fileOperation.update(data);
-    await Event.schedule({
-      name: "fileOperations.update",
-      teamId,
-      actorId,
-      modelId: fileOperation.id,
-    });
-  }
-}

@@ -5,6 +5,7 @@ import {
   FileOperationType,
 } from "@server/models/FileOperation";
 import { Event as TEvent, FileOperationEvent } from "@server/types";
+import ExportMarkdownZipTask from "../tasks/ExportMarkdownZipTask";
 import ImportMarkdownZipTask from "../tasks/ImportMarkdownZipTask";
 import ImportNotionTask from "../tasks/ImportNotionTask";
 import BaseProcessor from "./BaseProcessor";
@@ -36,5 +37,16 @@ export default class FileOperationsProcessor extends BaseProcessor {
         default:
       }
     }
+
+    if (fileOperation.type === FileOperationType.Export) {
+      switch (fileOperation.format) {
+        case FileOperationFormat.MarkdownZip:
+          await ExportMarkdownZipTask.schedule({
+            fileOperationId: event.modelId,
+          });
+          break;
+        default:
+      }
+    }
   }
 }

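The `*.schedule` calls in this processor follow the task contract introduced alongside the import tasks: a static helper serializes props onto a background queue, and a worker later instantiates the class and calls `perform` with those props. A minimal sketch of that contract, assuming a Bull-backed queue; the `taskQueue` wiring is illustrative, not the repository's actual implementation:

```ts
import Queue from "bull";

// Illustrative queue instance; the real app configures this elsewhere.
const taskQueue = new Queue("tasks", "redis://localhost:6379");

export default abstract class BaseTask<Props> {
  // Enqueue a job that records which task class to run and with what props.
  public static schedule<P>(this: { name: string }, props: P) {
    return taskQueue.add({ name: this.name, props });
  }

  // Subclasses implement the actual work; a worker calls this with the
  // deserialized props when the job is picked up.
  public abstract perform(props: Props): Promise<void>;
}
```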
server/queues/tasks/ExportMarkdownZipTask.ts (new file, 129 lines)
@@ -0,0 +1,129 @@
+import fs from "fs";
+import invariant from "invariant";
+import { truncate } from "lodash";
+import ExportFailureEmail from "@server/emails/templates/ExportFailureEmail";
+import ExportSuccessEmail from "@server/emails/templates/ExportSuccessEmail";
+import Logger from "@server/logging/logger";
+import { Collection, Event, FileOperation, Team, User } from "@server/models";
+import { FileOperationState } from "@server/models/FileOperation";
+import { uploadToS3FromBuffer } from "@server/utils/s3";
+import { archiveCollections } from "@server/utils/zip";
+import BaseTask, { TaskPriority } from "./BaseTask";
+
+type Props = {
+  fileOperationId: string;
+};
+
+export default class ExportMarkdownZipTask extends BaseTask<Props> {
+  /**
+   * Runs the export task.
+   *
+   * @param props The props
+   */
+  public async perform({ fileOperationId }: Props) {
+    const fileOperation = await FileOperation.findByPk(fileOperationId);
+    invariant(fileOperation, "fileOperation not found");
+
+    const [team, user] = await Promise.all([
+      Team.findByPk(fileOperation.teamId),
+      User.findByPk(fileOperation.userId),
+    ]);
+    invariant(team, "team operation not found");
+    invariant(user, "user operation not found");
+
+    const collectionIds = fileOperation.collectionId
+      ? [fileOperation.collectionId]
+      : await user.collectionIds();
+
+    const collections = await Collection.findAll({
+      where: {
+        id: collectionIds,
+      },
+    });
+
+    try {
+      Logger.info("task", `ExportTask processing data for ${fileOperationId}`);
+
+      await this.updateFileOperation(
+        fileOperation,
+        FileOperationState.Creating
+      );
+
+      const filePath = await archiveCollections(collections);
+
+      Logger.info("task", `ExportTask uploading data for ${fileOperationId}`);
+
+      await this.updateFileOperation(
+        fileOperation,
+        FileOperationState.Uploading
+      );
+
+      const fileBuffer = await fs.promises.readFile(filePath);
+      const url = await uploadToS3FromBuffer(
+        fileBuffer,
+        "application/zip",
+        fileOperation.key,
+        "private"
+      );
+
+      await this.updateFileOperation(
+        fileOperation,
+        FileOperationState.Complete,
+        undefined,
+        url
+      );
+
+      await ExportSuccessEmail.schedule({
+        to: user.email,
+        id: fileOperation.id,
+        teamUrl: team.url,
+      });
+    } catch (error) {
+      await this.updateFileOperation(
+        fileOperation,
+        FileOperationState.Error,
+        error
+      );
+      await ExportFailureEmail.schedule({
+        to: user.email,
+        teamUrl: team.url,
+      });
+      throw error;
+    }
+  }
+
+  /**
+   * Update the state of the underlying FileOperation in the database and send
+   * an event to the client.
+   *
+   * @param fileOperation The FileOperation to update
+   */
+  private async updateFileOperation(
+    fileOperation: FileOperation,
+    state: FileOperationState,
+    error?: Error,
+    url?: string
+  ) {
+    await fileOperation.update({
+      state,
+      url,
+      error: error ? truncate(error.message, { length: 255 }) : undefined,
+    });
+    await Event.schedule({
+      name: "fileOperations.update",
+      modelId: fileOperation.id,
+      teamId: fileOperation.teamId,
+      actorId: fileOperation.userId,
+    });
+  }
+
+  /**
+   * Job options such as priority and retry strategy, as defined by Bull.
+   */
+  public get options() {
+    return {
+      priority: TaskPriority.Background,
+      attempts: 2,
+    };
+  }
+}

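The `options` getter above returns plain Bull job options: in Bull, `priority: 1` is the highest priority, so a background tier presumably maps to a larger number, and `attempts: 2` means one retry after a failure. Assuming the scheduler forwards the getter's value when enqueueing, the effect is roughly:

```ts
import Queue from "bull";

const queue = new Queue("tasks", "redis://localhost:6379");

async function enqueueExport(fileOperationId: string) {
  // Bull per-job options: lower `priority` numbers run first, so a background
  // task uses a larger number; `attempts` is total tries, so 2 = one retry.
  await queue.add(
    { name: "ExportMarkdownZipTask", props: { fileOperationId } },
    { priority: 10, attempts: 2 } // illustrative values
  );
}
```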
@@ -4,7 +4,7 @@ import attachmentCreator from "@server/commands/attachmentCreator";
 import documentCreator from "@server/commands/documentCreator";
 import { sequelize } from "@server/database/sequelize";
 import { ValidationError } from "@server/errors";
-import logger from "@server/logging/logger";
+import Logger from "@server/logging/logger";
 import {
   User,
   Event,
@@ -83,10 +83,10 @@ export default abstract class ImportTask extends BaseTask<Props> {
     invariant(fileOperation, "fileOperation not found");

     try {
-      logger.info("task", `ImportTask fetching data for ${fileOperationId}`);
+      Logger.info("task", `ImportTask fetching data for ${fileOperationId}`);
       const data = await this.fetchData(fileOperation);

-      logger.info("task", `ImportTask parsing data for ${fileOperationId}`);
+      Logger.info("task", `ImportTask parsing data for ${fileOperationId}`);
       const parsed = await this.parseData(data, fileOperation);

       if (parsed.collections.length === 0) {
@@ -103,13 +103,13 @@ export default abstract class ImportTask extends BaseTask<Props> {

       let result;
       try {
-        logger.info(
+        Logger.info(
           "task",
           `ImportTask persisting data for ${fileOperationId}`
         );
         result = await this.persistData(parsed, fileOperation);
       } catch (error) {
-        logger.error(
+        Logger.error(
           `ImportTask failed to persist data for ${fileOperationId}`,
           error
         );
@@ -495,11 +495,14 @@ router.post("collections.export", auth(), async (ctx) => {
   }).findByPk(id);
   authorize(user, "read", collection);

-  const fileOperation = await collectionExporter({
-    collection,
-    user,
-    team,
-    ip: ctx.request.ip,
+  const fileOperation = await sequelize.transaction(async (transaction) => {
+    return collectionExporter({
+      collection,
+      user,
+      team,
+      ip: ctx.request.ip,
+      transaction,
+    });
   });

   ctx.body = {
@@ -515,10 +518,13 @@ router.post("collections.export_all", auth(), async (ctx) => {
   const team = await Team.findByPk(user.teamId);
   authorize(user, "export", team);

-  const fileOperation = await collectionExporter({
-    user,
-    team,
-    ip: ctx.request.ip,
+  const fileOperation = await sequelize.transaction(async (transaction) => {
+    return collectionExporter({
+      user,
+      team,
+      ip: ctx.request.ip,
+      transaction,
+    });
   });

   ctx.body = {

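Both routes now use Sequelize's managed-transaction API: every query passed the `transaction` handle joins one unit of work, returning from the callback commits, and throwing rolls back. This is what makes the `FileOperation` row and its `fileOperations.create` event persist or fail together. A self-contained sketch of the pattern; the model and connection string are illustrative:

```ts
import { Sequelize, DataTypes, Transaction } from "sequelize";

const sequelize = new Sequelize("sqlite::memory:"); // illustrative connection
const Item = sequelize.define("item", { name: DataTypes.STRING });

async function main() {
  await sequelize.sync();
  // Managed transaction: resolving the callback commits, throwing rolls back.
  const item = await sequelize.transaction(async (transaction: Transaction) => {
    const row = await Item.create({ name: "export" }, { transaction });
    // Further writes that pass { transaction } join the same unit of work.
    return row; // becomes the resolved value of sequelize.transaction
  });
  console.log(item.get("name"));
}

void main();
```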
@@ -15,6 +15,10 @@ import {
   AuthenticationProvider,
   FileOperation,
 } from "@server/models";
+import {
+  FileOperationState,
+  FileOperationType,
+} from "@server/models/FileOperation";

 let count = 1;

@@ -319,11 +323,11 @@ export async function buildFileOperation(
   }

   return FileOperation.create({
-    state: "creating",
+    state: FileOperationState.Creating,
+    type: FileOperationType.Export,
     size: 0,
     key: "uploads/key/to/file.zip",
     collectionId: null,
-    type: "export",
     url: "https://www.urltos3file.com/file.zip",
     ...overrides,
   });

@@ -104,21 +104,6 @@ export type RevisionEvent = {
   teamId: string;
 };

-export type CollectionExportEvent = {
-  name: "collections.export";
-  teamId: string;
-  actorId: string;
-  collectionId: string;
-  modelId: string;
-};
-
-export type CollectionExportAllEvent = {
-  name: "collections.export_all";
-  teamId: string;
-  actorId: string;
-  modelId: string;
-};
-
 export type FileOperationEvent = {
   name:
     | "fileOperations.create"
@@ -139,7 +124,7 @@ export type FileOperationEvent = {

 export type CollectionEvent =
   | {
-      name: "collections.create" // eslint-disable-line
+      name: "collections.create" // eslint-disable-line
         | "collections.update"
         | "collections.delete";
       collectionId: string;
@@ -189,8 +174,7 @@ export type CollectionEvent =
         sharingChanged: boolean;
       };
       ip: string;
-    }
-  | CollectionExportEvent;
+    };

 export type GroupEvent =
   | {
@@ -257,7 +241,6 @@ export type Event =
   | PinEvent
   | StarEvent
   | CollectionEvent
-  | CollectionExportAllEvent
   | FileOperationEvent
   | IntegrationEvent
   | GroupEvent
@@ -135,7 +135,7 @@ function safeAddFileToArchive(
  * @param zip JSZip object
  * @returns pathname of the temporary file where the zip was written to disk
  */
-async function archiveToPath(zip: JSZip) {
+async function archiveToPath(zip: JSZip): Promise<string> {
   return new Promise((resolve, reject) => {
     tmp.file(
       {
