a9a9f6ee6b
* migrate recover_zip_from_backup from archiver to zip-stream Replace the `archiver` package with `zip-stream` (the lower-level library that `archiver` wraps) in the `recover_zip_from_backup.mjs` script and `backupArchiver.mjs` library. The `archiver` package has known issues with hanging when creating large zip files and is no longer actively maintained. Changes: - Add `zip-stream@^7.0.2` as a direct dependency - Update `backupArchiver.mjs` to use promisified `ZipStream.entry()` instead of `Archiver.append()` - Rewrite `recover_zip_from_backup.mjs` to use `ZipStream` with `stream/promises.pipeline` for cleaner async flow - Keep `archiver` dependency for `project_archive.js` (separate code path) Agent-Logs-Url: https://github.com/overleaf/internal/sessions/0df27a8b-97f1-43cc-ac26-f5247a84313f Co-authored-by: briangough <7457354+briangough@users.noreply.github.com> * extract finalize timeout to named constant Agent-Logs-Url: https://github.com/overleaf/internal/sessions/0df27a8b-97f1-43cc-ac26-f5247a84313f Co-authored-by: briangough <7457354+briangough@users.noreply.github.com> * convert recover_zip.js to zip-stream, remove finalize timeout, add verbose logging Agent-Logs-Url: https://github.com/overleaf/internal/sessions/9380d08a-d813-4e9f-a2ac-4891122c163b Co-authored-by: briangough <7457354+briangough@users.noreply.github.com> * add acceptance tests for recover_zip_from_backup in raw and latest modes Agent-Logs-Url: https://github.com/overleaf/internal/sessions/9380d08a-d813-4e9f-a2ac-4891122c163b Co-authored-by: briangough <7457354+briangough@users.noreply.github.com> * fix comment formatting in recover_zip_from_backup.mjs Agent-Logs-Url: https://github.com/overleaf/internal/sessions/9380d08a-d813-4e9f-a2ac-4891122c163b Co-authored-by: briangough <7457354+briangough@users.noreply.github.com> * restore EventEmitter.defaultMaxListeners in recover_zip.js, add acceptance test Agent-Logs-Url: 
https://github.com/overleaf/internal/sessions/e7443126-22d5-4d0e-a176-a7a5dba49ffd Co-authored-by: briangough <7457354+briangough@users.noreply.github.com> * fix formatting * refactor: simplify stream handling by using named imports for pipeline * fix blob hash verification in backup acceptance tests * fix recover_zip script and tests * fix: exit with non-zero status on error in recover_zip.js Agent-Logs-Url: https://github.com/overleaf/internal/sessions/ef3f109b-488f-47c9-84a5-b5269387166a Co-authored-by: briangough <7457354+briangough@users.noreply.github.com> * migrate from npm to yarn --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: briangough <7457354+briangough@users.noreply.github.com> Co-authored-by: Brian Gough <briangough@users.noreply.github.com> GitOrigin-RevId: 6255f9610f3c846790e2ed8b1979ac08b7effece
512 lines
13 KiB
JavaScript
512 lines
13 KiB
JavaScript
// @ts-check
|
|
import path from 'node:path'
|
|
import projectKey from '@overleaf/object-persistor/src/ProjectKey.js'
|
|
import {
|
|
chunksBucket,
|
|
backupPersistor,
|
|
projectBlobsBucket,
|
|
globalBlobsBucket as backupGlobalBlobsBucket,
|
|
} from './backupPersistor.mjs'
|
|
import core, { Chunk, History } from 'overleaf-editor-core'
|
|
import {
|
|
GLOBAL_BLOBS,
|
|
makeProjectKey,
|
|
getStringLengthOfFile,
|
|
makeGlobalKey,
|
|
} from './blob_store/index.js'
|
|
import streams from './streams.js'
|
|
import objectPersistor from '@overleaf/object-persistor'
|
|
import OError from '@overleaf/o-error'
|
|
import chunkStore from './chunk_store/index.js'
|
|
import logger from '@overleaf/logger'
|
|
import fs from 'node:fs'
|
|
import { pipeline } from 'node:stream/promises'
|
|
import withTmpDir from '../../api/controllers/with_tmp_dir.js'
|
|
import { loadChunk } from './backupVerifier.mjs'
|
|
import globalBlobPersistor from './persistor.js'
|
|
import config from 'config'
|
|
import { NoKEKMatchedError } from '@overleaf/object-persistor/src/Errors.js'
|
|
|
|
// Name of the (non-backup) global blob bucket. BackupBlobStore.fetchGlobalBlob
// reads from it through globalBlobPersistor when useBackupGlobalBlobs is false.
const globalBlobsBucket = config.get('blobStore.globalBucket')
|
|
|
|
/**
 * Blob store backed by a temporary directory, used when rebuilding a project
 * snapshot from the backup.
 *
 * `fetchBlobs` downloads blobs into `tmp` (one file per hash) and records
 * their metadata; the remaining methods serve those downloaded copies.
 */
