mirror of
https://github.com/rommapp/muos-app.git
synced 2026-01-09 06:33:16 +08:00
709 lines
29 KiB
Python
709 lines
29 KiB
Python
import base64
|
|
import datetime
|
|
import json
|
|
import math
|
|
import os
|
|
import re
|
|
import zipfile
|
|
from typing import Tuple
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import quote
|
|
from urllib.request import Request, urlopen
|
|
|
|
import platform_maps
|
|
from filesystem import Filesystem
|
|
from imageutils import ImageUtils
|
|
from models import Collection, Platform, Rom
|
|
from PIL import Image
|
|
from status import Status, View
|
|
|
|
|
|
class API:
    """HTTP client for the ROM server configured through environment variables.

    Connection settings (``HOST``, ``USERNAME``, ``PASSWORD``) and the
    platform/collection filters are read once in ``__init__``. Every
    ``fetch_*`` method publishes its result on ``self.status`` and records
    whether the host and the credentials looked valid in
    ``status.valid_host`` / ``status.valid_credentials``.
    """

    # Server paths, all relative to ``self.host``.
    _platforms_endpoint = "api/platforms"
    _platform_icon_url = "assets/platforms"
    _collections_endpoint = "api/collections"
    _virtual_collections_endpoint = "api/collections/virtual"
    _roms_endpoint = "api/roms"
    _user_me_endpoint = "api/users/me"
    _user_profile_picture_url = "assets/romm/assets"

    def __init__(self):
        """Create the collaborators and load the configuration from the environment."""
        self.status = Status()
        self.file_system = Filesystem()
        self.image_utils = ImageUtils()

        self.host = os.getenv("HOST", "").strip("/")
        self.username = os.getenv("USERNAME", "")
        self.password = os.getenv("PASSWORD", "")
        self.headers = {}
        self._exclude_platforms = set(self._getenv_list("EXCLUDE_PLATFORMS"))
        self._include_collections = set(self._getenv_list("INCLUDE_COLLECTIONS"))
        self._exclude_collections = set(self._getenv_list("EXCLUDE_COLLECTIONS"))
        self._collection_type = os.getenv("COLLECTION_TYPE", "collection")
        self._download_assets = os.getenv("DOWNLOAD_ASSETS", "false") in ("true", "1")
        self._fullscreen_assets = os.getenv("FULLSCREEN_ASSETS", "false") in (
            "true",
            "1",
        )

        # HTTP Basic auth is only sent when both credentials are configured.
        if self.username and self.password:
            credentials = f"{self.username}:{self.password}"
            auth_token = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
            self.headers = {"Authorization": f"Basic {auth_token}"}

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _getenv_list(key: str) -> list[str]:
        """Return the comma-separated environment variable *key* as a list.

        Items are stripped and empty items are dropped, so an unset variable,
        an empty one (``KEY=``) and a stray trailing comma all behave the
        same. Keeping the empty string would make an include filter truthy
        and silently hide every entry.
        """
        value = os.getenv(key)
        if value is None:
            return []
        return [item for item in (part.strip() for part in value.split(",")) if item]

    @staticmethod
    def _human_readable_size(size_bytes: int) -> Tuple[float, str]:
        """Convert a byte count into ``(value, unit)`` using 1024-based units.

        The value is rounded to two decimals. Sizes beyond the largest known
        unit are expressed in that unit instead of raising ``IndexError``;
        zero and negative sizes yield ``(0, "B")``.
        """
        if size_bytes <= 0:
            return 0, "B"
        units = ("B", "KB", "MB", "GB", "TB")
        value = float(size_bytes)
        index = 0
        # Repeated division avoids the rounding error of a floating-point log
        # at exact powers of 1024.
        while value >= 1024 and index < len(units) - 1:
            value /= 1024
            index += 1
        return (round(value, 2), units[index])

    def _sanitize_filename(self, filename: str) -> str:
        """Return *filename* as a relative path that is safe to create on disk.

        Characters that are invalid or awkward in file names are replaced
        with ``_`` in every path component. Empty and ``..`` components are
        removed so that a name coming from the server or from a ZIP entry
        cannot escape the directory it is joined to.
        """
        safe_parts = []
        for part in os.path.normpath(filename).split(os.sep):
            if part in ("", ".."):
                continue
            safe_parts.append(re.sub(r'[\\/*?:"<>|\t\n\r\b]', "_", part))

        if not safe_parts:
            # Nothing usable is left (e.g. "/" or ".."); use a placeholder.
            return "_"
        return os.path.join(*safe_parts)

    @staticmethod
    def _unwrap_items(payload):
        """Return ``payload["items"]`` for dict payloads, the payload itself otherwise."""
        if isinstance(payload, dict):
            return payload["items"]
        return payload

    @staticmethod
    def _read_json(response):
        """Read, close and JSON-decode an HTTP response body (UTF-8)."""
        with response:
            return json.loads(response.read().decode("utf-8"))

    @staticmethod
    def _resize_image(path: str, size: Tuple[int, int]) -> None:
        """Resize the image stored at *path* to *size* and save it in place."""
        with Image.open(path) as original:
            resized = original.resize(size)
        resized.save(path)

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------

    def _set_connection_status(self, valid_host: bool, valid_credentials: bool) -> None:
        """Record the outcome of the last request on ``self.status``."""
        self.status.valid_host = valid_host
        self.status.valid_credentials = valid_credentials

    def _open_url(self, url: str, timeout: float = 60):
        """Open *url* with the configured headers.

        Returns the open response, or ``None`` when the request could not be
        made. In the ``None`` case the connection status has already been
        updated: a malformed URL, a non-HTTP(S) scheme or a ``URLError`` mark
        the host (and credentials) invalid; an HTTP 403 marks only the
        credentials invalid.

        Raises:
            HTTPError: for any HTTP error status other than 403.
        """
        try:
            request = Request(url, headers=self.headers)
        except ValueError as e:
            print(e)
            self._set_connection_status(False, False)
            return None

        if request.type not in ("http", "https"):
            self._set_connection_status(False, False)
            return None

        try:
            return urlopen(request, timeout=timeout)  # trunk-ignore(bandit/B310)
        except HTTPError as e:
            print(e)
            if e.code != 403:
                raise
            self._set_connection_status(True, False)
            return None
        except URLError as e:
            print(e)
            self._set_connection_status(False, False)
            return None

    # ------------------------------------------------------------------
    # User
    # ------------------------------------------------------------------

    def _fetch_user_profile_picture(self, avatar_path: str) -> None:
        """Download the user's avatar, store it as a 26x26 image and record its path."""
        fs_extension = avatar_path.split(".")[-1]
        response = self._open_url(
            f"{self.host}/{self._user_profile_picture_url}/{avatar_path}"
        )
        if response is None:
            return
        with response:
            payload = response.read()

        os.makedirs(self.file_system.resources_path, exist_ok=True)
        self.status.profile_pic_path = (
            f"{self.file_system.resources_path}/{self.username}.{fs_extension}"
        )
        with open(self.status.profile_pic_path, "wb") as f:
            f.write(payload)
        self._resize_image(self.status.profile_pic_path, (26, 26))
        self._set_connection_status(True, True)

    def fetch_me(self) -> None:
        """Fetch the current user into ``status.me`` and signal ``status.me_ready``.

        The avatar is downloaded as well when the user has one. On a failed
        request the method returns early without signalling ``me_ready``.
        """
        response = self._open_url(f"{self.host}/{self._user_me_endpoint}")
        if response is None:
            return

        me = self._read_json(response)
        self.status.me = me
        # ``.get`` tolerates payloads that omit the avatar field entirely.
        avatar_path = me.get("avatar_path")
        if avatar_path:
            self._fetch_user_profile_picture(avatar_path)
        self.status.me_ready.set()

    # ------------------------------------------------------------------
    # Platforms
    # ------------------------------------------------------------------

    def _fetch_platform_icon(self, platform_slug) -> None:
        """Download the icon for *platform_slug* and store it as a 30x30 ``.ico``.

        A 404 from the server is not an error: the icon is simply missing and
        the connection is still considered valid.
        """
        _mapped_folder, icon_filename = platform_maps.ES_FOLDER_MAP.get(
            platform_slug.lower(), (platform_slug, platform_slug)
        )
        icon_url = f"{self.host}/{self._platform_icon_url}/{icon_filename}.ico"

        try:
            response = self._open_url(icon_url)
        except HTTPError as e:
            if e.code != 404:
                raise
            # Icon is missing on the server
            self._set_connection_status(True, True)
            print(f"Requested icon not found: {icon_url}")
            return
        if response is None:
            return
        with response:
            payload = response.read()

        self.file_system.resources_path = os.getcwd() + "/resources"
        os.makedirs(self.file_system.resources_path, exist_ok=True)

        icon_path = f"{self.file_system.resources_path}/{platform_slug}.ico"
        with open(icon_path, "wb") as f:
            f.write(payload)
        self._resize_image(icon_path, (30, 30))
        self._set_connection_status(True, True)

    def _list_rom_subfolders(self, log_path: bool = False) -> set:
        """Return the lower-cased names of the directories inside the ROMs folder.

        The scan is only done when the file system is neither muOS nor
        SpruceOS (those are filtered by their supported-platform lists); an
        empty set is returned otherwise or when the folder does not exist.
        """
        if self.file_system.is_muos or self.file_system.is_spruceos:
            return set()
        roms_path = self.file_system.get_roms_storage_path()
        if log_path:
            print(f"ROMs path: {roms_path}")
        if not os.path.exists(roms_path):
            return set()
        return {
            entry.lower()
            for entry in os.listdir(roms_path)
            if os.path.isdir(os.path.join(roms_path, entry))
        }

    def _is_supported_platform(self, platform_slug: str, roms_subfolders: set) -> bool:
        """Tell whether ROMs of *platform_slug* (lower-case) should be listed.

        A custom map entry from ``platform_maps`` wins over the defaults.
        Otherwise muOS and SpruceOS use their supported-platform lists, and
        any other system requires a matching folder in *roms_subfolders*.
        """
        if platform_maps._env_maps and platform_slug in platform_maps._env_platforms:
            return True
        if self.file_system.is_muos:
            return platform_slug in platform_maps.MUOS_SUPPORTED_PLATFORMS
        if self.file_system.is_spruceos:
            return platform_slug in platform_maps.SPRUCEOS_SUPPORTED_PLATFORMS
        mapped_folder, _icon_file = platform_maps.ES_FOLDER_MAP.get(
            platform_slug, (platform_slug, platform_slug)
        )
        return mapped_folder.lower() in roms_subfolders

    def fetch_platforms(self) -> None:
        """Fetch the non-empty, supported platforms into ``status.platforms``.

        Excluded platforms (``EXCLUDE_PLATFORMS``) are always skipped. Missing
        platform icons are downloaded along the way. On success
        ``status.platforms_ready`` is signalled; on a failed request the list
        is emptied and the method returns early.
        """
        response = self._open_url(f"{self.host}/{self._platforms_endpoint}")
        if response is None:
            self.status.platforms = []
            return
        platforms = self._read_json(response)

        # Subfolders of the ROMs directory, used for filtering on systems
        # other than muOS/SpruceOS.
        roms_subfolders = self._list_rom_subfolders(log_path=True)

        _platforms = []
        for platform in platforms:
            if platform["rom_count"] <= 0:
                continue
            platform_slug = platform["slug"].lower()
            if platform_slug in self._exclude_platforms:
                continue
            if not self._is_supported_platform(platform_slug, roms_subfolders):
                continue

            _platforms.append(
                Platform(
                    id=platform["id"],
                    display_name=platform["display_name"],
                    rom_count=platform["rom_count"],
                    slug=platform["slug"],
                )
            )

            self.file_system.resources_path = os.getcwd() + "/resources"
            icon_path = f"{self.file_system.resources_path}/{platform['slug']}.ico"
            if not os.path.exists(icon_path):
                self._fetch_platform_icon(platform["slug"])

        self.status.platforms = _platforms
        print(f"Fetched {len(_platforms)} platforms")
        self._set_connection_status(True, True)
        self.status.platforms_ready.set()

    # ------------------------------------------------------------------
    # Collections
    # ------------------------------------------------------------------

    def _is_collection_selected(self, name) -> bool:
        """Apply the include/exclude filters to a collection name.

        A non-empty include list takes precedence: only listed names pass.
        Otherwise every name that is not in the exclude list passes.
        """
        if self._include_collections:
            return name in self._include_collections
        return name not in self._exclude_collections

    def fetch_collections(self) -> None:
        """Fetch regular and virtual collections into ``status.collections``.

        Empty collections and those rejected by the include/exclude filters
        are skipped; regular collections come first, virtual ones after. On
        success ``status.collections_ready`` is signalled; on a failed request
        the list is emptied and the method returns early.
        """
        collections_response = self._open_url(
            f"{self.host}/{self._collections_endpoint}"
        )
        if collections_response is None:
            self.status.collections = []
            return

        # The response is closed even if the second request fails or raises.
        with collections_response:
            v_collections_response = self._open_url(
                f"{self.host}/{self._virtual_collections_endpoint}"
                f"?type={quote(self._collection_type)}"
            )
            if v_collections_response is None:
                self.status.collections = []
                return
            with v_collections_response:
                collections = json.loads(collections_response.read().decode("utf-8"))
                v_collections = json.loads(
                    v_collections_response.read().decode("utf-8")
                )

        collections = self._unwrap_items(collections)
        v_collections = self._unwrap_items(v_collections)

        _collections = []
        for entries, is_virtual in ((collections, False), (v_collections, True)):
            for entry in entries:
                if entry["rom_count"] <= 0:
                    continue
                if not self._is_collection_selected(entry["name"]):
                    continue
                _collections.append(
                    Collection(
                        id=entry["id"],
                        name=entry["name"],
                        rom_count=entry["rom_count"],
                        virtual=is_virtual,
                    )
                )

        self.status.collections = _collections
        self._set_connection_status(True, True)
        self.status.collections_ready.set()

    # ------------------------------------------------------------------
    # ROMs
    # ------------------------------------------------------------------

    def fetch_roms(self) -> None:
        """Fetch the ROMs of the selected platform or collection into ``status.roms``.

        The selection is taken from ``status`` (platform first, then
        collection, then virtual collection); nothing happens when none is
        selected. ROMs of unsupported platforms are dropped. On success
        ``status.roms_ready`` is signalled; on a failed request the list is
        emptied and the method returns early.
        """
        if self.status.selected_platform:
            view = View.PLATFORMS
            selected_id = self.status.selected_platform.id
            selected_platform_slug = self.status.selected_platform.slug.lower()
        elif self.status.selected_collection:
            view = View.COLLECTIONS
            selected_id = self.status.selected_collection.id
            selected_platform_slug = None
        elif self.status.selected_virtual_collection:
            view = View.VIRTUAL_COLLECTIONS
            selected_id = self.status.selected_virtual_collection.id
            selected_platform_slug = None
        else:
            return

        response = self._open_url(
            f"{self.host}/{self._roms_endpoint}?{view}_id={selected_id}&order_by=name&order_dir=asc&limit=10000",
            timeout=1800,
        )
        if response is None:
            self.status.roms = []
            return

        # { 'items': list[dict], 'total': number, 'limit': number, 'offset': number }
        roms = self._unwrap_items(self._read_json(response))

        # Subfolders of the ROMs directory, used for filtering on systems
        # other than muOS/SpruceOS.
        roms_subfolders = self._list_rom_subfolders()

        _roms = []
        for rom in roms:
            platform_slug = rom["platform_slug"].lower()
            if not self._is_supported_platform(platform_slug, roms_subfolders):
                continue
            if view == View.PLATFORMS and platform_slug != selected_platform_slug:
                continue

            # The key may be present with a null value, hence ``or {}``.
            metadatum = rom.get("metadatum") or {}
            _roms.append(
                Rom(
                    id=rom["id"],
                    platform_id=rom["platform_id"],
                    platform_slug=rom["platform_slug"],
                    fs_name=rom["fs_name"],
                    fs_name_no_tags=rom["fs_name_no_tags"],
                    fs_name_no_ext=rom["fs_name_no_ext"],
                    fs_extension=rom["fs_extension"],
                    fs_size=self._human_readable_size(rom["fs_size_bytes"]),
                    fs_size_bytes=rom["fs_size_bytes"],
                    name=rom["name"],
                    slug=rom["slug"],
                    summary=rom["summary"],
                    youtube_video_id=rom.get("youtube_video_id", None),
                    path_cover_small=rom["path_cover_small"],
                    path_cover_large=rom["path_cover_large"],
                    is_identified=rom["is_identified"],
                    revision=rom.get("revision", None),
                    regions=rom.get("regions", []),
                    languages=rom.get("languages", []),
                    tags=rom.get("tags", []),
                    crc_hash=rom.get("crc_hash", ""),
                    md5_hash=rom.get("md5_hash", ""),
                    sha1_hash=rom.get("sha1_hash", ""),
                    has_simple_single_file=rom.get("has_simple_single_file", False),
                    has_nested_single_file=rom.get("has_nested_single_file", False),
                    has_multiple_files=rom.get("has_multiple_files", False),
                    merged_screenshots=rom.get("merged_screenshots", []),
                    genres=metadatum.get("genres", []),
                    franchises=metadatum.get("franchises", []),
                    collections=metadatum.get("collections", []),
                    companies=metadatum.get("companies", []),
                    game_modes=metadatum.get("game_modes", []),
                    age_ratings=metadatum.get("age_ratings", []),
                    first_release_date=metadatum.get("first_release_date", None),
                    average_rating=metadatum.get("average_rating", None),
                )
            )

        self.status.roms = _roms
        self._set_connection_status(True, True)
        self.status.roms_ready.set()

    # ------------------------------------------------------------------
    # Downloads
    # ------------------------------------------------------------------

    def _reset_download_status(
        self, valid_host: bool = False, valid_credentials: bool = False
    ) -> None:
        """Clear all download progress and queue state on ``self.status``.

        Besides resetting the counters and the queue, this sets both the
        ``download_rom_ready`` and the ``abort_download`` events.
        """
        self.status.total_downloaded_bytes = 0
        self.status.downloaded_percent = 0.0
        self.status.valid_host = valid_host
        self.status.valid_credentials = valid_credentials
        self.status.downloading_rom = None
        self.status.extracting_rom = False
        self.status.multi_selected_roms = []
        self.status.download_queue = []
        self.status.download_rom_ready.set()
        self.status.abort_download.set()

    def _download_to_file(self, request, rom, dest_path: str) -> bool:
        """Stream *request* into *dest_path*, updating the progress on ``status``.

        Returns ``True`` when the download completed. When
        ``status.abort_download`` is set, the partial file is removed, the
        download status is reset and ``False`` is returned.
        """
        chunk_size = 1024
        aborted = False
        with (
            urlopen(request) as response,  # trunk-ignore(bandit/B310)
            open(dest_path, "wb") as out_file,
        ):
            self.status.total_downloaded_bytes = 0
            while True:
                if self.status.abort_download.is_set():
                    aborted = True
                    break
                chunk = response.read(chunk_size)
                if not chunk:
                    print("Finalized download")
                    break
                out_file.write(chunk)
                self.status.valid_host = True
                self.status.valid_credentials = True
                self.status.total_downloaded_bytes += len(chunk)
                self.status.downloaded_percent = (
                    self.status.total_downloaded_bytes
                    / (rom.fs_size_bytes + 1)  # Add 1 virtual byte to avoid division by zero
                ) * 100

        if aborted:
            self._reset_download_status(True, True)
            # Removed only after the file handle has been closed.
            os.remove(dest_path)
            return False
        return True

    def _extract_archive(self, rom, dest_path: str) -> bool:
        """Extract the ZIP at *dest_path* next to it, then delete the archive.

        Entry names are sanitized before being joined to the target
        directory, and directory entries are created as directories. Returns
        ``False`` (after resetting the download status and deleting the
        archive) when ``status.abort_download`` is set, ``True`` otherwise.
        """
        self.status.extracting_rom = True
        print("Multi-file rom detected. Extracting...")
        target_dir = os.path.dirname(dest_path)
        chunk_size = 1024
        aborted = False

        with zipfile.ZipFile(dest_path, "r") as zip_ref:
            members = zip_ref.infolist()
            total_size = sum(member.file_size for member in members)
            extracted_size = 0
            for member in members:
                if self.status.abort_download.is_set():
                    aborted = True
                    break
                file_path = os.path.join(
                    target_dir, self._sanitize_filename(member.filename)
                )
                if member.is_dir():
                    # Writing a directory entry as a file would block the
                    # files that live below it.
                    os.makedirs(file_path, exist_ok=True)
                    continue
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                with (
                    zip_ref.open(member) as source,
                    open(file_path, "wb") as target,
                ):
                    while True:
                        chunk = source.read(chunk_size)
                        if not chunk:
                            break
                        target.write(chunk)
                        extracted_size += len(chunk)
                        self.status.extracted_percent = (
                            extracted_size / total_size
                        ) * 100

        if aborted:
            self._reset_download_status(True, True)
            os.remove(dest_path)
            return False

        self.status.extracting_rom = False
        self.status.downloading_rom = None
        os.remove(dest_path)
        print(f"Extracted {rom.name} at {os.path.dirname(dest_path)}")
        return True

    def _write_catalogue_entry(self, rom) -> None:
        """Write the description text and, if enabled, the artwork for *rom*.

        Nothing is written when the file system reports no catalogue path for
        the ROM's platform. Covers and previews are only processed when
        ``DOWNLOAD_ASSETS`` is enabled.
        """
        # Check if the catalogue path is set and valid
        catalogue_path = self.file_system.get_catalogue_platform_path(
            rom.platform_slug
        )
        if not catalogue_path:
            return

        filename = self._sanitize_filename(rom.fs_name_no_ext)
        if rom.summary:
            text_path = os.path.join(catalogue_path, "text", f"{filename}.txt")
            os.makedirs(os.path.dirname(text_path), exist_ok=True)
            # Explicit UTF-8: summaries may contain non-ASCII text and the
            # locale's default encoding is not guaranteed to handle it.
            with open(text_path, "w", encoding="utf-8") as f:
                f.write(rom.summary)
                f.write("\n\n")

                if rom.first_release_date:
                    # The timestamp is divided by 1000 before conversion
                    # (presumably milliseconds since the epoch).
                    dt = datetime.datetime.fromtimestamp(
                        rom.first_release_date / 1000
                    )
                    formatted_date = dt.strftime("%Y-%m-%d")
                    f.write(f"First release date: {formatted_date}\n")

                if rom.average_rating:
                    f.write(f"Average rating: {rom.average_rating}\n")

                if rom.genres:
                    f.write(f"Genres: {', '.join(rom.genres)}\n")

                if rom.franchises:
                    f.write(f"Franchises: {', '.join(rom.franchises)}\n")

                if rom.companies:
                    f.write(f"Companies: {', '.join(rom.companies)}\n")

        # Don't download covers and previews if the user disabled the option
        if not self._download_assets:
            return

        box_path = os.path.join(catalogue_path, "box", f"{filename}.png")
        preview_path = os.path.join(catalogue_path, "preview", f"{filename}.png")

        # Download cover and preview images
        os.makedirs(os.path.dirname(box_path), exist_ok=True)
        os.makedirs(os.path.dirname(preview_path), exist_ok=True)

        self.image_utils.process_assets(
            fullscreen=self._fullscreen_assets,
            cover_url=rom.path_cover_small,
            screenshot_urls=rom.merged_screenshots,
            box_path=box_path,
            preview_path=preview_path,
            headers=self.headers,
        )

    def download_rom(self) -> None:
        """Download every ROM in ``status.download_queue``, sorted by name.

        Each ROM is streamed to its platform folder; multi-file ROMs are
        extracted from the downloaded ZIP. Catalogue text and artwork are
        written afterwards. An abort, an invalid URL, an HTTP 403 or a
        ``URLError`` resets the download status and stops the whole queue.

        Raises:
            HTTPError: for any HTTP error status other than 403.
        """
        self.status.download_queue.sort(key=lambda rom: rom.name)
        for i, rom in enumerate(self.status.download_queue):
            self.status.downloading_rom = rom
            self.status.downloading_rom_position = i + 1
            dest_path = os.path.join(
                self.file_system.get_platforms_storage_path(rom.platform_slug),
                self._sanitize_filename(rom.fs_name),
            )
            url = f"{self.host}/{self._roms_endpoint}/{rom.id}/content/{quote(rom.fs_name)}?hidden_folder=true"
            os.makedirs(os.path.dirname(dest_path), exist_ok=True)

            try:
                print(f"Fetching: {url}")
                request = Request(url, headers=self.headers)
            except ValueError:
                self._reset_download_status()
                return

            if request.type not in ("http", "https"):
                self._reset_download_status()
                return

            try:
                print(f"Downloading {rom.name} to {dest_path}")
                if not self._download_to_file(request, rom, dest_path):
                    return

                # Handle multi-file (ZIP) ROMs
                if rom.has_multiple_files and not self._extract_archive(
                    rom, dest_path
                ):
                    return
            except HTTPError as e:
                if e.code == 403:
                    self._reset_download_status(valid_host=True)
                    return
                raise
            except URLError:
                self._reset_download_status(valid_host=True)
                return

            self._write_catalogue_entry(rom)

        # End of download
        self._reset_download_status(valid_host=True, valid_credentials=True)
|