diff --git a/services/document-updater/scripts/project_notifications.mts b/services/document-updater/scripts/project_notifications.mts index 65f5aedfbd..46e6b0f5fc 100644 --- a/services/document-updater/scripts/project_notifications.mts +++ b/services/document-updater/scripts/project_notifications.mts @@ -1,6 +1,7 @@ import Settings from '@overleaf/settings' import logger from '@overleaf/logger' import { createClient } from '@overleaf/redis-wrapper' +import { promiseMapWithLimit } from '@overleaf/promise-utils' import mongodb from '../app/js/mongodb.js' import Queue from 'bull' import minimist from 'minimist' @@ -71,6 +72,8 @@ const queueRedisConfig = { password: process.env.QUEUES_REDIS_PASSWORD, } const QUEUE_NAME = 'project-notification' +const MONGO_IN_BATCH_SIZE = 5000 +const MONGO_BATCH_CONCURRENCY = 5 const PROGRESS_LOG_INTERVAL_MS = 15_000 const projectNotificationQueue = new Queue(QUEUE_NAME, { @@ -99,7 +102,7 @@ async function main() { `Scan complete: scanned=${stats.scanned}, matched=${stats.matched}, skippedNoCollaborators=${stats.skippedNoCollaborators}, skippedNoTimestamp=${stats.skippedNoTimestamp}, skippedInvalidTimestamp=${stats.skippedInvalidTimestamp}, skippedNoProjectId=${stats.skippedNoProjectId}` ) console.log( - `Collaborator lookups: cacheHitWithCollaborators=${stats.collaboratorCacheHitWithCollaborators}, cacheHitNoCollaborators=${stats.collaboratorCacheHitNoCollaborators}, cacheMissWithCollaborators=${stats.collaboratorCacheMissWithCollaborators}, cacheMissNoCollaborators=${stats.collaboratorCacheMissNoCollaborators}` + `Collaborator lookups: cacheHitWithCollaborators=${stats.collaboratorCacheHitWithCollaborators}, cacheHitNoCollaborators=${stats.collaboratorCacheHitNoCollaborators}, cacheMissWithCollaborators=${stats.collaboratorCacheMissWithCollaborators}, cacheMissNoCollaborators=${stats.collaboratorCacheMissNoCollaborators}, mongoQueries=${stats.collaboratorMongoQueries}` ) if (dryRun) { @@ -182,55 +185,87 @@ type ProjectNotification = { } /** - * Check if a project has any collaborators (excluding owner) - * Uses Redis caching with 1-2 hour randomized expiration to avoid repeated MongoDB queries + * For a batch of project IDs, return the set of those that have collaborators. + * Uses Redis caching with 1-2 hour randomized expiration to avoid repeated MongoDB queries. + * Performs a single mget for cache hits, a single $in find for cache misses, + * and a single pipelined setex to write back the results. */ -async function projectHasCollaborators( - projectId: string, +async function getProjectsWithCollaborators( + projectIds: string[], stats: NotificationStats -): Promise { - // Check Redis cache first - const cacheKey = `ProjectHasCollaborators:{${projectId}}` - const cachedResult = await redisClient.get(cacheKey) +): Promise> { + const projectsWithCollaborators = new Set() + if (projectIds.length === 0) return projectsWithCollaborators - if (cachedResult !== null) { - if (cachedResult === '1') { + const cacheKeys = projectIds.map(id => `ProjectHasCollaborators:{${id}}`) + const cached = await redisClient.mget(cacheKeys) + + const projectsNeedingMongoLookup: string[] = [] + for (const [i, id] of projectIds.entries()) { + if (cached[i] === '1') { stats.collaboratorCacheHitWithCollaborators++ - } else { + projectsWithCollaborators.add(id) + } else if (cached[i] === '0') { stats.collaboratorCacheHitNoCollaborators++ + } else { + projectsNeedingMongoLookup.push(id) } - return cachedResult === '1' } - // Cache miss - query MongoDB - const hasCollaborators = await db.projects.findOne( - { - _id: new ObjectId(projectId), - $or: [ - { 'collaberator_refs.0': { $exists: true } }, // check that first element in array exists - { 'readOnly_refs.0': { $exists: true } }, - { 'reviewer_refs.0': { $exists: true } }, - { 'tokenAccessReadAndWrite_refs.0': { $exists: true } }, - { 'tokenAccessReadOnly_refs.0': { $exists: true } }, - ], - }, - { projection: { _id: 1 }, readPreference: READ_PREFERENCE_SECONDARY } + if (projectsNeedingMongoLookup.length === 0) return projectsWithCollaborators + + const batches: string[][] = [] + for ( + let i = 0; + i < projectsNeedingMongoLookup.length; + i += MONGO_IN_BATCH_SIZE + ) { + batches.push(projectsNeedingMongoLookup.slice(i, i + MONGO_IN_BATCH_SIZE)) + } + + const batchResults = await promiseMapWithLimit( + MONGO_BATCH_CONCURRENCY, + batches, + async (batch: string[]) => { + stats.collaboratorMongoQueries++ + return await db.projects + .find( + { + _id: { $in: batch.map(id => new ObjectId(id)) }, + $or: [ + { 'collaberator_refs.0': { $exists: true } }, + { 'readOnly_refs.0': { $exists: true } }, + { 'reviewer_refs.0': { $exists: true } }, + { 'tokenAccessReadAndWrite_refs.0': { $exists: true } }, + { 'tokenAccessReadOnly_refs.0': { $exists: true } }, + ], + }, + { projection: { _id: 1 }, readPreference: READ_PREFERENCE_SECONDARY } + ) + .toArray() + } ) - // Use random TTL between 1-2 hours (3600-7200 seconds) to smooth out cache expiration - const randomTTL = 3600 + Math.floor(Math.random() * 3600) + const positives = new Set( + batchResults.flatMap(docs => docs.map(d => d._id.toString())) + ) - if (hasCollaborators === null) { - // Cache negative result (no collaborators or project not found) - stats.collaboratorCacheMissNoCollaborators++ - await redisClient.setex(cacheKey, randomTTL, '0') - return false + const pipeline = redisClient.pipeline() + for (const id of projectsNeedingMongoLookup) { + // Use random TTL between 1-2 hours (3600-7200 seconds) to smooth out cache expiration + const ttl = 3600 + Math.floor(Math.random() * 3600) + const hit = positives.has(id) + if (hit) { + stats.collaboratorCacheMissWithCollaborators++ + projectsWithCollaborators.add(id) + } else { + stats.collaboratorCacheMissNoCollaborators++ + } + pipeline.setex(`ProjectHasCollaborators:{${id}}`, ttl, hit ? '1' : '0') } + await pipeline.exec() - // Cache the result in Redis - stats.collaboratorCacheMissWithCollaborators++ - await redisClient.setex(cacheKey, randomTTL, hasCollaborators ? '1' : '0') - return true + return projectsWithCollaborators } /** @@ -247,6 +282,7 @@ type NotificationStats = { collaboratorCacheHitNoCollaborators: number collaboratorCacheMissWithCollaborators: number collaboratorCacheMissNoCollaborators: number + collaboratorMongoQueries: number } async function getProjectsToNotify(): Promise<{ @@ -269,6 +305,7 @@ async function getProjectsToNotify(): Promise<{ collaboratorCacheHitNoCollaborators: 0, collaboratorCacheMissWithCollaborators: 0, collaboratorCacheMissNoCollaborators: 0, + collaboratorMongoQueries: 0, } let lastProgressLog = Date.now() @@ -287,6 +324,8 @@ async function getProjectsToNotify(): Promise<{ const timestamps = await redisClient.mget(keys) + // Extract valid (projectId, timestamp) pairs from this batch + const candidates: ProjectNotification[] = [] for (const [index, key] of keys.entries()) { stats.scanned++ const projectId = extractProjectId(key as string) @@ -304,15 +343,6 @@ async function getProjectsToNotify(): Promise<{ continue } - const hasCollaborators = await projectHasCollaborators( - projectId, - stats - ) - if (!hasCollaborators) { - stats.skippedNoCollaborators++ - continue - } - const numericTimestamp = parseInt(timestamp, 10) if (Number.isNaN(numericTimestamp)) { stats.skippedInvalidTimestamp++ @@ -322,8 +352,22 @@ async function getProjectsToNotify(): Promise<{ continue } + candidates.push({ projectId, timestamp }) + } + + // Bulk-check collaborators for the whole batch + const projectsWithCollaborators = await getProjectsWithCollaborators( + candidates.map(c => c.projectId), + stats + ) + + for (const c of candidates) { + if (!projectsWithCollaborators.has(c.projectId)) { + stats.skippedNoCollaborators++ + continue + } stats.matched++ - projects.push({ projectId, timestamp }) + projects.push(c) } if (Date.now() - lastProgressLog >= PROGRESS_LOG_INTERVAL_MS) {