diff --git a/services/document-updater/app.coffee b/services/document-updater/app.coffee index 41c7a01958..eafe03e402 100644 --- a/services/document-updater/app.coffee +++ b/services/document-updater/app.coffee @@ -4,7 +4,6 @@ Settings = require('settings-sharelatex') logger = require('logger-sharelatex') logger.initialize("documentupdater") RedisManager = require('./app/js/RedisManager') -UpdateManager = require('./app/js/UpdateManager') DispatchManager = require('./app/js/DispatchManager') Keys = require('./app/js/RedisKeyBuilder') Errors = require "./app/js/Errors" @@ -26,19 +25,8 @@ app.configure -> app.use express.bodyParser() app.use app.router -rclient.subscribe("pending-updates") -rclient.on "message", (channel, doc_key) -> - [project_id, doc_id] = Keys.splitProjectIdAndDocId(doc_key) - if !Settings.shuttingDown - UpdateManager.processOutstandingUpdatesWithLock project_id, doc_id, (error) -> - logger.error err: error, project_id: project_id, doc_id: doc_id, "error processing update" if error? - else - logger.log project_id: project_id, doc_id: doc_id, "ignoring incoming update" - DispatchManager.createAndStartDispatchers(Settings.dispatcherCount || 10) -UpdateManager.resumeProcessing() - app.get '/project/:project_id/doc/:doc_id', HttpController.getDoc app.post '/project/:project_id/doc/:doc_id', HttpController.setDoc app.post '/project/:project_id/doc/:doc_id/flush', HttpController.flushDocIfLoaded diff --git a/services/document-updater/app/coffee/RedisKeyBuilder.coffee b/services/document-updater/app/coffee/RedisKeyBuilder.coffee index 391fd0ace3..e55ae99bde 100644 --- a/services/document-updater/app/coffee/RedisKeyBuilder.coffee +++ b/services/document-updater/app/coffee/RedisKeyBuilder.coffee @@ -6,7 +6,6 @@ PENDINGUPDATESKEY = "PendingUpdates" DOCLINES = "doclines" DOCOPS = "DocOps" DOCVERSION = "DocVersion" -DOCIDSWITHPENDINGUPDATES = "DocsWithPendingUpdates" DOCSWITHHISTORYOPS = "DocsWithHistoryOps" UNCOMPRESSED_HISTORY_OPS = "UncompressedHistoryOps" @@ -20,7 +19,6 @@ module.exports = changeQue : (op)-> CHANGEQUE+":"+op.project_id docsInProject : (op)-> DOCSINPROJECT+":"+op.project_id pendingUpdates : (op)-> PENDINGUPDATESKEY+":"+op.doc_id - docsWithPendingUpdates : DOCIDSWITHPENDINGUPDATES combineProjectIdAndDocId: (project_id, doc_id) -> "#{project_id}:#{doc_id}" splitProjectIdAndDocId: (project_and_doc_id) -> project_and_doc_id.split(":") docsWithHistoryOps: (op) -> DOCSWITHHISTORYOPS + ":" + op.project_id diff --git a/services/document-updater/app/coffee/RedisManager.coffee b/services/document-updater/app/coffee/RedisManager.coffee index bcb5d28ae0..1215471ce1 100644 --- a/services/document-updater/app/coffee/RedisManager.coffee +++ b/services/document-updater/app/coffee/RedisManager.coffee @@ -88,21 +88,6 @@ module.exports = RedisManager = getUpdatesLength: (doc_id, callback)-> rclient.llen keys.pendingUpdates(doc_id:doc_id), callback - getDocsWithPendingUpdates: (callback = (error, docs) ->) -> - rclient.smembers keys.docsWithPendingUpdates, (error, doc_keys) -> - return callback(error) if error? - docs = doc_keys.map (doc_key) -> - [project_id, doc_id] = keys.splitProjectIdAndDocId(doc_key) - return { - doc_id: doc_id - project_id: project_id - } - callback null, docs - - clearDocFromPendingUpdatesSet: (project_id, doc_id, callback = (error) ->) -> - doc_key = keys.combineProjectIdAndDocId(project_id, doc_id) - rclient.srem keys.docsWithPendingUpdates, doc_key, callback - getPreviousDocOps: (doc_id, start, end, callback = (error, jsonOps) ->) -> rclient.llen keys.docOps(doc_id: doc_id), (error, length) -> return callback(error) if error? diff --git a/services/document-updater/app/coffee/UpdateManager.coffee b/services/document-updater/app/coffee/UpdateManager.coffee index 2ab74feda1..97c33e8b6f 100644 --- a/services/document-updater/app/coffee/UpdateManager.coffee +++ b/services/document-updater/app/coffee/UpdateManager.coffee @@ -7,26 +7,12 @@ logger = require('logger-sharelatex') Metrics = require "./Metrics" module.exports = UpdateManager = - resumeProcessing: (callback = (error) ->) -> - RedisManager.getDocsWithPendingUpdates (error, docs) => - return callback(error) if error? - jobs = for doc in (docs or []) - do (doc) => - (callback) => @processOutstandingUpdatesWithLock doc.project_id, doc.doc_id, callback - - async.parallelLimit jobs, 5, callback - - processOutstandingUpdates: (project_id, doc_id, _callback = (error) ->) -> + processOutstandingUpdates: (project_id, doc_id, callback = (error) ->) -> timer = new Metrics.Timer("updateManager.processOutstandingUpdates") - callback = (args...) -> + UpdateManager.fetchAndApplyUpdates project_id, doc_id, (error) -> timer.done() - _callback(args...) - - UpdateManager.fetchAndApplyUpdates project_id, doc_id, (error) => return callback(error) if error? - RedisManager.clearDocFromPendingUpdatesSet project_id, doc_id, (error) => - return callback(error) if error? - callback() + callback() processOutstandingUpdatesWithLock: (project_id, doc_id, callback = (error) ->) -> LockManager.tryLock doc_id, (error, gotLock, lockValue) => diff --git a/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee deleted file mode 100644 index c89842f7bc..0000000000 --- a/services/document-updater/test/unit/coffee/RedisManager/clearDocFromPendingUpdatesSetTests.coffee +++ /dev/null @@ -1,29 +0,0 @@ -sinon = require('sinon') -chai = require('chai') -should = chai.should() -modulePath = "../../../../app/js/RedisManager" -SandboxedModule = require('sandboxed-module') - -describe "RedisManager.clearDocFromPendingUpdatesSet", -> - beforeEach -> - @project_id = "project-id" - @doc_id = "document-id" - @callback = sinon.stub() - @RedisManager = SandboxedModule.require modulePath, requires: - "redis-sharelatex" : createClient: () => - @rclient ?= auth:-> # only assign one rclient - "logger-sharelatex": {} - "./ZipManager": {} - - @rclient.srem = sinon.stub().callsArg(2) - @RedisManager.clearDocFromPendingUpdatesSet(@project_id, @doc_id, @callback) - - it "should get the docs with pending updates", -> - @rclient.srem - .calledWith("DocsWithPendingUpdates", "#{@project_id}:#{@doc_id}") - .should.equal true - - it "should return the callback", -> - @callback.called.should.equal true - - diff --git a/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee b/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee deleted file mode 100644 index 45efa4c984..0000000000 --- a/services/document-updater/test/unit/coffee/RedisManager/getDocsWithPendingUpdatesTests.coffee +++ /dev/null @@ -1,35 +0,0 @@ -sinon = require('sinon') -chai = require('chai') -should = chai.should() -modulePath = "../../../../app/js/RedisManager" -SandboxedModule = require('sandboxed-module') - -describe "RedisManager.getDocsWithPendingUpdates", -> - beforeEach -> - @callback = sinon.stub() - @RedisManager = SandboxedModule.require modulePath, requires: - "./ZipManager": {} - "redis-sharelatex" : createClient: () => - @rclient ?= auth:-> - "logger-sharelatex": {} - - @docs = [{ - doc_id: "doc-id-1" - project_id: "project-id-1" - }, { - doc_id: "doc-id-2" - project_id: "project-id-2" - }] - @doc_keys = @docs.map (doc) -> "#{doc.project_id}:#{doc.doc_id}" - - @rclient.smembers = sinon.stub().callsArgWith(1, null, @doc_keys) - @RedisManager.getDocsWithPendingUpdates(@callback) - - it "should get the docs with pending updates", -> - @rclient.smembers - .calledWith("DocsWithPendingUpdates") - .should.equal true - - it "should return the docs with pending updates", -> - @callback.calledWith(null, @docs).should.equal true - diff --git a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee index 7b41647f7b..249740973f 100644 --- a/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee +++ b/services/document-updater/test/unit/coffee/UpdateManager/ApplyingUpdates.coffee @@ -18,46 +18,14 @@ describe "UpdateManager", -> Timer: class Timer done: sinon.stub() - describe "resumeProcessing", -> - beforeEach (done) -> - @docs = [{ - doc_id: "doc-1" - project_id: "project-1" - }, { - doc_id: "doc-2" - project_id: "project-2" - }, { - doc_id: "doc-3" - project_id: "project-3" - }] - @RedisManager.getDocsWithPendingUpdates = sinon.stub().callsArgWith(0, null, @docs) - @UpdateManager.processOutstandingUpdatesWithLock = sinon.stub().callsArg(2) - @UpdateManager.resumeProcessing(done) - - it "should the docs that haven't been processed yet", -> - @RedisManager.getDocsWithPendingUpdates - .called.should.equal true - - it "should call processOutstandingUpdatesWithLock for each doc", -> - for doc in @docs - @UpdateManager.processOutstandingUpdatesWithLock - .calledWith(doc.project_id, doc.doc_id) - .should.equal true - describe "processOutstandingUpdates", -> beforeEach -> @UpdateManager.fetchAndApplyUpdates = sinon.stub().callsArg(2) - @RedisManager.clearDocFromPendingUpdatesSet = sinon.stub().callsArg(2) @UpdateManager.processOutstandingUpdates @project_id, @doc_id, @callback it "should apply the updates", -> @UpdateManager.fetchAndApplyUpdates.calledWith(@project_id, @doc_id).should.equal true - it "should clear the doc from the process pending set", -> - @RedisManager.clearDocFromPendingUpdatesSet - .calledWith(@project_id, @doc_id) - .should.equal true - it "should call the callback", -> @callback.called.should.equal true