chore: Various importer improvements (#6519)
* Handle new Notion export format
* Clear data on file operation delete
* fix: Don't restart development server on html upload
* fix: Do not send collection created notifications on bulk import
* fix: Avoid parallelizing all uploads at once
* Move import into one transaction per-collection
@@ -7,7 +7,10 @@ import { Event as TEvent, FileOperationEvent } from "@server/types";
 import BaseProcessor from "./BaseProcessor";
 
 export default class FileOperationDeletedProcessor extends BaseProcessor {
-  static applicableEvents: TEvent["name"][] = ["fileOperations.delete"];
+  static applicableEvents: TEvent["name"][] = [
+    "fileOperations.delete",
+    "fileOperations.update",
+  ];
 
   async perform(event: FileOperationEvent) {
     await sequelize.transaction(async (transaction) => {
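How applicableEvents is consumed is outside this diff, but the pattern is worth spelling out: each processor advertises the event names it handles, and the worker routes events by that list. A rough sketch of that dispatch, with assumed names — the real wiring lives in Outline's queue setup, not here:

    // Sketch only: minimal stand-ins for TEvent and the processor registry.
    type QueueEvent = { name: string };

    class ExampleProcessor {
      static applicableEvents = ["fileOperations.delete", "fileOperations.update"];
      async perform(event: QueueEvent) {
        /* clean up imported data, as in the processor above */
      }
    }

    // Assumed dispatch loop: hand the event to every processor that lists it.
    async function dispatch(event: QueueEvent) {
      for (const Processor of [ExampleProcessor]) {
        if (Processor.applicableEvents.includes(event.name)) {
          await new Processor().perform(event);
        }
      }
    }

With "fileOperations.update" added to the list, the processor now also sees state transitions of existing file operations, not just deletions.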
@@ -16,9 +19,22 @@ export default class FileOperationDeletedProcessor extends BaseProcessor {
         paranoid: false,
         transaction,
       });
-      if (
-        fileOperation.type === FileOperationType.Export ||
-        fileOperation.state !== FileOperationState.Complete
-      ) {
-        return;
-      }
+      if (fileOperation.type === FileOperationType.Export) {
+        return;
+      }
+
+      if (
+        event.name === "fileOperations.update" &&
+        fileOperation.state !== FileOperationState.Error
+      ) {
+        return;
+      }
+
+      if (
+        event.name === "fileOperations.delete" &&
+        ![FileOperationState.Complete, FileOperationState.Error].includes(
+          fileOperation.state
+        )
+      ) {
+        return;
+      }
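Read together, the new guards restrict cleanup to imports in a terminal state: an update event only proceeds once the operation has errored, and a delete event only proceeds for operations that already finished. A minimal sketch of the combined decision, with plain strings standing in for the FileOperationType/FileOperationState enums used above:

    // Sketch only — plain strings in place of the enum types in the diff.
    type EventName = "fileOperations.update" | "fileOperations.delete";
    type OpType = "import" | "export";
    type OpState = "creating" | "uploading" | "complete" | "error";

    const terminal: OpState[] = ["complete", "error"];

    function shouldCleanUp(event: EventName, type: OpType, state: OpState) {
      if (type === "export") {
        return false; // exports have no imported data to revert
      }
      if (event === "fileOperations.update" && state !== "error") {
        return false; // updates only matter once an import has failed
      }
      if (event === "fileOperations.delete" && !terminal.includes(state)) {
        return false; // never touch an import that is still running
      }
      return true;
    }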
@@ -13,6 +13,10 @@ export default class CollectionCreatedNotificationsTask extends BaseTask<Collect
       return;
     }
 
+    if ("source" in event.data && event.data.source === "import") {
+      return;
+    }
+
     const recipients =
       await NotificationHelper.getCollectionNotificationRecipients(
         collection,
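The guard keys off the same source: "import" tag the importer attaches when creating records (see the attachmentCreator call further down, which passes source: "import"). A hypothetical payload pair showing the effect — the real CollectionEvent type lives in @server/types:

    const manual = { name: "collections.create", data: { name: "Engineering" } };
    const imported = {
      name: "collections.create",
      data: { name: "Imported wiki", source: "import" },
    };

    function shouldNotify(event: { data: Record<string, unknown> }) {
      // A bulk import can create dozens of collections in one run; emailing
      // recipients once per collection would be pure noise.
      return !("source" in event.data && event.data.source === "import");
    }

    shouldNotify(manual);   // => true
    shouldNotify(imported); // => false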
@@ -19,6 +19,19 @@ export default class ImportNotionTask extends ImportTask {
     if (!tree) {
       throw new Error("Could not find valid content in zip file");
     }
 
+    // New Notion exports have a single folder with the name of the export, we must skip this
+    // folder and go directly to the children.
+    if (
+      tree.children.length === 1 &&
+      tree.children[0].children.find((child) => child.title === "index")
+    ) {
+      return this.parseFileTree(
+        fileOperation,
+        tree.children[0].children.filter((child) => child.title !== "index")
+      );
+    }
+
     return this.parseFileTree(fileOperation, tree.children);
   }
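For context, the shape being detected looks roughly like the tree below: newer Notion zips wrap everything in a single folder, named after the export, that also contains an index file. The node titles here are assumptions for illustration, not taken from a real export:

    type FileTreeNode = { title: string; children: FileTreeNode[] };

    const tree: FileTreeNode = {
      title: "",
      children: [
        {
          title: "Workspace Export 2024", // single wrapper folder
          children: [
            { title: "index", children: [] }, // marker the new check looks for
            { title: "Page One", children: [] },
            { title: "Page Two", children: [] },
          ],
        },
      ],
    };

    // The guard unwraps the wrapper and filters out "index", so only
    // "Page One" and "Page Two" reach parseFileTree.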
@@ -1,5 +1,6 @@
 import path from "path";
 import fs from "fs-extra";
+import chunk from "lodash/chunk";
 import truncate from "lodash/truncate";
 import tmp from "tmp";
 import {
@@ -288,39 +289,19 @@ export default abstract class ImportTask extends BaseTask<Props> {
     const documents = new Map<string, Document>();
     const attachments = new Map<string, Attachment>();
 
+    const user = await User.findByPk(fileOperation.userId, {
+      rejectOnEmpty: true,
+    });
+    const ip = user.lastActiveIp || undefined;
+
     try {
-      return await sequelize.transaction(async (transaction) => {
-        const user = await User.findByPk(fileOperation.userId, {
-          transaction,
-          rejectOnEmpty: true,
-        });
-
-        const ip = user.lastActiveIp || undefined;
-
-        // Attachments
-        await Promise.all(
-          data.attachments.map(async (item) => {
-            Logger.debug("task", `ImportTask persisting attachment ${item.id}`);
-            const attachment = await attachmentCreator({
-              source: "import",
-              preset: AttachmentPreset.DocumentAttachment,
-              id: item.id,
-              name: item.name,
-              type: item.mimeType,
-              buffer: await item.buffer(),
-              user,
-              ip,
-              transaction,
-            });
-            if (attachment) {
-              attachments.set(item.id, attachment);
-            }
-          })
-        );
-
-        // Collections
-        for (const item of data.collections) {
-          Logger.debug("task", `ImportTask persisting collection ${item.id}`);
+      // Collections
+      for (const item of data.collections) {
+        await sequelize.transaction(async (transaction) => {
+          Logger.debug(
+            "task",
+            `ImportTask persisting collection ${item.name} (${item.id})`
+          );
           let description = item.description;
 
           // Description can be markdown text or a Prosemirror object if coming
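This is the core of the "one transaction per-collection" change: instead of a single transaction spanning the entire import, each collection (with its documents, as the later hunks show) commits independently, so locks are held briefly and a failure rolls back only the collection being written. A minimal sketch of the pattern under a generic sequelize setup:

    import { Sequelize, Transaction } from "sequelize";

    // `persist` is a stand-in for the collection-and-documents work above.
    async function importAll<T>(
      sequelize: Sequelize,
      items: T[],
      persist: (item: T, transaction: Transaction) => Promise<void>
    ) {
      for (const item of items) {
        // Each unit commits or rolls back on its own, keeping transactions
        // short instead of holding one giant transaction for the whole run.
        await sequelize.transaction(async (transaction) => {
          await persist(item, transaction);
        });
      }
    }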
@@ -333,13 +314,9 @@ export default abstract class ImportTask extends BaseTask<Props> {
           // Check all of the attachments we've created against urls in the text
           // and replace them out with attachment redirect urls before saving.
           for (const aitem of data.attachments) {
-            const attachment = attachments.get(aitem.id);
-            if (!attachment) {
-              continue;
-            }
             description = description.replace(
-              new RegExp(`<<${attachment.id}>>`, "g"),
-              attachment.redirectUrl
+              new RegExp(`<<${aitem.id}>>`, "g"),
+              Attachment.getRedirectUrl(aitem.id)
             );
           }
 
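Because attachments are now persisted after the text rewriting (in their own transaction, below), there is no Attachment instance to ask for a redirectUrl at this point; the static Attachment.getRedirectUrl must derive the URL from the id alone. A sketch of the idea — the URL format here is an assumption, the real helper lives on the Attachment model:

    class AttachmentSketch {
      // Assumed format; what matters is that the URL is derivable from the id.
      static getRedirectUrl(id: string): string {
        return `/api/attachments.redirect?id=${id}`;
      }
    }

    const text = "see <<abc-123>> for the diagram".replace(
      new RegExp("<<abc-123>>", "g"),
      AttachmentSketch.getRedirectUrl("abc-123")
    );
    // => "see /api/attachments.redirect?id=abc-123 for the diagram"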
@@ -437,89 +414,114 @@ export default abstract class ImportTask extends BaseTask<Props> {
           );
 
           collections.set(item.id, collection);
-        }
 
-        // Documents
-        for (const item of data.documents) {
-          Logger.debug("task", `ImportTask persisting document ${item.id}`);
-          let text = item.text;
+          // Documents
+          for (const item of data.documents.filter(
+            (d) => d.collectionId === collection.id
+          )) {
+            Logger.debug(
+              "task",
+              `ImportTask persisting document ${item.title} (${item.id})`
+            );
+            let text = item.text;
 
-          // Check all of the attachments we've created against urls in the text
-          // and replace them out with attachment redirect urls before saving.
-          for (const aitem of data.attachments) {
-            const attachment = attachments.get(aitem.id);
-            if (!attachment) {
-              continue;
-            }
-            text = text.replace(
-              new RegExp(`<<${attachment.id}>>`, "g"),
-              attachment.redirectUrl
-            );
-          }
+            // Check all of the attachments we've created against urls in the text
+            // and replace them out with attachment redirect urls before saving.
+            for (const aitem of data.attachments) {
+              text = text.replace(
+                new RegExp(`<<${aitem.id}>>`, "g"),
+                Attachment.getRedirectUrl(aitem.id)
+              );
+            }
 
-          // Check all of the document we've created against urls in the text
-          // and replace them out with a valid internal link. Because we are doing
-          // this before saving, we can't use the document slug, but we can take
-          // advantage of the fact that the document id will redirect in the client
-          for (const ditem of data.documents) {
-            text = text.replace(
-              new RegExp(`<<${ditem.id}>>`, "g"),
-              `/doc/${ditem.id}`
-            );
-          }
+            // Check all of the document we've created against urls in the text
+            // and replace them out with a valid internal link. Because we are doing
+            // this before saving, we can't use the document slug, but we can take
+            // advantage of the fact that the document id will redirect in the client
+            for (const ditem of data.documents) {
+              text = text.replace(
+                new RegExp(`<<${ditem.id}>>`, "g"),
+                `/doc/${ditem.id}`
+              );
+            }
 
-          const options: { urlId?: string } = {};
-          if (item.urlId) {
-            const existing = await Document.unscoped().findOne({
-              attributes: ["id"],
-              paranoid: false,
-              transaction,
-              where: {
-                urlId: item.urlId,
-              },
-            });
+            const options: { urlId?: string } = {};
+            if (item.urlId) {
+              const existing = await Document.unscoped().findOne({
+                attributes: ["id"],
+                paranoid: false,
+                transaction,
+                where: {
+                  urlId: item.urlId,
+                },
+              });
 
-            if (!existing) {
-              options.urlId = item.urlId;
-            }
-          }
+              if (!existing) {
+                options.urlId = item.urlId;
+              }
+            }
 
-          const document = await documentCreator({
-            ...options,
-            sourceMetadata: {
-              fileName: path.basename(item.path),
-              mimeType: item.mimeType,
-              externalId: item.externalId,
-            },
-            id: item.id,
-            title: item.title,
-            text,
-            collectionId: item.collectionId,
-            createdAt: item.createdAt,
-            updatedAt: item.updatedAt ?? item.createdAt,
-            publishedAt: item.updatedAt ?? item.createdAt ?? new Date(),
-            parentDocumentId: item.parentDocumentId,
-            importId: fileOperation.id,
-            user,
-            ip,
-            transaction,
-          });
-          documents.set(item.id, document);
+            const document = await documentCreator({
+              ...options,
+              sourceMetadata: {
+                fileName: path.basename(item.path),
+                mimeType: item.mimeType,
+                externalId: item.externalId,
+              },
+              id: item.id,
+              title: item.title,
+              text,
+              collectionId: item.collectionId,
+              createdAt: item.createdAt,
+              updatedAt: item.updatedAt ?? item.createdAt,
+              publishedAt: item.updatedAt ?? item.createdAt ?? new Date(),
+              parentDocumentId: item.parentDocumentId,
+              importId: fileOperation.id,
+              user,
+              ip,
+              transaction,
+            });
+            documents.set(item.id, document);
 
-          const collection = collections.get(item.collectionId);
-          if (collection) {
             await collection.addDocumentToStructure(document, 0, {
               transaction,
               save: false,
             });
           }
-        }
 
-        // Return value is only used for testing
-        return {
-          collections,
-          documents,
-          attachments,
-        };
+          await collection.save({ transaction });
+        });
+      }
+
+      // Attachments
+      await sequelize.transaction(async (transaction) => {
+        const chunks = chunk(data.attachments, 10);
+
+        for (const chunk of chunks) {
+          // Parallelize 10 uploads at a time
+          await Promise.all(
+            chunk.map(async (item) => {
+              Logger.debug(
+                "task",
+                `ImportTask persisting attachment ${item.name} (${item.id})`
+              );
+              const attachment = await attachmentCreator({
+                source: "import",
+                preset: AttachmentPreset.DocumentAttachment,
+                id: item.id,
+                name: item.name,
+                type: item.mimeType,
+                buffer: await item.buffer(),
+                user,
+                ip,
+                transaction,
+              });
+              if (attachment) {
+                attachments.set(item.id, attachment);
+              }
+            })
+          );
+        }
+      });
     } catch (err) {
       Logger.info(
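This replaces the old Promise.all over every attachment at once, which could fire hundreds of concurrent uploads on a large import. Chunking bounds the concurrency: within a group of 10 the uploads run in parallel, but no more than 10 are ever in flight. The pattern in isolation, with a placeholder upload function standing in for attachmentCreator:

    import chunk from "lodash/chunk";

    async function upload(item: { id: string }): Promise<void> {
      // placeholder for the real storage write
    }

    async function uploadAll(items: { id: string }[]) {
      for (const group of chunk(items, 10)) {
        // groups run sequentially; items within a group run in parallel
        await Promise.all(group.map(upload));
      }
    }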
@@ -534,6 +536,13 @@ export default abstract class ImportTask extends BaseTask<Props> {
       );
       throw err;
     }
+
+    // Return value is only used for testing
+    return {
+      collections,
+      documents,
+      attachments,
+    };
   }
 
   /**