all repos — janny @ a5e17725e7d2ea9c0d165c5c9e96d5da10f782c0

clean up Kubernetes resources after a set TTL

janny/main.py (view raw)

 1
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
import threading

from janny.utils import get
from janny.cleanup import clean_up
from janny.config import logger


RUNNING = []


def get_resource_urls() -> list:
    """
    Returns a list of tuples of all namespaced resources.
    """
    apis = get("/apis/")
    apiv1 = get("/api/v1")

    resource_urls = list()
    for r in apiv1.resources:
        if r.namespaced and "/" not in r.name:
            resource_urls.append(("/api/v1", r))

    for g in apis.groups:
        version = g.preferredVersion.groupVersion
        for r in get("/apis/" + version).resources:
            if r.namespaced and "/" not in r.name:
                resource_urls.append(("/apis/" + version, r))

    return resource_urls


def filter_included_resources(include_list: list, resource_tuple_list: list) -> list:
    """
    Filters the list returned by get_resource_urls() according to
    a list of included resources.
    """
    filtered_resource_list = list()
    for k, v in resource_tuple_list:
        if v.name in include_list:
            filtered_resource_list.append((k, v))

    return filtered_resource_list


def spawn_clean_up_job(resource_tuple: tuple, namespace: str):
    """
    Spawns a clean up job -- runs on a new thread.
    """
    url, resource = resource_tuple
    resource_list = get(f"{url}/namespaces/{namespace}/{resource.name}")
    for r in resource_list.items:
        if (
            "janny.ttl" in vars(r.metadata.annotations)
            and r.metadata.name not in RUNNING
        ):
            logger.info(
                f"New resource to clean up: {resource.name}/{r.metadata.name}: {vars(r.metadata.annotations)}"
            )
            kill_time = vars(r.metadata.annotations)["janny.ttl"]
            t = threading.Thread(
                target=clean_up,
                args=[url, resource.name, r.metadata.name, kill_time, namespace],
            )
            logger.info(f"Starting cleaner thread for {r.metadata.name}")
            RUNNING.append(r.metadata.name)
            t.start()


def main():
    while True:
        filtered = filter_included_resources(["deployments"], get_resource_urls())
        for f in filtered:
            spawn_clean_up_job(f, "default")