diff --git a/server/models/Attachment.ts b/server/models/Attachment.ts
index 3fe8dc103..11920323f 100644
--- a/server/models/Attachment.ts
+++ b/server/models/Attachment.ts
@@ -72,9 +72,9 @@ class Attachment extends IdModel {
   }
 
   /**
-   * Get the contents of this attachment as a Buffer
+   * Get the contents of this attachment as a readable stream.
    */
-  get buffer() {
+  get stream() {
     return getFileByKey(this.key);
   }
 
diff --git a/server/models/FileOperation.ts b/server/models/FileOperation.ts
index f5889e40b..e4cd63b56 100644
--- a/server/models/FileOperation.ts
+++ b/server/models/FileOperation.ts
@@ -58,6 +58,9 @@ class FileOperation extends IdModel {
   @Column(DataType.BIGINT)
   size: number;
 
+  /**
+   * Mark the current file operation as expired and remove the file from storage.
+   */
   expire = async function () {
     this.state = "expired";
     try {
@@ -70,7 +73,10 @@ class FileOperation extends IdModel {
     await this.save();
   };
 
-  get buffer() {
+  /**
+   * The file operation contents as a readable stream.
+   */
+  get stream() {
     return getFileByKey(this.key);
   }
 
diff --git a/server/queues/tasks/ExportDocumentTreeTask.ts b/server/queues/tasks/ExportDocumentTreeTask.ts
index 471319033..5bc018963 100644
--- a/server/queues/tasks/ExportDocumentTreeTask.ts
+++ b/server/queues/tasks/ExportDocumentTreeTask.ts
@@ -55,10 +55,10 @@ export default abstract class ExportDocumentTreeTask extends ExportTask {
     await Promise.all(
       attachments.map(async (attachment) => {
         try {
-          const img = await getFileByKey(attachment.key);
+          const stream = getFileByKey(attachment.key);
           const dir = path.dirname(pathInZip);
-          if (img) {
-            zip.file(path.join(dir, attachment.key), img as Blob, {
+          if (stream) {
+            zip.file(path.join(dir, attachment.key), stream, {
               createFolders: true,
             });
           }
diff --git a/server/queues/tasks/ImportMarkdownZipTask.test.ts b/server/queues/tasks/ImportMarkdownZipTask.test.ts
index 0039500bd..a5d47cab8 100644
--- a/server/queues/tasks/ImportMarkdownZipTask.test.ts
+++ b/server/queues/tasks/ImportMarkdownZipTask.test.ts
@@ -10,9 +10,9 @@ setupTestDatabase();
 describe("ImportMarkdownZipTask", () => {
   it("should import the documents, attachments", async () => {
     const fileOperation = await buildFileOperation();
-    Object.defineProperty(fileOperation, "buffer", {
+    Object.defineProperty(fileOperation, "stream", {
       get() {
-        return fs.readFileSync(
+        return fs.createReadStream(
           path.resolve(__dirname, "..", "..", "test", "fixtures", "outline.zip")
         );
       },
@@ -33,9 +33,9 @@ describe("ImportMarkdownZipTask", () => {
 
   it("should throw an error with corrupt zip", async () => {
     const fileOperation = await buildFileOperation();
-    Object.defineProperty(fileOperation, "buffer", {
+    Object.defineProperty(fileOperation, "stream", {
       get() {
-        return fs.readFileSync(
+        return fs.createReadStream(
           path.resolve(__dirname, "..", "..", "test", "fixtures", "corrupt.zip")
         );
       },
@@ -59,9 +59,9 @@ describe("ImportMarkdownZipTask", () => {
 
   it("should throw an error with empty collection in zip", async () => {
     const fileOperation = await buildFileOperation();
-    Object.defineProperty(fileOperation, "buffer", {
+    Object.defineProperty(fileOperation, "stream", {
       get() {
-        return fs.readFileSync(
+        return fs.createReadStream(
           path.resolve(__dirname, "..", "..", "test", "fixtures", "empty.zip")
         );
       },
diff --git a/server/queues/tasks/ImportMarkdownZipTask.ts b/server/queues/tasks/ImportMarkdownZipTask.ts
index 0f549fb01..cf1217032 100644
--- a/server/queues/tasks/ImportMarkdownZipTask.ts
+++ b/server/queues/tasks/ImportMarkdownZipTask.ts
@@ -10,10 +10,10 @@
 import ImportTask, { StructuredImportData } from "./ImportTask";
 
 export default class ImportMarkdownZipTask extends ImportTask {
   public async parseData(
-    buffer: Buffer,
+    stream: NodeJS.ReadableStream,
     fileOperation: FileOperation
   ): Promise<StructuredImportData> {
-    const zip = await JSZip.loadAsync(buffer);
+    const zip = await JSZip.loadAsync(stream);
     const tree = ZipHelper.toFileTree(zip);
     return this.parseFileTree({ fileOperation, zip, tree });
diff --git a/server/queues/tasks/ImportNotionTask.test.ts b/server/queues/tasks/ImportNotionTask.test.ts
index 082092e40..25034333d 100644
--- a/server/queues/tasks/ImportNotionTask.test.ts
+++ b/server/queues/tasks/ImportNotionTask.test.ts
@@ -10,9 +10,9 @@ setupTestDatabase();
 describe("ImportNotionTask", () => {
   it("should import successfully from a Markdown export", async () => {
     const fileOperation = await buildFileOperation();
-    Object.defineProperty(fileOperation, "buffer", {
+    Object.defineProperty(fileOperation, "stream", {
       get() {
-        return fs.readFileSync(
+        return fs.createReadStream(
           path.resolve(
             __dirname,
             "..",
@@ -45,9 +45,9 @@ describe("ImportNotionTask", () => {
 
   it("should import successfully from a HTML export", async () => {
     const fileOperation = await buildFileOperation();
-    Object.defineProperty(fileOperation, "buffer", {
+    Object.defineProperty(fileOperation, "stream", {
       get() {
-        return fs.readFileSync(
+        return fs.createReadStream(
           path.resolve(
             __dirname,
             "..",
diff --git a/server/queues/tasks/ImportNotionTask.ts b/server/queues/tasks/ImportNotionTask.ts
index 7b67523e2..4ad07d118 100644
--- a/server/queues/tasks/ImportNotionTask.ts
+++ b/server/queues/tasks/ImportNotionTask.ts
@@ -11,10 +11,10 @@ import ImportTask, { StructuredImportData } from "./ImportTask";
 
 export default class ImportNotionTask extends ImportTask {
   public async parseData(
-    buffer: Buffer,
+    stream: NodeJS.ReadableStream,
     fileOperation: FileOperation
   ): Promise<StructuredImportData> {
-    const zip = await JSZip.loadAsync(buffer);
+    const zip = await JSZip.loadAsync(stream);
     const tree = ZipHelper.toFileTree(zip);
     return this.parseFileTree({ fileOperation, zip, tree });
   }
diff --git a/server/queues/tasks/ImportTask.ts b/server/queues/tasks/ImportTask.ts
index 1e72367e7..123ba9ef8 100644
--- a/server/queues/tasks/ImportTask.ts
+++ b/server/queues/tasks/ImportTask.ts
@@ -161,15 +161,27 @@ export default abstract class ImportTask extends BaseTask {
   }
 
   /**
-   * Fetch the remote data needed for the import, by default this will download
-   * any file associated with the FileOperation, save it to a temporary file,
-   * and return the path.
+   * Fetch the remote data associated with the file operation as a Buffer.
    *
    * @param fileOperation The FileOperation to fetch data for
-   * @returns string
+   * @returns A promise that resolves to the data as a buffer.
    */
-  protected async fetchData(fileOperation: FileOperation) {
-    return fileOperation.buffer;
+  protected async fetchData(fileOperation: FileOperation): Promise<Buffer> {
+    return new Promise<Buffer>((resolve, reject) => {
+      const bufs: Buffer[] = [];
+      const stream = fileOperation.stream;
+      if (!stream) {
+        return reject(new Error("No stream available"));
+      }
+
+      stream.on("data", function (d) {
+        bufs.push(d);
+      });
+      stream.on("error", reject);
+      stream.on("end", () => {
+        resolve(Buffer.concat(bufs));
+      });
+    });
   }
 
   /**
diff --git a/server/utils/s3.ts b/server/utils/s3.ts
index 88d79fb7e..4454ffdfe 100644
--- a/server/utils/s3.ts
+++ b/server/utils/s3.ts
@@ -185,15 +185,14 @@ export const getAWSKeyForFileOp = (teamId: string, name: string) => {
   return `${bucket}/${teamId}/${uuidv4()}/${name}-export.zip`;
 };
 
-export const getFileByKey = async (key: string) => {
+export const getFileByKey = (key: string) => {
   const params = {
     Bucket: AWS_S3_UPLOAD_BUCKET_NAME,
     Key: key,
  };
 
   try {
-    const data = await s3.getObject(params).promise();
-    return data.Body || null;
+    return s3.getObject(params).createReadStream();
   } catch (err) {
     Logger.error("Error getting file from S3 by key", err, {
       key,