mirror of
https://github.com/linuxserver/docker-ci.git
synced 2026-02-05 11:08:54 +08:00
- Replace the old package dump with SBOM output. - Move all created files to be in /ci/output - Update template for the new sbom output.
568 lines
26 KiB
Python
Executable File
568 lines
26 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from multiprocessing.pool import ThreadPool
|
|
from threading import current_thread
|
|
import os
|
|
import shutil
|
|
import time
|
|
import logging
|
|
import mimetypes
|
|
import requests
|
|
import json
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
import boto3
|
|
from boto3.exceptions import S3UploadFailedError
|
|
from botocore.exceptions import ClientError
|
|
import docker
|
|
from docker.errors import APIError,ContainerError,ImageNotFound
|
|
from docker.models.containers import Container
|
|
import anybadge
|
|
from ansi2html import Ansi2HTMLConverter
|
|
from selenium import webdriver
|
|
from selenium.common.exceptions import TimeoutException, WebDriverException
|
|
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
|
from pyvirtualdisplay import Display
|
|
|
|
|
|
class SetEnvs():
|
|
"""Simple helper class that sets up the ENVs"""
|
|
def __init__(self) -> None:
|
|
self.logger = logging.getLogger("SetEnvs")
|
|
|
|
os.environ['S6_VERBOSITY'] = os.environ.get("CI_S6_VERBOSITY","2")
|
|
# Set the optional parameters
|
|
self.dockerenv = self.convert_env(os.environ.get("DOCKER_ENV", ""))
|
|
self.webauth = os.environ.get('WEB_AUTH', 'user:password')
|
|
self.webpath = os.environ.get('WEB_PATH', '')
|
|
self.screenshot = os.environ.get('WEB_SCREENSHOT', 'false')
|
|
self.screenshot_delay = os.environ.get('WEB_SCREENSHOT_DELAY', '30')
|
|
self.logs_delay = os.environ.get('DOCKER_LOGS_DELAY', '300')
|
|
self.port = os.environ.get('PORT', '80')
|
|
self.ssl = os.environ.get('SSL', 'false')
|
|
self.region = os.environ.get('S3_REGION', 'us-east-1')
|
|
self.bucket = os.environ.get('S3_BUCKET', 'ci-tests.linuxserver.io')
|
|
self.test_container_delay = os.environ.get('DELAY_START', '5')
|
|
self.check_env()
|
|
|
|
|
|
def convert_env(self, envs:str = None) -> dict:
|
|
"""Convert env DOCKER_ENV to dictionary"""
|
|
env_dict = {}
|
|
if envs:
|
|
self.logger.info("Converting envs")
|
|
try:
|
|
if '|' in envs:
|
|
for varpair in envs.split('|'):
|
|
var = varpair.split('=')
|
|
env_dict[var[0]] = var[1]
|
|
else:
|
|
var = envs.split('=')
|
|
env_dict[var[0]] = var[1]
|
|
env_dict["S6_VERBOSITY"] = os.environ.get('S6_VERBOSITY')
|
|
except Exception as error:
|
|
self.logger.exception("Failed to convert DOCKER_ENV: %s to dictionary!", envs)
|
|
raise CIError(f"Failed converting DOCKER_ENV: {envs} to dictionary") from error
|
|
return env_dict
|
|
|
|
|
|
def check_env(self) -> None:
|
|
"""Make sure all needed ENVs are set"""
|
|
try:
|
|
self.image = os.environ['IMAGE']
|
|
self.base = os.environ['BASE']
|
|
self.s3_key = os.environ['ACCESS_KEY']
|
|
self.s3_secret = os.environ['SECRET_KEY']
|
|
self.meta_tag = os.environ['META_TAG']
|
|
self.tags_env = os.environ['TAGS']
|
|
except KeyError as error:
|
|
self.logger.exception("Key is not set in ENV!")
|
|
raise CIError(f'Key {error} is not set in ENV!') from error
|
|
|
|
|
|
class CI(SetEnvs):
|
|
"""CI object to use for testing image tags.
|
|
|
|
Args:
|
|
SetEnvs (Object): Helper class that initializes and checks that all the necessary enviroment variables exists. Object is initialized upon init of CI.
|
|
"""
|
|
def __init__(self) -> None:
|
|
super().__init__() # Init the SetEnvs object.
|
|
self.logger = logging.getLogger("LSIO CI")
|
|
logging.getLogger("botocore.auth").setLevel(logging.INFO) # Don't log the S3 authentication steps.
|
|
|
|
self.client = docker.from_env()
|
|
self.tags = list(self.tags_env.split('|'))
|
|
self.tag_report_tests = {tag: {'test':{}} for tag in self.tags} # Adds all the tags as keys with an empty dict as value to the dict
|
|
self.report_containers: dict[str,dict] = {}
|
|
self.report_status = 'PASS'
|
|
self.outdir = f'{os.path.dirname(os.path.realpath(__file__))}/output/{self.image}/{self.meta_tag}'
|
|
os.makedirs(self.outdir, exist_ok=True)
|
|
self.s3_client = boto3.Session().client(
|
|
's3',
|
|
region_name=self.region,
|
|
aws_access_key_id=self.s3_key,
|
|
aws_secret_access_key=self.s3_secret)
|
|
|
|
def run(self,tags: list) -> None:
|
|
"""Will iterate over all the tags running container_test() on each tag, multithreaded.
|
|
|
|
Also does a pull of the linuxserver/tester:latest image before running container_test.
|
|
|
|
Args:
|
|
`tags` (list): All the tags we will test on the image.
|
|
|
|
"""
|
|
self.logger.info("Pulling ghcr.io/linuxserver/tester:latest")
|
|
self.client.images.pull(repository="ghcr.io/linuxserver/tester", tag="latest") # Pulls latest tester image.
|
|
thread_pool = ThreadPool(processes=10)
|
|
thread_pool.map(self.container_test,tags)
|
|
display = Display(size=(1920, 1080)) # Setup an x virtual frame buffer (Xvfb) that Selenium can use during the tests.
|
|
display.start()
|
|
thread_pool.close()
|
|
thread_pool.join()
|
|
display.stop()
|
|
|
|
|
|
def container_test(self, tag: str) -> None:
|
|
"""Main container test logic.
|
|
|
|
Args:
|
|
`tag` (str): The container tag
|
|
|
|
1. Spins up the container tag
|
|
Checks the container logs for either `[services.d] done.` or `[ls.io-init] done.`
|
|
2. Export the build version from the Container object.
|
|
3. Export the package info from the Container object.
|
|
4. Take a screenshot for the report.
|
|
5. Add report information to report.json.
|
|
"""
|
|
# Name the thread for easier debugging.
|
|
if "amd" in tag or "arm" in tag:
|
|
current_thread().name = f"{tag[:5].upper()}Thread"
|
|
|
|
# Start the container
|
|
self.logger.info('Starting test of: %s', tag)
|
|
container: Container = self.client.containers.run(f'{self.image}:{tag}',
|
|
detach=True,
|
|
environment=self.dockerenv)
|
|
container_config = container.attrs["Config"]["Env"]
|
|
self.logger.info("Container config of tag %s: %s",tag,container_config)
|
|
|
|
|
|
logsfound = self.watch_container_logs(container, tag) # Watch the logs for no more than 5 minutes
|
|
if not logsfound:
|
|
self._endtest(container, tag, "ERROR", "ERROR", False)
|
|
return
|
|
|
|
build_version = self.get_build_version(container,tag) # Get the image build version
|
|
if build_version == "ERROR":
|
|
self._endtest(container, tag, build_version, "ERROR", False)
|
|
return
|
|
|
|
sbom = self.generate_sbom(tag)
|
|
if sbom == "ERROR":
|
|
self._endtest(container, tag, build_version, sbom, False)
|
|
return
|
|
|
|
# Screenshot web interface and check connectivity
|
|
if self.screenshot == 'true':
|
|
self.take_screenshot(container, tag)
|
|
|
|
self._endtest(container, tag, build_version, sbom, True)
|
|
self.logger.info("Testing of %s PASSED", tag)
|
|
return
|
|
|
|
def _endtest(self: 'CI', container:Container, tag:str, build_version:str, packages:str, test_success: bool) -> None:
|
|
"""End the test with as much info as we have and append to the report.
|
|
|
|
Args:
|
|
`container` (Container): Container object
|
|
`tag` (str): The container tag
|
|
`build_version` (str): The Container build version
|
|
`packages` (str): SBOM dump from the container
|
|
`test_success` (bool): If the testing of the container failed or not
|
|
"""
|
|
logblob = container.logs().decode('utf-8')
|
|
self.create_html_ansi_file(logblob, tag, "log") # Generate html container log file based on the latest logs
|
|
try:
|
|
container.remove(force='true')
|
|
except APIError:
|
|
self.logger.exception("Failed to remove container %s",tag)
|
|
warning_texts = {
|
|
"dotnet": "May be a .NET app. Service might not start on ARM32 with QEMU",
|
|
"uwsgi": "This image uses uWSGI and might not start on ARM/QEMU"
|
|
}
|
|
# Add the info to the report
|
|
self.report_containers[tag] = {
|
|
'logs': logblob,
|
|
'sysinfo': packages,
|
|
'warnings': {
|
|
'dotnet': warning_texts["dotnet"] if "icu-libs" in packages and "arm32" in tag else "",
|
|
'uwsgi': warning_texts["uwsgi"] if "uwsgi" in packages and "arm" in tag else ""
|
|
},
|
|
'build_version': build_version,
|
|
'test_results': self.tag_report_tests[tag]['test'],
|
|
'test_success': test_success,
|
|
}
|
|
self.report_containers[tag]["has_warnings"] = any(warning[1] for warning in self.report_containers[tag]["warnings"].items())
|
|
|
|
def export_package_info(self, container:Container, tag:str) -> str:
|
|
"""Dump the package info into a string for the report
|
|
|
|
Args:
|
|
container (Container): The container we are testing
|
|
tag (str): The tag we are testing
|
|
|
|
Returns:
|
|
str: Return the output of the dump command or 'ERROR'
|
|
"""
|
|
# Dump package information
|
|
dump_commands = {
|
|
'alpine': 'apk info -v',
|
|
'debian': 'apt list',
|
|
'ubuntu': 'apt list',
|
|
'fedora': 'rpm -qa',
|
|
'arch': 'pacman -Q'
|
|
}
|
|
try:
|
|
self.logger.info('Dumping package info for %s',tag)
|
|
info = container.exec_run(dump_commands[self.base])
|
|
packages = info[1].decode('utf-8')
|
|
if info[0] != 0:
|
|
raise CIError(f"Failed to dump packages. Output: {packages}")
|
|
self.tag_report_tests[tag]['test']['Dump package info'] = (dict(sorted({
|
|
'status':'PASS',
|
|
'message':'-'}.items())))
|
|
self.logger.info('Dump package info %s: PASS', tag)
|
|
except (APIError, IndexError,CIError) as error:
|
|
packages = 'ERROR'
|
|
self.logger.exception('Dumping package info on %s: FAIL', tag)
|
|
self.tag_report_tests[tag]['test']['Dump package info'] = (dict(sorted({
|
|
'Dump package info':'FAIL',
|
|
'message':str(error)}.items())))
|
|
self.report_status = 'FAIL'
|
|
return packages
|
|
|
|
def generate_sbom(self, tag:str) -> str:
|
|
"""Generate the SBOM for the image tag.
|
|
|
|
Creates the output file in `{self.outdir}/{tag}.sbom.html`
|
|
|
|
Args:
|
|
tag (str): The tag we are testing
|
|
|
|
Returns:
|
|
bool: Return the output if successful otherwise "ERROR".
|
|
"""
|
|
syft:Container = self.client.containers.run(image="anchore/syft:latest",command=f"{self.image}:{tag}", detach=True)
|
|
self.logger.info('Creating SBOM package list on %s',tag)
|
|
|
|
t_end = time.time() + int(self.logs_delay)
|
|
self.logger.info("Tailing the syft container logs for %s seconds looking the 'VERSION' message on tag: %s",self.logs_delay,tag)
|
|
while time.time() < t_end:
|
|
time.sleep(5)
|
|
try:
|
|
logblob = syft.logs().decode('utf-8')
|
|
if 'VERSION' in logblob:
|
|
self.logger.info('Get package versions for %s completed', tag)
|
|
self.tag_report_tests[tag]['test']['Create SBOM'] = (dict(sorted({
|
|
'status':'PASS',
|
|
'message':'-'}.items())))
|
|
self.logger.info('Create SBOM package list %s: PASS', tag)
|
|
self.create_html_ansi_file(str(logblob),tag,"sbom")
|
|
return logblob
|
|
except (APIError,ContainerError,ImageNotFound) as error:
|
|
self.logger.exception('Creating SBOM package list on %s: FAIL', tag)
|
|
self.tag_report_tests[tag]['test']['Create SBOM'] = (dict(sorted({
|
|
'Create SBOM':'FAIL',
|
|
'message':str(error)}.items())))
|
|
self.report_status = 'FAIL'
|
|
try:
|
|
syft.remove(force=True)
|
|
except Exception:
|
|
self.logger.exception("Failed to remove the syft container, %s",tag)
|
|
return "ERROR"
|
|
|
|
def get_build_version(self,container:Container,tag:str) -> str:
|
|
"""Fetch the build version from the container object attributes.
|
|
|
|
Args:
|
|
container (Container): The container we are testing
|
|
tag (str): The current tag we are testing
|
|
|
|
Returns:
|
|
str: Returns the build version or 'ERROR'
|
|
"""
|
|
try:
|
|
self.logger.info("Fetching build version on tag: %s",tag)
|
|
build_version = container.attrs['Config']['Labels']['build_version']
|
|
self.tag_report_tests[tag]['test']['Get build version'] = (dict(sorted({
|
|
'status':'PASS',
|
|
'message':'-'}.items())))
|
|
self.logger.info('Get build version on tag "%s": PASS', tag)
|
|
except (APIError,KeyError) as error:
|
|
self.logger.exception('Get build version on tag "%s": FAIL', tag)
|
|
build_version = 'ERROR'
|
|
if isinstance(error,KeyError):
|
|
error = f"KeyError: {error}"
|
|
self.tag_report_tests[tag]['test']['Get build version'] = (dict(sorted({
|
|
'status':'FAIL',
|
|
'message':str(error)}.items())))
|
|
self.report_status = 'FAIL'
|
|
return build_version
|
|
|
|
def watch_container_logs(self, container:Container, tag:str) -> bool:
|
|
"""Tail the container logs for 5 minutes and look for the init done message that tells us the container started up
|
|
successfully.
|
|
|
|
Args:
|
|
container (Container): The container we are testing
|
|
tag (str): The tag we are testing
|
|
|
|
Returns:
|
|
bool: Return True if the 'done' message is found, otherwise False.
|
|
"""
|
|
t_end = time.time() + int(self.logs_delay)
|
|
self.logger.info("Tailing the %s logs for %s seconds looking for the 'done' message", tag, self.logs_delay)
|
|
while time.time() < t_end:
|
|
try:
|
|
logblob = container.logs().decode('utf-8')
|
|
if '[services.d] done.' in logblob or '[ls.io-init] done.' in logblob:
|
|
self.logger.info('Container startup completed for %s', tag)
|
|
self.tag_report_tests[tag]['test']['Container startup'] = (dict(sorted({
|
|
'status':'PASS',
|
|
'message':'-'}.items())))
|
|
self.logger.info('Container startup %s: PASS', tag)
|
|
return True
|
|
time.sleep(1)
|
|
except APIError as error:
|
|
self.logger.exception('Container startup %s: FAIL - INIT NOT FINISHED', tag)
|
|
self.tag_report_tests[tag]['test']['Container startup'] = (dict(sorted({
|
|
'status':'FAIL',
|
|
'message': f'INIT NOT FINISHED: {str(error)}'
|
|
}.items())))
|
|
self.report_status = 'FAIL'
|
|
return False
|
|
self.logger.error('Container startup failed for %s', tag)
|
|
self.tag_report_tests[tag]['test']['Container startup'] = (dict(sorted({
|
|
'status':'FAIL',
|
|
'message':'INIT NOT FINISHED'}.items())))
|
|
self.logger.error('Container startup %s: FAIL - INIT NOT FINISHED', tag)
|
|
self.report_status = 'FAIL'
|
|
return False
|
|
|
|
def report_render(self) -> None:
|
|
"""Render the index file for upload"""
|
|
self.logger.info('Rendering Report')
|
|
env = Environment(autoescape=select_autoescape(enabled_extensions=('html', 'xml'),default_for_string=True),
|
|
loader = FileSystemLoader(os.path.dirname(os.path.realpath(__file__))) )
|
|
template = env.get_template('template.html')
|
|
self.report_containers = json.loads(json.dumps(self.report_containers,sort_keys=True))
|
|
with open(f'{self.outdir}/index.html', mode="w", encoding='utf-8') as file_:
|
|
file_.write(template.render(
|
|
report_containers=self.report_containers,
|
|
report_status=self.report_status,
|
|
meta_tag=self.meta_tag,
|
|
image=self.image,
|
|
bucket=self.bucket,
|
|
region=self.region,
|
|
screenshot=self.screenshot
|
|
))
|
|
|
|
def badge_render(self) -> None:
|
|
"""Render the badge file for upload"""
|
|
self.logger.info("Creating badge")
|
|
try:
|
|
badge = anybadge.Badge('CI', self.report_status, thresholds={
|
|
'PASS': 'green', 'FAIL': 'red'})
|
|
badge.write_badge(f'{self.outdir}/badge.svg')
|
|
with open(f'{self.outdir}/ci-status.yml', 'w', encoding='utf-8') as file:
|
|
file.write(f'CI: "{self.report_status}"')
|
|
except (ValueError,RuntimeError,FileNotFoundError,OSError):
|
|
self.logger.exception("Failed to render badge file!")
|
|
|
|
def json_render(self) -> None:
|
|
"""Create a JSON file of the report data."""
|
|
self.logger.info("Creating report.json file")
|
|
try:
|
|
with open(f'{self.outdir}/report.json', mode="w", encoding='utf-8') as file:
|
|
json.dump(self.report_containers, file, indent=2, sort_keys=True)
|
|
except (OSError,FileNotFoundError,TypeError,Exception):
|
|
self.logger.exception("Failed to render JSON file!")
|
|
|
|
def report_upload(self) -> None:
|
|
"""Upload report files to S3
|
|
|
|
Raises:
|
|
Exception: S3UploadFailedError
|
|
Exception: ValueError
|
|
Exception: ClientError
|
|
"""
|
|
self.logger.info('Uploading report files')
|
|
try:
|
|
shutil.copyfile(f'{os.path.dirname(os.path.realpath(__file__))}/404.jpg', f'{self.outdir}/404.jpg')
|
|
except Exception:
|
|
self.logger.exception("Failed to copy 404 file!")
|
|
# Loop through files in outdir and upload
|
|
for filename in os.listdir(self.outdir):
|
|
time.sleep(0.5)
|
|
ctype = mimetypes.guess_type(filename.lower(), strict=False)
|
|
ctype = {'ContentType': ctype[0] if ctype[0] else 'text/plain', 'ACL': 'public-read', 'CacheControl': 'no-cache'} # Set content types for files
|
|
try:
|
|
self.upload_file(f'{self.outdir}/{filename}', filename, ctype)
|
|
except (S3UploadFailedError, ValueError, ClientError) as error:
|
|
self.logger.exception('Upload Error!')
|
|
self.log_upload()
|
|
raise CIError(f'Upload Error: {error}') from error
|
|
self.logger.info('Report available on https://ci-tests.linuxserver.io/%s/index.html', f'{self.image}/{self.meta_tag}')
|
|
|
|
def create_html_ansi_file(self, blob:str, tag:str, name:str) -> None:
|
|
"""Creates an HTML file in the 'self.outdir' directory that we upload to S3
|
|
|
|
Args:
|
|
blob (str): The blob you want to convert
|
|
tag (str): The tag we are testing
|
|
name (str): The name of the file. File name will be `{tag}.{name}.html`
|
|
"""
|
|
try:
|
|
self.logger.info(f"Creating {tag}.{name}.html")
|
|
converter = Ansi2HTMLConverter()
|
|
html_logs = converter.convert(blob)
|
|
with open(f'{self.outdir}/{tag}.{name}.html', 'w', encoding='utf-8') as file:
|
|
file.write(html_logs)
|
|
except Exception:
|
|
self.logger.exception("Failed to create %s.%s.html", tag,name)
|
|
|
|
def upload_file(self, file_path:str, object_name:str, content_type:dict) -> None:
|
|
"""Upload a file to an S3 bucket
|
|
|
|
Args:
|
|
`file_path` (str): File to upload
|
|
`bucket` (str): Bucket to upload to
|
|
`object_name` (str): S3 object name.
|
|
"""
|
|
self.logger.info('Uploading %s to %s bucket',file_path, self.bucket)
|
|
destination_dir = f'{self.image}/{self.meta_tag}'
|
|
latest_dir = f'{self.image}/latest'
|
|
self.s3_client.upload_file(file_path, self.bucket, f'{destination_dir}/{object_name}', ExtraArgs=content_type)
|
|
self.s3_client.upload_file(file_path, self.bucket, f'{latest_dir}/{object_name}', ExtraArgs=content_type)
|
|
|
|
def log_upload(self) -> None:
|
|
"""Upload the ci.log to S3
|
|
|
|
Raises:
|
|
Exception: S3UploadFailedError
|
|
Exception: ClientError
|
|
"""
|
|
self.logger.info('Uploading logs')
|
|
try:
|
|
self.upload_file(f"{self.outdir}/ci.log", 'ci.log', {'ContentType': 'text/plain', 'ACL': 'public-read'})
|
|
except (S3UploadFailedError, ClientError):
|
|
self.logger.exception('Failed to upload the CI logs!')
|
|
|
|
|
|
def take_screenshot(self, container: Container, tag:str) -> None:
|
|
"""Take a screenshot and save it to self.outdir
|
|
|
|
Spins up an ghcr.io/linuxserver/tester container and takes a screenshot using Selenium.
|
|
|
|
Args:
|
|
`container` (Container): Container object
|
|
`tag` (str): The container tag we are testing.
|
|
"""
|
|
proto = 'https' if self.ssl.upper() == 'TRUE' else 'http'
|
|
# Sleep for the user specified amount of time
|
|
self.logger.info('Sleeping for %s seconds before reloading container: %s and refreshing container attrs', self.test_container_delay, container.image)
|
|
time.sleep(int(self.test_container_delay))
|
|
container.reload()
|
|
try:
|
|
ip_adr = container.attrs['NetworkSettings']['Networks']['bridge']['IPAddress']
|
|
endpoint = f'{proto}://{self.webauth}@{ip_adr}:{self.port}{self.webpath}'
|
|
testercontainer, test_endpoint = self.start_tester(proto,endpoint,tag)
|
|
driver = self.setup_driver()
|
|
driver.get(test_endpoint)
|
|
self.logger.info('Sleeping for %s seconds before creating a screenshot on %s', self.screenshot_delay, tag)
|
|
time.sleep(int(self.screenshot_delay))
|
|
self.logger.info('Taking screenshot of %s at %s', tag, endpoint)
|
|
driver.get_screenshot_as_file(f'{self.outdir}/{tag}.png')
|
|
self.tag_report_tests[tag]['test']['Get screenshot'] = (dict(sorted({
|
|
'status':'PASS',
|
|
'message':'-'}.items())))
|
|
self.logger.info('Screenshot %s: PASS', tag)
|
|
except (requests.Timeout, requests.ConnectionError, KeyError) as error:
|
|
self.tag_report_tests[tag]['test']['Get screenshot'] = (dict(sorted({
|
|
'status':'FAIL',
|
|
'message': f'CONNECTION ERROR: {str(error)}'}.items())))
|
|
self.logger.exception('Screenshot %s FAIL CONNECTION ERROR', tag)
|
|
except TimeoutException as error:
|
|
self.tag_report_tests[tag]['test']['Get screenshot'] = (dict(sorted({
|
|
'status':'FAIL',
|
|
'message':f'TIMEOUT: {str(error)}'}.items())))
|
|
self.logger.exception('Screenshot %s FAIL TIMEOUT', tag)
|
|
except (WebDriverException, Exception) as error:
|
|
self.tag_report_tests[tag]['test']['Get screenshot'] = (dict(sorted({
|
|
'status':'FAIL',
|
|
'message':f'UNKNOWN: {str(error)}'}.items())))
|
|
self.logger.exception('Screenshot %s FAIL UNKNOWN', tag)
|
|
finally:
|
|
try:
|
|
testercontainer.remove(force='true')
|
|
except Exception:
|
|
self.logger.exception("Failed to remove tester container")
|
|
|
|
|
|
def start_tester(self, proto:str, endpoint:str, tag:str) -> tuple[Container,str]:
|
|
"""Spin up an RDP test container to load the container web ui.
|
|
|
|
Args:
|
|
`proto` (str): The protocol to use for the endpoint.
|
|
`endpoint` (str): The container endpoint to use with the tester container.
|
|
`tag` (str): The container tag
|
|
|
|
Returns:
|
|
Container/str: Returns the tester Container object and the tester endpoint
|
|
"""
|
|
self.logger.info("Starting tester container for tag: %s", tag)
|
|
testercontainer: Container = self.client.containers.run('ghcr.io/linuxserver/tester:latest',
|
|
shm_size='1G',
|
|
security_opt=["seccomp=unconfined"],
|
|
detach=True,
|
|
environment={'URL': endpoint})
|
|
#Sleep for the user specified amount of time
|
|
self.logger.info('Sleeping for %s seconds before reloading %s and refreshing container attrs on %s run', self.test_container_delay, testercontainer.image, tag)
|
|
time.sleep(int(self.test_container_delay))
|
|
testercontainer.reload()
|
|
testerip = testercontainer.attrs['NetworkSettings']['Networks']['bridge']['IPAddress']
|
|
testerendpoint = f'http://{testerip}:3000'
|
|
session = requests.Session()
|
|
retries = Retry(total=10, backoff_factor=2,status_forcelist=[502, 503, 504])
|
|
session.mount(proto, HTTPAdapter(max_retries=retries))
|
|
session.get(testerendpoint)
|
|
return testercontainer, testerendpoint
|
|
|
|
|
|
def setup_driver(self) -> webdriver.Chrome:
|
|
"""Return a single ChromiumDriver object the class can use
|
|
|
|
Returns:
|
|
Webdriver: Returns a Chromedriver object
|
|
"""
|
|
self.logger.info("Init Chromedriver")
|
|
# Selenium webdriver options
|
|
chrome_options = webdriver.ChromeOptions()
|
|
chrome_options.add_argument('--no-sandbox')
|
|
chrome_options.add_argument('--headless')
|
|
chrome_options.add_argument('--disable-gpu')
|
|
chrome_options.add_argument('--window-size=1920x1080')
|
|
chrome_options.add_argument('--disable-extensions')
|
|
chrome_options.add_argument('--ignore-certificate-errors')
|
|
chrome_options.add_argument('--disable-dev-shm-usage') # https://developers.google.com/web/tools/puppeteer/troubleshooting#tips
|
|
driver = webdriver.Chrome(options=chrome_options)
|
|
driver.set_page_load_timeout(60)
|
|
return driver
|
|
|
|
class CIError(Exception):
|
|
pass
|