import { Worker } from "cluster"; import winston from "winston"; import { ServerConfig } from "../core/configuration/Config"; import { GameConfig, PublicGameInfo, PublicGameType } from "../core/Schemas"; import { generateID } from "../core/Util"; import { MasterCreateGame, MasterLobbiesBroadcast, MasterUpdateGame, WorkerMessageSchema, } from "./IPCBridgeSchema"; import { logger } from "./Logger"; import { MapPlaylist } from "./MapPlaylist"; import { startPolling } from "./PollingLoop"; export interface MasterLobbyServiceOptions { config: ServerConfig; playlist: MapPlaylist; log: typeof logger; } export class MasterLobbyService { private readonly workers = new Map(); // Worker id => the lobbies it owns. private readonly workerLobbies = new Map(); private readonly readyWorkers = new Set(); private started = false; constructor( private config: ServerConfig, private playlist: MapPlaylist, private log: winston.Logger, ) {} registerWorker(workerId: number, worker: Worker) { this.workers.set(workerId, worker); worker.on("message", (raw: unknown) => { const result = WorkerMessageSchema.safeParse(raw); if (!result.success) { this.log.error("Invalid IPC message from worker:", raw); return; } const msg = result.data; switch (msg.type) { case "workerReady": this.handleWorkerReady(msg.workerId); break; case "lobbyList": this.workerLobbies.set(workerId, msg.lobbies); break; } }); } removeWorker(workerId: number) { this.workers.delete(workerId); this.workerLobbies.delete(workerId); this.readyWorkers.delete(workerId); } isHealthy(): boolean { // We consider the lobby service healthy if at least half of the workers are ready. // This allows for some leeway if a worker crashes. const minWorkers = Math.max(this.config.numWorkers() / 2, 1); return this.started && this.readyWorkers.size >= minWorkers; } private handleWorkerReady(workerId: number) { this.readyWorkers.add(workerId); this.log.info( `Worker ${workerId} is ready. (${this.readyWorkers.size}/${this.config.numWorkers()} ready)`, ); if (this.readyWorkers.size === this.config.numWorkers() && !this.started) { this.started = true; this.log.info("All workers ready, starting game scheduling"); startPolling(async () => this.broadcastLobbies(), 500); startPolling(async () => await this.maybeScheduleLobby(), 1000); } } private getAllLobbies(): Record { const lobbies = Array.from(this.workerLobbies.values()).flat(); const result: Record = { ffa: [], team: [], special: [], }; for (const lobby of lobbies) { result[lobby.publicGameType].push(lobby); } for (const type of Object.keys(result) as PublicGameType[]) { result[type].sort((a, b) => { if (a.startsAt === undefined && b.startsAt === undefined) { // Sort by game id for stability. return a.gameID > b.gameID ? 1 : -1; } // If a lobby has startsAt set, we assume it's the active one. if (a.startsAt === undefined) return 1; if (b.startsAt === undefined) return -1; return a.startsAt - b.startsAt; }); } return result; } private broadcastLobbies() { const msg = { type: "lobbiesBroadcast", publicGames: { serverTime: Date.now(), games: this.getAllLobbies(), }, } satisfies MasterLobbiesBroadcast; for (const [workerId, worker] of this.workers.entries()) { worker.send(msg, (e) => { if (e) { this.log.error( `Failed to send lobbies broadcast to worker ${workerId}, killing worker:`, e, ); worker.kill(); } }); } } private async maybeScheduleLobby() { const lobbiesByType = this.getAllLobbies(); const lobbyTypes = Object.keys(lobbiesByType) as PublicGameType[]; const usedMaps = new Set(); const usedTeamTypes = new Set(); const usedMaxPlayers = new Set(); const recordInUse = (config: GameConfig) => { usedMaps.add(config.gameMap); if (config.playerTeams !== undefined) { usedTeamTypes.add(String(config.playerTeams)); } if (config.maxPlayers !== undefined) { usedMaxPlayers.add(config.maxPlayers); } }; for (const type of lobbyTypes) { const lobbies = lobbiesByType[type]; const nextLobby = lobbies[0]; if (nextLobby && nextLobby.gameConfig) { recordInUse(nextLobby.gameConfig); } } for (const type of lobbyTypes) { const lobbies = lobbiesByType[type]; // Always ensure the next lobby has a timer, even if we already have 2+ // lobbies. This prevents a race where two lobbies are created before // either receives a startsAt (IPC round-trip delay), leaving both stuck // without a countdown. const nextLobby = lobbies[0]; if (nextLobby && nextLobby.startsAt === undefined) { this.sendMessageToWorker({ type: "updateLobby", gameID: nextLobby.gameID, startsAt: Date.now() + this.config.gameCreationRate(), }); } if (lobbies.length >= 2) { continue; } const gameConfig = await this.playlist.gameConfigNotInUse(type, (c) => { if (usedMaps.has(c.gameMap)) return false; if ( c.playerTeams !== undefined && usedTeamTypes.has(String(c.playerTeams)) ) { return false; } if (c.maxPlayers !== undefined && usedMaxPlayers.has(c.maxPlayers)) { return false; } return true; }); recordInUse(gameConfig); this.sendMessageToWorker({ type: "createGame", gameID: generateID(), gameConfig, publicGameType: type, } satisfies MasterCreateGame); } } private sendMessageToWorker(msg: MasterCreateGame | MasterUpdateGame): void { const workerId = this.config.workerIndex(msg.gameID); const worker = this.workers.get(workerId); if (!worker) { this.log.error(`Worker ${workerId} not found`); return; } worker.send(msg, (e) => { if (e) { this.log.error( `Failed to send message to worker ${workerId}, killing worker:`, e, ); worker.kill(); } }); } }