diff --git a/libraries/object-persistor/src/FSPersistor.js b/libraries/object-persistor/src/FSPersistor.js index 4afc1cbb4a..058017038b 100644 --- a/libraries/object-persistor/src/FSPersistor.js +++ b/libraries/object-persistor/src/FSPersistor.js @@ -1,3 +1,4 @@ +const crypto = require('crypto') const fs = require('fs') const fsPromises = require('fs/promises') const globCallbacks = require('glob') @@ -41,30 +42,16 @@ module.exports = class FSPersistor extends AbstractPersistor { await this._ensureDirectoryExists(targetPath) const tempFilePath = await this._writeStreamToTempFile( location, - sourceStream + sourceStream, + opts ) try { - if (opts.sourceMd5) { - const actualMd5 = await _getFileMd5HashForPath(tempFilePath) - if (actualMd5 !== opts.sourceMd5) { - throw new WriteError('md5 hash mismatch', { - location, - target, - expectedMd5: opts.sourceMd5, - actualMd5, - }) - } - } - await fsPromises.rename(tempFilePath, targetPath) } finally { await this._cleanupTempFile(tempFilePath) } } catch (err) { - if (err instanceof WriteError) { - throw err - } throw PersistorHelper.wrapError( err, 'failed to write stream', @@ -116,7 +103,9 @@ module.exports = class FSPersistor extends AbstractPersistor { async getObjectMd5Hash(location, filename) { const fsPath = this._getFsPath(location, filename) try { - return await _getFileMd5HashForPath(fsPath) + const stream = fs.createReadStream(fsPath) + const hash = await PersistorHelper.calculateStreamMd5(stream) + return hash } catch (err) { throw new ReadError( 'unable to get md5 hash from file', @@ -231,30 +220,49 @@ module.exports = class FSPersistor extends AbstractPersistor { return size } - async _writeStreamToTempFile(location, stream) { + async _writeStreamToTempFile(location, stream, opts = {}) { const tempDirPath = await fsPromises.mkdtemp(Path.join(location, 'tmp-')) const tempFilePath = Path.join(tempDirPath, 'uploaded-file') + const transforms = [] + let md5Observer + if (opts.sourceMd5) { + md5Observer = createMd5Observer() + transforms.push(md5Observer.transform) + } + let timer if (this.metrics) { timer = new this.metrics.Timer('writingFile') } - const writeStream = fs.createWriteStream(tempFilePath) try { - await pipeline(stream, writeStream) + const writeStream = fs.createWriteStream(tempFilePath) + await pipeline(stream, ...transforms, writeStream) if (timer) { timer.done() } - return tempFilePath } catch (err) { - await fsPromises.rm(tempFilePath, { force: true }) + await this._cleanupTempFile(tempFilePath) throw new WriteError( 'problem writing temp file locally', - { err, tempFilePath }, + { tempFilePath }, err ) } + + if (opts.sourceMd5) { + const actualMd5 = md5Observer.hash.digest('hex') + if (actualMd5 !== opts.sourceMd5) { + await this._cleanupTempFile(tempFilePath) + throw new WriteError('md5 hash mismatch', { + expectedMd5: opts.sourceMd5, + actualMd5, + }) + } + } + + return tempFilePath } async _cleanupTempFile(tempFilePath) { @@ -283,7 +291,15 @@ module.exports = class FSPersistor extends AbstractPersistor { } } -async function _getFileMd5HashForPath(fullPath) { - const stream = fs.createReadStream(fullPath) - return PersistorHelper.calculateStreamMd5(stream) +function createMd5Observer() { + const hash = crypto.createHash('md5') + + async function* transform(chunks) { + for await (const chunk of chunks) { + hash.update(chunk) + yield chunk + } + } + + return { hash, transform } } diff --git a/libraries/object-persistor/test/unit/FSPersistorTests.js b/libraries/object-persistor/test/unit/FSPersistorTests.js index c8e7172218..220c7f95e4 100644 --- a/libraries/object-persistor/test/unit/FSPersistorTests.js +++ b/libraries/object-persistor/test/unit/FSPersistorTests.js @@ -147,7 +147,7 @@ describe('FSPersistorTests', function () { persistor.sendStream(location, files.wombat, stream, { sourceMd5: md5('wrong content'), }) - ).to.be.rejectedWith(Errors.WriteError, 'md5 hash mismatch') + ).to.be.rejectedWith(Errors.WriteError) }) it('should not write the target file', async function () { @@ -236,7 +236,7 @@ describe('FSPersistorTests', function () { persistor.sendStream(location, files.wombat, stream, { sourceMd5: md5('wrong content'), }) - ).to.be.rejectedWith(Errors.WriteError, 'md5 hash mismatch') + ).to.be.rejectedWith(Errors.WriteError) }) it('should not update the target file', async function () {