Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 102 additions & 27 deletions src/client/ClientGameRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ export class ClientGameRunner {
private lastTickReceiveTime: number = 0;
private currentTickDelay: number | undefined = undefined;

private pendingUpdates: GameUpdateViewData[] = [];
private pendingUpdateIndex: number = 0;
private drainPendingUpdatesRafId: number | null = null;

constructor(
private lobby: LobbyConfig,
private eventBus: EventBus,
Expand Down Expand Up @@ -361,35 +365,9 @@ export class ClientGameRunner {
this.stop();
return;
}
this.transport.turnComplete();
gu.updates[GameUpdateType.Hash].forEach((hu: HashUpdate) => {
this.eventBus.emit(new SendHashEvent(hu.tick, hu.hash));
});
this.gameView.update(gu);
this.renderer.tick();

// Emit tick metrics event for performance overlay
this.eventBus.emit(
new TickMetricsEvent(gu.tickExecutionDuration, this.currentTickDelay),
);

// Reset tick delay for next measurement
this.currentTickDelay = undefined;

if (gu.updates[GameUpdateType.Win].length > 0) {
this.saveGame(gu.updates[GameUpdateType.Win][0]);
}
this.enqueueWorkerUpdate(gu);
});

const worker = this.worker;
const keepWorkerAlive = () => {
if (this.isActive) {
worker.sendHeartbeat();
requestAnimationFrame(keepWorkerAlive);
}
};
requestAnimationFrame(keepWorkerAlive);