class BackupBlobStore {
  /**
   * @param {string} historyId
   * @param {string} tmp - directory that downloaded blobs are written into
   * @param {CachedPerProjectEncryptedS3Persistor} persistor - per-project
   *   backup persistor used to download project blobs
   * @param {boolean} useBackupGlobalBlobs - when true, global blobs are read
   *   from the backup global-blobs bucket instead of the global blob bucket
   */
  constructor(historyId, tmp, persistor, useBackupGlobalBlobs) {
    this.historyId = historyId
    this.tmp = tmp
    this.blobs = new Map()
    this.persistor = persistor
    this.useBackupGlobalBlobs = useBackupGlobalBlobs
  }

  /**
   * Read a blob previously downloaded by `fetchBlobs` and parse it as JSON.
   *
   * @template T
   * @param {string} hash
   * @return {Promise<T>}
   */
  async getObject(hash) {
    try {
      const stream = await this.getStream(hash)
      const buffer = await streams.readStreamToBuffer(stream)
      return JSON.parse(buffer.toString())
    } catch (err) {
      logger.warn({ err, hash }, 'Failed to fetch chunk blob')
      throw err
    }
  }

  /**
   * Download every blob in `hashes` into the temporary directory and record
   * its metadata in `this.blobs`.
   *
   * Hashes that are already recorded are skipped. A blob that fails to
   * download is logged and skipped so the rest of the project can still be
   * recovered.
   *
   * @param {Set<string>} hashes
   * @return {Promise<void>}
   */
  async fetchBlobs(hashes) {
    for (const hash of hashes) {
      // Skip just this hash; returning here would silently drop every blob
      // that comes after the first already-cached one.
      if (this.blobs.has(hash)) continue

      // Write to exactly the location getStream() later reads from.
      const pathname = this.getBlobPathname(hash)

      /** @type {core.Blob} */
      let blob

      if (GLOBAL_BLOBS.has(hash)) {
        try {
          const blobData = await this.fetchGlobalBlob(hash)
          await pipeline(blobData.stream, fs.createWriteStream(pathname))
          blob = blobData.blob
        } catch (err) {
          logger.warn({ hash, err }, 'Failed to fetch global blob')
          continue
        }
      } else {
        try {
          const blobStream = await fetchBlob(
            this.historyId,
            hash,
            this.persistor
          )
          await pipeline(blobStream, fs.createWriteStream(pathname))
          blob = await this.makeBlob(hash, pathname)
        } catch (err) {
          logger.warn({ err, hash }, 'Failed to fetch chunk blob')
          continue
        }
      }

      this.blobs.set(hash, blob)
    }
  }

  /**
   * Open a download stream for a global blob.
   *
   * Reads from the backup global-blobs bucket via the project persistor when
   * `useBackupGlobalBlobs` is set, otherwise from the global blob bucket via
   * `globalBlobPersistor`.
   *
   * @param {string} hash
   * @return {Promise<{ blob: core.Blob, stream: NodeJS.ReadableStream }>}
   * @throws {Error} if `hash` is not listed in GLOBAL_BLOBS
   */
  async fetchGlobalBlob(hash) {
    const globalBlob = GLOBAL_BLOBS.get(hash)
    if (!globalBlob) {
      throw new Error('blob does not exist or is not a global blob')
    }

    const key = makeGlobalKey(hash)

    let stream
    if (this.useBackupGlobalBlobs) {
      stream = await this.persistor.getObjectStream(
        backupGlobalBlobsBucket,
        key
      )
    } else {
      stream = await globalBlobPersistor.getObjectStream(globalBlobsBucket, key)
    }

    return { blob: globalBlob.blob, stream }
  }

  /**
   * Build blob metadata for a file that has been downloaded to `pathname`.
   *
   * @param {string} hash
   * @param {string} pathname
   * @return {Promise<core.Blob>}
   */
  async makeBlob(hash, pathname) {
    const stat = await fs.promises.stat(pathname)
    const byteLength = stat.size
    const stringLength = await getStringLengthOfFile(byteLength, pathname)
    // Only a nullish result means "no string length". A length of 0 (an empty
    // text file) is still a valid string length and must be kept.
    if (stringLength != null) {
      return new core.Blob(hash, byteLength, stringLength)
    }
    return new core.Blob(hash, byteLength)
  }

