Merge branch 'v30'

This commit is contained in:
evanpelle
2026-03-14 19:28:38 -07:00
21 changed files with 518 additions and 286 deletions
+72
View File
@@ -0,0 +1,72 @@
import { RateLimiter } from "limiter";
import { ClientID } from "../core/Schemas";
const INTENTS_PER_SECOND = 10;
const INTENTS_PER_MINUTE = 150;
const MAX_BYTES_PER_MINUTE = 25 * 1024; // 25KB/min per client
const MAX_INTENT_BYTES = 500; // intents are stored in turns, keep them small
export type RateLimitResult = "ok" | "limit" | "kick";
// Allow 3 winner messages per client since a player can rejoin and resend.
const MAX_WINNER_MSGS = 3;
interface ClientBucket {
perSecond: RateLimiter;
perMinute: RateLimiter;
bytesPerMinute: RateLimiter;
winnerMsgCount: number;
}
export class ClientMsgRateLimiter {
private buckets = new Map<ClientID, ClientBucket>();
check(clientID: ClientID, type: string, bytes: number): RateLimitResult {
const bucket = this.getOrCreate(clientID);
// Winner message contains stats for all players and can be large (100s of KB).
// It bypasses the byte rate limit but is strictly limited to one per client.
if (type === "winner") {
if (bucket.winnerMsgCount >= MAX_WINNER_MSGS) return "kick";
bucket.winnerMsgCount++;
return "ok";
}
// Intents are stored in turn history for the duration of the game, so
// oversized intents would accumulate and fill up server RAM.
if (type === "intent" && bytes > MAX_INTENT_BYTES) return "kick";
if (!bucket.bytesPerMinute.tryRemoveTokens(bytes)) return "kick";
if (
!bucket.perSecond.tryRemoveTokens(1) ||
!bucket.perMinute.tryRemoveTokens(1)
)
return "limit";
return "ok";
}
private getOrCreate(clientID: ClientID): ClientBucket {
const existing = this.buckets.get(clientID);
if (existing) {
return existing;
}
const bucket = {
perSecond: new RateLimiter({
tokensPerInterval: INTENTS_PER_SECOND,
interval: "second",
}),
perMinute: new RateLimiter({
tokensPerInterval: INTENTS_PER_MINUTE,
interval: "minute",
}),
bytesPerMinute: new RateLimiter({
tokensPerInterval: MAX_BYTES_PER_MINUTE,
interval: "minute",
}),
winnerMsgCount: 0,
};
this.buckets.set(clientID, bucket);
return bucket;
}
}
+52 -16
View File
@@ -26,6 +26,7 @@ import {
import { createPartialGameRecord, getClanTag } from "../core/Util";
import { archive, finalizeGameRecord } from "./Archive";
import { Client } from "./Client";
import { ClientMsgRateLimiter } from "./ClientMsgRateLimiter";
export enum GamePhase {
Lobby = "LOBBY",
Active = "ACTIVE",
@@ -34,10 +35,14 @@ export enum GamePhase {
const KICK_REASON_DUPLICATE_SESSION = "kick_reason.duplicate_session";
const KICK_REASON_LOBBY_CREATOR = "kick_reason.lobby_creator";
const KICK_REASON_TOO_MUCH_DATA = "kick_reason.too_much_data";
const KICK_REASON_INVALID_MESSAGE = "kick_reason.invalid_message";
export class GameServer {
private sentDesyncMessageClients = new Set<ClientID>();
private intentRateLimiter = new ClientMsgRateLimiter();
private maxGameDuration = 3 * 60 * 60 * 1000; // 3 hours
private disconnectedTimeout = 1 * 30 * 1000; // 30 seconds
@@ -51,6 +56,7 @@ export class GameServer {
private clientsDisconnectedStatus: Map<ClientID, boolean> = new Map();
private _hasStarted = false;
private _startTime: number | null = null;
private hasReachedMaxPlayerCount: boolean = false;
private endTurnIntervalID: ReturnType<typeof setInterval> | undefined;
@@ -247,6 +253,10 @@ export class GameServer {
this.addListeners(client);
this.startLobbyInfoBroadcast();
if (this.activeClients.length >= (this.gameConfig.maxPlayers ?? Infinity)) {
this.hasReachedMaxPlayerCount = true;
}
// In case a client joined the game late and missed the start message.
if (this._hasStarted) {
this.sendStartGameMsg(client.ws, 0);
@@ -306,22 +316,48 @@ export class GameServer {
client.ws.removeAllListeners("message");
client.ws.on("message", async (message: string) => {
try {
const parsed = ClientMessageSchema.safeParse(JSON.parse(message));
if (!parsed.success) {
const error = z.prettifyError(parsed.error);
this.log.warn(`Failed to parse client message ${error}`, {
let json: unknown;
try {
json = JSON.parse(message);
} catch (e) {
this.log.warn(`Failed to parse client message JSON, kicking`, {
clientID: client.clientID,
error: String(e),
});
client.ws.send(
JSON.stringify({
type: "error",
error,
message: `Server could not parse message from client: ${message}`,
} satisfies ServerErrorMessage),
);
this.kickClient(client.clientID, KICK_REASON_INVALID_MESSAGE);
return;
}
const parsed = ClientMessageSchema.safeParse(json);
if (!parsed.success) {
this.log.warn(`Failed to parse client message, kicking`, {
clientID: client.clientID,
error: z.prettifyError(parsed.error),
});
this.kickClient(client.clientID, KICK_REASON_INVALID_MESSAGE);
return;
}
const clientMsg = parsed.data;
const bytes = Buffer.byteLength(message, "utf8");
const rateResult = this.intentRateLimiter.check(
client.clientID,
clientMsg.type,
bytes,
);
if (rateResult === "kick") {
this.log.warn(`Client rate limit exceeded, kicking`, {
clientID: client.clientID,
type: clientMsg.type,
});
this.kickClient(client.clientID, KICK_REASON_TOO_MUCH_DATA);
return;
}
if (rateResult === "limit") {
this.log.warn(`Client message rate limit exceeded, dropping`, {
clientID: client.clientID,
type: clientMsg.type,
});
return;
}
switch (clientMsg.type) {
case "rejoin": {
// Client is already connected, no auth required, send start game message if game has started
@@ -813,11 +849,11 @@ export class GameServer {
// Public Games
const lessThanLifetime = this.startsAt ? Date.now() < this.startsAt : true;
const notEnoughPlayers =
this.gameConfig.gameType === GameType.Public &&
this.gameConfig.maxPlayers &&
this.activeClients.length < this.gameConfig.maxPlayers;
if (lessThanLifetime && notEnoughPlayers) {
if (
lessThanLifetime &&
!this.hasStarted() &&
!this.hasReachedMaxPlayerCount
) {
return GamePhase.Lobby;
}
const warmupOver = now > this.startsAt! + 30 * 1000;
+21 -8
View File
@@ -75,7 +75,7 @@ export class MasterLobbyService {
if (this.readyWorkers.size === this.config.numWorkers() && !this.started) {
this.started = true;
this.log.info("All workers ready, starting game scheduling");
startPolling(async () => this.broadcastLobbies(), 250);
startPolling(async () => this.broadcastLobbies(), 500);
startPolling(async () => await this.maybeScheduleLobby(), 1000);
}
}
@@ -117,10 +117,14 @@ export class MasterLobbyService {
games: this.getAllLobbies(),
},
} satisfies MasterLobbiesBroadcast;
for (const worker of this.workers.values()) {
for (const [workerId, worker] of this.workers.entries()) {
worker.send(msg, (e) => {
if (e) {
this.log.error("Failed to send lobbies broadcast to worker:", e);
this.log.error(
`Failed to send lobbies broadcast to worker ${workerId}, killing worker:`,
e,
);
worker.kill();
}
});
}
@@ -131,12 +135,13 @@ export class MasterLobbyService {
for (const type of Object.keys(lobbiesByType) as PublicGameType[]) {
const lobbies = lobbiesByType[type];
if (lobbies.length >= 2) {
continue;
}
// Always ensure the next lobby has a timer, even if we already have 2+
// lobbies. This prevents a race where two lobbies are created before
// either receives a startsAt (IPC round-trip delay), leaving both stuck
// without a countdown.
const nextLobby = lobbies[0];
if (nextLobby && nextLobby.startsAt === undefined) {
// The previous game has started, so we need to set the timer on the next game.
this.sendMessageToWorker({
type: "updateLobby",
gameID: nextLobby.gameID,
@@ -144,6 +149,10 @@ export class MasterLobbyService {
});
}
if (lobbies.length >= 2) {
continue;
}
this.sendMessageToWorker({
type: "createGame",
gameID: generateID(),
@@ -162,7 +171,11 @@ export class MasterLobbyService {
}
worker.send(msg, (e) => {
if (e) {
this.log.error("Failed to send message to worker:", e);
this.log.error(
`Failed to send message to worker ${workerId}, killing worker:`,
e,
);
worker.kill();
}
});
}
+4 -1
View File
@@ -48,7 +48,10 @@ export async function startWorker() {
const app = express();
app.use(express.json({ limit: "5mb" }));
const server = http.createServer(app);
const wss = new WebSocketServer({ noServer: true });
const wss = new WebSocketServer({
noServer: true,
maxPayload: 2 * 1024 * 1024,
});
const gm = new GameManager(config, log);
+7 -1
View File
@@ -19,7 +19,10 @@ export class WorkerLobbyService {
private readonly gm: GameManager,
private readonly log: typeof logger,
) {
this.lobbiesWss = new WebSocketServer({ noServer: true });
this.lobbiesWss = new WebSocketServer({
noServer: true,
maxPayload: 256 * 1024,
});
this.setupUpgradeHandler();
this.setupLobbiesWebSocket();
this.setupIPCListener();
@@ -109,6 +112,9 @@ export class WorkerLobbyService {
private setupLobbiesWebSocket() {
this.lobbiesWss.on("connection", (ws: WebSocket) => {
this.lobbyClients.add(ws);
ws.on("message", () => {
ws.terminate();
});
ws.on("close", () => {
this.lobbyClients.delete(ws);
});