[document-updater] batch collaborator lookups in project_notifications.mts (#33974)
* Refactor project collaborator check to handle multiple project IDs and optimize Redis caching
* Batch mongo queries and use promiseMapWithLimit

---------

Co-authored-by: Domagoj Kriskovic <dom.kriskovic@overleaf.com>
GitOrigin-RevId: 7d568b4b05894465c930595ebd3f214ebc5c72c0
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import Settings from '@overleaf/settings'
|
||||
import logger from '@overleaf/logger'
|
||||
import { createClient } from '@overleaf/redis-wrapper'
|
||||
import { promiseMapWithLimit } from '@overleaf/promise-utils'
|
||||
import mongodb from '../app/js/mongodb.js'
|
||||
import Queue from 'bull'
|
||||
import minimist from 'minimist'
|
||||
@@ -71,6 +72,8 @@ const queueRedisConfig = {
|
||||
password: process.env.QUEUES_REDIS_PASSWORD,
|
||||
}
|
||||
const QUEUE_NAME = 'project-notification'
|
||||
const MONGO_IN_BATCH_SIZE = 5000
|
||||
const MONGO_BATCH_CONCURRENCY = 5
|
||||
const PROGRESS_LOG_INTERVAL_MS = 15_000
|
||||
|
||||
const projectNotificationQueue = new Queue(QUEUE_NAME, {
|
||||
@@ -99,7 +102,7 @@ async function main() {
|
||||
`Scan complete: scanned=${stats.scanned}, matched=${stats.matched}, skippedNoCollaborators=${stats.skippedNoCollaborators}, skippedNoTimestamp=${stats.skippedNoTimestamp}, skippedInvalidTimestamp=${stats.skippedInvalidTimestamp}, skippedNoProjectId=${stats.skippedNoProjectId}`
|
||||
)
|
||||
console.log(
|
||||
`Collaborator lookups: cacheHitWithCollaborators=${stats.collaboratorCacheHitWithCollaborators}, cacheHitNoCollaborators=${stats.collaboratorCacheHitNoCollaborators}, cacheMissWithCollaborators=${stats.collaboratorCacheMissWithCollaborators}, cacheMissNoCollaborators=${stats.collaboratorCacheMissNoCollaborators}`
|
||||
`Collaborator lookups: cacheHitWithCollaborators=${stats.collaboratorCacheHitWithCollaborators}, cacheHitNoCollaborators=${stats.collaboratorCacheHitNoCollaborators}, cacheMissWithCollaborators=${stats.collaboratorCacheMissWithCollaborators}, cacheMissNoCollaborators=${stats.collaboratorCacheMissNoCollaborators}, mongoQueries=${stats.collaboratorMongoQueries}`
|
||||
)
|
||||
|
||||
if (dryRun) {
|
||||
@@ -182,55 +185,87 @@ type ProjectNotification = {
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a project has any collaborators (excluding owner)
|
||||
* Uses Redis caching with 1-2 hour randomized expiration to avoid repeated MongoDB queries
|
||||
* For a batch of project IDs, return the set of those that have collaborators.
|
||||
* Uses Redis caching with 1-2 hour randomized expiration to avoid repeated MongoDB queries.
|
||||
* Performs a single mget to read the cache, one $in find per batch of up to MONGO_IN_BATCH_SIZE cache misses
|
||||
* (at most MONGO_BATCH_CONCURRENCY finds in flight), and a single pipeline of setex commands to write back the results.
|
||||
*/
|
||||
async function projectHasCollaborators(
|
||||
projectId: string,
|
||||
async function getProjectsWithCollaborators(
|
||||
projectIds: string[],
|
||||
stats: NotificationStats
|
||||
): Promise<boolean> {
|
||||
// Check Redis cache first
|
||||
const cacheKey = `ProjectHasCollaborators:{${projectId}}`
|
||||
const cachedResult = await redisClient.get(cacheKey)
|
||||
): Promise<Set<string>> {
|
||||
const projectsWithCollaborators = new Set<string>()
|
||||
if (projectIds.length === 0) return projectsWithCollaborators
|
||||
|
||||
if (cachedResult !== null) {
|
||||
if (cachedResult === '1') {
|
||||
const cacheKeys = projectIds.map(id => `ProjectHasCollaborators:{${id}}`)
|
||||
const cached = await redisClient.mget(cacheKeys)
|
||||
|
||||
const projectsNeedingMongoLookup: string[] = []
|
||||
for (const [i, id] of projectIds.entries()) {
|
||||
if (cached[i] === '1') {
|
||||
stats.collaboratorCacheHitWithCollaborators++
|
||||
} else {
|
||||
projectsWithCollaborators.add(id)
|
||||
} else if (cached[i] === '0') {
|
||||
stats.collaboratorCacheHitNoCollaborators++
|
||||
} else {
|
||||
projectsNeedingMongoLookup.push(id)
|
||||
}
|
||||
return cachedResult === '1'
|
||||
}
|
||||
|
||||
// Cache miss - query MongoDB
|
||||
const hasCollaborators = await db.projects.findOne(
|
||||
{
|
||||
_id: new ObjectId(projectId),
|
||||
$or: [
|
||||
{ 'collaberator_refs.0': { $exists: true } }, // check that first element in array exists
|
||||
{ 'readOnly_refs.0': { $exists: true } },
|
||||
{ 'reviewer_refs.0': { $exists: true } },
|
||||
{ 'tokenAccessReadAndWrite_refs.0': { $exists: true } },
|
||||
{ 'tokenAccessReadOnly_refs.0': { $exists: true } },
|
||||
],
|
||||
},
|
||||
{ projection: { _id: 1 }, readPreference: READ_PREFERENCE_SECONDARY }
|
||||
if (projectsNeedingMongoLookup.length === 0) return projectsWithCollaborators
|
||||
|
||||
const batches: string[][] = []
|
||||
for (
|
||||
let i = 0;
|
||||
i < projectsNeedingMongoLookup.length;
|
||||
i += MONGO_IN_BATCH_SIZE
|
||||
) {
|
||||
batches.push(projectsNeedingMongoLookup.slice(i, i + MONGO_IN_BATCH_SIZE))
|
||||
}
|
||||
|
||||
const batchResults = await promiseMapWithLimit(
|
||||
MONGO_BATCH_CONCURRENCY,
|
||||
batches,
|
||||
async (batch: string[]) => {
|
||||
stats.collaboratorMongoQueries++
|
||||
return await db.projects
|
||||
.find(
|
||||
{
|
||||
_id: { $in: batch.map(id => new ObjectId(id)) },
|
||||
$or: [
|
||||
{ 'collaberator_refs.0': { $exists: true } },
|
||||
{ 'readOnly_refs.0': { $exists: true } },
|
||||
{ 'reviewer_refs.0': { $exists: true } },
|
||||
{ 'tokenAccessReadAndWrite_refs.0': { $exists: true } },
|
||||
{ 'tokenAccessReadOnly_refs.0': { $exists: true } },
|
||||
],
|
||||
},
|
||||
{ projection: { _id: 1 }, readPreference: READ_PREFERENCE_SECONDARY }
|
||||
)
|
||||
.toArray()
|
||||
}
|
||||
)
|
||||
|
||||
// Use random TTL between 1-2 hours (3600-7200 seconds) to smooth out cache expiration
|
||||
const randomTTL = 3600 + Math.floor(Math.random() * 3600)
|
||||
const positives = new Set(
|
||||
batchResults.flatMap(docs => docs.map(d => d._id.toString()))
|
||||
)
|
||||
|
||||
if (hasCollaborators === null) {
|
||||
// Cache negative result (no collaborators or project not found)
|
||||
stats.collaboratorCacheMissNoCollaborators++
|
||||
await redisClient.setex(cacheKey, randomTTL, '0')
|
||||
return false
|
||||
const pipeline = redisClient.pipeline()
|
||||
for (const id of projectsNeedingMongoLookup) {
|
||||
// Use random TTL between 1-2 hours (3600-7200 seconds) to smooth out cache expiration
|
||||
const ttl = 3600 + Math.floor(Math.random() * 3600)
|
||||
const hit = positives.has(id)
|
||||
if (hit) {
|
||||
stats.collaboratorCacheMissWithCollaborators++
|
||||
projectsWithCollaborators.add(id)
|
||||
} else {
|
||||
stats.collaboratorCacheMissNoCollaborators++
|
||||
}
|
||||
pipeline.setex(`ProjectHasCollaborators:{${id}}`, ttl, hit ? '1' : '0')
|
||||
}
|
||||
await pipeline.exec()
|
||||
|
||||
// Cache the result in Redis
|
||||
stats.collaboratorCacheMissWithCollaborators++
|
||||
await redisClient.setex(cacheKey, randomTTL, hasCollaborators ? '1' : '0')
|
||||
return true
|
||||
return projectsWithCollaborators
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -247,6 +282,7 @@ type NotificationStats = {
|
||||
collaboratorCacheHitNoCollaborators: number
|
||||
collaboratorCacheMissWithCollaborators: number
|
||||
collaboratorCacheMissNoCollaborators: number
|
||||
collaboratorMongoQueries: number
|
||||
}
|
||||
|
||||
async function getProjectsToNotify(): Promise<{
|
||||
@@ -269,6 +305,7 @@ async function getProjectsToNotify(): Promise<{
|
||||
collaboratorCacheHitNoCollaborators: 0,
|
||||
collaboratorCacheMissWithCollaborators: 0,
|
||||
collaboratorCacheMissNoCollaborators: 0,
|
||||
collaboratorMongoQueries: 0,
|
||||
}
|
||||
let lastProgressLog = Date.now()
|
||||
|
||||
@@ -287,6 +324,8 @@ async function getProjectsToNotify(): Promise<{
|
||||
|
||||
const timestamps = await redisClient.mget(keys)
|
||||
|
||||
// Extract valid (projectId, timestamp) pairs from this batch
|
||||
const candidates: ProjectNotification[] = []
|
||||
for (const [index, key] of keys.entries()) {
|
||||
stats.scanned++
|
||||
const projectId = extractProjectId(key as string)
|
||||
@@ -304,15 +343,6 @@ async function getProjectsToNotify(): Promise<{
|
||||
continue
|
||||
}
|
||||
|
||||
const hasCollaborators = await projectHasCollaborators(
|
||||
projectId,
|
||||
stats
|
||||
)
|
||||
if (!hasCollaborators) {
|
||||
stats.skippedNoCollaborators++
|
||||
continue
|
||||
}
|
||||
|
||||
const numericTimestamp = parseInt(timestamp, 10)
|
||||
if (Number.isNaN(numericTimestamp)) {
|
||||
stats.skippedInvalidTimestamp++
|
||||
@@ -322,8 +352,22 @@ async function getProjectsToNotify(): Promise<{
|
||||
continue
|
||||
}
|
||||
|
||||
candidates.push({ projectId, timestamp })
|
||||
}
|
||||
|
||||
// Bulk-check collaborators for the whole batch
|
||||
const projectsWithCollaborators = await getProjectsWithCollaborators(
|
||||
candidates.map(c => c.projectId),
|
||||
stats
|
||||
)
|
||||
|
||||
for (const c of candidates) {
|
||||
if (!projectsWithCollaborators.has(c.projectId)) {
|
||||
stats.skippedNoCollaborators++
|
||||
continue
|
||||
}
|
||||
stats.matched++
|
||||
projects.push({ projectId, timestamp })
|
||||
projects.push(c)
|
||||
}
|
||||
|
||||
if (Date.now() - lastProgressLog >= PROGRESS_LOG_INTERVAL_MS) {
|
||||
|
||||
Reference in New Issue
Block a user