chore(): Migrating to testing python automation
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/promote/production Build is passing

This commit is contained in:
Elia el Lazkani 2023-07-05 22:56:03 +02:00
parent ca43188adf
commit f2b4f86269
10 changed files with 302 additions and 82 deletions

View file

@ -59,11 +59,8 @@ steps:
REGISTRY_PASSWORD:
from_secret: registry_password
commands:
- trivy image --image-src remote scm.project42.io/elia/tricks:"${DRONE_COMMIT_SHA:0:8}"
- trivy image --format json --output result.json --image-src remote scm.project42.io/elia/tricks:"${DRONE_COMMIT_SHA:0:8}"
- export TIMESTAMP=$(date "+%F %T %Z")
- echo $TIMESTAMP
- oras attach --username "$REGISTRY_USERNAME" --password "$REGISTRY_PASSWORD" -a "org.opencontainers.trivy.created=$TIMESTAMP" -a "org.opencontainers.trivy.status=Passed" -a "org.opencontainers.trivy.tag=${DRONE_COMMIT_SHA:0:8}" --artifact-type application/json "scm.project42.io/elia/tricks:${DRONE_COMMIT_SHA:0:8}" result.json
- apk add python3
- scripts/generate-scan-report -i scm.project42.io/elia/tricks -t "${DRONE_COMMIT_SHA:0:8}" -g "${DRONE_COMMIT_SHA:0:8}"
when:
event:
exclude:
@ -90,7 +87,8 @@ steps:
REGISTRY_PASSWORD:
from_secret: registry_password
commands:
- get-scan-report "scm.project42.io/elia/tricks:${DRONE_COMMIT_SHA:0:8}"
- apk add python3
- scripts/check-scan-report -i scm.project42.io/elia/tricks -t "${DRONE_COMMIT_SHA:0:8}"
- oras tag --username "$REGISTRY_USERNAME" --password "$REGISTRY_PASSWORD" "scm.project42.io/elia/tricks:${DRONE_COMMIT_SHA:0:8}" latest
when:
event:

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
__pycache__/

View file

@ -1,4 +1,4 @@
FROM alpine
FROM python:alpine
MAINTAINER Elia El Lazkani <git@lazkani.io>
ARG ORAS_VERSION="1.0.0"

23
scripts/args.py Normal file
View file

@ -0,0 +1,23 @@
import argparse
def argument_parse() -> argparse.ArgumentParser:
"""
Method to extract the arguments from the command line.
:returns: The argument parser.
"""
parser = argparse.ArgumentParser(
description="A tool to automate image manipulation in the pipeline.")
parser.add_argument(
'-i', '--image', type=str,
help='The image name.')
parser.add_argument(
'-t', '--tag', type=str,
help='The image tag.')
parser.add_argument(
'-g', '--git-tag', type=str,
required=False,
help='The git tag to attach to a report.')
return parser.parse_args()

View file

@ -1,31 +0,0 @@
#!/bin/sh
set -xe
TIMESTAMP=$(date "+%F %T %Z")
image="$@"
printf "Set image to $image...\n"
extra_vars=""
if [ ! -z $REGISTRY_USERNAME ]; then
printf "Found registry username...\n"
extra_vars="$extra_vars --username $REGISTRY_USERNAME"
fi
if [ ! -z $REGISTRY_PASSWORD ]; then
printf "Found registry password\n"
extra_vars="$extra_vars --password $REGISTRY_PASSWORD"
fi
extra_vars="$extra_vars -a \"org.opencontainers.trivy.created=$TIMESTAMP\" -a \"org.opencontainers.trivy.status=Passed\" -a \"org.opencontainers.trivy.tag=${DRONE_COMMIT_SHA:0:8}\""
printf "Checking for result file...\n"
if [ -e result.json ]; then
printf "Result file found, attaching it to container...\n"
oras attach $extra_vars --artifact-type=application/json $image "result.json"
else
printf "Result file not found !\n"
exit 1
fi

22
scripts/check-scan-report Executable file
View file

