From ebbfd0abfca5bdcad668958c5e8bc156d2b6f213 Mon Sep 17 00:00:00 2001 From: Tien Tran Date: Mon, 7 Oct 2024 07:04:33 +1100 Subject: [PATCH] Support upload stream processing via new files.upload filter --- api/src/services/files.ts | 41 ++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/api/src/services/files.ts b/api/src/services/files.ts index 9381f16e9bd33..df31e36bfe021 100644 --- a/api/src/services/files.ts +++ b/api/src/services/files.ts @@ -109,25 +109,48 @@ export class FilesService extends ItemsService { await disk.delete(payload.filename_disk!); } } catch (err: any) { - if (isReplacement === true) { - logger.warn(`Couldn't delete temp file ${tempFilenameDisk}`); - } else { - logger.warn(`Couldn't delete file ${payload.filename_disk}`); + if (!err.message?.includes("ENOENT")) { + if (isReplacement === true) { + logger.warn(`Couldn't delete temp file ${tempFilenameDisk}`); + } else { + logger.warn(`Couldn't delete file ${payload.filename_disk}`); + } + + logger.warn(err); } - - logger.warn(err); } }; try { + const { stream: streamAfterHooks, reason, cleanup } = await emitter.emitFilter( + 'files.upload', + { stream, reason: "", cleanup: () => 0 }, + { + payload, + collection: this.collection, + }, + { + database: this.knex, + schema: this.schema, + accountability: this.accountability, + }, + ); + + if (!streamAfterHooks) { + if (typeof cleanup === "function") cleanup(); + throw new InvalidPayloadError({ reason: reason || 'Invalid payload' }); + } + // If this is a replacement, we'll write the file to a temp location first to ensure we don't overwrite the existing file if something goes wrong if (isReplacement === true) { - await disk.write(tempFilenameDisk, stream, payload.type); + await disk.write(tempFilenameDisk, streamAfterHooks, payload.type); } else { // If this is a new file upload, we'll write the file to the final location - await disk.write(payload.filename_disk, stream, payload.type); + await disk.write(payload.filename_disk, streamAfterHooks, payload.type); } + if (typeof cleanup === "function") cleanup(); + // Check if the file was truncated (if the stream ended early) and throw limit error if it was if ('truncated' in stream && stream.truncated === true) { throw new ContentTooLargeError(); @@ -138,7 +161,7 @@ export class FilesService extends ItemsService { await cleanUp(); - if (err instanceof ContentTooLargeError) { + if (err instanceof ContentTooLargeError || err instanceof InvalidPayloadError) { throw err; } else { throw new ServiceUnavailableError({ service: 'files', reason: `Couldn't save file ${payload.filename_disk}` });