Michael Manganiello 637a0b74f7
misc: Add type hints to task classes
Small change including type hints for the Task base classes, and related
fixes to related tests.
2025-08-06 15:55:41 -03:00

132 lines
3.9 KiB
Python

from abc import ABC, abstractmethod
from typing import Any
import httpx
from exceptions.task_exceptions import SchedulerException
from handler.redis_handler import low_prio_queue
from logger.logger import log
from rq.job import Job
from rq_scheduler import Scheduler
from utils.context import ctx_httpx_client
# Module-level scheduler used by the periodic tasks defined below. It is bound
# to the low-priority queue and reuses that queue's connection.
tasks_scheduler = Scheduler(queue=low_prio_queue, connection=low_prio_queue.connection)
class Task(ABC):
"""Base class for all RQ tasks."""
title: str
description: str
enabled: bool
manual_run: bool
cron_string: str | None = None
def __init__(
self,
title: str,
description: str,
enabled: bool = False,
manual_run: bool = False,
cron_string: str | None = None,
):
self.title = title
self.description = description or title
self.enabled = enabled
self.manual_run = manual_run
self.cron_string = cron_string
@abstractmethod
async def run(self, *args: Any, **kwargs: Any) -> Any: ...
class PeriodicTask(Task, ABC):
    """Task that can be registered on the module-level scheduler via a cron string.

    ``func`` is the value compared against each scheduled job's ``func_name``
    and is also what is handed to the scheduler when the task is scheduled.
    """

    def __init__(self, *args: Any, func: str, **kwargs: Any):
        """Forward all other arguments to ``Task`` and remember ``func``."""
        super().__init__(*args, **kwargs)
        self.func = func

    def _get_existing_job(self) -> Job | None:
        """Return the first scheduled ``Job`` whose ``func_name`` equals ``self.func``.

        Returns None when the scheduler holds no such job.
        """
        return next(
            (
                candidate
                for candidate in tasks_scheduler.get_jobs()
                if isinstance(candidate, Job) and candidate.func_name == self.func
            ),
            None,
        )

    def init(self) -> Job | None:
        """Bring the scheduler in line with the task's ``enabled`` flag.

        An enabled task without a scheduled job gets scheduled; a disabled task
        that still has a scheduled job gets unscheduled. Nothing happens in the
        remaining cases.

        Returns:
            The result of ``schedule()`` when scheduling was attempted,
            otherwise None.
        """
        current = self._get_existing_job()

        if self.enabled:
            return None if current else self.schedule()

        if current:
            self.unschedule()
        return None

    def schedule(self) -> Job | None:
        """Register the task on the scheduler using its cron string.

        Returns:
            The value returned by the scheduler's ``cron`` call, or None when a
            matching job already exists or no cron string is set.

        Raises:
            SchedulerException: If the task is not enabled.
        """
        if not self.enabled:
            raise SchedulerException(f"Scheduled {self.description} is not enabled.")

        if self._get_existing_job():
            log.info(f"{self.description.capitalize()} is already scheduled.")
            return None

        # Without a cron expression there is nothing to register.
        if not self.cron_string:
            return None

        return tasks_scheduler.cron(
            self.cron_string,
            func=self.func,
            repeat=None,
        )

    def unschedule(self) -> bool:
        """Cancel the task's scheduled job, if there is one.

        Returns:
            True when a matching job was found and cancelled, False when the
            task was not scheduled.
        """
        scheduled = self._get_existing_job()

        if scheduled:
            tasks_scheduler.cancel(scheduled)
            log.info(f"{self.description.capitalize()} unscheduled.")
            return True

        log.info(f"{self.description.capitalize()} is not scheduled.")
        return False
class RemoteFilePullTask(PeriodicTask, ABC):
    """Periodic task whose ``run`` downloads the content found at ``url``."""

    def __init__(self, *args: Any, url: str, **kwargs: Any):
        """Forward all other arguments to ``PeriodicTask`` and remember ``url``."""
        super().__init__(*args, **kwargs)
        self.url = url

    async def run(self, force: bool = False) -> bytes | None:
        """Fetch ``self.url`` and return the response body.

        Args:
            force: When true, perform the download even if the task is not
                enabled.

        Returns:
            The response content on success. None when the task is disabled
            and not forced (in which case it is also unscheduled), or when
            the request raises ``httpx.HTTPError``.
        """
        if not (self.enabled or force):
            log.info(f"Scheduled {self.description} not enabled, unscheduling...")
            self.unschedule()
            return None

        log.info(f"Scheduled {self.description} started...")
        # The HTTP client is taken from the context variable rather than
        # being created here.
        client = ctx_httpx_client.get()

        try:
            reply = await client.get(self.url, timeout=120)
            # Raise on error status codes so they take the same failure path below.
            reply.raise_for_status()
            return reply.content
        except httpx.HTTPError as exc:
            log.error(f"Scheduled {self.description} failed", exc_info=True)
            log.error(exc)
            return None