heartbeat

This commit is contained in:
evanpelle
2025-06-04 16:53:17 -07:00
parent b249b364a0
commit f81fc9a0d7
14 changed files with 372 additions and 131 deletions
+1
View File
@@ -49,6 +49,7 @@ export interface LobbyConfig {
gameStartInfo?: GameStartInfo;
// GameRecord exists when replaying an archived game.
gameRecord?: GameRecord;
workerAddress: string;
}
export function joinLobby(
+9
View File
@@ -1,6 +1,7 @@
import page from "page";
import favicon from "../../resources/images/Favicon.svg";
import { GameRecord, GameStartInfo } from "../core/Schemas";
import { workerAddress } from "../core/Util";
import { getServerConfigFromClient } from "../core/configuration/ConfigLoader";
import { GameType } from "../core/game/Game";
import { UserSettings } from "../core/game/UserSettings";
@@ -272,11 +273,19 @@ class Client {
this.gameStop();
}
const config = await getServerConfigFromClient();
const subdomainResponse = await fetch("/api/domain");
if (!subdomainResponse.ok) {
throw new Error(
`Failed to fetch subdomain: ${subdomainResponse.status} ${subdomainResponse.statusText}`,
);
}
const { subdomain, domain } = await subdomainResponse.json();
this.gameStop = joinLobby(
{
gameID: lobby.gameID,
serverConfig: config,
workerAddress: workerAddress(lobby.gameID, subdomain, domain),
flag:
this.flagInput === null || this.flagInput.getCurrentFlag() === "xx"
? ""
+7 -5
View File
@@ -163,6 +163,8 @@ export class Transport {
private onmessage: (msg: ServerMessage) => void;
private pingInterval: number | null = null;
private workerAddress: string;
public readonly isLocal: boolean;
constructor(
private lobbyConfig: LobbyConfig,
@@ -174,6 +176,10 @@ export class Transport {
lobbyConfig.gameRecord !== undefined ||
lobbyConfig.gameStartInfo?.config.gameType === GameType.Singleplayer;
if (!this.isLocal) {
this.workerAddress = lobbyConfig.workerAddress;
}
this.eventBus.on(SendAllianceRequestIntentEvent, (e) =>
this.onSendAllianceRequest(e),
);
@@ -276,12 +282,8 @@ export class Transport {
) {
this.startPing();
this.killExistingSocket();
const wsHost = window.location.host;
const wsProtocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const workerPath = this.lobbyConfig.serverConfig.workerPath(
this.lobbyConfig.gameID,
);
this.socket = new WebSocket(`${wsProtocol}//${wsHost}/${workerPath}`);
this.socket = new WebSocket(`${wsProtocol}//${this.workerAddress}`);
this.onconnect = onconnect;
this.onmessage = onmessage;
this.socket.onopen = () => {
+1 -4
View File
@@ -153,10 +153,7 @@ const EmojiSchema = z
.number()
.nonnegative()
.max(flattenedEmojiTable.length - 1);
const ID = z
.string()
.regex(/^[a-zA-Z0-9]+$/)
.length(8);
const ID = z.string().regex(/^[a-zA-Z0-9]+$/);
export const AllPlayersStatsSchema = z.record(ID, PlayerStatsSchema);
+12 -3
View File
@@ -247,10 +247,10 @@ export function assertNever(x: never): never {
throw new Error("Unexpected value: " + x);
}
export function generateID(): GameID {
export function generateID(length: number = 8): GameID {
const nanoid = customAlphabet(
"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ",
8,
"123456789abcdefghijkmnopqrstuvwxyzABCDEFGHJKMNPQRSTUVWXYZ",
length,
);
return nanoid();
}
@@ -315,3 +315,12 @@ export const flattenedEmojiTable: string[] = emojiTable.flat();
export function replacer(_key: string, value: any): any {
return typeof value === "bigint" ? value.toString() : value;
}
export function workerAddress(
gameID: GameID,
subdomain: string,
domain: string,
): string {
const id = gameID.slice(4);
return `https://w${id}-${subdomain}.${domain}`;
}
+6
View File
@@ -49,6 +49,10 @@ export interface ServerConfig {
r2Endpoint(): string;
r2AccessKey(): string;
r2SecretKey(): string;
cloudflareAccountId(): string;
cloudflareApiToken(): string;
domain(): string;
subdomain(): string;
otelEndpoint(): string;
otelUsername(): string;
otelPassword(): string;
@@ -60,6 +64,8 @@ export interface ServerConfig {
subdomain(): string;
cloudflareAccountId(): string;
cloudflareApiToken(): string;
masterAddress(): string;
workerAddress(workerId: number): string;
}
export interface NukeMagnitude {
+14
View File
@@ -66,6 +66,20 @@ const numPlayersConfig = {
} as const satisfies Record<GameMapType, [number, number, number]>;
export abstract class DefaultServerConfig implements ServerConfig {
masterAddress(): string {
if (this.env() === GameEnv.Dev) {
return `http://localhost:3000`;
} else {
return `https://${this.subdomain()}.${this.domain()}`;
}
}
workerAddress(workerId: number): string {
if (this.env() === GameEnv.Dev) {
return `http://localhost:${3000 + workerId + 1}`;
} else {
return `https://w${workerId}-${this.subdomain()}.${this.domain()}`;
}
}
domain(): string {
return process.env.DOMAIN ?? "";
}
+12
View File
@@ -2,6 +2,7 @@ import { Logger } from "winston";
import { ServerConfig } from "../core/configuration/Config";
import { Difficulty, GameMapType, GameMode, GameType } from "../core/game/Game";
import { GameConfig, GameID } from "../core/Schemas";
import { generateID } from "../core/Util";
import { Client } from "./Client";
import { GamePhase, GameServer } from "./GameServer";
@@ -11,10 +12,21 @@ export class GameManager {
constructor(
private config: ServerConfig,
private log: Logger,
private workerIndex: number,
) {
setInterval(() => this.tick(), 1000);
}
public createGameID(): GameID {
for (let i = 0; i < 1000; i++) {
const id = generateID(4) + this.workerIndex;
if (!this.games.has(id)) {
return id;
}
}
throw new Error("Failed to create game ID");
}
public game(id: GameID): GameServer | null {
return this.games.get(id) ?? null;
}
+85 -101
View File
@@ -1,4 +1,3 @@
import cluster from "cluster";
import express from "express";
import rateLimit from "express-rate-limit";
import http from "http";
@@ -6,14 +5,14 @@ import path from "path";
import { fileURLToPath } from "url";
import { getServerConfigFromServer } from "../core/configuration/ConfigLoader";
import { GameInfo } from "../core/Schemas";
import { generateID } from "../core/Util";
import { gatekeeper, LimiterType } from "./Gatekeeper";
import { logger } from "./Logger";
import { MapPlaylist } from "./MapPlaylist";
import { WorkerDiscoveryService } from "./WorkerDiscoveryService";
const config = getServerConfigFromServer();
const playlist = new MapPlaylist();
const readyWorkers = new Set();
const workerDiscovery = new WorkerDiscoveryService();
const app = express();
const server = http.createServer(app);
@@ -61,94 +60,48 @@ app.use(
let publicLobbiesJsonStr = "";
const publicLobbyIDs: Set<string> = new Set();
interface PublicLobby {
gameID: string;
dns: string;
}
const publicLobbies: Map<string, PublicLobby> = new Map();
// Start the master process
export async function startMaster() {
if (!cluster.isPrimary) {
throw new Error(
"startMaster() should only be called in the primary process",
);
}
log.info(`Primary ${process.pid} is running`);
log.info(`Setting up ${config.numWorkers()} workers...`);
// Fork workers
for (let i = 0; i < config.numWorkers(); i++) {
const worker = cluster.fork({
WORKER_ID: i,
});
log.info(`Started worker ${i} (PID: ${worker.process.pid})`);
}
cluster.on("message", (worker, message) => {
if (message.type === "WORKER_READY") {
const workerId = message.workerId;
readyWorkers.add(workerId);
log.info(
`Worker ${workerId} is ready. (${readyWorkers.size}/${config.numWorkers()} ready)`,
);
// Start scheduling when all workers are ready
if (readyWorkers.size === config.numWorkers()) {
log.info("All workers ready, starting game scheduling");
const scheduleLobbies = () => {
schedulePublicGame(playlist).catch((error) => {
log.error("Error scheduling public game:", error);
});
};
setInterval(
() =>
fetchLobbies().then((lobbies) => {
if (lobbies === 0) {
scheduleLobbies();
}
}),
100,
);
}
}
});
// Handle worker crashes
cluster.on("exit", (worker, code, signal) => {
const workerId = (worker as any).process?.env?.WORKER_ID;
if (!workerId) {
log.error(`worker crashed could not find id`);
return;
}
log.warn(
`Worker ${workerId} (PID: ${worker.process.pid}) died with code: ${code} and signal: ${signal}`,
);
log.info(`Restarting worker ${workerId}...`);
// Restart the worker with the same ID
const newWorker = cluster.fork({
WORKER_ID: workerId,
});
log.info(
`Restarted worker ${workerId} (New PID: ${newWorker.process.pid})`,
);
});
const PORT = 3000;
server.listen(PORT, () => {
log.info(`Master HTTP server listening on port ${PORT}`);
});
const scheduleLobbies = () => {
schedulePublicGame(playlist).catch((error) => {
log.error("Error scheduling public game:", error);
});
};
// Wait for the workers to start
sleep(5 * 1000).then(() => {
setInterval(
() =>
fetchLobbies().then((lobbies) => {
if (lobbies === 0) {
scheduleLobbies();
}
}),
100, // TODO: set this back to 100
);
});
}
app.get(
"/api/env",
gatekeeper.httpHandler(LimiterType.Get, async (req, res) => {
const envConfig = {
res.status(200).json({
game_env: process.env.GAME_ENV || "prod",
};
res.json(envConfig);
subdomain: config.subdomain(),
domain: config.domain(),
});
}),
);
@@ -193,14 +146,46 @@ app.post(
}),
);
app.post(
"/api/worker_heartbeat",
gatekeeper.httpHandler(LimiterType.Post, async (req, res) => {
log.info(`Received heartbeat from ${req.body.dns}...`);
if (req.headers[config.adminHeader()] !== config.adminToken()) {
res.status(401).send("Unauthorized");
return;
}
const { workerId, dns, activeClients } = req.body;
if (!workerId || !dns || typeof activeClients !== "number") {
res.status(400).json({ error: "Missing required fields" });
return;
}
workerDiscovery.updateWorkerHeartbeat(workerId, dns, activeClients);
res.status(200).json({ success: true });
}),
);
app.get(
"/api/worker_address",
gatekeeper.httpHandler(LimiterType.Post, async (req, res) => {
const worker = workerDiscovery.getAvailableWorker();
if (!worker) {
res.status(500).json({ error: "No available workers" });
return;
}
res.status(200).json({ dns: worker.dns });
}),
);
async function fetchLobbies(): Promise<number> {
const fetchPromises: Promise<GameInfo | null>[] = [];
for (const gameID of new Set(publicLobbyIDs)) {
for (const lobby of publicLobbies.values()) {
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000); // 5 second timeout
const port = config.workerPort(gameID);
const promise = fetch(`http://localhost:${port}/api/game/${gameID}`, {
const promise = fetch(`${lobby.dns}/api/game/${lobby.gameID}`, {
headers: { [config.adminHeader()]: config.adminToken() },
signal: controller.signal,
})
@@ -209,9 +194,9 @@ async function fetchLobbies(): Promise<number> {
return json as GameInfo;
})
.catch((error) => {
log.error(`Error fetching game ${gameID}:`, error);
log.error(`Error fetching game ${lobby.gameID}:`, error);
// Return null or a placeholder if fetch fails
publicLobbyIDs.delete(gameID);
publicLobbies.delete(lobby.gameID);
return null;
});
@@ -239,7 +224,7 @@ async function fetchLobbies(): Promise<number> {
l.msUntilStart !== undefined &&
l.msUntilStart <= 250
) {
publicLobbyIDs.delete(l.gameID);
publicLobbies.delete(l.gameID);
return;
}
@@ -252,7 +237,7 @@ async function fetchLobbies(): Promise<number> {
l.numClients !== undefined &&
l.gameConfig.maxPlayers <= l.numClients
) {
publicLobbyIDs.delete(l.gameID);
publicLobbies.delete(l.gameID);
return;
}
});
@@ -262,37 +247,36 @@ async function fetchLobbies(): Promise<number> {
lobbies: lobbyInfos,
});
return publicLobbyIDs.size;
return publicLobbies.size;
}
// Function to schedule a new public game
async function schedulePublicGame(playlist: MapPlaylist) {
const gameID = generateID();
publicLobbyIDs.add(gameID);
const workerPath = config.workerPath(gameID);
const dns = workerDiscovery.getAvailableWorker().dns;
log.info(`Scheduling public game on worker ${dns}...`);
// Send request to the worker to start the game
try {
const response = await fetch(
`http://localhost:${config.workerPort(gameID)}/api/create_game/${gameID}`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
[config.adminHeader()]: config.adminToken(),
},
body: JSON.stringify(playlist.gameConfig()),
const response = await fetch(`${dns}/api/create_game`, {
method: "POST",
headers: {
"Content-Type": "application/json",
[config.adminHeader()]: config.adminToken(),
},
);
body: JSON.stringify(playlist.gameConfig()),
});
if (!response.ok) {
throw new Error(`Failed to schedule public game: ${response.statusText}`);
}
const data = await response.json();
publicLobbies.set(data.gameID, {
gameID: data.gameID,
dns,
});
} catch (error) {
log.error(`Failed to schedule public game on worker ${workerPath}:`, error);
log.error(`Failed to schedule public game on worker ${dns}:`, error);
throw error;
}
}
+65 -7
View File
@@ -3,26 +3,30 @@ import * as dotenv from "dotenv";
import { GameEnv } from "../core/configuration/Config";
import { getServerConfigFromServer } from "../core/configuration/ConfigLoader";
import { Cloudflare, TunnelConfig } from "./Cloudflare";
import { logger } from "./Logger";
import { startMaster } from "./Master";
import { startWorker } from "./Worker";
const log = logger.child({
comp: "startup",
});
const config = getServerConfigFromServer();
dotenv.config();
// Main entry point of the application
async function main() {
// Check if this is the primary (master) process
if (cluster.isPrimary) {
if (config.env() !== GameEnv.Dev) {
setupTunnels();
}
console.log("Starting master process...");
await startMaster();
if (config.env() !== GameEnv.Dev) {
await setupTunnels();
}
await startWorkers();
} else {
// This is a worker process
console.log("Starting worker process...");
await startWorker();
console.log(`Starting worker process ${process.env.WORKER_ID}...`);
startWorker();
}
}
@@ -58,3 +62,57 @@ async function setupTunnels() {
await cloudflare.startCloudflared(tunnel.tunnelToken);
}
// Start the master process
export async function startWorkers() {
const readyWorkers = new Set();
log.info(`Primary ${process.pid} is running`);
log.info(`Setting up ${config.numWorkers()} workers...`);
// Fork workers
for (let i = 0; i < config.numWorkers(); i++) {
const worker = cluster.fork({
WORKER_ID: i,
});
log.info(`Started worker ${i} (PID: ${worker.process.pid})`);
}
cluster.on("message", (worker, message) => {
if (message.type === "WORKER_READY") {
const workerId = message.workerId;
readyWorkers.add(workerId);
log.info(
`Worker ${workerId} is ready. (${readyWorkers.size}/${config.numWorkers()} ready)`,
);
// Start scheduling when all workers are ready
if (readyWorkers.size === config.numWorkers()) {
log.info("All workers ready, starting game scheduling");
}
}
});
// Handle worker crashes
cluster.on("exit", (worker, code, signal) => {
const workerId = (worker as any).process?.env?.WORKER_ID;
if (!workerId) {
log.error(`worker crashed could not find id`);
return;
}
log.warn(
`Worker ${workerId} (PID: ${worker.process.pid}) died with code: ${code} and signal: ${signal}`,
);
log.info(`Restarting worker ${workerId}...`);
// Restart the worker with the same ID
const newWorker = cluster.fork({
WORKER_ID: workerId,
});
log.info(
`Restarted worker ${workerId} (New PID: ${newWorker.process.pid})`,
);
});
}
+49 -11
View File
@@ -26,8 +26,14 @@ import { initWorkerMetrics } from "./WorkerMetrics";
const config = getServerConfigFromServer();
const workerId = parseInt(process.env.WORKER_ID || "0");
const workerAddress = config.workerAddress(workerId);
const masterAddress = config.masterAddress();
const log = logger.child({ comp: `w_${workerId}` });
const isRunning = false;
// Worker setup
export function startWorker() {
log.info(`Worker starting...`);
@@ -39,7 +45,7 @@ export function startWorker() {
const server = http.createServer(app);
const wss = new WebSocketServer({ server });
const gm = new GameManager(config, log);
const gm = new GameManager(config, log, workerId);
if (config.env() === GameEnv.Prod && config.otelEnabled()) {
initWorkerMetrics(gm);
@@ -80,10 +86,41 @@ export function startWorker() {
}),
);
sendHeartbeat();
setInterval(async () => {
const jitter = Math.random() * 1000;
await new Promise((resolve) => setTimeout(resolve, jitter));
await sendHeartbeat();
}, 15 * 1000);
async function sendHeartbeat(): Promise<any> {
log.info(`Sending heartbeat to ${masterAddress}...`);
const response = await fetch(`${masterAddress}/api/worker_heartbeat`, {
method: "POST",
headers: {
[config.adminHeader()]: config.adminToken(),
"Content-Type": "application/json",
},
body: JSON.stringify({
workerId: workerId,
dns: workerAddress,
activeClients: gm.activeClients(),
}),
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
return response.json();
}
app.post(
"/api/create_game/:id",
"/api/create_game",
gatekeeper.httpHandler(LimiterType.Post, async (req, res) => {
const id = req.params.id;
console.log("create_game!!!!");
const id = gm.createGameID();
if (!id) {
log.warn(`cannot create game, id not found`);
return res.status(400).json({ error: "Game ID is required" });
@@ -91,6 +128,7 @@ export function startWorker() {
const clientIP = req.ip || req.socket.remoteAddress || "unknown";
const result = CreateGameInputSchema.safeParse(req.body);
if (!result.success) {
console.log("create_game error", result.error);
const error = z.prettifyError(result.error);
return res.status(400).json({ error });
}
@@ -106,14 +144,14 @@ export function startWorker() {
return res.status(401).send("Unauthorized");
}
// Double-check this worker should host this game
const expectedWorkerId = config.workerIndex(id);
if (expectedWorkerId !== workerId) {
log.warn(
`This game ${id} should be on worker ${expectedWorkerId}, but this is worker ${workerId}`,
);
return res.status(400).json({ error: "Worker, game id mismatch" });
}
// // Double-check this worker should host this game
// const expectedWorkerId = config.workerIndex(id);
// if (expectedWorkerId !== workerId) {
// log.warn(
// `This game ${id} should be on worker ${expectedWorkerId}, but this is worker ${workerId}`,
// );
// return res.status(400).json({ error: "Worker, game id mismatch" });
// }
const game = gm.createGame(id, gc);
+104
View File
@@ -0,0 +1,104 @@
import { PseudoRandom } from "../core/PseudoRandom";
interface WorkerInfo {
id: string;
dns: string;
activeClients: number;
lastHeartbeat: Date;
healthy: boolean;
}
// WorkerDiscoveryService - manages worker registry and load balancing
export class WorkerDiscoveryService {
private workers: Map<string, WorkerInfo> = new Map();
private readonly HEARTBEAT_TIMEOUT = 60000; // 60 seconds
private readonly CLEANUP_INTERVAL = 5000; // Check every 5 seconds
private readonly MAX_ACTIVE_CLIENTS = 500; // Can actually handle up to 1000, but we don't want to overload the workers
private readonly rand = new PseudoRandom(1);
constructor() {
// Periodically clean up dead workers
setInterval(() => this.cleanupDeadWorkers(), this.CLEANUP_INTERVAL);
}
// Worker sends heartbeat with current state
updateWorkerHeartbeat(
workerId: string,
dns: string,
activeClients: number,
): void {
const existingWorker = this.workers.get(workerId);
this.workers.set(workerId, {
id: workerId,
dns,
activeClients,
lastHeartbeat: new Date(),
healthy: true,
});
// Log if this is a new worker
if (!existingWorker) {
console.log(`New worker registered: ${workerId} at ${dns}`);
}
}
getAvailableWorker(): WorkerInfo {
let healthyWorkers = Array.from(this.workers.values())
.filter((w) => w.healthy && w.activeClients < this.MAX_ACTIVE_CLIENTS)
.sort((a, b) => {
// Sort by load percentage (ascending)
const loadA = a.activeClients / this.MAX_ACTIVE_CLIENTS;
const loadB = b.activeClients / this.MAX_ACTIVE_CLIENTS;
return loadA - loadB;
});
if (healthyWorkers.length === 0) {
healthyWorkers = Array.from(this.workers.values());
}
if (healthyWorkers.length === 0) {
throw new Error("No workers available");
}
return this.rand.randElement(healthyWorkers);
}
// Get specific worker info
getWorker(workerId: string): WorkerInfo | null {
const worker = this.workers.get(workerId);
return worker?.healthy ? worker : null;
}
// Remove dead workers
private cleanupDeadWorkers() {
const now = Date.now();
for (const [workerId, worker] of this.workers) {
const timeSinceHeartbeat = now - worker.lastHeartbeat.getTime();
if (timeSinceHeartbeat > this.HEARTBEAT_TIMEOUT && worker.healthy) {
// Mark as unhealthy first (soft delete)
worker.healthy = false;
console.log(
`Worker ${workerId} marked unhealthy (no heartbeat for ${timeSinceHeartbeat}ms)`,
);
} else if (timeSinceHeartbeat > this.HEARTBEAT_TIMEOUT * 3) {
// Hard delete after 30 seconds
this.workers.delete(workerId);
console.log(
`Worker ${workerId} removed (dead for ${timeSinceHeartbeat}ms)`,
);
}
}
}
// Manually remove a worker (for graceful shutdown)
removeWorker(workerId: string): boolean {
const existed = this.workers.has(workerId);
this.workers.delete(workerId);
if (existed) {
console.log(`Worker ${workerId} manually removed`);
}
return existed;
}
}
+6
View File
@@ -4,6 +4,12 @@ import { GameMapType } from "../../src/core/game/Game";
import { GameID } from "../../src/core/Schemas";
export class TestServerConfig implements ServerConfig {
masterAddress(): string {
throw new Error("Method not implemented.");
}
workerAddress(workerId: number): string {
throw new Error("Method not implemented.");
}
domain(): string {
throw new Error("Method not implemented.");
}
+1
View File
@@ -232,6 +232,7 @@ export default async (env, argv) => {
"/api/auth/callback",
"/api/auth/discord",
"/api/kick_player",
"/api/worker_address",
],
target: "http://localhost:3000",
secure: false,