use S3 & Redshift to archive

This commit is contained in:
Evan
2025-03-03 11:53:28 -08:00
parent 9d992c29e3
commit 1132dc5ed9
3 changed files with 1748 additions and 378 deletions
+1673 -217
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -66,10 +66,10 @@
"worker-loader": "^3.0.8"
},
"dependencies": {
"@aws-sdk/client-redshift-data": "^3.758.0",
"@aws-sdk/client-s3": "^3.758.0",
"@datastructures-js/priority-queue": "^6.3.1",
"@google-cloud/bigquery": "^7.9.1",
"@google-cloud/secret-manager": "^5.6.0",
"@google-cloud/storage": "^7.14.0",
"@types/dompurify": "^3.0.5",
"@types/express": "^4.17.21",
"@types/google-protobuf": "^3.15.12",
+73 -159
View File
@@ -1,39 +1,27 @@
import {
GameConfig,
GameID,
GameRecord,
GameRecordSchema,
Turn,
} from "../core/Schemas";
import { Storage } from "@google-cloud/storage";
import { BigQuery } from "@google-cloud/bigquery";
import { GameRecord, GameID } from "../core/Schemas";
import { S3 } from "@aws-sdk/client-s3";
import { RedshiftData } from "@aws-sdk/client-redshift-data";
// Google Cloud clients: a Cloud Storage client, a handle to the
// "openfront-games" bucket, and a BigQuery client.
// NOTE(review): `bucket` is declared here as a GCS bucket handle and again
// below as a plain string. This view looks like the before/after lines of a
// diff shown without +/- markers; only one declaration can exist in a
// compilable file.
const storage = new Storage();
const bucket = storage.bucket("openfront-games");
const bigquery = new BigQuery();
// Initialize AWS clients
// The S3 client is constructed with no explicit options, while the Redshift
// Data client is pinned to the eu-west-1 region.
const s3 = new S3();
const bucket = "openfront-games";
const redshiftData = new RedshiftData({ region: "eu-west-1" });
// Retry budget for withRetry: the operation is attempted up to
// MAX_RETRIES + 1 times in total.
const MAX_RETRIES = 5;
// Base backoff delay; withRetry doubles it on every failed attempt and adds
// up to one second of random jitter.
const INITIAL_RETRY_DELAY_MS = 1000; // Start with 1 second delay
// Redshift Serverless configuration
// Workgroup and database passed to every Redshift Data API statement.
const REDSHIFT_WORKGROUP = "game-analytics";
const REDSHIFT_DATABASE = "game_archive";
export async function archive(gameRecord: GameRecord) {
try {
// First archive to BigQuery with retries
await withRetry(
() => archiveToBigQuery(gameRecord),
"BigQuery archive",
gameRecord.id,
);
// Archive to Redshift Serverless
await archiveToRedshift(gameRecord);
// Then archive to GCS with retries if there are turns
// Archive to S3 if there are turns
if (gameRecord.turns.length > 0) {
console.log(
`${gameRecord.id}: game has more than zero turns, attempting to write to GCS`,
);
await withRetry(
() => archiveToGCS(gameRecord),
"GCS archive",
gameRecord.id,
`${gameRecord.id}: game has more than zero turns, attempting to write to S3`,
);
await archiveToS3(gameRecord);
}
} catch (error) {
console.error(`${gameRecord.id}: Final archive error: ${error}`, {
@@ -45,44 +33,7 @@ export async function archive(gameRecord: GameRecord) {
}
}
/**
 * Pauses the calling async flow.
 *
 * @param ms - How long to wait, in milliseconds, before the promise settles.
 * @returns A promise that resolves (with no value) once the timer fires.
 */
async function delay(ms: number) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * Runs `operation`, retrying with exponential backoff plus jitter when it
 * rejects.
 *
 * The operation is attempted at most `MAX_RETRIES + 1` times. After each
 * failed attempt except the last, the function waits
 * `INITIAL_RETRY_DELAY_MS * 2^(attempt index)` plus up to 1000 ms of random
 * jitter before trying again.
 *
 * @param operation - Async action to execute; called anew for every attempt.
 * @param operationName - Human-readable label used in log messages.
 * @param gameId - Game identifier used as the log prefix.
 * @returns Whatever `operation` resolves with on its first success.
 * @throws The error from the final failed attempt once the budget is spent.
 */
async function withRetry<T>(
  operation: () => Promise<T>,
  operationName: string,
  gameId: string,
): Promise<T> {
  const attemptLimit = MAX_RETRIES + 1;
  let failure: Error | null = null;
  let attemptsMade = 0;

  while (attemptsMade < attemptLimit) {
    try {
      return await operation();
    } catch (caught) {
      failure = caught as Error;
    }

    attemptsMade += 1;
    if (attemptsMade === attemptLimit) {
      // Budget exhausted: no point sleeping before giving up.
      break;
    }

    // Exponent is the zero-based index of the attempt that just failed.
    const exponentialMs = INITIAL_RETRY_DELAY_MS * 2 ** (attemptsMade - 1);
    const jitterMs = Math.random() * 1000;
    const waitMs = exponentialMs + jitterMs;
    console.log(
      `${gameId}: ${operationName} attempt ${attemptsMade} failed with ${failure}. Retrying in ${Math.round(waitMs)}ms...`,
    );
    await delay(waitMs);
  }

  console.error(
    `${gameId}: All ${attemptLimit} ${operationName} attempts failed. Last error:`,
    failure,
  );
  throw failure;
}
async function archiveToBigQuery(gameRecord: GameRecord) {
async function archiveToRedshift(gameRecord: GameRecord) {
const row = {
id: gameRecord.id,
start: new Date(gameRecord.startTimestampMS),
@@ -93,24 +44,44 @@ async function archiveToBigQuery(gameRecord: GameRecord) {
winner: gameRecord.winner,
difficulty: gameRecord.gameConfig.difficulty,
map: gameRecord.gameConfig.gameMap,
players: gameRecord.players.map((p) => ({
username: p.username,
ip: anonymizeIP(p.ip),
persistentID: p.persistentID,
clientID: p.clientID,
})),
players: JSON.stringify(
gameRecord.players.map((p) => ({
username: p.username,
ip: p.ip,
persistentID: p.persistentID,
clientID: p.clientID,
})),
),
};
const [apiResponse] = await bigquery
.dataset("game_archive")
.table("game_results")
.insert([row]);
// Build the INSERT with named parameters (`:name` placeholders) rather than
// interpolating values into the SQL text. The `players` JSON carries
// player-chosen usernames, so a single quote or backslash in a name would
// otherwise break the statement or allow SQL injection.
//
// The Redshift Data API transmits every parameter value as a string, so the
// non-text columns are cast explicitly on the server side.
// NOTE(review): the cast targets (TIMESTAMP, DOUBLE PRECISION, BIGINT) are
// assumptions about the game_results column types - confirm against the DDL.
const params = {
  Sql: `
    INSERT INTO game_results (
      id, start, end, duration_seconds, number_turns, game_mode,
      winner, difficulty, map, players
    ) VALUES (
      :id,
      CAST(:start_ts AS TIMESTAMP),
      CAST(:end_ts AS TIMESTAMP),
      CAST(:duration_seconds AS DOUBLE PRECISION),
      CAST(:number_turns AS BIGINT),
      :game_mode,
      :winner,
      :difficulty,
      :map,
      JSON_PARSE(:players)
    )
  `,
  // String(...) yields the same text the previous template-literal
  // interpolation produced for each value (including "null"/"undefined"
  // for a missing winner), so stored values are unchanged.
  Parameters: [
    { name: "id", value: String(row.id) },
    { name: "start_ts", value: row.start.toISOString() },
    { name: "end_ts", value: row.end.toISOString() },
    { name: "duration_seconds", value: String(row.duration_seconds) },
    { name: "number_turns", value: String(row.number_turns) },
    { name: "game_mode", value: String(row.game_mode) },
    { name: "winner", value: String(row.winner) },
    { name: "difficulty", value: String(row.difficulty) },
    { name: "map", value: String(row.map) },
    { name: "players", value: row.players },
  ],
  WorkgroupName: REDSHIFT_WORKGROUP,
  Database: REDSHIFT_DATABASE,
};
console.log(`${gameRecord.id}: wrote game metadata to BigQuery`);
return apiResponse;
await redshiftData.executeStatement(params);
console.log(`${gameRecord.id}: wrote game metadata to Redshift`);
}
async function archiveToGCS(gameRecord: GameRecord) {
async function archiveToS3(gameRecord: GameRecord) {
// Create a deep copy to avoid modifying the original
const recordCopy = JSON.parse(JSON.stringify(gameRecord));
@@ -120,35 +91,36 @@ async function archiveToGCS(gameRecord: GameRecord) {
p.persistentID = "REDACTED";
});
const file = bucket.file(recordCopy.id);
try {
await file.save(JSON.stringify(recordCopy), {
contentType: "application/json",
await s3.putObject({
Bucket: bucket,
Key: recordCopy.id,
Body: JSON.stringify(recordCopy),
ContentType: "application/json",
});
} catch (error) {
console.log(`error saving game ${gameRecord.id}`);
throw error;
}
console.log(`${gameRecord.id}: game record successfully written to GCS`);
console.log(`${gameRecord.id}: game record successfully written to S3`);
}
export async function readGameRecord(gameId: GameID): Promise<GameRecord> {
try {
const file = bucket.file(gameId);
// Check if file exists and download in one operation
const response = await s3.getObject({
Bucket: bucket,
Key: gameId,
});
// Check if file exists
const [exists] = await file.exists();
if (!exists) {
throw new Error(`Game record ${gameId} not found in GCS`);
}
// Download and parse file content
const [content] = await file.download();
const gameRecord = JSON.parse(content.toString());
// Parse the response body
const bodyContents = await response.Body.transformToString();
const gameRecord = JSON.parse(bodyContents);
return gameRecord as GameRecord;
} catch (error) {
console.error(`${gameId}: Error reading game record from GCS: ${error}`, {
console.error(`${gameId}: Error reading game record from S3: ${error}`, {
message: error?.message || error,
stack: error?.stack,
name: error?.name,
@@ -160,10 +132,15 @@ export async function readGameRecord(gameId: GameID): Promise<GameRecord> {
export async function gameRecordExists(gameId: GameID): Promise<boolean> {
try {
const file = bucket.file(gameId);
const [exists] = await file.exists();
return exists;
await s3.headObject({
Bucket: bucket,
Key: gameId,
});
return true;
} catch (error) {
if (error.name === "NotFound") {
return false;
}
console.error(`${gameId}: Error checking archive existence: ${error}`, {
message: error?.message || error,
stack: error?.stack,
@@ -173,66 +150,3 @@ export async function gameRecordExists(gameId: GameID): Promise<boolean> {
return false;
}
}
/**
 * Masks the host part of a dotted-quad IPv4 address.
 *
 * @param ipv4 - Candidate address, e.g. "192.168.1.42".
 * @returns The address with its last octet replaced by "xxx", or `null` when
 *   the input is not four dot-separated groups of 1-3 digits, each in 0-255.
 */
function anonymizeIPv4(ipv4: string): string | null {
  const looksLikeDottedQuad = /^(\d{1,3}\.){3}\d{1,3}$/.test(ipv4);
  if (!looksLikeDottedQuad) {
    return null;
  }

  const parts = ipv4.split(".");
  for (const part of parts) {
    const value = parseInt(part);
    const inRange = value >= 0 && value <= 255;
    if (!inRange) {
      return null;
    }
  }

  // Keep the first three octets verbatim and hide the last one.
  return [...parts.slice(0, 3), "xxx"].join(".");
}
/**
 * Masks the interface-identifier half of an IPv6 address.
 *
 * The input may be in full form ("2001:0db8:85a3:0000:0000:8a2e:0370:7334")
 * or use a single "::" to abbreviate one or more zero groups ("2001:db8::1").
 * It is expanded to eight groups, each group is upper-cased and left-padded
 * to four hex digits, and the last four groups are replaced by "xxxx".
 *
 * The previous implementation prefixed a "0" to every group after a single
 * colon (turning 4-digit groups into invalid 5-digit ones) and replaced "::"
 * with exactly one zero group, so most real-world addresses were rejected.
 *
 * @param ipv6 - Candidate address in colon-hex notation.
 * @returns The anonymized address, or `null` when the input is not a valid
 *   colon-hex IPv6 address. Embedded IPv4 tails and zone IDs are rejected.
 */
function anonymizeIPv6(ipv6: string): string | null {
  const GROUP_COUNT = 8;
  const KEPT_GROUPS = 4;

  const halves = ipv6.toUpperCase().split("::");
  if (halves.length > 2) {
    // "::" may appear at most once.
    return null;
  }

  const toGroups = (part: string): string[] =>
    part === "" ? [] : part.split(":");

  const [headPart = "", tailPart] = halves;
  let groups = toGroups(headPart);

  if (tailPart !== undefined) {
    const tail = toGroups(tailPart);
    const missing = GROUP_COUNT - groups.length - tail.length;
    if (missing < 1) {
      // "::" must stand in for at least one group.
      return null;
    }
    groups = [...groups, ...new Array<string>(missing).fill("0"), ...tail];
  }

  if (groups.length !== GROUP_COUNT) {
    return null;
  }

  const groupPattern = /^[0-9A-F]{1,4}$/;
  if (!groups.every((group) => groupPattern.test(group))) {
    return null;
  }

  const kept = groups
    .slice(0, KEPT_GROUPS)
    .map((group) => group.padStart(4, "0"));
  const masked = new Array<string>(GROUP_COUNT - KEPT_GROUPS).fill("xxxx");
  return [...kept, ...masked].join(":");
}
/**
 * Anonymizes an IP address of either family.
 *
 * The input is first tried as IPv4; only if that yields no result is it
 * tried as IPv6.
 *
 * @param ip - Candidate IPv4 or IPv6 address.
 * @returns The anonymized address, or `null` when neither parser accepts it.
 */
function anonymizeIP(ip: string): string | null {
  return anonymizeIPv4(ip) || anonymizeIPv6(ip);
}