mirror of
https://github.com/rommapp/romm.git
synced 2026-02-05 02:58:28 +08:00
Small change adding type hints to the Task base classes, with corresponding fixes to the related tests.
132 lines
3.9 KiB
Python
132 lines
3.9 KiB
Python
from abc import ABC, abstractmethod
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from exceptions.task_exceptions import SchedulerException
|
|
from handler.redis_handler import low_prio_queue
|
|
from logger.logger import log
|
|
from rq.job import Job
|
|
from rq_scheduler import Scheduler
|
|
from utils.context import ctx_httpx_client
|
|
|
|
# Module-level scheduler used by the periodic task classes in this file; it is
# bound to the low-priority queue and reuses that queue's connection.
tasks_scheduler = Scheduler(
    queue=low_prio_queue,
    connection=low_prio_queue.connection,
)
|
|
|
|
|
|
class Task(ABC):
|
|
"""Base class for all RQ tasks."""
|
|
|
|
title: str
|
|
description: str
|
|
enabled: bool
|
|
manual_run: bool
|
|
cron_string: str | None = None
|
|
|
|
def __init__(
|
|
self,
|
|
title: str,
|
|
description: str,
|
|
enabled: bool = False,
|
|
manual_run: bool = False,
|
|
cron_string: str | None = None,
|
|
):
|
|
self.title = title
|
|
self.description = description or title
|
|
self.enabled = enabled
|
|
self.manual_run = manual_run
|
|
self.cron_string = cron_string
|
|
|
|
@abstractmethod
|
|
async def run(self, *args: Any, **kwargs: Any) -> Any: ...
|
|
|
|
|
|
class PeriodicTask(Task, ABC):
    """Abstract task that can be registered on ``tasks_scheduler`` via a cron expression."""

    def __init__(self, *args: Any, func: str, **kwargs: Any):
        """Forward the common arguments to ``Task`` and remember ``func``.

        Args:
            func: Value handed to the scheduler as ``func`` when scheduling,
                and compared against each job's ``func_name`` to find an
                already-scheduled job.
        """
        super().__init__(*args, **kwargs)
        self.func = func

    def _get_existing_job(self) -> Job | None:
        """Return the first scheduled ``Job`` whose ``func_name`` equals ``self.func``.

        Entries returned by the scheduler that are not ``Job`` instances are
        skipped. Returns None when nothing matches.
        """
        matches = (
            candidate
            for candidate in tasks_scheduler.get_jobs()
            if isinstance(candidate, Job) and candidate.func_name == self.func
        )
        return next(matches, None)

    def init(self) -> Job | None:
        """Bring the scheduler in line with the task's ``enabled`` flag.

        An enabled task with no job gets scheduled; a disabled task that still
        has a job gets unscheduled; otherwise nothing happens.

        Returns the result of ``schedule()`` when scheduling was attempted, and
        None in every other case.
        """
        current = self._get_existing_job()

        if self.enabled:
            # Already scheduled -> nothing to do.
            return None if current else self.schedule()

        if current:
            self.unschedule()
        return None

    def schedule(self) -> Job | None:
        """Register the task on the scheduler using its cron expression.

        Returns the job created by the scheduler, or None when a matching job
        already exists or no ``cron_string`` is set.

        Raises:
            SchedulerException: If the task is not enabled.
        """
        if not self.enabled:
            raise SchedulerException(f"Scheduled {self.description} is not enabled.")

        if self._get_existing_job():
            log.info(f"{self.description.capitalize()} is already scheduled.")
            return None

        # Without a cron expression there is nothing to register.
        if not self.cron_string:
            return None

        return tasks_scheduler.cron(
            self.cron_string,
            func=self.func,
            repeat=None,
        )

    def unschedule(self) -> bool:
        """Cancel the task's job on the scheduler, if there is one.

        Returns True when a job was found and cancelled, False when the task
        was not scheduled.
        """
        scheduled = self._get_existing_job()
        if scheduled:
            tasks_scheduler.cancel(scheduled)
            log.info(f"{self.description.capitalize()} unscheduled.")
            return True

        log.info(f"{self.description.capitalize()} is not scheduled.")
        return False
|
|
|
|
|
|
class RemoteFilePullTask(PeriodicTask, ABC):
    """Abstract periodic task whose ``run`` downloads the body of a remote URL."""

    def __init__(self, *args: Any, url: str, **kwargs: Any):
        """Forward the common arguments to ``PeriodicTask`` and remember ``url``.

        Args:
            url: Address requested by ``run``.
        """
        super().__init__(*args, **kwargs)
        self.url = url

    async def run(self, force: bool = False) -> bytes | None:
        """Fetch ``self.url`` and return the raw response body.

        Args:
            force: When true, perform the download even if the task is disabled.

        Returns:
            The response content as bytes, or None when the task is disabled
            (and not forced) or the request fails with an ``httpx.HTTPError``.
        """
        should_run = self.enabled or force
        if not should_run:
            # A disabled task that still fires removes itself from the scheduler.
            log.info(f"Scheduled {self.description} not enabled, unscheduling...")
            self.unschedule()
            return None

        log.info(f"Scheduled {self.description} started...")

        # The HTTP client is taken from the context variable, not created here.
        client = ctx_httpx_client.get()
        try:
            response = await client.get(self.url, timeout=120)
            # Turn 4xx/5xx responses into httpx.HTTPError so they are handled below.
            response.raise_for_status()
            payload = response.content
        except httpx.HTTPError as exc:
            log.error(f"Scheduled {self.description} failed", exc_info=True)
            log.error(exc)
            return None

        return payload
|