Files
Kristina 577ff63eca [document-updater] batch collaborator lookups in project_notifications.mts (#33974)
* Refactor project collaborator check to handle multiple project IDs and optimize Redis caching
* Batch mongo queries and use promiseMapWithLimit

---------

Co-authored-by: Domagoj Kriskovic <dom.kriskovic@overleaf.com>
GitOrigin-RevId: 7d568b4b05894465c930595ebd3f214ebc5c72c0
2026-05-27 08:07:35 +00:00

419 lines
12 KiB
TypeScript

import Settings from '@overleaf/settings'
import logger from '@overleaf/logger'
import { createClient } from '@overleaf/redis-wrapper'
import { promiseMapWithLimit } from '@overleaf/promise-utils'
import mongodb from '../app/js/mongodb.js'
import Queue from 'bull'
import minimist from 'minimist'
// Raise the shared logger's level to 'fatal' (presumably so that the script's
// own console output is not interleaved with library logging).
logger.logger.level('fatal')
// Parse the command-line flags. Both flags are booleans that default to false;
// -n and -h are short aliases for --dry-run and --help.
const argv = minimist(process.argv.slice(2), {
boolean: ['dry-run', 'help'],
alias: {
n: 'dry-run',
h: 'help',
},
default: {
'dry-run': false,
help: false,
},
})
// With --help, print the usage text and exit successfully before any of the
// Redis/queue setup further down in the file runs.
if (argv.help) {
console.log(`
project_notifications.mts - Queue project update notifications
This script scans Redis for projects that have pending notification timestamps and queues
them for notification. It's used to notify project collaborators when changes have been
made to a project. Only projects with collaborators are processed.
Usage: node scripts/project_notifications.mts [options]
Options:
-n, --dry-run Show what would be done without making changes
-h, --help Show this help message
Examples:
# Dry run to see what would be notified
node scripts/project_notifications.mts --dry-run
# Actually queue the notifications
node scripts/project_notifications.mts
`)
process.exit(0)
}
// When true, main() lists the projects that would be queued and returns
// without adding jobs to the queue or deleting notification timestamps.
const dryRun = argv['dry-run']
// Mongo handles taken from the shared mongodb module: the collections object,
// the ObjectId constructor and the read preference used for the project lookups.
const { db, ObjectId, READ_PREFERENCE_SECONDARY } = mongodb
// Key-schema helpers and a client for the document-updater Redis configured in Settings.
const docUpdaterKeys = Settings.redis.documentupdater.key_schema
const redisClient = createClient(Settings.redis.documentupdater)
// Define a Lua script that deletes the notification key only if its current
// value still equals the expected timestamp (compare-and-delete in one step).
// The script returns 1 when the key was deleted and 0 when the key is missing
// or holds a different value.
redisClient.defineCommand('deleteProjectNotificationTimestamp', {
numberOfKeys: 1,
lua: `
local projectNotificationKey = KEYS[1]
local expectedTimestamp = ARGV[1]
local currentTimestamp = redis.call('GET', projectNotificationKey)
if currentTimestamp and currentTimestamp == expectedTimestamp then
redis.call('DEL', projectNotificationKey)
return 1
end
return 0
`,
})
// Name of the Bull queue that receives one job per project to notify.
const QUEUE_NAME = 'project-notification'
// Upper bound on the number of project IDs sent in one Mongo `$in` query.
const MONGO_IN_BATCH_SIZE = 5000
// Maximum number of Mongo batch queries in flight at the same time.
const MONGO_BATCH_CONCURRENCY = 5
// Minimum time between two progress lines while scanning or queueing.
const PROGRESS_LOG_INTERVAL_MS = 15_000

// Connection settings for the Redis instance backing the queue. They are read
// from the QUEUES_REDIS_* environment variables, falling back to a local
// Redis on the default port when host/port are unset or empty.
const queueRedisConfig = {
  host: process.env.QUEUES_REDIS_HOST || '127.0.0.1',
  port: Number.parseInt(process.env.QUEUES_REDIS_PORT || '6379', 10),
  password: process.env.QUEUES_REDIS_PASSWORD,
}

// Queue handle shared by main() and the shutdown code at the end of the file.
// Completed jobs are removed immediately; failed jobs are kept subject to the
// count/age limits below, after up to 3 attempts with exponential backoff.
const projectNotificationQueue = new Queue(QUEUE_NAME, {
  redis: queueRedisConfig,
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: { count: 50000, age: 3600 },
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 3000,
    },
  },
})
/**
 * Scan Redis for projects with a pending notification timestamp and queue one
 * notification job per project that has collaborators.
 *
 * In dry-run mode the matching projects are only listed: no job is added and no
 * notification timestamp is deleted. Note that the scan itself still writes the
 * `ProjectHasCollaborators` cache entries to Redis, even in dry-run mode.
 *
 * Otherwise each project is added to the queue (using the project ID as job ID)
 * and its timestamp key is then deleted, but only if it still holds the value
 * that was read during the scan. A failure for one project is logged and
 * counted; it does not stop the remaining projects from being processed.
 */