  /**
   * Read a downloaded blob fully and return it as a string.
   *
   * @param {string} hash
   * @return {Promise<string>}
   */
  async getString(hash) {
    const stream = await this.getStream(hash)
    const buffer = await streams.readStreamToBuffer(stream)
    return buffer.toString()
  }

  /**
   * Open a read stream on the downloaded copy of a blob. The stream errors if
   * the blob was never downloaded.
   *
   * @param {string} hash
   * @return {Promise<fs.ReadStream>}
   */
  async getStream(hash) {
    return fs.createReadStream(this.getBlobPathname(hash))
  }

  /**
   * Metadata recorded by `fetchBlobs`; undefined if the blob was not fetched.
   *
   * @param {string} hash
   * @return {Promise<core.Blob>}
   */
  async getBlob(hash) {
    return this.blobs.get(hash)
  }

  /**
   * Location of the downloaded copy of a blob inside the temporary directory.
   *
   * @param {string} hash
   * @return {string}
   */
  getBlobPathname(hash) {
    return path.join(this.tmp, hash)
  }
}
|
|
|
|
/**
|
|
* @typedef {(import('@overleaf/object-persistor/src/PerProjectEncryptedS3Persistor.js').CachedPerProjectEncryptedS3Persistor)} CachedPerProjectEncryptedS3Persistor
|
|
*/
|
|
|
|
/**
|
|
* @typedef {(import('zip-stream').default)} ZipStream
|
|
*/
|
|
|
|
/**
|
|
* @typedef {(import('overleaf-editor-core').FileMap)} FileMap
|
|
*/
|
|
|
|
/**
 * Add a single entry to a ZipStream archive, adapting ZipStream's
 * callback-style `entry()` to a promise.
 *
 * Every caller in this module awaits the returned promise before adding the
 * next entry.
 *
 * @param {ZipStream} archive
 * @param {Buffer|NodeJS.ReadableStream|string} source - entry contents
 * @param {{ name: string }} data - entry metadata; `name` is the path inside
 *   the zip
 * @return {Promise<void>} resolves when ZipStream invokes the entry callback
 *   without an error, rejects with the error it reports otherwise
 */
function addEntry(archive, source, data) {
  return new Promise((resolve, reject) => {
    const settle = error => (error ? reject(error) : resolve())
    archive.entry(source, data, settle)
  })
}
|
|
|
|
/**
 * Open a read-only, project-scoped view of the backup persistor.
 *
 * @param {string} historyId
 * @return {Promise<CachedPerProjectEncryptedS3Persistor>}
 * @throws {BackupPersistorError} wrapping whatever the lookup threw; the
 *   original error is passed as the cause when it is an Error instance
 */
async function getProjectPersistor(historyId) {
  try {
    const projectPrefix = makeProjectKey(historyId, '')
    return await backupPersistor.forProjectRO(projectBlobsBucket, projectPrefix)
  } catch (error) {
    // A KEK mismatch is logged on its own but is still wrapped below.
    if (error instanceof NoKEKMatchedError) logger.info({}, 'no kek matched')
    const cause = error instanceof Error ? error : undefined
    throw new BackupPersistorError(
      'Failed to get project persistor',
      { historyId },
      cause
    )
  }
}
|
|
|
|
/**
 * Download a gzipped chunk from the backup chunks bucket and parse it.
 *
 * @param {CachedPerProjectEncryptedS3Persistor} persistor
 * @param {string} key - full object key of the chunk
 * @return {Promise<{chunkData: any, buffer: Buffer}>} the parsed JSON plus the
 *   decompressed bytes it was parsed from
 * @throws {Chunk.NotPersistedError} when the object does not exist
 */
async function loadChunkByKey(persistor, key) {
  try {
    const gzipped = await persistor.getObjectStream(chunksBucket, key)
    const buffer = await streams.gunzipStreamToBuffer(gzipped)
    const chunkData = JSON.parse(buffer.toString('utf-8'))
    return { chunkData, buffer }
  } catch (err) {
    if (err instanceof objectPersistor.Errors.NotFoundError) {
      throw new Chunk.NotPersistedError('chunk not found')
    }
    // Tag real errors with the key; rethrow anything else untouched.
    throw err instanceof Error
      ? OError.tag(err, 'Failed to load chunk', { key })
      : err
  }
}
|
|
|
|
/**
 * Open a download stream for one project blob in the backup.
 *
 * @param {string} historyId
 * @param {string} hash
 * @param {CachedPerProjectEncryptedS3Persistor} persistor
 * @return {Promise<NodeJS.ReadableStream>} the blob contents, requested with
 *   `autoGunzip: true`
 */
