From 93cd55fb79f4b595b3ea948444080688fe439fa3 Mon Sep 17 00:00:00 2001 From: Simon Detheridge Date: Wed, 29 Jan 2020 12:23:31 +0000 Subject: [PATCH] Refactor persistors to use a helper for common things --- services/filestore/app/js/FSPersistor.js | 31 ++-- services/filestore/app/js/PersistorHelper.js | 114 ++++++++++++++ services/filestore/app/js/S3Persistor.js | 141 +++++++----------- .../test/unit/js/FSPersistorTests.js | 5 +- .../test/unit/js/S3PersistorTests.js | 38 ++--- 5 files changed, 203 insertions(+), 126 deletions(-) create mode 100644 services/filestore/app/js/PersistorHelper.js diff --git a/services/filestore/app/js/FSPersistor.js b/services/filestore/app/js/FSPersistor.js index 3f54e2d091..a5b1a35c8c 100644 --- a/services/filestore/app/js/FSPersistor.js +++ b/services/filestore/app/js/FSPersistor.js @@ -8,6 +8,7 @@ const { promisify, callbackify } = require('util') const LocalFileWriter = require('./LocalFileWriter').promises const { NotFoundError, ReadError, WriteError } = require('./Errors') +const PersistorHelper = require('./PersistorHelper') const pipeline = promisify(Stream.pipeline) const fsUnlink = promisify(fs.unlink) @@ -28,7 +29,7 @@ async function sendFile(location, target, source) { const targetStream = fs.createWriteStream(`${location}/${filteredTarget}`) await pipeline(sourceStream, targetStream) } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to copy the specified file', { location, target, source }, @@ -65,7 +66,7 @@ async function getFileStream(location, name, opts) { try { opts.fd = await fsOpen(`${location}/${filteredName}`, 'r') } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to open file for streaming', { location, filteredName, opts }, @@ -83,7 +84,7 @@ async function getFileSize(location, filename) { const stat = await fsStat(fullPath) return stat.size } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to stat file', { location, filename }, @@ -126,7 +127,7 @@ async function copyFile(location, fromName, toName) { const targetStream = fs.createWriteStream(`${location}/${filteredToName}`) await pipeline(sourceStream, targetStream) } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to copy file', { location, filteredFromName, filteredToName }, @@ -140,7 +141,7 @@ async function deleteFile(location, name) { try { await fsUnlink(`${location}/${filteredName}`) } catch (err) { - const wrappedError = _wrapError( + const wrappedError = PersistorHelper.wrapError( err, 'failed to delete file', { location, filteredName }, @@ -161,7 +162,7 @@ async function deleteDirectory(location, name) { try { await rmrf(`${location}/${filteredName}`) } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to delete directory', { location, filteredName }, @@ -179,7 +180,7 @@ async function checkIfFileExists(location, name) { if (err.code === 'ENOENT') { return false } - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to stat file', { location, filteredName }, @@ -209,7 +210,7 @@ async function directorySize(location, name) { } } } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to get directory size', { location, name }, @@ -220,20 +221,6 @@ async function directorySize(location, name) { return size } -function _wrapError(error, message, params, ErrorType) { - if (error.code === 'ENOENT') { - return new NotFoundError({ - message: 'no such file or directory', - info: params - }).withCause(error) - } else { - return new ErrorType({ - message: message, - info: params - }).withCause(error) - } -} - module.exports = { sendFile: callbackify(sendFile), sendStream: callbackify(sendStream), diff --git a/services/filestore/app/js/PersistorHelper.js b/services/filestore/app/js/PersistorHelper.js new file mode 100644 index 0000000000..d8beb4a0a9 --- /dev/null +++ b/services/filestore/app/js/PersistorHelper.js @@ -0,0 +1,114 @@ +const crypto = require('crypto') +const meter = require('stream-meter') +const Stream = require('stream') +const logger = require('logger-sharelatex') +const { WriteError, ReadError, NotFoundError } = require('./Errors') +const { promisify } = require('util') + +const pipeline = promisify(Stream.pipeline) + +module.exports = { + calculateStreamMd5, + verifyMd5, + getMeteredStream, + waitForStreamReady, + wrapError +} + +// returns a promise which resolves with the md5 hash of the stream +function calculateStreamMd5(stream) { + const hash = crypto.createHash('md5') + hash.setEncoding('hex') + + return new Promise((resolve, reject) => { + pipeline(stream, hash) + .then(() => { + hash.end() + resolve(hash.read()) + }) + .catch(err => { + reject(err) + }) + }) +} + +// verifies the md5 hash of a file against the supplied md5 or the one stored in +// storage if not supplied - deletes the new file if the md5 does not match and +// throws an error +async function verifyMd5(persistor, bucket, key, sourceMd5, destMd5 = null) { + if (!destMd5) { + destMd5 = await persistor.promises.getFileMd5Hash(bucket, key) + } + + if (sourceMd5 !== destMd5) { + try { + await persistor.promises.deleteFile(bucket, key) + } catch (err) { + logger.warn(err, 'error deleting file for invalid upload') + } + + throw new WriteError({ + message: 'source and destination hashes do not match', + info: { + sourceMd5, + destMd5, + bucket, + key + } + }) + } +} + +// returns the next stream in the pipeline, and calls the callback with the byte count +// when the stream finishes or receives an error +function getMeteredStream(stream, callback) { + const meteredStream = meter() + + pipeline(stream, meteredStream) + .then(() => { + callback(null, meteredStream.bytes) + }) + .catch(err => { + // on error, just send how many bytes we received before the stream stopped + callback(err, meteredStream.bytes) + }) + + return meteredStream +} + +// resolves when a stream is 'readable', or rejects if the stream throws an error +// before that happens - this lets us handle protocol-level errors before trying +// to read them +function waitForStreamReady(stream) { + return new Promise((resolve, reject) => { + const onError = function(err) { + reject(wrapError(err, 'error before stream became ready', {}, ReadError)) + } + const onStreamReady = function() { + stream.removeListener('readable', onStreamReady) + stream.removeListener('error', onError) + resolve(stream) + } + stream.on('readable', onStreamReady) + stream.on('error', onError) + }) +} + +function wrapError(error, message, params, ErrorType) { + if ( + error instanceof NotFoundError || + ['NoSuchKey', 'NotFound', 404, 'AccessDenied', 'ENOENT'].includes( + error.code + ) + ) { + return new NotFoundError({ + message: 'no such file', + info: params + }).withCause(error) + } else { + return new ErrorType({ + message: message, + info: params + }).withCause(error) + } +} diff --git a/services/filestore/app/js/S3Persistor.js b/services/filestore/app/js/S3Persistor.js index a10251a642..196d2aecda 100644 --- a/services/filestore/app/js/S3Persistor.js +++ b/services/filestore/app/js/S3Persistor.js @@ -5,11 +5,11 @@ https.globalAgent.maxSockets = 300 const settings = require('settings-sharelatex') const metrics = require('metrics-sharelatex') -const logger = require('logger-sharelatex') + +const PersistorHelper = require('./PersistorHelper') const meter = require('stream-meter') const Stream = require('stream') -const crypto = require('crypto') const fs = require('fs') const S3 = require('aws-sdk/clients/s3') const { URL } = require('url') @@ -21,7 +21,7 @@ const { SettingsError } = require('./Errors') -module.exports = { +const S3Persistor = { sendFile: callbackify(sendFile), sendStream: callbackify(sendStream), getFileStream: callbackify(getFileStream), @@ -46,6 +46,8 @@ module.exports = { } } +module.exports = S3Persistor + const pipeline = promisify(Stream.pipeline) function hexToBase64(hex) { @@ -57,7 +59,7 @@ async function sendFile(bucketName, key, fsPath) { try { readStream = fs.createReadStream(fsPath) } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'error reading file from disk', { bucketName, key, fsPath }, @@ -76,27 +78,14 @@ async function sendStream(bucketName, key, readStream, sourceMd5) { if (sourceMd5) { b64Hash = hexToBase64(sourceMd5) } else { - const hash = crypto.createHash('md5') - hash.setEncoding('hex') - pipeline(readStream, hash) - hashPromise = new Promise((resolve, reject) => { - readStream.on('end', () => { - hash.end() - resolve(hash.read()) - }) - readStream.on('error', err => { - reject(err) - }) - }) + hashPromise = PersistorHelper.calculateStreamMd5(readStream) } - const meteredStream = meter() - meteredStream.on('finish', () => { - metrics.count('s3.egress', meteredStream.bytes) + const meteredStream = PersistorHelper.getMeteredStream(readStream, (_, byteCount) => { + // ignore the error parameter and just log the byte count + metrics.count('s3.egress', byteCount) }) - pipeline(readStream, meteredStream) - // if we have an md5 hash, pass this to S3 to verify the upload const uploadOptions = { Bucket: bucketName, @@ -112,30 +101,21 @@ async function sendStream(bucketName, key, readStream, sourceMd5) { .promise() const destMd5 = _md5FromResponse(response) - // if we didn't have an md5 hash, compare our computed one with S3's + // if we didn't have an md5 hash, we should compare our computed one with S3's + // as we couldn't tell S3 about it beforehand if (hashPromise) { sourceMd5 = await hashPromise - - if (sourceMd5 !== destMd5) { - try { - await deleteFile(bucketName, key) - } catch (err) { - logger.warn(err, 'error deleting file for invalid upload') - } - - throw new WriteError({ - message: 'source and destination hashes do not match', - info: { - sourceMd5, - destMd5, - bucketName, - key - } - }) - } + // throws on mismatch + await PersistorHelper.verifyMd5( + S3Persistor, + bucketName, + key, + sourceMd5, + destMd5 + ) } } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'upload to S3 failed', { bucketName, key }, @@ -155,25 +135,29 @@ async function getFileStream(bucketName, key, opts) { params.Range = `bytes=${opts.start}-${opts.end}` } - return new Promise((resolve, reject) => { - const stream = _getClientForBucket(bucketName) - .getObject(params) - .createReadStream() + const stream = _getClientForBucket(bucketName) + .getObject(params) + .createReadStream() - const meteredStream = meter() - meteredStream.on('finish', () => { - metrics.count('s3.ingress', meteredStream.bytes) - }) - - const onStreamReady = function() { - stream.removeListener('readable', onStreamReady) - resolve(stream.pipe(meteredStream)) + const meteredStream = PersistorHelper.getMeteredStream( + stream, + (_, byteCount) => { + // ignore the error parameter and just log the byte count + metrics.count('s3.ingress', byteCount) } - stream.on('readable', onStreamReady) - stream.on('error', err => { - reject(_wrapError(err, 'error reading from S3', params, ReadError)) - }) - }) + ) + + try { + await PersistorHelper.waitForStreamReady(stream) + return meteredStream + } catch (err) { + throw PersistorHelper.wrapError( + err, + 'error reading file from S3', + { bucketName, key, opts }, + ReadError + ) + } } async function deleteDirectory(bucketName, key) { @@ -184,7 +168,7 @@ async function deleteDirectory(bucketName, key) { .listObjects({ Bucket: bucketName, Prefix: key }) .promise() } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to list objects in S3', { bucketName, key }, @@ -205,7 +189,7 @@ async function deleteDirectory(bucketName, key) { }) .promise() } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to delete objects in S3', { bucketName, key }, @@ -222,7 +206,7 @@ async function getFileSize(bucketName, key) { .promise() return response.ContentLength } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'error getting size of s3 object', { bucketName, key }, @@ -239,7 +223,7 @@ async function getFileMd5Hash(bucketName, key) { const md5 = _md5FromResponse(response) return md5 } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'error getting hash of s3 object', { bucketName, key }, @@ -255,7 +239,7 @@ async function deleteFile(bucketName, key) { .promise() } catch (err) { // s3 does not give us a NotFoundError here - throw _wrapError( + throw PersistorHelper.wrapError( err, 'failed to delete file in S3', { bucketName, key }, @@ -275,7 +259,12 @@ async function copyFile(bucketName, sourceKey, destKey) { .copyObject(params) .promise() } catch (err) { - throw _wrapError(err, 'failed to copy file in S3', params, WriteError) + throw PersistorHelper.wrapError( + err, + 'failed to copy file in S3', + params, + WriteError + ) } } @@ -287,7 +276,7 @@ async function checkIfFileExists(bucketName, key) { if (err instanceof NotFoundError) { return false } - throw _wrapError( + throw PersistorHelper.wrapError( err, 'error checking whether S3 object exists', { bucketName, key }, @@ -304,7 +293,7 @@ async function directorySize(bucketName, key) { return response.Contents.reduce((acc, item) => item.Size + acc, 0) } catch (err) { - throw _wrapError( + throw PersistorHelper.wrapError( err, 'error getting directory size in S3', { bucketName, key }, @@ -313,26 +302,6 @@ async function directorySize(bucketName, key) { } } -function _wrapError(error, message, params, ErrorType) { - // the AWS client can return one of 'NoSuchKey', 'NotFound' or 404 (integer) - // when something is not found, depending on the endpoint - if ( - ['NoSuchKey', 'NotFound', 404, 'AccessDenied', 'ENOENT'].includes( - error.code - ) - ) { - return new NotFoundError({ - message: 'no such file', - info: params - }).withCause(error) - } else { - return new ErrorType({ - message: message, - info: params - }).withCause(error) - } -} - const _clients = new Map() let _defaultClient diff --git a/services/filestore/test/unit/js/FSPersistorTests.js b/services/filestore/test/unit/js/FSPersistorTests.js index 1be8eea3e2..0a09869bc0 100644 --- a/services/filestore/test/unit/js/FSPersistorTests.js +++ b/services/filestore/test/unit/js/FSPersistorTests.js @@ -70,7 +70,10 @@ describe('FSPersistorTests', function() { glob, rimraf, stream, - crypto + crypto, + // imported by PersistorHelper but otherwise unused here + 'stream-meter': {}, + 'logger-sharelatex': {} }, globals: { console } }) diff --git a/services/filestore/test/unit/js/S3PersistorTests.js b/services/filestore/test/unit/js/S3PersistorTests.js index b9711572c2..9686deed5f 100644 --- a/services/filestore/test/unit/js/S3PersistorTests.js +++ b/services/filestore/test/unit/js/S3PersistorTests.js @@ -89,6 +89,7 @@ describe('S3PersistorTests', function() { } MeteredStream = { + type: 'metered', on: sinon.stub(), bytes: objectSize } @@ -103,7 +104,7 @@ describe('S3PersistorTests', function() { S3ReadStream = { on: sinon.stub(), - pipe: sinon.stub().returns('s3Stream'), + pipe: sinon.stub(), removeListener: sinon.stub() } S3ReadStream.on.withArgs('readable').yields() @@ -168,8 +169,8 @@ describe('S3PersistorTests', function() { stream = await S3Persistor.promises.getFileStream(bucket, key) }) - it('returns a stream', function() { - expect(stream).to.equal('s3Stream') + it('returns a metered stream', function() { + expect(stream).to.equal(MeteredStream) }) it('sets the AWS client up with credentials from settings', function() { @@ -184,7 +185,10 @@ describe('S3PersistorTests', function() { }) it('pipes the stream through the meter', function() { - expect(S3ReadStream.pipe).to.have.been.calledWith(MeteredStream) + expect(Stream.pipeline).to.have.been.calledWith( + S3ReadStream, + MeteredStream + ) }) it('records an ingress metric', function() { @@ -202,8 +206,8 @@ describe('S3PersistorTests', function() { }) }) - it('returns a stream', function() { - expect(stream).to.equal('s3Stream') + it('returns a metered stream', function() { + expect(stream).to.equal(MeteredStream) }) it('passes the byte range on to S3', function() { @@ -236,8 +240,8 @@ describe('S3PersistorTests', function() { stream = await S3Persistor.promises.getFileStream(bucket, key) }) - it('returns a stream', function() { - expect(stream).to.equal('s3Stream') + it('returns a metered stream', function() { + expect(stream).to.equal(MeteredStream) }) it('sets the AWS client up with the alternative credentials', function() { @@ -305,12 +309,12 @@ describe('S3PersistorTests', function() { expect(error).to.be.an.instanceOf(Errors.NotFoundError) }) - it('wraps the error from S3', function() { - expect(error.cause).to.equal(S3NotFoundError) + it('wraps the error', function() { + expect(error.cause).to.exist }) it('stores the bucket and key in the error', function() { - expect(error.info).to.deep.equal({ Bucket: bucket, Key: key }) + expect(error.info).to.include({ bucketName: bucket, key: key }) }) }) @@ -335,12 +339,12 @@ describe('S3PersistorTests', function() { expect(error).to.be.an.instanceOf(Errors.NotFoundError) }) - it('wraps the error from S3', function() { - expect(error.cause).to.equal(S3AccessDeniedError) + it('wraps the error', function() { + expect(error.cause).to.exist }) it('stores the bucket and key in the error', function() { - expect(error.info).to.deep.equal({ Bucket: bucket, Key: key }) + expect(error.info).to.include({ bucketName: bucket, key: key }) }) }) @@ -365,12 +369,12 @@ describe('S3PersistorTests', function() { expect(error).to.be.an.instanceOf(Errors.ReadError) }) - it('wraps the error from S3', function() { - expect(error.cause).to.equal(genericError) + it('wraps the error', function() { + expect(error.cause).to.exist }) it('stores the bucket and key in the error', function() { - expect(error.info).to.deep.equal({ Bucket: bucket, Key: key }) + expect(error.info).to.include({ bucketName: bucket, key: key }) }) }) })