Rearchitect import (#6141)
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
import path from "path";
|
||||
import { rm } from "fs-extra";
|
||||
import truncate from "lodash/truncate";
|
||||
import tmp from "tmp";
|
||||
import unzipper from "unzipper";
|
||||
import {
|
||||
AttachmentPreset,
|
||||
CollectionPermission,
|
||||
@@ -10,6 +13,7 @@ import { CollectionValidation } from "@shared/validations";
|
||||
import attachmentCreator from "@server/commands/attachmentCreator";
|
||||
import documentCreator from "@server/commands/documentCreator";
|
||||
import { serializer } from "@server/editor";
|
||||
import env from "@server/env";
|
||||
import { InternalError, ValidationError } from "@server/errors";
|
||||
import Logger from "@server/logging/Logger";
|
||||
import {
|
||||
@@ -98,19 +102,22 @@ export default abstract class ImportTask extends BaseTask<Props> {
|
||||
* @param props The props
|
||||
*/
|
||||
public async perform({ fileOperationId }: Props) {
|
||||
let dirPath;
|
||||
const fileOperation = await FileOperation.findByPk(fileOperationId, {
|
||||
rejectOnEmpty: true,
|
||||
});
|
||||
|
||||
try {
|
||||
Logger.info("task", `ImportTask fetching data for ${fileOperationId}`);
|
||||
const data = await this.fetchData(fileOperation);
|
||||
if (!data) {
|
||||
dirPath = await this.fetchAndExtractData(fileOperation);
|
||||
if (!dirPath) {
|
||||
throw InternalError("Failed to fetch data for import from storage.");
|
||||
}
|
||||
|
||||
Logger.info("task", `ImportTask parsing data for ${fileOperationId}`);
|
||||
const parsed = await this.parseData(data, fileOperation);
|
||||
Logger.info("task", `ImportTask parsing data for ${fileOperationId}`, {
|
||||
dirPath,
|
||||
});
|
||||
const parsed = await this.parseData(dirPath, fileOperation);
|
||||
|
||||
if (parsed.collections.length === 0) {
|
||||
throw ValidationError(
|
||||
@@ -152,6 +159,10 @@ export default abstract class ImportTask extends BaseTask<Props> {
|
||||
error
|
||||
);
|
||||
throw error;
|
||||
} finally {
|
||||
if (dirPath) {
|
||||
await this.cleanupExtractedData(dirPath, fileOperation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,38 +190,70 @@ export default abstract class ImportTask extends BaseTask<Props> {
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the remote data associated with the file operation as a Buffer.
|
||||
* Fetch the remote data associated with the file operation into a temporary disk location.
|
||||
*
|
||||
* @param fileOperation The FileOperation to fetch data for
|
||||
* @returns A promise that resolves to the data as a buffer.
|
||||
* @returns A promise that resolves to the temporary file path.
|
||||
*/
|
||||
protected async fetchData(fileOperation: FileOperation): Promise<Buffer> {
|
||||
protected async fetchAndExtractData(
|
||||
fileOperation: FileOperation
|
||||
): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const bufs: Buffer[] = [];
|
||||
const stream = fileOperation.stream;
|
||||
if (!stream) {
|
||||
return reject(new Error("No stream available"));
|
||||
}
|
||||
|
||||
stream.on("data", function (d) {
|
||||
bufs.push(d);
|
||||
});
|
||||
stream.on("error", reject);
|
||||
stream.on("end", () => {
|
||||
resolve(Buffer.concat(bufs));
|
||||
tmp.dir((err, path) => {
|
||||
if (err) {
|
||||
return reject(err);
|
||||
}
|
||||
|
||||
const dest = unzipper
|
||||
.Extract({ path, verbose: env.isDevelopment })
|
||||
.on("error", reject)
|
||||
.on("close", () => resolve(path));
|
||||
|
||||
stream
|
||||
.on("error", (err) => {
|
||||
dest.end();
|
||||
reject(err);
|
||||
})
|
||||
.pipe(dest);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the data loaded from fetchData into a consistent structured format
|
||||
* Cleanup the temporary directory where the data was fetched and extracted.
|
||||
*
|
||||
* @param dirPath The temporary directory path where the data was fetched
|
||||
* @param fileOperation The associated FileOperation
|
||||
*/
|
||||
protected async cleanupExtractedData(
|
||||
dirPath: string,
|
||||
fileOperation: FileOperation
|
||||
) {
|
||||
try {
|
||||
await rm(dirPath, { recursive: true, force: true });
|
||||
} catch (error) {
|
||||
Logger.error(
|
||||
`ImportTask failed to cleanup extracted data for ${fileOperation.id}`,
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the data loaded from fetchAndExtractData into a consistent structured format
|
||||
* that represents collections, documents, and the relationships between them.
|
||||
*
|
||||
* @param data The data loaded from fetchData
|
||||
* @param dirPath The temporary directory path where the data was fetched
|
||||
* @param fileOperation The FileOperation to parse data for
|
||||
* @returns A promise that resolves to the structured data
|
||||
*/
|
||||
protected abstract parseData(
|
||||
data: Buffer | NodeJS.ReadableStream,
|
||||
dirPath: string,
|
||||
fileOperation: FileOperation
|
||||
): Promise<StructuredImportData>;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user