diff --git a/nginx.conf b/nginx.conf index 12e7282f1..2da48c6af 100644 --- a/nginx.conf +++ b/nginx.conf @@ -102,25 +102,6 @@ server { add_header Cache-Control "public, max-age=86400"; # 24 hours } - # /api/public_lobbies endpoint - Cache for 1 second to handle high request volume - location = /api/public_lobbies { - proxy_pass http://127.0.0.1:3000; - proxy_http_version 1.1; - - # Cache configuration - proxy_cache API_CACHE; - proxy_cache_valid 200 1s; - proxy_cache_use_stale updating error timeout http_500 http_502 http_503 http_504; - proxy_cache_lock on; - add_header X-Cache-Status $upstream_cache_status; - - # Standard proxy headers - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - } - # /api/env endpoint - Cache for 1 hour location = /api/env { proxy_pass http://127.0.0.1:3000; diff --git a/src/client/JoinLobbyModal.ts b/src/client/JoinLobbyModal.ts index 8a1bf281d..7cd330d64 100644 --- a/src/client/JoinLobbyModal.ts +++ b/src/client/JoinLobbyModal.ts @@ -70,7 +70,7 @@ export class JoinLobbyModal extends BaseModal { } this.updateFromLobby({ ...lobby, - msUntilStart: lobby.msUntilStart ?? undefined, + startsAt: lobby.startsAt ?? undefined, }); }; @@ -94,7 +94,7 @@ export class JoinLobbyModal extends BaseModal { }) : translateText("public_lobby.started"); const maxPlayers = this.gameConfig?.maxPlayers ?? 0; - const playerCount = this.playerCount; + const playerCount = this.players?.length ?? 0; const hostClientID = this.isPrivateLobby() ? (this.lobbyCreatorClientID ?? "") : ""; @@ -283,13 +283,13 @@ export class JoinLobbyModal extends BaseModal { `; } - public open(lobbyId: string = "", lobbyInfo?: GameInfo) { + public open(lobbyId: string = "", isPublic: boolean = false) { super.open(); if (lobbyId) { - this.startTrackingLobby(lobbyId, lobbyInfo); + this.startTrackingLobby(lobbyId); // If opened with lobbyInfo (public lobby case), auto-join the lobby - if (lobbyInfo) { - this.joinPublicLobby(lobbyId, lobbyInfo); + if (isPublic) { + this.joinPublicLobby(lobbyId); } else { // If opened with lobbyId but no lobbyInfo (URL join case), check if active and join this.handleUrlJoin(lobbyId); @@ -329,14 +329,13 @@ export class JoinLobbyModal extends BaseModal { } } - private joinPublicLobby(lobbyId: string, lobbyInfo: GameInfo) { + private joinPublicLobby(lobbyId: string) { // Dispatch join-lobby event to actually connect to the lobby this.dispatchEvent( new CustomEvent("join-lobby", { detail: { gameID: lobbyId, clientID: this.currentClientID, - publicLobbyInfo: lobbyInfo, } as JoinLobbyEvent, bubbles: true, composed: true, @@ -349,7 +348,6 @@ export class JoinLobbyModal extends BaseModal { this.currentClientID = getClientIDForGame(lobbyId); this.gameConfig = null; this.players = []; - this.playerCount = 0; this.nationCount = 0; this.lobbyStartAt = null; this.lobbyCreatorClientID = null; @@ -397,7 +395,6 @@ export class JoinLobbyModal extends BaseModal { if (this.lobbyIdInput) this.lobbyIdInput.value = ""; this.gameConfig = null; this.players = []; - this.playerCount = 0; this.currentLobbyId = ""; this.currentClientID = ""; this.nationCount = 0; @@ -536,18 +533,8 @@ export class JoinLobbyModal extends BaseModal { // --- Lobby event handling --- private updateFromLobby(lobby: GameInfo) { - if (lobby.clients) { - this.players = lobby.clients; - this.playerCount = lobby.clients.length; - } else { - this.players = []; - this.playerCount = lobby.numClients ?? 0; - } - if (lobby.msUntilStart !== undefined) { - this.lobbyStartAt = lobby.msUntilStart + Date.now(); - } else { - this.lobbyStartAt = null; - } + this.players = lobby.clients ?? []; + this.lobbyStartAt = lobby.startsAt ?? null; this.syncCountdownTimer(); if (lobby.gameConfig) { const mapChanged = this.gameConfig?.gameMap !== lobby.gameConfig.gameMap; diff --git a/src/client/LobbySocket.ts b/src/client/LobbySocket.ts index f398da054..7b71dc21a 100644 --- a/src/client/LobbySocket.ts +++ b/src/client/LobbySocket.ts @@ -1,6 +1,5 @@ -import { GameInfo } from "../core/Schemas"; - -type LobbyUpdateHandler = (lobbies: GameInfo[]) => void; +import { getServerConfigFromClient } from "../core/configuration/ConfigLoader"; +import { PublicGames, PublicGamesSchema } from "../core/Schemas"; interface LobbySocketOptions { reconnectDelay?: number; @@ -8,36 +7,39 @@ interface LobbySocketOptions { pollIntervalMs?: number; } +function getRandomWorkerPath(numWorkers: number): string { + const workerIndex = Math.floor(Math.random() * numWorkers); + return `/w${workerIndex}`; +} + export class PublicLobbySocket { private ws: WebSocket | null = null; private wsReconnectTimeout: number | null = null; - private fallbackPollInterval: number | null = null; private wsConnectionAttempts = 0; private wsAttemptCounted = false; + private workerPath: string = ""; private readonly reconnectDelay: number; private readonly maxWsAttempts: number; - private readonly pollIntervalMs: number; - private readonly onLobbiesUpdate: LobbyUpdateHandler; constructor( - onLobbiesUpdate: LobbyUpdateHandler, + private onLobbiesUpdate: (data: PublicGames) => void, options?: LobbySocketOptions, ) { - this.onLobbiesUpdate = onLobbiesUpdate; this.reconnectDelay = options?.reconnectDelay ?? 3000; this.maxWsAttempts = options?.maxWsAttempts ?? 3; - this.pollIntervalMs = options?.pollIntervalMs ?? 1000; } - start() { + async start() { this.wsConnectionAttempts = 0; + // Get config to determine number of workers, then pick a random one + const config = await getServerConfigFromClient(); + this.workerPath = getRandomWorkerPath(config.numWorkers()); this.connectWebSocket(); } stop() { this.disconnectWebSocket(); - this.stopFallbackPolling(); } private connectWebSocket() { @@ -49,7 +51,7 @@ export class PublicLobbySocket { } const protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; - const wsUrl = `${protocol}//${window.location.host}/lobbies`; + const wsUrl = `${protocol}//${window.location.host}${this.workerPath}/lobbies`; this.ws = new WebSocket(wsUrl); this.wsAttemptCounted = false; @@ -70,15 +72,14 @@ export class PublicLobbySocket { clearTimeout(this.wsReconnectTimeout); this.wsReconnectTimeout = null; } - this.stopFallbackPolling(); } private handleMessage(event: MessageEvent) { try { - const message = JSON.parse(event.data as string); - if (message.type === "lobbies_update") { - this.onLobbiesUpdate(message.data?.lobbies ?? []); - } + const publicGames = PublicGamesSchema.parse( + JSON.parse(event.data as string), + ); + this.onLobbiesUpdate(publicGames); } catch (error) { console.error("Error parsing WebSocket message:", error); if (this.ws && this.ws.readyState === WebSocket.OPEN) { @@ -101,10 +102,7 @@ export class PublicLobbySocket { this.wsConnectionAttempts++; } if (this.wsConnectionAttempts >= this.maxWsAttempts) { - console.log( - "Max WebSocket attempts reached, falling back to HTTP polling", - ); - this.startFallbackPolling(); + console.error("Max WebSocket attempts reached"); } else { this.scheduleReconnect(); } @@ -121,7 +119,7 @@ export class PublicLobbySocket { this.wsConnectionAttempts++; } if (this.wsConnectionAttempts >= this.maxWsAttempts) { - this.startFallbackPolling(); + alert("error connecting to game service"); } else { this.scheduleReconnect(); } @@ -145,33 +143,4 @@ export class PublicLobbySocket { this.wsReconnectTimeout = null; } } - - private startFallbackPolling() { - if (this.fallbackPollInterval !== null) return; - console.log("Starting HTTP fallback polling"); - this.fetchLobbiesHTTP(); - this.fallbackPollInterval = window.setInterval(() => { - this.fetchLobbiesHTTP(); - }, this.pollIntervalMs); - } - - private stopFallbackPolling() { - if (this.fallbackPollInterval !== null) { - clearInterval(this.fallbackPollInterval); - this.fallbackPollInterval = null; - } - } - - private async fetchLobbiesHTTP() { - try { - const response = await fetch(`/api/public_lobbies`); - if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`); - } - const data = await response.json(); - this.onLobbiesUpdate(data.lobbies as GameInfo[]); - } catch (error) { - console.error("Error fetching lobbies via HTTP:", error); - } - } } diff --git a/src/client/Main.ts b/src/client/Main.ts index c48d0f06f..ba6569889 100644 --- a/src/client/Main.ts +++ b/src/client/Main.ts @@ -1,12 +1,7 @@ import version from "resources/version.txt?raw"; import { UserMeResponse } from "../core/ApiSchemas"; import { EventBus } from "../core/EventBus"; -import { - GAME_ID_REGEX, - GameInfo, - GameRecord, - GameStartInfo, -} from "../core/Schemas"; +import { GAME_ID_REGEX, GameRecord, GameStartInfo } from "../core/Schemas"; import { GameEnv } from "../core/configuration/Config"; import { getServerConfigFromClient } from "../core/configuration/ConfigLoader"; import { GameType } from "../core/game/Game"; @@ -223,7 +218,6 @@ export interface JoinLobbyEvent { // GameRecord exists when replaying an archived game. gameRecord?: GameRecord; source?: "public" | "private" | "host" | "matchmaking" | "singleplayer"; - publicLobbyInfo?: GameInfo; } class Client { @@ -773,7 +767,7 @@ class Client { } const config = await getServerConfigFromClient(); // Only update URL immediately for private lobbies, not public ones - if (!lobby.publicLobbyInfo && lobby.source !== "public") { + if (lobby.source !== "public") { this.updateJoinUrlForShare(lobby.gameID, config); } @@ -908,7 +902,7 @@ class Client { // Open the join lobby modal page and pass the lobby info window.showPage?.("page-join-lobby"); - this.joinModal?.open(lobby.gameID, lobby); + this.joinModal?.open(lobby.gameID, true); } private async handleLeaveLobby(/* event: CustomEvent */) { diff --git a/src/client/PublicLobby.ts b/src/client/PublicLobby.ts index dc60087fd..ef9729c93 100644 --- a/src/client/PublicLobby.ts +++ b/src/client/PublicLobby.ts @@ -1,7 +1,7 @@ import { html, LitElement } from "lit"; import { customElement, state } from "lit/decorators.js"; import { GameMapType } from "../core/game/Game"; -import { GameID, GameInfo } from "../core/Schemas"; +import { GameID, PublicGameInfo, PublicGames } from "../core/Schemas"; import { PublicLobbySocket } from "./LobbySocket"; import { terrainMapFileLoader } from "./TerrainMapFileLoader"; import { @@ -13,16 +13,19 @@ import { } from "./Utils"; export interface ShowPublicLobbyModalEvent { - lobby: GameInfo; + lobby: PublicGameInfo; } @customElement("public-lobby") export class PublicLobby extends LitElement { - @state() private lobbies: GameInfo[] = []; + @state() private publicGames: PublicGames | null = null; + @state() public isLobbyHighlighted: boolean = false; @state() private mapImages: Map = new Map(); + private lobbyIDToStart = new Map(); - private lobbySocket = new PublicLobbySocket((lobbies) => - this.handleLobbiesUpdate(lobbies), + private serverTimeOffset = 0; + private lobbySocket = new PublicLobbySocket((data) => + this.handleLobbiesUpdate(data), ); createRenderRoot() { @@ -39,12 +42,18 @@ export class PublicLobby extends LitElement { this.lobbySocket.stop(); } - private handleLobbiesUpdate(lobbies: GameInfo[]) { - this.lobbies = lobbies; - this.lobbies.forEach((l) => { + private handleLobbiesUpdate(publicGames: PublicGames) { + this.publicGames = publicGames; + + // Calculate offset between server time and client time + if (this.publicGames) { + this.serverTimeOffset = this.publicGames.serverTime - Date.now(); + } + this.publicGames.games.forEach((l) => { if (!this.lobbyIDToStart.has(l.gameID)) { - const msUntilStart = l.msUntilStart ?? 0; - this.lobbyIDToStart.set(l.gameID, msUntilStart + Date.now()); + // Convert server's startsAt to client time by subtracting offset + const startsAt = l.startsAt ?? Date.now(); + this.lobbyIDToStart.set(l.gameID, startsAt - this.serverTimeOffset); } if (l.gameConfig && !this.mapImages.has(l.gameID)) { @@ -66,9 +75,9 @@ export class PublicLobby extends LitElement { } render() { - if (this.lobbies.length === 0) return html``; + if (!this.publicGames) return html``; - const lobby = this.lobbies[0]; + const lobby = this.publicGames.games[0]; if (!lobby?.gameConfig) return html``; const start = this.lobbyIDToStart.get(lobby.gameID) ?? 0; @@ -200,7 +209,7 @@ export class PublicLobby extends LitElement { this.lobbySocket.stop(); } - private lobbyClicked(lobby: GameInfo) { + private lobbyClicked(lobby: PublicGameInfo) { // Validate username before opening the modal const usernameInput = document.querySelector("username-input") as any; if ( diff --git a/src/core/Schemas.ts b/src/core/Schemas.ts index dc257151b..0aa06a316 100644 --- a/src/core/Schemas.ts +++ b/src/core/Schemas.ts @@ -136,6 +136,9 @@ export type PlayerPattern = z.infer; export type PlayerColor = z.infer; export type Flag = z.infer; export type GameStartInfo = z.infer; +export type GameInfo = z.infer; +export type PublicGames = z.infer; +export type PublicGameInfo = z.infer; const ClientInfoSchema = z.object({ clientID: z.string(), @@ -145,18 +148,22 @@ const ClientInfoSchema = z.object({ export const GameInfoSchema = z.object({ gameID: z.string(), clients: z.array(ClientInfoSchema).optional(), - numClients: z.number().optional(), - msUntilStart: z.number().optional(), + startsAt: z.number().optional(), + serverTime: z.number(), gameConfig: z.lazy(() => GameConfigSchema).optional(), }); -export interface GameInfo { - gameID: GameID; - clients?: ClientInfo[]; - numClients?: number; - msUntilStart?: number; - gameConfig?: GameConfig; -} +export const PublicGameInfoSchema = z.object({ + gameID: z.string(), + numClients: z.number(), + startsAt: z.number(), + gameConfig: z.lazy(() => GameConfigSchema).optional(), +}); + +export const PublicGamesSchema = z.object({ + serverTime: z.number(), + games: PublicGameInfoSchema.array(), +}); export class LobbyInfoEvent implements GameEvent { constructor(public lobby: GameInfo) {} diff --git a/src/server/GameManager.ts b/src/server/GameManager.ts index d2588ae00..7b7e4358a 100644 --- a/src/server/GameManager.ts +++ b/src/server/GameManager.ts @@ -26,6 +26,12 @@ export class GameManager { return this.games.get(id) ?? null; } + public publicLobbies(): GameServer[] { + return Array.from(this.games.values()).filter( + (g) => g.phase() === GamePhase.Lobby && g.isPublic(), + ); + } + joinClient(client: Client, gameID: GameID): boolean { const game = this.games.get(gameID); if (game) { @@ -52,6 +58,7 @@ export class GameManager { id: GameID, gameConfig: GameConfig | undefined, creatorClientID?: string, + startsAt?: number, ) { const game = new GameServer( id, @@ -77,6 +84,7 @@ export class GameManager { ...gameConfig, }, creatorClientID, + startsAt, ); this.games.set(id, game); return game; diff --git a/src/server/GamePreviewBuilder.ts b/src/server/GamePreviewBuilder.ts index a68c335c2..ea9916025 100644 --- a/src/server/GamePreviewBuilder.ts +++ b/src/server/GamePreviewBuilder.ts @@ -147,8 +147,7 @@ export function buildPreview( activePlayers = countActivePlayers(players); } else { activePlayers = - countActivePlayers(players) || - (lobby?.numClients ?? lobby?.clients?.length ?? 0); + countActivePlayers(players) || (lobby?.clients?.length ?? 0); } const map = lobby?.gameConfig?.gameMap ?? config.gameMap; let mode = lobby?.gameConfig?.gameMode ?? config.gameMode ?? GameMode.FFA; diff --git a/src/server/GameServer.ts b/src/server/GameServer.ts index ef476f1ec..f8b183dce 100644 --- a/src/server/GameServer.ts +++ b/src/server/GameServer.ts @@ -88,6 +88,7 @@ export class GameServer { private config: ServerConfig, public gameConfig: GameConfig, private lobbyCreatorID?: string, + private startsAt?: number, ) { this.log = log_.child({ gameID: id }); } @@ -506,15 +507,6 @@ export class GameServer { return this.activeClients.length; } - public startTime(): number { - if (this._startTime !== null && this._startTime > 0) { - return this._startTime; - } else { - //game hasn't started yet, only works for public games - return this.createdAt + this.config.gameCreationRate(); - } - } - public prestart() { if (this.hasStarted()) { return; @@ -787,8 +779,9 @@ export class GameServer { } } - const msSinceCreation = now - this.createdAt; - const lessThanLifetime = msSinceCreation < this.config.gameCreationRate(); + // Public Games + + const lessThanLifetime = Date.now() < this.startsAt!; const notEnoughPlayers = this.gameConfig.gameType === GameType.Public && this.gameConfig.maxPlayers && @@ -796,8 +789,7 @@ export class GameServer { if (lessThanLifetime && notEnoughPlayers) { return GamePhase.Lobby; } - const warmupOver = - now > this.createdAt + this.config.gameCreationRate() + 30 * 1000; + const warmupOver = now > this.startsAt! + 30 * 1000; if (noActive && warmupOver && noRecentPings) { return GamePhase.Finished; } @@ -817,15 +809,11 @@ export class GameServer { clientID: c.clientID, })), gameConfig: this.gameConfig, - msUntilStart: this.isPublic() ? this.getMsUntilStart() : undefined, + startsAt: this.startsAt, + serverTime: Date.now(), }; } - private getMsUntilStart(): number { - const startTime = this.createdAt + this.config.gameCreationRate(); - return Math.max(0, startTime - Date.now()); - } - public isPublic(): boolean { return this.gameConfig.gameType === GameType.Public; } diff --git a/src/server/IPCBridgeSchema.ts b/src/server/IPCBridgeSchema.ts new file mode 100644 index 000000000..e6034091e --- /dev/null +++ b/src/server/IPCBridgeSchema.ts @@ -0,0 +1,56 @@ +import { z } from "zod"; +import { + GameConfigSchema, + PublicGameInfoSchema, + PublicGamesSchema, +} from "../core/Schemas"; + +export type WorkerLobbyList = z.infer; +export type WorkerReady = z.infer; +export type MasterLobbiesBroadcast = z.infer< + typeof MasterLobbiesBroadcastSchema +>; +export type MasterCreateGame = z.infer; +export type WorkerMessage = z.infer; +export type MasterMessage = z.infer; + +// --- Worker Messages --- + +// Worker tells the master about its lobbies. +const WorkerLobbyListSchema = z.object({ + type: z.literal("lobbyList"), + lobbies: z.array(PublicGameInfoSchema), +}); + +const WorkerReadySchema = z.object({ + type: z.literal("workerReady"), + workerId: z.number(), +}); + +export const WorkerMessageSchema = z.discriminatedUnion("type", [ + WorkerLobbyListSchema, + WorkerReadySchema, +]); + +// --- Master Messages --- + +// Broadcasts all public game info to all workers. +// Workers need information on all public lobbies so +// it can send it to the client. +const MasterLobbiesBroadcastSchema = z.object({ + type: z.literal("lobbiesBroadcast"), + publicGames: PublicGamesSchema, +}); + +// Master sends a message to worker to schedule a new public game/lobby. +const MasterCreateGameSchema = z.object({ + type: z.literal("createGame"), + gameID: z.string(), + gameConfig: GameConfigSchema, + startsAt: z.number(), +}); + +export const MasterMessageSchema = z.discriminatedUnion("type", [ + MasterLobbiesBroadcastSchema, + MasterCreateGameSchema, +]); diff --git a/src/server/Master.ts b/src/server/Master.ts index c2b131c2d..a9322d8ff 100644 --- a/src/server/Master.ts +++ b/src/server/Master.ts @@ -5,19 +5,16 @@ import rateLimit from "express-rate-limit"; import http from "http"; import path from "path"; import { fileURLToPath } from "url"; -import { WebSocket, WebSocketServer } from "ws"; import { GameEnv } from "../core/configuration/Config"; import { getServerConfigFromServer } from "../core/configuration/ConfigLoader"; -import { GameInfo } from "../core/Schemas"; -import { generateID } from "../core/Util"; import { logger } from "./Logger"; import { MapPlaylist } from "./MapPlaylist"; -import { startPolling } from "./PollingLoop"; +import { MasterLobbyService } from "./MasterLobbyService"; import { renderHtml } from "./RenderHtml"; const config = getServerConfigFromServer(); const playlist = new MapPlaylist(); -const readyWorkers = new Set(); +let lobbyService: MasterLobbyService; const app = express(); const server = http.createServer(app); @@ -68,33 +65,6 @@ app.use( }), ); -let publicLobbiesData: { lobbies: GameInfo[] } = { lobbies: [] }; - -const publicLobbyIDs: Set = new Set(); -const connectedClients: Set = new Set(); - -// Broadcast lobbies to all connected clients -function broadcastLobbies() { - const message = JSON.stringify({ - type: "lobbies_update", - data: publicLobbiesData, - }); - - const clientsToRemove: WebSocket[] = []; - - connectedClients.forEach((client) => { - if (client.readyState === WebSocket.OPEN) { - client.send(message); - } else { - clientsToRemove.push(client); - } - }); - - clientsToRemove.forEach((client) => { - connectedClients.delete(client); - }); -} - // Start the master process export async function startMaster() { if (!cluster.isPrimary) { @@ -106,36 +76,7 @@ export async function startMaster() { log.info(`Primary ${process.pid} is running`); log.info(`Setting up ${config.numWorkers()} workers...`); - // Setup WebSocket server for clients - const wss = new WebSocketServer({ server, path: "/lobbies" }); - - wss.on("connection", (ws: WebSocket) => { - connectedClients.add(ws); - - // Send current lobbies immediately (always send, even if empty) - ws.send( - JSON.stringify({ type: "lobbies_update", data: publicLobbiesData }), - ); - - ws.on("close", () => { - connectedClients.delete(ws); - }); - - ws.on("error", (error) => { - log.error(`WebSocket error:`, error); - connectedClients.delete(ws); - try { - if ( - ws.readyState === WebSocket.OPEN || - ws.readyState === WebSocket.CONNECTING - ) { - ws.close(1011, "WebSocket internal error"); - } - } catch (closeError) { - log.error("Error while closing WebSocket after error:", closeError); - } - }); - }); + lobbyService = new MasterLobbyService(config, playlist, log); // Generate admin token for worker authentication const ADMIN_TOKEN = crypto.randomBytes(16).toString("hex"); @@ -157,44 +98,21 @@ export async function startMaster() { INSTANCE_ID, }); + lobbyService.registerWorker(i, worker); log.info(`Started worker ${i} (PID: ${worker.process.pid})`); } - cluster.on("message", (worker, message) => { - if (message.type === "WORKER_READY") { - const workerId = message.workerId; - readyWorkers.add(workerId); - log.info( - `Worker ${workerId} is ready. (${readyWorkers.size}/${config.numWorkers()} ready)`, - ); - // Start scheduling when all workers are ready - if (readyWorkers.size === config.numWorkers()) { - log.info("All workers ready, starting game scheduling"); - - const scheduleLobbies = () => { - schedulePublicGame(playlist).catch((error) => { - log.error("Error scheduling public game:", error); - }); - }; - - startPolling(async () => { - const lobbies = await fetchLobbies(); - if (lobbies === 0) { - scheduleLobbies(); - } - }, 100); - } - } - }); - // Handle worker crashes cluster.on("exit", (worker, code, signal) => { const workerId = (worker as any).process?.env?.WORKER_ID; - if (!workerId) { + if (workerId === undefined) { log.error(`worker crashed could not find id`); return; } + const workerIdNum = parseInt(workerId); + lobbyService.removeWorker(workerIdNum); + log.warn( `Worker ${workerId} (PID: ${worker.process.pid}) died with code: ${code} and signal: ${signal}`, ); @@ -207,6 +125,7 @@ export async function startMaster() { INSTANCE_ID, }); + lobbyService.registerWorker(workerIdNum, newWorker); log.info( `Restarted worker ${workerId} (New PID: ${newWorker.process.pid})`, ); @@ -226,115 +145,6 @@ app.get("/api/env", async (req, res) => { res.json(envConfig); }); -// Add lobbies endpoint to list public games for this worker -app.get("/api/public_lobbies", async (req, res) => { - res.json(publicLobbiesData); -}); - -async function fetchLobbies(): Promise { - const fetchPromises: Promise[] = []; - - for (const gameID of new Set(publicLobbyIDs)) { - const controller = new AbortController(); - setTimeout(() => controller.abort(), 5000); // 5 second timeout - const port = config.workerPort(gameID); - const promise = fetch(`http://localhost:${port}/api/game/${gameID}`, { - headers: { [config.adminHeader()]: config.adminToken() }, - signal: controller.signal, - }) - .then((resp) => resp.json()) - .then((json) => { - return json as GameInfo; - }) - .catch((error) => { - log.error(`Error fetching game ${gameID}:`, error); - // Return null or a placeholder if fetch fails - publicLobbyIDs.delete(gameID); - return null; - }); - - fetchPromises.push(promise); - } - - // Wait for all promises to resolve - const results = await Promise.all(fetchPromises); - - // Filter out any null results from failed fetches - const lobbyInfos: GameInfo[] = results - .filter((result) => result !== null) - .map((gi: GameInfo) => { - return { - gameID: gi.gameID, - numClients: gi?.clients?.length ?? 0, - gameConfig: gi.gameConfig, - msUntilStart: gi.msUntilStart, - } as GameInfo; - }); - - lobbyInfos.forEach((l) => { - if ( - "msUntilStart" in l && - l.msUntilStart !== undefined && - l.msUntilStart <= 250 - ) { - publicLobbyIDs.delete(l.gameID); - return; - } - - if ( - "gameConfig" in l && - l.gameConfig !== undefined && - "maxPlayers" in l.gameConfig && - l.gameConfig.maxPlayers !== undefined && - "numClients" in l && - l.numClients !== undefined && - l.gameConfig.maxPlayers <= l.numClients - ) { - publicLobbyIDs.delete(l.gameID); - return; - } - }); - - // Update the lobbies data - publicLobbiesData = { - lobbies: lobbyInfos, - }; - - broadcastLobbies(); - - return publicLobbyIDs.size; -} - -// Function to schedule a new public game -async function schedulePublicGame(playlist: MapPlaylist) { - const gameID = generateID(); - publicLobbyIDs.add(gameID); - - const workerPath = config.workerPath(gameID); - - // Send request to the worker to start the game - try { - const response = await fetch( - `http://localhost:${config.workerPort(gameID)}/api/create_game/${gameID}`, - { - method: "POST", - headers: { - "Content-Type": "application/json", - [config.adminHeader()]: config.adminToken(), - }, - body: JSON.stringify(await playlist.gameConfig()), - }, - ); - - if (!response.ok) { - throw new Error(`Failed to schedule public game: ${response.statusText}`); - } - } catch (error) { - log.error(`Failed to schedule public game on worker ${workerPath}:`, error); - throw error; - } -} - // SPA fallback route app.get("*", async function (_req, res) { try { diff --git a/src/server/MasterLobbyService.ts b/src/server/MasterLobbyService.ts new file mode 100644 index 000000000..c82684cd9 --- /dev/null +++ b/src/server/MasterLobbyService.ts @@ -0,0 +1,135 @@ +import { Worker } from "cluster"; +import winston from "winston"; +import { ServerConfig } from "../core/configuration/Config"; +import { PublicGameInfo } from "../core/Schemas"; +import { generateID } from "../core/Util"; +import { + MasterCreateGame, + MasterLobbiesBroadcast, + WorkerMessageSchema, +} from "./IPCBridgeSchema"; +import { logger } from "./Logger"; +import { MapPlaylist } from "./MapPlaylist"; +import { startPolling } from "./PollingLoop"; + +export interface MasterLobbyServiceOptions { + config: ServerConfig; + playlist: MapPlaylist; + log: typeof logger; +} + +export class MasterLobbyService { + private readonly workers = new Map(); + // Worker id => the lobbies it owns. + private readonly workerLobbies = new Map(); + private readonly readyWorkers = new Set(); + private started = false; + + constructor( + private config: ServerConfig, + private playlist: MapPlaylist, + private log: winston.Logger, + ) {} + + registerWorker(workerId: number, worker: Worker) { + this.workers.set(workerId, worker); + + worker.on("message", (raw: unknown) => { + const result = WorkerMessageSchema.safeParse(raw); + if (!result.success) { + this.log.error("Invalid IPC message from worker:", raw); + return; + } + + const msg = result.data; + switch (msg.type) { + case "workerReady": + this.handleWorkerReady(msg.workerId); + break; + case "lobbyList": + this.workerLobbies.set(workerId, msg.lobbies); + break; + } + }); + } + + removeWorker(workerId: number) { + this.workers.delete(workerId); + this.workerLobbies.delete(workerId); + this.readyWorkers.delete(workerId); + } + + private handleWorkerReady(workerId: number) { + this.readyWorkers.add(workerId); + this.log.info( + `Worker ${workerId} is ready. (${this.readyWorkers.size}/${this.config.numWorkers()} ready)`, + ); + if (this.readyWorkers.size === this.config.numWorkers() && !this.started) { + this.started = true; + this.log.info("All workers ready, starting game scheduling"); + startPolling(async () => this.broadcastLobbies(), 250); + startPolling(async () => await this.maybeScheduleLobby(), 1000); + } + } + + private getAllLobbies(): PublicGameInfo[] { + const lobbies = Array.from(this.workerLobbies.values()) + .flat() + .sort((a, b) => a.startsAt! - b.startsAt); + return lobbies; + } + + private broadcastLobbies() { + const msg = { + type: "lobbiesBroadcast", + publicGames: { + serverTime: Date.now(), + games: this.getAllLobbies(), + }, + } satisfies MasterLobbiesBroadcast; + for (const worker of this.workers.values()) { + worker.send(msg, (e) => { + if (e) { + this.log.error("Failed to send lobbies broadcast to worker:", e); + } + }); + } + } + + private async maybeScheduleLobby() { + const lobbies = this.getAllLobbies(); + if (lobbies.length >= 2) { + return; + } + + const lastStart = lobbies.reduce( + (max, pb) => Math.max(max, pb.startsAt), + Date.now(), + ); + + const gameID = generateID(); + const workerId = this.config.workerIndex(gameID); + + const gameConfig = await this.playlist.gameConfig(); + const worker = this.workers.get(workerId); + if (!worker) { + this.log.error(`Worker ${workerId} not found`); + return; + } + + worker.send( + { + type: "createGame", + gameID, + gameConfig, + startsAt: lastStart + this.config.gameCreationRate(), + } satisfies MasterCreateGame, + (e) => { + if (e) { + this.log.error("Failed to schedule lobby on worker:", e); + } + }, + ); + this.log.info(`Scheduled public game ${gameID} on worker ${workerId}`); + } +} diff --git a/src/server/Worker.ts b/src/server/Worker.ts index e9f340c27..2d8d8dcea 100644 --- a/src/server/Worker.ts +++ b/src/server/Worker.ts @@ -30,6 +30,7 @@ import { MapPlaylist } from "./MapPlaylist"; import { startPolling } from "./PollingLoop"; import { PrivilegeRefresher } from "./PrivilegeRefresher"; import { verifyTurnstileToken } from "./Turnstile"; +import { WorkerLobbyService } from "./WorkerLobbyService"; import { initWorkerMetrics } from "./WorkerMetrics"; const config = getServerConfigFromServer(); @@ -42,6 +43,18 @@ const playlist = new MapPlaylist(true); export async function startWorker() { log.info(`Worker starting...`); + const __filename = fileURLToPath(import.meta.url); + const __dirname = path.dirname(__filename); + + const app = express(); + const server = http.createServer(app); + const wss = new WebSocketServer({ noServer: true }); + + const gm = new GameManager(config, log); + + // Initialize lobby service (handles WebSocket upgrade routing) + const lobbyService = new WorkerLobbyService(server, wss, gm, log); + setTimeout( () => { startMatchmakingPolling(gm); @@ -49,15 +62,6 @@ export async function startWorker() { 1000 + Math.random() * 2000, ); - const __filename = fileURLToPath(import.meta.url); - const __dirname = path.dirname(__filename); - - const app = express(); - const server = http.createServer(app); - const wss = new WebSocketServer({ server }); - - const gm = new GameManager(config, log); - if (config.otelEnabled()) { initWorkerMetrics(gm); } @@ -459,13 +463,8 @@ export async function startWorker() { log.info(`running on http://localhost:${PORT}`); log.info(`Handling requests with path prefix /w${workerId}/`); // Signal to the master process that this worker is ready - if (process.send) { - process.send({ - type: "WORKER_READY", - workerId: workerId, - }); - log.info(`signaled ready state to master`); - } + lobbyService.sendReady(workerId); + log.info(`signaled ready state to master`); }); // Global error handler diff --git a/src/server/WorkerLobbyService.ts b/src/server/WorkerLobbyService.ts new file mode 100644 index 000000000..d06609294 --- /dev/null +++ b/src/server/WorkerLobbyService.ts @@ -0,0 +1,136 @@ +import http from "http"; +import { WebSocket, WebSocketServer } from "ws"; +import { PublicGameInfo, PublicGames } from "../core/Schemas"; +import { GameManager } from "./GameManager"; +import { + MasterMessageSchema, + WorkerLobbyList, + WorkerReady, +} from "./IPCBridgeSchema"; +import { logger } from "./Logger"; + +export class WorkerLobbyService { + private readonly lobbiesWss: WebSocketServer; + private readonly lobbyClients: Set = new Set(); + + constructor( + private readonly server: http.Server, + private readonly gameWss: WebSocketServer, + private readonly gm: GameManager, + private readonly log: typeof logger, + ) { + this.lobbiesWss = new WebSocketServer({ noServer: true }); + this.setupUpgradeHandler(); + this.setupLobbiesWebSocket(); + this.setupIPCListener(); + } + + private setupIPCListener() { + process.on("message", (raw: unknown) => { + const result = MasterMessageSchema.safeParse(raw); + if (!result.success) { + this.log.error("Invalid IPC message from master:", raw); + return; + } + + const msg = result.data; + switch (msg.type) { + case "lobbiesBroadcast": + // Forward message to all clients + this.broadcastLobbiesToClients(msg.publicGames); + // Update master with my lobby info + this.sendMyLobbiesToMaster(); + break; + case "createGame": + if (this.gm.game(msg.gameID) !== null) { + this.log.warn(`Game ${msg.gameID} already exists, skipping create`); + return; + } + this.log.info(`Creating public game ${msg.gameID} from master`); + this.gm.createGame( + msg.gameID, + msg.gameConfig, + undefined, + msg.startsAt, + ); + break; + } + }); + } + + sendReady(workerId: number) { + const msg: WorkerReady = { type: "workerReady", workerId }; + process.send?.(msg); + } + + private sendMyLobbiesToMaster() { + const lobbies = this.gm + .publicLobbies() + .map((g) => g.gameInfo()) + .map((gi) => { + return { + gameID: gi.gameID, + numClients: gi.clients?.length ?? 0, + startsAt: gi.startsAt!, + gameConfig: gi.gameConfig, + } satisfies PublicGameInfo; + }); + process.send?.({ type: "lobbyList", lobbies } satisfies WorkerLobbyList); + } + + private setupUpgradeHandler() { + this.server.on("upgrade", (request, socket, head) => { + const pathname = request.url ?? ""; + if (pathname === "/lobbies" || pathname.endsWith("/lobbies")) { + this.lobbiesWss.handleUpgrade(request, socket, head, (ws) => { + this.lobbiesWss.emit("connection", ws, request); + }); + } else { + this.gameWss.handleUpgrade(request, socket, head, (ws) => { + this.gameWss.emit("connection", ws, request); + }); + } + }); + } + + private setupLobbiesWebSocket() { + this.lobbiesWss.on("connection", (ws: WebSocket) => { + this.lobbyClients.add(ws); + ws.on("close", () => { + this.lobbyClients.delete(ws); + }); + + ws.on("error", (error) => { + this.log.error(`Lobbies WebSocket error:`, error); + this.lobbyClients.delete(ws); + try { + if ( + ws.readyState === WebSocket.OPEN || + ws.readyState === WebSocket.CONNECTING + ) { + ws.close(1011, "WebSocket internal error"); + } + } catch (closeError) { + this.log.error("Error closing lobbies WebSocket:", closeError); + } + }); + }); + } + + private broadcastLobbiesToClients(publicGames: PublicGames) { + const message = JSON.stringify(publicGames); + + const clientsToRemove: WebSocket[] = []; + this.lobbyClients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(message); + } else { + clientsToRemove.push(client); + } + }); + + clientsToRemove.forEach((client) => { + this.lobbyClients.delete(client); + }); + } +} diff --git a/tests/client/LobbySocket.test.ts b/tests/client/LobbySocket.test.ts deleted file mode 100644 index 3e6e8d901..000000000 --- a/tests/client/LobbySocket.test.ts +++ /dev/null @@ -1,113 +0,0 @@ -import { PublicLobbySocket } from "../../src/client/LobbySocket"; - -class MockWebSocket extends EventTarget { - static instances: MockWebSocket[] = []; - static readonly OPEN = 1; - static readonly CLOSED = 3; - - readonly url: string; - readyState = MockWebSocket.OPEN; - - constructor(url: string) { - super(); - this.url = url; - MockWebSocket.instances.push(this); - } - - addEventListener( - type: string, - listener: EventListenerOrEventListenerObject, - options?: boolean | AddEventListenerOptions, - ): void { - super.addEventListener(type, listener, options); - } - - close(code?: number, reason?: string) { - this.readyState = MockWebSocket.CLOSED; - this.dispatchEvent(new CloseEvent("close", { code, reason })); - } - - send(_data: unknown) {} -} - -describe("PublicLobbySocket", () => { - const originalWebSocket = globalThis.WebSocket; - const originalFetch = globalThis.fetch; - - beforeEach(() => { - MockWebSocket.instances = []; - // @ts-expect-error assign test mock - globalThis.WebSocket = MockWebSocket; - }); - - afterEach(() => { - globalThis.WebSocket = originalWebSocket; - globalThis.fetch = originalFetch; - vi.useRealTimers(); - }); - - it("delivers lobby updates from websocket messages", () => { - const updates: unknown[][] = []; - const socket = new PublicLobbySocket((lobbies) => updates.push(lobbies)); - - socket.start(); - const ws = MockWebSocket.instances.at(-1); - expect(ws?.url).toContain("/lobbies"); - - ws?.dispatchEvent( - new MessageEvent("message", { - data: JSON.stringify({ - type: "lobbies_update", - data: { - lobbies: [ - { - gameID: "g1", - numClients: 1, - gameConfig: { - maxPlayers: 2, - gameMode: 0, - gameMap: "Earth", - }, - }, - ], - }, - }), - }), - ); - - expect(updates).toHaveLength(1); - expect((updates[0][0] as { gameID: string }).gameID).toBe("g1"); - - socket.stop(); - }); - - it("falls back to HTTP polling after max websocket attempts", async () => { - vi.useFakeTimers(); - - const fetchMock = vi.fn().mockResolvedValue({ - ok: true, - json: async () => ({ lobbies: [] }), - }); - - globalThis.fetch = fetchMock as unknown as typeof fetch; - - const socket = new PublicLobbySocket(() => {}, { - maxWsAttempts: 1, - reconnectDelay: 0, - pollIntervalMs: 50, - }); - - socket.start(); - const ws = MockWebSocket.instances.at(-1); - ws?.dispatchEvent(new CloseEvent("close")); - - await Promise.resolve(); - expect(fetchMock).toHaveBeenCalledTimes(1); - - vi.advanceTimersByTime(60); - await Promise.resolve(); - expect(fetchMock).toHaveBeenCalledTimes(2); - - socket.stop(); - }); -});