diff --git a/src/server/Master.ts b/src/server/Master.ts index d94fa3c65..fd3fdbb22 100644 --- a/src/server/Master.ts +++ b/src/server/Master.ts @@ -12,6 +12,7 @@ import { GameInfo } from "../core/Schemas"; import { generateID } from "../core/Util"; import { logger } from "./Logger"; import { MapPlaylist } from "./MapPlaylist"; +import { startPolling } from "./PollingLoop"; import { renderHtml } from "./RenderHtml"; const config = getServerConfigFromServer(); @@ -176,15 +177,12 @@ export async function startMaster() { }); }; - setInterval( - () => - fetchLobbies().then((lobbies) => { - if (lobbies === 0) { - scheduleLobbies(); - } - }), - 100, - ); + startPolling(async () => { + const lobbies = await fetchLobbies(); + if (lobbies === 0) { + scheduleLobbies(); + } + }, 100); } } }); diff --git a/src/server/PollingLoop.ts b/src/server/PollingLoop.ts new file mode 100644 index 000000000..1869a324f --- /dev/null +++ b/src/server/PollingLoop.ts @@ -0,0 +1,24 @@ +import { logger } from "./Logger"; + +const log = logger.child({ comp: "polling" }); + +/** + * Starts a polling loop that executes the given async task effectively recursively using setTimeout. + * This guarantees that the next execution only starts after the previous one has completed (or failed), + * preventing request pile-ups. + * + * @param task The async function to execute. + * @param intervalMs The delay in milliseconds before the next execution. + */ +export function startPolling(task: () => Promise, intervalMs: number) { + const runLoop = () => { + task() + .catch((error) => { + log.error("Error in polling loop:", error); + }) + .finally(() => { + setTimeout(runLoop, intervalMs); + }); + }; + runLoop(); +} diff --git a/src/server/PrivilegeRefresher.ts b/src/server/PrivilegeRefresher.ts index 89bdcb1ac..030da9621 100644 --- a/src/server/PrivilegeRefresher.ts +++ b/src/server/PrivilegeRefresher.ts @@ -1,6 +1,7 @@ import { base64url } from "jose"; import { Logger } from "winston"; import { CosmeticsSchema } from "../core/CosmeticSchemas"; +import { startPolling } from "./PollingLoop"; import { FailOpenPrivilegeChecker, PrivilegeChecker, @@ -28,12 +29,7 @@ export class PrivilegeRefresher { this.log.info( `Starting privilege refresher with interval ${this.refreshInterval}`, ); - // Add some jitter to the initial load and the interval. - setTimeout(() => this.loadPrivilegeChecker(), Math.random() * 1000); - setInterval( - () => this.loadPrivilegeChecker(), - this.refreshInterval + Math.random() * 1000, - ); + startPolling(() => this.loadPrivilegeChecker(), this.refreshInterval); } public get(): PrivilegeChecker { diff --git a/src/server/Worker.ts b/src/server/Worker.ts index 97a9706c7..32b2eff6e 100644 --- a/src/server/Worker.ts +++ b/src/server/Worker.ts @@ -27,6 +27,7 @@ import { logger } from "./Logger"; import { GameEnv } from "../core/configuration/Config"; import { MapPlaylist } from "./MapPlaylist"; +import { startPolling } from "./PollingLoop"; import { PrivilegeRefresher } from "./PrivilegeRefresher"; import { verifyTurnstileToken } from "./Turnstile"; import { initWorkerMetrics } from "./WorkerMetrics"; @@ -43,7 +44,7 @@ export async function startWorker() { setTimeout( () => { - pollLobby(gm); + startMatchmakingPolling(gm); }, 1000 + Math.random() * 2000, ); @@ -483,63 +484,61 @@ export async function startWorker() { }); } -async function pollLobby(gm: GameManager) { - try { - const url = `${config.jwtIssuer() + "/matchmaking/checkin"}`; - const gameId = generateGameIdForWorker(); - if (gameId === null) { - log.warn(`Failed to generate game ID for worker ${workerId}`); - return; - } +async function startMatchmakingPolling(gm: GameManager) { + startPolling( + async () => { + try { + const url = `${config.jwtIssuer() + "/matchmaking/checkin"}`; + const gameId = generateGameIdForWorker(); + if (gameId === null) { + log.warn(`Failed to generate game ID for worker ${workerId}`); + return; + } - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), 20000); - const response = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - "x-api-key": config.apiKey(), - }, - body: JSON.stringify({ - id: workerId, - gameId: gameId, - ccu: gm.activeClients(), - instanceId: process.env.INSTANCE_ID, - }), - signal: controller.signal, - }); + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 20000); + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-api-key": config.apiKey(), + }, + body: JSON.stringify({ + id: workerId, + gameId: gameId, + ccu: gm.activeClients(), + instanceId: process.env.INSTANCE_ID, + }), + signal: controller.signal, + }); - clearTimeout(timeoutId); + clearTimeout(timeoutId); - if (!response.ok) { - log.warn( - `Failed to poll lobby: ${response.status} ${response.statusText}`, - ); - return; - } + if (!response.ok) { + log.warn( + `Failed to poll lobby: ${response.status} ${response.statusText}`, + ); + return; + } - const data = await response.json(); - log.info(`Lobby poll successful:`, data); + const data = await response.json(); + log.info(`Lobby poll successful:`, data); - if (data.assignment) { - const gameConfig = playlist.get1v1Config(); - const game = gm.createGame(gameId, gameConfig); - setTimeout(() => { - // Wait a few seconds to allow clients to connect. - console.log(`Starting game ${gameId}`); - game.start(); - }, 5000); - } - } catch (error) { - log.error(`Error polling lobby:`, error); - } finally { - setTimeout( - () => { - pollLobby(gm); - }, - 5000 + Math.random() * 1000, - ); - } + if (data.assignment) { + const gameConfig = playlist.get1v1Config(); + const game = gm.createGame(gameId, gameConfig); + setTimeout(() => { + // Wait a few seconds to allow clients to connect. + console.log(`Starting game ${gameId}`); + game.start(); + }, 5000); + } + } catch (error) { + log.error(`Error polling lobby:`, error); + } + }, + 5000 + Math.random() * 1000, + ); } // TODO: This is a hack to generate a game ID for the worker. diff --git a/tests/server/PollingLoop.test.ts b/tests/server/PollingLoop.test.ts new file mode 100644 index 000000000..008a2f7d7 --- /dev/null +++ b/tests/server/PollingLoop.test.ts @@ -0,0 +1,77 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { startPolling } from "../../src/server/PollingLoop"; + +vi.mock("../../src/server/Logger", () => ({ + logger: { + child: () => ({ + error: vi.fn(), + }), + }, +})); + +describe("PollingLoop", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("should not start the next task until the previous one completes", async () => { + let taskCallCount = 0; + let resolveTask: ((value?: void) => void) | undefined; + + const task = vi.fn().mockImplementation(() => { + taskCallCount++; + return new Promise((resolve) => { + resolveTask = resolve; + }); + }); + + startPolling(task, 100); + + // Initial call + expect(taskCallCount).toBe(1); + + // Advance time past the interval - should NOT trigger next call yet + await vi.advanceTimersByTimeAsync(200); + expect(taskCallCount).toBe(1); + + // Resolve the first task + if (resolveTask) resolveTask(); + + // Wait for microtasks (promise callbacks, finally block) to run + await new Promise(process.nextTick); + + // NOW advance time to trigger the scheduled continuation + await vi.advanceTimersByTimeAsync(100); + + expect(taskCallCount).toBe(2); + }); + + it("should continue polling even if a task fails", async () => { + let taskCallCount = 0; + const task = vi.fn().mockImplementation(async () => { + taskCallCount++; + if (taskCallCount === 1) { + throw new Error("Task failed"); + } + }); + + startPolling(task, 100); + + // First call + expect(taskCallCount).toBe(1); + + // Wait for rejection and finally block + await new Promise(process.nextTick); + await new Promise(process.nextTick); + + // Advance time + await vi.advanceTimersByTimeAsync(100); + + // Second call + expect(taskCallCount).toBe(2); + }); +});