From 78481e010e0dcd8b2578b51dc74168da28e24f4e Mon Sep 17 00:00:00 2001 From: Andrew Rumble Date: Thu, 13 Mar 2025 17:50:39 +0000 Subject: [PATCH] Add verification looper and handle shutdown signals Shutdown signals become more relevant now that we are looping as we want to gracefully stop processing records rather than continue looping. GitOrigin-RevId: dbb499388c86d552d77954988f8fc27d140da3f1 --- services/history-v1/backup-verifier-app.mjs | 30 +++++++- .../backupVerifier/ProjectVerifier.mjs | 70 ++++++++++++++++--- .../scripts/verify_sampled_projects.mjs | 30 ++++++-- 3 files changed, 111 insertions(+), 19 deletions(-) diff --git a/services/history-v1/backup-verifier-app.mjs b/services/history-v1/backup-verifier-app.mjs index febbb99b71..099eaef29b 100644 --- a/services/history-v1/backup-verifier-app.mjs +++ b/services/history-v1/backup-verifier-app.mjs @@ -17,6 +17,11 @@ import { mongodb } from './storage/index.js' import { expressify } from '@overleaf/promise-utils' import { Blob } from 'overleaf-editor-core' import { loadGlobalBlobs } from './storage/lib/blob_store/index.js' +import { EventEmitter } from 'node:events' +import { + loopRandomProjects, + setWriteMetrics, +} from './backupVerifier/ProjectVerifier.mjs' const app = express() @@ -49,6 +54,7 @@ app.get( ) app.get('/status', (req, res) => { + logger.info({}, 'status check') res.send('history-v1-backup-verifier is up') }) @@ -66,6 +72,23 @@ app.use((err, req, res, next) => { next(err) }) +const shutdownEmitter = new EventEmitter() + +shutdownEmitter.once('shutdown', async code => { + logger.info({}, 'shutting down') + await mongodb.client.close() + await setTimeout(100) + process.exit(code) +}) + +process.on('SIGTERM', () => { + shutdownEmitter.emit('shutdown', 0) +}) + +process.on('SIGINT', () => { + shutdownEmitter.emit('shutdown', 0) +}) + /** * @param {number} port * @return {Promise} @@ -76,18 +99,19 @@ export async function startApp(port) { await healthCheck() const server = http.createServer(app) await promisify(server.listen.bind(server, port))() + loopRandomProjects(shutdownEmitter) return server } +setWriteMetrics(true) + // Run this if we're called directly if (process.argv[1] === fileURLToPath(import.meta.url)) { const PORT = parseInt(process.env.PORT || '3102', 10) try { await startApp(PORT) } catch (error) { + shutdownEmitter.emit('shutdown', 1) logger.error({ error }, 'error starting app') - await mongodb.client.close() - await setTimeout(100) - process.exit(1) } } diff --git a/services/history-v1/backupVerifier/ProjectVerifier.mjs b/services/history-v1/backupVerifier/ProjectVerifier.mjs index 7edec38b01..bf1ca00a13 100644 --- a/services/history-v1/backupVerifier/ProjectVerifier.mjs +++ b/services/history-v1/backupVerifier/ProjectVerifier.mjs @@ -9,6 +9,7 @@ import { getProjectsUpdatedInDateRangeCursor, } from './ProjectSampler.mjs' import OError from '@overleaf/o-error' +import { setTimeout } from 'node:timers/promises' const MS_PER_30_DAYS = 30 * 24 * 60 * 60 * 1000 @@ -20,6 +21,10 @@ const METRICS = { let WRITE_METRICS = false +/** + * @typedef {import('node:events').EventEmitter} EventEmitter + */ + /** * Allows writing metrics to be enabled or disabled. * @param {Boolean} writeMetrics @@ -65,14 +70,24 @@ function splitJobs(startDate, endDate, interval) { /** * - * @param historyIdCursor + * @param {AsyncGenerator} historyIdCursor + * @param {EventEmitter} [eventEmitter] * @return {Promise<{verified: number, total: number, errorTypes: *[], hasFailure: boolean}>} */ -async function verifyProjectsFromCursor(historyIdCursor) { +async function verifyProjectsFromCursor(historyIdCursor, eventEmitter) { const errorTypes = [] let verified = 0 let total = 0 + let receivedShutdownSignal = false + if (eventEmitter) { + eventEmitter.once('shutdown', () => { + receivedShutdownSignal = true + }) + } for await (const historyId of historyIdCursor) { + if (receivedShutdownSignal) { + break + } total++ try { await verifyProjectWithErrorContext(historyId) @@ -81,7 +96,8 @@ async function verifyProjectsFromCursor(historyIdCursor) { metrics.inc(METRICS.backup_project_verification_succeeded) verified++ } catch (error) { - errorTypes.push(handleVerificationError(error, historyId)) + const errorType = handleVerificationError(error, historyId) + errorTypes.push(errorType) } } return { @@ -95,11 +111,12 @@ async function verifyProjectsFromCursor(historyIdCursor) { /** * * @param {number} nProjectsToSample + * @param {EventEmitter} [signal] * @return {Promise} */ -export async function verifyRandomProjectSample(nProjectsToSample) { +export async function verifyRandomProjectSample(nProjectsToSample, signal) { const historyIds = await getSampleProjectsCursor(nProjectsToSample) - return await verifyProjectsFromCursor(historyIds) + return await verifyProjectsFromCursor(historyIds, signal) } /** @@ -108,13 +125,15 @@ export async function verifyRandomProjectSample(nProjectsToSample) { * @param {Date} startDate * @param {Date} endDate * @param {number} projectsPerRange + * @param {EventEmitter} [signal] * @return {Promise} */ -async function verifyRange(startDate, endDate, projectsPerRange) { +async function verifyRange(startDate, endDate, projectsPerRange, signal) { logger.info({ startDate, endDate }, 'verifying range') const results = await verifyProjectsFromCursor( - getProjectsCreatedInDateRangeCursor(startDate, endDate, projectsPerRange) + getProjectsCreatedInDateRangeCursor(startDate, endDate, projectsPerRange), + signal ) if (results.total === 0) { @@ -154,6 +173,7 @@ async function verifyRange(startDate, endDate, projectsPerRange) { * @property {number} [interval] * @property {number} [projectsPerRange] * @property {number} [concurrency] + * @property {EventEmitter} [signal] */ /** @@ -167,6 +187,7 @@ export async function verifyProjectsCreatedInDateRange({ startDate, endDate, interval = MS_PER_30_DAYS, + signal, }) { const jobs = splitJobs(startDate, endDate, interval) if (jobs.length === 0) { @@ -180,7 +201,7 @@ export async function verifyProjectsCreatedInDateRange({ concurrency, jobs, ({ startDate, endDate }) => - verifyRange(startDate, endDate, projectsPerRange) + verifyRange(startDate, endDate, projectsPerRange, signal) ) return settlements.reduce( /** @@ -220,19 +241,22 @@ export async function verifyProjectsCreatedInDateRange({ * @param {Date} startDate * @param {Date} endDate * @param {number} nProjects + * @param {EventEmitter} [signal] * @return {Promise} */ export async function verifyProjectsUpdatedInDateRange( startDate, endDate, - nProjects + nProjects, + signal ) { logger.debug( { startDate, endDate, nProjects }, 'Sampling projects updated in date range' ) const results = await verifyProjectsFromCursor( - getProjectsUpdatedInDateRangeCursor(startDate, endDate, nProjects) + getProjectsUpdatedInDateRangeCursor(startDate, endDate, nProjects), + signal ) if (results.total === 0) { @@ -254,3 +278,29 @@ export async function verifyProjectsUpdatedInDateRange( ) return jobStatus } + +/** + * + * @param {EventEmitter} signal + * @return {void} + */ +export function loopRandomProjects(signal) { + let shutdown = false + signal.on('shutdown', function () { + shutdown = true + }) + async function loop() { + do { + try { + const result = await verifyRandomProjectSample(100, signal) + logger.debug({ result }, 'verified random project sample') + } catch (error) { + logger.error({ error }, 'error verifying random project sample') + } + + await setTimeout(300_000) + // eslint-disable-next-line no-unmodified-loop-condition + } while (!shutdown) + } + loop() +} diff --git a/services/history-v1/storage/scripts/verify_sampled_projects.mjs b/services/history-v1/storage/scripts/verify_sampled_projects.mjs index afc739b2fd..e5b2d0c347 100644 --- a/services/history-v1/storage/scripts/verify_sampled_projects.mjs +++ b/services/history-v1/storage/scripts/verify_sampled_projects.mjs @@ -12,6 +12,8 @@ import { setTimeout } from 'node:timers/promises' import logger from '@overleaf/logger' import { loadGlobalBlobs } from '../lib/blob_store/index.js' import { getDatesBeforeRPO } from '../../backupVerifier/utils.mjs' +import { EventEmitter } from 'node:events' +import { mongodb } from '../index.js' logger.logger.level('fatal') @@ -39,7 +41,7 @@ const STATS = { /** * @typedef {Object} CLIOptions - * @property {() => Promise} projectVerifier + * @property {(signal: EventEmitter) => Promise} projectVerifier * @property {boolean} verbose */ @@ -88,17 +90,18 @@ function getOptions() { console.log('Verifying random projects') return { verbose, - projectVerifier: () => verifyRandomProjectSample(nProjects), + projectVerifier: signal => verifyRandomProjectSample(nProjects, signal), } case 'recent': return { verbose, - projectVerifier: async () => { + projectVerifier: async signal => { const { startDate, endDate } = getDatesBeforeRPO(3 * 3600) return await verifyProjectsUpdatedInDateRange( startDate, endDate, - nProjects + nProjects, + signal ) }, } @@ -122,12 +125,13 @@ function getOptions() { } STATS.ranges = 0 return { - projectVerifier: () => + projectVerifier: signal => verifyProjectsCreatedInDateRange({ startDate: new Date(start), endDate: new Date(end), projectsPerRange: nProjects, concurrency, + signal, }), verbose, } @@ -181,10 +185,24 @@ function displayStats(stats) { } } +const shutdownEmitter = new EventEmitter() + +shutdownEmitter.on('shutdown', async () => { + await gracefulShutdown() +}) + +process.on('SIGTERM', () => { + shutdownEmitter.emit('shutdown') +}) + +process.on('SIGINT', () => { + shutdownEmitter.emit('shutdown') +}) + await loadGlobalBlobs() try { - const stats = await projectVerifier() + const stats = await projectVerifier(shutdownEmitter) displayStats(stats) console.log(`completed`) } catch (error) {