async function main(): Promise<void> {
  console.time('total')
  if (dryRun) {
    console.log('[DRY RUN MODE] - No changes will be made')
  }
  console.log('Scanning for projects that need to be notified...')
  const { projects, stats } = await getProjectsToNotify()
  console.log(
    `Scan complete: scanned=${stats.scanned}, matched=${stats.matched}, skippedNoCollaborators=${stats.skippedNoCollaborators}, skippedNoTimestamp=${stats.skippedNoTimestamp}, skippedInvalidTimestamp=${stats.skippedInvalidTimestamp}, skippedNoProjectId=${stats.skippedNoProjectId}`
  )
  console.log(
    `Collaborator lookups: cacheHitWithCollaborators=${stats.collaboratorCacheHitWithCollaborators}, cacheHitNoCollaborators=${stats.collaboratorCacheHitNoCollaborators}, cacheMissWithCollaborators=${stats.collaboratorCacheMissWithCollaborators}, cacheMissNoCollaborators=${stats.collaboratorCacheMissNoCollaborators}, mongoQueries=${stats.collaboratorMongoQueries}`
  )
  if (dryRun) {
    console.log('\n[DRY RUN] Projects that would be queued:')
    for (const { projectId, timestamp } of projects) {
      const date = new Date(parseInt(timestamp, 10))
      // The scan only guarantees that the timestamp parses to a number. A value
      // outside the range a Date can represent yields an invalid Date, and
      // toISOString() throws a RangeError on those, so guard before formatting.
      const formattedDate = Number.isNaN(date.getTime())
        ? 'invalid date'
        : date.toISOString()
      console.log(
        ` ${projectId}: ${timestamp} (${formattedDate}) - would be queued`
      )
    }
    console.timeEnd('total')
    return
  }
  console.log('Waiting for queue to be ready...')
  await projectNotificationQueue.isReady()
  console.log('Queue is ready.')
  let queued = 0
  let failed = 0
  // Timestamps that were not deleted because the key no longer held the value
  // read during the scan (it changed or disappeared in the meantime).
  let deleteMismatches = 0
  let lastProgressLog = Date.now()
  for (const { projectId, timestamp } of projects) {
    const numericTimestamp = parseInt(timestamp, 10)
    try {
      // NOTE(review): the job ID is the project ID, so the queue may treat an
      // add for a project that still has a job under that ID as a duplicate.
      // Confirm that this de-duplication is intended.
      await projectNotificationQueue.add(
        { projectId, timestamp: numericTimestamp },
        {
          jobId: projectId,
          delay: 1000,
        }
      )
      // Pass the raw string: the Lua script compares it to the stored value.
      const deleted = await deleteProjectNotificationTimestamp(
        projectId,
        timestamp
      )
      if (!deleted) {
        deleteMismatches++
      }
      queued++
    } catch (err) {
      failed++
      console.error(
        `Error scheduling notification for project ${projectId}:`,
        err
      )
    }
    if (Date.now() - lastProgressLog >= PROGRESS_LOG_INTERVAL_MS) {
      console.log(
        `Queue progress: queued=${queued}, failed=${failed} of ${projects.length}`
      )
      lastProgressLog = Date.now()
    }
  }
  console.log(
    `Queue complete: queued=${queued}, failed=${failed}, deleteMismatches=${deleteMismatches}`
  )
  console.timeEnd('total')
}
/**
 * Pull the project ID out of a ProjectNotificationTimestamp Redis key.
 * Expected key shape: ProjectNotificationTimestamp:{project_id}
 *
 * @param key - the Redis key to inspect
 * @returns the text between the first pair of braces following the prefix, or
 *   `undefined` when the key does not contain that pattern
 */
