# Mirror of https://github.com/meeb/tubesync.git
# Synced 2026-04-06 00:01:50 +08:00
# 455 lines, 19 KiB, Python
import logging
|
|
from datetime import datetime
|
|
from urllib.parse import urlsplit
|
|
from django.conf import settings
|
|
from django.test import TestCase, Client
|
|
from django.utils import timezone
|
|
from django_huey import DJANGO_HUEY, get_queue
|
|
from common.models import TaskHistory
|
|
from sync.models import Source, Media
|
|
from .fixtures import all_test_metadata
|
|
from sync.tasks import (
|
|
check_source_directory_exists,
|
|
get_media_download_task, get_media_thumbnail_task,
|
|
)
|
|
from sync.choices import (
|
|
Val, Fallback, FilterSeconds, IndexSchedule, SourceResolution,
|
|
TaskQueue, YouTube_AudioCodec, YouTube_VideoCodec,
|
|
YouTube_SourceType,
|
|
)
|
|
|
|
class FrontEndTestCase(TestCase):
    """Exercise the front-end views through the Django test client.

    Covers the dashboard, source validation / creation / update / deletion,
    media listing and detail pages, and the task and media server overview
    pages.
    """

    maxDiff = None

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Configure every huey queue declared in the DJANGO_HUEY settings.
        # NOTE(review): this comment used to read "use immediate mode", but
        # ``immediate`` is set to False below; the tests in this class call
        # tasks explicitly (``call_local``) or update ``TaskHistory`` rows by
        # hand. Confirm which behaviour is intended.
        for queue_name in DJANGO_HUEY.get('queues', dict()):
            queue = get_queue(queue_name)
            # Set the storage variable before using the property.
            queue.immediate_use_memory = True
            queue.immediate = False

    def setUp(self):
        # Disable general logging for test case
        logging.disable(logging.CRITICAL)
        # logging.disable() is process-wide, so undo it once the test is
        # finished instead of leaking the setting into later tests.
        self.addCleanup(logging.disable, logging.NOTSET)

    def _assert_source_redirect(self, response):
        """Assert ``response`` redirects to ``/source/<uuid>``.

        Returns the second path component of the redirect URL (the source
        UUID as a string).
        """
        self.assertEqual(response.status_code, 302)
        url_parts = urlsplit(response.url)
        url_path = str(url_parts.path).strip()
        if url_path.startswith('/'):
            url_path = url_path[1:]
        path_parts = url_path.split('/')
        self.assertEqual(path_parts[0], 'source')
        return path_parts[1]

    def test_dashboard(self):
        # The dashboard page should load
        c = Client()
        response = c.get('/')
        self.assertEqual(response.status_code, 200)

    def test_validate_source(self):
        # URLs grouped by source type, then by expected outcome; only the
        # 'valid' group is expected to pass validation.
        test_sources = {
            'youtube-channel': {
                'valid': (
                    'https://m.youtube.com/testchannel',
                    'https://m.youtube.com/c/testchannel',
                    'https://m.youtube.com/c/testchannel/videos',
                    'https://www.youtube.com/testchannel',
                    'https://www.youtube.com/c/testchannel',
                    'https://www.youtube.com/c/testchannel/videos',
                ),
                'invalid_schema': (
                    'http://www.youtube.com/c/playlist',
                    'ftp://www.youtube.com/c/playlist',
                ),
                'invalid_domain': (
                    'https://www.test.com/c/testchannel',
                    'https://www.example.com/c/testchannel',
                    'https://n.youtube.com/c/testchannel',
                ),
                'invalid_path': (
                    'https://www.youtube.com/test/invalid',
                    'https://www.youtube.com/c/test/invalid',
                ),
                'invalid_reserved_paths': (
                    'https://www.youtube.com/watch?v=OkMadb8cpIw',
                    'https://www.youtube.com/watch',
                    'https://www.youtube.com/shorts',
                    'https://www.youtube.com/live',
                    'https://www.youtube.com/feed',
                    'https://www.youtube.com/trending',
                ),
            },
            'youtube-channel-id': {
                'valid': (
                    'https://m.youtube.com/channel/channelid',
                    'https://m.youtube.com/channel/channelid/videos',
                    'https://www.youtube.com/channel/channelid',
                    'https://www.youtube.com/channel/channelid/videos',
                ),
                'invalid_schema': (
                    'http://www.youtube.com/channel/channelid',
                    'ftp://www.youtube.com/channel/channelid',
                ),
                'invalid_domain': (
                    'https://www.test.com/channel/channelid',
                    'https://www.example.com/channel/channelid',
                    'https://n.youtube.com/channel/channelid',
                ),
                'invalid_path': (
                    'https://www.youtube.com/test/invalid',
                    'https://www.youtube.com/channel/test/invalid',
                ),
            },
            'youtube-playlist': {
                'valid': (
                    'https://m.youtube.com/playlist?list=testplaylist',
                    'https://m.youtube.com/watch?v=testvideo&list=testplaylist',
                    'https://www.youtube.com/playlist?list=testplaylist',
                    'https://www.youtube.com/watch?v=testvideo&list=testplaylist',
                ),
                'invalid_schema': (
                    'http://www.youtube.com/playlist?list=testplaylist',
                    'ftp://www.youtube.com/playlist?list=testplaylist',
                ),
                'invalid_domain': (
                    'https://www.test.com/playlist?list=testplaylist',
                    'https://www.example.com/playlist?list=testplaylist',
                    'https://n.youtube.com/playlist?list=testplaylist',
                ),
                'invalid_path': (
                    'https://www.youtube.com/test/invalid',
                ),
            },
        }
        c = Client()
        response = c.get('/source-validate')
        self.assertEqual(response.status_code, 200)
        for source_type, tests in test_sources.items():
            for test, urls in tests.items():
                for url in urls:
                    # subTest names the offending URL in any failure report
                    # and lets the remaining URLs still be checked.
                    with self.subTest(source_type=source_type, test=test, url=url):
                        response = c.post('/source-validate', {'source_url': url})
                        if test == 'valid':
                            # Valid source tests should bounce to /source-add
                            self.assertEqual(response.status_code, 302)
                            url_parts = urlsplit(response.url)
                            self.assertEqual(url_parts.path, '/source-add')
                        else:
                            # Invalid source tests should reload the page with an error
                            self.assertEqual(response.status_code, 200)
                            self.assertIn('<ul class="errorlist"',
                                          response.content.decode())

    def test_add_source_prepopulation(self):
        # Query string values should be pre-filled into the add source form
        c = Client()
        response = c.get('/source-add?key=testkey&name=testname&directory=testdir')
        self.assertEqual(response.status_code, 200)
        html = response.content.decode()
        checked_key, checked_name, checked_directory = False, False, False
        for line in html.split('\n'):
            if 'id="id_key"' in line:
                self.assertIn('value="testkey', line)
                checked_key = True
            if 'id="id_name"' in line:
                self.assertIn('value="testname', line)
                checked_name = True
            if 'id="id_directory"' in line:
                self.assertIn('value="testdir', line)
                checked_directory = True
        # Every one of the three inputs must have been seen in the page
        self.assertTrue(checked_key)
        self.assertTrue(checked_name)
        self.assertTrue(checked_directory)

    def test_source(self):
        #logging.disable(logging.NOTSET)
        # Sources overview page
        c = Client()
        response = c.get('/sources')
        self.assertEqual(response.status_code, 200)
        # Add as source form
        response = c.get('/source-add')
        self.assertEqual(response.status_code, 200)
        # Create a new source; the submitted categories contain duplicates
        # and the saved selection is expected to hold each one once
        data_categories = ('sponsor', 'preview', 'preview', 'sponsor',)
        expected_categories = ['sponsor', 'preview']
        data = {
            'source_type': 'c',
            'key': 'testkey',
            'name': 'testname',
            'directory': 'testdirectory',
            'media_format': settings.MEDIA_FORMATSTR_DEFAULT,
            'download_cap': 0,
            'filter_text': '.*',
            'filter_seconds_min': bool(FilterSeconds.MIN),
            'index_schedule': 3600,
            'download_media': False,
            'index_videos': True,
            'delete_old_media': False,
            'days_to_keep': 14,
            'source_resolution': '1080p',
            'source_vcodec': 'VP9',
            'source_acodec': 'OPUS',
            'prefer_60fps': False,
            'prefer_hdr': False,
            'fallback': 'f',
            'sponsorblock_categories': data_categories,
            'sub_langs': 'en',
        }
        response = c.post('/source-add', data)
        source_uuid = self._assert_source_redirect(response)
        source = Source.objects.get(pk=source_uuid)
        self.assertEqual(str(source.pk), source_uuid)
        # Check that the SponsorBlock categories were saved
        self.assertEqual(source.sponsorblock_categories.selected_choices,
                         expected_categories)
        # Run the check_source_directory_exists task
        check_source_directory_exists.call_local(source_uuid)
        # Check the source is now on the source overview page
        response = c.get('/sources')
        self.assertEqual(response.status_code, 200)
        self.assertIn(source_uuid, response.content.decode())
        # Check the source detail page loads
        response = c.get(f'/source/{source_uuid}')
        self.assertEqual(response.status_code, 200)
        # Check a task was created to index the media for the new source
        index_task_qs = TaskHistory.objects.filter(
            name='sync.tasks.index_source',
            task_params__0__0=source_uuid,
        ).order_by('end_at')
        self.assertTrue(index_task_qs)
        task = index_task_qs.last()
        self.assertEqual(task.queue, get_queue(Val(TaskQueue.LIMIT)).name)
        # save and refresh the Source
        source.refresh_from_db()
        source.sponsorblock_categories.selected_choices.append('sponsor')
        source.save()
        source.refresh_from_db()
        # Check that the SponsorBlock categories remain saved
        self.assertEqual(source.sponsorblock_categories.selected_choices,
                         expected_categories)
        # Update the source key
        data = {
            'source_type': Val(YouTube_SourceType.CHANNEL),
            'key': 'updatedkey',  # changed
            'name': 'testname',
            'directory': 'testdirectory',
            'media_format': settings.MEDIA_FORMATSTR_DEFAULT,
            'download_cap': 0,
            'filter_text': '.*',
            'filter_seconds_min': bool(FilterSeconds.MIN),
            'index_schedule': Val(IndexSchedule.EVERY_HOUR),
            'delete_old_media': False,
            'days_to_keep': 14,
            'source_resolution': Val(SourceResolution.VIDEO_1080P),
            'source_vcodec': Val(YouTube_VideoCodec.VP9),
            'source_acodec': Val(YouTube_AudioCodec.OPUS),
            'prefer_60fps': False,
            'prefer_hdr': False,
            'fallback': Val(Fallback.FAIL),
            'sponsorblock_categories': data_categories,
            'sub_langs': 'en',
        }
        response = c.post(f'/source-update/{source_uuid}', data)
        source_uuid = self._assert_source_redirect(response)
        source = Source.objects.get(pk=source_uuid)
        self.assertEqual(source.key, 'updatedkey')
        # Check that the SponsorBlock categories remain saved
        source.refresh_from_db()
        self.assertEqual(source.sponsorblock_categories.selected_choices,
                         expected_categories)
        # Update the source index schedule which should recreate the scheduled task
        data = {
            **data,
            'index_schedule': Val(IndexSchedule.EVERY_2_HOURS),  # changed
        }
        response = c.post(f'/source-update/{source_uuid}', data)
        source_uuid = self._assert_source_redirect(response)
        source = Source.objects.get(pk=source_uuid)
        # Check that the SponsorBlock categories remain saved
        self.assertEqual(source.sponsorblock_categories.selected_choices,
                         expected_categories)
        # Check a new task has been created by seeing if the pk has changed
        new_task = index_task_qs.last()
        self.assertNotEqual(task.pk, new_task.pk)
        # Delete source confirmation page
        response = c.get(f'/source-delete/{source_uuid}')
        self.assertEqual(response.status_code, 200)
        # Delete source
        response = c.post(f'/source-delete/{source_uuid}')
        self.assertEqual(response.status_code, 302)
        url_parts = urlsplit(response.url)
        self.assertEqual(url_parts.path, '/sources')
        # The source row must no longer exist
        with self.assertRaises(Source.DoesNotExist):
            Source.objects.get(pk=source_uuid)
        # Check the source is now gone from the source overview page
        response = c.get('/sources')
        self.assertEqual(response.status_code, 200)
        self.assertNotIn(source_uuid, response.content.decode())
        # Check the source details page now 404s
        response = c.get(f'/source/{source_uuid}')
        self.assertEqual(response.status_code, 404)

    def test_media(self):
        # Media overview page
        c = Client()
        response = c.get('/media')
        self.assertEqual(response.status_code, 200)
        # Add a test source
        test_source = Source.objects.create(
            source_type=Val(YouTube_SourceType.CHANNEL),
            key='testkey',
            name='testname',
            directory='testdirectory',
            index_schedule=Val(IndexSchedule.EVERY_HOUR),
            delete_old_media=False,
            days_to_keep=14,
            source_resolution=Val(SourceResolution.VIDEO_1080P),
            source_vcodec=Val(YouTube_VideoCodec.VP9),
            source_acodec=Val(YouTube_AudioCodec.OPUS),
            prefer_60fps=False,
            prefer_hdr=False,
            fallback=Val(Fallback.FAIL),
        )
        # Add some media
        test_minimal_metadata = all_test_metadata['minimal']
        before_dt = timezone.now()
        past_date = timezone.make_aware(datetime(year=2000, month=1, day=1))
        test_media = [
            Media.objects.create(
                key=media_key,
                source=test_source,
                published=past_date,
                metadata=test_minimal_metadata,
            )
            for media_key in ('mediakey1', 'mediakey2', 'mediakey3')
        ]
        # Keep the primary keys as strings; they are still needed for the
        # lookups made after the media rows have been deleted
        test_media_pks = [str(media.pk) for media in test_media]
        # simulate the tasks consumer signals having already run
        now_dt = timezone.now()
        TaskHistory.objects.filter(
            name__startswith='sync.tasks.download_media_',
        ).update(
            scheduled_at=before_dt,
            start_at=now_dt,
            end_at=now_dt,
        )
        # Check a download task and a thumbnail task can be found for each media item
        found_download_tasks = [
            get_media_download_task(media_pk) for media_pk in test_media_pks
        ]
        found_thumbnail_tasks = [
            get_media_thumbnail_task(media_pk) for media_pk in test_media_pks
        ]
        for found_task in found_download_tasks + found_thumbnail_tasks:
            self.assertTrue(found_task)
        # Check the media is listed on the media overview page
        response = c.get('/media')
        self.assertEqual(response.status_code, 200)
        html = response.content.decode()
        for media_pk in test_media_pks:
            self.assertIn(media_pk, html)
        # Check the media detail pages load
        for media_pk in test_media_pks:
            response = c.get(f'/media/{media_pk}')
            self.assertEqual(response.status_code, 200)
        # Delete the media
        for media in test_media:
            media.delete()
        # Check the media detail pages now 404
        for media_pk in test_media_pks:
            response = c.get(f'/media/{media_pk}')
            self.assertEqual(response.status_code, 404)
        # simulate the tasks consumer signals having already run
        TaskHistory.objects.filter(
            name__startswith='sync.tasks.download_media_',
        ).update(end_at=timezone.now())
        # Confirm no download or thumbnail task is found for the deleted media
        found_download_tasks = [
            get_media_download_task(media_pk) for media_pk in test_media_pks
        ]
        found_thumbnail_tasks = [
            get_media_thumbnail_task(media_pk) for media_pk in test_media_pks
        ]
        for found_task in found_download_tasks + found_thumbnail_tasks:
            self.assertFalse(found_task)

    def test_tasks(self):
        # Tasks overview page
        c = Client()
        response = c.get('/tasks')
        self.assertEqual(response.status_code, 200)
        # Completed tasks overview page
        response = c.get('/tasks-completed')
        self.assertEqual(response.status_code, 200)

    def test_mediaservers(self):
        # Media servers overview page
        c = Client()
        response = c.get('/mediaservers')
        self.assertEqual(response.status_code, 200)
|
|
|
|
|