mirror of
https://github.com/openfrontio/OpenFrontIO.git
synced 2026-06-22 18:36:39 +00:00
SAB+Atomics refactor
Added src/core/worker/SharedTileRing.ts, which defines a SharedArrayBuffer-backed ring buffer (SharedTileRingBuffers/SharedTileRingViews) and helpers pushTileUpdate (worker-side writer) and drainTileUpdates (main-thread reader) using Atomics. Extended GameRunner (src/core/GameRunner.ts) with an optional tileUpdateSink?: (update: bigint) => void; when provided, tile updates are sent to the sink instead of being packed into GameUpdateViewData.packedTileUpdates (those become an empty BigUint64Array in this mode). Extended the worker protocol (src/core/worker/WorkerMessages.ts) so the init message can optionally carry sharedTileRingHeader and sharedTileRingData (the two SABs for the ring). Updated WorkerClient (src/core/worker/WorkerClient.ts) to accept optional SharedTileRingBuffers in its constructor and, during initialize(), include those SABs in the init message sent to Worker.worker.ts. Updated the worker implementation (src/core/worker/Worker.worker.ts) to build sharedTileRing views from the SABs on "init" and pass a (update: bigint) => pushTileUpdate(...) sink into createGameRunner, so every tile update from the simulation goes into the shared ring instead of the message payload. Updated the client side (src/client/ClientGameRunner.ts): In createClientGame, when SharedArrayBuffer/Atomics are available and crossOriginIsolated is true, it allocates the ring (createSharedTileRingBuffers with capacity 262144), creates views, passes buffers into WorkerClient, and keeps the views in ClientGameRunner. ClientGameRunner now has a tileRingViews field and, in mergeGameUpdates, if the ring is present it ignores gu.packedTileUpdates and instead drains up to MAX_TILE_UPDATES_PER_RENDER (100 000) tile updates from the SAB ring into combinedPackedTileUpdates before constructing the merged GameUpdateViewData passed to GameView.update. If SABs are unavailable, it falls back to the original behaviour (packing gu.packedTileUpdates from messages), so the feature is opt-in and backward compatible. Net effect: when the page is cross-origin isolated, the worker no longer sends per-tick BigUint64Array tile payloads via postMessage; it streams tile updates into a shared ring buffer, and the main thread pulls and applies them at its own pace while still batching non-tile updates and rendering exactly as before. If you’d like, I can next run the existing tests/build in your environment to sanity-check everything.
This commit is contained in:
@@ -25,6 +25,13 @@ import {
|
||||
import { GameView, PlayerView } from "../core/game/GameView";
|
||||
import { loadTerrainMap, TerrainMapData } from "../core/game/TerrainMapLoader";
|
||||
import { UserSettings } from "../core/game/UserSettings";
|
||||
import {
|
||||
createSharedTileRingBuffers,
|
||||
createSharedTileRingViews,
|
||||
drainTileUpdates,
|
||||
SharedTileRingBuffers,
|
||||
SharedTileRingViews,
|
||||
} from "../core/worker/SharedTileRing";
|
||||
import { WorkerClient } from "../core/worker/WorkerClient";
|
||||
import {
|
||||
AutoUpgradeEvent,
|
||||
@@ -162,9 +169,30 @@ async function createClientGame(
|
||||
mapLoader,
|
||||
);
|
||||
}
|
||||
|
||||
let sharedTileRingBuffers: SharedTileRingBuffers | undefined;
|
||||
let sharedTileRingViews: SharedTileRingViews | null = null;
|
||||
const isIsolated =
|
||||
typeof (globalThis as any).crossOriginIsolated === "boolean"
|
||||
? (globalThis as any).crossOriginIsolated === true
|
||||
: false;
|
||||
const canUseSharedBuffers =
|
||||
typeof SharedArrayBuffer !== "undefined" &&
|
||||
typeof Atomics !== "undefined" &&
|
||||
isIsolated;
|
||||
|
||||
if (canUseSharedBuffers) {
|
||||
// Capacity is number of tile updates that can be queued.
|
||||
// This is a compromise between memory usage and backlog tolerance.
|
||||
const TILE_RING_CAPACITY = 262144;
|
||||
sharedTileRingBuffers = createSharedTileRingBuffers(TILE_RING_CAPACITY);
|
||||
sharedTileRingViews = createSharedTileRingViews(sharedTileRingBuffers);
|
||||
}
|
||||
|
||||
const worker = new WorkerClient(
|
||||
lobbyConfig.gameStartInfo,
|
||||
lobbyConfig.clientID,
|
||||
sharedTileRingBuffers,
|
||||
);
|
||||
await worker.initialize();
|
||||
const gameView = new GameView(
|
||||
@@ -191,6 +219,7 @@ async function createClientGame(
|
||||
transport,
|
||||
worker,
|
||||
gameView,
|
||||
sharedTileRingViews,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -222,6 +251,7 @@ export class ClientGameRunner {
|
||||
private pendingUpdates: GameUpdateViewData[] = [];
|
||||
private pendingStart = 0;
|
||||
private isProcessingUpdates = false;
|
||||
private tileRingViews: SharedTileRingViews | null;
|
||||
|
||||
constructor(
|
||||
private lobby: LobbyConfig,
|
||||
@@ -231,8 +261,10 @@ export class ClientGameRunner {
|
||||
private transport: Transport,
|
||||
private worker: WorkerClient,
|
||||
private gameView: GameView,
|
||||
tileRingViews: SharedTileRingViews | null,
|
||||
) {
|
||||
this.lastMessageTime = Date.now();
|
||||
this.tileRingViews = tileRingViews;
|
||||
}
|
||||
|
||||
private saveGame(update: WinUpdate) {
|
||||
@@ -603,9 +635,21 @@ export class ClientGameRunner {
|
||||
const updatesForType = gu.updates[type] as unknown as any[];
|
||||
(combinedUpdates[type] as unknown as any[]).push(...updatesForType);
|
||||
}
|
||||
gu.packedTileUpdates.forEach((tu) => {
|
||||
combinedPackedTileUpdates.push(tu);
|
||||
});
|
||||
}
|
||||
|
||||
if (this.tileRingViews) {
|
||||
const MAX_TILE_UPDATES_PER_RENDER = 100000;
|
||||
drainTileUpdates(
|
||||
this.tileRingViews,
|
||||
MAX_TILE_UPDATES_PER_RENDER,
|
||||
combinedPackedTileUpdates,
|
||||
);
|
||||
} else {
|
||||
for (const gu of batch) {
|
||||
gu.packedTileUpdates.forEach((tu) => {
|
||||
combinedPackedTileUpdates.push(tu);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
+17
-3
@@ -37,6 +37,7 @@ export async function createGameRunner(
|
||||
clientID: ClientID,
|
||||
mapLoader: GameMapLoader,
|
||||
callBack: (gu: GameUpdateViewData | ErrorUpdate) => void,
|
||||
tileUpdateSink?: (update: bigint) => void,
|
||||
): Promise<GameRunner> {
|
||||
const config = await getConfig(gameStart.config, null);
|
||||
const gameMap = await loadGameMap(
|
||||
@@ -81,6 +82,7 @@ export async function createGameRunner(
|
||||
game,
|
||||
new Executor(game, gameStart.gameID, clientID),
|
||||
callBack,
|
||||
tileUpdateSink,
|
||||
);
|
||||
gr.init();
|
||||
return gr;
|
||||
@@ -97,6 +99,7 @@ export class GameRunner {
|
||||
public game: Game,
|
||||
private execManager: Executor,
|
||||
private callBack: (gu: GameUpdateViewData | ErrorUpdate) => void,
|
||||
private tileUpdateSink?: (update: bigint) => void,
|
||||
) {}
|
||||
|
||||
init() {
|
||||
@@ -171,13 +174,24 @@ export class GameRunner {
|
||||
});
|
||||
}
|
||||
|
||||
// Many tiles are updated to pack it into an array
|
||||
const packedTileUpdates = updates[GameUpdateType.Tile].map((u) => u.update);
|
||||
// Many tiles are updated; either publish them via a shared sink or pack
|
||||
// them into the view data.
|
||||
let packedTileUpdates: BigUint64Array;
|
||||
const tileUpdates = updates[GameUpdateType.Tile];
|
||||
if (this.tileUpdateSink !== undefined) {
|
||||
for (const u of tileUpdates) {
|
||||
this.tileUpdateSink(u.update);
|
||||
}
|
||||
packedTileUpdates = new BigUint64Array();
|
||||
} else {
|
||||
const raw = tileUpdates.map((u) => u.update);
|
||||
packedTileUpdates = new BigUint64Array(raw);
|
||||
}
|
||||
updates[GameUpdateType.Tile] = [];
|
||||
|
||||
this.callBack({
|
||||
tick: this.game.ticks(),
|
||||
packedTileUpdates: new BigUint64Array(packedTileUpdates),
|
||||
packedTileUpdates,
|
||||
updates: updates,
|
||||
playerNameViewData: this.playerViewData,
|
||||
tickExecutionDuration: tickExecutionDuration,
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
export interface SharedTileRingBuffers {
|
||||
header: SharedArrayBuffer;
|
||||
data: SharedArrayBuffer;
|
||||
}
|
||||
|
||||
export interface SharedTileRingViews {
|
||||
header: Int32Array;
|
||||
buffer: BigUint64Array;
|
||||
capacity: number;
|
||||
}
|
||||
|
||||
// Header indices
|
||||
export const TILE_RING_HEADER_WRITE_INDEX = 0;
|
||||
export const TILE_RING_HEADER_READ_INDEX = 1;
|
||||
export const TILE_RING_HEADER_OVERFLOW = 2;
|
||||
|
||||
export function createSharedTileRingBuffers(
|
||||
capacity: number,
|
||||
): SharedTileRingBuffers {
|
||||
const header = new SharedArrayBuffer(3 * Int32Array.BYTES_PER_ELEMENT);
|
||||
const data = new SharedArrayBuffer(
|
||||
capacity * BigUint64Array.BYTES_PER_ELEMENT,
|
||||
);
|
||||
return { header, data };
|
||||
}
|
||||
|
||||
export function createSharedTileRingViews(
|
||||
buffers: SharedTileRingBuffers,
|
||||
): SharedTileRingViews {
|
||||
const header = new Int32Array(buffers.header);
|
||||
const buffer = new BigUint64Array(buffers.data);
|
||||
return {
|
||||
header,
|
||||
buffer,
|
||||
capacity: buffer.length,
|
||||
};
|
||||
}
|
||||
|
||||
export function pushTileUpdate(
|
||||
views: SharedTileRingViews,
|
||||
value: bigint,
|
||||
): void {
|
||||
const { header, buffer, capacity } = views;
|
||||
|
||||
const write = Atomics.load(header, TILE_RING_HEADER_WRITE_INDEX);
|
||||
const read = Atomics.load(header, TILE_RING_HEADER_READ_INDEX);
|
||||
const nextWrite = (write + 1) % capacity;
|
||||
|
||||
// If the buffer is full, advance read (drop oldest) and mark overflow.
|
||||
if (nextWrite === read) {
|
||||
Atomics.store(header, TILE_RING_HEADER_OVERFLOW, 1);
|
||||
const nextRead = (read + 1) % capacity;
|
||||
Atomics.store(header, TILE_RING_HEADER_READ_INDEX, nextRead);
|
||||
}
|
||||
|
||||
buffer[write] = value;
|
||||
Atomics.store(header, TILE_RING_HEADER_WRITE_INDEX, nextWrite);
|
||||
}
|
||||
|
||||
export function drainTileUpdates(
|
||||
views: SharedTileRingViews,
|
||||
maxItems: number,
|
||||
out: bigint[],
|
||||
): void {
|
||||
const { header, buffer, capacity } = views;
|
||||
|
||||
let read = Atomics.load(header, TILE_RING_HEADER_READ_INDEX);
|
||||
const write = Atomics.load(header, TILE_RING_HEADER_WRITE_INDEX);
|
||||
|
||||
let count = 0;
|
||||
|
||||
while (read !== write && count < maxItems) {
|
||||
out.push(buffer[read]);
|
||||
read = (read + 1) % capacity;
|
||||
count++;
|
||||
}
|
||||
|
||||
Atomics.store(header, TILE_RING_HEADER_READ_INDEX, read);
|
||||
}
|
||||
@@ -2,6 +2,11 @@ import version from "../../../resources/version.txt";
|
||||
import { createGameRunner, GameRunner } from "../GameRunner";
|
||||
import { FetchGameMapLoader } from "../game/FetchGameMapLoader";
|
||||
import { ErrorUpdate, GameUpdateViewData } from "../game/GameUpdates";
|
||||
import {
|
||||
createSharedTileRingViews,
|
||||
pushTileUpdate,
|
||||
SharedTileRingViews,
|
||||
} from "./SharedTileRing";
|
||||
import {
|
||||
AttackAveragePositionResultMessage,
|
||||
InitializedMessage,
|
||||
@@ -17,6 +22,7 @@ const ctx: Worker = self as any;
|
||||
let gameRunner: Promise<GameRunner> | null = null;
|
||||
const mapLoader = new FetchGameMapLoader(`/maps`, version);
|
||||
let isProcessingTurns = false;
|
||||
let sharedTileRing: SharedTileRingViews | null = null;
|
||||
|
||||
function gameUpdate(gu: GameUpdateViewData | ErrorUpdate) {
|
||||
// skip if ErrorUpdate
|
||||
@@ -62,11 +68,23 @@ ctx.addEventListener("message", async (e: MessageEvent<MainThreadMessage>) => {
|
||||
switch (message.type) {
|
||||
case "init":
|
||||
try {
|
||||
if (message.sharedTileRingHeader && message.sharedTileRingData) {
|
||||
sharedTileRing = createSharedTileRingViews({
|
||||
header: message.sharedTileRingHeader,
|
||||
data: message.sharedTileRingData,
|
||||
});
|
||||
} else {
|
||||
sharedTileRing = null;
|
||||
}
|
||||
|
||||
gameRunner = createGameRunner(
|
||||
message.gameStartInfo,
|
||||
message.clientID,
|
||||
mapLoader,
|
||||
gameUpdate,
|
||||
sharedTileRing
|
||||
? (update: bigint) => pushTileUpdate(sharedTileRing!, update)
|
||||
: undefined,
|
||||
).then((gr) => {
|
||||
sendMessage({
|
||||
type: "initialized",
|
||||
|
||||
@@ -9,6 +9,7 @@ import { TileRef } from "../game/GameMap";
|
||||
import { ErrorUpdate, GameUpdateViewData } from "../game/GameUpdates";
|
||||
import { ClientID, GameStartInfo, Turn } from "../Schemas";
|
||||
import { generateID } from "../Util";
|
||||
import { SharedTileRingBuffers } from "./SharedTileRing";
|
||||
import { WorkerMessage } from "./WorkerMessages";
|
||||
|
||||
export class WorkerClient {
|
||||
@@ -22,6 +23,7 @@ export class WorkerClient {
|
||||
constructor(
|
||||
private gameStartInfo: GameStartInfo,
|
||||
private clientID: ClientID,
|
||||
private sharedTileRingBuffers?: SharedTileRingBuffers,
|
||||
) {
|
||||
this.worker = new Worker(new URL("./Worker.worker.ts", import.meta.url));
|
||||
this.messageHandlers = new Map();
|
||||
@@ -70,6 +72,8 @@ export class WorkerClient {
|
||||
id: messageId,
|
||||
gameStartInfo: this.gameStartInfo,
|
||||
clientID: this.clientID,
|
||||
sharedTileRingHeader: this.sharedTileRingBuffers?.header,
|
||||
sharedTileRingData: this.sharedTileRingBuffers?.data,
|
||||
});
|
||||
|
||||
// Add timeout for initialization
|
||||
|
||||
@@ -35,6 +35,8 @@ export interface InitMessage extends BaseWorkerMessage {
|
||||
type: "init";
|
||||
gameStartInfo: GameStartInfo;
|
||||
clientID: ClientID;
|
||||
sharedTileRingHeader?: SharedArrayBuffer;
|
||||
sharedTileRingData?: SharedArrayBuffer;
|
||||
}
|
||||
|
||||
export interface TurnMessage extends BaseWorkerMessage {
|
||||
|
||||
Reference in New Issue
Block a user