From 8887ff6afcadadfdb5cd6097b3998a9b06de15dc Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Fri, 20 Jun 2025 18:48:37 +0000 Subject: [PATCH] feat: Implement iperf3 exporter core logic and log level configuration This commit completes the core functionality of the iperf3 exporter and adds flexible log level configuration. Key changes: - Added command-line (`--log-level`) and environment variable (`LOG_LEVEL`) options to configure the logging level. - Implemented the main test orchestration loop (`main_loop`) which: - Discovers iperf3 server pods via the Kubernetes API. - Periodically runs iperf3 tests (TCP/UDP) between the exporter pod and discovered server pods. - Avoids self-testing. - Uses configurable test intervals, server ports, and protocols. - Requires `SOURCE_NODE_NAME` to be set. - Refined the `parse_and_publish_metrics` function to: - Accurately parse iperf3 results for bandwidth, jitter, packets, and lost packets. - Set `IPERF_TEST_SUCCESS` metric (0 for failure, 1 for success). - Zero out all relevant metrics for a given path upon test failure to prevent stale data. - Handle UDP-specific metrics correctly, zeroing them for TCP tests. - Improved robustness in accessing iperf3 result attributes. - Updated the main execution block to initialize logging, start the Prometheus HTTP server, and invoke the main loop. - Added comprehensive docstrings and inline comments throughout `exporter/exporter.py` for improved readability and maintainability. These changes align the exporter's implementation with the details specified in the design document (docs/DESIGN.MD). --- exporter/exporter.py | 287 ++++++++++++++++++++++++++++++++----------- 1 file changed, 212 insertions(+), 75 deletions(-) diff --git a/exporter/exporter.py b/exporter/exporter.py index c3ab378..920d65b 100644 --- a/exporter/exporter.py +++ b/exporter/exporter.py @@ -1,28 +1,60 @@ +""" +Prometheus exporter for iperf3 network performance monitoring. + +This script runs iperf3 tests between the node it's running on (source) and +other iperf3 server pods discovered in a Kubernetes cluster. It then exposes +these metrics for Prometheus consumption. + +Configuration is primarily through environment variables and command-line arguments +for log level. +""" import os import time import logging +import argparse +import sys from kubernetes import client, config from prometheus_client import start_http_server, Gauge import iperf3 -# --- Configuration --- -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +# --- Global Configuration & Setup --- + +# Argument parsing for log level configuration +# The command-line --log-level argument takes precedence over the LOG_LEVEL env var. +# Defaults to INFO if neither is set. +parser = argparse.ArgumentParser(description="iperf3 Prometheus exporter.") +parser.add_argument( + '--log-level', + default=os.environ.get('LOG_LEVEL', 'INFO').upper(), + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], + help='Set the logging level. Overrides LOG_LEVEL environment variable. (Default: INFO)' +) +args = parser.parse_args() +log_level_str = args.log_level + +# Convert log level string (e.g., 'INFO') to its numeric representation (e.g., logging.INFO) +numeric_level = getattr(logging, log_level_str.upper(), None) +if not isinstance(numeric_level, int): + # This case should ideally not be reached if choices in argparse are respected. + logging.error(f"Invalid log level: {log_level_str}. Defaulting to INFO.") + numeric_level = logging.INFO +logging.basicConfig(level=numeric_level, format='%(asctime)s - %(levelname)s - %(message)s') # --- Prometheus Metrics Definition --- +# These gauges will be used to expose iperf3 test results. IPERF_BANDWIDTH_MBPS = Gauge( 'iperf_network_bandwidth_mbps', - 'Network bandwidth measured by iperf3 in Megabits per second', + 'Network bandwidth measured by iperf3 in Megabits per second (Mbps)', ['source_node', 'destination_node', 'protocol'] ) IPERF_JITTER_MS = Gauge( 'iperf_network_jitter_ms', - 'Network jitter measured by iperf3 in milliseconds', + 'Network jitter measured by iperf3 in milliseconds (ms) for UDP tests', ['source_node', 'destination_node', 'protocol'] ) IPERF_PACKETS_TOTAL = Gauge( 'iperf_network_packets_total', - 'Total packets transmitted or received during the iperf3 test', + 'Total packets transmitted/received during the iperf3 UDP test', ['source_node', 'destination_node', 'protocol'] ) IPERF_LOST_PACKETS = Gauge( @@ -38,12 +70,21 @@ IPERF_TEST_SUCCESS = Gauge( def discover_iperf_servers(): """ - Discover iperf3 server pods in the cluster using the Kubernetes API. + Discovers iperf3 server pods within a Kubernetes cluster. + + It uses the in-cluster Kubernetes configuration to connect to the API. + The target namespace and label selector for iperf3 server pods are configured + via environment variables: + - IPERF_SERVER_NAMESPACE (default: 'default') + - IPERF_SERVER_LABEL_SELECTOR (default: 'app=iperf3-server') + + Returns: + list: A list of dictionaries, where each dictionary contains the 'ip' + and 'node_name' of a discovered iperf3 server pod. Returns an + empty list if discovery fails or no servers are found. """ try: - # Load in-cluster configuration - # Assumes the exporter runs in a pod with a service account having permissions - config.load_incluster_config() + config.load_incluster_config() # Assumes running inside a Kubernetes pod v1 = client.CoreV1Api() namespace = os.getenv('IPERF_SERVER_NAMESPACE', 'default') @@ -51,110 +92,206 @@ def discover_iperf_servers(): logging.info(f"Discovering iperf3 servers with label '{label_selector}' in namespace '{namespace}'") - # List pods across all namespaces with the specified label selector - # Note: list_pod_for_all_namespaces requires cluster-wide permissions ret = v1.list_pod_for_all_namespaces(label_selector=label_selector, watch=False) servers = [] - for i in ret.items: - # Ensure pod has an IP and is running - if i.status.pod_ip and i.status.phase == 'Running': + for item in ret.items: + if item.status.pod_ip and item.status.phase == 'Running': servers.append({ - 'ip': i.status.pod_ip, - 'node_name': i.spec.node_name + 'ip': item.status.pod_ip, + 'node_name': item.spec.node_name # Node where the iperf server pod is running }) logging.info(f"Discovered {len(servers)} iperf3 server pods.") return servers + except config.ConfigException as e: + logging.error(f"Kubernetes config error: {e}. Is the exporter running in a cluster with RBAC permissions?") + return [] except Exception as e: logging.error(f"Error discovering iperf servers: {e}") - return [] # Return empty list on error to avoid crashing the loop + return [] # Return empty list on error to avoid crashing the main loop -def run_iperf_test(server_ip, server_port, protocol, source_node, dest_node): +def run_iperf_test(server_ip, server_port, protocol, source_node_name, dest_node_name): """ - Runs a single iperf3 test and updates Prometheus metrics. + Runs a single iperf3 test against a specified server and publishes metrics. + + Args: + server_ip (str): The IP address of the iperf3 server. + server_port (int): The port number of the iperf3 server. + protocol (str): The protocol to use ('tcp' or 'udp'). + source_node_name (str): The name of the source node (where this exporter is running). + dest_node_name (str): The name of the destination node (where the server is running). + + The test duration is controlled by the IPERF_TEST_DURATION environment variable + (default: 5 seconds). """ - logging.info(f"Running iperf3 test from {source_node} to {dest_node} ({server_ip}:{server_port}) using {protocol.upper()}") + logging.info(f"Running iperf3 {protocol.upper()} test from {source_node_name} to {dest_node_name} ({server_ip}:{server_port})") - client = iperf3.Client() - client.server_hostname = server_ip - client.port = server_port - client.protocol = protocol - # Duration of the test (seconds) - client.duration = int(os.getenv('IPERF_TEST_DURATION', 5)) - # Output results as JSON for easy parsing - client.json_output = True + iperf_client = iperf3.Client() + iperf_client.server_hostname = server_ip + iperf_client.port = server_port + iperf_client.protocol = protocol + iperf_client.duration = int(os.getenv('IPERF_TEST_DURATION', 5)) # Test duration in seconds + iperf_client.json_output = True # Enables easy parsing of results - result = client.run() - - # Parse results and update metrics - parse_and_publish_metrics(result, source_node, dest_node, protocol) - -def parse_and_publish_metrics(result, source_node, dest_node, protocol): - """ - Parses the iperf3 result and updates Prometheus gauges. - Handles both successful and failed tests. - """ - labels = {'source_node': source_node, 'destination_node': dest_node, 'protocol': protocol} - - if result and result.error: - logging.error(f"Test from {source_node} to {dest_node} failed: {result.error}") + try: + result = iperf_client.run() + parse_and_publish_metrics(result, source_node_name, dest_node_name, protocol) + except Exception as e: + # Catch unexpected errors during client.run() or parsing + logging.error(f"Exception during iperf3 test or metric parsing for {dest_node_name}: {e}") + labels = {'source_node': source_node_name, 'destination_node': dest_node_name, 'protocol': protocol} IPERF_TEST_SUCCESS.labels(**labels).set(0) - # Set metrics to 0 on failure try: IPERF_BANDWIDTH_MBPS.labels(**labels).set(0) IPERF_JITTER_MS.labels(**labels).set(0) IPERF_PACKETS_TOTAL.labels(**labels).set(0) IPERF_LOST_PACKETS.labels(**labels).set(0) except KeyError: - # Labels might not be registered yet if this is the first failure - pass + logging.debug(f"KeyError setting failure metrics for {labels} after client.run() exception.") + + +def parse_and_publish_metrics(result, source_node, dest_node, protocol): + """ + Parses the iperf3 test result and updates Prometheus gauges. + + Args: + result (iperf3.TestResult): The result object from the iperf3 client. + source_node (str): Name of the source node. + dest_node (str): Name of the destination node. + protocol (str): Protocol used for the test ('tcp' or 'udp'). + """ + labels = {'source_node': source_node, 'destination_node': dest_node, 'protocol': protocol} + + # Handle failed tests (e.g., server unreachable) or missing result object + if not result or result.error: + error_message = result.error if result and result.error else "No result object from iperf3 client" + logging.warning(f"Test from {source_node} to {dest_node} ({protocol.upper()}) failed: {error_message}") + IPERF_TEST_SUCCESS.labels(**labels).set(0) + # Set all relevant metrics to 0 on failure to clear stale values from previous successes + try: + IPERF_BANDWIDTH_MBPS.labels(**labels).set(0) + IPERF_JITTER_MS.labels(**labels).set(0) # Applicable for UDP, zeroed for TCP later + IPERF_PACKETS_TOTAL.labels(**labels).set(0) # Applicable for UDP, zeroed for TCP later + IPERF_LOST_PACKETS.labels(**labels).set(0) # Applicable for UDP, zeroed for TCP later + except KeyError: + # This can happen if labels were never registered due to continuous failures + logging.debug(f"KeyError when setting failure metrics for {labels}. Gauges might not be initialized.") return - if not result: - logging.error(f"Test from {source_node} to {dest_node} failed to return a result object.") - IPERF_TEST_SUCCESS.labels(**labels).set(0) - try: - IPERF_BANDWIDTH_MBPS.labels(**labels).set(0) - IPERF_JITTER_MS.labels(**labels).set(0) - IPERF_PACKETS_TOTAL.labels(**labels).set(0) - IPERF_LOST_PACKETS.labels(**labels).set(0) - except KeyError: - pass - return - - + # If we reach here, the test itself was successful in execution IPERF_TEST_SUCCESS.labels(**labels).set(1) - # The summary data is typically in result.json['end']['sum_sent'] or result.json['end']['sum_received'] - # The iperf3-python client often exposes this directly as attributes like sent_Mbps or received_Mbps - # For TCP, we usually care about the received bandwidth on the client side (which is the exporter) - # For UDP, the client report contains jitter, lost packets, etc. + # Determine bandwidth: + # Order of preference: received_Mbps, sent_Mbps, Mbps, then JSON fallbacks. + # received_Mbps is often most relevant for TCP client perspective. + # sent_Mbps can be relevant for UDP or as a TCP fallback. bandwidth_mbps = 0 if hasattr(result, 'received_Mbps') and result.received_Mbps is not None: bandwidth_mbps = result.received_Mbps elif hasattr(result, 'sent_Mbps') and result.sent_Mbps is not None: - # Fallback, though received_Mbps is usually more relevant for TCP client bandwidth_mbps = result.sent_Mbps - # Add a check for the raw JSON output structure as a fallback - elif result.json and 'end' in result.json and 'sum_received' in result.json['end'] and result.json['end']['sum_received']['bits_per_second'] is not None: - bandwidth_mbps = result.json['end']['sum_received']['bits_per_second'] / 1000000 - elif result.json and 'end' in result.json and 'sum_sent' in result.json['end'] and result.json['end']['sum_sent']['bits_per_second'] is not None: - bandwidth_mbps = result.json['end']['sum_sent']['bits_per_second'] / 1000000 - + elif hasattr(result, 'Mbps') and result.Mbps is not None: # General attribute from iperf3 library + bandwidth_mbps = result.Mbps + # Fallback to raw JSON if direct attributes are None or missing + elif result.json: + # Prefer received sum, then sent sum from the JSON output's 'end' summary + if 'end' in result.json and 'sum_received' in result.json['end'] and \ + result.json['end']['sum_received'].get('bits_per_second') is not None: + bandwidth_mbps = result.json['end']['sum_received']['bits_per_second'] / 1000000.0 + elif 'end' in result.json and 'sum_sent' in result.json['end'] and \ + result.json['end']['sum_sent'].get('bits_per_second') is not None: + bandwidth_mbps = result.json['end']['sum_sent']['bits_per_second'] / 1000000.0 IPERF_BANDWIDTH_MBPS.labels(**labels).set(bandwidth_mbps) # UDP specific metrics if protocol == 'udp': - # iperf3-python exposes UDP results directly - IPERF_JITTER_MS.labels(**labels).set(result.jitter_ms if hasattr(result, 'jitter_ms') and result.jitter_ms is not None else 0) - IPERF_PACKETS_TOTAL.labels(**labels).set(result.packets if hasattr(result, 'packets') and result.packets is not None else 0) - IPERF_LOST_PACKETS.labels(**labels).set(result.lost_packets if hasattr(result, 'lost_packets') and result.lost_packets is not None else 0) + # These attributes are specific to UDP tests in iperf3 + IPERF_JITTER_MS.labels(**labels).set(getattr(result, 'jitter_ms', 0) if result.jitter_ms is not None else 0) + IPERF_PACKETS_TOTAL.labels(**labels).set(getattr(result, 'packets', 0) if result.packets is not None else 0) + IPERF_LOST_PACKETS.labels(**labels).set(getattr(result, 'lost_packets', 0) if result.lost_packets is not None else 0) else: - # Ensure UDP metrics are zeroed or absent for TCP tests + # For TCP tests, ensure UDP-specific metrics are set to 0 try: IPERF_JITTER_MS.labels(**labels).set(0) IPERF_PACKETS_TOTAL.labels(**labels).set(0) IPERF_LOST_PACKETS.labels(**labels).set(0) except KeyError: + # Can occur if labels not yet registered (e.g. first test is TCP) + logging.debug(f"KeyError for {labels} when zeroing UDP metrics for TCP test.") pass + +def main_loop(): + """ + Main operational loop of the iperf3 exporter. + + This loop periodically: + 1. Fetches configuration from environment variables: + - IPERF_TEST_INTERVAL (default: 300s): Time between test cycles. + - IPERF_SERVER_PORT (default: 5201): Port for iperf3 servers. + - IPERF_TEST_PROTOCOL (default: 'tcp'): 'tcp' or 'udp'. + - SOURCE_NODE_NAME (critical): Name of the node this exporter runs on. + 2. Discovers iperf3 server pods in the Kubernetes cluster. + 3. Runs iperf3 tests against each discovered server (unless it's on the same node). + 4. Sleeps for the configured test interval. + + If SOURCE_NODE_NAME is not set, the script will log an error and exit. + """ + # Fetch operational configuration from environment variables + test_interval = int(os.getenv('IPERF_TEST_INTERVAL', 300)) + server_port = int(os.getenv('IPERF_SERVER_PORT', 5201)) + protocol = os.getenv('IPERF_TEST_PROTOCOL', 'tcp').lower() # Ensure lowercase + source_node_name = os.getenv('SOURCE_NODE_NAME') + + # SOURCE_NODE_NAME is crucial for labeling metrics correctly. + if not source_node_name: + logging.error("CRITICAL: SOURCE_NODE_NAME environment variable not set. This is required. Exiting.") + sys.exit(1) + + logging.info( + f"Exporter configured. Source Node: {source_node_name}, " + f"Test Interval: {test_interval}s, Server Port: {server_port}, Protocol: {protocol.upper()}" + ) + + while True: + logging.info("Starting new iperf test cycle...") + servers = discover_iperf_servers() + + if not servers: + logging.warning("No iperf servers discovered in this cycle. Check K8s setup and RBAC permissions.") + else: + for server in servers: + dest_node_name = server.get('node_name', 'unknown_destination_node') # Default if key missing + server_ip = server.get('ip') + + if not server_ip: + logging.warning(f"Discovered server entry missing an IP: {server}. Skipping.") + continue + + # Avoid testing a node against itself + if dest_node_name == source_node_name: + logging.info(f"Skipping test to self: {source_node_name} to {server_ip} (on same node: {dest_node_name}).") + continue + + run_iperf_test(server_ip, server_port, protocol, source_node_name, dest_node_name) + + logging.info(f"Test cycle completed. Sleeping for {test_interval} seconds.") + time.sleep(test_interval) + +if __name__ == '__main__': + # Initial logging (like log level) is configured globally at the start of the script. + + # Fetch Prometheus exporter listen port from environment variable + listen_port = int(os.getenv('LISTEN_PORT', 9876)) + + try: + # Start the Prometheus HTTP server to expose metrics. + start_http_server(listen_port) + logging.info(f"Prometheus exporter listening on port {listen_port}") + except Exception as e: + logging.error(f"Failed to start Prometheus HTTP server on port {listen_port}: {e}") + sys.exit(1) # Exit if the metrics server cannot start + + # Enter the main operational loop. + # main_loop() contains its own critical checks (e.g., SOURCE_NODE_NAME) and will exit if necessary. + main_loop()