From 50f5cdb5c2e8bafa45696187531e159a63ced5cf Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 31 Dec 2025 11:45:35 +0000 Subject: [PATCH] Add inline mode to orchestrator for fast CLI piping --- archivebox/cli/archivebox_run.py | 8 +- archivebox/tests/conftest.py | 153 ++---------- archivebox/tests/test_cli_archiveresult.py | 264 -------------------- archivebox/tests/test_cli_crawl.py | 261 -------------------- archivebox/tests/test_cli_piping.py | 270 ++++++++++++++++++++ archivebox/tests/test_cli_run.py | 254 ------------------- archivebox/tests/test_cli_snapshot.py | 274 --------------------- archivebox/tests/test_hooks.py | 0 archivebox/workers/orchestrator.py | 53 +++- 9 files changed, 344 insertions(+), 1193 deletions(-) delete mode 100644 archivebox/tests/test_cli_archiveresult.py delete mode 100644 archivebox/tests/test_cli_crawl.py create mode 100644 archivebox/tests/test_cli_piping.py delete mode 100644 archivebox/tests/test_cli_run.py delete mode 100644 archivebox/tests/test_cli_snapshot.py mode change 100755 => 100644 archivebox/tests/test_hooks.py diff --git a/archivebox/cli/archivebox_run.py b/archivebox/cli/archivebox_run.py index 9901c684..979d9341 100644 --- a/archivebox/cli/archivebox_run.py +++ b/archivebox/cli/archivebox_run.py @@ -47,6 +47,8 @@ def process_stdin_records() -> int: Handles any record type: Crawl, Snapshot, ArchiveResult. Auto-cascades: Crawl → Snapshots → ArchiveResults. + Uses inline mode for fast processing (no subprocess overhead). + Returns exit code (0 = success, 1 = error). """ from django.utils import timezone @@ -148,8 +150,8 @@ def process_stdin_records() -> int: rprint(f'[blue]Processing {queued_count} records...[/blue]', file=sys.stderr) - # Run orchestrator until all queued work is done - orchestrator = Orchestrator(exit_on_idle=True) + # Run orchestrator with inline=True for fast processing (no subprocess overhead) + orchestrator = Orchestrator(exit_on_idle=True, inline=True) orchestrator.runloop() return 0 @@ -193,7 +195,7 @@ def main(daemon: bool): """ Process queued work. - When stdin is piped: Process those specific records and exit. + When stdin is piped: Process those specific records and exit (uses inline mode). When run standalone: Run orchestrator in foreground. """ # Check if stdin has data (non-TTY means piped input) diff --git a/archivebox/tests/conftest.py b/archivebox/tests/conftest.py index ff6f1875..70445c71 100644 --- a/archivebox/tests/conftest.py +++ b/archivebox/tests/conftest.py @@ -10,55 +10,30 @@ from typing import List, Dict, Any, Optional, Tuple import pytest -# ============================================================================= -# CLI Helpers (defined before fixtures that use them) -# ============================================================================= - def run_archivebox_cmd( args: List[str], data_dir: Path, stdin: Optional[str] = None, timeout: int = 60, - env: Optional[Dict[str, str]] = None, ) -> Tuple[str, str, int]: """ Run archivebox command via subprocess, return (stdout, stderr, returncode). - - Args: - args: Command arguments (e.g., ['crawl', 'create', 'https://example.com']) - data_dir: The DATA_DIR to use - stdin: Optional string to pipe to stdin - timeout: Command timeout in seconds - env: Additional environment variables - - Returns: - Tuple of (stdout, stderr, returncode) """ cmd = [sys.executable, '-m', 'archivebox'] + args - base_env = os.environ.copy() - base_env['DATA_DIR'] = str(data_dir) - base_env['USE_COLOR'] = 'False' - base_env['SHOW_PROGRESS'] = 'False' - # Disable slow extractors for faster tests - base_env['SAVE_ARCHIVEDOTORG'] = 'False' - base_env['SAVE_TITLE'] = 'False' - base_env['SAVE_FAVICON'] = 'False' - base_env['SAVE_WGET'] = 'False' - base_env['SAVE_WARC'] = 'False' - base_env['SAVE_PDF'] = 'False' - base_env['SAVE_SCREENSHOT'] = 'False' - base_env['SAVE_DOM'] = 'False' - base_env['SAVE_SINGLEFILE'] = 'False' - base_env['SAVE_READABILITY'] = 'False' - base_env['SAVE_MERCURY'] = 'False' - base_env['SAVE_GIT'] = 'False' - base_env['SAVE_YTDLP'] = 'False' - base_env['SAVE_HEADERS'] = 'False' - base_env['SAVE_HTMLTOTEXT'] = 'False' - - if env: - base_env.update(env) + env = os.environ.copy() + env['DATA_DIR'] = str(data_dir) + env['USE_COLOR'] = 'False' + env['SHOW_PROGRESS'] = 'False' + # Enable only HEADERS extractor (pure Python, no Chrome) - disable all others + env['SAVE_HEADERS'] = 'True' + for extractor in ['TITLE', 'FAVICON', 'WGET', 'WARC', 'PDF', 'SCREENSHOT', + 'DOM', 'SINGLEFILE', 'READABILITY', 'MERCURY', 'GIT', + 'YTDLP', 'HTMLTOTEXT', 'ARCHIVEDOTORG']: + env[f'SAVE_{extractor}'] = 'False' + # Speed up network operations + env['TIMEOUT'] = '5' + env['CHECK_SSL_VALIDITY'] = 'False' result = subprocess.run( cmd, @@ -66,50 +41,29 @@ def run_archivebox_cmd( capture_output=True, text=True, cwd=data_dir, - env=base_env, + env=env, timeout=timeout, ) - return result.stdout, result.stderr, result.returncode -# ============================================================================= -# Fixtures -# ============================================================================= - -@pytest.fixture -def isolated_data_dir(tmp_path): +@pytest.fixture(scope="module") +def shared_archive(tmp_path_factory): """ - Create isolated DATA_DIR for each test. - - Uses tmp_path for complete isolation. - """ - data_dir = tmp_path / 'archivebox_data' - data_dir.mkdir() - return data_dir - - -@pytest.fixture -def initialized_archive(isolated_data_dir): - """ - Initialize ArchiveBox archive in isolated directory. - - Runs `archivebox init` via subprocess to set up database and directories. + Module-scoped archive - init runs ONCE per test file. + Much faster than per-test initialization. """ + data_dir = tmp_path_factory.mktemp("archivebox_data") stdout, stderr, returncode = run_archivebox_cmd( ['init', '--quick'], - data_dir=isolated_data_dir, + data_dir=data_dir, timeout=60, ) assert returncode == 0, f"archivebox init failed: {stderr}" - return isolated_data_dir + return data_dir -# ============================================================================= -# Output Assertions -# ============================================================================= - -def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]: +def parse_jsonl(stdout: str) -> List[Dict[str, Any]]: """Parse JSONL output into list of dicts.""" records = [] for line in stdout.strip().split('\n'): @@ -122,64 +76,7 @@ def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]: return records -def assert_jsonl_contains_type(stdout: str, record_type: str, min_count: int = 1): - """Assert output contains at least min_count records of type.""" - records = parse_jsonl_output(stdout) - matching = [r for r in records if r.get('type') == record_type] - assert len(matching) >= min_count, \ - f"Expected >= {min_count} {record_type}, got {len(matching)}" - return matching - - -def assert_jsonl_pass_through(stdout: str, input_records: List[Dict[str, Any]]): - """Assert that input records appear in output (pass-through behavior).""" - output_records = parse_jsonl_output(stdout) - output_ids = {r.get('id') for r in output_records if r.get('id')} - - for input_rec in input_records: - input_id = input_rec.get('id') - if input_id: - assert input_id in output_ids, \ - f"Input record {input_id} not found in output (pass-through failed)" - - -def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str]): - """Assert record has all required fields with non-None values.""" - for field in required_fields: - assert field in record, f"Record missing field: {field}" - assert record[field] is not None, f"Record field is None: {field}" - - -# ============================================================================= -# Test Data Factories -# ============================================================================= - -def create_test_url(domain: str = 'example.com', path: str = None) -> str: - """Generate unique test URL.""" +def create_url(suffix: str = "") -> str: + """Generate test URL.""" import uuid - path = path or uuid.uuid4().hex[:8] - return f'https://{domain}/{path}' - - -def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]: - """Create Crawl JSONL record for testing.""" - urls = urls or [create_test_url()] - return { - 'type': 'Crawl', - 'urls': '\n'.join(urls), - 'max_depth': kwargs.get('max_depth', 0), - 'tags_str': kwargs.get('tags_str', ''), - 'status': kwargs.get('status', 'queued'), - **{k: v for k, v in kwargs.items() if k not in ('max_depth', 'tags_str', 'status')}, - } - - -def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]: - """Create Snapshot JSONL record for testing.""" - return { - 'type': 'Snapshot', - 'url': url or create_test_url(), - 'tags_str': kwargs.get('tags_str', ''), - 'status': kwargs.get('status', 'queued'), - **{k: v for k, v in kwargs.items() if k not in ('tags_str', 'status')}, - } + return f'https://example.com/{suffix or uuid.uuid4().hex[:8]}' diff --git a/archivebox/tests/test_cli_archiveresult.py b/archivebox/tests/test_cli_archiveresult.py deleted file mode 100644 index de016010..00000000 --- a/archivebox/tests/test_cli_archiveresult.py +++ /dev/null @@ -1,264 +0,0 @@ -""" -Tests for archivebox archiveresult CLI command. - -Tests cover: -- archiveresult create (from Snapshot JSONL, with --plugin, pass-through) -- archiveresult list (with filters) -- archiveresult update -- archiveresult delete -""" - -import json -import pytest - -from archivebox.tests.conftest import ( - run_archivebox_cmd, - parse_jsonl_output, - create_test_url, -) - - -class TestArchiveResultCreate: - """Tests for `archivebox archiveresult create`.""" - - def test_create_from_snapshot_jsonl(self, initialized_archive): - """Create archive results from Snapshot JSONL input.""" - url = create_test_url() - - # Create a snapshot first - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - # Pipe snapshot to archiveresult create - stdout2, stderr, code = run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=title'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - assert code == 0, f"Command failed: {stderr}" - - records = parse_jsonl_output(stdout2) - # Should have the Snapshot passed through and ArchiveResult created - types = [r.get('type') for r in records] - assert 'Snapshot' in types - assert 'ArchiveResult' in types - - ar = next(r for r in records if r['type'] == 'ArchiveResult') - assert ar['plugin'] == 'title' - - def test_create_with_specific_plugin(self, initialized_archive): - """Create archive result for specific plugin.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout2, stderr, code = run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=screenshot'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout2) - ar_records = [r for r in records if r.get('type') == 'ArchiveResult'] - assert len(ar_records) >= 1 - assert ar_records[0]['plugin'] == 'screenshot' - - def test_create_pass_through_crawl(self, initialized_archive): - """Pass-through Crawl records unchanged.""" - url = create_test_url() - - # Create crawl and snapshot - stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - crawl = parse_jsonl_output(stdout1)[0] - - stdout2, _, _ = run_archivebox_cmd( - ['snapshot', 'create'], - stdin=json.dumps(crawl), - data_dir=initialized_archive, - ) - - # Now pipe all to archiveresult create - stdout3, stderr, code = run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=title'], - stdin=stdout2, - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout3) - - types = [r.get('type') for r in records] - assert 'Crawl' in types - assert 'Snapshot' in types - assert 'ArchiveResult' in types - - def test_create_pass_through_only_when_no_snapshots(self, initialized_archive): - """Only pass-through records but no new snapshots returns success.""" - crawl_record = {'type': 'Crawl', 'id': 'fake-id', 'urls': 'https://example.com'} - - stdout, stderr, code = run_archivebox_cmd( - ['archiveresult', 'create'], - stdin=json.dumps(crawl_record), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Passed through' in stderr - - -class TestArchiveResultList: - """Tests for `archivebox archiveresult list`.""" - - def test_list_empty(self, initialized_archive): - """List with no archive results returns empty.""" - stdout, stderr, code = run_archivebox_cmd( - ['archiveresult', 'list'], - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Listed 0 archive results' in stderr - - def test_list_filter_by_status(self, initialized_archive): - """Filter archive results by status.""" - # Create snapshot and archive result - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=title'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - stdout, stderr, code = run_archivebox_cmd( - ['archiveresult', 'list', '--status=queued'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - for r in records: - assert r['status'] == 'queued' - - def test_list_filter_by_plugin(self, initialized_archive): - """Filter archive results by plugin.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=title'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - stdout, stderr, code = run_archivebox_cmd( - ['archiveresult', 'list', '--plugin=title'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - for r in records: - assert r['plugin'] == 'title' - - def test_list_with_limit(self, initialized_archive): - """Limit number of results.""" - # Create multiple archive results - for _ in range(3): - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=title'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - stdout, stderr, code = run_archivebox_cmd( - ['archiveresult', 'list', '--limit=2'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert len(records) == 2 - - -class TestArchiveResultUpdate: - """Tests for `archivebox archiveresult update`.""" - - def test_update_status(self, initialized_archive): - """Update archive result status.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout2, _, _ = run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=title'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult') - - stdout3, stderr, code = run_archivebox_cmd( - ['archiveresult', 'update', '--status=failed'], - stdin=json.dumps(ar), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Updated 1 archive results' in stderr - - records = parse_jsonl_output(stdout3) - assert records[0]['status'] == 'failed' - - -class TestArchiveResultDelete: - """Tests for `archivebox archiveresult delete`.""" - - def test_delete_requires_yes(self, initialized_archive): - """Delete requires --yes flag.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout2, _, _ = run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=title'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult') - - stdout, stderr, code = run_archivebox_cmd( - ['archiveresult', 'delete'], - stdin=json.dumps(ar), - data_dir=initialized_archive, - ) - - assert code == 1 - assert '--yes' in stderr - - def test_delete_with_yes(self, initialized_archive): - """Delete with --yes flag works.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout2, _, _ = run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=title'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult') - - stdout, stderr, code = run_archivebox_cmd( - ['archiveresult', 'delete', '--yes'], - stdin=json.dumps(ar), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Deleted 1 archive results' in stderr diff --git a/archivebox/tests/test_cli_crawl.py b/archivebox/tests/test_cli_crawl.py deleted file mode 100644 index 891f4114..00000000 --- a/archivebox/tests/test_cli_crawl.py +++ /dev/null @@ -1,261 +0,0 @@ -""" -Tests for archivebox crawl CLI command. - -Tests cover: -- crawl create (with URLs, from stdin, pass-through) -- crawl list (with filters) -- crawl update -- crawl delete -""" - -import json -import pytest - -from archivebox.tests.conftest import ( - run_archivebox_cmd, - parse_jsonl_output, - assert_jsonl_contains_type, - create_test_url, - create_test_crawl_json, -) - - -class TestCrawlCreate: - """Tests for `archivebox crawl create`.""" - - def test_create_from_url_args(self, initialized_archive): - """Create crawl from URL arguments.""" - url = create_test_url() - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'create', url], - data_dir=initialized_archive, - ) - - assert code == 0, f"Command failed: {stderr}" - assert 'Created crawl' in stderr - - # Check JSONL output - records = parse_jsonl_output(stdout) - assert len(records) == 1 - assert records[0]['type'] == 'Crawl' - assert url in records[0]['urls'] - - def test_create_from_stdin_urls(self, initialized_archive): - """Create crawl from stdin URLs (one per line).""" - urls = [create_test_url() for _ in range(3)] - stdin = '\n'.join(urls) - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'create'], - stdin=stdin, - data_dir=initialized_archive, - ) - - assert code == 0, f"Command failed: {stderr}" - - records = parse_jsonl_output(stdout) - assert len(records) == 1 - crawl = records[0] - assert crawl['type'] == 'Crawl' - # All URLs should be in the crawl - for url in urls: - assert url in crawl['urls'] - - def test_create_with_depth(self, initialized_archive): - """Create crawl with --depth flag.""" - url = create_test_url() - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'create', '--depth=2', url], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert records[0]['max_depth'] == 2 - - def test_create_with_tag(self, initialized_archive): - """Create crawl with --tag flag.""" - url = create_test_url() - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'create', '--tag=test-tag', url], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert 'test-tag' in records[0].get('tags_str', '') - - def test_create_pass_through_other_types(self, initialized_archive): - """Pass-through records of other types unchanged.""" - tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'} - url = create_test_url() - stdin = json.dumps(tag_record) + '\n' + json.dumps({'url': url}) - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'create'], - stdin=stdin, - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - - # Should have both the passed-through Tag and the new Crawl - types = [r.get('type') for r in records] - assert 'Tag' in types - assert 'Crawl' in types - - def test_create_pass_through_existing_crawl(self, initialized_archive): - """Existing Crawl records (with id) are passed through.""" - # First create a crawl - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - crawl = parse_jsonl_output(stdout1)[0] - - # Now pipe it back - should pass through - stdout2, stderr, code = run_archivebox_cmd( - ['crawl', 'create'], - stdin=json.dumps(crawl), - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout2) - assert len(records) == 1 - assert records[0]['id'] == crawl['id'] - - -class TestCrawlList: - """Tests for `archivebox crawl list`.""" - - def test_list_empty(self, initialized_archive): - """List with no crawls returns empty.""" - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'list'], - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Listed 0 crawls' in stderr - - def test_list_returns_created(self, initialized_archive): - """List returns previously created crawls.""" - url = create_test_url() - run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'list'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert len(records) >= 1 - assert any(url in r.get('urls', '') for r in records) - - def test_list_filter_by_status(self, initialized_archive): - """Filter crawls by status.""" - url = create_test_url() - run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'list', '--status=queued'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - for r in records: - assert r['status'] == 'queued' - - def test_list_with_limit(self, initialized_archive): - """Limit number of results.""" - # Create multiple crawls - for _ in range(3): - run_archivebox_cmd(['crawl', 'create', create_test_url()], data_dir=initialized_archive) - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'list', '--limit=2'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert len(records) == 2 - - -class TestCrawlUpdate: - """Tests for `archivebox crawl update`.""" - - def test_update_status(self, initialized_archive): - """Update crawl status.""" - # Create a crawl - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - crawl = parse_jsonl_output(stdout1)[0] - - # Update it - stdout2, stderr, code = run_archivebox_cmd( - ['crawl', 'update', '--status=started'], - stdin=json.dumps(crawl), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Updated 1 crawls' in stderr - - records = parse_jsonl_output(stdout2) - assert records[0]['status'] == 'started' - - -class TestCrawlDelete: - """Tests for `archivebox crawl delete`.""" - - def test_delete_requires_yes(self, initialized_archive): - """Delete requires --yes flag.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - crawl = parse_jsonl_output(stdout1)[0] - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'delete'], - stdin=json.dumps(crawl), - data_dir=initialized_archive, - ) - - assert code == 1 - assert '--yes' in stderr - - def test_delete_with_yes(self, initialized_archive): - """Delete with --yes flag works.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - crawl = parse_jsonl_output(stdout1)[0] - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'delete', '--yes'], - stdin=json.dumps(crawl), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Deleted 1 crawls' in stderr - - def test_delete_dry_run(self, initialized_archive): - """Dry run shows what would be deleted.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - crawl = parse_jsonl_output(stdout1)[0] - - stdout, stderr, code = run_archivebox_cmd( - ['crawl', 'delete', '--dry-run'], - stdin=json.dumps(crawl), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Would delete' in stderr - assert 'dry run' in stderr.lower() diff --git a/archivebox/tests/test_cli_piping.py b/archivebox/tests/test_cli_piping.py new file mode 100644 index 00000000..374711ed --- /dev/null +++ b/archivebox/tests/test_cli_piping.py @@ -0,0 +1,270 @@ +""" +Tests for CLI JSONL piping with real URL archiving. + +Tests the REAL piping workflows users will run: +- archivebox crawl create URL | archivebox run +- archivebox snapshot create URL | archivebox run +- archivebox archiveresult list --status=failed | archivebox run +- Pass-through behavior (accumulating records through pipeline) + +Uses module-scoped fixture for speed - init runs ONCE per test file. +Uses inline mode in orchestrator for fast processing (no subprocess overhead). +""" + +import json +import pytest + +from archivebox.tests.conftest import run_archivebox_cmd, parse_jsonl, create_url + + +class TestCrawlPipeline: + """Test: archivebox crawl create URL | archivebox run""" + + def test_crawl_create_outputs_jsonl(self, shared_archive): + """crawl create outputs Crawl JSONL to stdout.""" + url = create_url("crawl-test") + stdout, stderr, code = run_archivebox_cmd( + ['crawl', 'create', url], + data_dir=shared_archive, + ) + assert code == 0, stderr + records = parse_jsonl(stdout) + assert len(records) == 1 + assert records[0]['type'] == 'Crawl' + assert url in records[0]['urls'] + + def test_crawl_pipe_to_run(self, shared_archive): + """crawl create | run - processes with inline mode (fast).""" + url = create_url("pipe-to-run") + + # Create crawl + stdout1, _, code1 = run_archivebox_cmd( + ['crawl', 'create', url], + data_dir=shared_archive, + ) + assert code1 == 0 + + # Pipe to run (uses inline mode for fast processing) + stdout2, stderr2, code2 = run_archivebox_cmd( + ['run'], + stdin=stdout1, + data_dir=shared_archive, + timeout=30, + ) + assert code2 == 0, stderr2 + + # run outputs processed records + records = parse_jsonl(stdout2) + assert len(records) >= 1 + assert any(r.get('type') == 'Crawl' for r in records) + + +class TestSnapshotPipeline: + """Test: archivebox snapshot create URL | archivebox run""" + + def test_snapshot_from_crawl(self, shared_archive): + """snapshot create accepts Crawl JSONL and creates Snapshots.""" + url = create_url("snap-from-crawl") + + # Create crawl + stdout1, _, _ = run_archivebox_cmd( + ['crawl', 'create', url], + data_dir=shared_archive, + ) + + # Pipe to snapshot create + stdout2, stderr, code = run_archivebox_cmd( + ['snapshot', 'create'], + stdin=stdout1, + data_dir=shared_archive, + ) + assert code == 0, stderr + + records = parse_jsonl(stdout2) + types = {r['type'] for r in records} + + # Should have Crawl (passed through) and Snapshot (created) + assert 'Crawl' in types + assert 'Snapshot' in types + + def test_snapshot_pipe_to_run(self, shared_archive): + """snapshot create | run - processes with inline mode.""" + url = create_url("snap-to-run") + + # Create snapshot + stdout1, _, code1 = run_archivebox_cmd( + ['snapshot', 'create', url], + data_dir=shared_archive, + ) + assert code1 == 0 + + # Pipe to run + stdout2, stderr2, code2 = run_archivebox_cmd( + ['run'], + stdin=stdout1, + data_dir=shared_archive, + timeout=30, + ) + assert code2 == 0, stderr2 + + records = parse_jsonl(stdout2) + snapshots = [r for r in records if r.get('type') == 'Snapshot'] + assert len(snapshots) >= 1 + + +class TestArchiveResultPipeline: + """Test: snapshot | archiveresult create --plugin=X | run""" + + def test_archiveresult_from_snapshot(self, shared_archive): + """archiveresult create accepts Snapshot JSONL.""" + url = create_url("ar-from-snap") + + # Create snapshot + stdout1, _, _ = run_archivebox_cmd( + ['snapshot', 'create', url], + data_dir=shared_archive, + ) + + # Pipe to archiveresult create + stdout2, stderr, code = run_archivebox_cmd( + ['archiveresult', 'create', '--plugin=headers'], + stdin=stdout1, + data_dir=shared_archive, + ) + assert code == 0, stderr + + records = parse_jsonl(stdout2) + types = {r['type'] for r in records} + + # Should have Snapshot (passed through) and ArchiveResult (created) + assert 'Snapshot' in types + assert 'ArchiveResult' in types + + +class TestFullPipeline: + """Test: crawl create | snapshot create | archiveresult create | run""" + + def test_full_four_stage_pipeline(self, shared_archive): + """Full 4-stage pipeline with real processing.""" + url = create_url("full-pipeline") + + # Stage 1: crawl create + out1, _, code1 = run_archivebox_cmd( + ['crawl', 'create', url], + data_dir=shared_archive, + ) + assert code1 == 0 + + # Stage 2: snapshot create + out2, _, code2 = run_archivebox_cmd( + ['snapshot', 'create'], + stdin=out1, + data_dir=shared_archive, + ) + assert code2 == 0 + + # Stage 3: archiveresult create + out3, _, code3 = run_archivebox_cmd( + ['archiveresult', 'create', '--plugin=headers'], + stdin=out2, + data_dir=shared_archive, + ) + assert code3 == 0 + + # Stage 4: run (inline mode) + out4, stderr, code4 = run_archivebox_cmd( + ['run'], + stdin=out3, + data_dir=shared_archive, + timeout=30, + ) + assert code4 == 0, stderr + + # Final output should have records + records = parse_jsonl(out4) + types = {r['type'] for r in records} + assert len(types) >= 1 + + +class TestPassThrough: + """Test pass-through behavior - unknown types pass through unchanged.""" + + def test_unknown_type_passes_through(self, shared_archive): + """Records with unknown types pass through all commands.""" + unknown = {'type': 'CustomType', 'id': 'test-123', 'data': 'preserved'} + url = create_url("passthrough") + + stdin = json.dumps(unknown) + '\n' + url + + stdout, _, code = run_archivebox_cmd( + ['crawl', 'create'], + stdin=stdin, + data_dir=shared_archive, + ) + assert code == 0 + + records = parse_jsonl(stdout) + types = {r['type'] for r in records} + + # CustomType should be passed through + assert 'CustomType' in types + assert 'Crawl' in types + + # Data should be preserved + custom = next(r for r in records if r['type'] == 'CustomType') + assert custom['data'] == 'preserved' + + +class TestListCommands: + """Test list commands with filters.""" + + def test_crawl_list_filter_status(self, shared_archive): + """crawl list --status=queued filters correctly.""" + # Create a crawl first + url = create_url("list-filter") + run_archivebox_cmd(['crawl', 'create', url], data_dir=shared_archive) + + # List with filter + stdout, _, code = run_archivebox_cmd( + ['crawl', 'list', '--status=queued'], + data_dir=shared_archive, + ) + assert code == 0 + + records = parse_jsonl(stdout) + for r in records: + assert r['status'] == 'queued' + + def test_snapshot_list_outputs_jsonl(self, shared_archive): + """snapshot list outputs JSONL for piping.""" + stdout, _, code = run_archivebox_cmd( + ['snapshot', 'list'], + data_dir=shared_archive, + ) + assert code == 0 + # Output is valid JSONL (even if empty) + records = parse_jsonl(stdout) + for r in records: + assert r['type'] == 'Snapshot' + + +class TestRunBehavior: + """Test run's create-or-update behavior.""" + + def test_run_creates_from_url_record(self, shared_archive): + """run creates Snapshot from URL record without id.""" + url = create_url("run-create") + record = {'url': url, 'type': 'Snapshot'} + + stdout, stderr, code = run_archivebox_cmd( + ['run'], + stdin=json.dumps(record), + data_dir=shared_archive, + timeout=30, + ) + assert code == 0, stderr + + records = parse_jsonl(stdout) + snapshots = [r for r in records if r.get('type') == 'Snapshot'] + assert len(snapshots) >= 1 + assert snapshots[0].get('id') # Should have ID after creation diff --git a/archivebox/tests/test_cli_run.py b/archivebox/tests/test_cli_run.py deleted file mode 100644 index 88878d1c..00000000 --- a/archivebox/tests/test_cli_run.py +++ /dev/null @@ -1,254 +0,0 @@ -""" -Tests for archivebox run CLI command. - -Tests cover: -- run with stdin JSONL (Crawl, Snapshot, ArchiveResult) -- create-or-update behavior (records with/without id) -- pass-through output (for chaining) -""" - -import json -import pytest - -from archivebox.tests.conftest import ( - run_archivebox_cmd, - parse_jsonl_output, - create_test_url, - create_test_crawl_json, - create_test_snapshot_json, -) - - -class TestRunWithCrawl: - """Tests for `archivebox run` with Crawl input.""" - - def test_run_with_new_crawl(self, initialized_archive): - """Run creates and processes a new Crawl (no id).""" - crawl_record = create_test_crawl_json() - - stdout, stderr, code = run_archivebox_cmd( - ['run'], - stdin=json.dumps(crawl_record), - data_dir=initialized_archive, - timeout=120, - ) - - assert code == 0, f"Command failed: {stderr}" - - # Should output the created Crawl - records = parse_jsonl_output(stdout) - crawl_records = [r for r in records if r.get('type') == 'Crawl'] - assert len(crawl_records) >= 1 - assert crawl_records[0].get('id') # Should have an id now - - def test_run_with_existing_crawl(self, initialized_archive): - """Run re-queues an existing Crawl (with id).""" - url = create_test_url() - - # First create a crawl - stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - crawl = parse_jsonl_output(stdout1)[0] - - # Run with the existing crawl - stdout2, stderr, code = run_archivebox_cmd( - ['run'], - stdin=json.dumps(crawl), - data_dir=initialized_archive, - timeout=120, - ) - - assert code == 0 - records = parse_jsonl_output(stdout2) - assert len(records) >= 1 - - -class TestRunWithSnapshot: - """Tests for `archivebox run` with Snapshot input.""" - - def test_run_with_new_snapshot(self, initialized_archive): - """Run creates and processes a new Snapshot (no id, just url).""" - snapshot_record = create_test_snapshot_json() - - stdout, stderr, code = run_archivebox_cmd( - ['run'], - stdin=json.dumps(snapshot_record), - data_dir=initialized_archive, - timeout=120, - ) - - assert code == 0, f"Command failed: {stderr}" - - records = parse_jsonl_output(stdout) - snapshot_records = [r for r in records if r.get('type') == 'Snapshot'] - assert len(snapshot_records) >= 1 - assert snapshot_records[0].get('id') - - def test_run_with_existing_snapshot(self, initialized_archive): - """Run re-queues an existing Snapshot (with id).""" - url = create_test_url() - - # First create a snapshot - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - # Run with the existing snapshot - stdout2, stderr, code = run_archivebox_cmd( - ['run'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - timeout=120, - ) - - assert code == 0 - records = parse_jsonl_output(stdout2) - assert len(records) >= 1 - - def test_run_with_plain_url(self, initialized_archive): - """Run accepts plain URL records (no type field).""" - url = create_test_url() - url_record = {'url': url} - - stdout, stderr, code = run_archivebox_cmd( - ['run'], - stdin=json.dumps(url_record), - data_dir=initialized_archive, - timeout=120, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert len(records) >= 1 - - -class TestRunWithArchiveResult: - """Tests for `archivebox run` with ArchiveResult input.""" - - def test_run_requeues_failed_archiveresult(self, initialized_archive): - """Run re-queues a failed ArchiveResult.""" - url = create_test_url() - - # Create snapshot and archive result - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout2, _, _ = run_archivebox_cmd( - ['archiveresult', 'create', '--plugin=title'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - ar = next(r for r in parse_jsonl_output(stdout2) if r.get('type') == 'ArchiveResult') - - # Update to failed - ar['status'] = 'failed' - run_archivebox_cmd( - ['archiveresult', 'update', '--status=failed'], - stdin=json.dumps(ar), - data_dir=initialized_archive, - ) - - # Now run should re-queue it - stdout3, stderr, code = run_archivebox_cmd( - ['run'], - stdin=json.dumps(ar), - data_dir=initialized_archive, - timeout=120, - ) - - assert code == 0 - records = parse_jsonl_output(stdout3) - ar_records = [r for r in records if r.get('type') == 'ArchiveResult'] - assert len(ar_records) >= 1 - - -class TestRunPassThrough: - """Tests for pass-through behavior in `archivebox run`.""" - - def test_run_passes_through_unknown_types(self, initialized_archive): - """Run passes through records with unknown types.""" - unknown_record = {'type': 'Unknown', 'id': 'fake-id', 'data': 'test'} - - stdout, stderr, code = run_archivebox_cmd( - ['run'], - stdin=json.dumps(unknown_record), - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - unknown_records = [r for r in records if r.get('type') == 'Unknown'] - assert len(unknown_records) == 1 - assert unknown_records[0]['data'] == 'test' - - def test_run_outputs_all_processed_records(self, initialized_archive): - """Run outputs all processed records for chaining.""" - url = create_test_url() - crawl_record = create_test_crawl_json(urls=[url]) - - stdout, stderr, code = run_archivebox_cmd( - ['run'], - stdin=json.dumps(crawl_record), - data_dir=initialized_archive, - timeout=120, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - # Should have at least the Crawl in output - assert len(records) >= 1 - - -class TestRunMixedInput: - """Tests for `archivebox run` with mixed record types.""" - - def test_run_handles_mixed_types(self, initialized_archive): - """Run handles mixed Crawl/Snapshot/ArchiveResult input.""" - crawl = create_test_crawl_json() - snapshot = create_test_snapshot_json() - unknown = {'type': 'Tag', 'id': 'fake', 'name': 'test'} - - stdin = '\n'.join([ - json.dumps(crawl), - json.dumps(snapshot), - json.dumps(unknown), - ]) - - stdout, stderr, code = run_archivebox_cmd( - ['run'], - stdin=stdin, - data_dir=initialized_archive, - timeout=120, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - - types = set(r.get('type') for r in records) - # Should have processed Crawl and Snapshot, passed through Tag - assert 'Crawl' in types or 'Snapshot' in types or 'Tag' in types - - -class TestRunEmpty: - """Tests for `archivebox run` edge cases.""" - - def test_run_empty_stdin(self, initialized_archive): - """Run with empty stdin returns success.""" - stdout, stderr, code = run_archivebox_cmd( - ['run'], - stdin='', - data_dir=initialized_archive, - ) - - assert code == 0 - - def test_run_no_records_to_process(self, initialized_archive): - """Run with only pass-through records shows message.""" - unknown = {'type': 'Unknown', 'id': 'fake'} - - stdout, stderr, code = run_archivebox_cmd( - ['run'], - stdin=json.dumps(unknown), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'No records to process' in stderr diff --git a/archivebox/tests/test_cli_snapshot.py b/archivebox/tests/test_cli_snapshot.py deleted file mode 100644 index 24f35bf7..00000000 --- a/archivebox/tests/test_cli_snapshot.py +++ /dev/null @@ -1,274 +0,0 @@ -""" -Tests for archivebox snapshot CLI command. - -Tests cover: -- snapshot create (from URLs, from Crawl JSONL, pass-through) -- snapshot list (with filters) -- snapshot update -- snapshot delete -""" - -import json -import pytest - -from archivebox.tests.conftest import ( - run_archivebox_cmd, - parse_jsonl_output, - assert_jsonl_contains_type, - create_test_url, -) - - -class TestSnapshotCreate: - """Tests for `archivebox snapshot create`.""" - - def test_create_from_url_args(self, initialized_archive): - """Create snapshot from URL arguments.""" - url = create_test_url() - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'create', url], - data_dir=initialized_archive, - ) - - assert code == 0, f"Command failed: {stderr}" - assert 'Created' in stderr - - records = parse_jsonl_output(stdout) - assert len(records) == 1 - assert records[0]['type'] == 'Snapshot' - assert records[0]['url'] == url - - def test_create_from_crawl_jsonl(self, initialized_archive): - """Create snapshots from Crawl JSONL input.""" - url = create_test_url() - - # First create a crawl - stdout1, _, _ = run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive) - crawl = parse_jsonl_output(stdout1)[0] - - # Pipe crawl to snapshot create - stdout2, stderr, code = run_archivebox_cmd( - ['snapshot', 'create'], - stdin=json.dumps(crawl), - data_dir=initialized_archive, - ) - - assert code == 0, f"Command failed: {stderr}" - - records = parse_jsonl_output(stdout2) - # Should have the Crawl passed through and the Snapshot created - types = [r.get('type') for r in records] - assert 'Crawl' in types - assert 'Snapshot' in types - - snapshot = next(r for r in records if r['type'] == 'Snapshot') - assert snapshot['url'] == url - - def test_create_with_tag(self, initialized_archive): - """Create snapshot with --tag flag.""" - url = create_test_url() - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'create', '--tag=test-tag', url], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert 'test-tag' in records[0].get('tags_str', '') - - def test_create_pass_through_other_types(self, initialized_archive): - """Pass-through records of other types unchanged.""" - tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'} - url = create_test_url() - stdin = json.dumps(tag_record) + '\n' + json.dumps({'url': url}) - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'create'], - stdin=stdin, - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - - types = [r.get('type') for r in records] - assert 'Tag' in types - assert 'Snapshot' in types - - def test_create_multiple_urls(self, initialized_archive): - """Create snapshots from multiple URLs.""" - urls = [create_test_url() for _ in range(3)] - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'create'] + urls, - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert len(records) == 3 - - created_urls = {r['url'] for r in records} - for url in urls: - assert url in created_urls - - -class TestSnapshotList: - """Tests for `archivebox snapshot list`.""" - - def test_list_empty(self, initialized_archive): - """List with no snapshots returns empty.""" - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'list'], - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Listed 0 snapshots' in stderr - - def test_list_returns_created(self, initialized_archive): - """List returns previously created snapshots.""" - url = create_test_url() - run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'list'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert len(records) >= 1 - assert any(r.get('url') == url for r in records) - - def test_list_filter_by_status(self, initialized_archive): - """Filter snapshots by status.""" - url = create_test_url() - run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'list', '--status=queued'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - for r in records: - assert r['status'] == 'queued' - - def test_list_filter_by_url_contains(self, initialized_archive): - """Filter snapshots by URL contains.""" - url = create_test_url(domain='unique-domain-12345.com') - run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'list', '--url__icontains=unique-domain-12345'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert len(records) == 1 - assert 'unique-domain-12345' in records[0]['url'] - - def test_list_with_limit(self, initialized_archive): - """Limit number of results.""" - for _ in range(3): - run_archivebox_cmd(['snapshot', 'create', create_test_url()], data_dir=initialized_archive) - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'list', '--limit=2'], - data_dir=initialized_archive, - ) - - assert code == 0 - records = parse_jsonl_output(stdout) - assert len(records) == 2 - - -class TestSnapshotUpdate: - """Tests for `archivebox snapshot update`.""" - - def test_update_status(self, initialized_archive): - """Update snapshot status.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout2, stderr, code = run_archivebox_cmd( - ['snapshot', 'update', '--status=started'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Updated 1 snapshots' in stderr - - records = parse_jsonl_output(stdout2) - assert records[0]['status'] == 'started' - - def test_update_add_tag(self, initialized_archive): - """Update snapshot by adding tag.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout2, stderr, code = run_archivebox_cmd( - ['snapshot', 'update', '--tag=new-tag'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Updated 1 snapshots' in stderr - - -class TestSnapshotDelete: - """Tests for `archivebox snapshot delete`.""" - - def test_delete_requires_yes(self, initialized_archive): - """Delete requires --yes flag.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'delete'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - assert code == 1 - assert '--yes' in stderr - - def test_delete_with_yes(self, initialized_archive): - """Delete with --yes flag works.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'delete', '--yes'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Deleted 1 snapshots' in stderr - - def test_delete_dry_run(self, initialized_archive): - """Dry run shows what would be deleted.""" - url = create_test_url() - stdout1, _, _ = run_archivebox_cmd(['snapshot', 'create', url], data_dir=initialized_archive) - snapshot = parse_jsonl_output(stdout1)[0] - - stdout, stderr, code = run_archivebox_cmd( - ['snapshot', 'delete', '--dry-run'], - stdin=json.dumps(snapshot), - data_dir=initialized_archive, - ) - - assert code == 0 - assert 'Would delete' in stderr diff --git a/archivebox/tests/test_hooks.py b/archivebox/tests/test_hooks.py old mode 100755 new mode 100644 diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py index 1b1789cb..41c50ece 100644 --- a/archivebox/workers/orchestrator.py +++ b/archivebox/workers/orchestrator.py @@ -57,14 +57,19 @@ def _run_orchestrator_process(exit_on_idle: bool) -> None: class Orchestrator: """ Manages worker processes by polling queues and spawning workers as needed. - + The orchestrator: 1. Polls each model queue (Crawl, Snapshot, ArchiveResult) 2. If items exist and fewer than MAX_CONCURRENT workers are running, spawns workers 3. Monitors worker health and cleans up stale PIDs 4. Exits when all queues are empty (unless daemon mode) + + Inline mode (inline=True): + - Processes items directly in the same process (no subprocess spawn) + - Much faster for small batches (avoids 2-3 sec subprocess overhead per worker) + - Useful for CLI piping and tests """ - + WORKER_TYPES: list[Type[Worker]] = [CrawlWorker, SnapshotWorker, ArchiveResultWorker] # Configuration @@ -72,12 +77,18 @@ class Orchestrator: IDLE_TIMEOUT: int = 3 # Exit after N idle ticks (0 = never exit) MAX_WORKERS_PER_TYPE: int = 8 # Max workers per model type MAX_TOTAL_WORKERS: int = 24 # Max workers across all types - - def __init__(self, exit_on_idle: bool = True): + + def __init__(self, exit_on_idle: bool = True, inline: bool = False): self.exit_on_idle = exit_on_idle + self.inline = inline # Process items directly instead of spawning workers self.pid: int = os.getpid() self.pid_file = None self.idle_count: int = 0 + + # Faster polling in inline mode + if self.inline: + self.POLL_INTERVAL = 0.1 + self.IDLE_TIMEOUT = 2 def __repr__(self) -> str: return f'[underline]Orchestrator[/underline]\\[pid={self.pid}]' @@ -169,13 +180,31 @@ class Orchestrator: ) return None + def process_inline(self, WorkerClass: Type[Worker]) -> int: + """ + Process items inline (same process) instead of spawning workers. + Returns number of items processed. + """ + worker = WorkerClass(worker_id=0) + processed = 0 + + while True: + obj = worker.claim_next() + if obj is None: + break + + worker.process_item(obj) + processed += 1 + + return processed + def check_queues_and_spawn_workers(self) -> dict[str, int]: """ Check all queues and spawn workers as needed. Returns dict of queue sizes by worker type. """ queue_sizes = {} - + for WorkerClass in self.WORKER_TYPES: # Get queue for this worker type # Need to instantiate worker to get queue (for model access) @@ -183,11 +212,17 @@ class Orchestrator: queue = worker.get_queue() queue_count = queue.count() queue_sizes[WorkerClass.name] = queue_count - - # Spawn worker if needed - if self.should_spawn_worker(WorkerClass, queue_count): + + if queue_count == 0: + continue + + if self.inline: + # Process items directly (fast, no subprocess overhead) + self.process_inline(WorkerClass) + elif self.should_spawn_worker(WorkerClass, queue_count): + # Spawn worker subprocess (slow, but parallel) self.spawn_worker(WorkerClass) - + return queue_sizes def has_pending_work(self, queue_sizes: dict[str, int]) -> bool: