from kubernetes import client, config, watch import os import time # Load in-cluster config config.load_incluster_config() # Set up Kubernetes API client v1 = client.CoreV1Api() # Configuration SERVICE_NAME = "traefik" NAMESPACE = "kube-system" ANNOTATION_KEY = "kube-vip.io/loadbalancerIPs" NODE_LABEL = "svccontroller.k3s.cattle.io/enablelb=true" def update_service_annotation(external_ip): # Get the current service object service = v1.read_namespaced_service(SERVICE_NAME, NAMESPACE) # Check if the annotation needs to be updated current_annotation = service.metadata.annotations.get(ANNOTATION_KEY) if current_annotation != external_ip: # Update the annotation body = {"metadata": {"annotations": {ANNOTATION_KEY: external_ip}}} v1.patch_namespaced_service(SERVICE_NAME, NAMESPACE, body) print(f"Updated service {SERVICE_NAME} with new external IP: {external_ip}") def main(): w = watch.Watch() while True: try: for event in w.stream( v1.list_node, label_selector=NODE_LABEL, _request_timeout=300 ): node = event["object"] node_name = node.metadata.name # Extract the external IP if it exists external_ip = None for address in node.status.addresses: if address.type == "ExternalIP": external_ip = address.address break if external_ip: print(f"Detected external IP {external_ip} for node {node_name}") update_service_annotation(external_ip) except client.exceptions.ApiException as e: print(f"API Exception: {e}") time.sleep(5) # Wait before retrying except Exception as e: print(f"Unexpected error: {e}") time.sleep(5) if __name__ == "__main__": main()