109 lines
3.3 KiB
Python
109 lines
3.3 KiB
Python
import os
|
|
import subprocess
|
|
from datetime import datetime
|
|
|
|
"""
|
|
Oras class
|
|
|
|
This class should provide the basic methods required by the command line
|
|
interface.
|
|
"""
|
|
class Oras:
    """
    Thin wrapper around the ``oras`` command line tool.

    An instance works against a single ``image:tag`` reference and a report
    file named ``result.cdx`` located in the current working directory.
    Registry credentials are read from the ``REGISTRY_USERNAME`` and
    ``REGISTRY_PASSWORD`` environment variables when the instance is created;
    when both are set they are added to every ``oras`` invocation.
    """

    # File name that check_downloaded_file()/clean_downloaded_file() look for
    # in the working directory and that post_attached_file() attaches.
    RESULT_FILE = "result.cdx"

    def __init__(self, image: str, tag: str):
        """
        Store the target reference and pick up credentials from the environment.

        Args:
            image: Image repository reference, without the tag.
            tag: Tag of the image to operate on.
        """
        self.image = image
        self.tag = tag
        self.registry_username = os.environ.get("REGISTRY_USERNAME", None)
        self.registry_password = os.environ.get("REGISTRY_PASSWORD", None)

    def _result_path(self):
        """Return the path of the report file inside the current working directory."""
        return os.path.join(os.getcwd(), self.RESULT_FILE)

    def check_downloaded_file(self):
        """
        Tell whether the report file exists in the current working directory.

        Returns:
            True when ``result.cdx`` is an existing regular file, False otherwise.
        """
        result = self._result_path()
        print(f"Checking if '{result}' exists...")
        return os.path.isfile(result)

    def clean_downloaded_file(self):
        """
        Delete the report file from the current working directory.

        Returns:
            True when the file was removed, False when it did not exist.
        """
        result = self._result_path()
        print(f"Removing '{result}'...")
        try:
            os.remove(result)
        except FileNotFoundError:
            return self.error()

        return self.success()

    def _build_command(self, base: str, suffix: str):
        """
        Assemble the argument list for an ``oras`` invocation.

        ``base`` and ``suffix`` are split on whitespace. The credentials are
        inserted between them as separate list items, so a username or
        password that contains spaces stays a single argument.
        """
        args = base.split()
        if self.registry_username and self.registry_password:
            # NOTE(review): credentials end up on the command line of the child
            # process; confirm this is acceptable for the environments this
            # runs in.
            args += [
                "--username",
                self.registry_username,
                "--password",
                self.registry_password,
            ]
        args += suffix.split()
        return args

    def os_system(self, base: str, suffix: str):
        """
        Run a command and capture its standard output.

        The command is executed without a shell, so quoting characters inside
        ``base`` or ``suffix`` are passed to the program literally.

        Args:
            base: Program and leading arguments, whitespace separated.
            suffix: Trailing arguments, whitespace separated. When both
                credentials are configured they are placed between ``base``
                and ``suffix``.

        Returns:
            A ``(stdout, returncode)`` tuple, with stdout decoded as UTF-8.

        Raises:
            FileNotFoundError: If the program named in ``base`` cannot be found.
        """
        cmd_result = subprocess.run(
            self._build_command(base, suffix), stdout=subprocess.PIPE
        )
        return cmd_result.stdout.decode("utf-8"), cmd_result.returncode

    def get_attached_cdx_digest(self):
        """
        Look up the digest of the artifact attached to ``image:tag``.

        Runs ``oras discover`` and takes the last space-separated token of the
        second-to-last line of its output (the last line of text when the
        output ends with a newline).

        Returns:
            The digest string, or False when the command fails or its output
            does not contain such a line.
        """
        base = "oras discover"
        suffix = f"--artifact-type application/json {self.image}:{self.tag}"

        print(f"Retrieving attached digest {self.image}:{self.tag}...")
        cmd_reply, return_code = self.os_system(base, suffix)

        if return_code != 0:
            return False

        lines = cmd_reply.split("\n")
        # Fewer than two lines means there is nothing to parse (e.g. empty
        # output); report failure instead of raising IndexError.
        if len(lines) < 2:
            return False

        digest = lines[-2].split(" ")[-1]
        return digest or False

    def get_attached_file(self):
        """
        Pull the artifact attached to ``image:tag`` into the working directory.

        Returns:
            The standard output of ``oras pull``, or an empty string when no
            digest could be determined (in which case nothing is pulled).
        """
        digest = self.get_attached_cdx_digest()
        if not digest:
            # Without a digest the pull reference would be meaningless.
            return ""

        base = "oras pull"
        suffix = f"{self.image}:@{digest}"

        print(f"Downloading attached file {self.image}:@{digest}...")
        cmd_reply, _ = self.os_system(base, suffix)

        return cmd_reply

    def post_attached_file(self, git_tag: str = None):
        """
        Attach ``result.cdx`` to ``image:tag`` with trivy annotations.

        Three annotations are set: the current local time (``created``), a
        fixed ``status`` of ``Passed`` and the given ``tag``.

        Args:
            git_tag: Value stored in the ``org.opencontainers.trivy.tag``
                annotation. Must be non-empty; it is split on whitespace like
                the rest of the command, so it should not contain spaces.

        Returns:
            True when ``oras attach`` exits with status 0, False when no git
            tag was given or the command failed.
        """
        if not git_tag:
            print("Error: Please provide a git tag...")
            return self.error()

        created = datetime.now().isoformat()
        # No quoting around the annotations: the command is run without a
        # shell, so quote characters would become part of the annotation key.
        annotations = " ".join(
            f"-a org.opencontainers.trivy.{key}={value}"
            for key, value in (
                ("created", created),
                ("status", "Passed"),
                ("tag", git_tag),
            )
        )
        base = f"oras attach {annotations}"
        suffix = (
            f"--artifact-type application/json "
            f"{self.image}:{self.tag} {self.RESULT_FILE}"
        )

        print(f"Attaching result report file to digest {self.image}:{self.tag}...")
        _, return_code = self.os_system(base, suffix)

        if return_code != 0:
            return self.error()
        return self.success()

    def check_scan_report(self):
        """
        Verify that a report is attached to ``image:tag``.

        Pulls the attached artifact, checks that ``result.cdx`` appeared in the
        working directory and removes it again.

        Returns:
            True when the report file was downloaded, False otherwise.
        """
        if not self.get_attached_file():
            return self.error()
        if not self.check_downloaded_file():
            return self.error()

        print("Results file found...")

        # The downloaded copy is only needed for the existence check.
        self.clean_downloaded_file()
        return self.success()

    def success(self):
        """Return the value used by this class to signal success (True)."""
        return True

    def error(self):
        """Return the value used by this class to signal failure (False)."""
        return False
|