import base64
import json
import math
import os
import re
import zipfile
from typing import Tuple
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import Request, urlopen

from dotenv import load_dotenv
from filesystem import MUOS_SUPPORTED_PLATFORMS, Filesystem
from models import Collection, Platform, Rom
from PIL import Image
from status import Status, View

# Load the .env file that sits next to this module
load_dotenv(os.path.join(os.path.dirname(__file__), ".env"))


class API:
    """HTTP client for the server configured through environment variables.

    Connection settings (``HOST``, ``USERNAME``, ``PASSWORD``) and the
    platform/collection filters are read from the environment.  Every method
    reports its outcome by mutating a ``Status`` object (``valid_host``,
    ``valid_credentials``, result lists and ``*_ready`` events) rather than
    by returning values.
    """

    _platforms_endpoint = "api/platforms"
    _platform_icon_url = "assets/platforms"
    _collections_endpoint = "api/collections"
    _virtual_collections_endpoint = "api/collections/virtual"
    _roms_endpoint = "api/roms"
    _user_me_endpoint = "api/users/me"
    _user_profile_picture_url = "assets/romm/assets"

    def __init__(self):
        """Read the configuration and prepare the Basic-auth header."""
        self.host = os.getenv("HOST", "")
        self.username = os.getenv("USERNAME", "")
        self.password = os.getenv("PASSWORD", "")
        self.headers = {}
        self._exclude_platforms = self._env_set("EXCLUDE_PLATFORMS")
        self._include_collections = self._env_set("INCLUDE_COLLECTIONS")
        self._exclude_collections = self._env_set("EXCLUDE_COLLECTIONS")
        self._collection_type = os.getenv("COLLECTION_TYPE", "collection")
        self._status = Status()
        self._file_system = Filesystem()

        if self.username and self.password:
            credentials = f"{self.username}:{self.password}"
            auth_token = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
            self.headers = {"Authorization": f"Basic {auth_token}"}

    @staticmethod
    def _env_set(name: str) -> set:
        """Return the comma-separated environment variable *name* as a set.

        Calling ``set()`` directly on the raw string would yield a set of
        single characters, so the value is split on commas first.  Blank
        items are dropped; an unset or empty variable gives an empty set.
        """
        raw = os.getenv(name) or ""
        return {item.strip() for item in raw.split(",") if item.strip()}

    @staticmethod
    def _human_readable_size(size_bytes: int) -> Tuple[float, str]:
        """Convert a byte count into a ``(value, unit)`` pair.

        The value is rounded to two decimals and the unit is one of
        B/KB/MB/GB/TB (powers of 1024).  Sizes beyond the largest unit are
        expressed in that unit; zero, negative or missing sizes give
        ``(0, "B")``.
        """
        if not size_bytes or size_bytes < 0:
            return 0, "B"
        units = ("B", "KB", "MB", "GB", "TB")
        # Every 10 bits is one power of 1024; integer arithmetic avoids the
        # rounding errors of math.log at exact unit boundaries.
        exponent = min((int(size_bytes).bit_length() - 1) // 10, len(units) - 1)
        scaled = round(size_bytes / math.pow(1024, exponent), 2)
        return (scaled, units[exponent])

    def _sanitize_filename(self, filename: str) -> str:
        """Replace path separators, wildcards and control characters with ``_``."""
        invalid_chars = r"[\/\\\*\?\"|\<\>:\t\n\r\b]"
        return re.sub(invalid_chars, "_", filename)

    def _set_validity(self, valid_host: bool, valid_credentials: bool) -> None:
        """Record whether the host and the credentials are currently usable."""
        self._status.valid_host = valid_host
        self._status.valid_credentials = valid_credentials

    def _open_url(
        self,
        url: str,
        *,
        timeout: int = 60,
        log_errors: bool = False,
        missing_ok: bool = False,
    ):
        """Open *url* with the auth headers and return the response.

        Returns ``None`` after updating the validity flags when:

        * the URL is malformed or not http(s)  -> host invalid
        * the server answers 403               -> host valid, credentials invalid
        * *missing_ok* and the answer is 404   -> host and credentials valid
        * the host cannot be reached           -> host invalid

        Any other ``HTTPError`` is re-raised.  When *log_errors* is true the
        caught exception is printed.  The caller owns the returned response
        and must close it.
        """
        try:
            request = Request(url, headers=self.headers)
        except ValueError as e:
            if log_errors:
                print(e)
            self._set_validity(False, False)
            return None

        # Only plain web schemes are allowed (guards against file:// etc.)
        if request.type not in ("http", "https"):
            self._set_validity(False, False)
            return None

        try:
            return urlopen(request, timeout=timeout)  # trunk-ignore(bandit/B310)
        except HTTPError as e:
            if log_errors:
                print(e)
            if e.code == 403:
                self._set_validity(True, False)
                return None
            if missing_ok and e.code == 404:
                self._set_validity(True, True)
                return None
            raise
        except URLError as e:
            if log_errors:
                print(e)
            self._set_validity(False, False)
            return None

    @staticmethod
    def _resize_image(path: str, size: Tuple[int, int]) -> None:
        """Resize the image stored at *path* to *size*, overwriting the file."""
        # Close the source handle before writing back to the same path
        with Image.open(path) as source:
            resized = source.resize(size)
        resized.save(path)

    def _fetch_user_profile_picture(self, avatar_path: str) -> None:
        """Download the user's avatar and store a 26x26 copy in the resources dir.

        The stored file is named ``<username>.<extension of avatar_path>`` and
        its path is published as ``status.profile_pic_path``.
        """
        fs_extension = avatar_path.split(".")[-1]
        response = self._open_url(
            f"{self.host}/{self._user_profile_picture_url}/{avatar_path}",
            log_errors=True,
        )
        if response is None:
            return

        os.makedirs(self._file_system.resources_path, exist_ok=True)
        self._status.profile_pic_path = (
            f"{self._file_system.resources_path}/{self.username}.{fs_extension}"
        )
        with response, open(self._status.profile_pic_path, "wb") as f:
            f.write(response.read())
        self._resize_image(self._status.profile_pic_path, (26, 26))
        self._set_validity(True, True)

    def fetch_me(self) -> None:
        """Fetch the current user's profile into ``status.me``.

        Also downloads the avatar when the profile has one, then sets
        ``status.me_ready``.  On a connection or auth failure only the
        validity flags are updated and the event is left untouched.
        """
        response = self._open_url(
            f"{self.host}/{self._user_me_endpoint}", log_errors=True
        )
        if response is None:
            return

        with response:
            me = json.loads(response.read().decode("utf-8"))
        self._status.me = me
        # Tolerate profiles without an avatar_path key
        avatar_path = me.get("avatar_path") if isinstance(me, dict) else None
        if avatar_path:
            self._fetch_user_profile_picture(avatar_path)
        self._status.me_ready.set()

    def _fetch_platform_icon(self, platform_slug) -> None:
        """Download ``<platform_slug>.ico`` and store a 30x30 copy in the resources dir.

        A 404 (icon missing on the server) is not treated as an error.
        """
        response = self._open_url(
            f"{self.host}/{self._platform_icon_url}/{platform_slug}.ico",
            log_errors=True,
            missing_ok=True,
        )
        if response is None:
            return

        os.makedirs(self._file_system.resources_path, exist_ok=True)
        icon_path = f"{self._file_system.resources_path}/{platform_slug}.ico"
        with response, open(icon_path, "wb") as f:
            f.write(response.read())
        self._resize_image(icon_path, (30, 30))
        self._set_validity(True, True)

    def fetch_platforms(self) -> None:
        """Fetch the platform list into ``status.platforms`` (sorted by name).

        Platforms with no ROMs, unsupported by muOS, or listed in
        ``EXCLUDE_PLATFORMS`` are skipped.  Missing platform icons are
        downloaded along the way.  On failure the list is emptied and
        ``platforms_ready`` is not set.
        """
        response = self._open_url(f"{self.host}/{self._platforms_endpoint}")
        if response is None:
            self._status.platforms = []
            return

        with response:
            platforms = json.loads(response.read().decode("utf-8"))
        # The payload is either a bare list or a dict wrapping it in "items"
        if isinstance(platforms, dict):
            platforms = platforms["items"]

        _platforms: list[Platform] = []
        for platform in platforms:
            if platform["rom_count"] <= 0:
                continue
            if (
                platform["slug"].lower() not in MUOS_SUPPORTED_PLATFORMS
                or platform["slug"] in self._exclude_platforms
            ):
                continue
            _platforms.append(
                Platform(
                    id=platform["id"],
                    display_name=platform["display_name"],
                    rom_count=platform["rom_count"],
                    slug=platform["slug"],
                )
            )
            if not os.path.exists(
                f"{self._file_system.resources_path}/{platform['slug']}.ico"
            ):
                self._fetch_platform_icon(platform["slug"])

        _platforms.sort(key=lambda platform: platform.display_name)
        self._status.platforms = _platforms
        self._set_validity(True, True)
        self._status.platforms_ready.set()

    def _is_collection_selected(self, name: str) -> bool:
        """Apply the include/exclude filters; an include list takes precedence."""
        if self._include_collections:
            return name in self._include_collections
        return name not in self._exclude_collections

    def _fetch_json_items(self, url: str):
        """Return the decoded JSON list at *url*, or ``None`` on failure.

        A dict payload is unwrapped through its ``"items"`` key.  Failure
        handling (flags, re-raise) is that of ``_open_url``.
        """
        response = self._open_url(url)
        if response is None:
            return None
        with response:
            payload = json.loads(response.read().decode("utf-8"))
        if isinstance(payload, dict):
            payload = payload["items"]
        return payload

    def fetch_collections(self) -> None:
        """Fetch regular and virtual collections into ``status.collections``.

        Empty collections and those rejected by ``INCLUDE_COLLECTIONS`` /
        ``EXCLUDE_COLLECTIONS`` are skipped; the result is sorted by name.
        On failure the list is emptied and ``collections_ready`` is not set.
        """
        collections = self._fetch_json_items(
            f"{self.host}/{self._collections_endpoint}"
        )
        if collections is None:
            self._status.collections = []
            return
        v_collections = self._fetch_json_items(
            f"{self.host}/{self._virtual_collections_endpoint}?type={self._collection_type}"
        )
        if v_collections is None:
            self._status.collections = []
            return

        _collections: list[Collection] = []
        for items, virtual in ((collections, False), (v_collections, True)):
            for item in items:
                if item["rom_count"] <= 0:
                    continue
                if not self._is_collection_selected(item["name"]):
                    continue
                _collections.append(
                    Collection(
                        id=item["id"],
                        name=item["name"],
                        rom_count=item["rom_count"],
                        virtual=virtual,
                    )
                )

        _collections.sort(key=lambda collection: collection.name)
        self._status.collections = _collections
        self._set_validity(True, True)
        self._status.collections_ready.set()

    def fetch_roms(self) -> None:
        """Fetch the ROMs of the selected platform or collection into ``status.roms``.

        Does nothing when nothing is selected.  Only ROMs whose platform is
        supported by muOS are kept; the result is sorted by name.  On failure
        the list is emptied and ``roms_ready`` is not set.
        """
        if self._status.selected_platform:
            view = View.PLATFORMS
            selected_id = self._status.selected_platform.id
        elif self._status.selected_collection:
            view = View.COLLECTIONS
            selected_id = self._status.selected_collection.id
        elif self._status.selected_virtual_collection:
            view = View.VIRTUAL_COLLECTIONS
            selected_id = self._status.selected_virtual_collection.id
        else:
            return

        # Large libraries can take a long time to list, hence the long timeout
        response = self._open_url(
            f"{self.host}/{self._roms_endpoint}?{view}_id={selected_id}&order_by=name&order_dir=asc",
            timeout=1800,
        )
        if response is None:
            self._status.roms = []
            return

        with response:
            roms = json.loads(response.read().decode("utf-8"))
        if isinstance(roms, dict):
            roms = roms["items"]

        _roms = [
            Rom(
                id=rom["id"],
                name=rom["name"],
                fs_name=rom["fs_name"],
                platform_slug=rom["platform_slug"],
                fs_extension=rom["fs_extension"],
                fs_size=self._human_readable_size(rom["fs_size_bytes"]),
                fs_size_bytes=rom["fs_size_bytes"],
                multi=rom["multi"],
                languages=rom["languages"],
                regions=rom["regions"],
                revision=rom["revision"],
                tags=rom["tags"],
            )
            for rom in roms
            if rom["platform_slug"] in MUOS_SUPPORTED_PLATFORMS
        ]
        _roms.sort(key=lambda rom: rom.name)
        self._status.roms = _roms
        self._set_validity(True, True)
        self._status.roms_ready.set()

    def _reset_download_status(
        self, valid_host: bool = False, valid_credentials: bool = False
    ) -> None:
        """Clear all download progress/queue state and signal completion."""
        self._status.total_downloaded_bytes = 0
        self._status.downloaded_percent = 0
        self._status.valid_host = valid_host
        self._status.valid_credentials = valid_credentials
        self._status.downloading_rom = None
        self._status.extracting_rom = False
        self._status.multi_selected_roms = []
        self._status.download_queue = []
        self._status.download_rom_ready.set()
        # NOTE(review): this SETS the same flag the download loops treat as an
        # abort request; presumably the caller clears it before starting the
        # next download - confirm.
        self._status.abort_download.set()

    def download_rom(self) -> None:
        """Download every ROM in ``status.download_queue``, in name order.

        Each ROM is streamed to the platform folder on the SD storage while
        ``status`` progress fields are updated.  Multi-file ROMs arrive as a
        zip archive that is extracted (flattened) next to it and then removed.
        Setting ``status.abort_download`` stops the work and deletes the
        partial download.  The download state is reset when the queue is
        finished or on any handled failure.
        """
        self._status.download_queue.sort(key=lambda rom: rom.name)
        for i, rom in enumerate(self._status.download_queue):
            self._status.downloading_rom = rom
            self._status.downloading_rom_position = i + 1
            dest_path = os.path.join(
                self._file_system.get_sd_storage_platform_path(rom.platform_slug),
                self._sanitize_filename(rom.fs_name),
            )
            url = f"{self.host}/{self._roms_endpoint}/{rom.id}/content/{quote(rom.fs_name)}?hidden_folder=true"
            os.makedirs(os.path.dirname(dest_path), exist_ok=True)

            try:
                print(f"Fetching: {url}")
                request = Request(url, headers=self.headers)
            except ValueError:
                self._reset_download_status()
                return

            try:
                if request.type not in ("http", "https"):
                    self._reset_download_status()
                    return

                print(f"Downloading {rom.name} to {dest_path}")
                aborted = False
                with urlopen(request) as response, open(  # trunk-ignore(bandit/B310)
                    dest_path, "wb"
                ) as out_file:
                    self._status.total_downloaded_bytes = 0
                    chunk_size = 1024
                    while True:
                        if self._status.abort_download.is_set():
                            aborted = True
                            break
                        chunk = response.read(chunk_size)
                        if not chunk:
                            print("Finalized download")
                            break
                        out_file.write(chunk)
                        self._status.valid_host = True
                        self._status.valid_credentials = True
                        self._status.total_downloaded_bytes += len(chunk)
                        self._status.downloaded_percent = (
                            self._status.total_downloaded_bytes
                            / (
                                rom.fs_size_bytes + 1
                            )  # Add 1 virtual byte to avoid division by zero
                        ) * 100
                if aborted:
                    # The partial file is removed only after its handle is closed
                    self._reset_download_status(True, True)
                    os.remove(dest_path)
                    return

                if rom.multi:
                    self._status.extracting_rom = True
                    print("Multi file rom detected. Extracting...")
                    with zipfile.ZipFile(dest_path, "r") as zip_ref:
                        total_size = sum(file.file_size for file in zip_ref.infolist())
                        extracted_size = 0
                        chunk_size = 1024
                        for file in zip_ref.infolist():
                            if self._status.abort_download.is_set():
                                aborted = True
                                break
                            # Directory entries carry no data; writing them out
                            # would only create bogus empty files.
                            if file.is_dir():
                                continue
                            # Sanitizing replaces path separators, so every
                            # member lands directly in the platform folder.
                            file_path = os.path.join(
                                os.path.dirname(dest_path),
                                self._sanitize_filename(file.filename),
                            )
                            os.makedirs(os.path.dirname(file_path), exist_ok=True)
                            with zip_ref.open(file) as source, open(
                                file_path, "wb"
                            ) as target:
                                while True:
                                    chunk = source.read(chunk_size)
                                    if not chunk:
                                        break
                                    target.write(chunk)
                                    extracted_size += len(chunk)
                                    self._status.extracted_percent = (
                                        extracted_size / total_size
                                    ) * 100
                    if aborted:
                        self._reset_download_status(True, True)
                        os.remove(dest_path)
                        return
                    self._status.extracting_rom = False
                    self._status.downloading_rom = None
                    os.remove(dest_path)
                    print(f"Extracted {rom.name} at {os.path.dirname(dest_path)}")
            except HTTPError as e:
                if e.code == 403:
                    self._reset_download_status(valid_host=True)
                    return
                else:
                    raise
            except URLError:
                self._reset_download_status(valid_host=True)
                return

        # End of download
        self._reset_download_status(valid_host=True, valid_credentials=True)