function extractProjectId(key: string): string | undefined {
  const found = /ProjectNotificationTimestamp:\{(.*?)\}/.exec(key)
  return found === null ? undefined : found[1]
}
/**
 * A project with a pending notification, as found by the Redis scan.
 */
type ProjectNotification = {
// Project ID extracted from the Redis key.
projectId: string
// Timestamp exactly as stored in Redis. It is kept as a string because the
// compare-and-delete step matches it against the stored value; callers parse
// it with parseInt and pass it to `new Date` when they need a number or date.
timestamp: string
}
/**
 * For a batch of project IDs, return the set of those that have collaborators.
 *
 * Results are cached in Redis under `ProjectHasCollaborators:{id}` ('1' or '0')
 * with a randomized 1-2 hour expiration to avoid repeated MongoDB queries.
 * The function performs one mget for the cache lookup, one `$in` find per batch
 * of up to MONGO_IN_BATCH_SIZE uncached IDs (at most MONGO_BATCH_CONCURRENCY
 * batches in flight), and one pipelined setex to write the results back.
 *
 * IDs that are not valid ObjectIds are logged, left out of the Mongo query and
 * treated (and cached) as having no collaborators, so that a single malformed
 * Redis key cannot abort the whole scan.
 *
 * @param projectIds - project IDs to check
 * @param stats - counters updated in place (cache hits/misses, Mongo queries)
 * @returns the subset of `projectIds` that have at least one collaborator
 */
async function getProjectsWithCollaborators(
  projectIds: string[],
  stats: NotificationStats
): Promise<Set<string>> {
  const projectsWithCollaborators = new Set<string>()
  if (projectIds.length === 0) return projectsWithCollaborators

  // NOTE(review): the mget here and the pipeline below touch keys with many
  // different hash tags. Confirm this is fine for the Redis deployment in use
  // (a Redis Cluster rejects multi-key commands that span slots).
  const cacheKeys = projectIds.map(id => `ProjectHasCollaborators:{${id}}`)
  const cached = await redisClient.mget(cacheKeys)
  const projectsNeedingMongoLookup: string[] = []
  for (const [i, id] of projectIds.entries()) {
    if (cached[i] === '1') {
      stats.collaboratorCacheHitWithCollaborators++
      projectsWithCollaborators.add(id)
    } else if (cached[i] === '0') {
      stats.collaboratorCacheHitNoCollaborators++
    } else {
      projectsNeedingMongoLookup.push(id)
    }
  }
  if (projectsNeedingMongoLookup.length === 0) return projectsWithCollaborators

  // Convert the IDs up front. The ObjectId constructor throws on malformed
  // input, so invalid IDs are dropped from the query here instead of letting
  // the exception propagate out of the batch lookup.
  const lookupObjectIds = projectsNeedingMongoLookup.flatMap(id => {
    try {
      return [new ObjectId(id)]
    } catch {
      console.error(
        `Invalid project ID, skipping collaborator lookup: ${id}`
      )
      return []
    }
  })

  const batches: (typeof lookupObjectIds)[] = []
  for (let i = 0; i < lookupObjectIds.length; i += MONGO_IN_BATCH_SIZE) {
    batches.push(lookupObjectIds.slice(i, i + MONGO_IN_BATCH_SIZE))
  }
  const batchResults = await promiseMapWithLimit(
    MONGO_BATCH_CONCURRENCY,
    batches,
    async (batch: typeof lookupObjectIds) => {
      stats.collaboratorMongoQueries++
      // A project counts as having collaborators when any of the reference
      // arrays below has a first element. Only the _id is fetched.
      return await db.projects
        .find(
          {
            _id: { $in: batch },
            $or: [
              { 'collaberator_refs.0': { $exists: true } },
              { 'readOnly_refs.0': { $exists: true } },
              { 'reviewer_refs.0': { $exists: true } },
              { 'tokenAccessReadAndWrite_refs.0': { $exists: true } },
              { 'tokenAccessReadOnly_refs.0': { $exists: true } },
            ],
          },
          { projection: { _id: 1 }, readPreference: READ_PREFERENCE_SECONDARY }
        )
        .toArray()
    }
  )
  const positives = new Set(
    batchResults.flatMap(docs => docs.map(d => d._id.toString()))
  )

  // Write every looked-up ID back to the cache, including negative results, so
  // that projects without collaborators are not queried again on the next run.
  const pipeline = redisClient.pipeline()
  for (const id of projectsNeedingMongoLookup) {
    // Use random TTL between 1-2 hours (3600-7200 seconds) to smooth out cache expiration
    const ttl = 3600 + Math.floor(Math.random() * 3600)
    const hit = positives.has(id)
    if (hit) {
      stats.collaboratorCacheMissWithCollaborators++
      projectsWithCollaborators.add(id)
    } else {
      stats.collaboratorCacheMissNoCollaborators++
    }
    pipeline.setex(`ProjectHasCollaborators:{${id}}`, ttl, hit ? '1' : '0')
  }
  await pipeline.exec()
  return projectsWithCollaborators
}
/**
 * Counters collected while scanning Redis for pending project notifications.
 * They are printed by main() once the scan has finished.
 */
