From efb252f94b3cd67944e3425591e697ad0d41cf89 Mon Sep 17 00:00:00 2001 From: scamiv <6170744+scamiv@users.noreply.github.com> Date: Sun, 25 Jan 2026 18:39:18 +0100 Subject: [PATCH] Refactor game update processing to async RAF-based queue with credit-based backpressure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Main thread no longer processes worker game_update inline; it queues updates and time-slices them on RAF (ClientGameRunner.ts:267). This prevents blocking the main thread during heavy update processing. Key changes: - Drain budgets are bounded (ms + count) to maintain frame timing - Every applied update still calls renderer.tick() (critical for UI layers that assume one tick per game tick; coalescing would break timers) - turnComplete(processedCount) is now "credits" (acked in batches), not per-turn lockstep - LocalServer is wall-clock paced for ×0.5/×1/×2, and backlog-capped for max (LocalServer.ts:78) - maxOutstandingTurns is the key knob (5 for fixed-rate, 200 for max right now) - Transport forwards the credit count (Transport.ts:395) --- src/client/ClientGameRunner.ts | 124 +++++++++++++++++++++++++++------ src/client/LocalServer.ts | 63 ++++++++++++----- src/client/Transport.ts | 4 +- 3 files changed, 150 insertions(+), 41 deletions(-) diff --git a/src/client/ClientGameRunner.ts b/src/client/ClientGameRunner.ts index 86f7d3e95..67790b8d7 100644 --- a/src/client/ClientGameRunner.ts +++ b/src/client/ClientGameRunner.ts @@ -262,6 +262,10 @@ export class ClientGameRunner { private lastTickReceiveTime: number = 0; private currentTickDelay: number | undefined = undefined; + private pendingUpdates: GameUpdateViewData[] = []; + private pendingUpdateIndex: number = 0; + private drainPendingUpdatesRafId: number | null = null; + constructor( private lobby: LobbyConfig, private eventBus: EventBus, @@ -361,28 +365,7 @@ export class ClientGameRunner { this.stop(); return; } - gu.updates[GameUpdateType.Hash].forEach((hu: HashUpdate) => { - this.eventBus.emit(new SendHashEvent(hu.tick, hu.hash)); - }); - this.gameView.update(gu); - this.renderer.tick(); - - // Emit tick metrics event for performance overlay - this.eventBus.emit( - new TickMetricsEvent(gu.tickExecutionDuration, this.currentTickDelay), - ); - - // Reset tick delay for next measurement - this.currentTickDelay = undefined; - - if (gu.updates[GameUpdateType.Win].length > 0) { - this.saveGame(gu.updates[GameUpdateType.Win][0]); - } - - // In singleplayer/replay (local server), only acknowledge the turn once the - // update has been fully applied and the renderer has ticked. This prevents - // the local server from queuing turns faster than we can process them. - this.transport.turnComplete(); + this.enqueueWorkerUpdate(gu); }); const onconnect = () => { @@ -506,6 +489,12 @@ export class ClientGameRunner { if (!this.isActive) return; this.isActive = false; + if (this.drainPendingUpdatesRafId !== null) { + cancelAnimationFrame(this.drainPendingUpdatesRafId); + this.drainPendingUpdatesRafId = null; + } + this.pendingUpdates = []; + this.pendingUpdateIndex = 0; this.worker.cleanup(); this.transport.leaveGame(); if (this.connectionCheckInterval) { @@ -518,6 +507,97 @@ export class ClientGameRunner { } } + private enqueueWorkerUpdate(update: GameUpdateViewData): void { + this.pendingUpdates.push(update); + + if (this.drainPendingUpdatesRafId !== null || !this.isActive) { + return; + } + + this.drainPendingUpdatesRafId = requestAnimationFrame(() => { + this.drainPendingUpdatesRafId = null; + this.drainPendingUpdates(); + }); + } + + private drainPendingUpdates(): void { + if (!this.isActive) { + return; + } + + const backlog = this.pendingUpdates.length - this.pendingUpdateIndex; + if (backlog <= 0) { + return; + } + + const startTime = performance.now(); + const maxDrainMs = backlog > 200 ? 12 : 8; + const maxUpdates = backlog > 200 ? 1000 : 200; + + let processed = 0; + let totalTickExecutionDuration = 0; + let winUpdate: WinUpdate | null = null; + + while ( + processed < maxUpdates && + this.pendingUpdateIndex < this.pendingUpdates.length && + performance.now() - startTime < maxDrainMs + ) { + const gu = this.pendingUpdates[this.pendingUpdateIndex]; + this.pendingUpdateIndex++; + processed++; + + totalTickExecutionDuration += gu.tickExecutionDuration ?? 0; + + gu.updates[GameUpdateType.Hash].forEach((hu: HashUpdate) => { + this.eventBus.emit(new SendHashEvent(hu.tick, hu.hash)); + }); + + if (gu.updates[GameUpdateType.Win].length > 0) { + winUpdate = gu.updates[GameUpdateType.Win][0]; + } + + this.gameView.update(gu); + this.renderer.tick(); + } + + if (processed > 0) { + const avgTickExecutionDuration = totalTickExecutionDuration / processed; + this.eventBus.emit( + new TickMetricsEvent(avgTickExecutionDuration, this.currentTickDelay), + ); + this.currentTickDelay = undefined; + + if (winUpdate) { + this.saveGame(winUpdate); + } + + // In singleplayer/replay (local server), acknowledge how many turns we've + // actually applied this frame. LocalServer uses this as bounded backpressure. + this.transport.turnComplete(processed); + } + + // Compact the queue occasionally to avoid unbounded growth from array holes. + if (this.pendingUpdateIndex >= this.pendingUpdates.length) { + this.pendingUpdates = []; + this.pendingUpdateIndex = 0; + } else if (this.pendingUpdateIndex > 1024) { + this.pendingUpdates = this.pendingUpdates.slice(this.pendingUpdateIndex); + this.pendingUpdateIndex = 0; + } + + // Keep draining if we still have backlog. + if ( + this.pendingUpdates.length - this.pendingUpdateIndex > 0 && + this.drainPendingUpdatesRafId === null + ) { + this.drainPendingUpdatesRafId = requestAnimationFrame(() => { + this.drainPendingUpdatesRafId = null; + this.drainPendingUpdates(); + }); + } + } + private inputEvent(event: MouseUpEvent) { if (!this.isActive || this.renderer.uiState.ghostStructure !== null) { return; diff --git a/src/client/LocalServer.ts b/src/client/LocalServer.ts index 75121b38a..fdb478034 100644 --- a/src/client/LocalServer.ts +++ b/src/client/LocalServer.ts @@ -36,6 +36,7 @@ export class LocalServer { private intents: Intent[] = []; private startedAt: number; + private nextTurnAtMs: number = 0; private paused = false; private replaySpeedMultiplier = defaultReplaySpeedMultiplier; @@ -44,7 +45,6 @@ export class LocalServer { private allPlayersStats: AllPlayersStats = {}; private turnsExecuted = 0; - private turnStartTime = 0; private turnCheckInterval: NodeJS.Timeout; private clientConnect: () => void; @@ -66,30 +66,54 @@ export class LocalServer { start() { console.log("local server starting"); + this.nextTurnAtMs = Date.now(); this.turnCheckInterval = setInterval(() => { - const turnIntervalMs = - this.lobbyConfig.serverConfig.turnIntervalMs() * - this.replaySpeedMultiplier; - const backlog = Math.max(0, this.turns.length - this.turnsExecuted); - const allowReplayBacklog = - this.replaySpeedMultiplier === ReplaySpeedMultiplier.fastest && - this.lobbyConfig.gameRecord !== undefined; - const maxBacklog = allowReplayBacklog ? MAX_REPLAY_BACKLOG_TURNS : 0; + if (this.paused) { + return; + } - const canQueueNextTurn = - backlog === 0 || (maxBacklog > 0 && backlog < maxBacklog); - if ( - canQueueNextTurn && - Date.now() > this.turnStartTime + turnIntervalMs + const baseTurnIntervalMs = this.lobbyConfig.serverConfig.turnIntervalMs(); + const turnIntervalMs = baseTurnIntervalMs * this.replaySpeedMultiplier; + + // Outstanding work is the number of turns we've emitted that the client hasn't applied yet. + const outstandingTurns = this.turns.length - this.turnsExecuted; + + // For ×0.5/×1/×2 we aim to stay close to real time. + // For "fastest" (interval 0), allow a larger but bounded backlog so the sim can sprint + // while the main thread drains updates opportunistically. + const maxOutstandingTurns = turnIntervalMs === 0 ? 200 : 5; + const maxTurnsToEmitPerCheck = turnIntervalMs === 0 ? 200 : 5; + + if (outstandingTurns >= maxOutstandingTurns) { + return; + } + + const now = Date.now(); + let emitted = 0; + while ( + emitted < maxTurnsToEmitPerCheck && + this.turns.length - this.turnsExecuted < maxOutstandingTurns && + (turnIntervalMs === 0 || now >= this.nextTurnAtMs) ) { - this.turnStartTime = Date.now(); // End turn on the server means the client will start processing the turn. this.endTurn(); + emitted++; + + if (turnIntervalMs === 0) { + // "Fastest": no wall-clock pacing; rely on backlog caps above. + this.nextTurnAtMs = now; + } else { + // Fixed-rate pacing: do not try to "catch up" after stalls; resume from now. + this.nextTurnAtMs = now + turnIntervalMs; + } } }, 5); this.eventBus.on(ReplaySpeedChangeEvent, (event) => { this.replaySpeedMultiplier = event.replaySpeedMultiplier; + // Apply speed changes immediately; the next scheduled turn time will be + // recalculated by the interval loop. + this.nextTurnAtMs = Date.now(); }); this.startedAt = Date.now(); @@ -126,11 +150,13 @@ export class LocalServer { this.intents.push(clientMsg.intent); this.endTurn(); this.paused = true; + this.nextTurnAtMs = Date.now(); } else { // Unpausing: clear pause flag before adding intent so next turn can execute this.paused = false; this.intents.push(clientMsg.intent); this.endTurn(); + this.nextTurnAtMs = Date.now(); } return; } @@ -185,8 +211,11 @@ export class LocalServer { } // This is so the client can tell us when it finished processing the turn. - public turnComplete() { - this.turnsExecuted++; + public turnComplete(count: number = 1) { + this.turnsExecuted = Math.min( + this.turnsExecuted + count, + this.turns.length, + ); } // endTurn in this context means the server has collected all the intents diff --git a/src/client/Transport.ts b/src/client/Transport.ts index 58307e113..5248f37d0 100644 --- a/src/client/Transport.ts +++ b/src/client/Transport.ts @@ -392,9 +392,9 @@ export class Transport { this.connect(this.onconnect, this.onmessage); } - public turnComplete() { + public turnComplete(count: number = 1) { if (this.isLocal) { - this.localServer.turnComplete(); + this.localServer.turnComplete(count); } }