import os import subprocess """ Trivy class This class should provide the basic methods required by the command line interface. """ class Trivy: """ Trivy class """ def __init__(self, image: str, tag: str): self.image = image self.tag = tag self.registry_username = os.environ.get("REGISTRY_USERNAME", None) self.registry_password = os.environ.get("REGISTRY_PASSWORD", None) def scan_critical_severity(self, image_src: str = None): base = "trivy image" suffix = f"{self.image}:{self.tag}" if image_src: base = f"{base} --image-src {image_src}" base = f"{base} --severity Critical --exit-code 1" print("Scanning for critical security vulnerabilities...") return self.os_system(base=base, suffix=suffix) def full_report(self, image_src: str = None): base = "trivy image" suffix = f"{self.image}:{self.tag}" if image_src: base = f"{base} --image-src {image_src}" base = f"{base} --exit-code 0" print("Generating a full scan report...") return self.os_system(base=base, suffix=suffix) def generate_cdx_report(self, image_src: str = None): base = "trivy image" suffix = f"{self.image}:{self.tag}" if image_src: base = f"{base} --image-src {image_src}" base = f"{base} --format cyclonedx --output result.cdx" self.clean_cdx_report() print("Generating a CycloneDX report...") return self.os_system(base=base, suffix=suffix) def scan_to_promote(self, image_src: str = None): output, return_code = self.scan_critical_severity(image_src=image_src) print(output) if return_code == 1: return self.error() return self.success() def full_scan(self, image_src: str = None, generate_report: bool = True): severity_check, sc_return_code = self.scan_critical_severity(image_src=image_src) full_report, _ = self.full_report(image_src=image_src) print("Printing full report...") print(full_report) if sc_return_code == 1: print("Failed security check scan...") return self.error() print("Passed security check scan...") if not generate_report: return self.success() _, _ = self.generate_cdx_report(image_src=image_src) result = self.get_result() if not result: print("Failed to generato cdx report...") return self.error() print(f"Generated cdx report '{result}' successfully...") return self.success() def os_system(self, base: str, suffix: str): extra_vars = None if self.registry_username and self.registry_password: extra_vars = f"--username {self.registry_username} --password {self.registry_password}" cmd = f"{base} {suffix}" if extra_vars: cmd = f"{base} {extra_vars} {suffix}" cmd_result = subprocess.run(cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE) return cmd_result.stdout.decode("utf-8"), cmd_result.returncode def clean_cdx_report(self): cwd = os.getcwd() result = f"{cwd}/result.cdx" print("Cleaning up old cdx file...") try: os.remove(result) except FileNotFoundError: return self.error() return self.success() def get_result(self): cwd = os.getcwd() result = f"{cwd}/result.cdx" if os.path.isfile(result): return result else: return False def success(self): return True def error(self): return False