diff --git a/services/filestore/.eslintrc b/services/filestore/.eslintrc index 42a4b5cace..73103de7f6 100644 --- a/services/filestore/.eslintrc +++ b/services/filestore/.eslintrc @@ -23,7 +23,8 @@ "rules": { // Swap the no-unused-expressions rule with a more chai-friendly one "no-unused-expressions": 0, - "chai-friendly/no-unused-expressions": "error" + "chai-friendly/no-unused-expressions": "error", + "no-console": "error" }, "overrides": [ { diff --git a/services/filestore/app/js/FSPersistor.js b/services/filestore/app/js/FSPersistor.js index 2ba65f06d2..3f54e2d091 100644 --- a/services/filestore/app/js/FSPersistor.js +++ b/services/filestore/app/js/FSPersistor.js @@ -1,6 +1,7 @@ const fs = require('fs') const glob = require('glob') const path = require('path') +const crypto = require('crypto') const rimraf = require('rimraf') const Stream = require('stream') const { promisify, callbackify } = require('util') @@ -36,11 +37,22 @@ async function sendFile(location, target, source) { } } -async function sendStream(location, target, sourceStream) { +async function sendStream(location, target, sourceStream, sourceMd5) { const fsPath = await LocalFileWriter.writeStream(sourceStream) + if (!sourceMd5) { + sourceMd5 = await _getFileMd5HashForPath(fsPath) + } try { await sendFile(location, target, fsPath) + const destMd5 = await getFileMd5Hash(location, target) + if (sourceMd5 !== destMd5) { + await LocalFileWriter.deleteFile(`${location}/${filterName(target)}`) + throw new WriteError({ + message: 'md5 hash mismatch', + info: { sourceMd5, destMd5, location, target } + }) + } } finally { await LocalFileWriter.deleteFile(fsPath) } @@ -80,6 +92,31 @@ async function getFileSize(location, filename) { } } +async function getFileMd5Hash(location, filename) { + const fullPath = path.join(location, filterName(filename)) + try { + return await _getFileMd5HashForPath(fullPath) + } catch (err) { + throw new ReadError({ + message: 'unable to get md5 hash from file', + info: { location, filename } + }).withCause(err) + } +} + +async function _getFileMd5HashForPath(fullPath) { + return new Promise((resolve, reject) => { + const readStream = fs.createReadStream(fullPath) + const hash = crypto.createHash('md5') + hash.setEncoding('hex') + readStream.on('end', () => { + hash.end() + resolve(hash.read()) + }) + pipeline(readStream, hash).catch(reject) + }) +} + async function copyFile(location, fromName, toName) { const filteredFromName = filterName(fromName) const filteredToName = filterName(toName) @@ -202,6 +239,7 @@ module.exports = { sendStream: callbackify(sendStream), getFileStream: callbackify(getFileStream), getFileSize: callbackify(getFileSize), + getFileMd5Hash: callbackify(getFileMd5Hash), copyFile: callbackify(copyFile), deleteFile: callbackify(deleteFile), deleteDirectory: callbackify(deleteDirectory), @@ -212,6 +250,7 @@ module.exports = { sendStream, getFileStream, getFileSize, + getFileMd5Hash, copyFile, deleteFile, deleteDirectory, diff --git a/services/filestore/app/js/MigrationPersistor.js b/services/filestore/app/js/MigrationPersistor.js index 9f7a834f31..fdc31368a3 100644 --- a/services/filestore/app/js/MigrationPersistor.js +++ b/services/filestore/app/js/MigrationPersistor.js @@ -1,8 +1,9 @@ const metrics = require('metrics-sharelatex') const Settings = require('settings-sharelatex') const logger = require('logger-sharelatex') +const Minipass = require('minipass') const { callbackify } = require('util') -const { NotFoundError } = require('./Errors') +const { NotFoundError, WriteError } = require('./Errors') // Persistor that wraps two other persistors. Talks to the 'primary' by default, // but will fall back to an older persistor in the case of a not-found error. @@ -14,7 +15,7 @@ const { NotFoundError } = require('./Errors') // e.g. // Settings.filestore.fallback.buckets = { // myBucketOnS3: 'myBucketOnGCS' -// }s +// } module.exports = function(primary, fallback) { function _wrapMethodOnBothPersistors(method) { @@ -40,10 +41,7 @@ module.exports = function(primary, fallback) { } function _getFallbackBucket(bucket) { - return ( - Settings.filestore.fallback.buckets && - Settings.filestore.fallback.buckets[bucket] - ) + return Settings.filestore.fallback.buckets[bucket] } function _wrapFallbackMethod(method, enableCopy = true) { @@ -68,20 +66,130 @@ module.exports = function(primary, fallback) { } } - async function _copyFileFromFallback( + async function _getFileStreamAndCopyIfRequired(bucketName, key, opts) { + const shouldCopy = + Settings.filestore.fallback.copyOnMiss && !opts.start && !opts.end + + try { + return await primary.promises.getFileStream(bucketName, key, opts) + } catch (err) { + if (err instanceof NotFoundError) { + const fallbackBucket = _getFallbackBucket(bucketName) + if (shouldCopy) { + return _copyFileFromFallback( + fallbackBucket, + bucketName, + key, + key, + true + ) + } else { + return fallback.promises.getFileStream(fallbackBucket, key, opts) + } + } + throw err + } + } + + async function _copyFromFallbackStreamAndVerify( + stream, sourceBucket, destBucket, sourceKey, destKey ) { + try { + let sourceMd5 + try { + sourceMd5 = await fallback.promises.getFileMd5Hash( + sourceBucket, + sourceKey + ) + } catch (err) { + logger.warn(err, 'error getting md5 hash from fallback persistor') + } + + await primary.promises.sendStream(destBucket, destKey, stream, sourceMd5) + } catch (err) { + let error = err + metrics.inc('fallback.copy.failure') + + try { + await primary.promises.deleteFile(destBucket, destKey) + } catch (err) { + error = new WriteError({ + message: 'unable to clean up destination copy artifact', + info: { + destBucket, + destKey + } + }).withCause(err) + } + + error = new WriteError({ + message: 'unable to copy file to destination persistor', + info: { + sourceBucket, + destBucket, + sourceKey, + destKey + } + }).withCause(error) + + logger.warn({ error }, 'failed to copy file from fallback') + throw error + } + } + + async function _copyFileFromFallback( + sourceBucket, + destBucket, + sourceKey, + destKey, + returnStream = false + ) { + metrics.inc('fallback.copy') const sourceStream = await fallback.promises.getFileStream( sourceBucket, sourceKey, {} ) - await primary.promises.sendStream(destBucket, destKey, sourceStream) - metrics.inc('fallback.copy') + if (!returnStream) { + return _copyFromFallbackStreamAndVerify( + sourceStream, + sourceBucket, + destBucket, + sourceKey, + destKey + ) + } + + const tee = new Minipass() + const clientStream = new Minipass() + const copyStream = new Minipass() + + tee.pipe(clientStream) + tee.pipe(copyStream) + + // copy the file in the background + _copyFromFallbackStreamAndVerify( + copyStream, + sourceBucket, + destBucket, + sourceKey, + destKey + ).catch( + // the error handler in this method will log a metric and a warning, so + // we don't need to do anything extra here, but catching it will prevent + // unhandled promise rejection warnings + () => {} + ) + + // start piping the source stream into the tee after everything is set up, + // otherwise one stream may consume bytes that don't arrive at the other + sourceStream.pipe(tee) + return clientStream } return { @@ -89,7 +197,8 @@ module.exports = function(primary, fallback) { fallbackPersistor: fallback, sendFile: primary.sendFile, sendStream: primary.sendStream, - getFileStream: callbackify(_wrapFallbackMethod('getFileStream')), + getFileStream: callbackify(_getFileStreamAndCopyIfRequired), + getFileMd5Hash: callbackify(_wrapFallbackMethod('getFileMd5Hash')), deleteDirectory: callbackify( _wrapMethodOnBothPersistors('deleteDirectory') ), @@ -97,17 +206,18 @@ module.exports = function(primary, fallback) { deleteFile: callbackify(_wrapMethodOnBothPersistors('deleteFile')), copyFile: callbackify(copyFileWithFallback), checkIfFileExists: callbackify(_wrapFallbackMethod('checkIfFileExists')), - directorySize: callbackify(_wrapFallbackMethod('directorySize', false)), + directorySize: callbackify(_wrapFallbackMethod('directorySize')), promises: { sendFile: primary.promises.sendFile, sendStream: primary.promises.sendStream, - getFileStream: _wrapFallbackMethod('getFileStream'), + getFileStream: _getFileStreamAndCopyIfRequired, + getFileMd5Hash: _wrapFallbackMethod('getFileMd5Hash'), deleteDirectory: _wrapMethodOnBothPersistors('deleteDirectory'), getFileSize: _wrapFallbackMethod('getFileSize'), deleteFile: _wrapMethodOnBothPersistors('deleteFile'), copyFile: copyFileWithFallback, checkIfFileExists: _wrapFallbackMethod('checkIfFileExists'), - directorySize: _wrapFallbackMethod('directorySize', false) + directorySize: _wrapFallbackMethod('directorySize') } } } diff --git a/services/filestore/app/js/S3Persistor.js b/services/filestore/app/js/S3Persistor.js index 6d22823401..ef465da25c 100644 --- a/services/filestore/app/js/S3Persistor.js +++ b/services/filestore/app/js/S3Persistor.js @@ -5,8 +5,11 @@ https.globalAgent.maxSockets = 300 const settings = require('settings-sharelatex') const metrics = require('metrics-sharelatex') +const logger = require('logger-sharelatex') +const Minipass = require('minipass') const meter = require('stream-meter') +const crypto = require('crypto') const fs = require('fs') const S3 = require('aws-sdk/clients/s3') const { URL } = require('url') @@ -22,6 +25,7 @@ module.exports = { sendFile: callbackify(sendFile), sendStream: callbackify(sendStream), getFileStream: callbackify(getFileStream), + getFileMd5Hash: callbackify(getFileMd5Hash), deleteDirectory: callbackify(deleteDirectory), getFileSize: callbackify(getFileSize), deleteFile: callbackify(deleteFile), @@ -32,6 +36,7 @@ module.exports = { sendFile, sendStream, getFileStream, + getFileMd5Hash, deleteDirectory, getFileSize, deleteFile, @@ -41,6 +46,10 @@ module.exports = { } } +function hexToBase64(hex) { + return Buffer.from(hex, 'hex').toString('base64') +} + async function sendFile(bucketName, key, fsPath) { let readStream try { @@ -56,20 +65,79 @@ async function sendFile(bucketName, key, fsPath) { return sendStream(bucketName, key, readStream) } -async function sendStream(bucketName, key, readStream) { +async function sendStream(bucketName, key, readStream, sourceMd5) { try { + // if there is no supplied md5 hash, we calculate the hash as the data passes through + const passthroughStream = new Minipass() + let hashPromise + let b64Hash + + if (sourceMd5) { + b64Hash = hexToBase64(sourceMd5) + } else { + const hash = crypto.createHash('md5') + hash.setEncoding('hex') + passthroughStream.pipe(hash) + hashPromise = new Promise((resolve, reject) => { + passthroughStream.on('end', () => { + hash.end() + resolve(hash.read()) + }) + passthroughStream.on('error', err => { + reject(err) + }) + }) + } + const meteredStream = meter() + passthroughStream.pipe(meteredStream) meteredStream.on('finish', () => { metrics.count('s3.egress', meteredStream.bytes) }) - await _getClientForBucket(bucketName) - .upload({ - Bucket: bucketName, - Key: key, - Body: readStream.pipe(meteredStream) - }) + // pipe the readstream through minipass, which can write to both the metered + // stream (which goes on to S3) and the md5 generator if necessary + // - we do this last so that a listener streams does not consume data meant + // for both destinations + readStream.pipe(passthroughStream) + + // if we have an md5 hash, pass this to S3 to verify the upload + const uploadOptions = { + Bucket: bucketName, + Key: key, + Body: meteredStream + } + if (b64Hash) { + uploadOptions.ContentMD5 = b64Hash + } + + const response = await _getClientForBucket(bucketName) + .upload(uploadOptions) .promise() + const destMd5 = _md5FromResponse(response) + + // if we didn't have an md5 hash, compare our computed one with S3's + if (hashPromise) { + sourceMd5 = await hashPromise + + if (sourceMd5 !== destMd5) { + try { + await deleteFile(bucketName, key) + } catch (err) { + logger.warn(err, 'error deleting file for invalid upload') + } + + throw new WriteError({ + message: 'source and destination hashes do not match', + info: { + sourceMd5, + destMd5, + bucketName, + key + } + }) + } + } } catch (err) { throw _wrapError( err, @@ -167,6 +235,23 @@ async function getFileSize(bucketName, key) { } } +async function getFileMd5Hash(bucketName, key) { + try { + const response = await _getClientForBucket(bucketName) + .headObject({ Bucket: bucketName, Key: key }) + .promise() + const md5 = _md5FromResponse(response) + return md5 + } catch (err) { + throw _wrapError( + err, + 'error getting hash of s3 object', + { bucketName, key }, + ReadError + ) + } +} + async function deleteFile(bucketName, key) { try { await _getClientForBucket(bucketName) @@ -314,3 +399,18 @@ function _buildClientOptions(bucketCredentials) { return options } + +function _md5FromResponse(response) { + const md5 = (response.ETag || '').replace(/[ "]/g, '') + if (!md5.match(/^[a-f0-9]{32}$/)) { + throw new ReadError({ + message: 's3 etag not in md5-hash format', + info: { + md5, + eTag: response.ETag + } + }) + } + + return md5 +} diff --git a/services/filestore/config/settings.defaults.coffee b/services/filestore/config/settings.defaults.coffee index a4a2df2d24..bb124ae8e0 100644 --- a/services/filestore/config/settings.defaults.coffee +++ b/services/filestore/config/settings.defaults.coffee @@ -17,8 +17,8 @@ unless process.env['BACKEND']? else process.env['BACKEND'] = "fs" process.env['USER_FILES_BUCKET_NAME'] = Path.resolve(__dirname + "/../user_files") - process.env['TEMPLATE_FILES_BUCKET_NAME'] = Path.resolve(__dirname + "/../public_files") - process.env['PUBLIC_FILES_BUCKET_NAME'] = Path.resolve(__dirname + "/../template_files") + process.env['TEMPLATE_FILES_BUCKET_NAME'] = Path.resolve(__dirname + "/../template_files") + process.env['PUBLIC_FILES_BUCKET_NAME'] = Path.resolve(__dirname + "/../public_files") settings = internal: @@ -51,8 +51,8 @@ settings = backend: process.env['FALLBACK_BACKEND'] # mapping of bucket names on the fallback, to bucket names on the primary. # e.g. { myS3UserFilesBucketName: 'myGoogleUserFilesBucketName' } - buckets: JSON.parse process.env['FALLBACK_BUCKET_MAPPING'] if process.env['FALLBACK_BUCKET_MAPPING']? - copyOnMiss: if process.env['COPY_ON_MISS'] == 'true' then true else false + buckets: JSON.parse(process.env['FALLBACK_BUCKET_MAPPING'] || '{}') + copyOnMiss: process.env['COPY_ON_MISS'] == 'true' path: uploadFolder: Path.resolve(__dirname + "/../uploads") diff --git a/services/filestore/npm-shrinkwrap.json b/services/filestore/npm-shrinkwrap.json index b343d6ad2c..a4206a94e0 100644 --- a/services/filestore/npm-shrinkwrap.json +++ b/services/filestore/npm-shrinkwrap.json @@ -3129,6 +3129,21 @@ "resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.8.tgz", "integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=" }, + "minipass": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/minipass/-/minipass-3.1.1.tgz", + "integrity": "sha512-UFqVihv6PQgwj8/yTGvl9kPz7xIAY+R5z6XYjRInD3Gk3qx6QGSD6zEcpeG4Dy/lQnv1J6zv8ejV90hyYIKf3w==", + "requires": { + "yallist": "^4.0.0" + }, + "dependencies": { + "yallist": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", + "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" + } + } + }, "mkdirp": { "version": "0.5.1", "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.1.tgz", diff --git a/services/filestore/package.json b/services/filestore/package.json index 303393bd56..2e9cef8aa0 100644 --- a/services/filestore/package.json +++ b/services/filestore/package.json @@ -31,6 +31,7 @@ "knox": "~0.9.1", "logger-sharelatex": "^1.7.0", "metrics-sharelatex": "^2.2.0", + "minipass": "^3.1.1", "mocha": "5.2.0", "node-transloadit": "0.0.4", "node-uuid": "~1.4.1", diff --git a/services/filestore/test/acceptance/js/FilestoreTests.js b/services/filestore/test/acceptance/js/FilestoreTests.js index 5a0de3abd8..1c96445a3a 100644 --- a/services/filestore/test/acceptance/js/FilestoreTests.js +++ b/services/filestore/test/acceptance/js/FilestoreTests.js @@ -84,11 +84,11 @@ const BackendSettings = { __dirname, '../../../user_files' ), - [process.env.AWS_S3_TEMPLATE_FILES_BUCKET_NAME]: Path.resolve( + [process.env.AWS_S3_PUBLIC_FILES_BUCKET_NAME]: Path.resolve( __dirname, '../../../public_files' ), - [process.env.AWS_S3_PUBLIC_FILES_BUCKET_NAME]: Path.resolve( + [process.env.AWS_S3_TEMPLATE_FILES_BUCKET_NAME]: Path.resolve( __dirname, '../../../template_files' ) @@ -114,9 +114,9 @@ const BackendSettings = { [Path.resolve(__dirname, '../../../user_files')]: process.env .AWS_S3_USER_FILES_BUCKET_NAME, [Path.resolve(__dirname, '../../../public_files')]: process.env - .AWS_S3_TEMPLATE_FILES_BUCKET_NAME, + .AWS_S3_PUBLIC_FILES_BUCKET_NAME, [Path.resolve(__dirname, '../../../template_files')]: process.env - .AWS_S3_PUBLIC_FILES_BUCKET_NAME + .AWS_S3_TEMPLATE_FILES_BUCKET_NAME } } } @@ -130,7 +130,7 @@ describe('Filestore', function() { // redefine the test suite for every available backend Object.keys(BackendSettings).forEach(backend => { describe(backend, function() { - let app, previousEgress, previousIngress + let app, previousEgress, previousIngress, projectId before(async function() { // create the app with the relevant filestore settings @@ -151,6 +151,7 @@ describe('Filestore', function() { getMetric(filestoreUrl, 's3_ingress') ]) } + projectId = `acceptance_tests_${Math.random()}` }) it('should send a 200 for the status endpoint', async function() { @@ -174,7 +175,7 @@ describe('Filestore', function() { beforeEach(async function() { fileId = Math.random() - fileUrl = `${filestoreUrl}/project/acceptance_tests/file/${directoryName}%2F${fileId}` + fileUrl = `${filestoreUrl}/project/${projectId}/file/${directoryName}%2F${fileId}` constantFileContent = [ 'hello world', `line 2 goes here ${Math.random()}`, @@ -242,7 +243,7 @@ describe('Filestore', function() { }) it('should be able to copy files', async function() { - const newProjectID = 'acceptance_tests_copied_project' + const newProjectID = `acceptance_tests_copied_project_${Math.random()}` const newFileId = Math.random() const newFileUrl = `${filestoreUrl}/project/${newProjectID}/file/${directoryName}%2F${newFileId}` const opts = { @@ -250,7 +251,7 @@ describe('Filestore', function() { uri: newFileUrl, json: { source: { - project_id: 'acceptance_tests', + project_id: projectId, file_id: `${directoryName}/${fileId}` } } @@ -304,7 +305,7 @@ describe('Filestore', function() { }) describe('with multiple files', function() { - let fileIds, fileUrls, project + let fileIds, fileUrls const directoryName = 'directory' const localFileReadPaths = [ '/tmp/filestore_acceptance_tests_file_read_1.txt', @@ -331,11 +332,10 @@ describe('Filestore', function() { }) beforeEach(async function() { - project = `acceptance_tests_${Math.random()}` fileIds = [Math.random(), Math.random()] fileUrls = [ - `${filestoreUrl}/project/${project}/file/${directoryName}%2F${fileIds[0]}`, - `${filestoreUrl}/project/${project}/file/${directoryName}%2F${fileIds[1]}` + `${filestoreUrl}/project/${projectId}/file/${directoryName}%2F${fileIds[0]}`, + `${filestoreUrl}/project/${projectId}/file/${directoryName}%2F${fileIds[1]}` ] const writeStreams = [ @@ -359,7 +359,7 @@ describe('Filestore', function() { it('should get the directory size', async function() { const response = await rp.get( - `${filestoreUrl}/project/${project}/size` + `${filestoreUrl}/project/${projectId}/size` ) expect(parseInt(JSON.parse(response.body)['total bytes'])).to.equal( constantFileContents[0].length + constantFileContents[1].length @@ -459,7 +459,6 @@ describe('Filestore', function() { fileUrl, bucket, fallbackBucket - const projectId = 'acceptance_tests' beforeEach(function() { constantFileContent = `This is yet more file content ${Math.random()}` @@ -503,14 +502,20 @@ describe('Filestore', function() { expect(res.body).to.equal(constantFileContent) }) - it('should not copy the file to the primary', async function() { - await rp.get(fileUrl) + describe('when copyOnMiss is disabled', function() { + beforeEach(function() { + Settings.filestore.fallback.copyOnMiss = false + }) - await expectPersistorNotToHaveFile( - app.persistor.primaryPersistor, - bucket, - fileKey - ) + it('should not copy the file to the primary', async function() { + await rp.get(fileUrl) + + await expectPersistorNotToHaveFile( + app.persistor.primaryPersistor, + bucket, + fileKey + ) + }) }) describe('when copyOnMiss is enabled', function() { @@ -534,9 +539,9 @@ describe('Filestore', function() { describe('when copying a file', function() { let newFileId, newFileUrl, newFileKey - const newProjectID = 'acceptance_tests_copied_project' beforeEach(async function() { + const newProjectID = `acceptance_tests_copied_project_${Math.random()}` newFileId = Math.random() newFileUrl = `${filestoreUrl}/project/${newProjectID}/file/${directoryName}%2F${newFileId}` newFileKey = `${newProjectID}/${directoryName}/${newFileId}` @@ -546,7 +551,7 @@ describe('Filestore', function() { uri: newFileUrl, json: { source: { - project_id: 'acceptance_tests', + project_id: projectId, file_id: `${directoryName}/${fileId}` } } @@ -616,7 +621,7 @@ describe('Filestore', function() { await expectPersistorNotToHaveFile( app.persistor.fallbackPersistor, fallbackBucket, - `acceptance_tests/${directoryName}/${fileId}` + `${projectId}/${directoryName}/${fileId}` ) }) }) @@ -706,7 +711,7 @@ describe('Filestore', function() { beforeEach(async function() { fileId = Math.random() - fileUrl = `${filestoreUrl}/project/acceptance_tests/file/${directoryName}%2F${fileId}` + fileUrl = `${filestoreUrl}/project/${projectId}/file/${directoryName}%2F${fileId}` const stat = await fsStat(localFileReadPath) localFileSize = stat.size const writeStream = request.post(fileUrl) diff --git a/services/filestore/test/unit/js/FSPersistorTests.js b/services/filestore/test/unit/js/FSPersistorTests.js index ba343c548c..1be8eea3e2 100644 --- a/services/filestore/test/unit/js/FSPersistorTests.js +++ b/services/filestore/test/unit/js/FSPersistorTests.js @@ -12,19 +12,32 @@ const modulePath = '../../../app/js/FSPersistor.js' describe('FSPersistorTests', function() { const stat = { size: 4, isFile: sinon.stub().returns(true) } const fd = 1234 - const readStream = 'readStream' const writeStream = 'writeStream' const remoteStream = 'remoteStream' const tempFile = '/tmp/potato.txt' const location = '/foo' const error = new Error('guru meditation error') + const md5 = 'ffffffff' const files = ['animals/wombat.tex', 'vegetables/potato.tex'] const globs = [`${location}/${files[0]}`, `${location}/${files[1]}`] const filteredFilenames = ['animals_wombat.tex', 'vegetables_potato.tex'] - let fs, rimraf, stream, LocalFileWriter, FSPersistor, glob + let fs, + rimraf, + stream, + LocalFileWriter, + FSPersistor, + glob, + readStream, + crypto, + Hash beforeEach(function() { + readStream = { + name: 'readStream', + on: sinon.stub().yields(), + pipe: sinon.stub() + } fs = { createReadStream: sinon.stub().returns(readStream), createWriteStream: sinon.stub().returns(writeStream), @@ -41,6 +54,14 @@ describe('FSPersistorTests', function() { deleteFile: sinon.stub().resolves() } } + Hash = { + end: sinon.stub(), + read: sinon.stub().returns(md5), + setEncoding: sinon.stub() + } + crypto = { + createHash: sinon.stub().returns(Hash) + } FSPersistor = SandboxedModule.require(modulePath, { requires: { './LocalFileWriter': LocalFileWriter, @@ -48,7 +69,8 @@ describe('FSPersistorTests', function() { fs, glob, rimraf, - stream + stream, + crypto }, globals: { console } }) @@ -103,6 +125,35 @@ describe('FSPersistorTests', function() { await FSPersistor.promises.sendStream(location, files[0], remoteStream) expect(fs.createReadStream).to.have.been.calledWith(tempFile) }) + + describe('when the md5 hash does not match', function() { + it('should return a write error', async function() { + await expect( + FSPersistor.promises.sendStream( + location, + files[0], + remoteStream, + '00000000' + ) + ) + .to.eventually.be.rejected.and.be.an.instanceOf(Errors.WriteError) + .and.have.property('message', 'md5 hash mismatch') + }) + + it('deletes the copied file', async function() { + try { + await FSPersistor.promises.sendStream( + location, + files[0], + remoteStream, + '00000000' + ) + } catch (_) {} + expect(LocalFileWriter.promises.deleteFile).to.have.been.calledWith( + `${location}/${filteredFilenames[0]}` + ) + }) + }) }) describe('getFileStream', function() { diff --git a/services/filestore/test/unit/js/MigrationPersistorTests.js b/services/filestore/test/unit/js/MigrationPersistorTests.js index 1cc8324d46..83159f38ad 100644 --- a/services/filestore/test/unit/js/MigrationPersistorTests.js +++ b/services/filestore/test/unit/js/MigrationPersistorTests.js @@ -21,35 +21,53 @@ describe('MigrationPersistorTests', function() { const genericError = new Error('guru meditation error') const notFoundError = new Errors.NotFoundError('not found') const size = 33 - const fileStream = 'fileStream' + const md5 = 'ffffffff' - function newPersistor(hasFile) { - return { - promises: { - sendFile: sinon.stub().resolves(), - sendStream: sinon.stub().resolves(), - getFileStream: hasFile - ? sinon.stub().resolves(fileStream) - : sinon.stub().rejects(notFoundError), - deleteDirectory: sinon.stub().resolves(), - getFileSize: hasFile - ? sinon.stub().resolves(size) - : sinon.stub().rejects(notFoundError), - deleteFile: sinon.stub().resolves(), - copyFile: hasFile - ? sinon.stub().resolves() - : sinon.stub().rejects(notFoundError), - checkIfFileExists: sinon.stub().resolves(hasFile), - directorySize: hasFile - ? sinon.stub().resolves(size) - : sinon.stub().rejects(notFoundError) - } - } - } - - let Metrics, Settings, Logger, MigrationPersistor + let Metrics, + Settings, + Logger, + MigrationPersistor, + Minipass, + fileStream, + newPersistor beforeEach(function() { + fileStream = { + name: 'fileStream', + on: sinon + .stub() + .withArgs('end') + .yields(), + pipe: sinon.stub() + } + + newPersistor = function(hasFile) { + return { + promises: { + sendFile: sinon.stub().resolves(), + sendStream: sinon.stub().resolves(), + getFileStream: hasFile + ? sinon.stub().resolves(fileStream) + : sinon.stub().rejects(notFoundError), + deleteDirectory: sinon.stub().resolves(), + getFileSize: hasFile + ? sinon.stub().resolves(size) + : sinon.stub().rejects(notFoundError), + deleteFile: sinon.stub().resolves(), + copyFile: hasFile + ? sinon.stub().resolves() + : sinon.stub().rejects(notFoundError), + checkIfFileExists: sinon.stub().resolves(hasFile), + directorySize: hasFile + ? sinon.stub().resolves(size) + : sinon.stub().rejects(notFoundError), + getFileMd5Hash: hasFile + ? sinon.stub().resolves(md5) + : sinon.stub().rejects(notFoundError) + } + } + } + Settings = { filestore: { fallback: { @@ -68,12 +86,20 @@ describe('MigrationPersistorTests', function() { warn: sinon.stub() } + Minipass = sinon.stub() + Minipass.prototype.on = sinon + .stub() + .withArgs('end') + .yields() + Minipass.prototype.pipe = sinon.stub() + MigrationPersistor = SandboxedModule.require(modulePath, { requires: { 'settings-sharelatex': Settings, './Errors': Errors, 'metrics-sharelatex': Metrics, - 'logger-sharelatex': Logger + 'logger-sharelatex': Logger, + minipass: Minipass }, globals: { console } }) @@ -144,7 +170,7 @@ describe('MigrationPersistorTests', function() { ).to.have.been.calledWithExactly(fallbackBucket, key, options) }) - it('should only create one stream', function() { + it('should create one read stream', function() { expect(fallbackPersistor.promises.getFileStream).to.have.been.calledOnce }) @@ -154,7 +180,10 @@ describe('MigrationPersistorTests', function() { }) describe('when the file should be copied to the primary', function() { - let primaryPersistor, fallbackPersistor, migrationPersistor + let primaryPersistor, + fallbackPersistor, + migrationPersistor, + returnedStream beforeEach(async function() { primaryPersistor = newPersistor(false) fallbackPersistor = newPersistor(true) @@ -163,18 +192,36 @@ describe('MigrationPersistorTests', function() { fallbackPersistor ) Settings.filestore.fallback.copyOnMiss = true - return migrationPersistor.promises.getFileStream(bucket, key, options) + returnedStream = await migrationPersistor.promises.getFileStream( + bucket, + key, + options + ) }) - it('should create two streams', function() { - expect(fallbackPersistor.promises.getFileStream).to.have.been - .calledTwice + it('should create one read stream', function() { + expect(fallbackPersistor.promises.getFileStream).to.have.been.calledOnce }) - it('should send one of the streams to the primary', function() { + it('should get the md5 hash from the source', function() { + expect( + fallbackPersistor.promises.getFileMd5Hash + ).to.have.been.calledWith(fallbackBucket, key) + }) + + it('should send a stream to the primary', function() { expect( primaryPersistor.promises.sendStream - ).to.have.been.calledWithExactly(bucket, key, fileStream) + ).to.have.been.calledWithExactly( + bucket, + key, + sinon.match.instanceOf(Minipass), + md5 + ) + }) + + it('should send a stream to the client', function() { + expect(returnedStream).to.be.an.instanceOf(Minipass) }) }) @@ -420,10 +467,16 @@ describe('MigrationPersistorTests', function() { ).not.to.have.been.calledWithExactly(fallbackBucket, key) }) + it('should get the md5 hash from the source', function() { + expect( + fallbackPersistor.promises.getFileMd5Hash + ).to.have.been.calledWith(fallbackBucket, key) + }) + it('should send the file to the primary', function() { expect( primaryPersistor.promises.sendStream - ).to.have.been.calledWithExactly(bucket, destKey, fileStream) + ).to.have.been.calledWithExactly(bucket, destKey, fileStream, md5) }) }) diff --git a/services/filestore/test/unit/js/S3PersistorTests.js b/services/filestore/test/unit/js/S3PersistorTests.js index 7a945b4d19..4f700c8797 100644 --- a/services/filestore/test/unit/js/S3PersistorTests.js +++ b/services/filestore/test/unit/js/S3PersistorTests.js @@ -26,8 +26,10 @@ describe('S3PersistorTests', function() { { Key: 'hippo', Size: 22 } ] const filesSize = 33 + const md5 = 'ffffffff00000000ffffffff00000000' let Metrics, + Logger, S3, Fs, Meter, @@ -40,7 +42,10 @@ describe('S3PersistorTests', function() { S3AccessDeniedError, FileNotFoundError, EmptyPromise, - settings + settings, + Minipass, + Hash, + crypto beforeEach(function() { settings = { @@ -100,7 +105,8 @@ describe('S3PersistorTests', function() { }), headObject: sinon.stub().returns({ promise: sinon.stub().resolves({ - ContentLength: objectSize + ContentLength: objectSize, + ETag: md5 }) }), listObjects: sinon.stub().returns({ @@ -108,21 +114,46 @@ describe('S3PersistorTests', function() { Contents: files }) }), - upload: sinon.stub().returns(EmptyPromise), + upload: sinon + .stub() + .returns({ promise: sinon.stub().resolves({ ETag: `"${md5}"` }) }), copyObject: sinon.stub().returns(EmptyPromise), deleteObject: sinon.stub().returns(EmptyPromise), deleteObjects: sinon.stub().returns(EmptyPromise) } S3 = sinon.stub().returns(S3Client) + Hash = { + end: sinon.stub(), + read: sinon.stub().returns(md5), + setEncoding: sinon.stub() + } + crypto = { + createHash: sinon.stub().returns(Hash) + } + + Minipass = sinon.stub() + Minipass.prototype.on = sinon + .stub() + .withArgs('end') + .yields() + Minipass.prototype.pipe = sinon.stub() + + Logger = { + warn: sinon.stub() + } + S3Persistor = SandboxedModule.require(modulePath, { requires: { 'aws-sdk/clients/s3': S3, 'settings-sharelatex': settings, + 'logger-sharelatex': Logger, './Errors': Errors, fs: Fs, 'stream-meter': Meter, - 'metrics-sharelatex': Metrics + 'metrics-sharelatex': Metrics, + minipass: Minipass, + crypto }, globals: { console } }) @@ -420,17 +451,49 @@ describe('S3PersistorTests', function() { expect(S3Client.upload).to.have.been.calledWith({ Bucket: bucket, Key: key, - Body: 'readStream' + Body: MeteredStream }) }) it('should meter the stream', function() { - expect(ReadStream.pipe).to.have.been.calledWith(MeteredStream) + expect(Minipass.prototype.pipe).to.have.been.calledWith(MeteredStream) }) it('should record an egress metric', function() { expect(Metrics.count).to.have.been.calledWith('s3.egress', objectSize) }) + + it('calculates the md5 hash of the file', function() { + expect(Minipass.prototype.pipe).to.have.been.calledWith(Hash) + }) + }) + + describe('when a hash is supploed', function() { + beforeEach(async function() { + return S3Persistor.promises.sendStream( + bucket, + key, + ReadStream, + 'aaaaaaaabbbbbbbbaaaaaaaabbbbbbbb' + ) + }) + + it('should not calculate the md5 hash of the file', function() { + expect(Minipass.prototype.pipe).not.to.have.been.calledWith(Hash) + }) + + it('sends the hash in base64', function() { + expect(S3Client.upload).to.have.been.calledWith({ + Bucket: bucket, + Key: key, + Body: MeteredStream, + ContentMD5: 'qqqqqru7u7uqqqqqu7u7uw==' + }) + }) + + it('does not fetch the md5 hash of the uploaded file', function() { + expect(S3Client.headObject).not.to.have.been.called + }) }) describe('when the upload fails', function() { @@ -466,7 +529,7 @@ describe('S3PersistorTests', function() { expect(S3Client.upload).to.have.been.calledWith({ Bucket: bucket, Key: key, - Body: 'readStream' + Body: MeteredStream }) }) })