mirror of
https://github.com/meeb/tubesync.git
synced 2026-04-06 00:01:50 +08:00
396 lines
14 KiB
Python
396 lines
14 KiB
Python
from django.conf import settings
|
|
from django.http import HttpResponseNotFound, HttpResponseRedirect
|
|
from django.views import View
|
|
from django.views.generic import ListView
|
|
from django.views.generic.edit import FormView
|
|
from django.views.generic.detail import SingleObjectMixin
|
|
from django.urls import reverse_lazy
|
|
from django.db.models import F
|
|
from django.forms import ValidationError
|
|
from django.utils import timezone
|
|
from django.utils.translation import gettext_lazy as _
|
|
from common.models import TaskHistory
|
|
from common.timestamp import timestamp_to_datetime
|
|
from common.utils import append_uri_params, multi_key_sort
|
|
from common.huey import h_q_reset_tasks
|
|
from common.logger import log
|
|
from django_huey import DJANGO_HUEY, get_queue
|
|
from .utils import get_waiting_tasks
|
|
from ..models import Source
|
|
from django import forms
|
|
from ..forms import ScheduleTaskForm
|
|
from ..tasks import (
|
|
map_task_to_instance, get_error_message,
|
|
get_running_tasks, check_source_directory_exists,
|
|
)
|
|
|
|
|
|
class TasksView(ListView):
|
|
'''
|
|
A list of tasks queued to be completed. This is, for example, scraping for new
|
|
media or downloading media.
|
|
'''
|
|
|
|
template_name = 'sync/tasks.html'
|
|
context_object_name = 'tasks'
|
|
paginate_by = settings.TASKS_PER_PAGE
|
|
messages = {
|
|
'filter': _('Viewing tasks filtered for source: <strong>{name}</strong>'),
|
|
'reset': _('All tasks have been reset'),
|
|
'revoked': _('Revoked task: {task_id}'),
|
|
'scheduled': _('Scheduled task: {name}'),
|
|
}
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.filter_source = None
|
|
self.message = None
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
message_key = request.GET.get('message', '')
|
|
self.message = self.messages.get(message_key, '')
|
|
filter_by = request.GET.get('filter', '')
|
|
if filter_by:
|
|
try:
|
|
self.filter_source = Source.objects.get(pk=filter_by)
|
|
except Source.DoesNotExist:
|
|
self.filter_source = None
|
|
else:
|
|
if not message_key or 'filter' == message_key:
|
|
message = self.messages.get('filter', '')
|
|
self.message = message.format(
|
|
name=self.filter_source.name
|
|
)
|
|
|
|
if message_key in ('revoked', 'scheduled'):
|
|
fmt_vars = dict(
|
|
pk=request.GET.get('pk', 'Unknown'),
|
|
task_id=request.GET.get('task_id', 'Unknown'),
|
|
)
|
|
try:
|
|
task = TaskHistory.objects.get(pk=fmt_vars['pk'])
|
|
except TaskHistory.DoesNotExist:
|
|
fmt_vars['name'] = fmt_vars['pk']
|
|
else:
|
|
fmt_vars['name'] = task.verbose_name or task.task_id or task.pk
|
|
fmt_vars['task_id'] = task.task_id
|
|
self.message = self.message.format(**fmt_vars)
|
|
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
def get_queryset(self):
|
|
qs = get_waiting_tasks()
|
|
if self.filter_source:
|
|
params_prefix=f'[["{self.filter_source.pk}"'
|
|
qs = qs.filter(task_params__istartswith=params_prefix)
|
|
return qs.order_by(
|
|
'-priority',
|
|
'scheduled_at',
|
|
'end_at',
|
|
)
|
|
|
|
def get_context_data(self, *args, **kwargs):
|
|
data = super().get_context_data(*args, **kwargs)
|
|
now_dt = timezone.now()
|
|
scheduled_qs = get_waiting_tasks()
|
|
# Huey removes running tasks,
|
|
# so the waiting tasks will not include them.
|
|
running_qs = get_running_tasks(now_dt)
|
|
errors_qs = scheduled_qs.filter(
|
|
attempts__gt=0
|
|
).exclude(last_error__exact='')
|
|
|
|
# Add to context data from ListView
|
|
data['message'] = self.message
|
|
data['source'] = self.filter_source
|
|
data['running'] = list()
|
|
data['errors'] = list()
|
|
data['total_errors'] = errors_qs.count()
|
|
data['scheduled'] = list()
|
|
data['total_scheduled'] = scheduled_qs.count()
|
|
data['wait_for_database_queue'] = False
|
|
|
|
def add_to_task(task):
|
|
setattr(task, 'run_now', task.scheduled_at < now_dt)
|
|
obj, url = map_task_to_instance(task)
|
|
if obj:
|
|
setattr(task, 'instance', obj)
|
|
setattr(task, 'url', url)
|
|
if task.has_error():
|
|
error_message = get_error_message(task)
|
|
setattr(task, 'error_message', error_message)
|
|
return 'error'
|
|
return True and obj
|
|
|
|
for task in running_qs:
|
|
if task in data['running']:
|
|
continue
|
|
add_to_task(task)
|
|
data['running'].append(task)
|
|
|
|
# show all the errors when they fit on one page
|
|
if (data['total_errors'] + len(data['running'])) < self.paginate_by:
|
|
for task in errors_qs:
|
|
if task in data['running']:
|
|
continue
|
|
mapped = add_to_task(task)
|
|
if 'error' == mapped:
|
|
data['errors'].append(task)
|
|
elif mapped:
|
|
data['scheduled'].append(task)
|
|
|
|
for task in data['tasks']:
|
|
already_added = (
|
|
task in data['running'] or
|
|
task in data['errors'] or
|
|
task in data['scheduled']
|
|
)
|
|
if already_added:
|
|
continue
|
|
mapped = add_to_task(task)
|
|
if 'error' == mapped:
|
|
data['errors'].append(task)
|
|
elif mapped or settings.DEBUG:
|
|
data['scheduled'].append(task)
|
|
|
|
sort_keys = (
|
|
# key, reverse
|
|
('priority', True),
|
|
('scheduled_at', False),
|
|
('run_now', True),
|
|
)
|
|
data['errors'] = multi_key_sort(data['errors'], sort_keys, attr=True)
|
|
data['scheduled'] = multi_key_sort(data['scheduled'], sort_keys, attr=True)
|
|
|
|
return data
|
|
|
|
def paginate_queryset(self, queryset, page_size):
|
|
"""Paginate the queryset, if needed."""
|
|
paginator = self.get_paginator(
|
|
queryset,
|
|
page_size,
|
|
orphans=self.get_paginate_orphans(),
|
|
allow_empty_first_page=self.get_allow_empty(),
|
|
)
|
|
page_kwarg = self.page_kwarg
|
|
page = self.kwargs.get(page_kwarg) or self.request.GET.get(page_kwarg) or 1
|
|
try:
|
|
page_number = int(page)
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
if page_number > paginator.num_pages:
|
|
self.kwargs[page_kwarg] = 'last'
|
|
return super().paginate_queryset(queryset, page_size)
|
|
|
|
|
|
class RevokeTaskView(View):
|
|
'''
|
|
Revokes (cancels) a single queued task, then redirects back to the
|
|
tasks list.
|
|
'''
|
|
|
|
def get(self, request, pk):
|
|
try:
|
|
task = TaskHistory.objects.get(pk=pk)
|
|
except TaskHistory.DoesNotExist:
|
|
return HttpResponseNotFound()
|
|
huey_queue_names = (DJANGO_HUEY or {}).get('queues', {})
|
|
huey_queues = { q.name: q for q in map(get_queue, huey_queue_names) }
|
|
q = huey_queues.get(task.queue)
|
|
if q is None:
|
|
msg = f'RevokeTaskView: queue not found: {task.pk=} {task.queue=}'
|
|
log.warning(msg)
|
|
return HttpResponseNotFound()
|
|
# revoke the task we want to cancel
|
|
q.revoke_by_id(id=task.task_id, revoke_once=True)
|
|
vn = task.verbose_name or task.task_id or task.pk
|
|
if not vn.startswith('[revoked] '):
|
|
task.verbose_name = f'[revoked] {vn}'
|
|
task.save()
|
|
return HttpResponseRedirect(append_uri_params(
|
|
reverse_lazy('sync:tasks'),
|
|
dict(
|
|
message='revoked',
|
|
pk=str(task.pk),
|
|
task_id=str(task.task_id),
|
|
),
|
|
))
|
|
|
|
|
|
class CompletedTasksView(ListView):
|
|
'''
|
|
List of tasks which have been completed with an optional per-source filter.
|
|
'''
|
|
|
|
template_name = 'sync/tasks-completed.html'
|
|
context_object_name = 'tasks'
|
|
paginate_by = settings.TASKS_PER_PAGE
|
|
messages = {
|
|
'filter': _('Viewing tasks filtered for source: <strong>{name}</strong>'),
|
|
}
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.filter_source = None
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
filter_by = request.GET.get('filter', '')
|
|
if filter_by:
|
|
try:
|
|
self.filter_source = Source.objects.get(pk=filter_by)
|
|
except Source.DoesNotExist:
|
|
self.filter_source = None
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
def get_queryset(self):
|
|
qs = TaskHistory.objects.filter(
|
|
start_at__isnull=False,
|
|
end_at__gt=F('start_at'),
|
|
)
|
|
if self.filter_source:
|
|
params_prefix=f'[["{self.filter_source.pk}"'
|
|
qs = qs.filter(task_params__istartswith=params_prefix)
|
|
return qs.order_by('-end_at')
|
|
|
|
def get_context_data(self, *args, **kwargs):
|
|
data = super().get_context_data(*args, **kwargs)
|
|
for task in data['tasks']:
|
|
if task.has_error():
|
|
error_message = get_error_message(task)
|
|
setattr(task, 'error_message', error_message)
|
|
data['message'] = ''
|
|
data['source'] = self.filter_source
|
|
if self.filter_source:
|
|
message = str(self.messages.get('filter', ''))
|
|
data['message'] = message.format(name=self.filter_source.name)
|
|
return data
|
|
|
|
|
|
class ResetTasks(FormView):
|
|
'''
|
|
Confirm that all tasks should be reset. As all tasks are triggered from
|
|
signals by checking for files existing etc. this can be done by just deleting
|
|
all tasks and then calling every Source objects .save() method.
|
|
'''
|
|
|
|
template_name = 'sync/tasks-reset.html'
|
|
form_class = forms.Form
|
|
|
|
def form_valid(self, form):
|
|
# Delete all tasks
|
|
huey_queue_names = (DJANGO_HUEY or {}).get('queues', {})
|
|
for queue_name in huey_queue_names:
|
|
h_q_reset_tasks(queue_name)
|
|
# Iter all tasks
|
|
for source in Source.objects.all():
|
|
check_source_directory_exists(str(source.pk))
|
|
# This also chains down to call each Media objects .save() as well
|
|
source.save()
|
|
return super().form_valid(form)
|
|
|
|
def get_success_url(self):
|
|
url = reverse_lazy('sync:tasks')
|
|
return append_uri_params(url, {'message': 'reset'})
|
|
|
|
|
|
class TaskScheduleView(FormView, SingleObjectMixin):
|
|
'''
|
|
Confirm that the task should be re-scheduled.
|
|
'''
|
|
|
|
template_name = 'sync/task-schedule.html'
|
|
form_class = ScheduleTaskForm
|
|
model = TaskHistory
|
|
context_object_name = 'task'
|
|
errors = dict(
|
|
invalid_when=_('The type ({}) was incorrect.'),
|
|
when_before_now=_('The date and time must be in the future.'),
|
|
)
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.now = timezone.now()
|
|
self.object = None
|
|
self.timestamp = None
|
|
self.when = None
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
self.now = timezone.now()
|
|
self.object = self.get_object()
|
|
self.timestamp = kwargs.get('timestamp')
|
|
try:
|
|
self.when = timestamp_to_datetime(self.timestamp)
|
|
except AssertionError:
|
|
self.when = None
|
|
if self.when is None:
|
|
self.when = self.now
|
|
# Use the next minute and zero seconds
|
|
# The web browser does not select seconds by default
|
|
self.when = self.when.replace(second=0) + timezone.timedelta(minutes=1)
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
|
def get_initial(self):
|
|
initial = super().get_initial()
|
|
initial['now'] = self.now
|
|
initial['when'] = self.when
|
|
return initial
|
|
|
|
def get_context_data(self, *args, **kwargs):
|
|
data = super().get_context_data(*args, **kwargs)
|
|
data['now'] = self.now
|
|
data['when'] = self.when
|
|
return data
|
|
|
|
def get_success_url(self):
|
|
return append_uri_params(
|
|
reverse_lazy('sync:tasks'),
|
|
dict(
|
|
message='scheduled',
|
|
pk=str(self.object.pk),
|
|
task_id=str(self.object.task_id),
|
|
),
|
|
)
|
|
|
|
def form_valid(self, form):
|
|
when = form.cleaned_data.get('when')
|
|
|
|
if not isinstance(when, self.now.__class__):
|
|
form.add_error(
|
|
'when',
|
|
ValidationError(
|
|
self.errors['invalid_when'].format(
|
|
type(when),
|
|
),
|
|
),
|
|
)
|
|
if when < self.now:
|
|
form.add_error(
|
|
'when',
|
|
ValidationError(self.errors['when_before_now']),
|
|
)
|
|
|
|
if form.errors:
|
|
return super().form_invalid(form)
|
|
|
|
pk = self.object.pk
|
|
queue = self.object.queue
|
|
task_id = self.object.task_id
|
|
huey_queue_names = (DJANGO_HUEY or {}).get('queues', {})
|
|
huey_queues = { q.name: q for q in map(get_queue, huey_queue_names) }
|
|
q = huey_queues.get(queue)
|
|
if q is None:
|
|
msg = f'TaskScheduleView: queue not found: {pk=} {queue=}'
|
|
log.warning(msg)
|
|
else:
|
|
eta = max(self.now, when)
|
|
if q.reschedule(task_id, eta):
|
|
vn = self.object.verbose_name or task_id or pk
|
|
self.object.verbose_name = f'[revoked] {vn}'
|
|
self.object.save()
|
|
else:
|
|
msg = f'TaskScheduleView: task not found: {pk=} {task_id=}'
|
|
log.warning(msg)
|
|
|
|
return super().form_valid(form)
|