const onconnect = () => {
console.log("Connected to game server!");
this.transport.rejoinGame(this.turnsSeen);
Expand Down Expand Up @@ -511,6 +489,12 @@ export class ClientGameRunner {
if (!this.isActive) return;

this.isActive = false;
if (this.drainPendingUpdatesRafId !== null) {
cancelAnimationFrame(this.drainPendingUpdatesRafId);
this.drainPendingUpdatesRafId = null;
}
this.pendingUpdates = [];
this.pendingUpdateIndex = 0;
this.worker.cleanup();
this.transport.leaveGame();
if (this.connectionCheckInterval) {
Expand All @@ -523,6 +507,97 @@ export class ClientGameRunner {
}
}

private enqueueWorkerUpdate(update: GameUpdateViewData): void {
this.pendingUpdates.push(update);

if (this.drainPendingUpdatesRafId !== null || !this.isActive) {
return;
}

this.drainPendingUpdatesRafId = requestAnimationFrame(() => {
this.drainPendingUpdatesRafId = null;
this.drainPendingUpdates();
});
}

private drainPendingUpdates(): void {
if (!this.isActive) {
return;
}

const backlog = this.pendingUpdates.length - this.pendingUpdateIndex;
if (backlog <= 0) {
return;
}

const startTime = performance.now();
const maxDrainMs = backlog > 200 ? 12 : 8;
const maxUpdates = backlog > 200 ? 1000 : 200;

let processed = 0;
let totalTickExecutionDuration = 0;
let winUpdate: WinUpdate | null = null;

while (
processed < maxUpdates &&
this.pendingUpdateIndex < this.pendingUpdates.length &&
performance.now() - startTime < maxDrainMs
) {
const gu = this.pendingUpdates[this.pendingUpdateIndex];
this.pendingUpdateIndex++;
processed++;

totalTickExecutionDuration += gu.tickExecutionDuration ?? 0;

gu.updates[GameUpdateType.Hash].forEach((hu: HashUpdate) => {
this.eventBus.emit(new SendHashEvent(hu.tick, hu.hash));
});

if (gu.updates[GameUpdateType.Win].length > 0) {
winUpdate = gu.updates[GameUpdateType.Win][0];
}

this.gameView.update(gu);
this.renderer.tick();
}

if (processed > 0) {
const avgTickExecutionDuration = totalTickExecutionDuration / processed;
this.eventBus.emit(
new TickMetricsEvent(avgTickExecutionDuration, this.currentTickDelay),
);
this.currentTickDelay = undefined;

if (winUpdate) {
this.saveGame(winUpdate);
}

// In singleplayer/replay (local server), acknowledge how many turns we've
// actually applied this frame. LocalServer uses this as bounded backpressure.
this.transport.turnComplete(processed);
}

// Compact the queue occasionally to avoid unbounded growth from array holes.
if (this.pendingUpdateIndex >= this.pendingUpdates.length) {
this.pendingUpdates = [];
this.pendingUpdateIndex = 0;
} else if (this.pendingUpdateIndex > 1024) {
this.pendingUpdates = this.pendingUpdates.slice(this.pendingUpdateIndex);
this.pendingUpdateIndex = 0;
}

// Keep draining if we still have backlog.
if (
this.pendingUpdates.length - this.pendingUpdateIndex > 0 &&
this.drainPendingUpdatesRafId === null
) {
this.drainPendingUpdatesRafId = requestAnimationFrame(() => {
this.drainPendingUpdatesRafId = null;
this.drainPendingUpdates();
});
}
}

private inputEvent(event: MouseUpEvent) {
if (!this.isActive || this.renderer.uiState.ghostStructure !== null) {
return;
Expand Down
71 changes: 47 additions & 24 deletions src/client/LocalServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@ import {
import { getPersistentID } from "./Auth";
import { LobbyConfig } from "./ClientGameRunner";
import { ReplaySpeedChangeEvent } from "./InputHandler";
import {
defaultReplaySpeedMultiplier,
ReplaySpeedMultiplier,
} from "./utilities/ReplaySpeedMultiplier";

// build a small backlog so MAX can catch up.
const MAX_REPLAY_BACKLOG_TURNS = 60;
import { defaultReplaySpeedMultiplier } from "./utilities/ReplaySpeedMultiplier";

export class LocalServer {
// All turns from the game record on replay.
Expand All @@ -36,6 +30,7 @@ export class LocalServer {

private intents: Intent[] = [];
private startedAt: number;
private nextTurnAtMs: number = 0;

private paused = false;
private replaySpeedMultiplier = defaultReplaySpeedMultiplier;
Expand All @@ -44,7 +39,6 @@ export class LocalServer {
private allPlayersStats: AllPlayersStats = {};

private turnsExecuted = 0;
private turnStartTime = 0;

private turnCheckInterval: NodeJS.Timeout;
private clientConnect: () => void;
Expand All @@ -66,30 +60,54 @@ export class LocalServer {

start() {
console.log("local server starting");
this.nextTurnAtMs = Date.now();
this.turnCheckInterval = setInterval(() => {
const turnIntervalMs =
this.lobbyConfig.serverConfig.turnIntervalMs() *
this.replaySpeedMultiplier;
const backlog = Math.max(0, this.turns.length - this.turnsExecuted);
const allowReplayBacklog =
this.replaySpeedMultiplier === ReplaySpeedMultiplier.fastest &&
this.lobbyConfig.gameRecord !== undefined;
const maxBacklog = allowReplayBacklog ? MAX_REPLAY_BACKLOG_TURNS : 0;
if (this.paused) {
return;
}

const baseTurnIntervalMs = this.lobbyConfig.serverConfig.turnIntervalMs();
const turnIntervalMs = baseTurnIntervalMs * this.replaySpeedMultiplier;

// Outstanding work is the number of turns we've emitted that the client hasn't applied yet.
const outstandingTurns = this.turns.length - this.turnsExecuted;

// For ×0.5/×1/×2 we aim to stay close to real time.
// For "fastest" (interval 0), allow a larger but bounded backlog so the sim can sprint
// while the main thread drains updates opportunistically.
const maxOutstandingTurns = turnIntervalMs === 0 ? 200 : 5;
const maxTurnsToEmitPerCheck = turnIntervalMs === 0 ? 200 : 5;

const canQueueNextTurn =
backlog === 0 || (maxBacklog > 0 && backlog < maxBacklog);
if (
canQueueNextTurn &&
Date.now() > this.turnStartTime + turnIntervalMs
if (outstandingTurns >= maxOutstandingTurns) {
return;
}

const now = Date.now();
let emitted = 0;
while (
emitted < maxTurnsToEmitPerCheck &&
this.turns.length - this.turnsExecuted < maxOutstandingTurns &&
(turnIntervalMs === 0 || now >= this.nextTurnAtMs)
) {
this.turnStartTime = Date.now();
// End turn on the server means the client will start processing the turn.
this.endTurn();
emitted++;

if (turnIntervalMs === 0) {
// "Fastest": no wall-clock pacing; rely on backlog caps above.
this.nextTurnAtMs = now;
} else {
// Fixed-rate pacing: do not try to "catch up" after stalls; resume from now.
this.nextTurnAtMs = now + turnIntervalMs;
}
}
}, 5);

this.eventBus.on(ReplaySpeedChangeEvent, (event) => {
this.replaySpeedMultiplier = event.replaySpeedMultiplier;
// Apply speed changes immediately; the next scheduled turn time will be
// recalculated by the interval loop.
this.nextTurnAtMs = Date.now();
});

this.startedAt = Date.now();
Expand Down Expand Up @@ -126,11 +144,13 @@ export class LocalServer {
this.intents.push(clientMsg.intent);
this.endTurn();
this.paused = true;
this.nextTurnAtMs = Date.now();
} else {
// Unpausing: clear pause flag before adding intent so next turn can execute
this.paused = false;
this.intents.push(clientMsg.intent);
this.endTurn();
this.nextTurnAtMs = Date.now();
}
return;
}
Expand Down Expand Up @@ -185,8 +205,11 @@ export class LocalServer {
}

// This is so the client can tell us when it finished processing the turn.
public turnComplete() {
this.turnsExecuted++;
public turnComplete(count: number = 1) {
this.turnsExecuted = Math.min(
this.turnsExecuted + count,
this.turns.length,
);
}

// endTurn in this context means the server has collected all the intents
Expand Down
4 changes: 2 additions & 2 deletions src/client/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,9 @@ export class Transport {
this.connect(this.onconnect, this.onmessage);
}

public turnComplete() {
public turnComplete(count: number = 1) {
if (this.isLocal) {
this.localServer.turnComplete();
this.localServer.turnComplete(count);
}
}

Expand Down
4 changes: 0 additions & 4 deletions src/core/GameRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,6 @@ export class GameRunner {
return true;
}

public pendingTurns(): number {
return Math.max(0, this.turns.length - this.currTurn);
}

public playerActions(
playerID: PlayerID,
x?: number,
Expand Down
40 changes: 26 additions & 14 deletions src/core/worker/Worker.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
const ctx: Worker = self as any;
let gameRunner: Promise<GameRunner> | null = null;
const mapLoader = new FetchGameMapLoader(`/maps`, version);
const MAX_TICKS_PER_HEARTBEAT = 4;
let isProcessingTurns = false;

function gameUpdate(gu: GameUpdateViewData | ErrorUpdate) {
// skip if ErrorUpdate
Expand All @@ -33,23 +33,33 @@ function sendMessage(message: WorkerMessage) {
ctx.postMessage(message);
}

async function processPendingTurns() {
if (isProcessingTurns) {
return;
}
if (!gameRunner) {
return;
}

const gr = await gameRunner;
if (!gr) {
return;
}

isProcessingTurns = true;
try {
while (gr.executeNextTick()) {
// Keep running until no pending turns.
}
} finally {
isProcessingTurns = false;
}
}
Comment on lines +36 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Synchronous loop may block worker for extended periods.

The while (gr.executeNextTick()) loop runs until all pending turns are processed without yielding. If a large backlog exists (e.g., after reconnect or replay fast-forward), this could block the worker thread and delay handling of other messages like player_actions or player_profile.

Consider yielding periodically or batching with a limit per invocation.

Proposed batched approach
 async function processPendingTurns() {
   if (isProcessingTurns) {
     return;
   }
   if (!gameRunner) {
     return;
   }

   const gr = await gameRunner;
   if (!gr) {
     return;
   }

   isProcessingTurns = true;
   try {
-    while (gr.executeNextTick()) {
-      // Keep running until no pending turns.
-    }
+    const maxTicksPerBatch = 100;
+    let processed = 0;
+    while (processed < maxTicksPerBatch && gr.executeNextTick()) {
+      processed++;
+    }
+    // Reschedule if more work remains
+    if (processed === maxTicksPerBatch) {
+      setTimeout(processPendingTurns, 0);
+    }
   } finally {
     isProcessingTurns = false;
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async function processPendingTurns() {
if (isProcessingTurns) {
return;
}
if (!gameRunner) {
return;
}
const gr = await gameRunner;
if (!gr) {
return;
}
isProcessingTurns = true;
try {
while (gr.executeNextTick()) {
// Keep running until no pending turns.
}
} finally {
isProcessingTurns = false;
}
}
async function processPendingTurns() {
if (isProcessingTurns) {
return;
}
if (!gameRunner) {
return;
}
const gr = await gameRunner;
if (!gr) {
return;
}
isProcessingTurns = true;
try {
const maxTicksPerBatch = 100;
let processed = 0;
while (processed < maxTicksPerBatch && gr.executeNextTick()) {
processed++;
}
// Reschedule if more work remains
if (processed === maxTicksPerBatch) {
setTimeout(processPendingTurns, 0);
}
} finally {
isProcessingTurns = false;
}
}
🤖 Prompt for AI Agents
In `@src/core/worker/Worker.worker.ts` around lines 36 - 57, The current
processPendingTurns function blocks the worker by running while
(gr.executeNextTick()) until all turns complete; update processPendingTurns (and
the isProcessingTurns guard) to process turns in bounded batches and yield
between batches (e.g., run up to N iterations of gr.executeNextTick() per
invocation and await a short tick or next-microtask before continuing) so other
messages (player_actions, player_profile) can be processed; refer to
processPendingTurns, isProcessingTurns, gameRunner and the gr.executeNextTick()
call and implement a configurable batch limit and an async yield
(setImmediate/timeout or Promise.resolve) between batches.


ctx.addEventListener("message", async (e: MessageEvent<MainThreadMessage>) => {
const message = e.data;

switch (message.type) {
case "heartbeat": {
const gr = await gameRunner;
if (!gr) {
break;
}
const ticksToRun = Math.min(gr.pendingTurns(), MAX_TICKS_PER_HEARTBEAT);
for (let i = 0; i < ticksToRun; i++) {
if (!gr.executeNextTick()) {
break;
}
}
break;
}
case "init":
try {
gameRunner = createGameRunner(
Expand All @@ -62,6 +72,7 @@ ctx.addEventListener("message", async (e: MessageEvent<MainThreadMessage>) => {
type: "initialized",
id: message.id,
} as InitializedMessage);
processPendingTurns();
return gr;
});
} catch (error) {
Expand All @@ -78,6 +89,7 @@ ctx.addEventListener("message", async (e: MessageEvent<MainThreadMessage>) => {
try {
const gr = await gameRunner;
await gr.addTurn(message.turn);
processPendingTurns();
} catch (error) {
console.error("Failed to process turn:", error);
throw error;
Expand Down
6 changes: 0 additions & 6 deletions src/core/worker/WorkerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,6 @@ export class WorkerClient {
});
}

sendHeartbeat() {
this.worker.postMessage({
type: "heartbeat",
});
}

playerProfile(playerID: number): Promise<PlayerProfile> {
return new Promise((resolve, reject) => {
if (!this.isInitialized) {
Expand Down
Loading
Loading