async function fetchBlob(historyId, hash, persistor) {
  // Named `blobKey` rather than `path` to avoid shadowing the node:path import.
  const blobKey = makeProjectKey(historyId, hash)
  const options = { autoGunzip: true }
  return await persistor.getObjectStream(projectBlobsBucket, blobKey, options)
}
|
|
|
|
/**
|
|
* @typedef {object} AddChunkOptions
|
|
* @property {string} [prefix]
|
|
* @property {boolean} [useBackupGlobalBlobs]
|
|
* @property {boolean} [verbose]
|
|
*/
|
|
|
|
/**
 * Rebuild the end state of a history chunk and write each of its files into
 * the archive as `${prefix}${filePath}`.
 *
 * The blobs the chunk references are first downloaded into a temporary
 * directory. The chunk's changes are then applied to its snapshot, and every
 * resulting file is added. Files that cannot be found, loaded, or resolved
 * to content are logged and skipped.
 *
 * @param {History} history
 * @param {ZipStream} archive
 * @param {CachedPerProjectEncryptedS3Persistor} projectCache
 * @param {string} historyId
 * @param {AddChunkOptions} [options]
 * @returns {Promise<void>}
 */
async function addChunkToArchive(
  history,
  archive,
  projectCache,
  historyId,
  { prefix = '', useBackupGlobalBlobs = false, verbose = false } = {}
) {
  const referencedHashes = new Set()
  history.findBlobHashes(referencedHashes)

  await withTmpDir('recovery-blob-', async tmpDir => {
    const blobStore = new BackupBlobStore(
      historyId,
      tmpDir,
      projectCache,
      useBackupGlobalBlobs
    )
    await blobStore.fetchBlobs(referencedHashes)
    await history.loadFiles('lazy', blobStore)

    // Replay the chunk's changes on top of its starting snapshot.
    const snapshot = history.getSnapshot()
    snapshot.applyAll(history.getChanges())

    const filePaths = snapshot.getFilePathnames()
    if (filePaths.length === 0) {
      logger.warn(
        { historyId, projectVersion: snapshot.projectVersion },
        'No files found in snapshot backup'
      )
    }

    for (const filePath of filePaths) {
      const entryName = `${prefix}${filePath}`

      /** @type {core.File | null | undefined} */
      const file = snapshot.getFile(filePath)
      if (!file) {
        logger.error({ filePath }, 'File not found in snapshot')
        continue
      }

      try {
        await file.load('eager', blobStore)
      } catch (err) {
        logger.error(
          { filePath, err },
          'Failed to load file from snapshot, skipping'
        )
        continue
      }

      const hash = file.getHash()

      /** @type {string | fs.ReadStream | null | undefined} */
      let source = file.getContent({ filterTrackedDeletes: true })

      // No text content: fall back to streaming the downloaded blob.
      if (source === null) {
        if (!hash) {
          logger.error({ filePath }, 'File does not have a hash')
          continue
        }
        const knownBlob = await blobStore.getBlob(hash)
        if (!knownBlob) {
          logger.error({ filePath }, 'Blob not found in blob store')
          continue
        }
        source = await blobStore.getStream(hash)
      }

      if (source == null) {
        logger.error({ filePath }, 'File content is empty')
        continue
      }

      await addEntry(archive, source, { name: entryName })
      if (verbose) {
        logger.info({ filePath: entryName }, 'added to archive')
      }
    }
  })
}
|
|
|
|
/**
 * Look up the start version of a project's latest chunk in the chunk store.
 *
 * @param {string} historyId
 * @return {Promise<number>}
 * @throws {Error} when the chunk store returns no latest chunk
 */
async function findStartVersionOfLatestChunk(historyId) {
  const latestChunk = await chunkStore
    .getBackend(historyId)
    .getLatestChunk(historyId, { readOnly: true })

  if (latestChunk) return latestChunk.startVersion

  throw new Error('Latest chunk could not be loaded')
}
|
|
|
|
/**
 * Add the files of a project's latest snapshot to the archive.
 *
 * The latest chunk's start version is looked up through the chunk store, so
 * this assumes the database backup has been restored. The chunk itself and
 * its blobs are read from the backup.
 *
 * @param {ZipStream} archive
 * @param {string} historyId
 * @param {boolean} [useBackupGlobalBlobs] - read global blobs from the backup
 *   global-blobs bucket
 * @param {boolean} [verbose] - log every entry added to the archive
 * @return {Promise<ZipStream>} the same `archive` that was passed in
 */
