diff --git a/src/common.py b/src/common.py index 5275a41..964057a 100644 --- a/src/common.py +++ b/src/common.py @@ -1,15 +1,17 @@ from datetime import datetime + class Serialize: def boolean(string: str) -> bool: if string == "true" or string == "True": return True return False - def integer(string: str) -> int: return int(string) - def time(string: str) -> datetime: - return datetime.strptime(string, '%Y-%m-%dT%H:%M:%S.%f%z') + try: + return datetime.strptime(string, "%Y-%m-%dT%H:%M:%S.%f%z") + except TypeError: + return string diff --git a/src/images.py b/src/images.py index c91faef..4869729 100644 --- a/src/images.py +++ b/src/images.py @@ -1,10 +1,13 @@ +import re from typing import Self from src.common import Serialize + class Images: """ Images class object. """ + def __init__(self, images: list) -> None: """ Initialize an images object. @@ -12,6 +15,7 @@ class Images: self.images = [] self.deserialize(images=images) + self.sort_by_time() def deserialize(self, images: list) -> Self: """ @@ -30,11 +34,41 @@ class Images: images.append(image.serialize()) return images + def filter_by_os(self, os="") -> Self: + images = [] + for image in self.images: + if image.os == os: + images.append(image.serialize()) + return Images(images) + + def filter_by_os_version(self, os_version="") -> Self: + images = [] + for image in self.images: + if image.os_version == os_version: + images.append(image.serialize()) + return Images(images) + + def filter_by_architecture(self, architecture="") -> Self: + images = [] + for image in self.images: + if image.architecture == architecture: + images.append(image.serialize()) + return Images(images) + + def sort_by_time(self) -> Self: + self.images.sort(key=lambda image: image.last_pushed, reverse=True) + return self + + def get_latest(self) -> Self: + self.sort_by_time() + return Images([self.serialize()[0]]) + class Image: """ Image class object. """ + def __init__(self, image: dict) -> None: """ Initialize an image object. @@ -42,7 +76,9 @@ class Image: self.architecture = "arm" self.features = "" self.variant = "v7" - self.digest = "sha256:2627e55acb9ac183f1c94e6a44f869620d164bbb10d7c42b29df08513eb20c29" + self.digest = ( + "sha256:2627e55acb9ac183f1c94e6a44f869620d164bbb10d7c42b29df08513eb20c29" + ) self.os = "linux" self.os_features = "" self.os_version = None @@ -57,17 +93,17 @@ class Image: """ Method to deserialize image dictionary. """ - self.architecture = image.get('architecture', None) - self.features = image.get('features', None) - self.variant = image.get('variant', None) - self.digest = image.get('digest', None) - self.os = image.get('os', None) - self.os_features = image.get('os_features', None) - self.os_version = image.get('os_version', None) - self.size = Serialize.integer(image.get('size', -1)) - self.status = image.get('status', None) - self.last_pulled = Serialize.time(image.get('last_pulled', None)) - self.last_pushed = Serialize.time(image.get('last_pushed', None)) + self.architecture = image.get("architecture", None) + self.features = image.get("features", None) + self.variant = image.get("variant", None) + self.digest = image.get("digest", None) + self.os = image.get("os", None) + self.os_features = image.get("os_features", None) + self.os_version = image.get("os_version", None) + self.size = Serialize.integer(image.get("size", -1)) + self.status = image.get("status", None) + self.last_pulled = Serialize.time(image.get("last_pulled", None)) + self.last_pushed = Serialize.time(image.get("last_pushed", None)) return self diff --git a/src/main.py b/src/main.py index 0021173..d6ff3e0 100644 --- a/src/main.py +++ b/src/main.py @@ -8,8 +8,12 @@ password = None def main(): reg = Registry(username=username, password=password) - registry = reg.populate_tags("traefik", namespace="rapidfort", url_params="page_size=25") - print(registry.tags.serialize()) + registry = reg.populate_tags(image="alpine", url_params="page_size=25") + + latest = registry.tags.filter_by_os(os="linux").filter_by_architecture(architecture="arm").get_latest().serialize() + digest = latest[0]['images'][0]['digest'] + digest_algorithm, digest_hash = digest.split(":") + print(digest_hash) if __name__ == "__main__": main() diff --git a/src/registry.py b/src/registry.py index d31b607..9f48e19 100644 --- a/src/registry.py +++ b/src/registry.py @@ -2,25 +2,29 @@ from typing import Self from src.upstream import Upstream from src.tags import Tags + class Registry(Upstream): """ Registry class Keeps track of the image tags pulled from Upstream """ + def __init__( - self, - url: str = "https://hub.docker.com", - username: str = None, - password: str = None, - ) -> None: + self, + url: str = "https://hub.docker.com", + username: str = None, + password: str = None, + ) -> None: """ Initialization method. """ self.tags = None Upstream.__init__(self, url=url, username=username, password=password) - def populate_tags(self, image: str, namespace: str = None, url_params: str = None) -> Self: + def populate_tags( + self, image: str, namespace: str = "library", url_params: str = None + ) -> Self: tags = self.get_tags(image=image, namespace=namespace, url_params=url_params) self.tags = Tags(tags) diff --git a/src/tags.py b/src/tags.py index 20e6163..17afbb7 100644 --- a/src/tags.py +++ b/src/tags.py @@ -1,11 +1,14 @@ from typing import Self +from copy import deepcopy from src.images import Images from src.common import Serialize + class Tags: """ Tags class object. """ + def __init__(self, tags: list) -> None: """ Initialize a tag object. @@ -31,10 +34,43 @@ class Tags: tags.append(tag.serialize()) return tags + def filter_by_os(self, os="") -> Self: + # TODO: Abstract filter functions + tags = [] + for tag in self.tags: + tags.append(tag.filter_by_os(os=os).serialize()) + return Tags(tags) + + def filter_by_os_version(self, os_version="") -> Self: + tags = [] + for tag in self.tags: + tags.append(tag.filter_by_os_version(os_version=os_version).serialize()) + return Tags(tags) + + def filter_by_architecture(self, architecture="") -> Self: + tags = [] + for tag in self.tags: + tags.append(tag.filter_by_architecture(architecture=architecture).serialize()) + return Tags(tags) + + def sort_by_image_time(self) -> Self: + self.tags.sort(key=lambda tag: tag.images.serialize()[0]['last_pushed'], reverse=True) + return self + + def get_latest(self) -> Self: + tags = deepcopy(self) + for tag in tags.tags: + tag.images = tag.images.get_latest() + tags.sort_by_image_time() + tags.tags = [tags.tags[0]] + return tags + + class Tag: """ Tag class object. """ + def __init__(self, tag: dict) -> None: """ Initialize a tag object. @@ -62,8 +98,8 @@ class Tag: """ Method to parse tag dictionary. """ - self.creator = Serialize.integer(tag.get('creator', -1)) - self.id = Serialize.integer(tag.get('id', -1)) + self.creator = Serialize.integer(tag.get("creator", -1)) + self.id = Serialize.integer(tag.get("id", -1)) self.images = Images(tag.get("images", None)) self.last_updated = Serialize.time(tag.get("last_updated", None)) self.last_updater = Serialize.integer(tag.get("last_updater", None)) @@ -86,5 +122,20 @@ class Tag: Method to serialize tag object. """ tags = self.__dict__ - tags['images'] = self.images.serialize() + tags["images"] = self.images.serialize() return tags + + def filter_by_os(self, os="") -> Self: + tag = deepcopy(self) + tag.images = self.images.filter_by_os(os=os) + return tag + + def filter_by_os_version(self, os_version="") -> Self: + tag = deepcopy(self) + tag.images = self.images.filter_by_os_version(os_version=os_version) + return tag + + def filter_by_architecture(self, architecture="") -> Self: + tag = deepcopy(self) + tag.images = self.images.filter_by_architecture(architecture=architecture) + return tag diff --git a/src/upstream.py b/src/upstream.py index 6db7a78..e5876c5 100644 --- a/src/upstream.py +++ b/src/upstream.py @@ -21,13 +21,18 @@ class Upstream: if username and password: self.req.get_token(url + "/v2/users/login", username, password) - def get_tags(self, image, namespace: str = "library", url_params: str = None) -> dict: + def get_tags( + self, image, namespace: str = "library", url_params: str = None + ) -> dict: """ Method which returns the tags page in a dictionary. """ if url_params: url = "{registry_url}/v2/namespaces/{namespace}/repositories/{repository}/tags?{url_params}".format( - registry_url=self.url, namespace=namespace, repository=image, url_params=url_params + registry_url=self.url, + namespace=namespace, + repository=image, + url_params=url_params, ) else: url = "{registry_url}/v2/namespaces/{namespace}/repositories/{repository}/tags".format(