mirror of
https://github.com/openfrontio/OpenFrontIO.git
synced 2026-06-21 12:00:44 +00:00
perf(worker): remove heartbeat; batch game updates (#3308)
## Description: Removes the client-driven heartbeat loop and switches worker tick execution to a worker-owned drain scheduler with batched game update delivery. ## Why The previous flow required the client to send a `heartbeat` every animation frame just to keep the worker progressing turns. That had two costs: 1. Simulation progress was coupled to browser frame cadence. 2. Catch-up periods produced many single `game_update` messages, increasing message overhead and main-thread wakeups. ## What Changed ### 1) Remove heartbeat protocol - Deleted `heartbeat` from `WorkerMessageType`. - Removed `HeartbeatMessage` from `MainThreadMessage`. - Removed `sendHeartbeat()` from `WorkerClient`. - Removed the `requestAnimationFrame` keep-alive loop in `ClientGameRunner`. Files: - `src/client/ClientGameRunner.ts` - `src/core/worker/WorkerClient.ts` - `src/core/worker/WorkerMessages.ts` - `src/core/worker/Worker.worker.ts` ### 2) Add batched worker-to-client updates - Added `game_update_batch` message type and `GameUpdateBatchMessage`. - Worker now emits one batch message containing multiple tick updates. - `WorkerClient` handles `game_update_batch` by replaying updates to the existing callback in order. Files: - `src/core/worker/WorkerMessages.ts` - `src/core/worker/WorkerClient.ts` ### 3) Move tick draining into worker - Added a scheduler (`scheduleDrain`) and drain loop (`drain`) in `Worker.worker.ts`. - On each `turn` message, worker enqueues turn and schedules drain. - Drain executes up to `MAX_TICKS_BEFORE_YIELD = 4` ticks per cycle, then yields with `setTimeout(..., 0)`. - Tick updates are collected into a batch and sent once with transferables: - `packedTileUpdates.buffer` - `packedMotionPlans.buffer` (when present) - If backlog remains, drain reschedules itself. File: - `src/core/worker/Worker.worker.ts` ## Behavioral Notes - No server protocol changes. - Ggame update callback contract remains the same (still receives one `GameUpdateViewData` at a time in order). - Ordering is preserved: `WorkerClient` iterates batch entries in sequence. - Error updates are still filtered from update delivery in the worker batch path (same effective behavior as before for normal update flow). ## Expected Impact - Fewer `postMessage` calls during backlog and burst turn delivery. - Lower message overhead and fewer main-thread interrupts. - Less dependence on UI frame timing for worker progress. - Better catch-up stability due to explicit periodic yielding. ## Risk Areas - Drain scheduling edge cases (re-entrancy / lost wake-ups). - Mitigated with `drainScheduled`, `draining`, and `drainRequested` flags. - Larger per-message payloads due to batching. - Bounded by `MAX_TICKS_BEFORE_YIELD`. - Any assumptions in downstream code about receiving only `game_update`. - Handled by adding `game_update_batch` support in `WorkerClient`. ## Please complete the following: - [x] I have added screenshots for all UI updates - [x] I process any text displayed to the user through translateText() and I've added it to the en.json file - [x] I have added relevant tests to the test directory - [x] I confirm I have thoroughly tested these changes and take full responsibility for any bugs introduced ## Please put your Discord username so you can be contacted if a bug or regression is found: DISCORD_USERNAME
This commit is contained in:
@@ -385,15 +385,6 @@ export class ClientGameRunner {
|
||||
}
|
||||
});
|
||||
|
||||
const worker = this.worker;
|
||||
const keepWorkerAlive = () => {
|
||||
if (this.isActive) {
|
||||
worker.sendHeartbeat();
|
||||
requestAnimationFrame(keepWorkerAlive);
|
||||
}
|
||||
};
|
||||
requestAnimationFrame(keepWorkerAlive);
|
||||
|
||||
const onconnect = () => {
|
||||
console.log("Connected to game server!");
|
||||
this.transport.rejoinGame(this.turnsSeen);
|
||||
|
||||
@@ -16,32 +16,110 @@ import {
|
||||
const ctx: Worker = self as any;
|
||||
let gameRunner: Promise<GameRunner> | null = null;
|
||||
const mapLoader = new FetchGameMapLoader(`/maps`, version);
|
||||
const MAX_TICKS_PER_HEARTBEAT = 4;
|
||||
// Yield threshold; not a backlog cap. Used to avoid monopolizing the worker task
|
||||
// and flooding the main thread with messages during catch-up.
|
||||
const MAX_TICKS_BEFORE_YIELD = 4;
|
||||
|
||||
function gameUpdate(gu: GameUpdateViewData | ErrorUpdate) {
|
||||
// skip if ErrorUpdate
|
||||
if (!("updates" in gu)) {
|
||||
let drainScheduled = false;
|
||||
let draining = false;
|
||||
let drainRequested = false;
|
||||
|
||||
function scheduleDrain(): void {
|
||||
drainRequested = true;
|
||||
if (drainScheduled || draining) {
|
||||
return;
|
||||
}
|
||||
sendMessage({
|
||||
type: "game_update",
|
||||
gameUpdate: gu,
|
||||
});
|
||||
drainScheduled = true;
|
||||
setTimeout(() => {
|
||||
void drain().catch((e) => {
|
||||
console.error("Worker drain failed:", e);
|
||||
});
|
||||
}, 0);
|
||||
}
|
||||
|
||||
async function drain(): Promise<void> {
|
||||
drainScheduled = false;
|
||||
if (draining) {
|
||||
return;
|
||||
}
|
||||
if (!gameRunner) {
|
||||
return;
|
||||
}
|
||||
|
||||
draining = true;
|
||||
drainRequested = false;
|
||||
let shouldContinue = false;
|
||||
try {
|
||||
const gr = await gameRunner;
|
||||
if (!gr) {
|
||||
return;
|
||||
}
|
||||
|
||||
const batch: GameUpdateViewData[] = [];
|
||||
const onTickUpdate = (gu: GameUpdateViewData | ErrorUpdate) => {
|
||||
if (!("updates" in gu)) {
|
||||
return;
|
||||
}
|
||||
batch.push(gu);
|
||||
};
|
||||
|
||||
// Temporarily route tick callbacks into this drain's batch.
|
||||
tickUpdateSink = onTickUpdate;
|
||||
|
||||
let ticksRun = 0;
|
||||
while (ticksRun < MAX_TICKS_BEFORE_YIELD && gr.pendingTurns() > 0) {
|
||||
const ok = gr.executeNextTick(gr.pendingTurns());
|
||||
if (!ok) {
|
||||
break;
|
||||
}
|
||||
ticksRun++;
|
||||
}
|
||||
|
||||
tickUpdateSink = null;
|
||||
|
||||
sendGameUpdateBatch(batch);
|
||||
|
||||
shouldContinue = gr.pendingTurns() > 0;
|
||||
} finally {
|
||||
tickUpdateSink = null;
|
||||
draining = false;
|
||||
}
|
||||
|
||||
if (shouldContinue || drainRequested) {
|
||||
scheduleDrain();
|
||||
}
|
||||
}
|
||||
|
||||
let tickUpdateSink: ((gu: GameUpdateViewData | ErrorUpdate) => void) | null =
|
||||
null;
|
||||
|
||||
function gameUpdate(gu: GameUpdateViewData | ErrorUpdate) {
|
||||
tickUpdateSink?.(gu);
|
||||
}
|
||||
|
||||
function sendGameUpdateBatch(gameUpdates: GameUpdateViewData[]): void {
|
||||
if (gameUpdates.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const transfers: Transferable[] = [];
|
||||
for (const gu of gameUpdates) {
|
||||
transfers.push(gu.packedTileUpdates.buffer);
|
||||
if (gu.packedMotionPlans) {
|
||||
transfers.push(gu.packedMotionPlans.buffer);
|
||||
}
|
||||
}
|
||||
|
||||
ctx.postMessage(
|
||||
{
|
||||
type: "game_update_batch",
|
||||
gameUpdates,
|
||||
} as WorkerMessage,
|
||||
transfers,
|
||||
);
|
||||
}
|
||||
|
||||
function sendMessage(message: WorkerMessage) {
|
||||
if (message.type === "game_update") {
|
||||
// Transfer the packed tile updates buffer to avoid structured-clone copies and
|
||||
// reduce worker-side memory churn during long runs / catch-up.
|
||||
const transfers: Transferable[] = [
|
||||
message.gameUpdate.packedTileUpdates.buffer,
|
||||
];
|
||||
if (message.gameUpdate.packedMotionPlans) {
|
||||
transfers.push(message.gameUpdate.packedMotionPlans.buffer);
|
||||
}
|
||||
ctx.postMessage(message, transfers);
|
||||
return;
|
||||
}
|
||||
ctx.postMessage(message);
|
||||
}
|
||||
|
||||
@@ -49,20 +127,6 @@ ctx.addEventListener("message", async (e: MessageEvent<MainThreadMessage>) => {
|
||||
const message = e.data;
|
||||
|
||||
switch (message.type) {
|
||||
case "heartbeat": {
|
||||
const gr = await gameRunner;
|
||||
if (!gr) {
|
||||
break;
|
||||
}
|
||||
const pendingTurns = gr.pendingTurns();
|
||||
const ticksToRun = Math.min(pendingTurns, MAX_TICKS_PER_HEARTBEAT);
|
||||
for (let i = 0; i < ticksToRun; i++) {
|
||||
if (!gr.executeNextTick(gr.pendingTurns())) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case "init":
|
||||
try {
|
||||
gameRunner = createGameRunner(
|
||||
@@ -90,7 +154,8 @@ ctx.addEventListener("message", async (e: MessageEvent<MainThreadMessage>) => {
|
||||
|
||||
try {
|
||||
const gr = await gameRunner;
|
||||
await gr.addTurn(message.turn);
|
||||
gr.addTurn(message.turn);
|
||||
scheduleDrain();
|
||||
} catch (error) {
|
||||
console.error("Failed to process turn:", error);
|
||||
throw error;
|
||||
|
||||
@@ -45,6 +45,13 @@ export class WorkerClient {
|
||||
this.gameUpdateCallback(message.gameUpdate);
|
||||
}
|
||||
break;
|
||||
case "game_update_batch":
|
||||
if (this.gameUpdateCallback && message.gameUpdates) {
|
||||
for (const gu of message.gameUpdates) {
|
||||
this.gameUpdateCallback(gu);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case "initialized":
|
||||
default:
|
||||
@@ -103,12 +110,6 @@ export class WorkerClient {
|
||||
});
|
||||
}
|
||||
|
||||
sendHeartbeat() {
|
||||
this.worker.postMessage({
|
||||
type: "heartbeat",
|
||||
});
|
||||
}
|
||||
|
||||
playerProfile(playerID: number): Promise<PlayerProfile> {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!this.isInitialized) {
|
||||
|
||||
@@ -10,11 +10,11 @@ import { GameUpdateViewData } from "../game/GameUpdates";
|
||||
import { ClientID, GameStartInfo, Turn } from "../Schemas";
|
||||
|
||||
export type WorkerMessageType =
|
||||
| "heartbeat"
|
||||
| "init"
|
||||
| "initialized"
|
||||
| "turn"
|
||||
| "game_update"
|
||||
| "game_update_batch"
|
||||
| "player_actions"
|
||||
| "player_actions_result"
|
||||
| "player_profile"
|
||||
@@ -32,10 +32,6 @@ interface BaseWorkerMessage {
|
||||
id?: string;
|
||||
}
|
||||
|
||||
export interface HeartbeatMessage extends BaseWorkerMessage {
|
||||
type: "heartbeat";
|
||||
}
|
||||
|
||||
// Messages from main thread to worker
|
||||
export interface InitMessage extends BaseWorkerMessage {
|
||||
type: "init";
|
||||
@@ -58,6 +54,11 @@ export interface GameUpdateMessage extends BaseWorkerMessage {
|
||||
gameUpdate: GameUpdateViewData;
|
||||
}
|
||||
|
||||
export interface GameUpdateBatchMessage extends BaseWorkerMessage {
|
||||
type: "game_update_batch";
|
||||
gameUpdates: GameUpdateViewData[];
|
||||
}
|
||||
|
||||
export interface PlayerActionsMessage extends BaseWorkerMessage {
|
||||
type: "player_actions";
|
||||
playerID: PlayerID;
|
||||
@@ -116,7 +117,6 @@ export interface TransportShipSpawnResultMessage extends BaseWorkerMessage {
|
||||
|
||||
// Union types for type safety
|
||||
export type MainThreadMessage =
|
||||
| HeartbeatMessage
|
||||
| InitMessage
|
||||
| TurnMessage
|
||||
| PlayerActionsMessage
|
||||
@@ -129,6 +129,7 @@ export type MainThreadMessage =
|
||||
export type WorkerMessage =
|
||||
| InitializedMessage
|
||||
| GameUpdateMessage
|
||||
| GameUpdateBatchMessage
|
||||
| PlayerActionsResultMessage
|
||||
| PlayerProfileResultMessage
|
||||
| PlayerBorderTilesResultMessage
|
||||
|
||||
Reference in New Issue
Block a user