export async function archiveLatestChunk(
  archive,
  historyId,
  useBackupGlobalBlobs = false,
  verbose = false
) {
  logger.info({ historyId, useBackupGlobalBlobs }, 'Archiving latest chunk')

  const projectCache = await getProjectPersistor(historyId)

  const startVersion = await findStartVersionOfLatestChunk(historyId)

  // The chunk content comes from the backup persistor, not the live store.
  const backedUpChunkRaw = await loadChunk(
    historyId,
    startVersion,
    projectCache
  )

  const backedUpChunk = History.fromRaw(backedUpChunkRaw)

  await addChunkToArchive(backedUpChunk, archive, projectCache, historyId, {
    useBackupGlobalBlobs,
    verbose,
  })

  // Returned for callers' convenience; matches the @return type above.
  return archive
}
|
|
|
|
/**
 * Copy every object under the project's prefix in the backup blob bucket into
 * the archive as `<historyId>/blobs/<key>`.
 *
 * This is best effort. A blob that cannot be downloaded or added is logged
 * and skipped. If its download stream was already opened, that stream is
 * destroyed so the underlying resource is released (for an object store,
 * presumably an open HTTP connection).
 *
 * @param {string} historyId
 * @param {ZipStream} archive
 * @param {CachedPerProjectEncryptedS3Persistor} projectCache
 * @param {boolean} [verbose] - log every entry added to the archive
 * @return {Promise<void>}
 */
async function addRawBlobsToArchive(
  historyId,
  archive,
  projectCache,
  verbose = false
) {
  const blobKeys = await projectCache.listDirectoryKeys(
    projectBlobsBucket,
    projectKey.format(historyId)
  )
  for (const key of blobKeys) {
    /** @type {NodeJS.ReadableStream | undefined} */
    let stream
    try {
      stream = await projectCache.getObjectStream(projectBlobsBucket, key, {
        autoGunzip: true,
      })
      // Zip entry names always use '/' separators, whatever the host OS.
      const entryName = path.posix.join(historyId, 'blobs', key)
      await addEntry(archive, stream, {
        name: entryName,
      })
      if (verbose) {
        logger.info({ entryName }, 'added to archive')
      }
    } catch (err) {
      logger.warn({ err, path: key }, 'Failed to append blob to archive')
      // The stream may have been opened but never consumed; release it.
      if (stream && 'destroy' in stream && typeof stream.destroy === 'function') {
        stream.destroy()
      }
    }
  }
}
|
|
|
|
/**
 * Archive the raw backup of a project without rebuilding any snapshot.
 *
 * Chunks are listed from the backup chunks bucket, so the database backup is
 * not needed.
 * - Each chunk's decompressed JSON is written to
 *   `<historyId>/chunks/<chunkId>/chunk.json`.
 * - Every project blob is then written to `<historyId>/blobs/<key>`.
 *
 * @param {ZipStream} archive
 * @param {string} historyId
 * @param {boolean} [useBackupGlobalBlobs] - accepted but currently unused by
 *   this function
 * @param {boolean} [verbose] - log every entry added to the archive
 * @return {Promise<void>}
 * @throws {Error} when the backup holds no chunks for the project
 */
export async function archiveRawProject(
  archive,
  historyId,
  useBackupGlobalBlobs = false,
  verbose = false
) {
  const projectCache = await getProjectPersistor(historyId)

  const chunkPrefix = projectKey.format(historyId)
  const chunkKeys = await projectCache.listDirectoryKeys(
    chunksBucket,
    chunkPrefix
  )
  if (chunkKeys.length === 0) {
    throw new Error('No chunks found')
  }

  for (const chunkKey of chunkKeys) {
    // The chunk id is the last path segment of its object key.
    const segments = chunkKey.split('/')
    const chunkId = segments[segments.length - 1]
    logger.debug({ chunkId, key: chunkKey }, 'Processing chunk')

    const { buffer: chunkJson } = await loadChunkByKey(projectCache, chunkKey)

    const entryName = `${historyId}/chunks/${chunkId}/chunk.json`
    await addEntry(archive, chunkJson, { name: entryName })
    if (verbose) {
      logger.info({ entryName }, 'added to archive')
    }
  }

  await addRawBlobsToArchive(historyId, archive, projectCache, verbose)
}
|
|
|
|
/**
 * Thrown by getProjectPersistor when the per-project backup persistor cannot
 * be obtained. getProjectPersistor passes `{ historyId }` as info and the
 * original error (when it is an Error) as cause to the OError constructor.
 */
export class BackupPersistorError extends OError {}
|