mirror of
https://github.com/openfrontio/OpenFrontIO.git
synced 2026-06-21 16:50:15 +00:00
Refactor game update processing to async RAF-based queue with credit-based backpressure
Main thread no longer processes worker game_update inline; it queues updates and time-slices them on RAF (ClientGameRunner.ts:267). This prevents blocking the main thread during heavy update processing. Key changes: - Drain budgets are bounded (ms + count) to maintain frame timing - Every applied update still calls renderer.tick() (critical for UI layers that assume one tick per game tick; coalescing would break timers) - turnComplete(processedCount) is now "credits" (acked in batches), not per-turn lockstep - LocalServer is wall-clock paced for ×0.5/×1/×2, and backlog-capped for max (LocalServer.ts:78) - maxOutstandingTurns is the key knob (5 for fixed-rate, 200 for max right now) - Transport forwards the credit count (Transport.ts:395)
This commit is contained in:
+102
-22
@@ -262,6 +262,10 @@ export class ClientGameRunner {
|
||||
private lastTickReceiveTime: number = 0;
|
||||
private currentTickDelay: number | undefined = undefined;
|
||||
|
||||
private pendingUpdates: GameUpdateViewData[] = [];
|
||||
private pendingUpdateIndex: number = 0;
|
||||
private drainPendingUpdatesRafId: number | null = null;
|
||||
|
||||
constructor(
|
||||
private lobby: LobbyConfig,
|
||||
private eventBus: EventBus,
|
||||
@@ -361,28 +365,7 @@ export class ClientGameRunner {
|
||||
this.stop();
|
||||
return;
|
||||
}
|
||||
gu.updates[GameUpdateType.Hash].forEach((hu: HashUpdate) => {
|
||||
this.eventBus.emit(new SendHashEvent(hu.tick, hu.hash));
|
||||
});
|
||||
this.gameView.update(gu);
|
||||
this.renderer.tick();
|
||||
|
||||
// Emit tick metrics event for performance overlay
|
||||
this.eventBus.emit(
|
||||
new TickMetricsEvent(gu.tickExecutionDuration, this.currentTickDelay),
|
||||
);
|
||||
|
||||
// Reset tick delay for next measurement
|
||||
this.currentTickDelay = undefined;
|
||||
|
||||
if (gu.updates[GameUpdateType.Win].length > 0) {
|
||||
this.saveGame(gu.updates[GameUpdateType.Win][0]);
|
||||
}
|
||||
|
||||
// In singleplayer/replay (local server), only acknowledge the turn once the
|
||||
// update has been fully applied and the renderer has ticked. This prevents
|
||||
// the local server from queuing turns faster than we can process them.
|
||||
this.transport.turnComplete();
|
||||
this.enqueueWorkerUpdate(gu);
|
||||
});
|
||||
|
||||
const onconnect = () => {
|
||||
@@ -506,6 +489,12 @@ export class ClientGameRunner {
|
||||
if (!this.isActive) return;
|
||||
|
||||
this.isActive = false;
|
||||
if (this.drainPendingUpdatesRafId !== null) {
|
||||
cancelAnimationFrame(this.drainPendingUpdatesRafId);
|
||||
this.drainPendingUpdatesRafId = null;
|
||||
}
|
||||
this.pendingUpdates = [];
|
||||
this.pendingUpdateIndex = 0;
|
||||
this.worker.cleanup();
|
||||
this.transport.leaveGame();
|
||||
if (this.connectionCheckInterval) {
|
||||
@@ -518,6 +507,97 @@ export class ClientGameRunner {
|
||||
}
|
||||
}
|
||||
|
||||
private enqueueWorkerUpdate(update: GameUpdateViewData): void {
|
||||
this.pendingUpdates.push(update);
|
||||
|
||||
if (this.drainPendingUpdatesRafId !== null || !this.isActive) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.drainPendingUpdatesRafId = requestAnimationFrame(() => {
|
||||
this.drainPendingUpdatesRafId = null;
|
||||
this.drainPendingUpdates();
|
||||
});
|
||||
}
|
||||
|
||||
private drainPendingUpdates(): void {
|
||||
if (!this.isActive) {
|
||||
return;
|
||||
}
|
||||
|
||||
const backlog = this.pendingUpdates.length - this.pendingUpdateIndex;
|
||||
if (backlog <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const startTime = performance.now();
|
||||
const maxDrainMs = backlog > 200 ? 12 : 8;
|
||||
const maxUpdates = backlog > 200 ? 1000 : 200;
|
||||
|
||||
let processed = 0;
|
||||
let totalTickExecutionDuration = 0;
|
||||
let winUpdate: WinUpdate | null = null;
|
||||
|
||||
while (
|
||||
processed < maxUpdates &&
|
||||
this.pendingUpdateIndex < this.pendingUpdates.length &&
|
||||
performance.now() - startTime < maxDrainMs
|
||||
) {
|
||||
const gu = this.pendingUpdates[this.pendingUpdateIndex];
|
||||
this.pendingUpdateIndex++;
|
||||
processed++;
|
||||
|
||||
totalTickExecutionDuration += gu.tickExecutionDuration ?? 0;
|
||||
|
||||
gu.updates[GameUpdateType.Hash].forEach((hu: HashUpdate) => {
|
||||
this.eventBus.emit(new SendHashEvent(hu.tick, hu.hash));
|
||||
});
|
||||
|
||||
if (gu.updates[GameUpdateType.Win].length > 0) {
|
||||
winUpdate = gu.updates[GameUpdateType.Win][0];
|
||||
}
|
||||
|
||||
this.gameView.update(gu);
|
||||
this.renderer.tick();
|
||||
}
|
||||
|
||||
if (processed > 0) {
|
||||
const avgTickExecutionDuration = totalTickExecutionDuration / processed;
|
||||
this.eventBus.emit(
|
||||
new TickMetricsEvent(avgTickExecutionDuration, this.currentTickDelay),
|
||||
);
|
||||
this.currentTickDelay = undefined;
|
||||
|
||||
if (winUpdate) {
|
||||
this.saveGame(winUpdate);
|
||||
}
|
||||
|
||||
// In singleplayer/replay (local server), acknowledge how many turns we've
|
||||
// actually applied this frame. LocalServer uses this as bounded backpressure.
|
||||
this.transport.turnComplete(processed);
|
||||
}
|
||||
|
||||
// Compact the queue occasionally to avoid unbounded growth from array holes.
|
||||
if (this.pendingUpdateIndex >= this.pendingUpdates.length) {
|
||||
this.pendingUpdates = [];
|
||||
this.pendingUpdateIndex = 0;
|
||||
} else if (this.pendingUpdateIndex > 1024) {
|
||||
this.pendingUpdates = this.pendingUpdates.slice(this.pendingUpdateIndex);
|
||||
this.pendingUpdateIndex = 0;
|
||||
}
|
||||
|
||||
// Keep draining if we still have backlog.
|
||||
if (
|
||||
this.pendingUpdates.length - this.pendingUpdateIndex > 0 &&
|
||||
this.drainPendingUpdatesRafId === null
|
||||
) {
|
||||
this.drainPendingUpdatesRafId = requestAnimationFrame(() => {
|
||||
this.drainPendingUpdatesRafId = null;
|
||||
this.drainPendingUpdates();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private inputEvent(event: MouseUpEvent) {
|
||||
if (!this.isActive || this.renderer.uiState.ghostStructure !== null) {
|
||||
return;
|
||||
|
||||
+46
-17
@@ -36,6 +36,7 @@ export class LocalServer {
|
||||
|
||||
private intents: Intent[] = [];
|
||||
private startedAt: number;
|
||||
private nextTurnAtMs: number = 0;
|
||||
|
||||
private paused = false;
|
||||
private replaySpeedMultiplier = defaultReplaySpeedMultiplier;
|
||||
@@ -44,7 +45,6 @@ export class LocalServer {
|
||||
private allPlayersStats: AllPlayersStats = {};
|
||||
|
||||
private turnsExecuted = 0;
|
||||
private turnStartTime = 0;
|
||||
|
||||
private turnCheckInterval: NodeJS.Timeout;
|
||||
private clientConnect: () => void;
|
||||
@@ -66,30 +66,54 @@ export class LocalServer {
|
||||
|
||||
start() {
|
||||
console.log("local server starting");
|
||||
this.nextTurnAtMs = Date.now();
|
||||
this.turnCheckInterval = setInterval(() => {
|
||||
const turnIntervalMs =
|
||||
this.lobbyConfig.serverConfig.turnIntervalMs() *
|
||||
this.replaySpeedMultiplier;
|
||||
const backlog = Math.max(0, this.turns.length - this.turnsExecuted);
|
||||
const allowReplayBacklog =
|
||||
this.replaySpeedMultiplier === ReplaySpeedMultiplier.fastest &&
|
||||
this.lobbyConfig.gameRecord !== undefined;
|
||||
const maxBacklog = allowReplayBacklog ? MAX_REPLAY_BACKLOG_TURNS : 0;
|
||||
if (this.paused) {
|
||||
return;
|
||||
}
|
||||
|
||||
const canQueueNextTurn =
|
||||
backlog === 0 || (maxBacklog > 0 && backlog < maxBacklog);
|
||||
if (
|
||||
canQueueNextTurn &&
|
||||
Date.now() > this.turnStartTime + turnIntervalMs
|
||||
const baseTurnIntervalMs = this.lobbyConfig.serverConfig.turnIntervalMs();
|
||||
const turnIntervalMs = baseTurnIntervalMs * this.replaySpeedMultiplier;
|
||||
|
||||
// Outstanding work is the number of turns we've emitted that the client hasn't applied yet.
|
||||
const outstandingTurns = this.turns.length - this.turnsExecuted;
|
||||
|
||||
// For ×0.5/×1/×2 we aim to stay close to real time.
|
||||
// For "fastest" (interval 0), allow a larger but bounded backlog so the sim can sprint
|
||||
// while the main thread drains updates opportunistically.
|
||||
const maxOutstandingTurns = turnIntervalMs === 0 ? 200 : 5;
|
||||
const maxTurnsToEmitPerCheck = turnIntervalMs === 0 ? 200 : 5;
|
||||
|
||||
if (outstandingTurns >= maxOutstandingTurns) {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
let emitted = 0;
|
||||
while (
|
||||
emitted < maxTurnsToEmitPerCheck &&
|
||||
this.turns.length - this.turnsExecuted < maxOutstandingTurns &&
|
||||
(turnIntervalMs === 0 || now >= this.nextTurnAtMs)
|
||||
) {
|
||||
this.turnStartTime = Date.now();
|
||||
// End turn on the server means the client will start processing the turn.
|
||||
this.endTurn();
|
||||
emitted++;
|
||||
|
||||
if (turnIntervalMs === 0) {
|
||||
// "Fastest": no wall-clock pacing; rely on backlog caps above.
|
||||
this.nextTurnAtMs = now;
|
||||
} else {
|
||||
// Fixed-rate pacing: do not try to "catch up" after stalls; resume from now.
|
||||
this.nextTurnAtMs = now + turnIntervalMs;
|
||||
}
|
||||
}
|
||||
}, 5);
|
||||
|
||||
this.eventBus.on(ReplaySpeedChangeEvent, (event) => {
|
||||
this.replaySpeedMultiplier = event.replaySpeedMultiplier;
|
||||
// Apply speed changes immediately; the next scheduled turn time will be
|
||||
// recalculated by the interval loop.
|
||||
this.nextTurnAtMs = Date.now();
|
||||
});
|
||||
|
||||
this.startedAt = Date.now();
|
||||
@@ -126,11 +150,13 @@ export class LocalServer {
|
||||
this.intents.push(clientMsg.intent);
|
||||
this.endTurn();
|
||||
this.paused = true;
|
||||
this.nextTurnAtMs = Date.now();
|
||||
} else {
|
||||
// Unpausing: clear pause flag before adding intent so next turn can execute
|
||||
this.paused = false;
|
||||
this.intents.push(clientMsg.intent);
|
||||
this.endTurn();
|
||||
this.nextTurnAtMs = Date.now();
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -185,8 +211,11 @@ export class LocalServer {
|
||||
}
|
||||
|
||||
// This is so the client can tell us when it finished processing the turn.
|
||||
public turnComplete() {
|
||||
this.turnsExecuted++;
|
||||
public turnComplete(count: number = 1) {
|
||||
this.turnsExecuted = Math.min(
|
||||
this.turnsExecuted + count,
|
||||
this.turns.length,
|
||||
);
|
||||
}
|
||||
|
||||
// endTurn in this context means the server has collected all the intents
|
||||
|
||||
@@ -392,9 +392,9 @@ export class Transport {
|
||||
this.connect(this.onconnect, this.onmessage);
|
||||
}
|
||||
|
||||
public turnComplete() {
|
||||
public turnComplete(count: number = 1) {
|
||||
if (this.isLocal) {
|
||||
this.localServer.turnComplete();
|
||||
this.localServer.turnComplete(count);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user