Files
OpenFrontIO/src/server/WorkerLobbyService.ts
T
Evan 5fb7f75f3d Server-side WebSocket message rate limiting & size enforcement (#3424)
## Description:

* Adds ClientMsgRateLimiter — a per-client token-bucket rate limiter
that gates all incoming WebSocket messages. Returns "ok", "limit"
(drop), or "kick" based on the violation type.

* Intent messages are capped at 500 bytes each (they are stored in turn
history for the game duration, so oversized intents
accumulate in server RAM). Violations kick the client.

* Winner messages bypass the byte rate limit (they include stats for all
players and can be 100s of KB) but are strictly capped at one per client
— a second winner message kicks the client.

* All other messages go through the standard per-second (10/s) and
per-minute (150/min) rate limits. Violations drop the message; byte
budget exhaustion kicks the client.

* WebSocket maxPayload set to 2 MB on game workers.
Invalid (unparseable) messages now immediately kick the client rather
than being silently dropped.
Unit tests added for all rate limiting behaviors.

## Please complete the following:

- [x] I have added screenshots for all UI updates
- [x] I process any text displayed to the user through translateText()
and I've added it to the en.json file
- [x] I have added relevant tests to the test directory
- [x] I confirm I have thoroughly tested these changes and take full
responsibility for any bugs introduced

## Please put your Discord username so you can be contacted if a bug or
regression is found:

evan
2026-03-13 21:15:10 -07:00

156 lines
4.4 KiB
TypeScript

import http from "http";
import { WebSocket, WebSocketServer } from "ws";
import { PublicGameInfo, PublicGames } from "../core/Schemas";
import { GameManager } from "./GameManager";
import {
MasterMessageSchema,
WorkerLobbyList,
WorkerReady,
} from "./IPCBridgeSchema";
import { logger } from "./Logger";
export class WorkerLobbyService {
private readonly lobbiesWss: WebSocketServer;
private readonly lobbyClients: Set<WebSocket> = new Set();
constructor(
private readonly server: http.Server,
private readonly gameWss: WebSocketServer,
private readonly gm: GameManager,
private readonly log: typeof logger,
) {
this.lobbiesWss = new WebSocketServer({
noServer: true,
maxPayload: 256 * 1024,
});
this.setupUpgradeHandler();
this.setupLobbiesWebSocket();
this.setupIPCListener();
}
private setupIPCListener() {
process.on("message", (raw: unknown) => {
const result = MasterMessageSchema.safeParse(raw);
if (!result.success) {
this.log.error("Invalid IPC message from master:", raw);
return;
}
const msg = result.data;
switch (msg.type) {
case "lobbiesBroadcast":
// Forward message to all clients
this.broadcastLobbiesToClients(msg.publicGames);
// Update master with my lobby info
this.sendMyLobbiesToMaster();
break;
case "createGame":
if (this.gm.game(msg.gameID) !== null) {
this.log.warn(`Game ${msg.gameID} already exists, skipping create`);
return;
}
this.log.info(`Creating public game ${msg.gameID} from master`);
this.gm.createGame(
msg.gameID,
msg.gameConfig,
undefined,
undefined,
msg.publicGameType,
);
break;
case "updateLobby": {
const game = this.gm.game(msg.gameID);
if (!game) {
this.log.warn("cannot update game, not found", {
gameID: msg.gameID,
});
return;
}
game.setStartsAt(msg.startsAt);
break;
}
}
});
}
sendReady(workerId: number) {
const msg: WorkerReady = { type: "workerReady", workerId };
process.send?.(msg);
}
private sendMyLobbiesToMaster() {
const lobbies = this.gm
.publicLobbies()
.map((g) => g.gameInfo())
.map((gi) => {
return {
gameID: gi.gameID,
numClients: gi.clients?.length ?? 0,
startsAt: gi.startsAt,
gameConfig: gi.gameConfig,
publicGameType: gi.publicGameType!,
} satisfies PublicGameInfo;
});
process.send?.({ type: "lobbyList", lobbies } satisfies WorkerLobbyList);
}
private setupUpgradeHandler() {
this.server.on("upgrade", (request, socket, head) => {
const pathname = request.url ?? "";
if (pathname === "/lobbies" || pathname.endsWith("/lobbies")) {
this.lobbiesWss.handleUpgrade(request, socket, head, (ws) => {
this.lobbiesWss.emit("connection", ws, request);
});
} else {
this.gameWss.handleUpgrade(request, socket, head, (ws) => {
this.gameWss.emit("connection", ws, request);
});
}
});
}
private setupLobbiesWebSocket() {
this.lobbiesWss.on("connection", (ws: WebSocket) => {
this.lobbyClients.add(ws);
ws.on("message", () => {
ws.terminate();
});
ws.on("close", () => {
this.lobbyClients.delete(ws);
});
ws.on("error", (error) => {
this.log.error(`Lobbies WebSocket error:`, error);
this.lobbyClients.delete(ws);
try {
if (
ws.readyState === WebSocket.OPEN ||
ws.readyState === WebSocket.CONNECTING
) {
ws.close(1011, "WebSocket internal error");
}
} catch (closeError) {
this.log.error("Error closing lobbies WebSocket:", closeError);
}
});
});
}
private broadcastLobbiesToClients(publicGames: PublicGames) {
const message = JSON.stringify(publicGames);
const clientsToRemove: WebSocket[] = [];
this.lobbyClients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
} else {
clientsToRemove.push(client);
}
});
clientsToRemove.forEach((client) => {
this.lobbyClients.delete(client);
});
}
}