diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 38251e6..d7f5a0e 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -27,6 +27,9 @@ jobs:
   build:
     name: Build Docker Image
     runs-on: ubuntu-latest
+    permissions:
+      contents: read   # Needed to check out the repository
+      packages: write  # Needed to push Docker images to GHCR
     steps:
       - name: Check out code
         uses: actions/checkout@v3
@@ -42,12 +45,28 @@
         uses: docker/metadata-action@v4
         with:
           images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+          tags: |
+            # Tag with the PR number on pull_request events (produces pr-<number>)
+            type=ref,event=pr
+            # Tag with the short commit SHA (no 'sha-' prefix)
+            type=sha,prefix=
+            # Tag with 'latest' only when building the default branch
+            type=raw,value=latest,enable={{is_default_branch}}
+
+      - name: Log in to GitHub Container Registry
+        uses: docker/login-action@v3
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}

       - name: Build Docker image
         uses: docker/build-push-action@v4
         with:
           context: ./exporter
-          push: false # Do not push on PRs
+          # Push the built image when the event is a pull request.
+          # The workflow currently only triggers on pull_request events.
+          push: ${{ github.event_name == 'pull_request' }}
           tags: ${{ steps.meta.outputs.tags }}
           labels: ${{ steps.meta.outputs.labels }}
           platforms: linux/amd64,linux/arm64
diff --git a/charts/iperf3-monitor/templates/exporter-deployment.yaml b/charts/iperf3-monitor/templates/exporter-deployment.yaml
index b6d4df3..e58bd7b 100644
--- a/charts/iperf3-monitor/templates/exporter-deployment.yaml
+++ b/charts/iperf3-monitor/templates/exporter-deployment.yaml
@@ -34,6 +34,8 @@ spec:
             value: "{{ .Values.exporter.testInterval }}"
           - name: IPERF_TEST_PROTOCOL
             value: "{{ .Values.exporter.testProtocol }}"
+          - name: LOG_LEVEL
+            value: "{{ .Values.exporter.logLevel }}"
           - name: IPERF_SERVER_PORT
             value: "5201" # Hardcoded as per server DaemonSet
           - name: IPERF_SERVER_NAMESPACE
diff --git a/charts/iperf3-monitor/values.yaml b/charts/iperf3-monitor/values.yaml
index d2de952..2d31853 100644
--- a/charts/iperf3-monitor/values.yaml
+++ b/charts/iperf3-monitor/values.yaml
@@ -24,6 +24,9 @@ exporter:
   # -- Interval in seconds between complete test cycles (i.e., testing all server nodes).
   testInterval: 300

+  # -- Log level for the iperf3 exporter (DEBUG, INFO, WARNING, ERROR, or CRITICAL).
+  logLevel: INFO
+
   # -- Timeout in seconds for a single iperf3 test run.
   testTimeout: 10

diff --git a/exporter/exporter.py b/exporter/exporter.py
index c3ab378..920d65b 100644
--- a/exporter/exporter.py
+++ b/exporter/exporter.py
@@ -1,28 +1,60 @@
+"""
+Prometheus exporter for iperf3 network performance monitoring.
+
+This script runs iperf3 tests between the node it is running on (the source) and
+other iperf3 server pods discovered in a Kubernetes cluster, and exposes the
+results as Prometheus metrics.
+
+Configuration is supplied through environment variables; the log level can also
+be set with the --log-level command-line argument.
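+
+Environment variables read by the exporter (defaults in parentheses):
+  SOURCE_NODE_NAME (required), LOG_LEVEL ('INFO'), LISTEN_PORT (9876),
+  IPERF_SERVER_NAMESPACE ('default'), IPERF_SERVER_LABEL_SELECTOR ('app=iperf3-server'),
+  IPERF_SERVER_PORT (5201), IPERF_TEST_PROTOCOL ('tcp'), IPERF_TEST_DURATION (5),
+  IPERF_TEST_INTERVAL (300).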
+""" import os import time import logging +import argparse +import sys from kubernetes import client, config from prometheus_client import start_http_server, Gauge import iperf3 -# --- Configuration --- -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +# --- Global Configuration & Setup --- + +# Argument parsing for log level configuration +# The command-line --log-level argument takes precedence over the LOG_LEVEL env var. +# Defaults to INFO if neither is set. +parser = argparse.ArgumentParser(description="iperf3 Prometheus exporter.") +parser.add_argument( + '--log-level', + default=os.environ.get('LOG_LEVEL', 'INFO').upper(), + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], + help='Set the logging level. Overrides LOG_LEVEL environment variable. (Default: INFO)' +) +args = parser.parse_args() +log_level_str = args.log_level + +# Convert log level string (e.g., 'INFO') to its numeric representation (e.g., logging.INFO) +numeric_level = getattr(logging, log_level_str.upper(), None) +if not isinstance(numeric_level, int): + # This case should ideally not be reached if choices in argparse are respected. + logging.error(f"Invalid log level: {log_level_str}. Defaulting to INFO.") + numeric_level = logging.INFO +logging.basicConfig(level=numeric_level, format='%(asctime)s - %(levelname)s - %(message)s') # --- Prometheus Metrics Definition --- +# These gauges will be used to expose iperf3 test results. IPERF_BANDWIDTH_MBPS = Gauge( 'iperf_network_bandwidth_mbps', - 'Network bandwidth measured by iperf3 in Megabits per second', + 'Network bandwidth measured by iperf3 in Megabits per second (Mbps)', ['source_node', 'destination_node', 'protocol'] ) IPERF_JITTER_MS = Gauge( 'iperf_network_jitter_ms', - 'Network jitter measured by iperf3 in milliseconds', + 'Network jitter measured by iperf3 in milliseconds (ms) for UDP tests', ['source_node', 'destination_node', 'protocol'] ) IPERF_PACKETS_TOTAL = Gauge( 'iperf_network_packets_total', - 'Total packets transmitted or received during the iperf3 test', + 'Total packets transmitted/received during the iperf3 UDP test', ['source_node', 'destination_node', 'protocol'] ) IPERF_LOST_PACKETS = Gauge( @@ -38,12 +70,21 @@ IPERF_TEST_SUCCESS = Gauge( def discover_iperf_servers(): """ - Discover iperf3 server pods in the cluster using the Kubernetes API. + Discovers iperf3 server pods within a Kubernetes cluster. + + It uses the in-cluster Kubernetes configuration to connect to the API. + The target namespace and label selector for iperf3 server pods are configured + via environment variables: + - IPERF_SERVER_NAMESPACE (default: 'default') + - IPERF_SERVER_LABEL_SELECTOR (default: 'app=iperf3-server') + + Returns: + list: A list of dictionaries, where each dictionary contains the 'ip' + and 'node_name' of a discovered iperf3 server pod. Returns an + empty list if discovery fails or no servers are found. 
""" try: - # Load in-cluster configuration - # Assumes the exporter runs in a pod with a service account having permissions - config.load_incluster_config() + config.load_incluster_config() # Assumes running inside a Kubernetes pod v1 = client.CoreV1Api() namespace = os.getenv('IPERF_SERVER_NAMESPACE', 'default') @@ -51,110 +92,206 @@ def discover_iperf_servers(): logging.info(f"Discovering iperf3 servers with label '{label_selector}' in namespace '{namespace}'") - # List pods across all namespaces with the specified label selector - # Note: list_pod_for_all_namespaces requires cluster-wide permissions ret = v1.list_pod_for_all_namespaces(label_selector=label_selector, watch=False) servers = [] - for i in ret.items: - # Ensure pod has an IP and is running - if i.status.pod_ip and i.status.phase == 'Running': + for item in ret.items: + if item.status.pod_ip and item.status.phase == 'Running': servers.append({ - 'ip': i.status.pod_ip, - 'node_name': i.spec.node_name + 'ip': item.status.pod_ip, + 'node_name': item.spec.node_name # Node where the iperf server pod is running }) logging.info(f"Discovered {len(servers)} iperf3 server pods.") return servers + except config.ConfigException as e: + logging.error(f"Kubernetes config error: {e}. Is the exporter running in a cluster with RBAC permissions?") + return [] except Exception as e: logging.error(f"Error discovering iperf servers: {e}") - return [] # Return empty list on error to avoid crashing the loop + return [] # Return empty list on error to avoid crashing the main loop -def run_iperf_test(server_ip, server_port, protocol, source_node, dest_node): +def run_iperf_test(server_ip, server_port, protocol, source_node_name, dest_node_name): """ - Runs a single iperf3 test and updates Prometheus metrics. + Runs a single iperf3 test against a specified server and publishes metrics. + + Args: + server_ip (str): The IP address of the iperf3 server. + server_port (int): The port number of the iperf3 server. + protocol (str): The protocol to use ('tcp' or 'udp'). + source_node_name (str): The name of the source node (where this exporter is running). + dest_node_name (str): The name of the destination node (where the server is running). + + The test duration is controlled by the IPERF_TEST_DURATION environment variable + (default: 5 seconds). """ - logging.info(f"Running iperf3 test from {source_node} to {dest_node} ({server_ip}:{server_port}) using {protocol.upper()}") + logging.info(f"Running iperf3 {protocol.upper()} test from {source_node_name} to {dest_node_name} ({server_ip}:{server_port})") - client = iperf3.Client() - client.server_hostname = server_ip - client.port = server_port - client.protocol = protocol - # Duration of the test (seconds) - client.duration = int(os.getenv('IPERF_TEST_DURATION', 5)) - # Output results as JSON for easy parsing - client.json_output = True + iperf_client = iperf3.Client() + iperf_client.server_hostname = server_ip + iperf_client.port = server_port + iperf_client.protocol = protocol + iperf_client.duration = int(os.getenv('IPERF_TEST_DURATION', 5)) # Test duration in seconds + iperf_client.json_output = True # Enables easy parsing of results - result = client.run() - - # Parse results and update metrics - parse_and_publish_metrics(result, source_node, dest_node, protocol) - -def parse_and_publish_metrics(result, source_node, dest_node, protocol): - """ - Parses the iperf3 result and updates Prometheus gauges. - Handles both successful and failed tests. 
-    """
-    labels = {'source_node': source_node, 'destination_node': dest_node, 'protocol': protocol}
-
-    if result and result.error:
-        logging.error(f"Test from {source_node} to {dest_node} failed: {result.error}")
+    try:
+        result = iperf_client.run()
+        parse_and_publish_metrics(result, source_node_name, dest_node_name, protocol)
+    except Exception as e:
+        # Catch unexpected errors during client.run() or metric parsing
+        logging.error(f"Exception during iperf3 test or metric parsing for {dest_node_name}: {e}")
+        labels = {'source_node': source_node_name, 'destination_node': dest_node_name, 'protocol': protocol}
         IPERF_TEST_SUCCESS.labels(**labels).set(0)
-        # Set metrics to 0 on failure
         try:
             IPERF_BANDWIDTH_MBPS.labels(**labels).set(0)
             IPERF_JITTER_MS.labels(**labels).set(0)
             IPERF_PACKETS_TOTAL.labels(**labels).set(0)
             IPERF_LOST_PACKETS.labels(**labels).set(0)
         except KeyError:
-            # Labels might not be registered yet if this is the first failure
-            pass
+            logging.debug(f"KeyError setting failure metrics for {labels} after client.run() exception.")
+
+
+def parse_and_publish_metrics(result, source_node, dest_node, protocol):
+    """
+    Parses the iperf3 test result and updates Prometheus gauges.
+
+    Args:
+        result (iperf3.TestResult): The result object from the iperf3 client.
+        source_node (str): Name of the source node.
+        dest_node (str): Name of the destination node.
+        protocol (str): Protocol used for the test ('tcp' or 'udp').
+    """
+    labels = {'source_node': source_node, 'destination_node': dest_node, 'protocol': protocol}
+
+    # Handle failed tests (e.g., server unreachable) or a missing result object
+    if not result or result.error:
+        error_message = result.error if result and result.error else "No result object from iperf3 client"
+        logging.warning(f"Test from {source_node} to {dest_node} ({protocol.upper()}) failed: {error_message}")
+        IPERF_TEST_SUCCESS.labels(**labels).set(0)
+        # Set all relevant metrics to 0 on failure to clear stale values from previous successes
+        try:
+            IPERF_BANDWIDTH_MBPS.labels(**labels).set(0)
+            IPERF_JITTER_MS.labels(**labels).set(0)     # UDP-only metric; zeroed here on failure
+            IPERF_PACKETS_TOTAL.labels(**labels).set(0)  # UDP-only metric; zeroed here on failure
+            IPERF_LOST_PACKETS.labels(**labels).set(0)   # UDP-only metric; zeroed here on failure
+        except KeyError:
+            # This can happen if the labels were never registered due to continuous failures
+            logging.debug(f"KeyError when setting failure metrics for {labels}. Gauges might not be initialized.")
         return

-    if not result:
-        logging.error(f"Test from {source_node} to {dest_node} failed to return a result object.")
-        IPERF_TEST_SUCCESS.labels(**labels).set(0)
-        try:
-            IPERF_BANDWIDTH_MBPS.labels(**labels).set(0)
-            IPERF_JITTER_MS.labels(**labels).set(0)
-            IPERF_PACKETS_TOTAL.labels(**labels).set(0)
-            IPERF_LOST_PACKETS.labels(**labels).set(0)
-        except KeyError:
-            pass
-        return
-
-
+    # If we reach here, the test itself executed successfully
     IPERF_TEST_SUCCESS.labels(**labels).set(1)

-    # The summary data is typically in result.json['end']['sum_sent'] or result.json['end']['sum_received']
-    # The iperf3-python client often exposes this directly as attributes like sent_Mbps or received_Mbps
-    # For TCP, we usually care about the received bandwidth on the client side (which is the exporter)
-    # For UDP, the client report contains jitter, lost packets, etc.
+    # Determine bandwidth, preferring received_Mbps, then sent_Mbps, then Mbps,
+    # and finally the raw JSON output.
+    # received_Mbps is usually the most relevant figure from the TCP client's perspective;
+    # sent_Mbps can be relevant for UDP or as a TCP fallback.
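+    # In the raw JSON these figures appear under result.json['end']['sum_received']
+    # and ['sum_sent'] as 'bits_per_second', hence the division by 1,000,000 below.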
     bandwidth_mbps = 0
     if hasattr(result, 'received_Mbps') and result.received_Mbps is not None:
         bandwidth_mbps = result.received_Mbps
     elif hasattr(result, 'sent_Mbps') and result.sent_Mbps is not None:
-        # Fallback, though received_Mbps is usually more relevant for TCP client
         bandwidth_mbps = result.sent_Mbps
-    # Add a check for the raw JSON output structure as a fallback
-    elif result.json and 'end' in result.json and 'sum_received' in result.json['end'] and result.json['end']['sum_received']['bits_per_second'] is not None:
-        bandwidth_mbps = result.json['end']['sum_received']['bits_per_second'] / 1000000
-    elif result.json and 'end' in result.json and 'sum_sent' in result.json['end'] and result.json['end']['sum_sent']['bits_per_second'] is not None:
-        bandwidth_mbps = result.json['end']['sum_sent']['bits_per_second'] / 1000000
-
+    elif hasattr(result, 'Mbps') and result.Mbps is not None:  # General attribute from the iperf3 library
+        bandwidth_mbps = result.Mbps
+    # Fall back to the raw JSON if the direct attributes are None or missing
+    elif result.json:
+        # Prefer the received sum, then the sent sum, from the JSON output's 'end' summary
+        if 'end' in result.json and 'sum_received' in result.json['end'] and \
+                result.json['end']['sum_received'].get('bits_per_second') is not None:
+            bandwidth_mbps = result.json['end']['sum_received']['bits_per_second'] / 1000000.0
+        elif 'end' in result.json and 'sum_sent' in result.json['end'] and \
+                result.json['end']['sum_sent'].get('bits_per_second') is not None:
+            bandwidth_mbps = result.json['end']['sum_sent']['bits_per_second'] / 1000000.0
     IPERF_BANDWIDTH_MBPS.labels(**labels).set(bandwidth_mbps)

     # UDP specific metrics
     if protocol == 'udp':
-        # iperf3-python exposes UDP results directly
-        IPERF_JITTER_MS.labels(**labels).set(result.jitter_ms if hasattr(result, 'jitter_ms') and result.jitter_ms is not None else 0)
-        IPERF_PACKETS_TOTAL.labels(**labels).set(result.packets if hasattr(result, 'packets') and result.packets is not None else 0)
-        IPERF_LOST_PACKETS.labels(**labels).set(result.lost_packets if hasattr(result, 'lost_packets') and result.lost_packets is not None else 0)
+        # These attributes are specific to UDP tests; guard against missing or None values
+        IPERF_JITTER_MS.labels(**labels).set(getattr(result, 'jitter_ms', 0) or 0)
+        IPERF_PACKETS_TOTAL.labels(**labels).set(getattr(result, 'packets', 0) or 0)
+        IPERF_LOST_PACKETS.labels(**labels).set(getattr(result, 'lost_packets', 0) or 0)
     else:
-        # Ensure UDP metrics are zeroed or absent for TCP tests
+        # For TCP tests, ensure the UDP-specific metrics are set to 0
         try:
             IPERF_JITTER_MS.labels(**labels).set(0)
             IPERF_PACKETS_TOTAL.labels(**labels).set(0)
             IPERF_LOST_PACKETS.labels(**labels).set(0)
         except KeyError:
+            # Can occur if the labels are not yet registered (e.g. the first test is TCP)
+            logging.debug(f"KeyError for {labels} when zeroing UDP metrics for TCP test.")
             pass
+
+def main_loop():
+    """
+    Main operational loop of the iperf3 exporter.
+
+    This loop periodically:
+    1. Fetches configuration from environment variables:
+       - IPERF_TEST_INTERVAL (default: 300s): Time between test cycles.
+       - IPERF_SERVER_PORT (default: 5201): Port for iperf3 servers.
+       - IPERF_TEST_PROTOCOL (default: 'tcp'): 'tcp' or 'udp'.
+       - SOURCE_NODE_NAME (required): Name of the node this exporter runs on.
+    2. Discovers iperf3 server pods in the Kubernetes cluster.
+    3. Runs iperf3 tests against each discovered server (unless it's on the same node).
+    4. Sleeps for the configured test interval.
+
+    If SOURCE_NODE_NAME is not set, the script will log an error and exit.
+    """
+    # Fetch operational configuration from environment variables
+    test_interval = int(os.getenv('IPERF_TEST_INTERVAL', 300))
+    server_port = int(os.getenv('IPERF_SERVER_PORT', 5201))
+    protocol = os.getenv('IPERF_TEST_PROTOCOL', 'tcp').lower()  # Ensure lowercase
+    source_node_name = os.getenv('SOURCE_NODE_NAME')
+
+    # SOURCE_NODE_NAME is crucial for labeling metrics correctly.
+    if not source_node_name:
+        logging.error("CRITICAL: SOURCE_NODE_NAME environment variable not set. This is required. Exiting.")
+        sys.exit(1)
+
+    logging.info(
+        f"Exporter configured. Source Node: {source_node_name}, "
+        f"Test Interval: {test_interval}s, Server Port: {server_port}, Protocol: {protocol.upper()}"
+    )
+
+    while True:
+        logging.info("Starting new iperf test cycle...")
+        servers = discover_iperf_servers()
+
+        if not servers:
+            logging.warning("No iperf servers discovered in this cycle. Check K8s setup and RBAC permissions.")
+        else:
+            for server in servers:
+                dest_node_name = server.get('node_name', 'unknown_destination_node')  # Default if key missing
+                server_ip = server.get('ip')
+
+                if not server_ip:
+                    logging.warning(f"Discovered server entry missing an IP: {server}. Skipping.")
+                    continue
+
+                # Avoid testing a node against itself
+                if dest_node_name == source_node_name:
+                    logging.info(f"Skipping test to self: {source_node_name} to {server_ip} (on same node: {dest_node_name}).")
+                    continue
+
+                run_iperf_test(server_ip, server_port, protocol, source_node_name, dest_node_name)
+
+        logging.info(f"Test cycle completed. Sleeping for {test_interval} seconds.")
+        time.sleep(test_interval)
+
+if __name__ == '__main__':
+    # Initial logging (including the log level) is configured globally at the start of the script.
+
+    # Fetch the Prometheus exporter listen port from the environment
+    listen_port = int(os.getenv('LISTEN_PORT', 9876))
+
+    try:
+        # Start the Prometheus HTTP server to expose metrics.
+        start_http_server(listen_port)
+        logging.info(f"Prometheus exporter listening on port {listen_port}")
+    except Exception as e:
+        logging.error(f"Failed to start Prometheus HTTP server on port {listen_port}: {e}")
+        sys.exit(1)  # Exit if the metrics server cannot start
+
+    # Enter the main operational loop.
+    # main_loop() contains its own critical checks (e.g., SOURCE_NODE_NAME) and will exit if necessary.
+    main_loop()
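With the chart change above, the exporter's verbosity can be tuned per release. A minimal sketch of overriding the new value at install time (release name and namespace are illustrative):

```shell
# Assumes the chart path from this repository; adjust the release and namespace as needed.
helm upgrade --install iperf3-monitor ./charts/iperf3-monitor \
  --namespace monitoring \
  --set exporter.logLevel=DEBUG
```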