mirror of
https://github.com/rommapp/muos-app.git
synced 2026-02-20 00:55:26 +08:00
519 lines
20 KiB
Python
519 lines
20 KiB
Python
import base64
|
|
import json
|
|
import math
|
|
import os
|
|
import re
|
|
import zipfile
|
|
from typing import Tuple
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import quote
|
|
from urllib.request import Request, urlopen
|
|
|
|
from dotenv import load_dotenv
|
|
from filesystem import MUOS_SUPPORTED_PLATFORMS, Filesystem
|
|
from models import Collection, Platform, Rom
|
|
from PIL import Image
|
|
from status import Status, View
|
|
|
|
# Load .env file from one folder above
|
|
load_dotenv(os.path.join(os.path.dirname(__file__), ".env"))
|
|
|
|
|
|
class API:
|
|
_platforms_endpoint = "api/platforms"
|
|
_platform_icon_url = "assets/platforms"
|
|
_collections_endpoint = "api/collections"
|
|
_virtual_collections_endpoint = "api/collections/virtual"
|
|
_roms_endpoint = "api/roms"
|
|
_user_me_endpoint = "api/users/me"
|
|
_user_profile_picture_url = "assets/romm/assets"
|
|
|
|
def __init__(self):
|
|
self.host = os.getenv("HOST", "")
|
|
self.username = os.getenv("USERNAME", "")
|
|
self.password = os.getenv("PASSWORD", "")
|
|
self.headers = {}
|
|
self._exclude_platforms = set(os.getenv("EXCLUDE_PLATFORMS") or [])
|
|
self._include_collections = set(os.getenv("INCLUDE_COLLECTIONS") or [])
|
|
self._exclude_collections = set(os.getenv("EXCLUDE_COLLECTIONS") or [])
|
|
self._collection_type = os.getenv("COLLECTION_TYPE", "collection")
|
|
self._status = Status()
|
|
self._file_system = Filesystem()
|
|
|
|
if self.username and self.password:
|
|
credentials = f"{self.username}:{self.password}"
|
|
auth_token = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
|
|
self.headers = {"Authorization": f"Basic {auth_token}"}
|
|
|
|
@staticmethod
|
|
def _human_readable_size(size_bytes: int) -> Tuple[float, str]:
|
|
if size_bytes == 0:
|
|
return 0, "B"
|
|
size_name = ("B", "KB", "MB", "GB")
|
|
i = int(math.floor(math.log(size_bytes, 1024)))
|
|
p = math.pow(1024, i)
|
|
s = round(size_bytes / p, 2)
|
|
return (s, size_name[i])
|
|
|
|
def _sanitize_filename(self, filename: str) -> str:
|
|
invalid_chars = r"[\/\\\*\?\"|\<\>:\t\n\r\b]"
|
|
return re.sub(invalid_chars, "_", filename)
|
|
|
|
def _fetch_user_profile_picture(self, avatar_path: str) -> None:
|
|
fs_extension = avatar_path.split(".")[-1]
|
|
try:
|
|
request = Request(
|
|
f"{self.host}/{self._user_profile_picture_url}/{avatar_path}",
|
|
headers=self.headers,
|
|
)
|
|
except ValueError as e:
|
|
print(e)
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
try:
|
|
if request.type not in ("http", "https"):
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
response = urlopen(request, timeout=60) # trunk-ignore(bandit/B310)
|
|
except HTTPError as e:
|
|
print(e)
|
|
if e.code == 403:
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = False
|
|
return
|
|
else:
|
|
raise
|
|
except URLError as e:
|
|
print(e)
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
if not os.path.exists(self._file_system.resources_path):
|
|
os.makedirs(self._file_system.resources_path)
|
|
self._status.profile_pic_path = (
|
|
f"{self._file_system.resources_path}/{self.username}.{fs_extension}"
|
|
)
|
|
with open(self._status.profile_pic_path, "wb") as f:
|
|
f.write(response.read())
|
|
icon = Image.open(self._status.profile_pic_path)
|
|
icon = icon.resize((26, 26))
|
|
icon.save(self._status.profile_pic_path)
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = True
|
|
|
|
def fetch_me(self) -> None:
|
|
try:
|
|
request = Request(
|
|
f"{self.host}/{self._user_me_endpoint}", headers=self.headers
|
|
)
|
|
except ValueError as e:
|
|
print(e)
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
try:
|
|
if request.type not in ("http", "https"):
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
response = urlopen(request, timeout=60) # trunk-ignore(bandit/B310)
|
|
except HTTPError as e:
|
|
print(e)
|
|
if e.code == 403:
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = False
|
|
return
|
|
else:
|
|
raise
|
|
except URLError as e:
|
|
print(e)
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
me = json.loads(response.read().decode("utf-8"))
|
|
self._status.me = me
|
|
if me["avatar_path"]:
|
|
self._fetch_user_profile_picture(me["avatar_path"])
|
|
self._status.me_ready.set()
|
|
|
|
def _fetch_platform_icon(self, platform_slug) -> None:
|
|
try:
|
|
request = Request(
|
|
f"{self.host}/{self._platform_icon_url}/{platform_slug}.ico",
|
|
headers=self.headers,
|
|
)
|
|
except ValueError as e:
|
|
print(e)
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
|
|
try:
|
|
if request.type not in ("http", "https"):
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
response = urlopen(request, timeout=60) # trunk-ignore(bandit/B310)
|
|
except HTTPError as e:
|
|
print(e)
|
|
if e.code == 403:
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = False
|
|
return
|
|
# Icon is missing on the server
|
|
elif e.code == 404:
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = True
|
|
return
|
|
else:
|
|
raise
|
|
except URLError as e:
|
|
print(e)
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
|
|
if not os.path.exists(self._file_system.resources_path):
|
|
os.makedirs(self._file_system.resources_path)
|
|
|
|
with open(f"{self._file_system.resources_path}/{platform_slug}.ico", "wb") as f:
|
|
f.write(response.read())
|
|
|
|
icon = Image.open(f"{self._file_system.resources_path}/{platform_slug}.ico")
|
|
icon = icon.resize((30, 30))
|
|
icon.save(f"{self._file_system.resources_path}/{platform_slug}.ico")
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = True
|
|
|
|
def fetch_platforms(self) -> None:
|
|
try:
|
|
request = Request(
|
|
f"{self.host}/{self._platforms_endpoint}", headers=self.headers
|
|
)
|
|
except ValueError:
|
|
self._status.platforms = []
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
try:
|
|
if request.type not in ("http", "https"):
|
|
self._status.platforms = []
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
response = urlopen(request, timeout=60) # trunk-ignore(bandit/B310)
|
|
except HTTPError as e:
|
|
if e.code == 403:
|
|
self._status.platforms = []
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = False
|
|
return
|
|
else:
|
|
raise
|
|
except URLError:
|
|
self._status.platforms = []
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
platforms = json.loads(response.read().decode("utf-8"))
|
|
if isinstance(platforms, dict):
|
|
platforms = platforms["items"]
|
|
_platforms: list[Platform] = []
|
|
for platform in platforms:
|
|
if platform["rom_count"] > 0:
|
|
if (
|
|
platform["slug"].lower() not in MUOS_SUPPORTED_PLATFORMS
|
|
or platform["slug"] in self._exclude_platforms
|
|
):
|
|
continue
|
|
_platforms.append(
|
|
Platform(
|
|
id=platform["id"],
|
|
display_name=platform["display_name"],
|
|
rom_count=platform["rom_count"],
|
|
slug=platform["slug"],
|
|
)
|
|
)
|
|
if not os.path.exists(
|
|
f"{self._file_system.resources_path}/{platform['slug']}.ico"
|
|
):
|
|
self._fetch_platform_icon(platform["slug"])
|
|
_platforms.sort(key=lambda platform: platform.display_name)
|
|
self._status.platforms = _platforms
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = True
|
|
self._status.platforms_ready.set()
|
|
|
|
def fetch_collections(self) -> None:
|
|
try:
|
|
collections_request = Request(
|
|
f"{self.host}/{self._collections_endpoint}", headers=self.headers
|
|
)
|
|
v_collections_request = Request(
|
|
f"{self.host}/{self._virtual_collections_endpoint}?type={self._collection_type}",
|
|
headers=self.headers,
|
|
)
|
|
except ValueError:
|
|
self._status.collections = []
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
|
|
try:
|
|
if collections_request.type not in ("http", "https"):
|
|
self._status.collections = []
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
|
|
collections_response = urlopen( # trunk-ignore(bandit/B310)
|
|
collections_request, timeout=60
|
|
)
|
|
v_collections_response = urlopen( # trunk-ignore(bandit/B310)
|
|
v_collections_request, timeout=60
|
|
)
|
|
except HTTPError as e:
|
|
if e.code == 403:
|
|
self._status.collections = []
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = False
|
|
return
|
|
else:
|
|
raise
|
|
except URLError:
|
|
self._status.collections = []
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
|
|
collections = json.loads(collections_response.read().decode("utf-8"))
|
|
v_collections = json.loads(v_collections_response.read().decode("utf-8"))
|
|
|
|
if isinstance(collections, dict):
|
|
collections = collections["items"]
|
|
if isinstance(v_collections, dict):
|
|
v_collections = v_collections["items"]
|
|
|
|
_collections: list[Collection] = []
|
|
|
|
for collection in collections:
|
|
if collection["rom_count"] > 0:
|
|
if self._include_collections:
|
|
if collection["name"] not in self._include_collections:
|
|
continue
|
|
elif self._exclude_collections:
|
|
if collection["name"] in self._exclude_collections:
|
|
continue
|
|
_collections.append(
|
|
Collection(
|
|
id=collection["id"],
|
|
name=collection["name"],
|
|
rom_count=collection["rom_count"],
|
|
virtual=False,
|
|
)
|
|
)
|
|
|
|
for v_collection in v_collections:
|
|
if v_collection["rom_count"] > 0:
|
|
if self._include_collections:
|
|
if v_collection["name"] not in self._include_collections:
|
|
continue
|
|
elif self._exclude_collections:
|
|
if v_collection["name"] in self._exclude_collections:
|
|
continue
|
|
_collections.append(
|
|
Collection(
|
|
id=v_collection["id"],
|
|
name=v_collection["name"],
|
|
rom_count=v_collection["rom_count"],
|
|
virtual=True,
|
|
)
|
|
)
|
|
|
|
_collections.sort(key=lambda collection: collection.name)
|
|
|
|
self._status.collections = _collections
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = True
|
|
self._status.collections_ready.set()
|
|
|
|
def fetch_roms(self) -> None:
|
|
if self._status.selected_platform:
|
|
view = View.PLATFORMS
|
|
id = self._status.selected_platform.id
|
|
elif self._status.selected_collection:
|
|
view = View.COLLECTIONS
|
|
id = self._status.selected_collection.id
|
|
elif self._status.selected_virtual_collection:
|
|
view = View.VIRTUAL_COLLECTIONS
|
|
id = self._status.selected_virtual_collection.id
|
|
else:
|
|
return
|
|
|
|
try:
|
|
request = Request(
|
|
f"{self.host}/{self._roms_endpoint}?{view}_id={id}&order_by=name&order_dir=asc",
|
|
headers=self.headers,
|
|
)
|
|
except ValueError:
|
|
self._status.roms = []
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
try:
|
|
if request.type not in ("http", "https"):
|
|
self._status.roms = []
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
response = urlopen(request, timeout=1800) # trunk-ignore(bandit/B310)
|
|
except HTTPError as e:
|
|
if e.code == 403:
|
|
self._status.roms = []
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = False
|
|
return
|
|
else:
|
|
raise
|
|
except URLError:
|
|
self._status.roms = []
|
|
self._status.valid_host = False
|
|
self._status.valid_credentials = False
|
|
return
|
|
roms = json.loads(response.read().decode("utf-8"))
|
|
if isinstance(roms, dict):
|
|
roms = roms["items"]
|
|
_roms = [
|
|
Rom(
|
|
id=rom["id"],
|
|
name=rom["name"],
|
|
fs_name=rom["fs_name"],
|
|
platform_slug=rom["platform_slug"],
|
|
fs_extension=rom["fs_extension"],
|
|
fs_size=self._human_readable_size(rom["fs_size_bytes"]),
|
|
fs_size_bytes=rom["fs_size_bytes"],
|
|
multi=rom["multi"],
|
|
languages=rom["languages"],
|
|
regions=rom["regions"],
|
|
revision=rom["revision"],
|
|
tags=rom["tags"],
|
|
)
|
|
for rom in roms
|
|
if rom["platform_slug"] in MUOS_SUPPORTED_PLATFORMS
|
|
]
|
|
_roms.sort(key=lambda rom: rom.name)
|
|
self._status.roms = _roms
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = True
|
|
self._status.roms_ready.set()
|
|
|
|
def _reset_download_status(
|
|
self, valid_host: bool = False, valid_credentials: bool = False
|
|
) -> None:
|
|
self._status.total_downloaded_bytes = 0
|
|
self._status.downloaded_percent = 0
|
|
self._status.valid_host = valid_host
|
|
self._status.valid_credentials = valid_credentials
|
|
self._status.downloading_rom = None
|
|
self._status.extracting_rom = False
|
|
self._status.multi_selected_roms = []
|
|
self._status.download_queue = []
|
|
self._status.download_rom_ready.set()
|
|
self._status.abort_download.set()
|
|
|
|
def download_rom(self) -> None:
|
|
self._status.download_queue.sort(key=lambda rom: rom.name)
|
|
for i, rom in enumerate(self._status.download_queue):
|
|
self._status.downloading_rom = rom
|
|
self._status.downloading_rom_position = i + 1
|
|
dest_path = os.path.join(
|
|
self._file_system.get_sd_storage_platform_path(rom.platform_slug),
|
|
self._sanitize_filename(rom.fs_name),
|
|
)
|
|
url = f"{self.host}/{self._roms_endpoint}/{rom.id}/content/{quote(rom.fs_name)}?hidden_folder=true"
|
|
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
|
|
|
|
try:
|
|
print(f"Fetching: {url}")
|
|
request = Request(url, headers=self.headers)
|
|
except ValueError:
|
|
self._reset_download_status()
|
|
return
|
|
try:
|
|
if request.type not in ("http", "https"):
|
|
self._reset_download_status()
|
|
return
|
|
print(f"Downloading {rom.name} to {dest_path}")
|
|
with urlopen(request) as response, open( # trunk-ignore(bandit/B310)
|
|
dest_path, "wb"
|
|
) as out_file:
|
|
self._status.total_downloaded_bytes = 0
|
|
chunk_size = 1024
|
|
while True:
|
|
if not self._status.abort_download.is_set():
|
|
chunk = response.read(chunk_size)
|
|
if not chunk:
|
|
print("Finalized download")
|
|
break
|
|
out_file.write(chunk)
|
|
self._status.valid_host = True
|
|
self._status.valid_credentials = True
|
|
self._status.total_downloaded_bytes += len(chunk)
|
|
self._status.downloaded_percent = (
|
|
self._status.total_downloaded_bytes
|
|
/ (
|
|
self._status.downloading_rom.fs_size_bytes + 1
|
|
) # Add 1 virtual byte to avoid division by zero
|
|
) * 100
|
|
else:
|
|
self._reset_download_status(True, True)
|
|
os.remove(dest_path)
|
|
return
|
|
if rom.multi:
|
|
self._status.extracting_rom = True
|
|
print("Multi file rom detected. Extracting...")
|
|
with zipfile.ZipFile(dest_path, "r") as zip_ref:
|
|
total_size = sum(file.file_size for file in zip_ref.infolist())
|
|
extracted_size = 0
|
|
chunk_size = 1024
|
|
for file in zip_ref.infolist():
|
|
if not self._status.abort_download.is_set():
|
|
file_path = os.path.join(
|
|
os.path.dirname(dest_path),
|
|
self._sanitize_filename(file.filename),
|
|
)
|
|
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
|
with zip_ref.open(file) as source, open(
|
|
file_path, "wb"
|
|
) as target:
|
|
while True:
|
|
chunk = source.read(chunk_size)
|
|
if not chunk:
|
|
break
|
|
target.write(chunk)
|
|
extracted_size += len(chunk)
|
|
self._status.extracted_percent = (
|
|
extracted_size / total_size
|
|
) * 100
|
|
else:
|
|
self._reset_download_status(True, True)
|
|
os.remove(dest_path)
|
|
return
|
|
self._status.extracting_rom = False
|
|
self._status.downloading_rom = None
|
|
os.remove(dest_path)
|
|
print(f"Extracted {rom.name} at {os.path.dirname(dest_path)}")
|
|
except HTTPError as e:
|
|
if e.code == 403:
|
|
self._reset_download_status(valid_host=True)
|
|
return
|
|
else:
|
|
raise
|
|
except URLError:
|
|
self._reset_download_status(valid_host=True)
|
|
return
|
|
# End of download
|
|
self._reset_download_status(valid_host=True, valid_credentials=True)
|