janny/main.py (view raw)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
import threading
import itertools
from janny.utils import get
from janny.cleanup import clean_up
from janny.config import logger
RUNNING = []
def get_resource_urls() -> list:
    """
    Collects (api_url_prefix, resource) tuples for every namespaced
    resource advertised under "/api/v1" and under the preferred version
    of each group listed by "/apis/".

    Resources whose name contains a "/" are left out (presumably
    subresources such as "pods/log" -- confirm against the API server).
    """
    group_index = get("/apis/")
    core = get("/api/v1")

    def _wanted(res) -> bool:
        # Same predicate for the core API and for every API group.
        return res.namespaced and "/" not in res.name

    core_prefix = "/api/v1"
    urls = [(core_prefix, res) for res in core.resources if _wanted(res)]

    for group in group_index.groups:
        prefix = "/apis/" + group.preferredVersion.groupVersion
        urls.extend((prefix, res) for res in get(prefix).resources if _wanted(res))

    return urls
def filter_included_resources(include_list: list, resource_tuple_list: list) -> list:
    """
    Narrows a list of (api_url_prefix, resource) tuples, such as the one
    returned by get_resource_urls(), down to the entries whose
    resource.name appears in include_list.

    The relative order of the kept entries is unchanged.
    """
    return [
        (prefix, res)
        for prefix, res in resource_tuple_list
        if res.name in include_list
    ]
def spawn_clean_up_job(resource_tuple: tuple, namespace: str):
    """
    Starts a clean up thread for each resource of one kind in one
    namespace that carries a "janny.ttl" annotation and whose name is
    not already recorded in RUNNING.

    The resource listing runs in the calling thread; only clean_up()
    runs on the newly started threads.

    resource_tuple: (api_url_prefix, resource) pair; only resource.name
        is read from the second element.
    namespace: namespace whose resources are listed.

    Returns None. When the listing response has no `items` attribute it
    is logged as an error and no thread is started.
    """
    url, resource = resource_tuple
    resource_list = get(f"{url}/namespaces/{namespace}/{resource.name}")

    try:
        items = resource_list.items
    except AttributeError:
        # Not a list response. Log its `message`, falling back to the raw
        # response so this error path cannot itself raise AttributeError.
        logger.error(f"Received: {getattr(resource_list, 'message', resource_list)}")
        return

    for r in items:
        # Guard only the metadata lookups: a resource without a name or
        # annotations is skipped, while failures in logging or thread
        # start-up are no longer silently swallowed.
        try:
            annotations = vars(r.metadata.annotations)
            name = r.metadata.name
        except AttributeError:
            continue

        if "janny.ttl" not in annotations or name in RUNNING:
            continue

        logger.info(
            f"New resource to clean up: {resource.name}/{name}: {annotations}"
        )
        kill_time = annotations["janny.ttl"]
        t = threading.Thread(
            target=clean_up,
            args=[url, resource.name, name, kill_time, namespace],
        )
        logger.info(f"Starting cleaner thread for {name}")
        # NOTE(review): RUNNING is keyed by resource name only, so
        # same-named resources of different kinds or namespaces share one
        # entry -- confirm this is intended before changing the key.
        RUNNING.append(name)
        t.start()
def main(include_list, namespace_list):
    """
    Runs spawn_clean_up_job() once for every combination of an included
    resource kind and a namespace.

    include_list: resource names to keep from get_resource_urls().
    namespace_list: namespaces to scan; it is read fully before the
        first job is spawned.
    """
    targets = filter_included_resources(include_list, get_resource_urls())
    namespaces = tuple(namespace_list)
    for resource_tuple in targets:
        for namespace in namespaces:
            spawn_clean_up_job(resource_tuple, namespace)
|