chore: Move processing of documents.import to async worker (#6595)
This commit is contained in:
83
server/queues/tasks/DocumentImportTask.ts
Normal file
83
server/queues/tasks/DocumentImportTask.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import { SourceMetadata } from "@shared/types";
|
||||
import documentCreator from "@server/commands/documentCreator";
|
||||
import documentImporter from "@server/commands/documentImporter";
|
||||
import { User } from "@server/models";
|
||||
import { sequelize } from "@server/storage/database";
|
||||
import FileStorage from "@server/storage/files";
|
||||
import BaseTask, { TaskPriority } from "./BaseTask";
|
||||
|
||||
type Props = {
|
||||
userId: string;
|
||||
sourceMetadata: Pick<Required<SourceMetadata>, "fileName" | "mimeType">;
|
||||
publish?: boolean;
|
||||
collectionId?: string;
|
||||
parentDocumentId?: string;
|
||||
ip: string;
|
||||
key: string;
|
||||
};
|
||||
|
||||
export type DocumentImportTaskResponse =
|
||||
| {
|
||||
documentId: string;
|
||||
}
|
||||
| {
|
||||
error: string;
|
||||
};
|
||||
|
||||
export default class DocumentImportTask extends BaseTask<Props> {
|
||||
public async perform({
|
||||
key,
|
||||
sourceMetadata,
|
||||
ip,
|
||||
publish,
|
||||
collectionId,
|
||||
parentDocumentId,
|
||||
userId,
|
||||
}: Props): Promise<DocumentImportTaskResponse> {
|
||||
try {
|
||||
const content = await FileStorage.getFileBuffer(key);
|
||||
|
||||
const document = await sequelize.transaction(async (transaction) => {
|
||||
const user = await User.findByPk(userId, {
|
||||
rejectOnEmpty: true,
|
||||
transaction,
|
||||
});
|
||||
|
||||
const { text, state, title, emoji } = await documentImporter({
|
||||
user,
|
||||
fileName: sourceMetadata.fileName,
|
||||
mimeType: sourceMetadata.mimeType,
|
||||
content,
|
||||
ip,
|
||||
transaction,
|
||||
});
|
||||
|
||||
return documentCreator({
|
||||
sourceMetadata,
|
||||
title,
|
||||
emoji,
|
||||
text,
|
||||
state,
|
||||
publish,
|
||||
collectionId,
|
||||
parentDocumentId,
|
||||
user,
|
||||
ip,
|
||||
transaction,
|
||||
});
|
||||
});
|
||||
return { documentId: document.id };
|
||||
} catch (err) {
|
||||
return { error: err.message };
|
||||
} finally {
|
||||
await FileStorage.deleteFile(key);
|
||||
}
|
||||
}
|
||||
|
||||
public get options() {
|
||||
return {
|
||||
attempts: 1,
|
||||
priority: TaskPriority.Normal,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -7,12 +7,12 @@ import Router from "koa-router";
|
||||
import escapeRegExp from "lodash/escapeRegExp";
|
||||
import mime from "mime-types";
|
||||
import { Op, ScopeOptions, Sequelize, WhereOptions } from "sequelize";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
import { StatusFilter, TeamPreference } from "@shared/types";
|
||||
import { subtractDate } from "@shared/utils/date";
|
||||
import slugify from "@shared/utils/slugify";
|
||||
import documentCreator from "@server/commands/documentCreator";
|
||||
import documentDuplicator from "@server/commands/documentDuplicator";
|
||||
import documentImporter from "@server/commands/documentImporter";
|
||||
import documentLoader from "@server/commands/documentLoader";
|
||||
import documentMover from "@server/commands/documentMover";
|
||||
import documentPermanentDeleter from "@server/commands/documentPermanentDeleter";
|
||||
@@ -43,6 +43,7 @@ import {
|
||||
View,
|
||||
UserMembership,
|
||||
} from "@server/models";
|
||||
import AttachmentHelper from "@server/models/helpers/AttachmentHelper";
|
||||
import DocumentHelper from "@server/models/helpers/DocumentHelper";
|
||||
import SearchHelper from "@server/models/helpers/SearchHelper";
|
||||
import { authorize, cannot } from "@server/policies";
|
||||
@@ -54,6 +55,10 @@ import {
|
||||
presentPublicTeam,
|
||||
presentUser,
|
||||
} from "@server/presenters";
|
||||
import DocumentImportTask, {
|
||||
DocumentImportTaskResponse,
|
||||
} from "@server/queues/tasks/DocumentImportTask";
|
||||
import FileStorage from "@server/storage/files";
|
||||
import { APIContext } from "@server/types";
|
||||
import { RateLimiterStrategy } from "@server/utils/RateLimiter";
|
||||
import ZipHelper from "@server/utils/ZipHelper";
|
||||
@@ -1316,12 +1321,9 @@ router.post(
|
||||
rateLimiter(RateLimiterStrategy.TwentyFivePerMinute),
|
||||
validate(T.DocumentsImportSchema),
|
||||
multipart({ maximumFileSize: env.FILE_STORAGE_IMPORT_MAX_SIZE }),
|
||||
transaction(),
|
||||
async (ctx: APIContext<T.DocumentsImportReq>) => {
|
||||
const { collectionId, parentDocumentId, publish } = ctx.input.body;
|
||||
const file = ctx.input.file;
|
||||
|
||||
const { transaction } = ctx.state;
|
||||
const { user } = ctx.state.auth;
|
||||
|
||||
const collection = await Collection.scope({
|
||||
@@ -1331,7 +1333,6 @@ router.post(
|
||||
id: collectionId,
|
||||
teamId: user.teamId,
|
||||
},
|
||||
transaction,
|
||||
});
|
||||
authorize(user, "createDocument", collection);
|
||||
let parentDocument;
|
||||
@@ -1339,42 +1340,51 @@ router.post(
|
||||
if (parentDocumentId) {
|
||||
parentDocument = await Document.findByPk(parentDocumentId, {
|
||||
userId: user.id,
|
||||
transaction,
|
||||
});
|
||||
authorize(user, "read", parentDocument);
|
||||
}
|
||||
|
||||
const content = await fs.readFile(file.filepath);
|
||||
const buffer = await fs.readFile(file.filepath);
|
||||
const fileName = file.originalFilename ?? file.newFilename;
|
||||
const mimeType = file.mimetype ?? "";
|
||||
const acl = "private";
|
||||
|
||||
const { text, state, title, emoji } = await documentImporter({
|
||||
user,
|
||||
fileName,
|
||||
mimeType,
|
||||
content,
|
||||
ip: ctx.request.ip,
|
||||
transaction,
|
||||
const key = AttachmentHelper.getKey({
|
||||
acl,
|
||||
id: uuidv4(),
|
||||
name: fileName,
|
||||
userId: user.id,
|
||||
});
|
||||
|
||||
const document = await documentCreator({
|
||||
await FileStorage.store({
|
||||
body: buffer,
|
||||
contentType: mimeType,
|
||||
contentLength: buffer.length,
|
||||
key,
|
||||
acl,
|
||||
});
|
||||
|
||||
const job = await DocumentImportTask.schedule({
|
||||
key,
|
||||
sourceMetadata: {
|
||||
fileName,
|
||||
mimeType,
|
||||
},
|
||||
title,
|
||||
emoji,
|
||||
text,
|
||||
state,
|
||||
publish,
|
||||
userId: user.id,
|
||||
collectionId,
|
||||
parentDocumentId,
|
||||
user,
|
||||
publish,
|
||||
ip: ctx.request.ip,
|
||||
transaction,
|
||||
});
|
||||
const response: DocumentImportTaskResponse = await job.finished();
|
||||
if ("error" in response) {
|
||||
throw InvalidRequestError(response.error);
|
||||
}
|
||||
|
||||
document.collection = collection;
|
||||
const document = await Document.findByPk(response.documentId, {
|
||||
userId: user.id,
|
||||
rejectOnEmpty: true,
|
||||
});
|
||||
|
||||
ctx.body = {
|
||||
data: await presentDocument(document),
|
||||
|
||||
@@ -143,7 +143,7 @@ export default function init() {
|
||||
const task = new TaskClass();
|
||||
|
||||
try {
|
||||
await task.perform(props);
|
||||
return await task.perform(props);
|
||||
} catch (err) {
|
||||
Logger.error(`Error processing task in ${name}`, err, props);
|
||||
throw err;
|
||||
|
||||
Reference in New Issue
Block a user