all repos — janny @ efbb09b9be2e820ddec0280db023c0a05e7d7ff6

clean up Kubernetes resources after a set TTL

Filter included resources and check for annotations
Anirudh Oppiliappan x@icyphox.sh
Mon, 08 Mar 2021 10:58:49 +0530
commit

efbb09b9be2e820ddec0280db023c0a05e7d7ff6

parent

bcfc8d6f2d4205994e1cf0889d7b05cbc540f147

1 files changed, 42 insertions(+), 7 deletions(-)

jump to
M janny/main.pyjanny/main.py

@@ -1,21 +1,56 @@

from janny.utils import get -def get_resource_urls(): +def get_resource_urls() -> list: + """ + Returns a list of tuples of all namespaced resources. + """ apis = get("/apis/") apiv1 = get("/api/v1") - resource_urls = dict() - resource_urls["/api/v1"] = apiv1.resources + resource_urls = list() + for r in apiv1.resources: + if r.namespaced and "/" not in r.name: + resource_urls.append(("/api/v1", r)) for g in apis.groups: version = g.preferredVersion.groupVersion - resource_urls["/apis/" + version] = get( - "/apis/" + g.preferredVersion.groupVersion - ).resources + for r in get("/apis/" + version).resources: + if r.namespaced and "/" not in r.name: + resource_urls.append(("/apis/" + version, r)) return resource_urls +def filter_included_resources(include_list: list, resource_tuple_list: list) -> list: + """ + Filters the list returned by get_resource_urls() according to + a list of included resources. + """ + filtered_resource_list = list() + for k, v in resource_tuple_list: + if v.name in include_list: + filtered_resource_list.append((k, v)) + + return filtered_resource_list + + +def check_annotations(resource_tuple: tuple, namespace: str) -> list: + """ + Returns a list of resource tuples which have the 'janny.ttl' + annotation. + """ + url, resource = resource_tuple + resource_list = get(f"{url}/namespaces/{namespace}/{resource.name}") + annotated_list = list() + for r in resource_list.items: + if "janny.ttl" in vars(r.metadata.annotations): + annotated_list.append(r) + + return annotated_list + + def main(): - get_resource_urls() + filtered = filter_included_resources(["deployments"], get_resource_urls()) + for f in filtered: + print(check_annotations(f, "default"))