From 48609fa70a6a3e5e2777dc4e8ce43113fc94c2b2 Mon Sep 17 00:00:00 2001 From: Evan Date: Tue, 2 Jun 2026 15:52:14 -0700 Subject: [PATCH] Reduce lobby broadcast bandwidth via counts-only deltas (#4116) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description: - The lobby WebSocket broadcast (`/lobbies`) was re-sending the full `PublicGames` snapshot — including each lobby's `gameConfig` — to every connected client every 500ms. Almost nothing in that payload changes tick-to-tick; only `numClients` moves. - `WorkerLobbyService` now tracks the sorted set of `gameID`s it last sent as a full snapshot. On each incoming broadcast it sends a `full` only when that set changes; otherwise it sends a `counts` delta carrying just `{gameID → numClients}`. - This relies on the master-side coupling at [MasterLobbyService.ts:140-159](src/server/MasterLobbyService.ts#L140-L159): when master finds a lobby without `startsAt`, it both sets `startsAt` AND schedules a fresh lobby on the same tick, so the gameID change brings the `startsAt` (and `gameConfig`) along with it. - New WS connections are primed with the worker's cached last `full` so late joiners don't have to wait for the next structural change. - `LobbySocket` parses the new discriminated union (`PublicLobbyMessage = full | counts`), keeps the last full snapshot in memory, and merges counts into it before invoking the existing callback. `GameModeSelector` is unchanged. - Master → worker IPC is unchanged — still sends the full snapshot every 500ms. The optimization only applies to the worker → WS-client boundary, which is the fan-out point. ## Please complete the following: - [x] I have added screenshots for all UI updates - [x] I process any text displayed to the user through translateText() and I've added it to the en.json file - [x] I have added relevant tests to the test directory ## Please put your Discord username so you can be contacted if a bug or regression is found: evan --- src/client/LobbySocket.ts | 43 +++++++++- src/core/Schemas.ts | 22 +++++ src/server/WorkerLobbyService.ts | 63 +++++++++++++- tests/LobbySocket.test.ts | 138 +++++++++++++++++++++++++++++++ 4 files changed, 260 insertions(+), 6 deletions(-) create mode 100644 tests/LobbySocket.test.ts diff --git a/src/client/LobbySocket.ts b/src/client/LobbySocket.ts index 92202ff63..dbd114e04 100644 --- a/src/client/LobbySocket.ts +++ b/src/client/LobbySocket.ts @@ -1,5 +1,5 @@ import { ClientEnv } from "src/client/ClientEnv"; -import { PublicGames, PublicGamesSchema } from "../core/Schemas"; +import { PublicGames, PublicLobbyMessageSchema } from "../core/Schemas"; interface LobbySocketOptions { reconnectDelay?: number; @@ -19,6 +19,8 @@ export class PublicLobbySocket { private wsAttemptCounted = false; private workerPath: string = ""; private stopped = true; + // Latest full snapshot, used as the base for applying counts-only deltas. + private lastFull: PublicGames | null = null; private readonly reconnectDelay: number; private readonly maxWsAttempts: number; @@ -41,6 +43,7 @@ export class PublicLobbySocket { stop() { this.stopped = true; + this.lastFull = null; this.disconnectWebSocket(); } @@ -51,6 +54,9 @@ export class PublicLobbySocket { this.ws.close(); this.ws = null; } + // Drop any cached snapshot — the server primes new connections with a + // fresh full message, and a stale base could mis-merge incoming deltas. + this.lastFull = null; const protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; const wsUrl = `${protocol}//${window.location.host}${this.workerPath}/lobbies`; @@ -78,10 +84,41 @@ export class PublicLobbySocket { private handleMessage(event: MessageEvent) { try { - const publicGames = PublicGamesSchema.parse( + const message = PublicLobbyMessageSchema.parse( JSON.parse(event.data as string), ); - this.onLobbiesUpdate(publicGames); + if (message.type === "full") { + this.lastFull = { + serverTime: message.serverTime, + games: message.games, + }; + this.onLobbiesUpdate(this.lastFull); + return; + } + // counts: patch numClients onto the last full snapshot. If we have no + // base yet (shouldn't happen — server primes on connect), ignore it + // and wait for the next full. + if (this.lastFull === null) { + return; + } + const patchedGames = { ...this.lastFull.games }; + for (const type of Object.keys(patchedGames) as Array< + keyof typeof patchedGames + >) { + const list = patchedGames[type]; + if (!list) continue; + patchedGames[type] = list.map((lobby) => { + const next = message.counts[lobby.gameID]; + return next === undefined || next === lobby.numClients + ? lobby + : { ...lobby, numClients: next }; + }); + } + this.lastFull = { + serverTime: message.serverTime, + games: patchedGames, + }; + this.onLobbiesUpdate(this.lastFull); } catch (error) { console.error("Error parsing WebSocket message:", error); if (this.ws && this.ws.readyState === WebSocket.OPEN) { diff --git a/src/core/Schemas.ts b/src/core/Schemas.ts index 39f207e19..f23516f38 100644 --- a/src/core/Schemas.ts +++ b/src/core/Schemas.ts @@ -183,6 +183,28 @@ export const PublicGamesSchema = z.object({ games: z.record(PublicGameTypeSchema, z.array(PublicGameInfoSchema)), }); +// Wire message sent from server to lobby WebSocket clients. +// "full" carries the complete snapshot; "counts" carries only the +// per-lobby player counts, which change far more often than the rest. +export const PublicLobbyFullSchema = z.object({ + type: z.literal("full"), + serverTime: z.number(), + games: z.record(PublicGameTypeSchema, z.array(PublicGameInfoSchema)), +}); + +export const PublicLobbyCountsSchema = z.object({ + type: z.literal("counts"), + serverTime: z.number(), + counts: z.record(z.string(), z.number()), +}); + +export const PublicLobbyMessageSchema = z.discriminatedUnion("type", [ + PublicLobbyFullSchema, + PublicLobbyCountsSchema, +]); + +export type PublicLobbyMessage = z.infer; + export class LobbyInfoEvent implements GameEvent { constructor( public lobby: GameInfo, diff --git a/src/server/WorkerLobbyService.ts b/src/server/WorkerLobbyService.ts index 3c5dab1d5..6e7f405a1 100644 --- a/src/server/WorkerLobbyService.ts +++ b/src/server/WorkerLobbyService.ts @@ -1,6 +1,10 @@ import http from "http"; import { WebSocket, WebSocketServer } from "ws"; -import { PublicGameInfo, PublicGames } from "../core/Schemas"; +import { + PublicGameInfo, + PublicGames, + PublicLobbyMessage, +} from "../core/Schemas"; import { GameManager } from "./GameManager"; import { MasterMessageSchema, @@ -12,6 +16,16 @@ import { logger } from "./Logger"; export class WorkerLobbyService { private readonly lobbiesWss: WebSocketServer; private readonly lobbyClients: Set = new Set(); + // Most recent snapshot from master, serialized on demand for new + // connections so they don't have to wait for the next broadcast. + private lastPublicGames: PublicGames | null = null; + // Sorted gameIDs of the last full we broadcast, or null if we've never + // broadcast one. When the set changes we send a fresh full; otherwise a + // counts-only delta is enough. This relies on master creating a new lobby + // whenever it sets startsAt on the previous one, so structural state + // (startsAt, gameConfig) rides along with a gameID change. Null (not "") + // is used so that an empty-lobby first broadcast still emits a full. + private lastFullGameIds: string | null = null; constructor( private readonly server: http.Server, @@ -39,6 +53,7 @@ export class WorkerLobbyService { const msg = result.data; switch (msg.type) { case "lobbiesBroadcast": + this.lastPublicGames = msg.publicGames; // Forward message to all clients this.broadcastLobbiesToClients(msg.publicGames); // Update master with my lobby info @@ -116,6 +131,17 @@ export class WorkerLobbyService { private setupLobbiesWebSocket() { this.lobbiesWss.on("connection", (ws: WebSocket) => { this.lobbyClients.add(ws); + // Prime the new client with the most recent snapshot — otherwise it + // would only see counts-only deltas (which it can't apply without a + // base) until the next structural change. + if (this.lastPublicGames !== null) { + const fullJson = JSON.stringify({ + type: "full", + serverTime: this.lastPublicGames.serverTime, + games: this.lastPublicGames.games, + } satisfies PublicLobbyMessage); + ws.send(fullJson); + } ws.on("message", () => { ws.terminate(); }); @@ -141,12 +167,43 @@ export class WorkerLobbyService { } private broadcastLobbiesToClients(publicGames: PublicGames) { - const message = JSON.stringify(publicGames); + const gameIds: string[] = []; + for (const list of Object.values(publicGames.games)) { + for (const lobby of list) { + gameIds.push(lobby.gameID); + } + } + gameIds.sort(); + const fingerprint = gameIds.join(","); + const shouldSendFull = fingerprint !== this.lastFullGameIds; + + let payload: PublicLobbyMessage; + if (shouldSendFull) { + payload = { + type: "full", + serverTime: publicGames.serverTime, + games: publicGames.games, + }; + this.lastFullGameIds = fingerprint; + } else { + const counts: Record = {}; + for (const list of Object.values(publicGames.games)) { + for (const lobby of list) { + counts[lobby.gameID] = lobby.numClients; + } + } + payload = { + type: "counts", + serverTime: publicGames.serverTime, + counts, + }; + } + const json = JSON.stringify(payload); const clientsToRemove: WebSocket[] = []; this.lobbyClients.forEach((client) => { if (client.readyState === WebSocket.OPEN) { - client.send(message); + client.send(json); } else { clientsToRemove.push(client); } diff --git a/tests/LobbySocket.test.ts b/tests/LobbySocket.test.ts new file mode 100644 index 000000000..21b88aa71 --- /dev/null +++ b/tests/LobbySocket.test.ts @@ -0,0 +1,138 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { PublicLobbySocket } from "../src/client/LobbySocket"; +import { + PublicGameInfo, + PublicGames, + PublicGameType, +} from "../src/core/Schemas"; + +function lobby( + gameID: string, + numClients: number, + publicGameType: PublicGameType = "ffa", +): PublicGameInfo { + return { gameID, numClients, publicGameType }; +} + +function fullMessage( + serverTime: number, + games: Partial>, +) { + return JSON.stringify({ + type: "full", + serverTime, + games: { ffa: [], team: [], special: [], ...games }, + }); +} + +function countsMessage(serverTime: number, counts: Record) { + return JSON.stringify({ type: "counts", serverTime, counts }); +} + +function makeSocket() { + const callback = vi.fn<(g: PublicGames) => void>(); + const socket = new PublicLobbySocket(callback); + const dispatch = (data: string) => { + (socket as any).handleMessage({ data } as MessageEvent); + }; + return { socket, callback, dispatch }; +} + +describe("PublicLobbySocket.handleMessage", () => { + beforeEach(() => { + vi.spyOn(console, "error").mockImplementation(() => {}); + }); + + it("delivers a full snapshot to the callback", () => { + const { callback, dispatch } = makeSocket(); + dispatch( + fullMessage(1000, { + ffa: [lobby("g1", 3)], + team: [lobby("g2", 5, "team")], + }), + ); + + expect(callback).toHaveBeenCalledTimes(1); + const arg = callback.mock.calls[0][0]; + expect(arg.serverTime).toBe(1000); + expect(arg.games.ffa).toEqual([lobby("g1", 3)]); + expect(arg.games.team).toEqual([lobby("g2", 5, "team")]); + }); + + it("patches numClients onto the last full snapshot when counts arrives", () => { + const { callback, dispatch } = makeSocket(); + dispatch(fullMessage(1000, { ffa: [lobby("g1", 3), lobby("g2", 4)] })); + callback.mockClear(); + + dispatch(countsMessage(1500, { g1: 7, g2: 4 })); + + expect(callback).toHaveBeenCalledTimes(1); + const arg = callback.mock.calls[0][0]; + expect(arg.serverTime).toBe(1500); + expect(arg.games.ffa).toEqual([lobby("g1", 7), lobby("g2", 4)]); + // Static fields (gameConfig, startsAt, publicGameType) survive the patch. + expect(arg.games.ffa?.[0].publicGameType).toBe("ffa"); + }); + + it("ignores counts arriving before any full snapshot", () => { + const { callback, dispatch } = makeSocket(); + dispatch(countsMessage(1000, { g1: 5 })); + expect(callback).not.toHaveBeenCalled(); + }); + + it("leaves lobbies whose gameID is absent from counts unchanged", () => { + const { callback, dispatch } = makeSocket(); + dispatch(fullMessage(1000, { ffa: [lobby("g1", 3), lobby("g2", 4)] })); + callback.mockClear(); + + dispatch(countsMessage(1500, { g1: 9 })); + + const arg = callback.mock.calls[0][0]; + expect(arg.games.ffa).toEqual([lobby("g1", 9), lobby("g2", 4)]); + }); + + it("applies consecutive counts deltas on top of the merged state", () => { + const { callback, dispatch } = makeSocket(); + dispatch(fullMessage(1000, { ffa: [lobby("g1", 1)] })); + dispatch(countsMessage(1500, { g1: 2 })); + dispatch(countsMessage(2000, { g1: 3 })); + + expect(callback).toHaveBeenCalledTimes(3); + expect(callback.mock.calls[2][0].games.ffa).toEqual([lobby("g1", 3)]); + expect(callback.mock.calls[2][0].serverTime).toBe(2000); + }); + + it("replaces lobby set when a fresh full snapshot arrives", () => { + const { callback, dispatch } = makeSocket(); + dispatch(fullMessage(1000, { ffa: [lobby("g1", 3)] })); + dispatch(fullMessage(2000, { ffa: [lobby("g2", 5)] })); + + const arg = callback.mock.calls[1][0]; + expect(arg.games.ffa).toEqual([lobby("g2", 5)]); + expect(arg.serverTime).toBe(2000); + }); + + it("does not call the callback on malformed JSON", () => { + const { callback, dispatch } = makeSocket(); + dispatch("not json"); + expect(callback).not.toHaveBeenCalled(); + }); + + it("does not call the callback on schema-invalid messages", () => { + const { callback, dispatch } = makeSocket(); + dispatch(JSON.stringify({ type: "bogus", serverTime: 1 })); + expect(callback).not.toHaveBeenCalled(); + }); + + it("does not mutate the previously-delivered snapshot when applying counts", () => { + const { callback, dispatch } = makeSocket(); + dispatch(fullMessage(1000, { ffa: [lobby("g1", 3)] })); + const prevSnapshot = callback.mock.calls[0][0]; + const prevFfa = prevSnapshot.games.ffa; + + dispatch(countsMessage(1500, { g1: 99 })); + + expect(prevSnapshot.serverTime).toBe(1000); + expect(prevFfa).toEqual([lobby("g1", 3)]); + }); +});