type NotificationStats = {
// Number of Redis keys returned by the scan.
scanned: number
// Projects that passed every check and will be queued.
matched: number
// Projects dropped because no collaborators were found for them.
skippedNoCollaborators: number
// Keys for which no timestamp value was returned.
skippedNoTimestamp: number
// Keys whose value does not parse to a number.
skippedInvalidTimestamp: number
// Keys from which no project ID could be extracted.
skippedNoProjectId: number
// Collaborator lookups answered by the Redis cache with '1'.
collaboratorCacheHitWithCollaborators: number
// Collaborator lookups answered by the Redis cache with '0'.
collaboratorCacheHitNoCollaborators: number
// Uncached projects found to have collaborators in MongoDB.
collaboratorCacheMissWithCollaborators: number
// Uncached projects for which MongoDB reported no collaborators.
collaboratorCacheMissNoCollaborators: number
// Number of MongoDB find queries issued for collaborator lookups.
collaboratorMongoQueries: number
}
/**
 * Build a NotificationStats object with every counter set to zero.
 */
function createEmptyNotificationStats(): NotificationStats {
  return {
    scanned: 0,
    matched: 0,
    skippedNoCollaborators: 0,
    skippedNoTimestamp: 0,
    skippedInvalidTimestamp: 0,
    skippedNoProjectId: 0,
    collaboratorCacheHitWithCollaborators: 0,
    collaboratorCacheHitNoCollaborators: 0,
    collaboratorCacheMissWithCollaborators: 0,
    collaboratorCacheMissNoCollaborators: 0,
    collaboratorMongoQueries: 0,
  }
}

/**
 * Scan Redis for all project notification timestamp keys and collect the
 * projects that should be notified.
 *
 * Each batch of scanned keys is read with one mget. Keys without an extractable
 * project ID, without a value, or with a non-numeric value are logged and
 * skipped. The remaining projects are kept only if they have collaborators.
 *
 * @returns the projects to notify (with their raw timestamp strings) and the
 *   counters gathered during the scan
 */
