mirror of
https://github.com/openfrontio/OpenFrontIO.git
synced 2026-06-21 08:11:54 +00:00
Reduce lobby broadcast bandwidth via counts-only deltas (#4116)
## Description:
- The lobby WebSocket broadcast (`/lobbies`) was re-sending the full
`PublicGames` snapshot — including each lobby's `gameConfig` — to every
connected client every 500ms. Almost nothing in that payload changes
tick-to-tick; only `numClients` moves.
- `WorkerLobbyService` now tracks the sorted set of `gameID`s it last
sent as a full snapshot. On each incoming broadcast it sends a `full`
only when that set changes; otherwise it sends a `counts` delta carrying
just `{gameID → numClients}`.
- This relies on the master-side coupling at
[MasterLobbyService.ts:140-159](src/server/MasterLobbyService.ts#L140-L159):
when master finds a lobby without `startsAt`, it both sets `startsAt`
AND schedules a fresh lobby on the same tick, so the gameID change
brings the `startsAt` (and `gameConfig`) along with it.
- New WS connections are primed with the worker's cached last `full` so
late joiners don't have to wait for the next structural change.
- `LobbySocket` parses the new discriminated union (`PublicLobbyMessage
= full | counts`), keeps the last full snapshot in memory, and merges
counts into it before invoking the existing callback. `GameModeSelector`
is unchanged.
- Master → worker IPC is unchanged — still sends the full snapshot every
500ms. The optimization only applies to the worker → WS-client boundary,
which is the fan-out point.
## Please complete the following:
- [x] I have added screenshots for all UI updates
- [x] I process any text displayed to the user through translateText()
and I've added it to the en.json file
- [x] I have added relevant tests to the test directory
## Please put your Discord username so you can be contacted if a bug or
regression is found:
evan
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import { ClientEnv } from "src/client/ClientEnv";
|
||||
import { PublicGames, PublicGamesSchema } from "../core/Schemas";
|
||||
import { PublicGames, PublicLobbyMessageSchema } from "../core/Schemas";
|
||||
|
||||
interface LobbySocketOptions {
|
||||
reconnectDelay?: number;
|
||||
@@ -19,6 +19,8 @@ export class PublicLobbySocket {
|
||||
private wsAttemptCounted = false;
|
||||
private workerPath: string = "";
|
||||
private stopped = true;
|
||||
// Latest full snapshot, used as the base for applying counts-only deltas.
|
||||
private lastFull: PublicGames | null = null;
|
||||
|
||||
private readonly reconnectDelay: number;
|
||||
private readonly maxWsAttempts: number;
|
||||
@@ -41,6 +43,7 @@ export class PublicLobbySocket {
|
||||
|
||||
stop() {
|
||||
this.stopped = true;
|
||||
this.lastFull = null;
|
||||
this.disconnectWebSocket();
|
||||
}
|
||||
|
||||
@@ -51,6 +54,9 @@ export class PublicLobbySocket {
|
||||
this.ws.close();
|
||||
this.ws = null;
|
||||
}
|
||||
// Drop any cached snapshot — the server primes new connections with a
|
||||
// fresh full message, and a stale base could mis-merge incoming deltas.
|
||||
this.lastFull = null;
|
||||
|
||||
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
|
||||
const wsUrl = `${protocol}//${window.location.host}${this.workerPath}/lobbies`;
|
||||
@@ -78,10 +84,41 @@ export class PublicLobbySocket {
|
||||
|
||||
private handleMessage(event: MessageEvent) {
|
||||
try {
|
||||
const publicGames = PublicGamesSchema.parse(
|
||||
const message = PublicLobbyMessageSchema.parse(
|
||||
JSON.parse(event.data as string),
|
||||
);
|
||||
this.onLobbiesUpdate(publicGames);
|
||||
if (message.type === "full") {
|
||||
this.lastFull = {
|
||||
serverTime: message.serverTime,
|
||||
games: message.games,
|
||||
};
|
||||
this.onLobbiesUpdate(this.lastFull);
|
||||
return;
|
||||
}
|
||||
// counts: patch numClients onto the last full snapshot. If we have no
|
||||
// base yet (shouldn't happen — server primes on connect), ignore it
|
||||
// and wait for the next full.
|
||||
if (this.lastFull === null) {
|
||||
return;
|
||||
}
|
||||
const patchedGames = { ...this.lastFull.games };
|
||||
for (const type of Object.keys(patchedGames) as Array<
|
||||
keyof typeof patchedGames
|
||||
>) {
|
||||
const list = patchedGames[type];
|
||||
if (!list) continue;
|
||||
patchedGames[type] = list.map((lobby) => {
|
||||
const next = message.counts[lobby.gameID];
|
||||
return next === undefined || next === lobby.numClients
|
||||
? lobby
|
||||
: { ...lobby, numClients: next };
|
||||
});
|
||||
}
|
||||
this.lastFull = {
|
||||
serverTime: message.serverTime,
|
||||
games: patchedGames,
|
||||
};
|
||||
this.onLobbiesUpdate(this.lastFull);
|
||||
} catch (error) {
|
||||
console.error("Error parsing WebSocket message:", error);
|
||||
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
|
||||
|
||||
@@ -183,6 +183,28 @@ export const PublicGamesSchema = z.object({
|
||||
games: z.record(PublicGameTypeSchema, z.array(PublicGameInfoSchema)),
|
||||
});
|
||||
|
||||
// Wire message sent from server to lobby WebSocket clients.
|
||||
// "full" carries the complete snapshot; "counts" carries only the
|
||||
// per-lobby player counts, which change far more often than the rest.
|
||||
export const PublicLobbyFullSchema = z.object({
|
||||
type: z.literal("full"),
|
||||
serverTime: z.number(),
|
||||
games: z.record(PublicGameTypeSchema, z.array(PublicGameInfoSchema)),
|
||||
});
|
||||
|
||||
export const PublicLobbyCountsSchema = z.object({
|
||||
type: z.literal("counts"),
|
||||
serverTime: z.number(),
|
||||
counts: z.record(z.string(), z.number()),
|
||||
});
|
||||
|
||||
export const PublicLobbyMessageSchema = z.discriminatedUnion("type", [
|
||||
PublicLobbyFullSchema,
|
||||
PublicLobbyCountsSchema,
|
||||
]);
|
||||
|
||||
export type PublicLobbyMessage = z.infer<typeof PublicLobbyMessageSchema>;
|
||||
|
||||
export class LobbyInfoEvent implements GameEvent {
|
||||
constructor(
|
||||
public lobby: GameInfo,
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import http from "http";
|
||||
import { WebSocket, WebSocketServer } from "ws";
|
||||
import { PublicGameInfo, PublicGames } from "../core/Schemas";
|
||||
import {
|
||||
PublicGameInfo,
|
||||
PublicGames,
|
||||
PublicLobbyMessage,
|
||||
} from "../core/Schemas";
|
||||
import { GameManager } from "./GameManager";
|
||||
import {
|
||||
MasterMessageSchema,
|
||||
@@ -12,6 +16,16 @@ import { logger } from "./Logger";
|
||||
export class WorkerLobbyService {
|
||||
private readonly lobbiesWss: WebSocketServer;
|
||||
private readonly lobbyClients: Set<WebSocket> = new Set();
|
||||
// Most recent snapshot from master, serialized on demand for new
|
||||
// connections so they don't have to wait for the next broadcast.
|
||||
private lastPublicGames: PublicGames | null = null;
|
||||
// Sorted gameIDs of the last full we broadcast, or null if we've never
|
||||
// broadcast one. When the set changes we send a fresh full; otherwise a
|
||||
// counts-only delta is enough. This relies on master creating a new lobby
|
||||
// whenever it sets startsAt on the previous one, so structural state
|
||||
// (startsAt, gameConfig) rides along with a gameID change. Null (not "")
|
||||
// is used so that an empty-lobby first broadcast still emits a full.
|
||||
private lastFullGameIds: string | null = null;
|
||||
|
||||
constructor(
|
||||
private readonly server: http.Server,
|
||||
@@ -39,6 +53,7 @@ export class WorkerLobbyService {
|
||||
const msg = result.data;
|
||||
switch (msg.type) {
|
||||
case "lobbiesBroadcast":
|
||||
this.lastPublicGames = msg.publicGames;
|
||||
// Forward message to all clients
|
||||
this.broadcastLobbiesToClients(msg.publicGames);
|
||||
// Update master with my lobby info
|
||||
@@ -116,6 +131,17 @@ export class WorkerLobbyService {
|
||||
private setupLobbiesWebSocket() {
|
||||
this.lobbiesWss.on("connection", (ws: WebSocket) => {
|
||||
this.lobbyClients.add(ws);
|
||||
// Prime the new client with the most recent snapshot — otherwise it
|
||||
// would only see counts-only deltas (which it can't apply without a
|
||||
// base) until the next structural change.
|
||||
if (this.lastPublicGames !== null) {
|
||||
const fullJson = JSON.stringify({
|
||||
type: "full",
|
||||
serverTime: this.lastPublicGames.serverTime,
|
||||
games: this.lastPublicGames.games,
|
||||
} satisfies PublicLobbyMessage);
|
||||
ws.send(fullJson);
|
||||
}
|
||||
ws.on("message", () => {
|
||||
ws.terminate();
|
||||
});
|
||||
@@ -141,12 +167,43 @@ export class WorkerLobbyService {
|
||||
}
|
||||
|
||||
private broadcastLobbiesToClients(publicGames: PublicGames) {
|
||||
const message = JSON.stringify(publicGames);
|
||||
const gameIds: string[] = [];
|
||||
for (const list of Object.values(publicGames.games)) {
|
||||
for (const lobby of list) {
|
||||
gameIds.push(lobby.gameID);
|
||||
}
|
||||
}
|
||||
gameIds.sort();
|
||||
const fingerprint = gameIds.join(",");
|
||||
const shouldSendFull = fingerprint !== this.lastFullGameIds;
|
||||
|
||||
let payload: PublicLobbyMessage;
|
||||
if (shouldSendFull) {
|
||||
payload = {
|
||||
type: "full",
|
||||
serverTime: publicGames.serverTime,
|
||||
games: publicGames.games,
|
||||
};
|
||||
this.lastFullGameIds = fingerprint;
|
||||
} else {
|
||||
const counts: Record<string, number> = {};
|
||||
for (const list of Object.values(publicGames.games)) {
|
||||
for (const lobby of list) {
|
||||
counts[lobby.gameID] = lobby.numClients;
|
||||
}
|
||||
}
|
||||
payload = {
|
||||
type: "counts",
|
||||
serverTime: publicGames.serverTime,
|
||||
counts,
|
||||
};
|
||||
}
|
||||
const json = JSON.stringify(payload);
|
||||
|
||||
const clientsToRemove: WebSocket[] = [];
|
||||
this.lobbyClients.forEach((client) => {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(message);
|
||||
client.send(json);
|
||||
} else {
|
||||
clientsToRemove.push(client);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,138 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { PublicLobbySocket } from "../src/client/LobbySocket";
|
||||
import {
|
||||
PublicGameInfo,
|
||||
PublicGames,
|
||||
PublicGameType,
|
||||
} from "../src/core/Schemas";
|
||||
|
||||
function lobby(
|
||||
gameID: string,
|
||||
numClients: number,
|
||||
publicGameType: PublicGameType = "ffa",
|
||||
): PublicGameInfo {
|
||||
return { gameID, numClients, publicGameType };
|
||||
}
|
||||
|
||||
function fullMessage(
|
||||
serverTime: number,
|
||||
games: Partial<Record<PublicGameType, PublicGameInfo[]>>,
|
||||
) {
|
||||
return JSON.stringify({
|
||||
type: "full",
|
||||
serverTime,
|
||||
games: { ffa: [], team: [], special: [], ...games },
|
||||
});
|
||||
}
|
||||
|
||||
function countsMessage(serverTime: number, counts: Record<string, number>) {
|
||||
return JSON.stringify({ type: "counts", serverTime, counts });
|
||||
}
|
||||
|
||||
function makeSocket() {
|
||||
const callback = vi.fn<(g: PublicGames) => void>();
|
||||
const socket = new PublicLobbySocket(callback);
|
||||
const dispatch = (data: string) => {
|
||||
(socket as any).handleMessage({ data } as MessageEvent);
|
||||
};
|
||||
return { socket, callback, dispatch };
|
||||
}
|
||||
|
||||
describe("PublicLobbySocket.handleMessage", () => {
|
||||
beforeEach(() => {
|
||||
vi.spyOn(console, "error").mockImplementation(() => {});
|
||||
});
|
||||
|
||||
it("delivers a full snapshot to the callback", () => {
|
||||
const { callback, dispatch } = makeSocket();
|
||||
dispatch(
|
||||
fullMessage(1000, {
|
||||
ffa: [lobby("g1", 3)],
|
||||
team: [lobby("g2", 5, "team")],
|
||||
}),
|
||||
);
|
||||
|
||||
expect(callback).toHaveBeenCalledTimes(1);
|
||||
const arg = callback.mock.calls[0][0];
|
||||
expect(arg.serverTime).toBe(1000);
|
||||
expect(arg.games.ffa).toEqual([lobby("g1", 3)]);
|
||||
expect(arg.games.team).toEqual([lobby("g2", 5, "team")]);
|
||||
});
|
||||
|
||||
it("patches numClients onto the last full snapshot when counts arrives", () => {
|
||||
const { callback, dispatch } = makeSocket();
|
||||
dispatch(fullMessage(1000, { ffa: [lobby("g1", 3), lobby("g2", 4)] }));
|
||||
callback.mockClear();
|
||||
|
||||
dispatch(countsMessage(1500, { g1: 7, g2: 4 }));
|
||||
|
||||
expect(callback).toHaveBeenCalledTimes(1);
|
||||
const arg = callback.mock.calls[0][0];
|
||||
expect(arg.serverTime).toBe(1500);
|
||||
expect(arg.games.ffa).toEqual([lobby("g1", 7), lobby("g2", 4)]);
|
||||
// Static fields (gameConfig, startsAt, publicGameType) survive the patch.
|
||||
expect(arg.games.ffa?.[0].publicGameType).toBe("ffa");
|
||||
});
|
||||
|
||||
it("ignores counts arriving before any full snapshot", () => {
|
||||
const { callback, dispatch } = makeSocket();
|
||||
dispatch(countsMessage(1000, { g1: 5 }));
|
||||
expect(callback).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("leaves lobbies whose gameID is absent from counts unchanged", () => {
|
||||
const { callback, dispatch } = makeSocket();
|
||||
dispatch(fullMessage(1000, { ffa: [lobby("g1", 3), lobby("g2", 4)] }));
|
||||
callback.mockClear();
|
||||
|
||||
dispatch(countsMessage(1500, { g1: 9 }));
|
||||
|
||||
const arg = callback.mock.calls[0][0];
|
||||
expect(arg.games.ffa).toEqual([lobby("g1", 9), lobby("g2", 4)]);
|
||||
});
|
||||
|
||||
it("applies consecutive counts deltas on top of the merged state", () => {
|
||||
const { callback, dispatch } = makeSocket();
|
||||
dispatch(fullMessage(1000, { ffa: [lobby("g1", 1)] }));
|
||||
dispatch(countsMessage(1500, { g1: 2 }));
|
||||
dispatch(countsMessage(2000, { g1: 3 }));
|
||||
|
||||
expect(callback).toHaveBeenCalledTimes(3);
|
||||
expect(callback.mock.calls[2][0].games.ffa).toEqual([lobby("g1", 3)]);
|
||||
expect(callback.mock.calls[2][0].serverTime).toBe(2000);
|
||||
});
|
||||
|
||||
it("replaces lobby set when a fresh full snapshot arrives", () => {
|
||||
const { callback, dispatch } = makeSocket();
|
||||
dispatch(fullMessage(1000, { ffa: [lobby("g1", 3)] }));
|
||||
dispatch(fullMessage(2000, { ffa: [lobby("g2", 5)] }));
|
||||
|
||||
const arg = callback.mock.calls[1][0];
|
||||
expect(arg.games.ffa).toEqual([lobby("g2", 5)]);
|
||||
expect(arg.serverTime).toBe(2000);
|
||||
});
|
||||
|
||||
it("does not call the callback on malformed JSON", () => {
|
||||
const { callback, dispatch } = makeSocket();
|
||||
dispatch("not json");
|
||||
expect(callback).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not call the callback on schema-invalid messages", () => {
|
||||
const { callback, dispatch } = makeSocket();
|
||||
dispatch(JSON.stringify({ type: "bogus", serverTime: 1 }));
|
||||
expect(callback).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not mutate the previously-delivered snapshot when applying counts", () => {
|
||||
const { callback, dispatch } = makeSocket();
|
||||
dispatch(fullMessage(1000, { ffa: [lobby("g1", 3)] }));
|
||||
const prevSnapshot = callback.mock.calls[0][0];
|
||||
const prevFfa = prevSnapshot.games.ffa;
|
||||
|
||||
dispatch(countsMessage(1500, { g1: 99 }));
|
||||
|
||||
expect(prevSnapshot.serverTime).toBe(1000);
|
||||
expect(prevFfa).toEqual([lobby("g1", 3)]);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user