all repos — janny @ 4f685e1161bc6eb5e47c4922d15d620d28bccd0f

clean up Kubernetes resources after a set TTL

janny/utils.py (view raw)

 1
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
import json
import re
from datetime import timedelta
from types import SimpleNamespace

from janny.config import API_HOST
from janny.auth import SESSION


def get(path: str) -> SimpleNamespace:
    """Convert a JSON response into a Python object"""
    s = SESSION
    data = s.get(API_HOST + path).content
    obj = json.loads(data, object_hook=lambda d: SimpleNamespace(**d))
    return obj


timedelta_regex = (
    r"((?P<days>-?\d+)d)?" r"((?P<hours>-?\d+)h)?" r"((?P<minutes>-?\d+)m)?"
)

timedelta_pattern = re.compile(timedelta_regex, re.IGNORECASE)


def parse_delta(delta: str) -> timedelta:
    """Parses a human readable timedelta (3d5h19m) into a datetime.timedelta.
    Delta includes:
    * Xd days
    * Xh hours
    * Xm minutes
    Values can be negative following timedelta's rules. Eg: -5h-30m
    """
    match = timedelta_pattern.match(delta)
    if match:
        parts = {k: int(v) for k, v in match.groupdict().items() if v}
        return timedelta(**parts)
    return timedelta(0)

# List of running threads
RUNNING = list()