Files
OpenFrontIO/src/server/WorkerLobbyService.ts
T
Evan 48609fa70a Reduce lobby broadcast bandwidth via counts-only deltas (#4116)
## Description:

- The lobby WebSocket broadcast (`/lobbies`) was re-sending the full
`PublicGames` snapshot — including each lobby's `gameConfig` — to every
connected client every 500ms. Almost nothing in that payload changes
tick-to-tick; only `numClients` moves.
- `WorkerLobbyService` now tracks the sorted set of `gameID`s it last
sent as a full snapshot. On each incoming broadcast it sends a `full`
only when that set changes; otherwise it sends a `counts` delta carrying
just `{gameID → numClients}`.
- This relies on the master-side coupling at
[MasterLobbyService.ts:140-159](src/server/MasterLobbyService.ts#L140-L159):
when master finds a lobby without `startsAt`, it both sets `startsAt`
AND schedules a fresh lobby on the same tick, so the gameID change
brings the `startsAt` (and `gameConfig`) along with it.
- New WS connections are primed with a `full` built from the worker's most
recent cached snapshot from master, so late joiners don't have to wait for
the next structural change.
- `LobbySocket` parses the new discriminated union (`PublicLobbyMessage
= full | counts`), keeps the last full snapshot in memory, and merges
counts into it before invoking the existing callback. `GameModeSelector`
is unchanged.
- Master → worker IPC is unchanged — still sends the full snapshot every
500ms. The optimization only applies to the worker → WS-client boundary,
which is the fan-out point.

## Please complete the following:

- [x] I have added screenshots for all UI updates
- [x] I process any text displayed to the user through translateText()
and I've added it to the en.json file
- [x] I have added relevant tests to the test directory

## Please put your Discord username so you can be contacted if a bug or
regression is found:

evan
2026-06-02 15:52:14 -07:00

217 lines
6.6 KiB
TypeScript

import http from "http";
import { WebSocket, WebSocketServer } from "ws";
import {
PublicGameInfo,
PublicGames,
PublicLobbyMessage,
} from "../core/Schemas";
import { GameManager } from "./GameManager";
import {
MasterMessageSchema,
WorkerLobbyList,
WorkerReady,
} from "./IPCBridgeSchema";
import { logger } from "./Logger";
/**
 * Worker-side bridge between the master process and lobby WebSocket clients.
 *
 * What this class does:
 * - Routes HTTP upgrade requests: URLs ending in `/lobbies` are handed to a
 *   dedicated lobby WebSocketServer, everything else to the game
 *   WebSocketServer passed in by the caller.
 * - Listens for IPC messages from master (`lobbiesBroadcast`, `createGame`,
 *   `updateLobby`) and acts on them.
 * - Fans each lobby snapshot out to connected lobby clients, as a `full`
 *   message when the lobby structure changed and as a `counts` delta
 *   otherwise.
 */
export class WorkerLobbyService {
  private readonly lobbiesWss: WebSocketServer;
  private readonly lobbyClients: Set<WebSocket> = new Set();

  // Most recent snapshot from master, serialized on demand for new
  // connections so they don't have to wait for the next broadcast.
  private lastPublicGames: PublicGames | null = null;

  // Fingerprint of the last full we broadcast: the sorted, comma-joined
  // "gameID@startsAt" keys of every lobby in that snapshot, or null if we've
  // never broadcast one. While the fingerprint is unchanged a counts-only
  // delta is enough; any change (a lobby added or removed, or startsAt set on
  // an existing lobby) triggers a fresh full. gameConfig is NOT part of the
  // fingerprint, so a config change on an existing gameID still relies on
  // master pairing it with a gameID change. Null (not "") is used so that an
  // empty-lobby first broadcast still emits a full.
  private lastFullGameIds: string | null = null;

  /**
   * Creates the lobby WebSocket server and wires up the upgrade handler,
   * lobby connection handling, and the IPC listener.
   *
   * @param server HTTP server whose `upgrade` events are routed here.
   * @param gameWss WebSocket server that receives every non-lobby upgrade.
   * @param gm Game manager used to create, look up, and list games.
   * @param log Logger used for warnings and errors.
   */
  constructor(
    private readonly server: http.Server,
    private readonly gameWss: WebSocketServer,
    private readonly gm: GameManager,
    private readonly log: typeof logger,
  ) {
    this.lobbiesWss = new WebSocketServer({
      noServer: true,
      // Lobby clients are not expected to send anything (any message gets the
      // socket terminated), so inbound payloads are capped at 256 KiB.
      maxPayload: 256 * 1024,
    });

    this.setupUpgradeHandler();
    this.setupLobbiesWebSocket();
    this.setupIPCListener();
  }

  /**
   * Subscribes to IPC messages from master, validates them against
   * `MasterMessageSchema`, and dispatches on the message type. Messages that
   * fail validation are logged and dropped.
   */
  private setupIPCListener() {
    process.on("message", (raw: unknown) => {
      const result = MasterMessageSchema.safeParse(raw);
      if (!result.success) {
        this.log.error("Invalid IPC message from master:", raw);
        return;
      }

      const msg = result.data;
      switch (msg.type) {
        case "lobbiesBroadcast":
          // Cache first so connections arriving after this point are primed
          // with the same snapshot the broadcast below is based on.
          this.lastPublicGames = msg.publicGames;
          // Forward message to all clients
          this.broadcastLobbiesToClients(msg.publicGames);
          // Update master with my lobby info
          this.sendMyLobbiesToMaster();
          break;

        case "createGame": {
          if (this.gm.game(msg.gameID) !== null) {
            this.log.warn(`Game ${msg.gameID} already exists, skipping create`);
            return;
          }
          this.log.info(`Creating public game ${msg.gameID} from master`);
          const game = this.gm.createGame(
            msg.gameID,
            msg.gameConfig,
            undefined,
            undefined,
            msg.publicGameType,
          );
          if (game === null) {
            this.log.warn(`Game ${msg.gameID} already exists, skipping create`);
          }
          break;
        }

        case "updateLobby": {
          const game = this.gm.game(msg.gameID);
          if (!game) {
            this.log.warn("cannot update game, not found", {
              gameID: msg.gameID,
            });
            return;
          }
          game.setStartsAt(msg.startsAt);
          break;
        }
      }
    });
  }

  /**
   * Tells master this worker is ready. A no-op when the process has no IPC
   * channel (`process.send` is undefined).
   *
   * @param workerId Identifier of this worker, echoed back to master.
   */
  sendReady(workerId: number) {
    const msg: WorkerReady = { type: "workerReady", workerId };
    process.send?.(msg);
  }

  /**
   * Reports this worker's public lobbies to master as a `lobbyList` message.
   * `numClients` falls back to 0 when a game exposes no client list.
   */
  private sendMyLobbiesToMaster() {
    const lobbies = this.gm
      .publicLobbies()
      .map((g) => g.gameInfo())
      .map((gi) => {
        return {
          gameID: gi.gameID,
          numClients: gi.clients?.length ?? 0,
          startsAt: gi.startsAt,
          gameConfig: gi.gameConfig,
          publicGameType: gi.publicGameType!,
        } satisfies PublicGameInfo;
      });
    process.send?.({ type: "lobbyList", lobbies } satisfies WorkerLobbyList);
  }

  /**
   * Routes WebSocket upgrade requests to the lobby server or the game server
   * based on the request URL.
   */
  private setupUpgradeHandler() {
    this.server.on("upgrade", (request, socket, head) => {
      // NOTE(review): the raw request URL is compared, so a URL carrying a
      // query string after "/lobbies" would be routed to the game server —
      // confirm clients never append one.
      const pathname = request.url ?? "";
      if (pathname === "/lobbies" || pathname.endsWith("/lobbies")) {
        this.lobbiesWss.handleUpgrade(request, socket, head, (ws) => {
          this.lobbiesWss.emit("connection", ws, request);
        });
      } else {
        this.gameWss.handleUpgrade(request, socket, head, (ws) => {
          this.gameWss.emit("connection", ws, request);
        });
      }
    });
  }

  /**
   * Registers lobby connection handling: tracks each client, primes it with
   * the latest snapshot, terminates it if it sends anything, and untracks it
   * on close or error.
   */
  private setupLobbiesWebSocket() {
    this.lobbiesWss.on("connection", (ws: WebSocket) => {
      this.lobbyClients.add(ws);

      // Prime the new client with the most recent snapshot — otherwise it
      // would only see counts-only deltas (which it can't apply without a
      // base) until the next structural change.
      if (this.lastPublicGames !== null) {
        ws.send(JSON.stringify(this.fullMessage(this.lastPublicGames)));
      }

      // The lobby socket is server → client only; any inbound message drops
      // the connection.
      ws.on("message", () => {
        ws.terminate();
      });

      ws.on("close", () => {
        this.lobbyClients.delete(ws);
      });

      ws.on("error", (error) => {
        this.log.error(`Lobbies WebSocket error:`, error);
        this.lobbyClients.delete(ws);
        try {
          if (
            ws.readyState === WebSocket.OPEN ||
            ws.readyState === WebSocket.CONNECTING
          ) {
            ws.close(1011, "WebSocket internal error");
          }
        } catch (closeError) {
          this.log.error("Error closing lobbies WebSocket:", closeError);
        }
      });
    });
  }

  /**
   * Sends the given snapshot to every open lobby client — as a `full` message
   * when the lobby structure changed since the last full, otherwise as a
   * `counts` delta — and drops clients whose socket is no longer open.
   *
   * @param publicGames Snapshot received from master.
   */
  private broadcastLobbiesToClients(publicGames: PublicGames) {
    // Always advance the fingerprint, even with nobody connected: a client
    // that joins later is primed from lastPublicGames, so the next unchanged
    // snapshot can correctly go out as a delta.
    const payload = this.nextLobbyMessage(publicGames);
    if (this.lobbyClients.size === 0) {
      return; // nothing to serialize or send
    }

    const json = JSON.stringify(payload);
    // Deleting the entry currently being visited is safe during Set#forEach.
    this.lobbyClients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(json);
      } else {
        this.lobbyClients.delete(client);
      }
    });
  }

  /** Builds the `full` lobby message for a snapshot. */
  private fullMessage(publicGames: PublicGames): PublicLobbyMessage {
    return {
      type: "full",
      serverTime: publicGames.serverTime,
      games: publicGames.games,
    };
  }

  /**
   * Decides which message the given snapshot should go out as and updates the
   * stored fingerprint when a full is chosen.
   *
   * @returns A `full` message when the set of lobbies or any lobby's
   *   `startsAt` differs from the last full broadcast (or none was broadcast
   *   yet); otherwise a `counts` message mapping gameID to numClients.
   */
  private nextLobbyMessage(publicGames: PublicGames): PublicLobbyMessage {
    const keys: string[] = [];
    const counts: Record<string, number> = {};
    for (const list of Object.values(publicGames.games)) {
      for (const lobby of list) {
        // startsAt is part of the key so that a start time arriving on a
        // later tick than the gameID change still reaches connected clients.
        keys.push(`${lobby.gameID}@${lobby.startsAt ?? ""}`);
        counts[lobby.gameID] = lobby.numClients;
      }
    }
    // Sort so the fingerprint does not depend on list order.
    keys.sort();
    const fingerprint = keys.join(",");

    if (fingerprint !== this.lastFullGameIds) {
      this.lastFullGameIds = fingerprint;
      return this.fullMessage(publicGames);
    }
    return {
      type: "counts",
      serverTime: publicGames.serverTime,
      counts,
    };
  }
}