diff --git a/src/server/MasterMetrics.ts b/src/server/MasterMetrics.ts index bfc667d80..ebf0253f9 100644 --- a/src/server/MasterMetrics.ts +++ b/src/server/MasterMetrics.ts @@ -12,61 +12,171 @@ const metricsServer = http.createServer(metricsApp); // Initialize the Prometheus registry for the master's own metrics const register = new promClient.Registry(); +// Default Prometheus metrics +promClient.collectDefaultMetrics({ register }); + // Prometheus metrics endpoint that gathers metrics from workers export function setupMetricsServer() { metricsApp.get("/metrics", async (req, res) => { + // Set a timeout for the request to avoid hanging + const timeout = setTimeout(() => { + res.status(500).end("# Error: Request timed out after 30 seconds"); + }, 30000); console.log("Metrics requested"); try { // Get the master's metrics const masterMetrics = await register.metrics(); - // Collect metrics from all workers - const workerMetricsPromises = []; + // Track seen metric names to avoid duplicate metadata + const seenMetrics = new Set(); + const processedLines = []; + const allMetricValues = []; - // For each worker, fetch their metrics + // Process all metadata information in the master metrics first + const masterLines = masterMetrics.split("\n"); + + for (let j = 0; j < masterLines.length; j++) { + const line = masterLines[j]; + + if (line.startsWith("# HELP ")) { + const metricName = line.split(" ")[2]; + seenMetrics.add(metricName); + processedLines.push(line); + } else if (line.startsWith("# TYPE ")) { + const metricName = line.split(" ")[2]; + if (seenMetrics.has(metricName)) { + processedLines.push(line); + } + } else if (line.trim() && !line.startsWith("#")) { + // Add worker label to each metric line and collect for later + const processedLine = line.replace( + /^([a-z][a-z0-9_]*)(?:{([^}]*)})?(\s+[0-9\.e+-]+.*)/, + (match, metricName, existingLabels, valueAndRest) => { + if (existingLabels) { + return `${metricName}{${existingLabels},worker="master"}${valueAndRest}`; + } else { + return `${metricName}{worker="master"}${valueAndRest}`; + } + }, + ); + allMetricValues.push(processedLine); + } + } + + // Collect metrics from all workers for (let i = 0; i < config.numWorkers(); i++) { const workerPort = config.workerPortByIndex(i); const workerUrl = `http://localhost:${workerPort}/metrics`; console.log(`Fetching metrics from worker ${i} at ${workerUrl}`); - const workerMetricsPromise = fetch(workerUrl, { - headers: { - [config.adminHeader()]: config.adminToken(), - }, - }) - .then((response) => { - if (!response.ok) { - throw new Error(`Worker ${i} returned status ${response.status}`); - } - return response.text(); - }) - .then((metricsText) => { - // Add worker label to each metric line - return metricsText.replace( - /^([a-z][a-z0-9_]*(?:{[^}]*})?)\s/gm, - `$1{worker="worker-${i}"} `, - ); - }) - .catch((error) => { - console.error(`Error fetching metrics from worker ${i}:`, error); - return `# Error fetching metrics from worker ${i}: ${error.message}`; + + try { + const response = await fetch(workerUrl, { + headers: { + [config.adminHeader()]: config.adminToken(), + }, }); - workerMetricsPromises.push(workerMetricsPromise); + + if (!response.ok) { + console.error(`Worker ${i} returned status ${response.status}`); + continue; + } + + const metricsText = await response.text(); + const lines = metricsText.split("\n"); + + for (let j = 0; j < lines.length; j++) { + const line = lines[j]; + + // Collect HELP and TYPE info if we haven't seen this metric before + if (line.startsWith("# HELP ")) { + const metricName = line.split(" ")[2]; + if (!seenMetrics.has(metricName)) { + seenMetrics.add(metricName); + processedLines.push(line); + } + } else if (line.startsWith("# TYPE ")) { + const metricName = line.split(" ")[2]; + if ( + seenMetrics.has(metricName) && + !processedLines.some((l) => + l.startsWith(`# TYPE ${metricName}`), + ) + ) { + processedLines.push(line); + } + } else if (line.trim() && !line.startsWith("#")) { + // Process and collect actual metric values + try { + const processedLine = line.replace( + /^([a-z][a-z0-9_]*)(?:{([^}]*)})?(\s+[0-9\.e+-]+.*)/, + (match, metricName, existingLabels, valueAndRest) => { + if (existingLabels) { + return `${metricName}{${existingLabels},worker="worker-${i}"}${valueAndRest}`; + } else { + return `${metricName}{worker="worker-${i}"}${valueAndRest}`; + } + }, + ); + + // Make sure the line was actually processed (regex matched) + if (processedLine !== line) { + allMetricValues.push(processedLine); + } else if ( + line.match(/^[a-z][a-z0-9_]*(?:{[^}]*})?\s+[0-9\.e+-]+.*/) + ) { + // This looks like a metric line but didn't match our regex, try a more general approach + const parts = line.split(/({|\s+)/); + if (parts.length >= 3) { + const metricName = parts[0]; + if (line.includes("{")) { + // Has labels + const labelEndIndex = line.indexOf("}"); + const valueStartIndex = labelEndIndex + 1; + if (labelEndIndex > 0 && valueStartIndex < line.length) { + const labels = line.substring( + line.indexOf("{") + 1, + labelEndIndex, + ); + const valueAndRest = line.substring(valueStartIndex); + allMetricValues.push( + `${metricName}{${labels},worker="worker-${i}"}${valueAndRest}`, + ); + } + } else { + // No labels + const valueAndRest = line.substring(metricName.length); + allMetricValues.push( + `${metricName}{worker="worker-${i}"}${valueAndRest}`, + ); + } + } + } + } catch (error) { + console.error(`Error processing metric line: ${line}`, error); + // Skip this line if there's an error + } + } + } + } catch (error) { + console.error(`Error fetching metrics from worker ${i}:`, error); + allMetricValues.push( + `# Error fetching metrics from worker ${i}: ${error.message}`, + ); + } } - // Wait for all worker metrics to be fetched - const workerMetricsArray = await Promise.all(workerMetricsPromises); - - // Add worker label to the master metrics - const masterMetricsWithLabel = masterMetrics.replace( - /^([a-z][a-z0-9_]*(?:{[^}]*})?)\s/gm, - `$1{worker="master"} `, + // Combine metadata with all metric values and ensure it ends with a newline + const combinedMetrics = [...processedLines, ...allMetricValues].join( + "\n", ); - // Combine all metrics and send the response + // Send the combined response with a final newline to prevent unexpected end of input + clearTimeout(timeout); res.set("Content-Type", register.contentType); - res.end(`${masterMetricsWithLabel}\n${workerMetricsArray.join("\n")}`); + res.end(combinedMetrics + "\n"); } catch (error) { console.error("Error collecting metrics:", error); + clearTimeout(timeout); res.status(500).end(`# Error collecting metrics: ${error.message}`); } });