Process zip import entries in parallel (#33176)
GitOrigin-RevId: f77c2b08d4c085b51a8608d2621dd5bbe1134258
This commit is contained in:
@@ -168,37 +168,91 @@ async function _initializeProjectWithZipContents(
|
||||
return { fileEntries, docEntries }
|
||||
}
|
||||
|
||||
/**
|
||||
* Create project docs and files using concurrent workers.
|
||||
* Returns the created entries grouped by type, in the correct order.
|
||||
*/
|
||||
async function _createEntriesFromImports(project, importEntries) {
|
||||
const fileEntries = []
|
||||
const docEntries = []
|
||||
for (const importEntry of importEntries) {
|
||||
switch (importEntry.type) {
|
||||
case 'doc': {
|
||||
const docEntry = await _createDoc(
|
||||
project,
|
||||
importEntry.projectPath,
|
||||
importEntry.lines
|
||||
)
|
||||
docEntries.push(docEntry)
|
||||
break
|
||||
const createdEntries = new Array(importEntries.length)
|
||||
let nextIndex = 0
|
||||
let firstError
|
||||
|
||||
async function worker() {
|
||||
while (firstError == null) {
|
||||
const currentIndex = nextIndex
|
||||
nextIndex++
|
||||
|
||||
if (currentIndex >= importEntries.length) {
|
||||
return
|
||||
}
|
||||
case 'file': {
|
||||
const fileEntry = await _createFile(
|
||||
|
||||
try {
|
||||
createdEntries[currentIndex] = await _createEntryFromImport(
|
||||
project,
|
||||
importEntry.projectPath,
|
||||
importEntry.fsPath
|
||||
importEntries[currentIndex]
|
||||
)
|
||||
fileEntries.push(fileEntry)
|
||||
break
|
||||
}
|
||||
default: {
|
||||
throw new Error(`Invalid import type: ${importEntry.type}`)
|
||||
} catch (error) {
|
||||
firstError = firstError ?? error
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.allSettled(
|
||||
Array.from({ length: Math.min(5, importEntries.length) }, () => worker())
|
||||
)
|
||||
|
||||
if (firstError != null) {
|
||||
throw firstError
|
||||
}
|
||||
|
||||
const fileEntries = []
|
||||
const docEntries = []
|
||||
|
||||
for (const createdEntry of createdEntries) {
|
||||
switch (createdEntry.type) {
|
||||
case 'doc': {
|
||||
docEntries.push(createdEntry.entry)
|
||||
break
|
||||
}
|
||||
case 'file': {
|
||||
fileEntries.push(createdEntry.entry)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return { fileEntries, docEntries }
|
||||
}
|
||||
|
||||
async function _createEntryFromImport(project, importEntry) {
|
||||
switch (importEntry.type) {
|
||||
case 'doc': {
|
||||
return {
|
||||
type: 'doc',
|
||||
entry: await _createDoc(
|
||||
project,
|
||||
importEntry.projectPath,
|
||||
importEntry.lines
|
||||
),
|
||||
}
|
||||
}
|
||||
case 'file': {
|
||||
return {
|
||||
type: 'file',
|
||||
entry: await _createFile(
|
||||
project,
|
||||
importEntry.projectPath,
|
||||
importEntry.fsPath
|
||||
),
|
||||
}
|
||||
}
|
||||
default: {
|
||||
throw new Error(`Invalid import type: ${importEntry.type}`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function _createDoc(project, projectPath, docLines) {
|
||||
const projectId = project._id
|
||||
const docName = Path.basename(projectPath)
|
||||
|
||||
@@ -505,4 +505,224 @@ describe('ProjectUploadManager', function () {
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('createProjectFromZipArchiveWithName import concurrency', function () {
|
||||
it('should create import entries in parallel with a limit of five', async function (ctx) {
|
||||
ctx.importEntries = Array.from({ length: 6 }, (_, index) => ({
|
||||
type: 'file',
|
||||
projectPath: `/file-${index}.png`,
|
||||
fsPath: `/path/to/file-${index}.png`,
|
||||
}))
|
||||
ctx.FileSystemImportManager.promises.importDir.resetBehavior()
|
||||
ctx.FileSystemImportManager.promises.importDir
|
||||
.withArgs(ctx.topLevelDestination)
|
||||
.resolves(ctx.importEntries)
|
||||
|
||||
let releaseUploads
|
||||
const uploadsReleased = new Promise(resolve => {
|
||||
releaseUploads = resolve
|
||||
})
|
||||
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId.resetBehavior()
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId.callsFake(
|
||||
async (_projectId, _historyId, fileMeta) => {
|
||||
await uploadsReleased
|
||||
return {
|
||||
createdBlob: true,
|
||||
fileRef: {
|
||||
_id: new ObjectId(),
|
||||
name: fileMeta.name,
|
||||
},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
const uploadPromise =
|
||||
ctx.ProjectUploadManager.promises.createProjectFromZipArchiveWithName(
|
||||
ctx.ownerId,
|
||||
ctx.projectName,
|
||||
ctx.zipPath
|
||||
)
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId
|
||||
.callCount
|
||||
).to.equal(5)
|
||||
})
|
||||
|
||||
releaseUploads()
|
||||
|
||||
const { fileEntries, docEntries } = await uploadPromise
|
||||
|
||||
expect(
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId.callCount
|
||||
).to.equal(6)
|
||||
expect(fileEntries).to.have.length(6)
|
||||
expect(docEntries).to.have.length(0)
|
||||
})
|
||||
|
||||
it('should stop starting queued imports after the first failure', async function (ctx) {
|
||||
ctx.importEntries = Array.from({ length: 6 }, (_, index) => ({
|
||||
type: 'file',
|
||||
projectPath: `/file-${index}.png`,
|
||||
fsPath: `/path/to/file-${index}.png`,
|
||||
}))
|
||||
ctx.FileSystemImportManager.promises.importDir.resetBehavior()
|
||||
ctx.FileSystemImportManager.promises.importDir
|
||||
.withArgs(ctx.topLevelDestination)
|
||||
.resolves(ctx.importEntries)
|
||||
|
||||
const uploadError = new Error('upload failed')
|
||||
const releaseUploadsByName = new Map()
|
||||
let failUpload
|
||||
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId.resetBehavior()
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId.callsFake(
|
||||
async (_projectId, _historyId, fileMeta) => {
|
||||
if (fileMeta.name === 'file-0.png') {
|
||||
await new Promise((_resolve, reject) => {
|
||||
failUpload = () => reject(uploadError)
|
||||
})
|
||||
} else {
|
||||
await new Promise(resolve => {
|
||||
releaseUploadsByName.set(fileMeta.name, resolve)
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
createdBlob: true,
|
||||
fileRef: {
|
||||
_id: new ObjectId(),
|
||||
name: fileMeta.name,
|
||||
},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
const uploadPromise =
|
||||
ctx.ProjectUploadManager.promises.createProjectFromZipArchiveWithName(
|
||||
ctx.ownerId,
|
||||
ctx.projectName,
|
||||
ctx.zipPath
|
||||
)
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId
|
||||
.callCount
|
||||
).to.equal(5)
|
||||
expect(releaseUploadsByName.size).to.equal(4)
|
||||
expect(failUpload).to.be.a('function')
|
||||
})
|
||||
|
||||
failUpload()
|
||||
|
||||
for (const releaseUpload of releaseUploadsByName.values()) {
|
||||
releaseUpload()
|
||||
}
|
||||
|
||||
await expect(uploadPromise).to.be.rejectedWith('upload failed')
|
||||
expect(
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId.callCount
|
||||
).to.equal(5)
|
||||
})
|
||||
|
||||
it('should preserve doc and file order from the import entries', async function (ctx) {
|
||||
ctx.importEntries = [
|
||||
{
|
||||
type: 'doc',
|
||||
projectPath: '/a.tex',
|
||||
lines: ['a'],
|
||||
},
|
||||
{
|
||||
type: 'file',
|
||||
projectPath: '/b.png',
|
||||
fsPath: '/path/to/b.png',
|
||||
},
|
||||
{
|
||||
type: 'doc',
|
||||
projectPath: '/c.tex',
|
||||
lines: ['c'],
|
||||
},
|
||||
{
|
||||
type: 'file',
|
||||
projectPath: '/d.png',
|
||||
fsPath: '/path/to/d.png',
|
||||
},
|
||||
]
|
||||
ctx.FileSystemImportManager.promises.importDir.resetBehavior()
|
||||
ctx.FileSystemImportManager.promises.importDir
|
||||
.withArgs(ctx.topLevelDestination)
|
||||
.resolves(ctx.importEntries)
|
||||
|
||||
const docNameById = new Map()
|
||||
const releaseByPath = new Map()
|
||||
|
||||
ctx.Doc.resetBehavior()
|
||||
ctx.Doc.callsFake(({ name }) => {
|
||||
const doc = {
|
||||
_id: new ObjectId(),
|
||||
name,
|
||||
}
|
||||
docNameById.set(doc._id.toString(), name)
|
||||
return doc
|
||||
})
|
||||
|
||||
ctx.DocstoreManager.promises.updateDoc.resetBehavior()
|
||||
ctx.DocstoreManager.promises.updateDoc.callsFake(
|
||||
async (_projectId, docId) => {
|
||||
const releasePromise = new Promise(resolve => {
|
||||
releaseByPath.set(docNameById.get(docId), resolve)
|
||||
})
|
||||
await releasePromise
|
||||
return { _id: docId }
|
||||
}
|
||||
)
|
||||
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId.resetBehavior()
|
||||
ctx.FileStoreHandler.promises.uploadFileFromDiskWithHistoryId.callsFake(
|
||||
async (_projectId, _historyId, fileMeta) => {
|
||||
const releasePromise = new Promise(resolve => {
|
||||
releaseByPath.set(fileMeta.name, resolve)
|
||||
})
|
||||
await releasePromise
|
||||
return {
|
||||
createdBlob: true,
|
||||
fileRef: {
|
||||
_id: new ObjectId(),
|
||||
name: fileMeta.name,
|
||||
},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
const uploadPromise =
|
||||
ctx.ProjectUploadManager.promises.createProjectFromZipArchiveWithName(
|
||||
ctx.ownerId,
|
||||
ctx.projectName,
|
||||
ctx.zipPath
|
||||
)
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(releaseByPath.size).to.equal(4)
|
||||
})
|
||||
|
||||
releaseByPath.get('d.png')()
|
||||
releaseByPath.get('c.tex')()
|
||||
releaseByPath.get('b.png')()
|
||||
releaseByPath.get('a.tex')()
|
||||
|
||||
const { fileEntries, docEntries } = await uploadPromise
|
||||
|
||||
expect(docEntries.map(entry => entry.path)).to.deep.equal([
|
||||
'/a.tex',
|
||||
'/c.tex',
|
||||
])
|
||||
expect(fileEntries.map(entry => entry.path)).to.deep.equal([
|
||||
'/b.png',
|
||||
'/d.png',
|
||||
])
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user