diff --git a/gateway_services.yaml b/gateway_services.yaml index cd1664d..fd831eb 100644 --- a/gateway_services.yaml +++ b/gateway_services.yaml @@ -1,3 +1,4 @@ gateway_services: - - name: kube-system/traefik-tcp + - name: traefik-tcp + namespace: kube-system label_pattern: app.kubernetes.io/name=traefik diff --git a/node_external_ip_controller_async.py b/node_external_ip_controller_async.py index 0bc817b..f083f29 100644 --- a/node_external_ip_controller_async.py +++ b/node_external_ip_controller_async.py @@ -8,9 +8,7 @@ import yaml with open("gateway_services.yaml", "r") as f: gateway_services = yaml.safe_load(f) -SERVICE_NAME = gateway_services["gateway_services"][0]["name"].split("/")[1] -SERVICE_NAME_LABEL_PATTERN = gateway_services["gateway_services"][0]["label_pattern"] -NAMESPACE = gateway_services["gateway_services"][0]["name"].split("/")[0] +services = gateway_services["gateway_services"] ANNOTATION_KEY = os.getenv("ANNOTATION_KEY", "kube-vip.io/loadbalancerIPs") ZERO_GATEWAY_IP = os.getenv("ZERO_GATEWAY_IP", "172.28.10.1") NODE_LABEL = os.getenv("NODE_LABEL", "svccontroller.k3s.cattle.io/enablelb=true") @@ -18,18 +16,20 @@ SERVICE_REQUEST_TIMEOUT = int(os.getenv("SERVICE_REQUEST_TIMEOUT", 300)) NODE_REQUEST_TIMEOUT = int(os.getenv("NODE_REQUEST_TIMEOUT", 30)) -async def update_service_annotation(v1, service_name, external_ips): +async def update_service_annotation(v1, service, external_ips): try: # Get the current service object - service = await v1.read_namespaced_service(service_name, NAMESPACE) + service_name = service["name"] + namespace = service["namespace"] + service_obj = await v1.read_namespaced_service(service_name, namespace) # Check if the annotation needs to be updated - current_annotation = service.metadata.annotations.get(ANNOTATION_KEY) + current_annotation = service_obj.metadata.annotations.get(ANNOTATION_KEY) target_annotation = ",".join(external_ips) + "," + ZERO_GATEWAY_IP if current_annotation != target_annotation: # Update the annotation body = {"metadata": {"annotations": {ANNOTATION_KEY: target_annotation}}} - await v1.patch_namespaced_service(service_name, NAMESPACE, body) + await v1.patch_namespaced_service(service_name, namespace, body) print( f"Updated service {service_name} with new external IP: {target_annotation}", flush=True, @@ -65,8 +65,8 @@ async def watch_nodes(): f"Detected external IPs {str(external_ips)} for node {str(node_names)}", flush=True, ) - service = await v1.read_namespaced_service(SERVICE_NAME, NAMESPACE) - await update_service_annotation(v1, service.metadata.name, external_ips) + for service in services: + await update_service_annotation(v1, service, external_ips) except client.exceptions.ApiException as e: print(f"API Exception in watch_nodes: {e}", flush=True) @@ -81,22 +81,27 @@ async def watch_nodes(): await asyncio.sleep(5) -async def watch_services(): +async def watch_service(service): config.load_incluster_config() v1 = client.CoreV1Api() w = watch.Watch() + service_name = service["name"] + namespace = service["namespace"] + label_selector = service["label_pattern"] while True: try: async for event in w.stream( v1.list_namespaced_service, - NAMESPACE, - label_selector=SERVICE_NAME_LABEL_PATTERN, + namespace, + label_selector=label_selector, _request_timeout=SERVICE_REQUEST_TIMEOUT, ): - service = event["object"] - service_name = service.metadata.name - if event["type"] == "ADDED" and service_name == SERVICE_NAME: + service_obj = event["object"] + if ( + event["type"] == "ADDED" + and service_obj.metadata.name == service_name + ): print(f"New service detected: {service_name}", flush=True) # Fetch current external IP for nodes that match the label @@ -106,7 +111,7 @@ async def watch_services(): for address in node.status.addresses: if address.type == "ExternalIP": external_ips.append(address.address) - await update_service_annotation(v1, service_name, external_ips) + await update_service_annotation(v1, service, external_ips) except client.exceptions.ApiException as e: print(f"API Exception in watch_services: {e}", flush=True) @@ -122,9 +127,9 @@ async def watch_services(): async def main(): - # Run watch_nodes and watch_services concurrently - await asyncio.gather(watch_nodes(), watch_services()) + service_watchers = [watch_service(service) for service in services] + await asyncio.gather(watch_nodes(), *service_watchers) if __name__ == "__main__":