Elia el Lazkani
9d81a323d0
- Adds quick scan capability for scanning without pushing the image - Automates pulling Oras' latest version
119 lines
3.6 KiB
Python
119 lines
3.6 KiB
Python
import os
|
|
import subprocess
|
|
|
|
"""
|
|
Trivy class
|
|
|
|
This class should provide the basic methods required by the command line
|
|
interface.
|
|
"""
|
|
class Trivy:
    """
    Thin wrapper around the ``trivy image`` command line.

    An instance is bound to one ``image:tag`` reference. Registry
    credentials are read once, at construction time, from the
    ``REGISTRY_USERNAME`` and ``REGISTRY_PASSWORD`` environment variables
    and are forwarded to every Trivy invocation when both are set.
    """

    def __init__(self, image: str, tag: str):
        """
        Store the image reference and pick up optional registry credentials.

        :param image: Image name (without the tag).
        :param tag: Image tag.
        """
        self.image = image
        self.tag = tag
        self.registry_username = os.environ.get("REGISTRY_USERNAME")
        self.registry_password = os.environ.get("REGISTRY_PASSWORD")

    def _image_ref(self) -> str:
        """Return the ``image:tag`` reference handed to Trivy."""
        return f"{self.image}:{self.tag}"

    def _base_command(self, image_src: str = None) -> str:
        """Return ``trivy image``, plus ``--image-src`` when a source is given."""
        base = "trivy image"
        if image_src:
            base = f"{base} --image-src {image_src}"
        return base

    def _report_path(self) -> str:
        """Return the path of ``result.cdx`` in the current working directory."""
        return f"{os.getcwd()}/result.cdx"

    def scan_critical_severity(self, image_src: str = None):
        """
        Scan the image for critical vulnerabilities only.

        The command is run with ``--exit-code 1``.

        :param image_src: Optional value for Trivy's ``--image-src`` flag.
        :return: ``(stdout, return_code)`` as produced by :meth:`os_system`.
        """
        # Trivy documents severity names in upper case.
        base = f"{self._base_command(image_src)} --severity CRITICAL --exit-code 1"

        print("Scanning for critical security vulnerabilities...")
        return self.os_system(base=base, suffix=self._image_ref())

    def full_report(self, image_src: str = None):
        """
        Run a scan without a severity filter, using ``--exit-code 0``.

        :param image_src: Optional value for Trivy's ``--image-src`` flag.
        :return: ``(stdout, return_code)`` as produced by :meth:`os_system`.
        """
        base = f"{self._base_command(image_src)} --exit-code 0"

        print("Generating a full scan report...")
        return self.os_system(base=base, suffix=self._image_ref())

    def generate_cdx_report(self, image_src: str = None):
        """
        Write a CycloneDX report to ``result.cdx``.

        Any ``result.cdx`` already present in the current working directory
        is removed first, so a stale file cannot be mistaken for a fresh one.

        :param image_src: Optional value for Trivy's ``--image-src`` flag.
        :return: ``(stdout, return_code)`` as produced by :meth:`os_system`.
        """
        base = f"{self._base_command(image_src)} --format cyclonedx --output result.cdx"
        self.clean_cdx_report()
        print("Generating a CycloneDX report...")
        return self.os_system(base=base, suffix=self._image_ref())

    def scan_to_promote(self, image_src: str = None):
        """
        Run the critical-severity scan and report whether the image passed.

        :param image_src: Optional value for Trivy's ``--image-src`` flag.
        :return: ``True`` when the scan exited with 0, ``False`` otherwise.
        """
        output, return_code = self.scan_critical_severity(image_src=image_src)
        print(output)
        # Fail closed: any non-zero exit (findings, crash, signal) blocks
        # the promotion, not only exit code 1.
        if return_code != 0:
            return self.error()
        return self.success()

    def full_scan(self, image_src: str = None, generate_report: bool = True):
        """
        Run the critical-severity gate, print the full report and optionally
        generate a CycloneDX report.

        :param image_src: Optional value for Trivy's ``--image-src`` flag.
        :param generate_report: When true, also produce ``result.cdx`` after
            the gate has passed.
        :return: ``True`` when the gate passed (and, if requested, the
            CycloneDX file exists afterwards), ``False`` otherwise.
        """
        _, sc_return_code = self.scan_critical_severity(image_src=image_src)
        full_report, _ = self.full_report(image_src=image_src)

        print("Printing full report...")
        print(full_report)

        # Fail closed on any non-zero exit code from the severity scan.
        if sc_return_code != 0:
            print("Failed security check scan...")
            return self.error()
        print("Passed security check scan...")

        if not generate_report:
            return self.success()

        # The command's own result is not inspected; the presence of the
        # output file is what decides success below.
        self.generate_cdx_report(image_src=image_src)

        result = self.get_result()
        if not result:
            print("Failed to generate cdx report...")
            return self.error()

        print(f"Generated cdx report '{result}' successfully...")
        return self.success()

    def os_system(self, base: str, suffix: str):
        """
        Execute a command and capture its output.

        :param base: Command and flags as a whitespace-separated string.
        :param suffix: Final positional argument (the image reference),
            passed through as a single argument.
        :return: ``(stdout, return_code)``; stdout is decoded as UTF-8.
            stderr is captured but not returned.
        """
        cmd = base.split()
        if self.registry_username and self.registry_password:
            # Credentials are appended as separate argv entries so values
            # containing whitespace reach the command intact.
            # NOTE(review): command-line credentials are visible in the
            # process list; consider passing them via environment instead.
            cmd += [
                "--username", self.registry_username,
                "--password", self.registry_password,
            ]
        cmd.append(suffix)

        cmd_result = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # errors="replace" keeps undecodable bytes from raising here.
        return (
            cmd_result.stdout.decode("utf-8", errors="replace"),
            cmd_result.returncode,
        )

    def clean_cdx_report(self):
        """
        Remove ``result.cdx`` from the current working directory.

        :return: ``True`` when a file was removed, ``False`` when there was
            no file to remove.
        """
        print("Cleaning up old cdx file...")
        try:
            os.remove(self._report_path())
        except FileNotFoundError:
            return self.error()

        return self.success()

    def get_result(self):
        """
        Locate the generated CycloneDX report.

        :return: The path to ``result.cdx`` in the current working directory
            when it is a regular file, ``False`` otherwise.
        """
        result = self._report_path()
        if os.path.isfile(result):
            return result
        return False

    def success(self):
        """Return the value used to signal success (``True``)."""
        return True

    def error(self):
        """Return the value used to signal failure (``False``)."""
        return False
|