"""Sync node external IPs into the kube-vip annotation of a Service.

Watches nodes matching ``NODE_LABEL`` and, whenever one reports an
``ExternalIP`` address, writes that address (together with the Service's
``spec.loadBalancerIP``, if set) into ``ANNOTATION_KEY`` on the
``SERVICE_NAME`` Service in ``NAMESPACE``.
"""
import os
import time

from kubernetes import client, config, watch

# Load in-cluster config
config.load_incluster_config()

# Set up Kubernetes API client
v1 = client.CoreV1Api()

# Configuration
SERVICE_NAME = "traefik"
NAMESPACE = "kube-system"
ANNOTATION_KEY = "kube-vip.io/loadbalancerIPs"
NODE_LABEL = "svccontroller.k3s.cattle.io/enablelb=true"


def _join_unique(*addresses):
    """Return *addresses* comma-joined, de-duplicated, in argument order.

    A ``dict`` is used instead of a ``set`` so the result is deterministic:
    set iteration order for strings varies between interpreter runs, which
    would make the annotation value flip and cause needless patches.
    """
    return ",".join(dict.fromkeys(addresses))


def _find_external_ip(node):
    """Return the first ``ExternalIP`` address of *node*, or ``None``.

    Tolerates nodes whose ``status`` or ``status.addresses`` is ``None``
    (the client leaves absent fields as ``None``).
    """
    status = node.status
    addresses = (status.addresses if status else None) or []
    for address in addresses:
        if address.type == "ExternalIP":
            return address.address
    return None


def update_service_annotation(external_ip):
    """Ensure the Service's ``ANNOTATION_KEY`` annotation includes *external_ip*.

    Reads the Service, computes the desired annotation value (the Service's
    ``spec.load_balancer_ip`` followed by *external_ip* when the former is
    set, otherwise *external_ip* alone) and patches the Service only if the
    current value differs.

    Args:
        external_ip: The external IP address string to publish.

    Raises:
        kubernetes.client.exceptions.ApiException: If reading or patching
            the Service fails.
    """
    # Get the current service object
    service = v1.read_namespaced_service(SERVICE_NAME, NAMESPACE)

    # A Service without any annotations has ``annotations`` set to None.
    annotations = service.metadata.annotations or {}
    current_annotation = annotations.get(ANNOTATION_KEY)

    ipam_address = service.spec.load_balancer_ip
    if ipam_address:
        print(
            f"service {SERVICE_NAME} has existing ipam IP: {ipam_address}",
            flush=True,
        )
        target_annotation = _join_unique(ipam_address, external_ip)
    else:
        target_annotation = external_ip

    # Check if the annotation needs to be updated
    if current_annotation == target_annotation:
        return

    # Update the annotation
    body = {"metadata": {"annotations": {ANNOTATION_KEY: target_annotation}}}
    v1.patch_namespaced_service(SERVICE_NAME, NAMESPACE, body)
    print(
        f"Updated service {SERVICE_NAME} with new external IPs: {target_annotation}",
        flush=True,
    )


def main():
    """Watch labelled nodes forever and publish their external IPs.

    The watch is restarted after any error (including the client-side
    request timeout) following a 5 second pause.
    """
    w = watch.Watch()
    while True:
        try:
            for event in w.stream(
                v1.list_node, label_selector=NODE_LABEL, _request_timeout=300
            ):
                # A node that has just been removed must not have its
                # address written to the Service.
                if event["type"] == "DELETED":
                    continue

                node = event["object"]
                node_name = node.metadata.name

                # Extract the external IP if it exists
                external_ip = _find_external_ip(node)
                if external_ip:
                    print(
                        f"Detected external IP {external_ip} for node {node_name}",
                        flush=True,
                    )
                    update_service_annotation(external_ip)
        except client.exceptions.ApiException as e:
            print(f"API Exception: {e}", flush=True)
            time.sleep(5)  # Wait before retrying
        except Exception as e:
            # Top-level boundary: log and keep the controller alive.
            print(f"Unexpected error: {e}", flush=True)
            time.sleep(5)


if __name__ == "__main__":
    main()