all repos — janny @ ec88575db2d8915a34cc523c2e2f01af6575ee4f

clean up Kubernetes resources after a set TTL

janny/main.py (view raw)

 1
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
import threading
import itertools

from janny.utils import get, RUNNING
from janny.cleanup import clean_up
from janny.config import logger


def get_resource_urls() -> list:
    """
    Collects every namespaced resource advertised by the API server.

    Returns a list of (base_url, resource) tuples: the entries found under
    "/api/v1" come first, followed by those of each API group's preferred
    version, in the order the groups are listed.
    """
    api_groups = get("/apis/")
    core_api = get("/api/v1")

    def _namespaced(base_url, resources):
        # Keep namespaced resources only; names containing "/" are skipped.
        return [
            (base_url, res)
            for res in resources
            if res.namespaced and "/" not in res.name
        ]

    urls = _namespaced("/api/v1", core_api.resources)

    for group in api_groups.groups:
        base_url = "/apis/" + group.preferredVersion.groupVersion
        urls.extend(_namespaced(base_url, get(base_url).resources))

    return urls


def filter_included_resources(include_list: list, resource_tuple_list: list) -> list:
    """
    Narrows the (base_url, resource) tuples from get_resource_urls() down to
    those whose resource name appears in include_list.

    The relative order of the surviving tuples is preserved.
    """
    return [
        (base_url, resource)
        for base_url, resource in resource_tuple_list
        if resource.name in include_list
    ]


def spawn_clean_up_job(resource_tuple: tuple, namespace: str):
    """
    Spawns a clean up job -- one new thread per resource that needs it.

    resource_tuple is a (base_url, resource) pair as produced by
    get_resource_urls(); namespace is the namespace whose resources of that
    kind are listed. A cleaner thread running clean_up() is started for
    every listed item that carries a "janny.ttl" annotation and whose name
    is not already in RUNNING. Items without metadata, annotations or a
    name are skipped.

    If the list response has no "items" attribute it is logged as an error
    and nothing is started.
    """
    url, resource = resource_tuple
    resource_list = get(f"{url}/namespaces/{namespace}/{resource.name}")

    try:
        items = resource_list.items
    except AttributeError:
        # Not a list response. Fall back to the response object itself so a
        # reply without "message" cannot raise from inside this handler.
        logger.error(
            f"Received: {getattr(resource_list, 'message', resource_list)}"
        )
        return

    for r in items:
        # Guard only the metadata lookups: an item lacking them is simply
        # not a candidate. Errors raised while logging or starting the
        # thread are real bugs and must not be swallowed here.
        try:
            annotations = vars(r.metadata.annotations)
            name = r.metadata.name
        except AttributeError:
            continue

        if "janny.ttl" not in annotations or name in RUNNING:
            continue

        kill_time = annotations["janny.ttl"]
        logger.info(
            f"New resource to clean up: {resource.name}/{name}: ttl: {kill_time}"
        )
        t = threading.Thread(
            target=clean_up,
            args=(url, resource, name, kill_time, namespace),
        )
        logger.info(f"Starting cleaner thread for {name}")
        # Register before starting so a later scan does not spawn a second
        # cleaner for the same name.
        RUNNING.append(name)
        t.start()


def main(include_list, namespace_list):
    """
    Runs one clean up scan: for every included resource kind, in every
    given namespace, spawn_clean_up_job() is called once.
    """
    selected = filter_included_resources(include_list, get_resource_urls())
    # Materialise the namespaces once up front so every resource kind is
    # paired with the full set, whatever kind of iterable was passed in.
    namespaces = tuple(namespace_list)
    for resource_tuple in selected:
        for namespace in namespaces:
            spawn_clean_up_job(resource_tuple, namespace)