[history-ot] initial implementation of using doc-level history-ot (#25054)

* [history-v1-ot] initial implementation of using doc-level history-v1-ot

* [web] fix advancing of the otMigrationStage

Use 'nextStage' for the user provided, desired stage when advancing.

Co-authored-by: Brian Gough <brian.gough@overleaf.com>

* [document-updater] document size check in editor-core

* [history-ot] rename history-v1-ot to history-ot and add types

* [history-ot] apply review feedback

- remove extra !!
- merge variable assignment when processing diff-match-match output
- add helper function for getting docstore lines view of StringFileData

Co-authored-by: Alf Eaton <alf.eaton@overleaf.com>

* Revert "[document-updater] add safe rollback point for history-ot (#25283)"

This reverts commit d7230dd14a379a27d2c6ab03a006463a18979d06

Signed-off-by: Jakob Ackermann <jakob.ackermann@overleaf.com>

---------

Signed-off-by: Jakob Ackermann <jakob.ackermann@overleaf.com>
Co-authored-by: Brian Gough <brian.gough@overleaf.com>
Co-authored-by: Alf Eaton <alf.eaton@overleaf.com>
GitOrigin-RevId: 89c497782adb0427635d50d02263d6f535b12481
This commit is contained in:
Jakob Ackermann
2025-05-07 12:53:12 +02:00
committed by Copybot
parent 4d93187e58
commit e8b5ee2ff9
48 changed files with 1828 additions and 223 deletions
+6
View File
@@ -18,6 +18,7 @@ const MoveFileOperation = require('./lib/operation/move_file_operation')
const SetCommentStateOperation = require('./lib/operation/set_comment_state_operation')
const EditFileOperation = require('./lib/operation/edit_file_operation')
const EditNoOperation = require('./lib/operation/edit_no_operation')
const EditOperationTransformer = require('./lib/operation/edit_operation_transformer')
const SetFileMetadataOperation = require('./lib/operation/set_file_metadata_operation')
const NoOperation = require('./lib/operation/no_operation')
const Operation = require('./lib/operation')
@@ -43,6 +44,8 @@ const TrackingProps = require('./lib/file_data/tracking_props')
const Range = require('./lib/range')
const CommentList = require('./lib/file_data/comment_list')
const LazyStringFileData = require('./lib/file_data/lazy_string_file_data')
const StringFileData = require('./lib/file_data/string_file_data')
const EditOperationBuilder = require('./lib/operation/edit_operation_builder')
exports.AddCommentOperation = AddCommentOperation
exports.Author = Author
@@ -58,6 +61,7 @@ exports.DeleteCommentOperation = DeleteCommentOperation
exports.File = File
exports.FileMap = FileMap
exports.LazyStringFileData = LazyStringFileData
exports.StringFileData = StringFileData
exports.History = History
exports.Label = Label
exports.AddFileOperation = AddFileOperation
@@ -65,6 +69,8 @@ exports.MoveFileOperation = MoveFileOperation
exports.SetCommentStateOperation = SetCommentStateOperation
exports.EditFileOperation = EditFileOperation
exports.EditNoOperation = EditNoOperation
exports.EditOperationBuilder = EditOperationBuilder
exports.EditOperationTransformer = EditOperationTransformer
exports.SetFileMetadataOperation = SetFileMetadataOperation
exports.NoOperation = NoOperation
exports.Operation = Operation
@@ -88,6 +88,14 @@ class StringFileData extends FileData {
return content
}
/**
* Return docstore view of a doc: each line separated
* @return {string[]}
*/
getLines() {
return this.getContent({ filterTrackedDeletes: true }).split('\n')
}
/** @inheritdoc */
getByteLength() {
return Buffer.byteLength(this.content)
@@ -36,6 +36,20 @@ class EditOperationBuilder {
}
throw new Error('Unsupported operation in EditOperationBuilder.fromJSON')
}
/**
* @param {unknown} raw
* @return {raw is RawEditOperation}
*/
static isValid(raw) {
return (
isTextOperation(raw) ||
isRawAddCommentOperation(raw) ||
isRawDeleteCommentOperation(raw) ||
isRawSetCommentStateOperation(raw) ||
isRawEditNoOperation(raw)
)
}
}
/**
+1
View File
@@ -42539,6 +42539,7 @@
"lodash": "^4.17.21",
"minimist": "^1.2.8",
"mongodb-legacy": "6.1.3",
"overleaf-editor-core": "*",
"request": "^2.88.2",
"requestretry": "^7.1.0"
},
-2
View File
@@ -212,8 +212,6 @@ app.use((error, req, res, next) => {
return res.status(422).json(error.info)
} else if (error instanceof Errors.FileTooLargeError) {
return res.sendStatus(413)
} else if (error instanceof Errors.ProjectMigratedToHistoryOTError) {
return res.status(422).send(error.message)
} else if (error.statusCode === 413) {
return res.status(413).send('request entity too large')
} else {
+22 -2
View File
@@ -1,4 +1,5 @@
const DMP = require('diff-match-patch')
const { TextOperation } = require('overleaf-editor-core')
const dmp = new DMP()
// Do not attempt to produce a diff for more than 100ms
@@ -16,8 +17,7 @@ module.exports = {
const ops = []
let position = 0
for (const diff of diffs) {
const type = diff[0]
const content = diff[1]
const [type, content] = diff
if (type === this.ADDED) {
ops.push({
i: content,
@@ -37,4 +37,24 @@ module.exports = {
}
return ops
},
diffAsHistoryV1EditOperation(before, after) {
const diffs = dmp.diff_main(before, after)
dmp.diff_cleanupSemantic(diffs)
const op = new TextOperation()
for (const diff of diffs) {
const [type, content] = diff
if (type === this.ADDED) {
op.insert(content)
} else if (type === this.REMOVED) {
op.remove(content.length)
} else if (type === this.UNCHANGED) {
op.retain(content.length)
} else {
throw new Error('Unknown type')
}
}
return op
},
}
@@ -11,10 +11,16 @@ const RangesManager = require('./RangesManager')
const { extractOriginOrSource } = require('./Utils')
const { getTotalSizeOfLines } = require('./Limits')
const Settings = require('@overleaf/settings')
const { StringFileData } = require('overleaf-editor-core')
const MAX_UNFLUSHED_AGE = 300 * 1000 // 5 mins, document should be flushed to mongo this time after a change
const DocumentManager = {
/**
* @param {string} projectId
* @param {string} docId
* @return {Promise<{lines: (string[] | StringFileRawData), version: number, ranges: Ranges, resolvedCommentIds: any[], pathname: string, projectHistoryId: string, unflushedTime: any, alreadyLoaded: boolean, historyRangesSupport: boolean, type: OTType}>}
*/
async getDoc(projectId, docId) {
const {
lines,
@@ -75,6 +81,7 @@ const DocumentManager = {
unflushedTime: null,
alreadyLoaded: false,
historyRangesSupport,
type: Array.isArray(lines) ? 'sharejs-text-ot' : 'history-ot',
}
} else {
return {
@@ -87,16 +94,25 @@ const DocumentManager = {
unflushedTime,
alreadyLoaded: true,
historyRangesSupport,
type: Array.isArray(lines) ? 'sharejs-text-ot' : 'history-ot',
}
}
},
async getDocAndRecentOps(projectId, docId, fromVersion) {
const { lines, version, ranges, pathname, projectHistoryId } =
const { lines, version, ranges, pathname, projectHistoryId, type } =
await DocumentManager.getDoc(projectId, docId)
if (fromVersion === -1) {
return { lines, version, ops: [], ranges, pathname, projectHistoryId }
return {
lines,
version,
ops: [],
ranges,
pathname,
projectHistoryId,
type,
}
} else {
const ops = await RedisManager.promises.getPreviousDocOps(
docId,
@@ -110,15 +126,21 @@ const DocumentManager = {
ranges,
pathname,
projectHistoryId,
type,
}
}
},
async appendToDoc(projectId, docId, linesToAppend, originOrSource, userId) {
const { lines: currentLines } = await DocumentManager.getDoc(
let { lines: currentLines, type } = await DocumentManager.getDoc(
projectId,
docId
)
if (type === 'history-ot') {
const file = StringFileData.fromRaw(currentLines)
// TODO(24596): tc support for history-ot
currentLines = file.getLines()
}
const currentLineSize = getTotalSizeOfLines(currentLines)
const addedSize = getTotalSizeOfLines(linesToAppend)
const newlineSize = '\n'.length
@@ -153,22 +175,42 @@ const DocumentManager = {
throw new Error('No lines were provided to setDoc')
}
// Circular dependencies. Import at runtime.
const HistoryV1OTUpdateManager = require('./HistoryV1OTUpdateManager')
const UpdateManager = require('./UpdateManager')
const {
lines: oldLines,
version,
alreadyLoaded,
type,
} = await DocumentManager.getDoc(projectId, docId)
logger.debug(
{ docId, projectId, oldLines, newLines },
'setting a document via http'
)
const op = DiffCodec.diffAsShareJsOp(oldLines, newLines)
if (undoing) {
for (const o of op || []) {
o.u = true
} // Turn on undo flag for each op for track changes
let op
if (type === 'history-ot') {
const file = StringFileData.fromRaw(oldLines)
const operation = DiffCodec.diffAsHistoryV1EditOperation(
// TODO(24596): tc support for history-ot
file.getContent({ filterTrackedDeletes: true }),
newLines.join('\n')
)
if (operation.isNoop()) {
op = []
} else {
op = [operation.toJSON()]
}
} else {
op = DiffCodec.diffAsShareJsOp(oldLines, newLines)
if (undoing) {
for (const o of op || []) {
o.u = true
} // Turn on undo flag for each op for track changes
}
}
const { origin, source } = extractOriginOrSource(originOrSource)
@@ -203,7 +245,11 @@ const DocumentManager = {
// this update, otherwise the doc would never be
// removed from redis.
if (op.length > 0) {
await UpdateManager.promises.applyUpdate(projectId, docId, update)
if (type === 'history-ot') {
await HistoryV1OTUpdateManager.applyUpdate(projectId, docId, update)
} else {
await UpdateManager.promises.applyUpdate(projectId, docId, update)
}
}
// If the document was loaded already, then someone has it open
@@ -224,7 +270,7 @@ const DocumentManager = {
},
async flushDocIfLoaded(projectId, docId) {
const {
let {
lines,
version,
ranges,
@@ -245,6 +291,11 @@ const DocumentManager = {
logger.debug({ projectId, docId, version }, 'flushing doc')
Metrics.inc('flush-doc-if-loaded', 1, { status: 'modified' })
if (!Array.isArray(lines)) {
const file = StringFileData.fromRaw(lines)
// TODO(24596): tc support for history-ot
lines = file.getLines()
}
const result = await PersistenceManager.promises.setDoc(
projectId,
docId,
@@ -294,6 +345,7 @@ const DocumentManager = {
throw new Errors.NotFoundError(`document not found: ${docId}`)
}
// TODO(24596): tc support for history-ot
const newRanges = RangesManager.acceptChanges(
projectId,
docId,
@@ -360,6 +412,7 @@ const DocumentManager = {
},
async getComment(projectId, docId, commentId) {
// TODO(24596): tc support for history-ot
const { ranges } = await DocumentManager.getDoc(projectId, docId)
const comment = ranges?.comments?.find(comment => comment.id === commentId)
@@ -381,6 +434,7 @@ const DocumentManager = {
throw new Errors.NotFoundError(`document not found: ${docId}`)
}
// TODO(24596): tc support for history-ot
const newRanges = RangesManager.deleteComment(commentId, ranges)
await RedisManager.promises.updateDocument(
@@ -420,7 +474,7 @@ const DocumentManager = {
},
async getDocAndFlushIfOld(projectId, docId) {
const { lines, version, unflushedTime, alreadyLoaded } =
let { lines, version, unflushedTime, alreadyLoaded } =
await DocumentManager.getDoc(projectId, docId)
// if doc was already loaded see if it needs to be flushed
@@ -432,6 +486,12 @@ const DocumentManager = {
await DocumentManager.flushDocIfLoaded(projectId, docId)
}
if (!Array.isArray(lines)) {
const file = StringFileData.fromRaw(lines)
// TODO(24596): tc support for history-ot
lines = file.getLines()
}
return { lines, version }
},
@@ -476,6 +536,11 @@ const DocumentManager = {
if (opts.historyRangesMigration) {
historyRangesSupport = opts.historyRangesMigration === 'forwards'
}
if (!Array.isArray(lines)) {
const file = StringFileData.fromRaw(lines)
// TODO(24596): tc support for history-ot
lines = file.getLines()
}
await ProjectHistoryRedisManager.promises.queueResyncDocContent(
projectId,
@@ -684,6 +749,7 @@ module.exports = {
'ranges',
'pathname',
'projectHistoryId',
'type',
],
getDocAndRecentOpsWithLock: [
'lines',
@@ -692,6 +758,7 @@ module.exports = {
'ranges',
'pathname',
'projectHistoryId',
'type',
],
getCommentWithLock: ['comment'],
},
+10 -2
View File
@@ -5,7 +5,15 @@ class OpRangeNotAvailableError extends OError {}
class ProjectStateChangedError extends OError {}
class DeleteMismatchError extends OError {}
class FileTooLargeError extends OError {}
class ProjectMigratedToHistoryOTError extends OError {}
class OTTypeMismatchError extends OError {
/**
* @param {OTType} got
* @param {OTType} want
*/
constructor(got, want) {
super('ot type mismatch', { got, want })
}
}
module.exports = {
NotFoundError,
@@ -13,5 +21,5 @@ module.exports = {
ProjectStateChangedError,
DeleteMismatchError,
FileTooLargeError,
ProjectMigratedToHistoryOTError,
OTTypeMismatchError,
}
@@ -0,0 +1,158 @@
// @ts-check
const Profiler = require('./Profiler')
const DocumentManager = require('./DocumentManager')
const Errors = require('./Errors')
const RedisManager = require('./RedisManager')
const {
EditOperationBuilder,
StringFileData,
EditOperationTransformer,
} = require('overleaf-editor-core')
const Metrics = require('./Metrics')
const ProjectHistoryRedisManager = require('./ProjectHistoryRedisManager')
const HistoryManager = require('./HistoryManager')
const RealTimeRedisManager = require('./RealTimeRedisManager')
/**
* @typedef {import("./types").Update} Update
* @typedef {import("./types").HistoryV1OTEditOperationUpdate} HistoryV1OTEditOperationUpdate
*/
/**
* @param {Update} update
* @return {update is HistoryV1OTEditOperationUpdate}
*/
function isHistoryOTEditOperationUpdate(update) {
return (
update &&
'doc' in update &&
'op' in update &&
'v' in update &&
Array.isArray(update.op) &&
EditOperationBuilder.isValid(update.op[0])
)
}
/**
* Try to apply an update to the given document
*
* @param {string} projectId
* @param {string} docId
* @param {HistoryV1OTEditOperationUpdate} update
* @param {Profiler} profiler
*/
async function tryApplyUpdate(projectId, docId, update, profiler) {
let { lines, version, pathname, type } =
await DocumentManager.promises.getDoc(projectId, docId)
profiler.log('getDoc')
if (lines == null || version == null) {
throw new Errors.NotFoundError(`document not found: ${docId}`)
}
if (type !== 'history-ot') {
throw new Errors.OTTypeMismatchError(type, 'history-ot')
}
let op = EditOperationBuilder.fromJSON(update.op[0])
if (version !== update.v) {
const transformUpdates = await RedisManager.promises.getPreviousDocOps(
docId,
update.v,
version
)
for (const transformUpdate of transformUpdates) {
if (!isHistoryOTEditOperationUpdate(transformUpdate)) {
throw new Errors.OTTypeMismatchError('sharejs-text-ot', 'history-ot')
}
if (
transformUpdate.meta.source &&
update.dupIfSource?.includes(transformUpdate.meta.source)
) {
update.dup = true
break
}
const other = EditOperationBuilder.fromJSON(transformUpdate.op[0])
op = EditOperationTransformer.transform(op, other)[0]
}
update.op = [op.toJSON()]
}
if (!update.dup) {
const file = StringFileData.fromRaw(lines)
file.edit(op)
version += 1
update.meta.ts = Date.now()
await RedisManager.promises.updateDocument(
projectId,
docId,
file.toRaw(),
version,
[update],
{},
update.meta
)
Metrics.inc('history-queue', 1, { status: 'project-history' })
try {
const projectOpsLength =
await ProjectHistoryRedisManager.promises.queueOps(projectId, [
JSON.stringify({
...update,
meta: {
...update.meta,
pathname,
},
}),
])
HistoryManager.recordAndFlushHistoryOps(
projectId,
[update],
projectOpsLength
)
profiler.log('recordAndFlushHistoryOps')
} catch (err) {
// The full project history can re-sync a project in case
// updates went missing.
// Just record the error here and acknowledge the write-op.
Metrics.inc('history-queue-error')
}
}
RealTimeRedisManager.sendData({
project_id: projectId,
doc_id: docId,
op: update,
})
}
/**
* Apply an update to the given document
*
* @param {string} projectId
* @param {string} docId
* @param {HistoryV1OTEditOperationUpdate} update
*/
async function applyUpdate(projectId, docId, update) {
const profiler = new Profiler('applyUpdate', {
project_id: projectId,
doc_id: docId,
type: 'history-ot',
})
try {
await tryApplyUpdate(projectId, docId, update, profiler)
} catch (error) {
RealTimeRedisManager.sendData({
project_id: projectId,
doc_id: docId,
error: error instanceof Error ? error.message : error,
})
profiler.log('sendData')
throw error
} finally {
profiler.end()
}
}
module.exports = { isHistoryOTEditOperationUpdate, applyUpdate }
@@ -9,6 +9,7 @@ const Metrics = require('./Metrics')
const DeleteQueueManager = require('./DeleteQueueManager')
const { getTotalSizeOfLines } = require('./Limits')
const async = require('async')
const { StringFileData } = require('overleaf-editor-core')
function getDoc(req, res, next) {
let fromVersion
@@ -27,7 +28,7 @@ function getDoc(req, res, next) {
projectId,
docId,
fromVersion,
(error, lines, version, ops, ranges, pathname) => {
(error, lines, version, ops, ranges, pathname, _projectHistoryId, type) => {
timer.done()
if (error) {
return next(error)
@@ -36,6 +37,11 @@ function getDoc(req, res, next) {
if (lines == null || version == null) {
return next(new Errors.NotFoundError('document not found'))
}
if (!Array.isArray(lines) && req.query.historyV1OTSupport !== 'true') {
const file = StringFileData.fromRaw(lines)
// TODO(24596): tc support for history-ot
lines = file.getLines()
}
res.json({
id: docId,
lines,
@@ -44,6 +50,7 @@ function getDoc(req, res, next) {
ranges,
pathname,
ttlInS: RedisManager.DOC_OPS_TTL,
type,
})
}
)
@@ -84,6 +91,11 @@ function peekDoc(req, res, next) {
if (lines == null || version == null) {
return next(new Errors.NotFoundError('document not found'))
}
if (!Array.isArray(lines) && req.query.historyV1OTSupport !== 'true') {
const file = StringFileData.fromRaw(lines)
// TODO(24596): tc support for history-ot
lines = file.getLines()
}
res.json({ id: docId, lines, version })
})
}
@@ -95,6 +95,13 @@ function getDoc(projectId, docId, options = {}, _callback) {
status: body.pathname === '' ? 'zero-length' : 'undefined',
})
}
if (body.otMigrationStage > 0) {
// Use history-ot
body.lines = { content: body.lines.join('\n') }
body.ranges = {}
}
callback(
null,
body.lines,
+43 -59
View File
@@ -1,68 +1,52 @@
/* eslint-disable
no-unused-vars,
*/
// TODO: This file was created by bulk-decaffeinate.
// Fix any style issues and re-enable lint.
/*
* decaffeinate suggestions:
* DS206: Consider reworking classes to avoid initClass
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
*/
let Profiler
const Settings = require('@overleaf/settings')
const logger = require('@overleaf/logger')
const deltaMs = function (ta, tb) {
function deltaMs(ta, tb) {
const nanoSeconds = (ta[0] - tb[0]) * 1e9 + (ta[1] - tb[1])
const milliSeconds = Math.floor(nanoSeconds * 1e-6)
return milliSeconds
}
module.exports = Profiler = (function () {
Profiler = class Profiler {
static initClass() {
this.prototype.LOG_CUTOFF_TIME = 15 * 1000
this.prototype.LOG_SYNC_CUTOFF_TIME = 1000
}
class Profiler {
LOG_CUTOFF_TIME = 15 * 1000
LOG_SYNC_CUTOFF_TIME = 1000
constructor(name, args) {
this.name = name
this.args = args
this.t0 = this.t = process.hrtime()
this.start = new Date()
this.updateTimes = []
this.totalSyncTime = 0
}
log(label, options = {}) {
const t1 = process.hrtime()
const dtMilliSec = deltaMs(t1, this.t)
this.t = t1
this.totalSyncTime += options.sync ? dtMilliSec : 0
this.updateTimes.push([label, dtMilliSec]) // timings in ms
return this // make it chainable
}
end(message) {
const totalTime = deltaMs(this.t, this.t0)
const exceedsCutoff = totalTime > this.LOG_CUTOFF_TIME
const exceedsSyncCutoff = this.totalSyncTime > this.LOG_SYNC_CUTOFF_TIME
if (exceedsCutoff || exceedsSyncCutoff) {
// log anything greater than cutoffs
const args = {}
for (const k in this.args) {
const v = this.args[k]
args[k] = v
}
args.updateTimes = this.updateTimes
args.start = this.start
args.end = new Date()
args.status = { exceedsCutoff, exceedsSyncCutoff }
logger.warn(args, this.name)
}
return totalTime
}
constructor(name, args) {
this.name = name
this.args = args
this.t0 = this.t = process.hrtime()
this.start = new Date()
this.updateTimes = []
this.totalSyncTime = 0
}
Profiler.initClass()
return Profiler
})()
log(label, options = {}) {
const t1 = process.hrtime()
const dtMilliSec = deltaMs(t1, this.t)
this.t = t1
this.totalSyncTime += options.sync ? dtMilliSec : 0
this.updateTimes.push([label, dtMilliSec]) // timings in ms
return this // make it chainable
}
end() {
const totalTime = deltaMs(this.t, this.t0)
const exceedsCutoff = totalTime > this.LOG_CUTOFF_TIME
const exceedsSyncCutoff = this.totalSyncTime > this.LOG_SYNC_CUTOFF_TIME
if (exceedsCutoff || exceedsSyncCutoff) {
// log anything greater than cutoffs
const args = {}
for (const k in this.args) {
const v = this.args[k]
args[k] = v
}
args.updateTimes = this.updateTimes
args.start = this.start
args.end = new Date()
args.status = { exceedsCutoff, exceedsSyncCutoff }
logger.warn(args, this.name)
}
return totalTime
}
}
module.exports = Profiler
@@ -48,6 +48,7 @@ const RedisManager = {
timer.done()
_callback(error)
}
const shareJSTextOT = Array.isArray(docLines)
const docLinesArray = docLines
docLines = JSON.stringify(docLines)
if (docLines.indexOf('\u0000') !== -1) {
@@ -60,7 +61,10 @@ const RedisManager = {
// Do an optimised size check on the docLines using the serialised
// length as an upper bound
const sizeBound = docLines.length
if (docIsTooLarge(sizeBound, docLinesArray, Settings.max_doc_length)) {
if (
shareJSTextOT && // editor-core has a size check in TextOperation.apply and TextOperation.applyToLength.
docIsTooLarge(sizeBound, docLinesArray, Settings.max_doc_length)
) {
const docSize = docLines.length
const err = new Error('blocking doc insert into redis: doc is too large')
logger.error({ projectId, docId, err, docSize }, err.message)
@@ -324,13 +328,6 @@ const RedisManager = {
} catch (e) {
return callback(e)
}
if (docLines != null && !Array.isArray(docLines)) {
return callback(
new Errors.ProjectMigratedToHistoryOTError(
'refusing to process doc that was migrated to history-ot'
)
)
}
version = parseInt(version || 0, 10)
// check doc is in requested project
@@ -468,6 +465,7 @@ const RedisManager = {
if (appliedOps == null) {
appliedOps = []
}
const shareJSTextOT = Array.isArray(docLines)
RedisManager.getDocVersion(docId, (error, currentVersion) => {
if (error) {
return callback(error)
@@ -507,7 +505,10 @@ const RedisManager = {
// Do an optimised size check on the docLines using the serialised
// length as an upper bound
const sizeBound = newDocLines.length
if (docIsTooLarge(sizeBound, docLines, Settings.max_doc_length)) {
if (
shareJSTextOT && // editor-core has a size check in TextOperation.apply and TextOperation.applyToLength.
docIsTooLarge(sizeBound, docLines, Settings.max_doc_length)
) {
const err = new Error('blocking doc update: doc is too large')
const docSize = newDocLines.length
logger.error({ projectId, docId, err, docSize }, err.message)
@@ -15,9 +15,10 @@ const RangesManager = require('./RangesManager')
const SnapshotManager = require('./SnapshotManager')
const Profiler = require('./Profiler')
const { isInsert, isDelete, getDocLength, computeDocHash } = require('./Utils')
const HistoryV1OTUpdateManager = require('./HistoryV1OTUpdateManager')
/**
* @import { DeleteOp, InsertOp, Op, Ranges, Update, HistoryUpdate } from "./types"
* @import { Ranges, Update, HistoryUpdate } from "./types"
*/
const UpdateManager = {
@@ -80,7 +81,11 @@ const UpdateManager = {
profile.log('getPendingUpdatesForDoc')
for (const update of updates) {
await UpdateManager.applyUpdate(projectId, docId, update)
if (HistoryV1OTUpdateManager.isHistoryOTEditOperationUpdate(update)) {
await HistoryV1OTUpdateManager.applyUpdate(projectId, docId, update)
} else {
await UpdateManager.applyUpdate(projectId, docId, update)
}
profile.log('applyUpdate')
}
profile.log('async done').end()
@@ -110,12 +115,16 @@ const UpdateManager = {
pathname,
projectHistoryId,
historyRangesSupport,
type,
} = await DocumentManager.promises.getDoc(projectId, docId)
profile.log('getDoc')
if (lines == null || version == null) {
throw new Errors.NotFoundError(`document not found: ${docId}`)
}
if (type !== 'sharejs-text-ot') {
throw new Errors.OTTypeMismatchError(type, 'sharejs-text-ot')
}
const previousVersion = version
const incomingUpdateVersion = update.v
+10
View File
@@ -1,12 +1,17 @@
import {
TrackingPropsRawData,
ClearTrackingPropsRawData,
RawEditOperation,
} from 'overleaf-editor-core/lib/types'
export type OTType = 'sharejs-text-ot' | 'history-ot'
/**
* An update coming from the editor
*/
export type Update = {
dup?: boolean
dupIfSource?: string[]
doc: string
op: Op[]
v: number
@@ -18,6 +23,11 @@ export type Update = {
projectHistoryId?: string
}
export type HistoryV1OTEditOperationUpdate = Omit<Update, 'op'> & {
op: RawEditOperation[]
meta: Update['meta'] & { source: string }
}
export type Op = InsertOp | DeleteOp | CommentOp | RetainOp
export type InsertOp = {
+1
View File
@@ -34,6 +34,7 @@
"lodash": "^4.17.21",
"minimist": "^1.2.8",
"mongodb-legacy": "6.1.3",
"overleaf-editor-core": "*",
"request": "^2.88.2",
"requestretry": "^7.1.0"
},
@@ -31,6 +31,12 @@ describe('Applying updates to a doc', function () {
op: [this.op],
v: this.version,
}
this.historyV1OTUpdate = {
doc: this.doc_id,
op: [{ textOperation: [4, 'one and a half\n', 9] }],
v: this.version,
meta: { source: 'random-publicId' },
}
this.result = ['one', 'one and a half', 'two', 'three']
DocUpdaterApp.ensureRunning(done)
})
@@ -284,6 +290,260 @@ describe('Applying updates to a doc', function () {
})
})
describe('when the document is not loaded (history-ot)', function () {
beforeEach(function (done) {
this.startTime = Date.now()
MockWebApi.insertDoc(this.project_id, this.doc_id, {
lines: this.lines,
version: this.version,
otMigrationStage: 1,
})
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
this.historyV1OTUpdate,
error => {
if (error != null) {
throw error
}
setTimeout(() => {
rclientProjectHistory.get(
ProjectHistoryKeys.projectHistoryFirstOpTimestamp({
project_id: this.project_id,
}),
(error, result) => {
if (error != null) {
throw error
}
result = parseInt(result, 10)
this.firstOpTimestamp = result
done()
}
)
}, 200)
}
)
})
it('should load the document from the web API', function () {
MockWebApi.getDocument
.calledWith(this.project_id, this.doc_id)
.should.equal(true)
})
it('should update the doc', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) done(error)
doc.lines.should.deep.equal(this.result)
done()
}
)
})
it('should push the applied updates to the project history changes api', function (done) {
rclientProjectHistory.lrange(
ProjectHistoryKeys.projectHistoryOps({ project_id: this.project_id }),
0,
-1,
(error, updates) => {
if (error != null) {
throw error
}
JSON.parse(updates[0]).op.should.deep.equal(this.historyV1OTUpdate.op)
JSON.parse(updates[0]).meta.pathname.should.equal('/a/b/c.tex')
done()
}
)
})
it('should set the first op timestamp', function () {
this.firstOpTimestamp.should.be.within(this.startTime, Date.now())
})
it('should yield last updated time', function (done) {
DocUpdaterClient.getProjectLastUpdatedAt(
this.project_id,
(error, res, body) => {
if (error != null) {
throw error
}
res.statusCode.should.equal(200)
body.lastUpdatedAt.should.be.within(this.startTime, Date.now())
done()
}
)
})
it('should yield no last updated time for another project', function (done) {
DocUpdaterClient.getProjectLastUpdatedAt(
DocUpdaterClient.randomId(),
(error, res, body) => {
if (error != null) {
throw error
}
res.statusCode.should.equal(200)
body.should.deep.equal({})
done()
}
)
})
describe('when sending another update', function () {
beforeEach(function (done) {
this.timeout(10000)
this.second_update = Object.assign({}, this.historyV1OTUpdate)
this.second_update.op = [
{
textOperation: [4, 'one and a half\n', 24],
},
]
this.second_update.v = this.version + 1
this.secondStartTime = Date.now()
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
this.second_update,
error => {
if (error != null) {
throw error
}
setTimeout(done, 200)
}
)
})
it('should update the doc', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) done(error)
doc.lines.should.deep.equal([
'one',
'one and a half',
'one and a half',
'two',
'three',
])
done()
}
)
})
it('should not change the first op timestamp', function (done) {
rclientProjectHistory.get(
ProjectHistoryKeys.projectHistoryFirstOpTimestamp({
project_id: this.project_id,
}),
(error, result) => {
if (error != null) {
throw error
}
result = parseInt(result, 10)
result.should.equal(this.firstOpTimestamp)
done()
}
)
})
it('should yield last updated time', function (done) {
DocUpdaterClient.getProjectLastUpdatedAt(
this.project_id,
(error, res, body) => {
if (error != null) {
throw error
}
res.statusCode.should.equal(200)
body.lastUpdatedAt.should.be.within(
this.secondStartTime,
Date.now()
)
done()
}
)
})
})
describe('when another client is sending a concurrent update', function () {
beforeEach(function (done) {
this.timeout(10000)
this.otherUpdate = {
doc: this.doc_id,
op: [{ textOperation: [8, 'two and a half\n', 5] }],
v: this.version,
meta: { source: 'other-random-publicId' },
}
this.secondStartTime = Date.now()
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
this.otherUpdate,
error => {
if (error != null) {
throw error
}
setTimeout(done, 200)
}
)
})
it('should update the doc', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) done(error)
doc.lines.should.deep.equal([
'one',
'one and a half',
'two',
'two and a half',
'three',
])
done()
}
)
})
it('should not change the first op timestamp', function (done) {
rclientProjectHistory.get(
ProjectHistoryKeys.projectHistoryFirstOpTimestamp({
project_id: this.project_id,
}),
(error, result) => {
if (error != null) {
throw error
}
result = parseInt(result, 10)
result.should.equal(this.firstOpTimestamp)
done()
}
)
})
it('should yield last updated time', function (done) {
DocUpdaterClient.getProjectLastUpdatedAt(
this.project_id,
(error, res, body) => {
if (error != null) {
throw error
}
res.statusCode.should.equal(200)
body.lastUpdatedAt.should.be.within(
this.secondStartTime,
Date.now()
)
done()
}
)
})
})
})
describe('when the document is loaded', function () {
beforeEach(function (done) {
MockWebApi.insertDoc(this.project_id, this.doc_id, {
@@ -390,6 +650,58 @@ describe('Applying updates to a doc', function () {
})
})
describe('when the document is loaded (history-ot)', function () {
beforeEach(function (done) {
MockWebApi.insertDoc(this.project_id, this.doc_id, {
lines: this.lines,
version: this.version,
otMigrationStage: 1,
})
DocUpdaterClient.preloadDoc(this.project_id, this.doc_id, error => {
if (error != null) {
throw error
}
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
this.historyV1OTUpdate,
error => {
if (error != null) {
throw error
}
setTimeout(done, 200)
}
)
})
})
it('should update the doc', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) return done(error)
doc.lines.should.deep.equal(this.result)
done()
}
)
})
it('should push the applied updates to the project history changes api', function (done) {
rclientProjectHistory.lrange(
ProjectHistoryKeys.projectHistoryOps({ project_id: this.project_id }),
0,
-1,
(error, updates) => {
if (error) return done(error)
JSON.parse(updates[0]).op.should.deep.equal(this.historyV1OTUpdate.op)
JSON.parse(updates[0]).meta.pathname.should.equal('/a/b/c.tex')
done()
}
)
})
})
describe('when the document has been deleted', function () {
describe('when the ops come in a single linear order', function () {
beforeEach(function (done) {
@@ -596,6 +908,160 @@ describe('Applying updates to a doc', function () {
})
})
describe('with a broken update (history-ot)', function () {
beforeEach(function (done) {
this.broken_update = {
doc: this.doc_id,
v: this.version,
op: [{ textOperation: [99, -1] }],
meta: { source: '42' },
}
MockWebApi.insertDoc(this.project_id, this.doc_id, {
lines: this.lines,
version: this.version,
otMigrationStage: 1,
})
DocUpdaterClient.subscribeToAppliedOps(
(this.messageCallback = sinon.stub())
)
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
this.broken_update,
error => {
if (error != null) {
throw error
}
setTimeout(done, 200)
}
)
})
it('should not update the doc', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) return done(error)
doc.lines.should.deep.equal(this.lines)
done()
}
)
})
it('should send a message with an error', function () {
this.messageCallback.called.should.equal(true)
const [channel, message] = this.messageCallback.args[0]
channel.should.equal('applied-ops')
JSON.parse(message).should.deep.include({
project_id: this.project_id,
doc_id: this.doc_id,
error:
"The operation's base length must be equal to the string's length.",
})
})
})
describe('when mixing ot types (sharejs-text-ot -> history-ot)', function () {
beforeEach(function (done) {
MockWebApi.insertDoc(this.project_id, this.doc_id, {
lines: this.lines,
version: this.version,
otMigrationStage: 0,
})
DocUpdaterClient.subscribeToAppliedOps(
(this.messageCallback = sinon.stub())
)
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
this.historyV1OTUpdate,
error => {
if (error != null) {
throw error
}
setTimeout(done, 200)
}
)
})
it('should not update the doc', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) return done(error)
doc.lines.should.deep.equal(this.lines)
done()
}
)
})
it('should send a message with an error', function () {
this.messageCallback.called.should.equal(true)
const [channel, message] = this.messageCallback.args[0]
channel.should.equal('applied-ops')
JSON.parse(message).should.deep.include({
project_id: this.project_id,
doc_id: this.doc_id,
error: 'ot type mismatch',
})
})
})
describe('when mixing ot types (history-ot -> sharejs-text-ot)', function () {
beforeEach(function (done) {
MockWebApi.insertDoc(this.project_id, this.doc_id, {
lines: this.lines,
version: this.version,
otMigrationStage: 1,
})
DocUpdaterClient.subscribeToAppliedOps(
(this.messageCallback = sinon.stub())
)
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
this.update,
error => {
if (error != null) {
throw error
}
setTimeout(done, 200)
}
)
})
it('should not update the doc', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) return done(error)
doc.lines.should.deep.equal(this.lines)
done()
}
)
})
it('should send a message with an error', function () {
this.messageCallback.called.should.equal(true)
const [channel, message] = this.messageCallback.args[0]
channel.should.equal('applied-ops')
JSON.parse(message).should.deep.include({
project_id: this.project_id,
doc_id: this.doc_id,
error: 'ot type mismatch',
})
})
})
describe('when there is no version in Mongo', function () {
beforeEach(function (done) {
MockWebApi.insertDoc(this.project_id, this.doc_id, {
@@ -716,6 +1182,84 @@ describe('Applying updates to a doc', function () {
})
})
describe('when sending duplicate ops (history-ot)', function () {
beforeEach(function (done) {
MockWebApi.insertDoc(this.project_id, this.doc_id, {
lines: this.lines,
version: this.version,
otMigrationStage: 1,
})
DocUpdaterClient.subscribeToAppliedOps(
(this.messageCallback = sinon.stub())
)
// One user delete 'one', the next turns it into 'once'. The second becomes a NOP.
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
{
doc: this.doc_id,
op: [{ textOperation: [4, 'one and a half\n', 9] }],
v: this.version,
meta: {
source: 'ikHceq3yfAdQYzBo4-xZ',
},
},
error => {
if (error != null) {
throw error
}
setTimeout(() => {
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
{
doc: this.doc_id,
op: [
{
textOperation: [4, 'one and a half\n', 9],
},
],
v: this.version,
dupIfSource: ['ikHceq3yfAdQYzBo4-xZ'],
meta: {
source: 'ikHceq3yfAdQYzBo4-xZ',
},
},
error => {
if (error != null) {
throw error
}
setTimeout(done, 200)
}
)
}, 200)
}
)
})
it('should update the doc', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) return done(error)
doc.lines.should.deep.equal(this.result)
done()
}
)
})
it('should return a message about duplicate ops', function () {
this.messageCallback.calledTwice.should.equal(true)
this.messageCallback.args[0][0].should.equal('applied-ops')
expect(JSON.parse(this.messageCallback.args[0][1]).op.dup).to.be.undefined
this.messageCallback.args[1][0].should.equal('applied-ops')
expect(JSON.parse(this.messageCallback.args[1][1]).op.dup).to.equal(true)
})
})
describe('when sending updates for a non-existing doc id', function () {
beforeEach(function (done) {
this.non_existing = {
@@ -13,11 +13,6 @@ const { expect } = require('chai')
const MockWebApi = require('./helpers/MockWebApi')
const DocUpdaterClient = require('./helpers/DocUpdaterClient')
const DocUpdaterApp = require('./helpers/DocUpdaterApp')
const Settings = require('@overleaf/settings')
const docUpdaterRedis = require('@overleaf/redis-wrapper').createClient(
Settings.redis.documentupdater
)
const Keys = Settings.redis.documentupdater.key_schema
describe('Getting a document', function () {
before(function (done) {
@@ -114,59 +109,6 @@ describe('Getting a document', function () {
})
})
describe('when the document is migrated (history-ot)', function () {
before(function (done) {
;[this.project_id, this.doc_id] = Array.from([
DocUpdaterClient.randomId(),
DocUpdaterClient.randomId(),
])
MockWebApi.insertDoc(this.project_id, this.doc_id, {
lines: this.lines,
version: this.version,
})
DocUpdaterClient.preloadDoc(this.project_id, this.doc_id, error => {
if (error != null) {
throw error
}
sinon.spy(MockWebApi, 'getDocument')
docUpdaterRedis.set(
Keys.docLines({ doc_id: this.doc_id }),
JSON.stringify({ content: this.lines.join('\n') }),
err => {
if (err) return done(err)
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, body) => {
if (error) return done(error)
this.res = res
this.body = body
done()
}
)
}
)
})
})
after(function () {
MockWebApi.getDocument.restore()
})
it('should not load the document from the web API', function () {
MockWebApi.getDocument.called.should.equal(false)
})
it('should return an error', function () {
expect(this.res.statusCode).to.equal(422)
expect(this.body).to.equal(
'refusing to process doc that was migrated to history-ot'
)
})
})
describe('when the request asks for some recent ops', function () {
before(function (done) {
;[this.project_id, this.doc_id] = Array.from([
@@ -196,6 +196,167 @@ describe('Setting a document', function () {
})
})
describe('when the updated doc exists in the doc updater (history-ot)', function () {
before(function (done) {
numberOfReceivedUpdates = 0
this.project_id = DocUpdaterClient.randomId()
this.doc_id = DocUpdaterClient.randomId()
this.historyV1OTUpdate = {
doc: this.doc_id,
op: [{ textOperation: [4, 'one and a half\n', 9] }],
v: this.version,
meta: { source: 'random-publicId' },
}
MockWebApi.insertDoc(this.project_id, this.doc_id, {
lines: this.lines,
version: this.version,
otMigrationStage: 1,
})
DocUpdaterClient.preloadDoc(this.project_id, this.doc_id, error => {
if (error) {
throw error
}
DocUpdaterClient.sendUpdate(
this.project_id,
this.doc_id,
this.historyV1OTUpdate,
error => {
if (error) {
throw error
}
setTimeout(() => {
DocUpdaterClient.setDocLines(
this.project_id,
this.doc_id,
this.newLines,
this.source,
this.user_id,
false,
(error, res, body) => {
if (error) {
return done(error)
}
this.statusCode = res.statusCode
this.body = body
done()
}
)
}, 200)
}
)
})
})
after(function () {
MockProjectHistoryApi.flushProject.resetHistory()
MockWebApi.setDocument.resetHistory()
})
it('should return a 200 status code', function () {
this.statusCode.should.equal(200)
})
it('should emit two updates (from sendUpdate and setDocLines)', function () {
expect(numberOfReceivedUpdates).to.equal(2)
})
it('should send the updated doc lines and version to the web api', function () {
MockWebApi.setDocument
.calledWith(this.project_id, this.doc_id, this.newLines)
.should.equal(true)
})
it('should update the lines in the doc updater', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) {
return done(error)
}
doc.lines.should.deep.equal(this.newLines)
done()
}
)
})
it('should bump the version in the doc updater', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) {
return done(error)
}
doc.version.should.equal(this.version + 2)
done()
}
)
})
it('should leave the document in redis', function (done) {
docUpdaterRedis.get(
Keys.docLines({ doc_id: this.doc_id }),
(error, lines) => {
if (error) {
throw error
}
expect(JSON.parse(lines)).to.deep.equal({
content: this.newLines.join('\n'),
})
done()
}
)
})
it('should return the mongo rev in the json response', function () {
this.body.should.deep.equal({ rev: '123' })
})
describe('when doc has the same contents', function () {
beforeEach(function (done) {
numberOfReceivedUpdates = 0
DocUpdaterClient.setDocLines(
this.project_id,
this.doc_id,
this.newLines,
this.source,
this.user_id,
false,
(error, res, body) => {
if (error) {
return done(error)
}
this.statusCode = res.statusCode
this.body = body
done()
}
)
})
it('should not bump the version in doc updater', function (done) {
DocUpdaterClient.getDoc(
this.project_id,
this.doc_id,
(error, res, doc) => {
if (error) {
return done(error)
}
doc.version.should.equal(this.version + 2)
done()
}
)
})
it('should not emit any updates', function (done) {
setTimeout(() => {
expect(numberOfReceivedUpdates).to.equal(0)
done()
}, 100) // delay by 100ms: make sure we do not check too early!
})
})
})
describe('when the updated doc does not exist in the doc updater', function () {
before(function (done) {
this.project_id = DocUpdaterClient.randomId()
+1
View File
@@ -31,6 +31,7 @@ SandboxedModule.configure({
requires: {
'@overleaf/logger': stubs.logger,
'mongodb-legacy': require('mongodb-legacy'), // for ObjectId comparisons
'overleaf-editor-core': require('overleaf-editor-core'), // does not play nice with sandbox
},
globals: { Buffer, JSON, Math, console, process },
sourceTransformers: {
@@ -49,6 +49,9 @@ describe('DocumentManager', function () {
applyUpdate: sinon.stub().resolves(),
},
}
this.HistoryV1OTUpdateManager = {
applyUpdate: sinon.stub().resolves(),
}
this.RangesManager = {
acceptChanges: sinon.stub(),
deleteComment: sinon.stub(),
@@ -66,6 +69,7 @@ describe('DocumentManager', function () {
'./Metrics': this.Metrics,
'./DiffCodec': this.DiffCodec,
'./UpdateManager': this.UpdateManager,
'./HistoryV1OTUpdateManager': this.HistoryV1OTUpdateManager,
'./RangesManager': this.RangesManager,
'./Errors': Errors,
'@overleaf/settings': this.Settings,
@@ -222,6 +226,7 @@ describe('DocumentManager', function () {
ranges: this.ranges,
pathname: this.pathname,
projectHistoryId: this.projectHistoryId,
type: 'sharejs-text-ot',
})
this.RedisManager.promises.getPreviousDocOps.resolves(this.ops)
this.result = await this.DocumentManager.promises.getDocAndRecentOps(
@@ -251,6 +256,7 @@ describe('DocumentManager', function () {
ranges: this.ranges,
pathname: this.pathname,
projectHistoryId: this.projectHistoryId,
type: 'sharejs-text-ot',
})
})
})
@@ -263,6 +269,7 @@ describe('DocumentManager', function () {
ranges: this.ranges,
pathname: this.pathname,
projectHistoryId: this.projectHistoryId,
type: 'sharejs-text-ot',
})
this.RedisManager.promises.getPreviousDocOps.resolves(this.ops)
this.result = await this.DocumentManager.promises.getDocAndRecentOps(
@@ -290,6 +297,7 @@ describe('DocumentManager', function () {
ranges: this.ranges,
pathname: this.pathname,
projectHistoryId: this.projectHistoryId,
type: 'sharejs-text-ot',
})
})
})
@@ -333,6 +341,7 @@ describe('DocumentManager', function () {
unflushedTime: this.unflushedTime,
alreadyLoaded: true,
historyRangesSupport: this.historyRangesSupport,
type: 'sharejs-text-ot',
})
})
})
@@ -400,6 +409,7 @@ describe('DocumentManager', function () {
unflushedTime: null,
alreadyLoaded: false,
historyRangesSupport: this.historyRangesSupport,
type: 'sharejs-text-ot',
})
})
})
@@ -26,6 +26,7 @@ describe('HttpController', function () {
this.Metrics.Timer.prototype.done = sinon.stub()
this.project_id = 'project-id-123'
this.projectHistoryId = '123'
this.doc_id = 'doc-id-123'
this.source = 'editor'
this.next = sinon.stub()
@@ -65,7 +66,9 @@ describe('HttpController', function () {
this.version,
[],
this.ranges,
this.pathname
this.pathname,
this.projectHistoryId,
'sharejs-text-ot'
)
this.HttpController.getDoc(this.req, this.res, this.next)
})
@@ -77,17 +80,16 @@ describe('HttpController', function () {
})
it('should return the doc as JSON', function () {
this.res.json
.calledWith({
id: this.doc_id,
lines: this.lines,
version: this.version,
ops: [],
ranges: this.ranges,
pathname: this.pathname,
ttlInS: 42,
})
.should.equal(true)
this.res.json.should.have.been.calledWith({
id: this.doc_id,
lines: this.lines,
version: this.version,
ops: [],
ranges: this.ranges,
pathname: this.pathname,
ttlInS: 42,
type: 'sharejs-text-ot',
})
})
it('should log the request', function () {
@@ -115,7 +117,9 @@ describe('HttpController', function () {
this.version,
this.ops,
this.ranges,
this.pathname
this.pathname,
this.projectHistoryId,
'sharejs-text-ot'
)
this.req.query = { fromVersion: `${this.fromVersion}` }
this.HttpController.getDoc(this.req, this.res, this.next)
@@ -128,17 +132,16 @@ describe('HttpController', function () {
})
it('should return the doc as JSON', function () {
this.res.json
.calledWith({
id: this.doc_id,
lines: this.lines,
version: this.version,
ops: this.ops,
ranges: this.ranges,
pathname: this.pathname,
ttlInS: 42,
})
.should.equal(true)
this.res.json.should.have.been.calledWith({
id: this.doc_id,
lines: this.lines,
version: this.version,
ops: this.ops,
ranges: this.ranges,
pathname: this.pathname,
ttlInS: 42,
type: 'sharejs-text-ot',
})
})
it('should log the request', function () {
@@ -331,6 +331,7 @@ describe('UpdateManager', function () {
pathname: this.pathname,
projectHistoryId: this.projectHistoryId,
historyRangesSupport: false,
type: 'sharejs-text-ot',
})
this.RangesManager.applyUpdate.returns({
newRanges: this.updated_ranges,
@@ -502,6 +503,7 @@ describe('UpdateManager', function () {
pathname: this.pathname,
projectHistoryId: this.projectHistoryId,
historyRangesSupport: true,
type: 'sharejs-text-ot',
})
await this.UpdateManager.promises.applyUpdate(
this.project_id,
@@ -2,6 +2,7 @@
import OError from '@overleaf/o-error'
import DMP from 'diff-match-patch'
import { EditOperationBuilder } from 'overleaf-editor-core'
/**
* @import { DeleteOp, InsertOp, Op, Update } from './types'
@@ -230,6 +231,15 @@ function _concatTwoUpdates(firstUpdate, secondUpdate) {
return [firstUpdate, secondUpdate]
}
const firstUpdateIsHistoryV1OT = EditOperationBuilder.isValid(firstUpdate.op)
const secondUpdateIsHistoryV1OT = EditOperationBuilder.isValid(
secondUpdate.op
)
if (firstUpdateIsHistoryV1OT !== secondUpdateIsHistoryV1OT) {
// cannot merge mix of sharejs-text-op and history-ot, should not happen.
return [firstUpdate, secondUpdate]
}
if (
firstUpdate.doc !== secondUpdate.doc ||
firstUpdate.pathname !== secondUpdate.pathname
@@ -276,6 +286,15 @@ function _concatTwoUpdates(firstUpdate, secondUpdate) {
return [firstUpdate, secondUpdate]
}
if (firstUpdateIsHistoryV1OT && secondUpdateIsHistoryV1OT) {
const op1 = EditOperationBuilder.fromJSON(firstUpdate.op)
const op2 = EditOperationBuilder.fromJSON(secondUpdate.op)
if (!op1.canBeComposedWith(op2)) return [firstUpdate, secondUpdate]
return [
mergeUpdatesWithOp(firstUpdate, secondUpdate, op1.compose(op2).toJSON()),
]
}
if (
firstUpdate.op.trackedDeleteRejection ||
secondUpdate.op.trackedDeleteRejection
@@ -440,8 +459,7 @@ export function diffAsShareJsOps(before, after) {
const ops = []
let position = 0
for (const diff of diffs) {
const type = diff[0]
const content = diff[1]
const [type, content] = diff
if (type === ADDED) {
ops.push({
i: content,
@@ -7,7 +7,7 @@ import * as OperationsCompressor from './OperationsCompressor.js'
import { isInsert, isRetain, isDelete, isComment } from './Utils.js'
/**
* @import { AddDocUpdate, AddFileUpdate, DeleteCommentUpdate, Op, RawScanOp } from './types'
* @import { AddDocUpdate, AddFileUpdate, DeleteCommentUpdate, HistoryV1OTEditOperationUpdate, Op, RawScanOp } from './types'
* @import { RenameUpdate, TextUpdate, TrackingDirective, TrackingProps } from './types'
* @import { SetCommentStateUpdate, SetFileMetadataOperation, Update, UpdateWithBlob } from './types'
*/
@@ -60,6 +60,16 @@ function _convertToChange(projectId, updateWithBlob) {
}
operations = [op]
projectVersion = update.version
} else if (isHistoryOTEditOperationUpdate(update)) {
let { pathname } = update.meta
pathname = _convertPathname(pathname)
if (update.v != null) {
v2DocVersions[update.doc] = { pathname, v: update.v }
}
operations = update.op.map(op => {
// Turn EditOperation into EditFileOperation by adding the pathname field.
return { pathname, ...op }
})
} else if (isTextUpdate(update)) {
const docLength = update.meta.history_doc_length ?? update.meta.doc_length
let pathname = update.meta.pathname
@@ -194,6 +204,22 @@ export function isTextUpdate(update) {
)
}
/**
* @param {Update} update
* @returns {update is HistoryV1OTEditOperationUpdate}
*/
export function isHistoryOTEditOperationUpdate(update) {
return (
'doc' in update &&
update.doc != null &&
'op' in update &&
update.op != null &&
'pathname' in update.meta &&
update.meta.pathname != null &&
Core.EditOperationBuilder.isValid(update.op[0])
)
}
export function isProjectStructureUpdate(update) {
return isAddUpdate(update) || _isRenameUpdate(update)
}
+14 -1
View File
@@ -1,5 +1,9 @@
import { HistoryRanges } from '../../../document-updater/app/js/types'
import { LinkedFileData, RawOrigin } from 'overleaf-editor-core/lib/types'
import {
LinkedFileData,
RawEditOperation,
RawOrigin,
} from 'overleaf-editor-core/lib/types'
export type Update =
| TextUpdate
@@ -40,6 +44,15 @@ export type TextUpdate = {
}
}
export type HistoryV1OTEditOperationUpdate = {
doc: string
op: RawEditOperation[]
v: number
meta: UpdateMeta & {
pathname: string
}
}
export type SetCommentStateUpdate = {
pathname: string
commentId: string
@@ -29,6 +29,10 @@ function recordProjectNotEmptySinceMetric(res, status) {
}
module.exports = {
countConnectedClients(projectId, callback) {
rclient.scard(Keys.clientsInProject({ project_id: projectId }), callback)
},
// Use the same method for when a user connects, and when a user sends a cursor
// update. This way we don't care if the connected_user key has expired when
// we receive a cursor update.
@@ -19,7 +19,7 @@ const Keys = settings.redis.documentupdater.key_schema
const DocumentUpdaterManager = {
getDocument(projectId, docId, fromVersion, callback) {
const timer = new metrics.Timer('get-document')
const url = `${settings.apis.documentupdater.url}/project/${projectId}/doc/${docId}?fromVersion=${fromVersion}`
const url = `${settings.apis.documentupdater.url}/project/${projectId}/doc/${docId}?fromVersion=${fromVersion}&historyV1OTSupport=true`
logger.debug(
{ projectId, docId, fromVersion },
'getting doc from document updater'
@@ -48,7 +48,8 @@ const DocumentUpdaterManager = {
body.version,
body.ranges,
body.ops,
body.ttlInS
body.ttlInS,
body.type
)
} else if (res.statusCode === 422 && body?.firstVersionInRedis) {
callback(new ClientRequestedMissingOpsError(422, body))
@@ -1,8 +1,23 @@
const WebsocketLoadBalancer = require('./WebsocketLoadBalancer')
const DrainManager = require('./DrainManager')
const ConnectedUsersManager = require('./ConnectedUsersManager')
const logger = require('@overleaf/logger')
module.exports = {
countConnectedClients(req, res) {
const { projectId } = req.params
ConnectedUsersManager.countConnectedClients(
projectId,
(err, nConnectedClients) => {
if (err) {
logger.err({ err, projectId }, 'count connected clients failed')
return res.sendStatus(500)
}
res.json({ nConnectedClients })
}
)
},
sendMessage(req, res) {
logger.debug({ message: req.params.message }, 'sending message')
if (Array.isArray(req.body)) {
+4
View File
@@ -113,6 +113,10 @@ module.exports = Router = {
bodyParser.json({ limit: '5mb' }),
HttpApiController.sendMessage
)
app.get(
'/project/:projectId/count-connected-clients',
HttpApiController.countConnectedClients
)
app.post('/drain', HttpApiController.startDrain)
app.post(
@@ -8,6 +8,7 @@ const ConnectedUsersManager = require('./ConnectedUsersManager')
const WebsocketLoadBalancer = require('./WebsocketLoadBalancer')
const RoomManager = require('./RoomManager')
const {
CodedError,
JoinLeaveEpochMismatchError,
NotAuthorizedError,
NotJoinedError,
@@ -283,7 +284,7 @@ module.exports = WebsocketController = {
projectId,
docId,
fromVersion,
function (error, lines, version, ranges, ops, ttlInS) {
function (error, lines, version, ranges, ops, ttlInS, type) {
if (error) {
if (error instanceof ClientRequestedMissingOpsError) {
emitJoinDocCatchUpMetrics('missing', error.info)
@@ -307,36 +308,53 @@ module.exports = WebsocketController = {
// See http://ecmanaut.blogspot.co.uk/2006/07/encoding-decoding-utf8-in-javascript.html
const encodeForWebsockets = text =>
unescape(encodeURIComponent(text))
const escapedLines = []
for (let line of lines) {
try {
line = encodeForWebsockets(line)
} catch (err) {
OError.tag(err, 'error encoding line uri component', { line })
return callback(err)
metrics.inc('client_supports_history_v1_ot', 1, {
status: options.supportsHistoryV1OT ? 'success' : 'failure',
})
let escapedLines
if (type === 'history-ot') {
if (!options.supportsHistoryV1OT) {
RoomManager.leaveDoc(client, docId)
// TODO(24596): ask the user to reload the editor page (via out-of-sync modal when there are pending ops).
return callback(
new CodedError('client does not support history-ot')
)
}
escapedLines.push(line)
}
if (options.encodeRanges) {
try {
for (const comment of (ranges && ranges.comments) || []) {
if (comment.op.c) {
comment.op.c = encodeForWebsockets(comment.op.c)
}
escapedLines = lines
} else {
escapedLines = []
for (let line of lines) {
try {
line = encodeForWebsockets(line)
} catch (err) {
OError.tag(err, 'error encoding line uri component', {
line,
})
return callback(err)
}
for (const change of (ranges && ranges.changes) || []) {
if (change.op.i) {
change.op.i = encodeForWebsockets(change.op.i)
escapedLines.push(line)
}
if (options.encodeRanges) {
try {
for (const comment of (ranges && ranges.comments) || []) {
if (comment.op.c) {
comment.op.c = encodeForWebsockets(comment.op.c)
}
}
if (change.op.d) {
change.op.d = encodeForWebsockets(change.op.d)
for (const change of (ranges && ranges.changes) || []) {
if (change.op.i) {
change.op.i = encodeForWebsockets(change.op.i)
}
if (change.op.d) {
change.op.d = encodeForWebsockets(change.op.d)
}
}
} catch (err) {
OError.tag(err, 'error encoding range uri component', {
ranges,
})
return callback(err)
}
} catch (err) {
OError.tag(err, 'error encoding range uri component', {
ranges,
})
return callback(err)
}
}
@@ -351,7 +369,7 @@ module.exports = WebsocketController = {
},
'client joined doc'
)
callback(null, escapedLines, version, ops, ranges)
callback(null, escapedLines, version, ops, ranges, type)
}
)
})
@@ -19,6 +19,80 @@ const FixturesManager = require('./helpers/FixturesManager')
const async = require('async')
describe('clientTracking', function () {
describe('when another logged in user joins a project', function () {
before(function (done) {
return async.series(
[
cb => {
return FixturesManager.setUpProject(
{
privilegeLevel: 'owner',
project: { name: 'Test Project' },
},
(error, { user_id: userId, project_id: projectId }) => {
if (error) return done(error)
this.user_id = userId
this.project_id = projectId
return cb()
}
)
},
cb => {
return FixturesManager.setUpDoc(
this.project_id,
{ lines: this.lines, version: this.version, ops: this.ops },
(e, { doc_id: docId }) => {
this.doc_id = docId
return cb(e)
}
)
},
cb => {
this.clientA = RealTimeClient.connect(this.project_id, cb)
},
cb => {
RealTimeClient.countConnectedClients(
this.project_id,
(err, body) => {
if (err) return cb(err)
expect(body).to.deep.equal({ nConnectedClients: 1 })
cb()
}
)
},
cb => {
this.clientB = RealTimeClient.connect(this.project_id, cb)
},
],
done
)
})
it('should record the initial state in getConnectedUsers', function (done) {
this.clientA.emit('clientTracking.getConnectedUsers', (error, users) => {
if (error) return done(error)
for (const user of Array.from(users)) {
if (user.client_id === this.clientB.publicId) {
expect(user.cursorData).to.not.exist
return done()
}
}
throw new Error('other user was never found')
})
})
it('should list both clients via HTTP', function (done) {
RealTimeClient.countConnectedClients(this.project_id, (err, body) => {
if (err) return done(err)
expect(body).to.deep.equal({ nConnectedClients: 2 })
done()
})
})
})
describe('when a client updates its cursor location', function () {
before(function (done) {
return async.series(
@@ -89,6 +89,7 @@ describe('joinDoc', function () {
this.version,
this.ops,
this.ranges,
'sharejs-text-ot',
])
})
@@ -168,6 +169,7 @@ describe('joinDoc', function () {
this.version,
this.ops,
this.ranges,
'sharejs-text-ot',
])
})
@@ -247,6 +249,7 @@ describe('joinDoc', function () {
this.version,
this.ops,
this.ranges,
'sharejs-text-ot',
])
})
@@ -408,6 +411,7 @@ describe('joinDoc', function () {
this.version,
this.ops,
this.ranges,
'sharejs-text-ot',
])
})
@@ -489,6 +493,7 @@ describe('joinDoc', function () {
this.version,
this.ops,
this.ranges,
'sharejs-text-ot',
])
})
@@ -504,7 +509,7 @@ describe('joinDoc', function () {
})
})
return describe('with fromVersion and options', function () {
describe('with fromVersion and options', function () {
before(function (done) {
this.fromVersion = 36
this.options = { encodeRanges: true }
@@ -572,6 +577,7 @@ describe('joinDoc', function () {
this.version,
this.ops,
this.ranges,
'sharejs-text-ot',
])
})
@@ -586,4 +592,139 @@ describe('joinDoc', function () {
)
})
})
describe('with type=history-ot', function () {
before(function (done) {
async.series(
[
cb => {
FixturesManager.setUpProject(
{ privilegeLevel: 'owner' },
(e, { project_id: projectId, user_id: userId }) => {
this.project_id = projectId
this.user_id = userId
cb(e)
}
)
},
cb => {
FixturesManager.setUpDoc(
this.project_id,
{
lines: this.lines,
version: this.version,
ops: this.ops,
ranges: this.ranges,
type: 'history-ot',
},
(e, { doc_id: docId }) => {
this.doc_id = docId
cb(e)
}
)
},
],
done
)
})
describe('when support is indicated', function () {
before(function (done) {
MockDocUpdaterServer.getDocument.resetHistory()
async.series(
[
cb => {
this.client = RealTimeClient.connect(this.project_id, cb)
},
cb =>
this.client.emit(
'joinDoc',
this.doc_id,
{ supportsHistoryV1OT: true },
(error, ...rest) => {
;[...this.returnedArgs] = Array.from(rest)
cb(error)
}
),
],
done
)
})
it('should get the doc from the doc updater', function () {
MockDocUpdaterServer.getDocument
.calledWith(this.project_id, this.doc_id, -1)
.should.equal(true)
})
it('should return the doc lines, version, ranges and ops', function () {
this.returnedArgs.should.deep.equal([
this.lines,
this.version,
this.ops,
this.ranges,
'history-ot',
])
})
it('should have joined the doc room', function (done) {
RealTimeClient.getConnectedClient(
this.client.socket.sessionid,
(error, client) => {
if (error) return done(error)
expect(client.rooms).to.deep.equal([this.project_id, this.doc_id])
done()
}
)
})
})
describe('when support is not indicated', function () {
before(function (done) {
MockDocUpdaterServer.getDocument.resetHistory()
async.series(
[
cb => {
this.client = RealTimeClient.connect(this.project_id, cb)
},
cb =>
this.client.emit('joinDoc', this.doc_id, (error, ...rest) => {
this.error = error
;[...this.returnedArgs] = Array.from(rest)
cb()
}),
],
done
)
})
it('should get the doc from the doc updater', function () {
MockDocUpdaterServer.getDocument
.calledWith(this.project_id, this.doc_id, -1)
.should.equal(true)
})
it('should return an error', function () {
expect(this.error).to.deep.equal({
message: 'client does not support history-ot',
})
})
it('should not return the doc lines, version, ranges and ops', function () {
this.returnedArgs.should.deep.equal([])
})
it('should leave the doc room again', function (done) {
RealTimeClient.getConnectedClient(
this.client.socket.sessionid,
(error, client) => {
if (error) return done(error)
expect(client.rooms).to.deep.equal([this.project_id])
done()
}
)
})
})
})
})
@@ -108,13 +108,17 @@ module.exports = FixturesManager = {
if (!options.ops) {
options.ops = ['mock', 'ops']
}
const { doc_id: docId, lines, version, ops, ranges } = options
if (!options.type) {
options.type = 'sharejs-text-ot'
}
const { doc_id: docId, lines, version, ops, ranges, type } = options
MockDocUpdaterServer.createMockDoc(projectId, docId, {
lines,
version,
ops,
ranges,
type,
})
return MockDocUpdaterServer.run(error => {
if (error != null) {
@@ -123,6 +123,16 @@ module.exports = Client = {
)
},
countConnectedClients(projectId, callback) {
request.get(
{
url: `http://127.0.0.1:3026/project/${projectId}/count-connected-clients`,
json: true,
},
(error, response, data) => callback(error, data)
)
},
getConnectedClient(clientId, callback) {
if (callback == null) {
callback = function () {}
@@ -79,7 +79,7 @@ describe('DocumentUpdaterManager', function () {
})
it('should get the document from the document updater', function () {
const url = `${this.settings.apis.documentupdater.url}/project/${this.project_id}/doc/${this.doc_id}?fromVersion=${this.fromVersion}`
const url = `${this.settings.apis.documentupdater.url}/project/${this.project_id}/doc/${this.doc_id}?fromVersion=${this.fromVersion}&historyV1OTSupport=true`
return this.request.get.calledWith(url).should.equal(true)
})
@@ -52,6 +52,11 @@ async function getDocument(req, res) {
'overleaf.history.rangesSupportEnabled',
false
)
const otMigrationStage = _.get(
project,
'overleaf.history.otMigrationStage',
0
)
// all projects are now migrated to Full Project History, keeping the field
// for API compatibility
@@ -65,6 +70,7 @@ async function getDocument(req, res) {
projectHistoryId,
projectHistoryType,
historyRangesSupport,
otMigrationStage,
resolvedCommentIds,
})
}
@@ -300,6 +300,18 @@ class NonDeletableEntityError extends OError {
}
}
class FoundConnectedClientsError extends OError {
constructor(nConnectedClients) {
super(`found ${nConnectedClients} remaining connected clients`)
}
}
class ConcurrentLoadingOfDocsDetectedError extends OError {
constructor() {
super('concurrent loading of docs detected')
}
}
module.exports = {
OError,
BackwardCompatibleError,
@@ -356,4 +368,6 @@ module.exports = {
InvalidEmailError,
InvalidInstitutionalEmailError,
NonDeletableEntityError,
FoundConnectedClientsError,
ConcurrentLoadingOfDocsDetectedError,
}
@@ -0,0 +1,56 @@
import ProjectGetter from '../Project/ProjectGetter.js'
import DocumentUpdaterHandler from '../DocumentUpdater/DocumentUpdaterHandler.js'
import HistoryManager from '../History/HistoryManager.js'
import * as RealTimeHandler from '../References/RealTime/RealTimeHandler.mjs'
import ProjectOptionsHandler from '../Project/ProjectOptionsHandler.js'
import {
NotFoundError,
FoundConnectedClientsError,
ConcurrentLoadingOfDocsDetectedError,
} from '../Errors/Errors.js'
async function ensureNoConnectedClients(projectId) {
const n = await RealTimeHandler.countConnectedClients(projectId)
if (n > 0) throw new FoundConnectedClientsError(n)
}
/**
* @param {string} projectId
* @param {number} nextStage
* @return {Promise<{otMigrationStage: number}>}
*/
export async function advanceOTMigrationStage(projectId, nextStage) {
const project = await ProjectGetter.promises.getProject(projectId, {
overleaf: true,
})
if (!project) throw new NotFoundError()
const { otMigrationStage } = project?.overleaf?.history || {}
if (otMigrationStage >= nextStage) return { otMigrationStage }
// NOTE: For the single connected client case, we could emit a pub/sub event here asking any (inactive) client without pending edits to disconnect briefly.
// e.g. EditorRealTimeController.emitToRoom(projectId, 'attempt-history-ot-migration')
// Ensure we can perform the hard migration
await ensureNoConnectedClients(projectId)
// Flush ahead of migrating to keep the time under lock down.
await DocumentUpdaterHandler.promises.flushProjectToMongoAndDelete(projectId)
// Avoid mixing update types
await HistoryManager.promises.flushProject(projectId)
// Obtain lock
if (!(await DocumentUpdaterHandler.promises.blockProject(projectId))) {
throw new ConcurrentLoadingOfDocsDetectedError()
}
try {
// Perform the mongo update and tell caller about the latest stage.
return await ProjectOptionsHandler.promises.setOTMigrationStage(
projectId,
nextStage
)
} finally {
// Unlock again (The lock will expire after 30s otherwise)
await DocumentUpdaterHandler.promises.unblockProject(projectId)
}
}
@@ -2,6 +2,8 @@ const { Project } = require('../../models/Project')
const settings = require('@overleaf/settings')
const { callbackify } = require('util')
const { db, ObjectId } = require('../../infrastructure/mongodb')
const Errors = require('../Errors/Errors')
const { ReturnDocument } = require('mongodb-legacy')
const safeCompilers = ['xelatex', 'pdflatex', 'latex', 'lualatex']
const ProjectOptionsHandler = {
@@ -73,6 +75,21 @@ const ProjectOptionsHandler = {
// because rangesSupportEnabled is not part of the schema?
return db.projects.updateOne(conditions, update)
},
async setOTMigrationStage(projectId, nextStage) {
const project = await db.projects.findOneAndUpdate(
{ _id: new ObjectId(projectId) },
// Use $max to ensure that we never downgrade the migration stage.
{ $max: { 'overleaf.history.otMigrationStage': nextStage } },
{
returnDocument: ReturnDocument.AFTER,
projection: { 'overleaf.history.otMigrationStage': 1 },
}
)
if (!project) throw new Errors.NotFoundError('project does not exist')
const { otMigrationStage } = project.overleaf.history
return { otMigrationStage }
},
}
module.exports = {
@@ -0,0 +1,9 @@
import Settings from '@overleaf/settings'
import { fetchJson } from '@overleaf/fetch-utils'
export async function countConnectedClients(projectId) {
const url = new URL(Settings.apis.realTime.url)
url.pathname = `/project/${projectId}/count-connected-clients`
const { nConnectedClients } = await fetchJson(url)
return nConnectedClients
}
+1
View File
@@ -99,6 +99,7 @@ const ProjectSchema = new Schema(
allowDowngrade: { type: Boolean },
zipFileArchivedInProject: { type: Boolean },
rangesSupportEnabled: { type: Boolean },
otMigrationStage: { type: Number },
},
},
collabratecUsers: [
@@ -2,7 +2,7 @@
// Migrated from services/web/frontend/js/ide/editor/Document.js
import RangesTracker from '@overleaf/ranges-tracker'
import { ShareJsDoc } from './share-js-doc'
import { OTType, ShareJsDoc } from './share-js-doc'
import { debugConsole } from '@/utils/debugging'
import { Socket } from '@/features/ide-react/connection/types/socket'
import { IdeEventEmitter } from '@/features/ide-react/create-ide-event-emitter'
@@ -28,6 +28,7 @@ import {
} from '@/features/ide-react/editor/types/document'
import { ThreadId } from '../../../../../types/review-panel/review-panel'
import getMeta from '@/utils/meta'
import OError from '@overleaf/o-error'
const MAX_PENDING_OP_SIZE = 64
@@ -447,16 +448,36 @@ export class DocumentContainer extends EventEmitter {
'joinDoc',
this.doc_id,
this.doc.getVersion(),
{ encodeRanges: true, age: this.doc.getTimeSinceLastServerActivity() },
(error, docLines, version, updates, ranges) => {
{
encodeRanges: true,
age: this.doc.getTimeSinceLastServerActivity(),
supportsHistoryV1OT: true,
},
(
error,
docLines,
version,
updates,
ranges,
type = 'sharejs-text-ot'
) => {
if (error) {
callback?.(error)
return
}
this.joined = true
this.doc?.catchUp(updates)
this.decodeRanges(ranges)
this.catchUpRanges(ranges?.changes, ranges?.comments)
if (this.doc?.getType() !== type) {
// TODO(24596): page reload after checking for pending ops?
throw new OError('ot type mismatch', {
got: type,
want: this.doc?.getType(),
})
}
if (type === 'sharejs-text-ot') {
this.decodeRanges(ranges)
this.catchUpRanges(ranges?.changes, ranges?.comments)
}
callback?.()
}
)
@@ -464,8 +485,18 @@ export class DocumentContainer extends EventEmitter {
this.socket.emit(
'joinDoc',
this.doc_id,
{ encodeRanges: true },
(error, docLines, version, updates, ranges) => {
{
encodeRanges: true,
supportsHistoryV1OT: true,
},
(
error,
docLines,
version,
updates,
ranges,
type: OTType = 'sharejs-text-ot'
) => {
if (error) {
callback?.(error)
return
@@ -477,9 +508,12 @@ export class DocumentContainer extends EventEmitter {
version,
this.socket,
this.globalEditorWatchdogManager,
this.ideEventEmitter
this.ideEventEmitter,
type
)
this.decodeRanges(ranges)
if (type === 'sharejs-text-ot') {
this.decodeRanges(ranges)
}
this.ranges = new RangesTracker(ranges?.changes, ranges?.comments)
this.bindToShareJsDocEvents()
callback?.()
@@ -580,7 +614,9 @@ export class DocumentContainer extends EventEmitter {
this.doc.on(
'change',
(ops: AnyOperation[], oldSnapshot: any, msg: Message) => {
this.applyOpsToRanges(ops, msg)
if (this.getType() === 'sharejs-text-ot') {
this.applyOpsToRanges(ops, msg)
}
if (docChangedTimeout) {
window.clearTimeout(docChangedTimeout)
}
@@ -2,7 +2,7 @@
// Migrated from services/web/frontend/js/ide/editor/ShareJsDoc.js
import EventEmitter from '../../../utils/EventEmitter'
import { Doc } from '@/vendor/libs/sharejs'
import sharejs, { Doc } from '@/vendor/libs/sharejs'
import { Socket } from '@/features/ide-react/connection/types/socket'
import { debugConsole } from '@/utils/debugging'
import { decodeUtf8 } from '@/utils/decode-utf8'
@@ -12,11 +12,18 @@ import {
Message,
ShareJsConnectionState,
ShareJsOperation,
ShareJsTextType,
TrackChangesIdSeeds,
} from '@/features/ide-react/editor/types/document'
import { EditorFacade } from '@/features/source-editor/extensions/realtime'
import { recordDocumentFirstChangeEvent } from '@/features/event-tracking/document-first-change-event'
import getMeta from '@/utils/meta'
import { HistoryOTType } from './share-js-history-ot-type'
import { StringFileData } from 'overleaf-editor-core/index'
import {
RawEditOperation,
StringFileRawData,
} from 'overleaf-editor-core/lib/types'
// All times below are in milliseconds
const SINGLE_USER_FLUSH_DELAY = 2000
@@ -27,6 +34,7 @@ const FATAL_OP_TIMEOUT = 45000
const RECENT_ACK_LIMIT = 2 * SINGLE_USER_FLUSH_DELAY
type Update = Record<string, any>
export type OTType = 'sharejs-text-ot' | 'history-ot'
type Connection = {
send: (update: Update) => void
@@ -35,7 +43,6 @@ type Connection = {
}
export class ShareJsDoc extends EventEmitter {
type: string
track_changes = false
track_changes_id_seeds: TrackChangesIdSeeds | null = null
connection: Connection
@@ -57,12 +64,24 @@ export class ShareJsDoc extends EventEmitter {
version: number,
readonly socket: Socket,
private readonly globalEditorWatchdogManager: EditorWatchdogManager,
private readonly eventEmitter: IdeEventEmitter
private readonly eventEmitter: IdeEventEmitter,
readonly type: OTType = 'sharejs-text-ot'
) {
super()
this.type = 'text'
let sharejsType: ShareJsTextType = sharejs.types.text
// Decode any binary bits of data
const snapshot = docLines.map(line => decodeUtf8(line)).join('\n')
let snapshot: string | StringFileData
if (this.type === 'history-ot') {
snapshot = StringFileData.fromRaw(
docLines as unknown as StringFileRawData
)
sharejsType = new HistoryOTType(snapshot) as ShareJsTextType<
StringFileData,
RawEditOperation[]
>
} else {
snapshot = docLines.map(line => decodeUtf8(line)).join('\n')
}
this.connection = {
send: (update: Update) => {
@@ -89,7 +108,7 @@ export class ShareJsDoc extends EventEmitter {
}
this._doc = new Doc(this.connection, this.doc_id, {
type: this.type,
type: sharejsType,
})
this._doc.setFlushDelay(SINGLE_USER_FLUSH_DELAY)
this._doc.on('change', (...args: any[]) => {
@@ -0,0 +1,131 @@
import EventEmitter from '@/utils/EventEmitter'
import {
EditOperationBuilder,
InsertOp,
RemoveOp,
RetainOp,
StringFileData,
TextOperation,
} from 'overleaf-editor-core'
import { RawEditOperation } from 'overleaf-editor-core/lib/types'
function loadTextOperation(raw: RawEditOperation): TextOperation {
const operation = EditOperationBuilder.fromJSON(raw)
if (!(operation instanceof TextOperation)) {
throw new Error(`operation not supported: ${operation.constructor.name}`)
}
return operation
}
export class HistoryOTType extends EventEmitter {
// stub interface, these are actually on the Doc
api: HistoryOTType
snapshot: StringFileData
constructor(snapshot: StringFileData) {
super()
this.api = this
this.snapshot = snapshot
}
transformX(raw1: RawEditOperation[], raw2: RawEditOperation[]) {
const [a, b] = TextOperation.transform(
loadTextOperation(raw1[0]),
loadTextOperation(raw2[0])
)
return [[a.toJSON()], [b.toJSON()]]
}
apply(snapshot: StringFileData, rawEditOperation: RawEditOperation[]) {
const operation = loadTextOperation(rawEditOperation[0])
const afterFile = StringFileData.fromRaw(snapshot.toRaw())
afterFile.edit(operation)
this.snapshot = afterFile
return afterFile
}
compose(op1: RawEditOperation[], op2: RawEditOperation[]) {
return [
loadTextOperation(op1[0]).compose(loadTextOperation(op2[0])).toJSON(),
]
}
// Do not provide normalize, used by submitOp to fixup bad input.
// normalize(op: TextOperation) {}
// Do not provide invert, only needed for reverting a rejected update.
// We are displaying an out-of-sync modal when an op is rejected.
// invert(op: TextOperation) {}
// API
insert(pos: number, text: string, fromUndo: boolean) {
const old = this.getText()
const op = new TextOperation()
op.retain(pos)
op.insert(text)
op.retain(old.length - pos)
this.submitOp([op.toJSON()])
}
del(pos: number, length: number, fromUndo: boolean) {
const old = this.getText()
const op = new TextOperation()
op.retain(pos)
op.remove(length)
op.retain(old.length - pos - length)
this.submitOp([op.toJSON()])
}
getText() {
return this.snapshot.getContent({ filterTrackedDeletes: true })
}
getLength() {
return this.getText().length
}
_register() {
this.on(
'remoteop',
(rawEditOperation: RawEditOperation[], oldSnapshot: StringFileData) => {
const operation = loadTextOperation(rawEditOperation[0])
const str = oldSnapshot.getContent()
if (str.length !== operation.baseLength)
throw new TextOperation.ApplyError(
"The operation's base length must be equal to the string's length.",
operation,
str
)
let outputCursor = 0
let inputCursor = 0
for (const op of operation.ops) {
if (op instanceof RetainOp) {
inputCursor += op.length
outputCursor += op.length
} else if (op instanceof InsertOp) {
this.emit('insert', outputCursor, op.insertion, op.insertion.length)
outputCursor += op.insertion.length
} else if (op instanceof RemoveOp) {
this.emit(
'delete',
outputCursor,
str.slice(inputCursor, inputCursor + op.length)
)
inputCursor += op.length
}
}
if (inputCursor !== str.length)
throw new TextOperation.ApplyError(
"The operation didn't operate on the whole string.",
operation,
str
)
}
)
}
// stub-interface, provided by sharejs.Doc
submitOp(op: RawEditOperation[]) {}
}
@@ -1,3 +1,4 @@
import { StringFileData } from 'overleaf-editor-core'
import { AnyOperation } from '../../../../../../types/change'
export type Version = number
@@ -8,6 +9,23 @@ export type ShareJsOperation = AnyOperation[]
export type TrackChangesIdSeeds = { inflight: string; pending: string }
export interface ShareJsTextType<Snapshot = any, Operation = any> {
transformX(op1: Operation, op2: Operation): Operation[]
apply(snapshot: Snapshot, op: Operation): Snapshot
compose(op1: Operation, op2: Operation): Operation
api: {
insert(pos: number, text: string, fromUndo: boolean): void
del(pos: number, length: number, fromUndo: boolean): void
getText(): string
getLength(): number
_register(): void
}
// stub-interface, provided by sharejs.Doc
submitOp(op: Operation): void
}
// TODO: check the properties of this type
export type Message = {
v: Version
@@ -16,5 +34,6 @@ export type Message = {
type?: string
}
doc?: string
snapshot?: string
snapshot?: string | StringFileData
type?: ShareJsTextType
}
+1
View File
@@ -88,6 +88,7 @@ function getSandboxedModuleRequires() {
'sshpk',
'xml2js',
'mongodb',
'mongodb-legacy',
]
for (const modulePath of internalModules) {
requires[Path.resolve(__dirname, modulePath)] = require(modulePath)
@@ -126,6 +126,7 @@ describe('DocumentController', function () {
projectHistoryType: 'project-history',
resolvedCommentIds: ['comment2'],
historyRangesSupport: false,
otMigrationStage: 0,
})
})
})