async function getProjectsToNotify(): Promise<{
  projects: ProjectNotification[]
  stats: NotificationStats
}> {
  // When the client exposes nodes(), scan each master node in turn;
  // otherwise scan the single client itself.
  const masterNodes =
    typeof redisClient.nodes === 'function'
      ? redisClient.nodes('master')
      : undefined
  const nodes = masterNodes || [redisClient]

  const stats = createEmptyNotificationStats()
  const projects: ProjectNotification[] = []
  let lastProgressLog = Date.now()

  console.time('redis-scan')
  try {
    for (const node of nodes) {
      const keyPattern = docUpdaterKeys.projectNotificationTimestamp({
        project_id: '*',
      })
      const stream = node.scanStream({ match: keyPattern, count: 1000 })

      for await (const keys of stream) {
        if (keys.length === 0) {
          continue
        }
        const timestamps = await redisClient.mget(keys)

        // Extract valid (projectId, timestamp) pairs from this batch
        const candidates: ProjectNotification[] = []
        for (const [index, key] of keys.entries()) {
          stats.scanned++
          const projectId = extractProjectId(key as string)
          const timestamp = timestamps[index]
          if (!projectId) {
            stats.skippedNoProjectId++
            console.error('Could not extract project ID from key:', key)
          } else if (!timestamp) {
            stats.skippedNoTimestamp++
            console.error(`No timestamp found for key: ${key}`)
          } else if (Number.isNaN(parseInt(timestamp, 10))) {
            stats.skippedInvalidTimestamp++
            console.error(
              `Non-numeric timestamp for project ${projectId}: ${timestamp}`
            )
          } else {
            candidates.push({ projectId, timestamp })
          }
        }

        // Bulk-check collaborators for the whole batch
        const candidateIds = candidates.map(candidate => candidate.projectId)
        const projectsWithCollaborators = await getProjectsWithCollaborators(
          candidateIds,
          stats
        )
        for (const candidate of candidates) {
          if (projectsWithCollaborators.has(candidate.projectId)) {
            stats.matched++
            projects.push(candidate)
          } else {
            stats.skippedNoCollaborators++
          }
        }

        const sinceLastLog = Date.now() - lastProgressLog
        if (sinceLastLog >= PROGRESS_LOG_INTERVAL_MS) {
          console.log(
            `Scan progress: scanned=${stats.scanned}, matched=${stats.matched}, skipped=${stats.scanned - stats.matched}`
          )
          lastProgressLog = Date.now()
        }
      }
    }
  } finally {
    // Report the scan duration even when the scan fails part-way through.
    console.timeEnd('redis-scan')
  }
  return { projects, stats }
}
/**
 * Remove a project's notification timestamp key from Redis, but only when the
 * key still holds the expected value. This avoids discarding a newer timestamp
 * that was written after the scan read the old one.
 *
 * @param projectId - project whose timestamp key should be removed
 * @param expectedTimestamp - raw timestamp string read during the scan
 * @returns true when the key was deleted, false when the value did not match
 */
async function deleteProjectNotificationTimestamp(
  projectId: string,
  expectedTimestamp: string
): Promise<boolean> {
  const result = await redisClient.deleteProjectNotificationTimestamp(
    docUpdaterKeys.projectNotificationTimestamp({ project_id: projectId }),
    expectedTimestamp
  )
  // The Lua command returns 1 after deleting the key and 0 otherwise.
  return result === 1
}
// Upper bound on how long shutdown waits for the queue connection to close.
const QUEUE_CLOSE_TIMEOUT_MS = 5_000

// Run the script, then close the Bull queue connection and exit with 0 on
// success or 1 on failure. process.exit() terminates the process immediately,
// so it has to come after the queue has been closed: a cleanup handler chained
// behind a callback that exits would never get to run.
main()
  .then(
    () => 0,
    error => {
      console.error('Error scanning for project notifications:', error)
      console.timeEnd('total')
      return 1
    }
  )
  .then(async exitCode => {
    try {
      // Closing is best-effort: cap the wait so that an unreachable queue
      // Redis cannot keep the script from exiting.
      await Promise.race([
        projectNotificationQueue.close(),
        new Promise(resolve => setTimeout(resolve, QUEUE_CLOSE_TIMEOUT_MS)),
      ])
    } catch (closeError) {
      console.error('Error closing project notification queue:', closeError)
    }
    process.exit(exitCode)
  })