From f9a27135308e95f2ae0830f3a721fc6f30b069b2 Mon Sep 17 00:00:00 2001 From: Malar Invention Date: Fri, 1 Nov 2024 14:10:35 +0530 Subject: [PATCH] use service label patter --- node_external_ip_controller_async.py | 53 +++++++++++++++------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/node_external_ip_controller_async.py b/node_external_ip_controller_async.py index 4bbf4d6..61360db 100644 --- a/node_external_ip_controller_async.py +++ b/node_external_ip_controller_async.py @@ -2,9 +2,8 @@ import asyncio from kubernetes_asyncio import client, config, watch # Configuration -SERVICE_NAME_PATTERN = ( - "traefik" # Define the service name pattern or label to identify services -) +SERVICE_NAME = "traefik" +SERVICE_NAME_LABEL_PATTERN = "app.kubernetes.io/name=traefik" # Define the service name pattern or label to identify services NAMESPACE = "kube-system" ANNOTATION_KEY = "kube-vip.io/loadbalancerIPs" ZERO_GATEWAY_IP = "172.28.10.1" @@ -24,15 +23,16 @@ async def update_service_annotation(v1, service_name, external_ip): body = {"metadata": {"annotations": {ANNOTATION_KEY: target_annotation}}} await v1.patch_namespaced_service(service_name, NAMESPACE, body) print( - f"Updated service {service_name} with new external IP: {target_annotation}" + f"Updated service {service_name} with new external IP: {target_annotation}", + flush=True, ) except client.exceptions.ApiException as e: - print(f"API Exception in update_service_annotation: {e}") + print(f"API Exception in update_service_annotation: {e}", flush=True) async def watch_nodes(): - await config.load_incluster_config() + config.load_incluster_config() v1 = client.CoreV1Api() w = watch.Watch() @@ -52,32 +52,36 @@ async def watch_nodes(): break if external_ip: - print(f"Detected external IP {external_ip} for node {node_name}") + print( + f"Detected external IP {external_ip} for node {node_name}", + flush=True, + ) # Get all services that need to be updated with this external IP - services = await v1.list_namespaced_service( - NAMESPACE, label_selector=SERVICE_NAME_PATTERN + # services = await v1.list_namespaced_service( + # NAMESPACE, label_selector=SERVICE_NAME_PATTERN + # ) + service = await v1.read_namespaced_service(SERVICE_NAME, NAMESPACE) + # for service in services.items: + await update_service_annotation( + v1, service.metadata.name, external_ip ) - for service in services.items: - await update_service_annotation( - v1, service.metadata.name, external_ip - ) except client.exceptions.ApiException as e: - print(f"API Exception in watch_nodes: {e}") + print(f"API Exception in watch_nodes: {e}", flush=True) await asyncio.sleep(5) except asyncio.CancelledError: - print("Watch task was cancelled.") + print("Watch task was cancelled.", flush=True) break except Exception as e: - print(f"Unexpected error in watch_nodes: {e}") + print(f"Unexpected error in watch_nodes: {e}", flush=True) await asyncio.sleep(5) async def watch_services(): - await config.load_incluster_config() + config.load_incluster_config() v1 = client.CoreV1Api() w = watch.Watch() @@ -86,13 +90,13 @@ async def watch_services(): async for event in w.stream( v1.list_namespaced_service, NAMESPACE, - label_selector=SERVICE_NAME_PATTERN, + label_selector=SERVICE_NAME_LABEL_PATTERN, _request_timeout=300, ): + service = event["object"] + service_name = service.metadata.name if event["type"] == "ADDED": - service = event["object"] - service_name = service.metadata.name - print(f"New service detected: {service_name}") + print(f"New service detected: {service_name}", flush=True) # Fetch current external IP for nodes that match the label nodes = await v1.list_node(label_selector=NODE_LABEL) @@ -106,20 +110,19 @@ async def watch_services(): break except client.exceptions.ApiException as e: - print(f"API Exception in watch_services: {e}") + print(f"API Exception in watch_services: {e}", flush=True) await asyncio.sleep(5) except asyncio.CancelledError: - print("Watch task was cancelled.") + print("Watch task was cancelled.", flush=True) break except Exception as e: - print(f"Unexpected error in watch_services: {e}") + print(f"Unexpected error in watch_services: {e}", flush=True) await asyncio.sleep(5) async def main(): - await config.load_incluster_config() # Run watch_nodes and watch_services concurrently await asyncio.gather(watch_nodes(), watch_services())