mirror of
https://github.com/openfrontio/OpenFrontIO.git
synced 2026-06-21 09:30:45 +00:00
use S3 & reshift to archive
This commit is contained in:
Generated
+1673
-217
File diff suppressed because it is too large
Load Diff
+2
-2
@@ -65,10 +65,10 @@
|
||||
"worker-loader": "^3.0.8"
|
||||
},
|
||||
"dependencies": {
|
||||
"@aws-sdk/client-redshift-data": "^3.758.0",
|
||||
"@aws-sdk/client-s3": "^3.758.0",
|
||||
"@datastructures-js/priority-queue": "^6.3.1",
|
||||
"@google-cloud/bigquery": "^7.9.1",
|
||||
"@google-cloud/secret-manager": "^5.6.0",
|
||||
"@google-cloud/storage": "^7.14.0",
|
||||
"@types/dompurify": "^3.0.5",
|
||||
"@types/express": "^4.17.21",
|
||||
"@types/google-protobuf": "^3.15.12",
|
||||
|
||||
+73
-159
@@ -1,39 +1,27 @@
|
||||
import {
|
||||
GameConfig,
|
||||
GameID,
|
||||
GameRecord,
|
||||
GameRecordSchema,
|
||||
Turn,
|
||||
} from "../core/Schemas";
|
||||
import { Storage } from "@google-cloud/storage";
|
||||
import { BigQuery } from "@google-cloud/bigquery";
|
||||
import { GameRecord, GameID } from "../core/Schemas";
|
||||
import { S3 } from "@aws-sdk/client-s3";
|
||||
import { RedshiftData } from "@aws-sdk/client-redshift-data";
|
||||
|
||||
const storage = new Storage();
|
||||
const bucket = storage.bucket("openfront-games");
|
||||
const bigquery = new BigQuery();
|
||||
// Initialize AWS clients
|
||||
const s3 = new S3();
|
||||
const bucket = "openfront-games";
|
||||
const redshiftData = new RedshiftData({ region: "eu-west-1" });
|
||||
|
||||
const MAX_RETRIES = 5;
|
||||
const INITIAL_RETRY_DELAY_MS = 1000; // Start with 1 second delay
|
||||
// Redshift Serverless configuration
|
||||
const REDSHIFT_WORKGROUP = "game-analytics";
|
||||
const REDSHIFT_DATABASE = "game_archive";
|
||||
|
||||
export async function archive(gameRecord: GameRecord) {
|
||||
try {
|
||||
// First archive to BigQuery with retries
|
||||
await withRetry(
|
||||
() => archiveToBigQuery(gameRecord),
|
||||
"BigQuery archive",
|
||||
gameRecord.id,
|
||||
);
|
||||
// Archive to Redshift Serverless
|
||||
await archiveToRedshift(gameRecord);
|
||||
|
||||
// Then archive to GCS with retries if there are turns
|
||||
// Archive to S3 if there are turns
|
||||
if (gameRecord.turns.length > 0) {
|
||||
console.log(
|
||||
`${gameRecord.id}: game has more than zero turns, attempting to write to GCS`,
|
||||
);
|
||||
await withRetry(
|
||||
() => archiveToGCS(gameRecord),
|
||||
"GCS archive",
|
||||
gameRecord.id,
|
||||
`${gameRecord.id}: game has more than zero turns, attempting to write to S3`,
|
||||
);
|
||||
await archiveToS3(gameRecord);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`${gameRecord.id}: Final archive error: ${error}`, {
|
||||
@@ -45,44 +33,7 @@ export async function archive(gameRecord: GameRecord) {
|
||||
}
|
||||
}
|
||||
|
||||
async function delay(ms: number) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
async function withRetry<T>(
|
||||
operation: () => Promise<T>,
|
||||
operationName: string,
|
||||
gameId: string,
|
||||
): Promise<T> {
|
||||
let lastError: Error | null = null;
|
||||
|
||||
for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
|
||||
try {
|
||||
return await operation();
|
||||
} catch (error) {
|
||||
lastError = error as Error;
|
||||
|
||||
if (attempt < MAX_RETRIES) {
|
||||
const backoffDelay = INITIAL_RETRY_DELAY_MS * Math.pow(2, attempt);
|
||||
const jitter = Math.random() * 1000;
|
||||
const totalDelay = backoffDelay + jitter;
|
||||
|
||||
console.log(
|
||||
`${gameId}: ${operationName} attempt ${attempt + 1} failed with ${error}. Retrying in ${Math.round(totalDelay)}ms...`,
|
||||
);
|
||||
await delay(totalDelay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
console.error(
|
||||
`${gameId}: All ${MAX_RETRIES + 1} ${operationName} attempts failed. Last error:`,
|
||||
lastError,
|
||||
);
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
async function archiveToBigQuery(gameRecord: GameRecord) {
|
||||
async function archiveToRedshift(gameRecord: GameRecord) {
|
||||
const row = {
|
||||
id: gameRecord.id,
|
||||
start: new Date(gameRecord.startTimestampMS),
|
||||
@@ -93,24 +44,44 @@ async function archiveToBigQuery(gameRecord: GameRecord) {
|
||||
winner: gameRecord.winner,
|
||||
difficulty: gameRecord.gameConfig.difficulty,
|
||||
map: gameRecord.gameConfig.gameMap,
|
||||
players: gameRecord.players.map((p) => ({
|
||||
username: p.username,
|
||||
ip: anonymizeIP(p.ip),
|
||||
persistentID: p.persistentID,
|
||||
clientID: p.clientID,
|
||||
})),
|
||||
players: JSON.stringify(
|
||||
gameRecord.players.map((p) => ({
|
||||
username: p.username,
|
||||
ip: p.ip,
|
||||
persistentID: p.persistentID,
|
||||
clientID: p.clientID,
|
||||
})),
|
||||
),
|
||||
};
|
||||
|
||||
const [apiResponse] = await bigquery
|
||||
.dataset("game_archive")
|
||||
.table("game_results")
|
||||
.insert([row]);
|
||||
// Convert the row to SQL parameters for insertion
|
||||
const params = {
|
||||
Sql: `
|
||||
INSERT INTO game_results (
|
||||
id, start, end, duration_seconds, number_turns, game_mode,
|
||||
winner, difficulty, map, players
|
||||
) VALUES (
|
||||
'${row.id}',
|
||||
'${row.start.toISOString()}',
|
||||
'${row.end.toISOString()}',
|
||||
${row.duration_seconds},
|
||||
${row.number_turns},
|
||||
'${row.game_mode}',
|
||||
'${row.winner}',
|
||||
'${row.difficulty}',
|
||||
'${row.map}',
|
||||
JSON_PARSE('${row.players}')
|
||||
)
|
||||
`,
|
||||
WorkgroupName: REDSHIFT_WORKGROUP,
|
||||
Database: REDSHIFT_DATABASE,
|
||||
};
|
||||
|
||||
console.log(`${gameRecord.id}: wrote game metadata to BigQuery`);
|
||||
return apiResponse;
|
||||
await redshiftData.executeStatement(params);
|
||||
console.log(`${gameRecord.id}: wrote game metadata to Redshift`);
|
||||
}
|
||||
|
||||
async function archiveToGCS(gameRecord: GameRecord) {
|
||||
async function archiveToS3(gameRecord: GameRecord) {
|
||||
// Create a deep copy to avoid modifying the original
|
||||
const recordCopy = JSON.parse(JSON.stringify(gameRecord));
|
||||
|
||||
@@ -120,35 +91,36 @@ async function archiveToGCS(gameRecord: GameRecord) {
|
||||
p.persistentID = "REDACTED";
|
||||
});
|
||||
|
||||
const file = bucket.file(recordCopy.id);
|
||||
try {
|
||||
await file.save(JSON.stringify(recordCopy), {
|
||||
contentType: "application/json",
|
||||
await s3.putObject({
|
||||
Bucket: bucket,
|
||||
Key: recordCopy.id,
|
||||
Body: JSON.stringify(recordCopy),
|
||||
ContentType: "application/json",
|
||||
});
|
||||
} catch (error) {
|
||||
console.log(`error saving game ${gameRecord.id}`);
|
||||
throw error;
|
||||
}
|
||||
|
||||
console.log(`${gameRecord.id}: game record successfully written to GCS`);
|
||||
console.log(`${gameRecord.id}: game record successfully written to S3`);
|
||||
}
|
||||
|
||||
export async function readGameRecord(gameId: GameID): Promise<GameRecord> {
|
||||
try {
|
||||
const file = bucket.file(gameId);
|
||||
// Check if file exists and download in one operation
|
||||
const response = await s3.getObject({
|
||||
Bucket: bucket,
|
||||
Key: gameId,
|
||||
});
|
||||
|
||||
// Check if file exists
|
||||
const [exists] = await file.exists();
|
||||
if (!exists) {
|
||||
throw new Error(`Game record ${gameId} not found in GCS`);
|
||||
}
|
||||
|
||||
// Download and parse file content
|
||||
const [content] = await file.download();
|
||||
const gameRecord = JSON.parse(content.toString());
|
||||
// Parse the response body
|
||||
const bodyContents = await response.Body.transformToString();
|
||||
const gameRecord = JSON.parse(bodyContents);
|
||||
|
||||
return gameRecord as GameRecord;
|
||||
} catch (error) {
|
||||
console.error(`${gameId}: Error reading game record from GCS: ${error}`, {
|
||||
console.error(`${gameId}: Error reading game record from S3: ${error}`, {
|
||||
message: error?.message || error,
|
||||
stack: error?.stack,
|
||||
name: error?.name,
|
||||
@@ -160,10 +132,15 @@ export async function readGameRecord(gameId: GameID): Promise<GameRecord> {
|
||||
|
||||
export async function gameRecordExists(gameId: GameID): Promise<boolean> {
|
||||
try {
|
||||
const file = bucket.file(gameId);
|
||||
const [exists] = await file.exists();
|
||||
return exists;
|
||||
await s3.headObject({
|
||||
Bucket: bucket,
|
||||
Key: gameId,
|
||||
});
|
||||
return true;
|
||||
} catch (error) {
|
||||
if (error.name === "NotFound") {
|
||||
return false;
|
||||
}
|
||||
console.error(`${gameId}: Error checking archive existence: ${error}`, {
|
||||
message: error?.message || error,
|
||||
stack: error?.stack,
|
||||
@@ -173,66 +150,3 @@ export async function gameRecordExists(gameId: GameID): Promise<boolean> {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function anonymizeIPv4(ipv4: string): string | null {
|
||||
const ipv4Regex = /^(\d{1,3}\.){3}\d{1,3}$/;
|
||||
|
||||
if (!ipv4Regex.test(ipv4)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const octets = ipv4.split(".");
|
||||
|
||||
if (
|
||||
!octets.every((octet) => {
|
||||
const num = parseInt(octet);
|
||||
return num >= 0 && num <= 255;
|
||||
})
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
|
||||
octets[3] = "xxx";
|
||||
|
||||
return octets.join(".");
|
||||
}
|
||||
|
||||
function anonymizeIPv6(ipv6: string): string | null {
|
||||
const ipv6Regex = /^(?:[A-F0-9]{1,4}:){7}[A-F0-9]{1,4}$/i;
|
||||
|
||||
const normalizedIPv6 = ipv6
|
||||
.toUpperCase()
|
||||
.replace(/([^:]):([^:])/g, "$1:0$2")
|
||||
.replace(/::/, ":0000:");
|
||||
|
||||
if (!ipv6Regex.test(normalizedIPv6)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const segments = normalizedIPv6.split(":");
|
||||
|
||||
if (
|
||||
!segments.every((segment) => {
|
||||
const hex = parseInt(segment, 16);
|
||||
return hex >= 0 && hex <= 65535;
|
||||
})
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
|
||||
for (let i = 4; i < 8; i++) {
|
||||
segments[i] = "xxxx";
|
||||
}
|
||||
|
||||
return segments.join(":");
|
||||
}
|
||||
|
||||
function anonymizeIP(ip: string): string | null {
|
||||
const ipv4Result = anonymizeIPv4(ip);
|
||||
if (ipv4Result) {
|
||||
return ipv4Result;
|
||||
}
|
||||
|
||||
const ipv6 = anonymizeIPv6(ip);
|
||||
return ipv6;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user