move lobby websockets to worker (#2974)

## Description:

Currently only the master process sends public lobby updates to clients.
This is not scalable since it could overload the master process.

In this PR, the master uses IPC to send public lobby info to all
workers. Then clients connect to a random worker to get public lobby
updates via websocket. This way clients never connect directly to the
master websocket.

The flow looks like this:

Every 100ms:
1. Master schedules a public game on a random worker if new games are
needed
2. Master broadcasts public lobby info to all workers (all public games
& num clients connected to each game)
3. Each worker responds to that update with the number of clients
connected to its own public games
4. Master then updates its public lobby state so it knows how many
clients are connected to each public game

## Please complete the following:

- [x] I have added screenshots for all UI updates
- [x] I process any text displayed to the user through translateText()
and I've added it to the en.json file
- [x] I have added relevant tests to the test directory
- [x] I confirm I have thoroughly tested these changes and take full
responsibility for any bugs introduced

## Please put your Discord username so you can be contacted if a bug or
regression is found:

evan
This commit is contained in:
Evan
2026-02-03 18:26:38 -08:00
committed by GitHub
parent 9294b73a88
commit 294a1b4784
15 changed files with 437 additions and 472 deletions
-19
View File
@@ -102,25 +102,6 @@ server {
add_header Cache-Control "public, max-age=86400"; # 24 hours
}
# /api/public_lobbies endpoint - Cache for 1 second to handle high request volume
location = /api/public_lobbies {
proxy_pass http://127.0.0.1:3000;
proxy_http_version 1.1;
# Cache configuration
proxy_cache API_CACHE;
proxy_cache_valid 200 1s;
proxy_cache_use_stale updating error timeout http_500 http_502 http_503 http_504;
proxy_cache_lock on;
add_header X-Cache-Status $upstream_cache_status;
# Standard proxy headers
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# /api/env endpoint - Cache for 1 hour
location = /api/env {
proxy_pass http://127.0.0.1:3000;
+9 -22
View File
@@ -70,7 +70,7 @@ export class JoinLobbyModal extends BaseModal {
}
this.updateFromLobby({
...lobby,
msUntilStart: lobby.msUntilStart ?? undefined,
startsAt: lobby.startsAt ?? undefined,
});
};
@@ -94,7 +94,7 @@ export class JoinLobbyModal extends BaseModal {
})
: translateText("public_lobby.started");
const maxPlayers = this.gameConfig?.maxPlayers ?? 0;
const playerCount = this.playerCount;
const playerCount = this.players?.length ?? 0;
const hostClientID = this.isPrivateLobby()
? (this.lobbyCreatorClientID ?? "")
: "";
@@ -283,13 +283,13 @@ export class JoinLobbyModal extends BaseModal {
`;
}
public open(lobbyId: string = "", lobbyInfo?: GameInfo) {
public open(lobbyId: string = "", isPublic: boolean = false) {
super.open();
if (lobbyId) {
this.startTrackingLobby(lobbyId, lobbyInfo);
this.startTrackingLobby(lobbyId);
// If opened with lobbyInfo (public lobby case), auto-join the lobby
if (lobbyInfo) {
this.joinPublicLobby(lobbyId, lobbyInfo);
if (isPublic) {
this.joinPublicLobby(lobbyId);
} else {
// If opened with lobbyId but no lobbyInfo (URL join case), check if active and join
this.handleUrlJoin(lobbyId);
@@ -329,14 +329,13 @@ export class JoinLobbyModal extends BaseModal {
}
}
private joinPublicLobby(lobbyId: string, lobbyInfo: GameInfo) {
private joinPublicLobby(lobbyId: string) {
// Dispatch join-lobby event to actually connect to the lobby
this.dispatchEvent(
new CustomEvent("join-lobby", {
detail: {
gameID: lobbyId,
clientID: this.currentClientID,
publicLobbyInfo: lobbyInfo,
} as JoinLobbyEvent,
bubbles: true,
composed: true,
@@ -349,7 +348,6 @@ export class JoinLobbyModal extends BaseModal {
this.currentClientID = getClientIDForGame(lobbyId);
this.gameConfig = null;
this.players = [];
this.playerCount = 0;
this.nationCount = 0;
this.lobbyStartAt = null;
this.lobbyCreatorClientID = null;
@@ -397,7 +395,6 @@ export class JoinLobbyModal extends BaseModal {
if (this.lobbyIdInput) this.lobbyIdInput.value = "";
this.gameConfig = null;
this.players = [];
this.playerCount = 0;
this.currentLobbyId = "";
this.currentClientID = "";
this.nationCount = 0;
@@ -536,18 +533,8 @@ export class JoinLobbyModal extends BaseModal {
// --- Lobby event handling ---
private updateFromLobby(lobby: GameInfo) {
if (lobby.clients) {
this.players = lobby.clients;
this.playerCount = lobby.clients.length;
} else {
this.players = [];
this.playerCount = lobby.numClients ?? 0;
}
if (lobby.msUntilStart !== undefined) {
this.lobbyStartAt = lobby.msUntilStart + Date.now();
} else {
this.lobbyStartAt = null;
}
this.players = lobby.clients ?? [];
this.lobbyStartAt = lobby.startsAt ?? null;
this.syncCountdownTimer();
if (lobby.gameConfig) {
const mapChanged = this.gameConfig?.gameMap !== lobby.gameConfig.gameMap;
+20 -51
View File
@@ -1,6 +1,5 @@
import { GameInfo } from "../core/Schemas";
type LobbyUpdateHandler = (lobbies: GameInfo[]) => void;
import { getServerConfigFromClient } from "../core/configuration/ConfigLoader";
import { PublicGames, PublicGamesSchema } from "../core/Schemas";
interface LobbySocketOptions {
reconnectDelay?: number;
@@ -8,36 +7,39 @@ interface LobbySocketOptions {
pollIntervalMs?: number;
}
function getRandomWorkerPath(numWorkers: number): string {
const workerIndex = Math.floor(Math.random() * numWorkers);
return `/w${workerIndex}`;
}
export class PublicLobbySocket {
private ws: WebSocket | null = null;
private wsReconnectTimeout: number | null = null;
private fallbackPollInterval: number | null = null;
private wsConnectionAttempts = 0;
private wsAttemptCounted = false;
private workerPath: string = "";
private readonly reconnectDelay: number;
private readonly maxWsAttempts: number;
private readonly pollIntervalMs: number;
private readonly onLobbiesUpdate: LobbyUpdateHandler;
constructor(
onLobbiesUpdate: LobbyUpdateHandler,
private onLobbiesUpdate: (data: PublicGames) => void,
options?: LobbySocketOptions,
) {
this.onLobbiesUpdate = onLobbiesUpdate;
this.reconnectDelay = options?.reconnectDelay ?? 3000;
this.maxWsAttempts = options?.maxWsAttempts ?? 3;
this.pollIntervalMs = options?.pollIntervalMs ?? 1000;
}
start() {
async start() {
this.wsConnectionAttempts = 0;
// Get config to determine number of workers, then pick a random one
const config = await getServerConfigFromClient();
this.workerPath = getRandomWorkerPath(config.numWorkers());
this.connectWebSocket();
}
stop() {
this.disconnectWebSocket();
this.stopFallbackPolling();
}
private connectWebSocket() {
@@ -49,7 +51,7 @@ export class PublicLobbySocket {
}
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const wsUrl = `${protocol}//${window.location.host}/lobbies`;
const wsUrl = `${protocol}//${window.location.host}${this.workerPath}/lobbies`;
this.ws = new WebSocket(wsUrl);
this.wsAttemptCounted = false;
@@ -70,15 +72,14 @@ export class PublicLobbySocket {
clearTimeout(this.wsReconnectTimeout);
this.wsReconnectTimeout = null;
}
this.stopFallbackPolling();
}
private handleMessage(event: MessageEvent) {
try {
const message = JSON.parse(event.data as string);
if (message.type === "lobbies_update") {
this.onLobbiesUpdate(message.data?.lobbies ?? []);
}
const publicGames = PublicGamesSchema.parse(
JSON.parse(event.data as string),
);
this.onLobbiesUpdate(publicGames);
} catch (error) {
console.error("Error parsing WebSocket message:", error);
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
@@ -101,10 +102,7 @@ export class PublicLobbySocket {
this.wsConnectionAttempts++;
}
if (this.wsConnectionAttempts >= this.maxWsAttempts) {
console.log(
"Max WebSocket attempts reached, falling back to HTTP polling",
);
this.startFallbackPolling();
console.error("Max WebSocket attempts reached");
} else {
this.scheduleReconnect();
}
@@ -121,7 +119,7 @@ export class PublicLobbySocket {
this.wsConnectionAttempts++;
}
if (this.wsConnectionAttempts >= this.maxWsAttempts) {
this.startFallbackPolling();
alert("error connecting to game service");
} else {
this.scheduleReconnect();
}
@@ -145,33 +143,4 @@ export class PublicLobbySocket {
this.wsReconnectTimeout = null;
}
}
private startFallbackPolling() {
if (this.fallbackPollInterval !== null) return;
console.log("Starting HTTP fallback polling");
this.fetchLobbiesHTTP();
this.fallbackPollInterval = window.setInterval(() => {
this.fetchLobbiesHTTP();
}, this.pollIntervalMs);
}
private stopFallbackPolling() {
if (this.fallbackPollInterval !== null) {
clearInterval(this.fallbackPollInterval);
this.fallbackPollInterval = null;
}
}
private async fetchLobbiesHTTP() {
try {
const response = await fetch(`/api/public_lobbies`);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
this.onLobbiesUpdate(data.lobbies as GameInfo[]);
} catch (error) {
console.error("Error fetching lobbies via HTTP:", error);
}
}
}
+3 -9
View File
@@ -1,12 +1,7 @@
import version from "resources/version.txt?raw";
import { UserMeResponse } from "../core/ApiSchemas";
import { EventBus } from "../core/EventBus";
import {
GAME_ID_REGEX,
GameInfo,
GameRecord,
GameStartInfo,
} from "../core/Schemas";
import { GAME_ID_REGEX, GameRecord, GameStartInfo } from "../core/Schemas";
import { GameEnv } from "../core/configuration/Config";
import { getServerConfigFromClient } from "../core/configuration/ConfigLoader";
import { GameType } from "../core/game/Game";
@@ -223,7 +218,6 @@ export interface JoinLobbyEvent {
// GameRecord exists when replaying an archived game.
gameRecord?: GameRecord;
source?: "public" | "private" | "host" | "matchmaking" | "singleplayer";
publicLobbyInfo?: GameInfo;
}
class Client {
@@ -773,7 +767,7 @@ class Client {
}
const config = await getServerConfigFromClient();
// Only update URL immediately for private lobbies, not public ones
if (!lobby.publicLobbyInfo && lobby.source !== "public") {
if (lobby.source !== "public") {
this.updateJoinUrlForShare(lobby.gameID, config);
}
@@ -908,7 +902,7 @@ class Client {
// Open the join lobby modal page and pass the lobby info
window.showPage?.("page-join-lobby");
this.joinModal?.open(lobby.gameID, lobby);
this.joinModal?.open(lobby.gameID, true);
}
private async handleLeaveLobby(/* event: CustomEvent */) {
+22 -13
View File
@@ -1,7 +1,7 @@
import { html, LitElement } from "lit";
import { customElement, state } from "lit/decorators.js";
import { GameMapType } from "../core/game/Game";
import { GameID, GameInfo } from "../core/Schemas";
import { GameID, PublicGameInfo, PublicGames } from "../core/Schemas";
import { PublicLobbySocket } from "./LobbySocket";
import { terrainMapFileLoader } from "./TerrainMapFileLoader";
import {
@@ -13,16 +13,19 @@ import {
} from "./Utils";
export interface ShowPublicLobbyModalEvent {
lobby: GameInfo;
lobby: PublicGameInfo;
}
@customElement("public-lobby")
export class PublicLobby extends LitElement {
@state() private lobbies: GameInfo[] = [];
@state() private publicGames: PublicGames | null = null;
@state() public isLobbyHighlighted: boolean = false;
@state() private mapImages: Map<GameID, string> = new Map();
private lobbyIDToStart = new Map<GameID, number>();
private lobbySocket = new PublicLobbySocket((lobbies) =>
this.handleLobbiesUpdate(lobbies),
private serverTimeOffset = 0;
private lobbySocket = new PublicLobbySocket((data) =>
this.handleLobbiesUpdate(data),
);
createRenderRoot() {
@@ -39,12 +42,18 @@ export class PublicLobby extends LitElement {
this.lobbySocket.stop();
}
private handleLobbiesUpdate(lobbies: GameInfo[]) {
this.lobbies = lobbies;
this.lobbies.forEach((l) => {
private handleLobbiesUpdate(publicGames: PublicGames) {
this.publicGames = publicGames;
// Calculate offset between server time and client time
if (this.publicGames) {
this.serverTimeOffset = this.publicGames.serverTime - Date.now();
}
this.publicGames.games.forEach((l) => {
if (!this.lobbyIDToStart.has(l.gameID)) {
const msUntilStart = l.msUntilStart ?? 0;
this.lobbyIDToStart.set(l.gameID, msUntilStart + Date.now());
// Convert server's startsAt to client time by subtracting offset
const startsAt = l.startsAt ?? Date.now();
this.lobbyIDToStart.set(l.gameID, startsAt - this.serverTimeOffset);
}
if (l.gameConfig && !this.mapImages.has(l.gameID)) {
@@ -66,9 +75,9 @@ export class PublicLobby extends LitElement {
}
render() {
if (this.lobbies.length === 0) return html``;
if (!this.publicGames) return html``;
const lobby = this.lobbies[0];
const lobby = this.publicGames.games[0];
if (!lobby?.gameConfig) return html``;
const start = this.lobbyIDToStart.get(lobby.gameID) ?? 0;
@@ -200,7 +209,7 @@ export class PublicLobby extends LitElement {
this.lobbySocket.stop();
}
private lobbyClicked(lobby: GameInfo) {
private lobbyClicked(lobby: PublicGameInfo) {
// Validate username before opening the modal
const usernameInput = document.querySelector("username-input") as any;
if (
+16 -9
View File
@@ -136,6 +136,9 @@ export type PlayerPattern = z.infer<typeof PlayerPatternSchema>;
export type PlayerColor = z.infer<typeof PlayerColorSchema>;
export type Flag = z.infer<typeof FlagSchema>;
export type GameStartInfo = z.infer<typeof GameStartInfoSchema>;
export type GameInfo = z.infer<typeof GameInfoSchema>;
export type PublicGames = z.infer<typeof PublicGamesSchema>;
export type PublicGameInfo = z.infer<typeof PublicGameInfoSchema>;
const ClientInfoSchema = z.object({
clientID: z.string(),
@@ -145,18 +148,22 @@ const ClientInfoSchema = z.object({
export const GameInfoSchema = z.object({
gameID: z.string(),
clients: z.array(ClientInfoSchema).optional(),
numClients: z.number().optional(),
msUntilStart: z.number().optional(),
startsAt: z.number().optional(),
serverTime: z.number(),
gameConfig: z.lazy(() => GameConfigSchema).optional(),
});
export interface GameInfo {
gameID: GameID;
clients?: ClientInfo[];
numClients?: number;
msUntilStart?: number;
gameConfig?: GameConfig;
}
export const PublicGameInfoSchema = z.object({
gameID: z.string(),
numClients: z.number(),
startsAt: z.number(),
gameConfig: z.lazy(() => GameConfigSchema).optional(),
});
export const PublicGamesSchema = z.object({
serverTime: z.number(),
games: PublicGameInfoSchema.array(),
});
export class LobbyInfoEvent implements GameEvent {
constructor(public lobby: GameInfo) {}
+8
View File
@@ -26,6 +26,12 @@ export class GameManager {
return this.games.get(id) ?? null;
}
public publicLobbies(): GameServer[] {
return Array.from(this.games.values()).filter(
(g) => g.phase() === GamePhase.Lobby && g.isPublic(),
);
}
joinClient(client: Client, gameID: GameID): boolean {
const game = this.games.get(gameID);
if (game) {
@@ -52,6 +58,7 @@ export class GameManager {
id: GameID,
gameConfig: GameConfig | undefined,
creatorClientID?: string,
startsAt?: number,
) {
const game = new GameServer(
id,
@@ -77,6 +84,7 @@ export class GameManager {
...gameConfig,
},
creatorClientID,
startsAt,
);
this.games.set(id, game);
return game;
+1 -2
View File
@@ -147,8 +147,7 @@ export function buildPreview(
activePlayers = countActivePlayers(players);
} else {
activePlayers =
countActivePlayers(players) ||
(lobby?.numClients ?? lobby?.clients?.length ?? 0);
countActivePlayers(players) || (lobby?.clients?.length ?? 0);
}
const map = lobby?.gameConfig?.gameMap ?? config.gameMap;
let mode = lobby?.gameConfig?.gameMode ?? config.gameMode ?? GameMode.FFA;
+7 -19
View File
@@ -88,6 +88,7 @@ export class GameServer {
private config: ServerConfig,
public gameConfig: GameConfig,
private lobbyCreatorID?: string,
private startsAt?: number,
) {
this.log = log_.child({ gameID: id });
}
@@ -506,15 +507,6 @@ export class GameServer {
return this.activeClients.length;
}
public startTime(): number {
if (this._startTime !== null && this._startTime > 0) {
return this._startTime;
} else {
//game hasn't started yet, only works for public games
return this.createdAt + this.config.gameCreationRate();
}
}
public prestart() {
if (this.hasStarted()) {
return;
@@ -787,8 +779,9 @@ export class GameServer {
}
}
const msSinceCreation = now - this.createdAt;
const lessThanLifetime = msSinceCreation < this.config.gameCreationRate();
// Public Games
const lessThanLifetime = Date.now() < this.startsAt!;
const notEnoughPlayers =
this.gameConfig.gameType === GameType.Public &&
this.gameConfig.maxPlayers &&
@@ -796,8 +789,7 @@ export class GameServer {
if (lessThanLifetime && notEnoughPlayers) {
return GamePhase.Lobby;
}
const warmupOver =
now > this.createdAt + this.config.gameCreationRate() + 30 * 1000;
const warmupOver = now > this.startsAt! + 30 * 1000;
if (noActive && warmupOver && noRecentPings) {
return GamePhase.Finished;
}
@@ -817,15 +809,11 @@ export class GameServer {
clientID: c.clientID,
})),
gameConfig: this.gameConfig,
msUntilStart: this.isPublic() ? this.getMsUntilStart() : undefined,
startsAt: this.startsAt,
serverTime: Date.now(),
};
}
private getMsUntilStart(): number {
const startTime = this.createdAt + this.config.gameCreationRate();
return Math.max(0, startTime - Date.now());
}
public isPublic(): boolean {
return this.gameConfig.gameType === GameType.Public;
}
+56
View File
@@ -0,0 +1,56 @@
import { z } from "zod";
import {
GameConfigSchema,
PublicGameInfoSchema,
PublicGamesSchema,
} from "../core/Schemas";
export type WorkerLobbyList = z.infer<typeof WorkerLobbyListSchema>;
export type WorkerReady = z.infer<typeof WorkerReadySchema>;
export type MasterLobbiesBroadcast = z.infer<
typeof MasterLobbiesBroadcastSchema
>;
export type MasterCreateGame = z.infer<typeof MasterCreateGameSchema>;
export type WorkerMessage = z.infer<typeof WorkerMessageSchema>;
export type MasterMessage = z.infer<typeof MasterMessageSchema>;
// --- Worker Messages ---
// Worker tells the master about its lobbies.
const WorkerLobbyListSchema = z.object({
type: z.literal("lobbyList"),
lobbies: z.array(PublicGameInfoSchema),
});
const WorkerReadySchema = z.object({
type: z.literal("workerReady"),
workerId: z.number(),
});
export const WorkerMessageSchema = z.discriminatedUnion("type", [
WorkerLobbyListSchema,
WorkerReadySchema,
]);
// --- Master Messages ---
// Broadcasts all public game info to all workers.
// Workers need information on all public lobbies so
// it can send it to the client.
const MasterLobbiesBroadcastSchema = z.object({
type: z.literal("lobbiesBroadcast"),
publicGames: PublicGamesSchema,
});
// Master sends a message to worker to schedule a new public game/lobby.
const MasterCreateGameSchema = z.object({
type: z.literal("createGame"),
gameID: z.string(),
gameConfig: GameConfigSchema,
startsAt: z.number(),
});
export const MasterMessageSchema = z.discriminatedUnion("type", [
MasterLobbiesBroadcastSchema,
MasterCreateGameSchema,
]);
+9 -199
View File
@@ -5,19 +5,16 @@ import rateLimit from "express-rate-limit";
import http from "http";
import path from "path";
import { fileURLToPath } from "url";
import { WebSocket, WebSocketServer } from "ws";
import { GameEnv } from "../core/configuration/Config";
import { getServerConfigFromServer } from "../core/configuration/ConfigLoader";
import { GameInfo } from "../core/Schemas";
import { generateID } from "../core/Util";
import { logger } from "./Logger";
import { MapPlaylist } from "./MapPlaylist";
import { startPolling } from "./PollingLoop";
import { MasterLobbyService } from "./MasterLobbyService";
import { renderHtml } from "./RenderHtml";
const config = getServerConfigFromServer();
const playlist = new MapPlaylist();
const readyWorkers = new Set();
let lobbyService: MasterLobbyService;
const app = express();
const server = http.createServer(app);
@@ -68,33 +65,6 @@ app.use(
}),
);
let publicLobbiesData: { lobbies: GameInfo[] } = { lobbies: [] };
const publicLobbyIDs: Set<string> = new Set();
const connectedClients: Set<WebSocket> = new Set();
// Broadcast lobbies to all connected clients
function broadcastLobbies() {
const message = JSON.stringify({
type: "lobbies_update",
data: publicLobbiesData,
});
const clientsToRemove: WebSocket[] = [];
connectedClients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
} else {
clientsToRemove.push(client);
}
});
clientsToRemove.forEach((client) => {
connectedClients.delete(client);
});
}
// Start the master process
export async function startMaster() {
if (!cluster.isPrimary) {
@@ -106,36 +76,7 @@ export async function startMaster() {
log.info(`Primary ${process.pid} is running`);
log.info(`Setting up ${config.numWorkers()} workers...`);
// Setup WebSocket server for clients
const wss = new WebSocketServer({ server, path: "/lobbies" });
wss.on("connection", (ws: WebSocket) => {
connectedClients.add(ws);
// Send current lobbies immediately (always send, even if empty)
ws.send(
JSON.stringify({ type: "lobbies_update", data: publicLobbiesData }),
);
ws.on("close", () => {
connectedClients.delete(ws);
});
ws.on("error", (error) => {
log.error(`WebSocket error:`, error);
connectedClients.delete(ws);
try {
if (
ws.readyState === WebSocket.OPEN ||
ws.readyState === WebSocket.CONNECTING
) {
ws.close(1011, "WebSocket internal error");
}
} catch (closeError) {
log.error("Error while closing WebSocket after error:", closeError);
}
});
});
lobbyService = new MasterLobbyService(config, playlist, log);
// Generate admin token for worker authentication
const ADMIN_TOKEN = crypto.randomBytes(16).toString("hex");
@@ -157,44 +98,21 @@ export async function startMaster() {
INSTANCE_ID,
});
lobbyService.registerWorker(i, worker);
log.info(`Started worker ${i} (PID: ${worker.process.pid})`);
}
cluster.on("message", (worker, message) => {
if (message.type === "WORKER_READY") {
const workerId = message.workerId;
readyWorkers.add(workerId);
log.info(
`Worker ${workerId} is ready. (${readyWorkers.size}/${config.numWorkers()} ready)`,
);
// Start scheduling when all workers are ready
if (readyWorkers.size === config.numWorkers()) {
log.info("All workers ready, starting game scheduling");
const scheduleLobbies = () => {
schedulePublicGame(playlist).catch((error) => {
log.error("Error scheduling public game:", error);
});
};
startPolling(async () => {
const lobbies = await fetchLobbies();
if (lobbies === 0) {
scheduleLobbies();
}
}, 100);
}
}
});
// Handle worker crashes
cluster.on("exit", (worker, code, signal) => {
const workerId = (worker as any).process?.env?.WORKER_ID;
if (!workerId) {
if (workerId === undefined) {
log.error(`worker crashed could not find id`);
return;
}
const workerIdNum = parseInt(workerId);
lobbyService.removeWorker(workerIdNum);
log.warn(
`Worker ${workerId} (PID: ${worker.process.pid}) died with code: ${code} and signal: ${signal}`,
);
@@ -207,6 +125,7 @@ export async function startMaster() {
INSTANCE_ID,
});
lobbyService.registerWorker(workerIdNum, newWorker);
log.info(
`Restarted worker ${workerId} (New PID: ${newWorker.process.pid})`,
);
@@ -226,115 +145,6 @@ app.get("/api/env", async (req, res) => {
res.json(envConfig);
});
// Add lobbies endpoint to list public games for this worker
app.get("/api/public_lobbies", async (req, res) => {
res.json(publicLobbiesData);
});
async function fetchLobbies(): Promise<number> {
const fetchPromises: Promise<GameInfo | null>[] = [];
for (const gameID of new Set(publicLobbyIDs)) {
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000); // 5 second timeout
const port = config.workerPort(gameID);
const promise = fetch(`http://localhost:${port}/api/game/${gameID}`, {
headers: { [config.adminHeader()]: config.adminToken() },
signal: controller.signal,
})
.then((resp) => resp.json())
.then((json) => {
return json as GameInfo;
})
.catch((error) => {
log.error(`Error fetching game ${gameID}:`, error);
// Return null or a placeholder if fetch fails
publicLobbyIDs.delete(gameID);
return null;
});
fetchPromises.push(promise);
}
// Wait for all promises to resolve
const results = await Promise.all(fetchPromises);
// Filter out any null results from failed fetches
const lobbyInfos: GameInfo[] = results
.filter((result) => result !== null)
.map((gi: GameInfo) => {
return {
gameID: gi.gameID,
numClients: gi?.clients?.length ?? 0,
gameConfig: gi.gameConfig,
msUntilStart: gi.msUntilStart,
} as GameInfo;
});
lobbyInfos.forEach((l) => {
if (
"msUntilStart" in l &&
l.msUntilStart !== undefined &&
l.msUntilStart <= 250
) {
publicLobbyIDs.delete(l.gameID);
return;
}
if (
"gameConfig" in l &&
l.gameConfig !== undefined &&
"maxPlayers" in l.gameConfig &&
l.gameConfig.maxPlayers !== undefined &&
"numClients" in l &&
l.numClients !== undefined &&
l.gameConfig.maxPlayers <= l.numClients
) {
publicLobbyIDs.delete(l.gameID);
return;
}
});
// Update the lobbies data
publicLobbiesData = {
lobbies: lobbyInfos,
};
broadcastLobbies();
return publicLobbyIDs.size;
}
// Function to schedule a new public game
async function schedulePublicGame(playlist: MapPlaylist) {
const gameID = generateID();
publicLobbyIDs.add(gameID);
const workerPath = config.workerPath(gameID);
// Send request to the worker to start the game
try {
const response = await fetch(
`http://localhost:${config.workerPort(gameID)}/api/create_game/${gameID}`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
[config.adminHeader()]: config.adminToken(),
},
body: JSON.stringify(await playlist.gameConfig()),
},
);
if (!response.ok) {
throw new Error(`Failed to schedule public game: ${response.statusText}`);
}
} catch (error) {
log.error(`Failed to schedule public game on worker ${workerPath}:`, error);
throw error;
}
}
// SPA fallback route
app.get("*", async function (_req, res) {
try {
+135
View File
@@ -0,0 +1,135 @@
import { Worker } from "cluster";
import winston from "winston";
import { ServerConfig } from "../core/configuration/Config";
import { PublicGameInfo } from "../core/Schemas";
import { generateID } from "../core/Util";
import {
MasterCreateGame,
MasterLobbiesBroadcast,
WorkerMessageSchema,
} from "./IPCBridgeSchema";
import { logger } from "./Logger";
import { MapPlaylist } from "./MapPlaylist";
import { startPolling } from "./PollingLoop";
export interface MasterLobbyServiceOptions {
config: ServerConfig;
playlist: MapPlaylist;
log: typeof logger;
}
export class MasterLobbyService {
private readonly workers = new Map<number, Worker>();
// Worker id => the lobbies it owns.
private readonly workerLobbies = new Map<number, PublicGameInfo[]>();
private readonly readyWorkers = new Set<number>();
private started = false;
constructor(
private config: ServerConfig,
private playlist: MapPlaylist,
private log: winston.Logger,
) {}
registerWorker(workerId: number, worker: Worker) {
this.workers.set(workerId, worker);
worker.on("message", (raw: unknown) => {
const result = WorkerMessageSchema.safeParse(raw);
if (!result.success) {
this.log.error("Invalid IPC message from worker:", raw);
return;
}
const msg = result.data;
switch (msg.type) {
case "workerReady":
this.handleWorkerReady(msg.workerId);
break;
case "lobbyList":
this.workerLobbies.set(workerId, msg.lobbies);
break;
}
});
}
removeWorker(workerId: number) {
this.workers.delete(workerId);
this.workerLobbies.delete(workerId);
this.readyWorkers.delete(workerId);
}
private handleWorkerReady(workerId: number) {
this.readyWorkers.add(workerId);
this.log.info(
`Worker ${workerId} is ready. (${this.readyWorkers.size}/${this.config.numWorkers()} ready)`,
);
if (this.readyWorkers.size === this.config.numWorkers() && !this.started) {
this.started = true;
this.log.info("All workers ready, starting game scheduling");
startPolling(async () => this.broadcastLobbies(), 250);
startPolling(async () => await this.maybeScheduleLobby(), 1000);
}
}
private getAllLobbies(): PublicGameInfo[] {
const lobbies = Array.from(this.workerLobbies.values())
.flat()
.sort((a, b) => a.startsAt! - b.startsAt);
return lobbies;
}
private broadcastLobbies() {
const msg = {
type: "lobbiesBroadcast",
publicGames: {
serverTime: Date.now(),
games: this.getAllLobbies(),
},
} satisfies MasterLobbiesBroadcast;
for (const worker of this.workers.values()) {
worker.send(msg, (e) => {
if (e) {
this.log.error("Failed to send lobbies broadcast to worker:", e);
}
});
}
}
private async maybeScheduleLobby() {
const lobbies = this.getAllLobbies();
if (lobbies.length >= 2) {
return;
}
const lastStart = lobbies.reduce(
(max, pb) => Math.max(max, pb.startsAt),
Date.now(),
);
const gameID = generateID();
const workerId = this.config.workerIndex(gameID);
const gameConfig = await this.playlist.gameConfig();
const worker = this.workers.get(workerId);
if (!worker) {
this.log.error(`Worker ${workerId} not found`);
return;
}
worker.send(
{
type: "createGame",
gameID,
gameConfig,
startsAt: lastStart + this.config.gameCreationRate(),
} satisfies MasterCreateGame,
(e) => {
if (e) {
this.log.error("Failed to schedule lobby on worker:", e);
}
},
);
this.log.info(`Scheduled public game ${gameID} on worker ${workerId}`);
}
}
+15 -16
View File
@@ -30,6 +30,7 @@ import { MapPlaylist } from "./MapPlaylist";
import { startPolling } from "./PollingLoop";
import { PrivilegeRefresher } from "./PrivilegeRefresher";
import { verifyTurnstileToken } from "./Turnstile";
import { WorkerLobbyService } from "./WorkerLobbyService";
import { initWorkerMetrics } from "./WorkerMetrics";
const config = getServerConfigFromServer();
@@ -42,6 +43,18 @@ const playlist = new MapPlaylist(true);
export async function startWorker() {
log.info(`Worker starting...`);
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const app = express();
const server = http.createServer(app);
const wss = new WebSocketServer({ noServer: true });
const gm = new GameManager(config, log);
// Initialize lobby service (handles WebSocket upgrade routing)
const lobbyService = new WorkerLobbyService(server, wss, gm, log);
setTimeout(
() => {
startMatchmakingPolling(gm);
@@ -49,15 +62,6 @@ export async function startWorker() {
1000 + Math.random() * 2000,
);
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const app = express();
const server = http.createServer(app);
const wss = new WebSocketServer({ server });
const gm = new GameManager(config, log);
if (config.otelEnabled()) {
initWorkerMetrics(gm);
}
@@ -459,13 +463,8 @@ export async function startWorker() {
log.info(`running on http://localhost:${PORT}`);
log.info(`Handling requests with path prefix /w${workerId}/`);
// Signal to the master process that this worker is ready
if (process.send) {
process.send({
type: "WORKER_READY",
workerId: workerId,
});
log.info(`signaled ready state to master`);
}
lobbyService.sendReady(workerId);
log.info(`signaled ready state to master`);
});
// Global error handler
+136
View File
@@ -0,0 +1,136 @@
import http from "http";
import { WebSocket, WebSocketServer } from "ws";
import { PublicGameInfo, PublicGames } from "../core/Schemas";
import { GameManager } from "./GameManager";
import {
MasterMessageSchema,
WorkerLobbyList,
WorkerReady,
} from "./IPCBridgeSchema";
import { logger } from "./Logger";
export class WorkerLobbyService {
private readonly lobbiesWss: WebSocketServer;
private readonly lobbyClients: Set<WebSocket> = new Set();
constructor(
private readonly server: http.Server,
private readonly gameWss: WebSocketServer,
private readonly gm: GameManager,
private readonly log: typeof logger,
) {
this.lobbiesWss = new WebSocketServer({ noServer: true });
this.setupUpgradeHandler();
this.setupLobbiesWebSocket();
this.setupIPCListener();
}
private setupIPCListener() {
process.on("message", (raw: unknown) => {
const result = MasterMessageSchema.safeParse(raw);
if (!result.success) {
this.log.error("Invalid IPC message from master:", raw);
return;
}
const msg = result.data;
switch (msg.type) {
case "lobbiesBroadcast":
// Forward message to all clients
this.broadcastLobbiesToClients(msg.publicGames);
// Update master with my lobby info
this.sendMyLobbiesToMaster();
break;
case "createGame":
if (this.gm.game(msg.gameID) !== null) {
this.log.warn(`Game ${msg.gameID} already exists, skipping create`);
return;
}
this.log.info(`Creating public game ${msg.gameID} from master`);
this.gm.createGame(
msg.gameID,
msg.gameConfig,
undefined,
msg.startsAt,
);
break;
}
});
}
sendReady(workerId: number) {
const msg: WorkerReady = { type: "workerReady", workerId };
process.send?.(msg);
}
private sendMyLobbiesToMaster() {
const lobbies = this.gm
.publicLobbies()
.map((g) => g.gameInfo())
.map((gi) => {
return {
gameID: gi.gameID,
numClients: gi.clients?.length ?? 0,
startsAt: gi.startsAt!,
gameConfig: gi.gameConfig,
} satisfies PublicGameInfo;
});
process.send?.({ type: "lobbyList", lobbies } satisfies WorkerLobbyList);
}
private setupUpgradeHandler() {
this.server.on("upgrade", (request, socket, head) => {
const pathname = request.url ?? "";
if (pathname === "/lobbies" || pathname.endsWith("/lobbies")) {
this.lobbiesWss.handleUpgrade(request, socket, head, (ws) => {
this.lobbiesWss.emit("connection", ws, request);
});
} else {
this.gameWss.handleUpgrade(request, socket, head, (ws) => {
this.gameWss.emit("connection", ws, request);
});
}
});
}
private setupLobbiesWebSocket() {
this.lobbiesWss.on("connection", (ws: WebSocket) => {
this.lobbyClients.add(ws);
ws.on("close", () => {
this.lobbyClients.delete(ws);
});
ws.on("error", (error) => {
this.log.error(`Lobbies WebSocket error:`, error);
this.lobbyClients.delete(ws);
try {
if (
ws.readyState === WebSocket.OPEN ||
ws.readyState === WebSocket.CONNECTING
) {
ws.close(1011, "WebSocket internal error");
}
} catch (closeError) {
this.log.error("Error closing lobbies WebSocket:", closeError);
}
});
});
}
private broadcastLobbiesToClients(publicGames: PublicGames) {
const message = JSON.stringify(publicGames);
const clientsToRemove: WebSocket[] = [];
this.lobbyClients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
} else {
clientsToRemove.push(client);
}
});
clientsToRemove.forEach((client) => {
this.lobbyClients.delete(client);
});
}
}
-113
View File
@@ -1,113 +0,0 @@
import { PublicLobbySocket } from "../../src/client/LobbySocket";
class MockWebSocket extends EventTarget {
static instances: MockWebSocket[] = [];
static readonly OPEN = 1;
static readonly CLOSED = 3;
readonly url: string;
readyState = MockWebSocket.OPEN;
constructor(url: string) {
super();
this.url = url;
MockWebSocket.instances.push(this);
}
addEventListener(
type: string,
listener: EventListenerOrEventListenerObject,
options?: boolean | AddEventListenerOptions,
): void {
super.addEventListener(type, listener, options);
}
close(code?: number, reason?: string) {
this.readyState = MockWebSocket.CLOSED;
this.dispatchEvent(new CloseEvent("close", { code, reason }));
}
send(_data: unknown) {}
}
describe("PublicLobbySocket", () => {
const originalWebSocket = globalThis.WebSocket;
const originalFetch = globalThis.fetch;
beforeEach(() => {
MockWebSocket.instances = [];
// @ts-expect-error assign test mock
globalThis.WebSocket = MockWebSocket;
});
afterEach(() => {
globalThis.WebSocket = originalWebSocket;
globalThis.fetch = originalFetch;
vi.useRealTimers();
});
it("delivers lobby updates from websocket messages", () => {
const updates: unknown[][] = [];
const socket = new PublicLobbySocket((lobbies) => updates.push(lobbies));
socket.start();
const ws = MockWebSocket.instances.at(-1);
expect(ws?.url).toContain("/lobbies");
ws?.dispatchEvent(
new MessageEvent("message", {
data: JSON.stringify({
type: "lobbies_update",
data: {
lobbies: [
{
gameID: "g1",
numClients: 1,
gameConfig: {
maxPlayers: 2,
gameMode: 0,
gameMap: "Earth",
},
},
],
},
}),
}),
);
expect(updates).toHaveLength(1);
expect((updates[0][0] as { gameID: string }).gameID).toBe("g1");
socket.stop();
});
it("falls back to HTTP polling after max websocket attempts", async () => {
vi.useFakeTimers();
const fetchMock = vi.fn().mockResolvedValue({
ok: true,
json: async () => ({ lobbies: [] }),
});
globalThis.fetch = fetchMock as unknown as typeof fetch;
const socket = new PublicLobbySocket(() => {}, {
maxWsAttempts: 1,
reconnectDelay: 0,
pollIntervalMs: 50,
});
socket.start();
const ws = MockWebSocket.instances.at(-1);
ws?.dispatchEvent(new CloseEvent("close"));
await Promise.resolve();
expect(fetchMock).toHaveBeenCalledTimes(1);
vi.advanceTimersByTime(60);
await Promise.resolve();
expect(fetchMock).toHaveBeenCalledTimes(2);
socket.stop();
});
});