diff --git a/src/client/ClientGameRunner.ts b/src/client/ClientGameRunner.ts index 829053201..b1c90180b 100644 --- a/src/client/ClientGameRunner.ts +++ b/src/client/ClientGameRunner.ts @@ -385,15 +385,6 @@ export class ClientGameRunner { } }); - const worker = this.worker; - const keepWorkerAlive = () => { - if (this.isActive) { - worker.sendHeartbeat(); - requestAnimationFrame(keepWorkerAlive); - } - }; - requestAnimationFrame(keepWorkerAlive); - const onconnect = () => { console.log("Connected to game server!"); this.transport.rejoinGame(this.turnsSeen); diff --git a/src/core/worker/Worker.worker.ts b/src/core/worker/Worker.worker.ts index 808b28088..b34e68c82 100644 --- a/src/core/worker/Worker.worker.ts +++ b/src/core/worker/Worker.worker.ts @@ -16,32 +16,110 @@ import { const ctx: Worker = self as any; let gameRunner: Promise | null = null; const mapLoader = new FetchGameMapLoader(`/maps`, version); -const MAX_TICKS_PER_HEARTBEAT = 4; +// Yield threshold; not a backlog cap. Used to avoid monopolizing the worker task +// and flooding the main thread with messages during catch-up. +const MAX_TICKS_BEFORE_YIELD = 4; -function gameUpdate(gu: GameUpdateViewData | ErrorUpdate) { - // skip if ErrorUpdate - if (!("updates" in gu)) { +let drainScheduled = false; +let draining = false; +let drainRequested = false; + +function scheduleDrain(): void { + drainRequested = true; + if (drainScheduled || draining) { return; } - sendMessage({ - type: "game_update", - gameUpdate: gu, - }); + drainScheduled = true; + setTimeout(() => { + void drain().catch((e) => { + console.error("Worker drain failed:", e); + }); + }, 0); +} + +async function drain(): Promise { + drainScheduled = false; + if (draining) { + return; + } + if (!gameRunner) { + return; + } + + draining = true; + drainRequested = false; + let shouldContinue = false; + try { + const gr = await gameRunner; + if (!gr) { + return; + } + + const batch: GameUpdateViewData[] = []; + const onTickUpdate = (gu: GameUpdateViewData | ErrorUpdate) => { + if (!("updates" in gu)) { + return; + } + batch.push(gu); + }; + + // Temporarily route tick callbacks into this drain's batch. + tickUpdateSink = onTickUpdate; + + let ticksRun = 0; + while (ticksRun < MAX_TICKS_BEFORE_YIELD && gr.pendingTurns() > 0) { + const ok = gr.executeNextTick(gr.pendingTurns()); + if (!ok) { + break; + } + ticksRun++; + } + + tickUpdateSink = null; + + sendGameUpdateBatch(batch); + + shouldContinue = gr.pendingTurns() > 0; + } finally { + tickUpdateSink = null; + draining = false; + } + + if (shouldContinue || drainRequested) { + scheduleDrain(); + } +} + +let tickUpdateSink: ((gu: GameUpdateViewData | ErrorUpdate) => void) | null = + null; + +function gameUpdate(gu: GameUpdateViewData | ErrorUpdate) { + tickUpdateSink?.(gu); +} + +function sendGameUpdateBatch(gameUpdates: GameUpdateViewData[]): void { + if (gameUpdates.length === 0) { + return; + } + + const transfers: Transferable[] = []; + for (const gu of gameUpdates) { + transfers.push(gu.packedTileUpdates.buffer); + if (gu.packedMotionPlans) { + transfers.push(gu.packedMotionPlans.buffer); + } + } + + ctx.postMessage( + { + type: "game_update_batch", + gameUpdates, + } as WorkerMessage, + transfers, + ); } function sendMessage(message: WorkerMessage) { - if (message.type === "game_update") { - // Transfer the packed tile updates buffer to avoid structured-clone copies and - // reduce worker-side memory churn during long runs / catch-up. - const transfers: Transferable[] = [ - message.gameUpdate.packedTileUpdates.buffer, - ]; - if (message.gameUpdate.packedMotionPlans) { - transfers.push(message.gameUpdate.packedMotionPlans.buffer); - } - ctx.postMessage(message, transfers); - return; - } ctx.postMessage(message); } @@ -49,20 +127,6 @@ ctx.addEventListener("message", async (e: MessageEvent) => { const message = e.data; switch (message.type) { - case "heartbeat": { - const gr = await gameRunner; - if (!gr) { - break; - } - const pendingTurns = gr.pendingTurns(); - const ticksToRun = Math.min(pendingTurns, MAX_TICKS_PER_HEARTBEAT); - for (let i = 0; i < ticksToRun; i++) { - if (!gr.executeNextTick(gr.pendingTurns())) { - break; - } - } - break; - } case "init": try { gameRunner = createGameRunner( @@ -90,7 +154,8 @@ ctx.addEventListener("message", async (e: MessageEvent) => { try { const gr = await gameRunner; - await gr.addTurn(message.turn); + gr.addTurn(message.turn); + scheduleDrain(); } catch (error) { console.error("Failed to process turn:", error); throw error; diff --git a/src/core/worker/WorkerClient.ts b/src/core/worker/WorkerClient.ts index a5039e9d7..80867706a 100644 --- a/src/core/worker/WorkerClient.ts +++ b/src/core/worker/WorkerClient.ts @@ -45,6 +45,13 @@ export class WorkerClient { this.gameUpdateCallback(message.gameUpdate); } break; + case "game_update_batch": + if (this.gameUpdateCallback && message.gameUpdates) { + for (const gu of message.gameUpdates) { + this.gameUpdateCallback(gu); + } + } + break; case "initialized": default: @@ -103,12 +110,6 @@ export class WorkerClient { }); } - sendHeartbeat() { - this.worker.postMessage({ - type: "heartbeat", - }); - } - playerProfile(playerID: number): Promise { return new Promise((resolve, reject) => { if (!this.isInitialized) { diff --git a/src/core/worker/WorkerMessages.ts b/src/core/worker/WorkerMessages.ts index 795df5497..07b0cc550 100644 --- a/src/core/worker/WorkerMessages.ts +++ b/src/core/worker/WorkerMessages.ts @@ -10,11 +10,11 @@ import { GameUpdateViewData } from "../game/GameUpdates"; import { ClientID, GameStartInfo, Turn } from "../Schemas"; export type WorkerMessageType = - | "heartbeat" | "init" | "initialized" | "turn" | "game_update" + | "game_update_batch" | "player_actions" | "player_actions_result" | "player_profile" @@ -32,10 +32,6 @@ interface BaseWorkerMessage { id?: string; } -export interface HeartbeatMessage extends BaseWorkerMessage { - type: "heartbeat"; -} - // Messages from main thread to worker export interface InitMessage extends BaseWorkerMessage { type: "init"; @@ -58,6 +54,11 @@ export interface GameUpdateMessage extends BaseWorkerMessage { gameUpdate: GameUpdateViewData; } +export interface GameUpdateBatchMessage extends BaseWorkerMessage { + type: "game_update_batch"; + gameUpdates: GameUpdateViewData[]; +} + export interface PlayerActionsMessage extends BaseWorkerMessage { type: "player_actions"; playerID: PlayerID; @@ -116,7 +117,6 @@ export interface TransportShipSpawnResultMessage extends BaseWorkerMessage { // Union types for type safety export type MainThreadMessage = - | HeartbeatMessage | InitMessage | TurnMessage | PlayerActionsMessage @@ -129,6 +129,7 @@ export type MainThreadMessage = export type WorkerMessage = | InitializedMessage | GameUpdateMessage + | GameUpdateBatchMessage | PlayerActionsResultMessage | PlayerProfileResultMessage | PlayerBorderTilesResultMessage