retry archiving to bigquery & gcs

This commit is contained in:
Evan
2024-12-21 19:44:51 -08:00
parent 44afb2361b
commit 361b1f7d6c
+104 -76
View File
@@ -2,94 +2,122 @@ import { GameConfig, GameID, GameRecord, GameRecordSchema, Turn } from "../core/
import { Storage } from '@google-cloud/storage';
import { BigQuery } from '@google-cloud/bigquery';
const storage = new Storage();
const bucket = storage.bucket("openfront-games");
const bigquery = new BigQuery();
const MAX_RETRIES = 5;
const INITIAL_RETRY_DELAY_MS = 1000; // Start with 1 second delay
export async function archive(gameRecord: GameRecord) {
try {
// Save metadata to BigQuery
const row = {
id: gameRecord.id,
start: new Date(gameRecord.startTimestampMS),
end: new Date(gameRecord.endTimestampMS),
duration_seconds: gameRecord.durationSeconds,
number_turns: gameRecord.num_turns,
game_mode: gameRecord.gameConfig.gameType,
winner: null,
difficulty: gameRecord.gameConfig.difficulty,
map: gameRecord.gameConfig.gameMap,
players: gameRecord.players.map(p => ({
username: p.username,
// Masks last couple of bits from ip for
// user privacy.
ip: anonymizeIP(p.ip),
persistentID: p.persistentID,
clientID: p.clientID,
})),
};
const [apiResponse] = await bigquery
.dataset('game_archive')
.table('game_results')
.insert([row]);
console.log(`wrote game metadata to BigQuery: ${gameRecord.id}`);
// First archive to BigQuery with retries
await withRetry(
() => archiveToBigQuery(gameRecord),
'BigQuery archive',
gameRecord.id
);
// Then archive to GCS with retries if there are turns
if (gameRecord.turns.length > 0) {
console.log(`${gameRecord.id}: game has more than zero turns, attempting to write to gcs`)
// Players may see this so make sure to clear PII
gameRecord.players.forEach(p => {
p.ip = "REDACTED"
p.persistentID = "REDACTED"
});
console.log(`writing game ${gameRecord.id} to gcs`);
const bucket = storage.bucket("openfront-games");
const file = bucket.file(gameRecord.id);
await file.save(JSON.stringify(GameRecordSchema.parse(gameRecord)), {
contentType: 'application/json'
});
console.log(`${gameRecord.id}: game record successfully writting to gcs`)
console.log(`${gameRecord.id}: game has more than zero turns, attempting to write to GCS`);
await withRetry(
() => archiveToGCS(gameRecord),
'GCS archive',
gameRecord.id
);
}
} catch (error) {
try {
console.error(`Error archiving game ${gameRecord.id}:`);
if (Array.isArray(error?.errors)) {
// Handle BigQuery insertion errors which come as an array
error.errors.forEach((err, index) => {
console.error(`${gameRecord.id}: Archive Error ${index + 1}:`, {
reason: err.reason,
message: err.message,
location: err.location,
debugInfo: err.debugInfo
});
});
} else if (error?.code) {
// Handle Google Cloud Storage errors which typically have error codes
console.error(`${gameRecord.id}: Archive: Storage error:`, {
code: error.code,
message: error.message,
stack: error.stack,
details: error.errors
});
} else {
// Handle generic errors
console.error(`${gameRecord.id}: Archive: Unexpected error:`, {
message: error?.message || error,
stack: error?.stack,
name: error?.name,
...(error && typeof error === 'object' ? error : {})
});
}
} catch (error) {
console.log(`error handling archive error: ${error}`)
}
console.error(`${gameRecord.id}: Final archive error: ${error}`, {
message: error?.message || error,
stack: error?.stack,
name: error?.name,
...(error && typeof error === 'object' ? error : {})
});
throw error;
}
}
async function delay(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async function withRetry<T>(
operation: () => Promise<T>,
operationName: string,
gameId: string,
): Promise<T> {
let lastError: Error | null = null;
for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error as Error;
if (attempt < MAX_RETRIES) {
const backoffDelay = INITIAL_RETRY_DELAY_MS * Math.pow(2, attempt);
const jitter = Math.random() * 1000;
const totalDelay = backoffDelay + jitter;
console.log(`${gameId}: ${operationName} attempt ${attempt + 1} failed with ${error}. Retrying in ${Math.round(totalDelay)}ms...`);
await delay(totalDelay);
}
}
}
console.error(`${gameId}: All ${MAX_RETRIES + 1} ${operationName} attempts failed. Last error:`, lastError);
throw lastError;
}
async function archiveToBigQuery(gameRecord: GameRecord) {
const row = {
id: gameRecord.id,
start: new Date(gameRecord.startTimestampMS),
end: new Date(gameRecord.endTimestampMS),
duration_seconds: gameRecord.durationSeconds,
number_turns: gameRecord.num_turns,
game_mode: gameRecord.gameConfig.gameType,
winner: null,
difficulty: gameRecord.gameConfig.difficulty,
map: gameRecord.gameConfig.gameMap,
players: gameRecord.players.map(p => ({
username: p.username,
ip: anonymizeIP(p.ip),
persistentID: p.persistentID,
clientID: p.clientID,
})),
};
const [apiResponse] = await bigquery
.dataset('game_archive')
.table('game_results')
.insert([row]);
console.log(`${gameRecord.id}: wrote game metadata to BigQuery`);
return apiResponse;
}
async function archiveToGCS(gameRecord: GameRecord) {
// Create a deep copy to avoid modifying the original
const recordCopy = JSON.parse(JSON.stringify(gameRecord));
// Players may see this so make sure to clear PII
recordCopy.players.forEach(p => {
p.ip = "REDACTED";
p.persistentID = "REDACTED";
});
const file = bucket.file(recordCopy.id);
await file.save(JSON.stringify(GameRecordSchema.parse(recordCopy)), {
contentType: 'application/json'
});
console.log(`${gameRecord.id}: game record successfully written to GCS`);
}
function anonymizeIP(ip: string): string {
// IPv4 regex that validates octets are 0-255
const ipv4Regex = /^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$/;