diff --git a/.drone.yml b/.drone.yml index 6b2ef4b..1b63622 100644 --- a/.drone.yml +++ b/.drone.yml @@ -59,11 +59,8 @@ steps: REGISTRY_PASSWORD: from_secret: registry_password commands: - - trivy image --image-src remote scm.project42.io/elia/tricks:"${DRONE_COMMIT_SHA:0:8}" - - trivy image --format json --output result.json --image-src remote scm.project42.io/elia/tricks:"${DRONE_COMMIT_SHA:0:8}" - - export TIMESTAMP=$(date "+%F %T %Z") - - echo $TIMESTAMP - - oras attach --username "$REGISTRY_USERNAME" --password "$REGISTRY_PASSWORD" -a "org.opencontainers.trivy.created=$TIMESTAMP" -a "org.opencontainers.trivy.status=Passed" -a "org.opencontainers.trivy.tag=${DRONE_COMMIT_SHA:0:8}" --artifact-type application/json "scm.project42.io/elia/tricks:${DRONE_COMMIT_SHA:0:8}" result.json + - apk add python3 + - scripts/generate-scan-report -i scm.project42.io/elia/tricks -t "${DRONE_COMMIT_SHA:0:8}" -g "${DRONE_COMMIT_SHA:0:8}" when: event: exclude: @@ -90,7 +87,8 @@ steps: REGISTRY_PASSWORD: from_secret: registry_password commands: - - get-scan-report "scm.project42.io/elia/tricks:${DRONE_COMMIT_SHA:0:8}" + - apk add python3 + - scripts/check-scan-report -i scm.project42.io/elia/tricks -t "${DRONE_COMMIT_SHA:0:8}" - oras tag --username "$REGISTRY_USERNAME" --password "$REGISTRY_PASSWORD" "scm.project42.io/elia/tricks:${DRONE_COMMIT_SHA:0:8}" latest when: event: diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/Dockerfile b/Dockerfile index 18a9a3c..6c9ebc1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM alpine +FROM python:alpine MAINTAINER Elia El Lazkani ARG ORAS_VERSION="1.0.0" diff --git a/scripts/args.py b/scripts/args.py new file mode 100644 index 0000000..4de6b91 --- /dev/null +++ b/scripts/args.py @@ -0,0 +1,23 @@ +import argparse + +def argument_parse() -> argparse.ArgumentParser: + """ + Method to extract the arguments from the command line. + + :returns: The argument parser. + """ + parser = argparse.ArgumentParser( + description="A tool to automate image manipulation in the pipeline.") + + parser.add_argument( + '-i', '--image', type=str, + help='The image name.') + parser.add_argument( + '-t', '--tag', type=str, + help='The image tag.') + parser.add_argument( + '-g', '--git-tag', type=str, + required=False, + help='The git tag to attach to a report.') + + return parser.parse_args() diff --git a/scripts/attach-scan-report b/scripts/attach-scan-report deleted file mode 100755 index c2989d9..0000000 --- a/scripts/attach-scan-report +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/sh - -set -xe - -TIMESTAMP=$(date "+%F %T %Z") - -image="$@" -printf "Set image to $image...\n" - -extra_vars="" - -if [ ! -z $REGISTRY_USERNAME ]; then - printf "Found registry username...\n" - extra_vars="$extra_vars --username $REGISTRY_USERNAME" -fi - -if [ ! -z $REGISTRY_PASSWORD ]; then - printf "Found registry password\n" - extra_vars="$extra_vars --password $REGISTRY_PASSWORD" -fi - -extra_vars="$extra_vars -a \"org.opencontainers.trivy.created=$TIMESTAMP\" -a \"org.opencontainers.trivy.status=Passed\" -a \"org.opencontainers.trivy.tag=${DRONE_COMMIT_SHA:0:8}\"" - -printf "Checking for result file...\n" -if [ -e result.json ]; then - printf "Result file found, attaching it to container...\n" - oras attach $extra_vars --artifact-type=application/json $image "result.json" -else - printf "Result file not found !\n" - exit 1 -fi diff --git a/scripts/check-scan-report b/scripts/check-scan-report new file mode 100755 index 0000000..0e12e70 --- /dev/null +++ b/scripts/check-scan-report @@ -0,0 +1,22 @@ +#!/usr/bin/env python +import sys +from args import argument_parse +from oras import Oras +from trivy import Trivy + +def main(): + + args = argument_parse() + + oras = Oras(args.image, args.tag) + scan_report = oras.check_scan_report() + if not scan_report: + sys.exit(1) + + trivy = Trivy(args.image, args.tag) + scan = trivy.scan_to_promote(image_src="remote") + if not scan: + sys.exit(1) + +if __name__ == '__main__': + main() diff --git a/scripts/generate-scan-report b/scripts/generate-scan-report new file mode 100755 index 0000000..6302bfb --- /dev/null +++ b/scripts/generate-scan-report @@ -0,0 +1,26 @@ +#!/usr/bin/env python +import sys +from args import argument_parse +from oras import Oras +from trivy import Trivy + +def main(): + + args = argument_parse() + + trivy = Trivy(args.image, args.tag) + scan = trivy.full_scan(image_src="remote") + if not scan: + sys.exit(1) + print("Full scan successful...") + + print("Attaching CycloneDX report to container...") + oras = Oras(args.image, args.tag) + cdx = oras.post_attached_file(git_tag=args.git_tag) + oras.clean_downloaded_file() + if not cdx: + sys.exit(1) + sys.exit(0) + +if __name__ == '__main__': + main() diff --git a/scripts/get-scan-report b/scripts/get-scan-report deleted file mode 100755 index e21f616..0000000 --- a/scripts/get-scan-report +++ /dev/null @@ -1,44 +0,0 @@ -#!/bin/sh - -set -e - -image=$@ - -image_information=$(oras discover --artifact-type application/json "$image") -#printf "$image_information\n" -printf "Found image $image...\n" - -report_digest=$(echo "$image_information" | tail -n1 | awk -F ' ' '{print $2}') -#printf "$report_digest\n" -printf "Found digests for scan report...\n" - -extra_vars="" - -if [ ! -z $REGISTRY_USERNAME ]; then - printf "Found registry username...\n" - extra_vars="$extra_vars --username $REGISTRY_USERNAME" -fi - -if [ ! -z REGISTRY_PASSWORD ]; then - printf "Found registry password\n" - extra_vars="$extra_vars --password $REGISTRY_PASSWORD" -fi - - -printf "Cleaning result file, if it already exists...\n" -if [ -e result.json ]; then - rm result.json -fi - -image_base=$(echo "$image" | awk -F ':' '{print $1}') -printf "Pulling $image_base:@$report_digest...\n" -oras pull $extra_vars $image_base:@$report_digest - -printf "Checking for result file..." -if [ -e result.json ]; then - printf "Result file found !" - exit 0 -else - printf "Result file not found !" - exit 1 -fi diff --git a/scripts/oras.py b/scripts/oras.py new file mode 100644 index 0000000..a91d034 --- /dev/null +++ b/scripts/oras.py @@ -0,0 +1,109 @@ +import os +import subprocess +from datetime import datetime + +""" +Oras class + +This class should provide the basic methods required by the command line +interface. +""" +class Oras: + """ + Oras class + """ + def __init__(self, image: str, tag: str): + self.image = image + self.tag = tag + self.registry_username = os.environ.get("REGISTRY_USERNAME", None) + self.registry_password = os.environ.get("REGISTRY_PASSWORD", None) + + def check_downloaded_file(self): + cwd = os.getcwd() + result = f"{cwd}/result.cdx" + print(f"Checking if '{result}' exists...") + return os.path.isfile(result) + + def clean_downloaded_file(self): + cwd = os.getcwd() + result = f"{cwd}/result.cdx" + print(f"Removing '{result}'...") + try: + os.remove(result) + except FileNotFoundError: + return self.error() + + return self.success() + + def os_system(self, base: str, suffix: str): + extra_vars = None + if self.registry_username and self.registry_password: + extra_vars = f"--username {self.registry_username} --password {self.registry_password}" + + cmd = f"{base} {suffix}" + if extra_vars: + cmd = f"{base} {extra_vars} {suffix}" + cmd_result = subprocess.run(cmd.split(" "), stdout=subprocess.PIPE) + return cmd_result.stdout.decode("utf-8"), cmd_result.returncode + + def get_attached_cdx_digest(self): + base = "oras discover" + suffix = f"--artifact-type application/json {self.image}:{self.tag}" + + print(f"Retrieving attached digest {self.image}:{self.tag}...") + cmd_reply, return_code = self.os_system(base, suffix) + + if return_code != 0: + return False + + return cmd_reply.split("\n")[-2].split(" ")[-1] + + def get_attached_file(self): + base = "oras pull" + digest = self.get_attached_cdx_digest() + suffix = f"{self.image}:@{digest}" + + print(f"Downloading attached file {self.image}:@{digest}...") + cmd_reply, _ = self.os_system(base, suffix) + + return cmd_reply + + def post_attached_file(self, git_tag: str = None): + if not git_tag: + print("Error: Please provide a git tag...") + return self.error() + base = "oras attach" + suffix = f"--artifact-type application/json {self.image}:{self.tag} result.cdx" + + dt = datetime.now() + + annotations = f"-a 'org.opencontainers.trivy.created={dt.isoformat()}" + annotations = f"{annotations} -a 'org.opencontainers.trivy.status=Passed" + annotations = f"{annotations} -a 'org.opencontainers.trivy.tag={git_tag}" + + base = f"{base} {annotations}" + + print(f"Attaching result report file to digest {self.image}:{self.tag}...") + cmd_reply, return_code = self.os_system(base, suffix) + + if return_code != 0: + return self.error() + return self.success() + + def check_scan_report(self): + get_result = self.get_attached_file() + if not get_result: + return self.error() + if not self.check_downloaded_file(): + return self.error() + else: + print("Results file found...") + + self.clean_downloaded_file() + return self.success() + + def success(self): + return True + + def error(self): + return False diff --git a/scripts/trivy.py b/scripts/trivy.py new file mode 100644 index 0000000..44bf39a --- /dev/null +++ b/scripts/trivy.py @@ -0,0 +1,116 @@ +import os +import subprocess + +""" +Trivy class + +This class should provide the basic methods required by the command line +interface. +""" +class Trivy: + """ + Trivy class + """ + def __init__(self, image: str, tag: str): + self.image = image + self.tag = tag + self.registry_username = os.environ.get("REGISTRY_USERNAME", None) + self.registry_password = os.environ.get("REGISTRY_PASSWORD", None) + + def scan_critical_severity(self, image_src: str = None): + base = "trivy image" + suffix = f"{self.image}:{self.tag}" + if image_src: + base = f"{base} --image-src {image_src}" + + base = f"{base} --severity Critical --exit-code 1" + + print("Scanning for critical security vulnerabilities...") + return self.os_system(base=base, suffix=suffix) + + def full_report(self, image_src: str = None): + base = "trivy image" + suffix = f"{self.image}:{self.tag}" + if image_src: + base = f"{base} --image-src {image_src}" + + base = f"{base} --exit-code 0" + + print("Generating a full scan report...") + return self.os_system(base=base, suffix=suffix) + + def generate_cdx_report(self, image_src: str = None): + base = "trivy image" + suffix = f"{self.image}:{self.tag}" + if image_src: + base = f"{base} --image-src {image_src}" + + base = f"{base} --format cyclonedx --output result.cdx" + self.clean_cdx_report() + print("Generating a CycloneDX report...") + return self.os_system(base=base, suffix=suffix) + + def scan_to_promote(self, image_src: str = None): + output, return_code = self.scan_critical_severity(image_src=image_src) + print(output) + if return_code == 1: + return self.error() + return self.success() + + def full_scan(self, image_src: str = None): + severity_check, sc_return_code = self.scan_critical_severity(image_src=image_src) + full_report, _ = self.full_report(image_src=image_src) + + print("Printing full report...") + print(full_report) + + if sc_return_code == 1: + print("Failed security check scan...") + return self.error() + print("Passed security check scan...") + + _, _ = self.generate_cdx_report(image_src=image_src) + + result = self.get_result() + if not result: + print("Failed to generato cdx report...") + return self.error() + + print(f"Generated cdx report '{result}' successfully...") + return self.success() + + def os_system(self, base: str, suffix: str): + extra_vars = None + if self.registry_username and self.registry_password: + extra_vars = f"--username {self.registry_username} --password {self.registry_password}" + + cmd = f"{base} {suffix}" + if extra_vars: + cmd = f"{base} {extra_vars} {suffix}" + cmd_result = subprocess.run(cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return cmd_result.stdout.decode("utf-8"), cmd_result.returncode + + def clean_cdx_report(self): + cwd = os.getcwd() + result = f"{cwd}/result.cdx" + print("Cleaning up old cdx file...") + try: + os.remove(result) + except FileNotFoundError: + return self.error() + + return self.success() + + def get_result(self): + cwd = os.getcwd() + result = f"{cwd}/result.cdx" + if os.path.isfile(result): + return result + else: + return False + + def success(self): + return True + + def error(self): + return False