janny/main.py (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import threading import itertools from janny.utils import get, RUNNING from janny.cleanup import clean_up from janny.config import logger def get_resource_urls() -> list: """ Returns a list of tuples of all namespaced resources. """ apis = get("/apis/") apiv1 = get("/api/v1") resource_urls = list() for r in apiv1.resources: if r.namespaced and "/" not in r.name: resource_urls.append(("/api/v1", r)) for g in apis.groups: version = g.preferredVersion.groupVersion for r in get("/apis/" + version).resources: if r.namespaced and "/" not in r.name: resource_urls.append(("/apis/" + version, r)) return resource_urls def filter_included_resources(include_list: list, resource_tuple_list: list) -> list: """ Filters the list returned by get_resource_urls() according to a list of included resources. """ filtered_resource_list = list() for k, v in resource_tuple_list: if v.name in include_list: filtered_resource_list.append((k, v)) return filtered_resource_list def spawn_clean_up_job(resource_tuple: tuple, namespace: str): """ Spawns a clean up job -- runs on a new thread. """ url, resource = resource_tuple resource_list = get(f"{url}/namespaces/{namespace}/{resource.name}") try: for r in resource_list.items: try: annotations = vars(r.metadata.annotations) if ( "janny.ttl" in annotations and r.metadata.name not in RUNNING ): logger.info( f"New resource to clean up: {resource.name}/{r.metadata.name}: ttl: {annotations['janny.ttl']}" ) kill_time = vars(r.metadata.annotations)["janny.ttl"] t = threading.Thread( target=clean_up, args=[ url, resource, r.metadata.name, kill_time, namespace, ], ) logger.info(f"Starting cleaner thread for {r.metadata.name}") RUNNING.append(r.metadata.name) t.start() except AttributeError: pass except AttributeError: logger.error(f"Received: {resource_list.message}") def main(include_list, namespace_list): filtered = filter_included_resources(include_list, get_resource_urls()) for f, n in itertools.product(filtered, namespace_list): spawn_clean_up_job(f, n) |