mirror of
https://github.com/openfrontio/OpenFrontIO.git
synced 2026-06-21 09:40:44 +00:00
fix: replace setInterval with recursive setTimeout in Master.ts to pr… (#2869)
If this PR fixes an issue, link it below. If not, delete these two lines. Resolves #2868 ## Description: This PR addresses a critical memory leak in the Master server process (causing ~30GB RAM usage). The issue was caused by `setInterval` calling `fetchLobbies()` every 100ms. When `fetchLobbies` took longer than 100ms to complete (due to network latency or load), requests would pile up indefinitely, creating a massive queue of pending Promises and open sockets. I have refactored the polling logic into a generic `startPolling` utility (in `src/server/PollingLoop.ts`) that uses a recursive `setTimeout` pattern. This ensures that the next `fetchLobbies` call is only scheduled *after* the previous one has completed (successfully or failed), preventing any request pile-up. ## Please complete the following: - [x] I have added screenshots for all UI updates (N/A - backend only) - [x] I process any text displayed to the user through translateText() and I've added it to the en.json file (N/A - no user facing text) - [x] I have added relevant tests to the test directory (`tests/PollingLoop.test.ts`) - [x] I confirm I have thoroughly tested these changes and take full responsibility for any bugs introduced ## Please put your Discord username so you can be contacted if a bug or regression is found: codimo
This commit is contained in:
@@ -12,6 +12,7 @@ import { GameInfo } from "../core/Schemas";
|
||||
import { generateID } from "../core/Util";
|
||||
import { logger } from "./Logger";
|
||||
import { MapPlaylist } from "./MapPlaylist";
|
||||
import { startPolling } from "./PollingLoop";
|
||||
import { renderHtml } from "./RenderHtml";
|
||||
|
||||
const config = getServerConfigFromServer();
|
||||
@@ -176,15 +177,12 @@ export async function startMaster() {
|
||||
});
|
||||
};
|
||||
|
||||
setInterval(
|
||||
() =>
|
||||
fetchLobbies().then((lobbies) => {
|
||||
if (lobbies === 0) {
|
||||
scheduleLobbies();
|
||||
}
|
||||
}),
|
||||
100,
|
||||
);
|
||||
startPolling(async () => {
|
||||
const lobbies = await fetchLobbies();
|
||||
if (lobbies === 0) {
|
||||
scheduleLobbies();
|
||||
}
|
||||
}, 100);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
import { logger } from "./Logger";
|
||||
|
||||
const log = logger.child({ comp: "polling" });
|
||||
|
||||
/**
|
||||
* Starts a polling loop that executes the given async task effectively recursively using setTimeout.
|
||||
* This guarantees that the next execution only starts after the previous one has completed (or failed),
|
||||
* preventing request pile-ups.
|
||||
*
|
||||
* @param task The async function to execute.
|
||||
* @param intervalMs The delay in milliseconds before the next execution.
|
||||
*/
|
||||
export function startPolling(task: () => Promise<void>, intervalMs: number) {
|
||||
const runLoop = () => {
|
||||
task()
|
||||
.catch((error) => {
|
||||
log.error("Error in polling loop:", error);
|
||||
})
|
||||
.finally(() => {
|
||||
setTimeout(runLoop, intervalMs);
|
||||
});
|
||||
};
|
||||
runLoop();
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
import { base64url } from "jose";
|
||||
import { Logger } from "winston";
|
||||
import { CosmeticsSchema } from "../core/CosmeticSchemas";
|
||||
import { startPolling } from "./PollingLoop";
|
||||
import {
|
||||
FailOpenPrivilegeChecker,
|
||||
PrivilegeChecker,
|
||||
@@ -28,12 +29,7 @@ export class PrivilegeRefresher {
|
||||
this.log.info(
|
||||
`Starting privilege refresher with interval ${this.refreshInterval}`,
|
||||
);
|
||||
// Add some jitter to the initial load and the interval.
|
||||
setTimeout(() => this.loadPrivilegeChecker(), Math.random() * 1000);
|
||||
setInterval(
|
||||
() => this.loadPrivilegeChecker(),
|
||||
this.refreshInterval + Math.random() * 1000,
|
||||
);
|
||||
startPolling(() => this.loadPrivilegeChecker(), this.refreshInterval);
|
||||
}
|
||||
|
||||
public get(): PrivilegeChecker {
|
||||
|
||||
+52
-53
@@ -27,6 +27,7 @@ import { logger } from "./Logger";
|
||||
|
||||
import { GameEnv } from "../core/configuration/Config";
|
||||
import { MapPlaylist } from "./MapPlaylist";
|
||||
import { startPolling } from "./PollingLoop";
|
||||
import { PrivilegeRefresher } from "./PrivilegeRefresher";
|
||||
import { verifyTurnstileToken } from "./Turnstile";
|
||||
import { initWorkerMetrics } from "./WorkerMetrics";
|
||||
@@ -43,7 +44,7 @@ export async function startWorker() {
|
||||
|
||||
setTimeout(
|
||||
() => {
|
||||
pollLobby(gm);
|
||||
startMatchmakingPolling(gm);
|
||||
},
|
||||
1000 + Math.random() * 2000,
|
||||
);
|
||||
@@ -483,63 +484,61 @@ export async function startWorker() {
|
||||
});
|
||||
}
|
||||
|
||||
async function pollLobby(gm: GameManager) {
|
||||
try {
|
||||
const url = `${config.jwtIssuer() + "/matchmaking/checkin"}`;
|
||||
const gameId = generateGameIdForWorker();
|
||||
if (gameId === null) {
|
||||
log.warn(`Failed to generate game ID for worker ${workerId}`);
|
||||
return;
|
||||
}
|
||||
async function startMatchmakingPolling(gm: GameManager) {
|
||||
startPolling(
|
||||
async () => {
|
||||
try {
|
||||
const url = `${config.jwtIssuer() + "/matchmaking/checkin"}`;
|
||||
const gameId = generateGameIdForWorker();
|
||||
if (gameId === null) {
|
||||
log.warn(`Failed to generate game ID for worker ${workerId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const controller = new AbortController();
|
||||
const timeoutId = setTimeout(() => controller.abort(), 20000);
|
||||
const response = await fetch(url, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"x-api-key": config.apiKey(),
|
||||
},
|
||||
body: JSON.stringify({
|
||||
id: workerId,
|
||||
gameId: gameId,
|
||||
ccu: gm.activeClients(),
|
||||
instanceId: process.env.INSTANCE_ID,
|
||||
}),
|
||||
signal: controller.signal,
|
||||
});
|
||||
const controller = new AbortController();
|
||||
const timeoutId = setTimeout(() => controller.abort(), 20000);
|
||||
const response = await fetch(url, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"x-api-key": config.apiKey(),
|
||||
},
|
||||
body: JSON.stringify({
|
||||
id: workerId,
|
||||
gameId: gameId,
|
||||
ccu: gm.activeClients(),
|
||||
instanceId: process.env.INSTANCE_ID,
|
||||
}),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
clearTimeout(timeoutId);
|
||||
clearTimeout(timeoutId);
|
||||
|
||||
if (!response.ok) {
|
||||
log.warn(
|
||||
`Failed to poll lobby: ${response.status} ${response.statusText}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (!response.ok) {
|
||||
log.warn(
|
||||
`Failed to poll lobby: ${response.status} ${response.statusText}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const data = await response.json();
|
||||
log.info(`Lobby poll successful:`, data);
|
||||
const data = await response.json();
|
||||
log.info(`Lobby poll successful:`, data);
|
||||
|
||||
if (data.assignment) {
|
||||
const gameConfig = playlist.get1v1Config();
|
||||
const game = gm.createGame(gameId, gameConfig);
|
||||
setTimeout(() => {
|
||||
// Wait a few seconds to allow clients to connect.
|
||||
console.log(`Starting game ${gameId}`);
|
||||
game.start();
|
||||
}, 5000);
|
||||
}
|
||||
} catch (error) {
|
||||
log.error(`Error polling lobby:`, error);
|
||||
} finally {
|
||||
setTimeout(
|
||||
() => {
|
||||
pollLobby(gm);
|
||||
},
|
||||
5000 + Math.random() * 1000,
|
||||
);
|
||||
}
|
||||
if (data.assignment) {
|
||||
const gameConfig = playlist.get1v1Config();
|
||||
const game = gm.createGame(gameId, gameConfig);
|
||||
setTimeout(() => {
|
||||
// Wait a few seconds to allow clients to connect.
|
||||
console.log(`Starting game ${gameId}`);
|
||||
game.start();
|
||||
}, 5000);
|
||||
}
|
||||
} catch (error) {
|
||||
log.error(`Error polling lobby:`, error);
|
||||
}
|
||||
},
|
||||
5000 + Math.random() * 1000,
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: This is a hack to generate a game ID for the worker.
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { startPolling } from "../../src/server/PollingLoop";
|
||||
|
||||
vi.mock("../../src/server/Logger", () => ({
|
||||
logger: {
|
||||
child: () => ({
|
||||
error: vi.fn(),
|
||||
}),
|
||||
},
|
||||
}));
|
||||
|
||||
describe("PollingLoop", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("should not start the next task until the previous one completes", async () => {
|
||||
let taskCallCount = 0;
|
||||
let resolveTask: ((value?: void) => void) | undefined;
|
||||
|
||||
const task = vi.fn().mockImplementation(() => {
|
||||
taskCallCount++;
|
||||
return new Promise<void>((resolve) => {
|
||||
resolveTask = resolve;
|
||||
});
|
||||
});
|
||||
|
||||
startPolling(task, 100);
|
||||
|
||||
// Initial call
|
||||
expect(taskCallCount).toBe(1);
|
||||
|
||||
// Advance time past the interval - should NOT trigger next call yet
|
||||
await vi.advanceTimersByTimeAsync(200);
|
||||
expect(taskCallCount).toBe(1);
|
||||
|
||||
// Resolve the first task
|
||||
if (resolveTask) resolveTask();
|
||||
|
||||
// Wait for microtasks (promise callbacks, finally block) to run
|
||||
await new Promise(process.nextTick);
|
||||
|
||||
// NOW advance time to trigger the scheduled continuation
|
||||
await vi.advanceTimersByTimeAsync(100);
|
||||
|
||||
expect(taskCallCount).toBe(2);
|
||||
});
|
||||
|
||||
it("should continue polling even if a task fails", async () => {
|
||||
let taskCallCount = 0;
|
||||
const task = vi.fn().mockImplementation(async () => {
|
||||
taskCallCount++;
|
||||
if (taskCallCount === 1) {
|
||||
throw new Error("Task failed");
|
||||
}
|
||||
});
|
||||
|
||||
startPolling(task, 100);
|
||||
|
||||
// First call
|
||||
expect(taskCallCount).toBe(1);
|
||||
|
||||
// Wait for rejection and finally block
|
||||
await new Promise(process.nextTick);
|
||||
await new Promise(process.nextTick);
|
||||
|
||||
// Advance time
|
||||
await vi.advanceTimersByTimeAsync(100);
|
||||
|
||||
// Second call
|
||||
expect(taskCallCount).toBe(2);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user