@ -0,0 +1,22 @@
#!/usr/bin/env python
import sys
from args import argument_parse
from oras import Oras
from trivy import Trivy
def main():
args = argument_parse()
oras = Oras(args.image, args.tag)
scan_report = oras.check_scan_report()
if not scan_report:
sys.exit(1)
trivy = Trivy(args.image, args.tag)
scan = trivy.scan_to_promote(image_src="remote")
if not scan:
sys.exit(1)
if __name__ == '__main__':
main()

26
scripts/generate-scan-report Executable file
View file

@ -0,0 +1,26 @@
#!/usr/bin/env python
import sys
from args import argument_parse
from oras import Oras
from trivy import Trivy
def main():
args = argument_parse()
trivy = Trivy(args.image, args.tag)
scan = trivy.full_scan(image_src="remote")
if not scan:
sys.exit(1)
print("Full scan successful...")
print("Attaching CycloneDX report to container...")
oras = Oras(args.image, args.tag)
cdx = oras.post_attached_file(git_tag=args.git_tag)
oras.clean_downloaded_file()
if not cdx:
sys.exit(1)
sys.exit(0)
if __name__ == '__main__':
main()

View file

@ -1,44 +0,0 @@
#!/bin/sh
set -e
image=$@
image_information=$(oras discover --artifact-type application/json "$image")
#printf "$image_information\n"
printf "Found image $image...\n"
report_digest=$(echo "$image_information" | tail -n1 | awk -F ' ' '{print $2}')
#printf "$report_digest\n"
printf "Found digests for scan report...\n"
extra_vars=""
if [ ! -z $REGISTRY_USERNAME ]; then
printf "Found registry username...\n"
extra_vars="$extra_vars --username $REGISTRY_USERNAME"
fi
if [ ! -z REGISTRY_PASSWORD ]; then
printf "Found registry password\n"
extra_vars="$extra_vars --password $REGISTRY_PASSWORD"
fi
printf "Cleaning result file, if it already exists...\n"
if [ -e result.json ]; then
rm result.json
fi
image_base=$(echo "$image" | awk -F ':' '{print $1}')
printf "Pulling $image_base:@$report_digest...\n"
oras pull $extra_vars $image_base:@$report_digest
printf "Checking for result file..."
if [ -e result.json ]; then
printf "Result file found !"
exit 0
else
printf "Result file not found !"
exit 1
fi

109
scripts/oras.py Normal file
View file

@ -0,0 +1,109 @@
import os
import subprocess
from datetime import datetime
"""
Oras class
This class should provide the basic methods required by the command line
interface.
"""
class Oras:
"""
Oras class
"""
def __init__(self, image: str, tag: str):
self.image = image
self.tag = tag
self.registry_username = os.environ.get("REGISTRY_USERNAME", None)
self.registry_password = os.environ.get("REGISTRY_PASSWORD", None)
def check_downloaded_file(self):
cwd = os.getcwd()
result = f"{cwd}/result.cdx"
print(f"Checking if '{result}' exists...")
return os.path.isfile(result)
def clean_downloaded_file(self):
cwd = os.getcwd()
result = f"{cwd}/result.cdx"
print(f"Removing '{result}'...")
try:
os.remove(result)
except FileNotFoundError:
return self.error()
return self.success()
def os_system(self, base: str, suffix: str):
extra_vars = None
if self.registry_username and self.registry_password:
extra_vars = f"--username {self.registry_username} --password {self.registry_password}"
cmd = f"{base} {suffix}"
if extra_vars:
cmd = f"{base} {extra_vars} {suffix}"
cmd_result = subprocess.run(cmd.split(" "), stdout=subprocess.PIPE)
return cmd_result.stdout.decode("utf-8"), cmd_result.returncode
def get_attached_cdx_digest(self):
base = "oras discover"
suffix = f"--artifact-type application/json {self.image}:{self.tag}"
print(f"Retrieving attached digest {self.image}:{self.tag}...")
cmd_reply, return_code = self.os_system(base, suffix)
if return_code != 0:
return False
return cmd_reply.split("\n")[-2].split(" ")[-1]
def get_attached_file(self):
base = "oras pull"
digest = self.get_attached_cdx_digest()
suffix = f"{self.image}:@{digest}"
print(f"Downloading attached file {self.image}:@{digest}...")
cmd_reply, _ = self.os_system(base, suffix)
return cmd_reply
def post_attached_file(self, git_tag: str = None):
if not git_tag:
print("Error: Please provide a git tag...")
return self.error()
base = "oras attach"
suffix = f"--artifact-type application/json {self.image}:{self.tag} result.cdx"
dt = datetime.now()
annotations = f"-a 'org.opencontainers.trivy.created={dt.isoformat()}"
annotations = f"{annotations} -a 'org.opencontainers.trivy.status=Passed"
annotations = f"{annotations} -a 'org.opencontainers.trivy.tag={git_tag}"
base = f"{base} {annotations}"
print(f"Attaching result report file to digest {self.image}:{self.tag}...")
cmd_reply, return_code = self.os_system(base, suffix)
if return_code != 0:
return self.error()
return self.success()
def check_scan_report(self):
get_result = self.get_attached_file()
if not get_result:
return self.error()
if not self.check_downloaded_file():
return self.error()
else:
print("Results file found...")
self.clean_downloaded_file()
return self.success()
def success(self):
return True
def error(self):
return False

