docker-ci/ci/ci.py

439 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool
import os
import time
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import boto3
from boto3.exceptions import S3UploadFailedError
import docker
from docker.errors import APIError
import anybadge
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, WebDriverException
from jinja2 import Environment, FileSystemLoader
from pyvirtualdisplay import Display
class SetEnvs():
"""Simple helper class that sets up the ENVs"""
def __init__(self) -> None:
self.logger = logging.getLogger("SetEnvs")
# Set the optional parameters
self.dockerenv = self.convert_env(os.environ.get("DOCKER_ENV", ""))
self.webauth = os.environ.get('WEB_AUTH', 'user:password')
self.webpath = os.environ.get('WEB_PATH', '')
self.screenshot = os.environ.get('WEB_SCREENSHOT', 'false')
self.screenshot_delay = os.environ.get('WEB_SCREENSHOT_DELAY', '30')
self.port = os.environ.get('PORT', '80')
self.ssl = os.environ.get('SSL', 'false')
self.region = os.environ.get('S3_REGION', 'us-east-1')
self.bucket = os.environ.get('S3_BUCKET', 'ci-tests.linuxserver.io')
self.test_container_delay = os.environ.get('DELAY_START', '5')
self.check_env()
def convert_env(self, envs:str = None):
"""Convert env DOCKER_ENV to dictionary"""
env_dict = {}
if envs:
self.logger.info("Converting envs")
try:
if '|' in envs:
for varpair in envs.split('|'):
var = varpair.split('=')
env_dict[var[0]] = var[1]
else:
var = envs.split('=')
env_dict[var[0]] = var[1]
except Exception as error:
self.logger.exception(error)
raise Exception(f"Failed converting DOCKER_ENV: {envs} to dictionary") from error
return env_dict
def check_env(self):
"""Make sure all needed ENVs are set"""
try:
self.image = os.environ['IMAGE']
self.base = os.environ['BASE']
self.s3_key = os.environ['ACCESS_KEY']
self.s3_secret = os.environ['SECRET_KEY']
self.meta_tag = os.environ['META_TAG']
self.tags_env = os.environ['TAGS']
self.tags = []
if '|' in self.tags_env:
for tag in self.tags_env.split('|'):
self.tags.append(tag)
else:
self.tags.append(self.tags_env)
except KeyError as error:
self.logger.exception("Key %s is not set in ENV!", error)
raise Exception(f'Key {error} is not set in ENV!') from error
class CI(SetEnvs):
"""CI object to use for testing image tags.
Args:
SetEnvs (Object): Helper class that initializes and checks that all the necessary enviroment variables exists. Object is initialized upon init of CI.
"""
def __init__(self) -> None:
super().__init__() # Init the SetEnvs object.
self.logger = logging.getLogger("LSIO CI")
logging.getLogger("botocore.auth").setLevel(logging.INFO) # Don't log the S3 authentication steps.
self.client = docker.from_env()
self.session = boto3.Session()
self.tag_report_tests = {}
self.report_containers = []
self.report_status = 'PASS'
self.outdir = f'{os.path.dirname(os.path.realpath(__file__))}/output/{self.image}/{self.meta_tag}'
os.makedirs(self.outdir, exist_ok=True)
def run(self,tags: list):
"""Will iterate over all the tags running container_test() on each tag multithreaded.
Args:
tags (list): All the tags we will test on the image.
"""
thread_pool = ThreadPool(processes=10)
thread_pool.map(self.container_test,tags)
display = Display(size=(1920, 1080)) # Setup an x virtual frame buffer (Xvfb) that Selenium can use during the tests.
display.start()
thread_pool.close()
thread_pool.join()
display.stop()
def container_test(self, tag: str):
"""Main container test logic.
Args:
`tag` (str): The container tag
1. Spins up the container tag
Checks the container logs for either `[services.d] done.` or `[ls.io-init] done.`
2. Export the build version from the Container object.
3. Export the package info from the Container object.
4. Take a screenshot for the report.
"""
def _endtest(self: CI, container, tag:str , build_version:str , packages:str):
"""End the test with as much info as we have and append to the report.
Args:
container (Container): Container object
tag (str): The container tag
build_version (str): The Container build version
packages (str): Package dump from the container
"""
logblob = container.logs().decode('utf-8')
container.remove(force='true')
# Add the info to the report
self.report_containers.append({
'tag': tag,
'logs': logblob,
'sysinfo': packages,
'dotnet': bool("icu-libs" in packages),
'build_version': build_version,
'tag_tests': self.tag_report_tests[tag]
})
# Start the container
self.logger.info('Starting test of: %s', tag)
self.tag_report_tests[tag] = [] # Adds the tag key with an empty list as value to the dict
container = self.client.containers.run(f'{self.image}:{tag}',
detach=True,
environment=self.dockerenv)
# Watch the logs for no more than 5 minutes
logsfound = False
t_end = time.time() + 60 * 5
while time.time() < t_end:
try:
logblob = container.logs().decode('utf-8')
if '[services.d] done.' in logblob or '[ls.io-init] done.' in logblob:
logsfound = True
break
time.sleep(1)
except APIError as error:
self.logger.exception('Container startup failed for %s', tag)
self.tag_report_tests[tag].append(['Container startup', 'FAIL', f'INIT NOT FINISHED: {error}'])
self.report_status = 'FAIL'
_endtest(self, container, tag, 'ERROR', 'ERROR')
return
# Grab build version
try:
build_version = container.attrs['Config']['Labels']['build_version']
self.tag_report_tests[tag].append(['Get build version', 'PASS', '-'])
self.logger.info('Get build version %s: PASS', tag)
except APIError as error:
build_version = 'ERROR'
self.tag_report_tests[tag].append(['Get build version', 'FAIL', error])
self.logger.exception('Get build version %s: FAIL', tag)
self.report_status = 'FAIL'
_endtest(self, container, tag, build_version, 'ERROR')
return
# Check if the startup marker was found in the logs during the 2 minute spinup
if logsfound:
self.logger.info('Container startup completed for %s', tag)
self.tag_report_tests[tag].append(['Container startup', 'PASS', '-'])
self.logger.info('Container startup %s: PASS', tag)
else:
self.logger.warning('Container startup failed for %s', tag)
self.tag_report_tests[tag].append(['Container startup', 'FAIL','INIT NOT FINISHED'])
self.logger.error('Container startup %s: FAIL - INIT NOT FINISHED', tag)
self.report_status = 'FAIL'
_endtest(self, container, tag, build_version, 'ERROR')
return
# Dump package information
self.logger.info('Dumping package info for %s',tag)
if self.base == 'alpine':
command = 'apk info -v'
elif self.base in ('debian', 'ubuntu'):
command = 'apt list'
elif self.base == 'fedora':
command = 'rpm -qa'
elif self.base == 'arch':
command = 'pacman -Q'
try:
info = container.exec_run(command)
packages = info[1].decode('utf-8')
self.tag_report_tests[tag].append(['Dump package info', 'PASS', '-'])
self.logger.info('Dump package info %s: PASS', tag)
except (APIError, IndexError) as error:
packages = 'ERROR'
self.logger.exception(str(error))
self.tag_report_tests[tag].append(['Dump package info', 'FAIL', error])
self.logger.error('Dump package info %s: FAIL', tag)
self.report_status = 'FAIL'
_endtest(self, container, tag, build_version, packages)
return
# Screenshot web interface and check connectivity
if self.screenshot == 'true':
self.take_screenshot(container, tag)
# If all info is present end test
_endtest(self, container, tag, build_version, packages)
return
def report_render(self):
"""Render the index file for upload"""
self.logger.info('Rendering Report')
env = Environment( loader = FileSystemLoader(os.path.dirname(os.path.realpath(__file__))) )
template = env.get_template('template.html')
with open(f'{os.path.dirname(os.path.realpath(__file__))}/index.html', mode="w", encoding='utf-8') as file_:
file_.write(template.render(
report_containers=self.report_containers,
report_status=self.report_status,
meta_tag=self.meta_tag,
image=self.image,
bucket=self.bucket,
region=self.region,
screenshot=self.screenshot
))
def badge_render(self):
"""Render the badge file for upload"""
self.logger.info("Creating badge")
try:
badge = anybadge.Badge('CI', self.report_status, thresholds={
'PASS': 'green', 'FAIL': 'red'})
badge.write_badge(f'{self.outdir}/badge.svg')
with open(f'{self.outdir}/ci-status.yml', 'w', encoding='utf-8') as file:
file.write(f'CI: "{self.report_status}"')
except (ValueError,RuntimeError,FileNotFoundError,OSError) as error:
self.logger.exception(error)
def report_upload(self):
"""Upload report to S3
Raises:
Exception: S3UploadFailedError
Exception: ValueError
"""
self.logger.info('Uploading Report')
destination_dir = f'{self.image}/{self.meta_tag}'
latest_dir = f'{self.image}/latest'
s3_instance = self.session.client(
's3',
region_name=self.region,
aws_access_key_id=self.s3_key,
aws_secret_access_key=self.s3_secret)
# Index file upload
index_file = f'{os.path.dirname(os.path.realpath(__file__))}/index.html'
try:
s3_instance.upload_file(
index_file,
self.bucket,
f'{destination_dir}/index.html',
ExtraArgs={'ContentType': 'text/html', 'ACL': 'public-read'})
s3_instance.upload_file(
index_file,
self.bucket,
f'{latest_dir}/index.html',
ExtraArgs={'ContentType': 'text/html', 'ACL': 'public-read'})
except (S3UploadFailedError, ValueError) as error:
self.logger.exception('Upload Error: %s',error)
self.log_upload()
raise Exception(f'Upload Error: {error}') from error
# Loop for all others
for filename in os.listdir(self.outdir):
time.sleep(0.5)
# Set content types for files
if filename.lower().endswith('.svg'):
ctype = 'image/svg+xml'
elif filename.lower().endswith('.png'):
ctype = 'image/png'
elif filename.lower().endswith('.md'):
ctype = 'text/markdown'
elif filename.lower().endswith('.yml'):
ctype = 'text/yaml'
else:
ctype = 'text/plain'
try:
s3_instance.upload_file(
f'{self.outdir}/{filename}',
self.bucket,
f'{destination_dir}/{filename}',
ExtraArgs={'ContentType': ctype, 'ACL': 'public-read', 'CacheControl': 'no-cache'})
s3_instance.upload_file(
f'{self.outdir}/{filename}',
self.bucket,
f'{latest_dir}/{filename}',
ExtraArgs={'ContentType': ctype, 'ACL': 'public-read', 'CacheControl': 'no-cache'})
except (S3UploadFailedError, ValueError) as error:
self.logger.exception('Upload Error: %s',error)
self.log_upload()
raise Exception(f'Upload Error: {error}') from error
self.logger.info("Report available on https://ci-tests.linuxserver.io/%s/index.html",destination_dir)
def log_upload(self):
"""Upload debug.log to S3
Raises:
Exception: S3UploadFailedError
Exception: ValueError
"""
self.logger.info('Uploading logs')
destination_dir = f'{self.image}/{self.meta_tag}'
latest_dir = f'{self.image}/latest'
s3_instance = self.session.client(
's3',
region_name=self.region,
aws_access_key_id=self.s3_key,
aws_secret_access_key=self.s3_secret)
# Log file upload
try:
s3_instance.upload_file(
'/debug.log',
self.bucket,
f'{destination_dir}/debug.log',
ExtraArgs={'ContentType': 'text/plain', 'ACL': 'public-read'})
s3_instance.upload_file(
'/debug.log',
self.bucket,
f'{latest_dir}/debug.log',
ExtraArgs={'ContentType': 'text/plain', 'ACL': 'public-read'})
except (S3UploadFailedError, ValueError) as error:
self.logger.exception('Upload Error: %s',error)
def take_screenshot(self, container, tag:str):
"""Take a screenshot and save it to self.outdir
Spins up an lsiodev/tester container and takes a screenshot using Seleium.
Args:
container (Container): Container object
tag (str): The container tag we are testing.
"""
proto = 'https' if self.ssl.upper() == 'TRUE' else 'http'
# Sleep for the user specified amount of time
self.logger.info('Sleeping for %s seconds before reloading container: %s and refreshing container attrs', self.test_container_delay, container.image)
time.sleep(int(self.test_container_delay))
container.reload()
ip_adr = container.attrs['NetworkSettings']['Networks']['bridge']['IPAddress']
endpoint = f'{proto}://{self.webauth}@{ip_adr}:{self.port}{self.webpath}'
testercontainer, test_endpoint = self.start_tester(proto,endpoint,tag)
try:
driver = self.setup_driver()
driver.get(test_endpoint)
self.logger.info('Sleeping for %s seconds before creating a screenshot on %s', self.screenshot_delay, tag)
time.sleep(int(self.screenshot_delay))
self.logger.info('Taking screenshot of %s at %s', tag, endpoint)
driver.get_screenshot_as_file(f'{self.outdir}/{tag}.png')
self.tag_report_tests[tag].append(['Get screenshot', 'PASS','-'])
self.logger.info('Screenshot %s: PASS', tag)
except (requests.Timeout, requests.ConnectionError, KeyError) as error:
self.tag_report_tests[tag].append(
['Get screenshot', 'FAIL', f'CONNECTION ERROR: {error}'])
self.logger.exception('Screenshot %s FAIL CONNECTION ERROR', tag)
except TimeoutException as error:
self.tag_report_tests[tag].append(['Get screenshot', 'FAIL', f'TIMEOUT: {error}'])
self.logger.exception('Screenshot %s FAIL TIMEOUT', tag)
except (WebDriverException, Exception) as error:
self.tag_report_tests[tag].append(
['Get screenshot', 'FAIL', f'UNKNOWN: {error}'])
self.logger.exception('Screenshot %s FAIL UNKNOWN: %s', tag, error)
testercontainer.remove(force='true')
def start_tester(self, proto:str, endpoint:str, tag:str):
"""Spin up an RDP test container to load the container web ui.
Args:
proto (str): The protocol to use for the endpoint.
endpoint (str): The container endpoint to use with the tester container.
tag (str): The container tag
Returns:
Container/str: Returns the tester Container object and the tester endpoint
"""
self.logger.info("Starting tester container for tag: %s", tag)
testercontainer = self.client.containers.run('lsiodev/tester:latest',
shm_size='1G',
detach=True,
environment={'URL': endpoint})
#Sleep for the user specified amount of time
self.logger.info('Sleeping for %s seconds before reloading %s and refreshing container attrs on %s run', self.test_container_delay, testercontainer.image, tag)
time.sleep(int(self.test_container_delay))
testercontainer.reload()
testerip = testercontainer.attrs['NetworkSettings']['Networks']['bridge']['IPAddress']
testerendpoint = f'http://{testerip}:3000'
session = requests.Session()
retries = Retry(total=10, backoff_factor=2,status_forcelist=[502, 503, 504])
session.mount(proto, HTTPAdapter(max_retries=retries))
session.get(testerendpoint)
return testercontainer, testerendpoint
def setup_driver(self):
"""Return a single ChromiumDriver object the class can use
Returns:
Webdriver: Returns a Chromedriver object
"""
self.logger.info("Init Chromedriver")
# Selenium webdriver options
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920x1080')
chrome_options.add_argument('--disable-extensions')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--disable-dev-shm-usage') # https://developers.google.com/web/tools/puppeteer/troubleshooting#tips
driver = webdriver.Chrome(options=chrome_options)
driver.set_page_load_timeout(60)
return driver