116
scripts/trivy.py Normal file
View file

@ -0,0 +1,116 @@
import os
import subprocess
"""
Trivy class
This class should provide the basic methods required by the command line
interface.
"""
class Trivy:
"""
Trivy class
"""
def __init__(self, image: str, tag: str):
self.image = image
self.tag = tag
self.registry_username = os.environ.get("REGISTRY_USERNAME", None)
self.registry_password = os.environ.get("REGISTRY_PASSWORD", None)
def scan_critical_severity(self, image_src: str = None):
base = "trivy image"
suffix = f"{self.image}:{self.tag}"
if image_src:
base = f"{base} --image-src {image_src}"
base = f"{base} --severity Critical --exit-code 1"
print("Scanning for critical security vulnerabilities...")
return self.os_system(base=base, suffix=suffix)
def full_report(self, image_src: str = None):
base = "trivy image"
suffix = f"{self.image}:{self.tag}"
if image_src:
base = f"{base} --image-src {image_src}"
base = f"{base} --exit-code 0"
print("Generating a full scan report...")
return self.os_system(base=base, suffix=suffix)
def generate_cdx_report(self, image_src: str = None):
base = "trivy image"
suffix = f"{self.image}:{self.tag}"
if image_src:
base = f"{base} --image-src {image_src}"
base = f"{base} --format cyclonedx --output result.cdx"
self.clean_cdx_report()
print("Generating a CycloneDX report...")
return self.os_system(base=base, suffix=suffix)
def scan_to_promote(self, image_src: str = None):
output, return_code = self.scan_critical_severity(image_src=image_src)
print(output)
if return_code == 1:
return self.error()
return self.success()
def full_scan(self, image_src: str = None):
severity_check, sc_return_code = self.scan_critical_severity(image_src=image_src)
full_report, _ = self.full_report(image_src=image_src)
print("Printing full report...")
print(full_report)
if sc_return_code == 1:
print("Failed security check scan...")
return self.error()
print("Passed security check scan...")
_, _ = self.generate_cdx_report(image_src=image_src)
result = self.get_result()
if not result:
print("Failed to generato cdx report...")
return self.error()
print(f"Generated cdx report '{result}' successfully...")
return self.success()
def os_system(self, base: str, suffix: str):
extra_vars = None
if self.registry_username and self.registry_password:
extra_vars = f"--username {self.registry_username} --password {self.registry_password}"
cmd = f"{base} {suffix}"
if extra_vars:
cmd = f"{base} {extra_vars} {suffix}"
cmd_result = subprocess.run(cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return cmd_result.stdout.decode("utf-8"), cmd_result.returncode
def clean_cdx_report(self):
cwd = os.getcwd()
result = f"{cwd}/result.cdx"
print("Cleaning up old cdx file...")
try:
os.remove(result)
except FileNotFoundError:
return self.error()
return self.success()
def get_result(self):
cwd = os.getcwd()
result = f"{cwd}/result.cdx"
if os.path.isfile(result):
return result
else:
return False
def success(self):
return True
def error(self):
return False