Add inline mode to orchestrator for fast CLI piping

This commit is contained in:
Claude 2025-12-31 11:45:35 +00:00
parent b87bbbbecb
commit 50f5cdb5c2
No known key found for this signature in database
9 changed files with 344 additions and 1193 deletions

View File

@ -47,6 +47,8 @@ def process_stdin_records() -> int:
Handles any record type: Crawl, Snapshot, ArchiveResult.
Auto-cascades: Crawl → Snapshots → ArchiveResults.
Uses inline mode for fast processing (no subprocess overhead).
Returns exit code (0 = success, 1 = error).
"""
from django.utils import timezone
@ -148,8 +150,8 @@ def process_stdin_records() -> int:
rprint(f'[blue]Processing {queued_count} records...[/blue]', file=sys.stderr)
# Run orchestrator until all queued work is done
orchestrator = Orchestrator(exit_on_idle=True)
# Run orchestrator with inline=True for fast processing (no subprocess overhead)
orchestrator = Orchestrator(exit_on_idle=True, inline=True)
orchestrator.runloop()
return 0
@ -193,7 +195,7 @@ def main(daemon: bool):
"""
Process queued work.
When stdin is piped: Process those specific records and exit.
When stdin is piped: Process those specific records and exit (uses inline mode).
When run standalone: Run orchestrator in foreground.
"""
# Check if stdin has data (non-TTY means piped input)

View File

@ -10,55 +10,30 @@ from typing import List, Dict, Any, Optional, Tuple
import pytest
# =============================================================================
# CLI Helpers (defined before fixtures that use them)
# =============================================================================
def run_archivebox_cmd(
args: List[str],
data_dir: Path,
stdin: Optional[str] = None,
timeout: int = 60,
env: Optional[Dict[str, str]] = None,
) -> Tuple[str, str, int]:
"""
Run archivebox command via subprocess, return (stdout, stderr, returncode).
Args:
args: Command arguments (e.g., ['crawl', 'create', 'https://example.com'])
data_dir: The DATA_DIR to use
stdin: Optional string to pipe to stdin
timeout: Command timeout in seconds
env: Additional environment variables
Returns:
Tuple of (stdout, stderr, returncode)
"""
cmd = [sys.executable, '-m', 'archivebox'] + args
base_env = os.environ.copy()
base_env['DATA_DIR'] = str(data_dir)
base_env['USE_COLOR'] = 'False'
base_env['SHOW_PROGRESS'] = 'False'
# Disable slow extractors for faster tests
base_env['SAVE_ARCHIVEDOTORG'] = 'False'
base_env['SAVE_TITLE'] = 'False'
base_env['SAVE_FAVICON'] = 'False'
base_env['SAVE_WGET'] = 'False'
base_env['SAVE_WARC'] = 'False'
base_env['SAVE_PDF'] = 'False'
base_env['SAVE_SCREENSHOT'] = 'False'
base_env['SAVE_DOM'] = 'False'
base_env['SAVE_SINGLEFILE'] = 'False'
base_env['SAVE_READABILITY'] = 'False'
base_env['SAVE_MERCURY'] = 'False'
base_env['SAVE_GIT'] = 'False'
base_env['SAVE_YTDLP'] = 'False'
base_env['SAVE_HEADERS'] = 'False'
base_env['SAVE_HTMLTOTEXT'] = 'False'
if env:
base_env.update(env)
env = os.environ.copy()
env['DATA_DIR'] = str(data_dir)
env['USE_COLOR'] = 'False'
env['SHOW_PROGRESS'] = 'False'
# Enable only HEADERS extractor (pure Python, no Chrome) - disable all others
env['SAVE_HEADERS'] = 'True'
for extractor in ['TITLE', 'FAVICON', 'WGET', 'WARC', 'PDF', 'SCREENSHOT',
'DOM', 'SINGLEFILE', 'READABILITY', 'MERCURY', 'GIT',
'YTDLP', 'HTMLTOTEXT', 'ARCHIVEDOTORG']:
env[f'SAVE_{extractor}'] = 'False'
# Speed up network operations
env['TIMEOUT'] = '5'
env['CHECK_SSL_VALIDITY'] = 'False'
result = subprocess.run(
cmd,
@ -66,50 +41,29 @@ def run_archivebox_cmd(
capture_output=True,
text=True,
cwd=data_dir,
env=base_env,
env=env,
timeout=timeout,
)
return result.stdout, result.stderr, result.returncode
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture
def isolated_data_dir(tmp_path):
@pytest.fixture(scope="module")
def shared_archive(tmp_path_factory):
"""
Create isolated DATA_DIR for each test.
Uses tmp_path for complete isolation.
"""
data_dir = tmp_path / 'archivebox_data'
data_dir.mkdir()
return data_dir
@pytest.fixture
def initialized_archive(isolated_data_dir):
"""
Initialize ArchiveBox archive in isolated directory.
Runs `archivebox init` via subprocess to set up database and directories.
Module-scoped archive - init runs ONCE per test file.
Much faster than per-test initialization.
"""
data_dir = tmp_path_factory.mktemp("archivebox_data")
stdout, stderr, returncode = run_archivebox_cmd(
['init', '--quick'],
data_dir=isolated_data_dir,
data_dir=data_dir,
timeout=60,
)
assert returncode == 0, f"archivebox init failed: {stderr}"
return isolated_data_dir
return data_dir
# =============================================================================
# Output Assertions
# =============================================================================
def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]:
def parse_jsonl(stdout: str) -> List[Dict[str, Any]]:
"""Parse JSONL output into list of dicts."""
records = []
for line in stdout.strip().split('\n'):
@ -122,64 +76,7 @@ def parse_jsonl_output(stdout: str) -> List[Dict[str, Any]]:
return records
def assert_jsonl_contains_type(stdout: str, record_type: str, min_count: int = 1):
    """
    Check that JSONL output holds at least ``min_count`` records of a type.

    Args:
        stdout: Raw JSONL text, parsed with ``parse_jsonl_output``.
        record_type: Value the record's ``type`` key must equal.
        min_count: Minimum number of matching records required.

    Returns:
        The list of matching records.

    Raises:
        AssertionError: If fewer than ``min_count`` records match.
    """
    matching = []
    for rec in parse_jsonl_output(stdout):
        if rec.get('type') == record_type:
            matching.append(rec)
    assert len(matching) >= min_count, \
        f"Expected >= {min_count} {record_type}, got {len(matching)}"
    return matching
def assert_jsonl_pass_through(stdout: str, input_records: List[Dict[str, Any]]):
    """
    Check that every input record carrying an ``id`` shows up in the output.

    Input records without a truthy ``id`` are ignored, as are output records
    without one.

    Args:
        stdout: Raw JSONL text, parsed with ``parse_jsonl_output``.
        input_records: Records that were fed into the command.

    Raises:
        AssertionError: If an input record's id is absent from the output ids.
    """
    # Collect every truthy id that the command emitted.
    output_ids = set()
    for out_rec in parse_jsonl_output(stdout):
        out_id = out_rec.get('id')
        if out_id:
            output_ids.add(out_id)

    for in_rec in input_records:
        input_id = in_rec.get('id')
        if not input_id:
            continue
        assert input_id in output_ids, \
            f"Input record {input_id} not found in output (pass-through failed)"
def assert_record_has_fields(record: Dict[str, Any], required_fields: List[str]):
    """
    Check that a record contains each required field with a non-None value.

    Args:
        record: The record to inspect.
        required_fields: Keys that must be present and not None.

    Raises:
        AssertionError: On the first field that is missing or None.
    """
    for field in required_fields:
        is_present = field in record
        assert is_present, f"Record missing field: {field}"
        value = record[field]
        assert value is not None, f"Record field is None: {field}"
# =============================================================================
# Test Data Factories
# =============================================================================
def create_test_url(domain: str = 'example.com', path: str = None) -> str:
"""Generate unique test URL."""
def create_url(suffix: str = "") -> str:
"""Generate test URL."""
import uuid
path = path or uuid.uuid4().hex[:8]
return f'https://{domain}/{path}'
def create_test_crawl_json(urls: List[str] = None, **kwargs) -> Dict[str, Any]:
    """
    Build a Crawl JSONL record (as a dict) for tests.

    Args:
        urls: URLs to crawl; when empty or None a single URL from
            ``create_test_url()`` is used.
        **kwargs: Overrides for ``max_depth`` / ``tags_str`` / ``status``,
            plus any additional keys to include in the record.

    Returns:
        Dict with ``type`` set to ``'Crawl'``, ``urls`` newline-joined, and
        defaults of ``max_depth=0``, ``tags_str=''``, ``status='queued'``.
    """
    if not urls:
        urls = [create_test_url()]
    record = {
        'type': 'Crawl',
        'urls': '\n'.join(urls),
        'max_depth': 0,
        'tags_str': '',
        'status': 'queued',
    }
    # Overrides replace defaults in place; unknown keys are appended after them.
    record.update(kwargs)
    return record
def create_test_snapshot_json(url: str = None, **kwargs) -> Dict[str, Any]:
    """
    Build a Snapshot JSONL record (as a dict) for tests.

    Args:
        url: URL of the snapshot; when empty or None, ``create_test_url()``
            supplies one.
        **kwargs: Overrides for ``tags_str`` / ``status``, plus any
            additional keys to include in the record.

    Returns:
        Dict with ``type`` set to ``'Snapshot'`` and defaults of
        ``tags_str=''``, ``status='queued'``.
    """
    if not url:
        url = create_test_url()
    record = {
        'type': 'Snapshot',
        'url': url,
        'tags_str': '',
        'status': 'queued',
    }
    # Overrides replace defaults in place; unknown keys are appended after them.
    record.update(kwargs)
    return record
return f'https://example.com/{suffix or uuid.uuid4().hex[:8]}'

View File

@ -1,264 +0,0 @@
"""
Tests for archivebox archiveresult CLI command.
Tests cover:
- archiveresult create (from Snapshot JSONL, with --plugin, pass-through)
- archiveresult list (with filters)
- archiveresult update
- archiveresult delete
"""
import json
import pytest
from archivebox.tests.conftest import (
run_archivebox_cmd,
parse_jsonl_output,
create_test_url,
)
class TestArchiveResultCreate:
    """CLI tests for `archivebox archiveresult create`."""

    def test_create_from_snapshot_jsonl(self, initialized_archive):
        """A piped Snapshot record yields the Snapshot plus a new ArchiveResult."""
        # Seed a snapshot to feed into the command under test.
        snapshot_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        snapshot = parse_jsonl_output(snapshot_out)[0]

        stdout, stderr, code = run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=title'],
            stdin=json.dumps(snapshot),
            data_dir=initialized_archive,
        )
        assert code == 0, f"Command failed: {stderr}"

        emitted = parse_jsonl_output(stdout)
        emitted_types = [rec.get('type') for rec in emitted]
        # The input Snapshot is passed through alongside the created result.
        assert 'Snapshot' in emitted_types
        assert 'ArchiveResult' in emitted_types

        archive_result = next(rec for rec in emitted if rec['type'] == 'ArchiveResult')
        assert archive_result['plugin'] == 'title'

    def test_create_with_specific_plugin(self, initialized_archive):
        """`--plugin=screenshot` produces an ArchiveResult for that plugin."""
        snapshot_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        snapshot = parse_jsonl_output(snapshot_out)[0]

        stdout, _, code = run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=screenshot'],
            stdin=json.dumps(snapshot),
            data_dir=initialized_archive,
        )
        assert code == 0

        archive_results = [
            rec for rec in parse_jsonl_output(stdout)
            if rec.get('type') == 'ArchiveResult'
        ]
        assert len(archive_results) >= 1
        assert archive_results[0]['plugin'] == 'screenshot'

    def test_create_pass_through_crawl(self, initialized_archive):
        """Crawl records survive the pipeline next to Snapshot and ArchiveResult."""
        # Build the upstream stages: crawl, then snapshot from that crawl.
        crawl_out, _, _ = run_archivebox_cmd(
            ['crawl', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        crawl = parse_jsonl_output(crawl_out)[0]

        snapshot_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create'],
            stdin=json.dumps(crawl),
            data_dir=initialized_archive,
        )

        # Feed the whole accumulated stream into archiveresult create.
        stdout, _, code = run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=title'],
            stdin=snapshot_out,
            data_dir=initialized_archive,
        )
        assert code == 0

        emitted_types = [rec.get('type') for rec in parse_jsonl_output(stdout)]
        assert 'Crawl' in emitted_types
        assert 'Snapshot' in emitted_types
        assert 'ArchiveResult' in emitted_types

    def test_create_pass_through_only_when_no_snapshots(self, initialized_archive):
        """Input with only pass-through records (no snapshots) still exits 0."""
        fake_crawl = {'type': 'Crawl', 'id': 'fake-id', 'urls': 'https://example.com'}

        _, stderr, code = run_archivebox_cmd(
            ['archiveresult', 'create'],
            stdin=json.dumps(fake_crawl),
            data_dir=initialized_archive,
        )

        assert code == 0
        assert 'Passed through' in stderr
class TestArchiveResultList:
    """CLI tests for `archivebox archiveresult list`."""

    def test_list_empty(self, initialized_archive):
        """Listing a fresh archive reports zero archive results."""
        _, stderr, code = run_archivebox_cmd(
            ['archiveresult', 'list'],
            data_dir=initialized_archive,
        )

        assert code == 0
        assert 'Listed 0 archive results' in stderr

    def test_list_filter_by_status(self, initialized_archive):
        """`--status=queued` only returns records whose status is queued."""
        # Seed one snapshot with one archive result.
        snapshot_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        snapshot = parse_jsonl_output(snapshot_out)[0]
        run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=title'],
            stdin=json.dumps(snapshot),
            data_dir=initialized_archive,
        )

        stdout, _, code = run_archivebox_cmd(
            ['archiveresult', 'list', '--status=queued'],
            data_dir=initialized_archive,
        )

        assert code == 0
        for listed in parse_jsonl_output(stdout):
            assert listed['status'] == 'queued'

    def test_list_filter_by_plugin(self, initialized_archive):
        """`--plugin=title` only returns records for the title plugin."""
        snapshot_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        snapshot = parse_jsonl_output(snapshot_out)[0]
        run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=title'],
            stdin=json.dumps(snapshot),
            data_dir=initialized_archive,
        )

        stdout, _, code = run_archivebox_cmd(
            ['archiveresult', 'list', '--plugin=title'],
            data_dir=initialized_archive,
        )

        assert code == 0
        for listed in parse_jsonl_output(stdout):
            assert listed['plugin'] == 'title'

    def test_list_with_limit(self, initialized_archive):
        """`--limit=2` caps the output at two records."""
        # Seed three archive results so the limit actually truncates.
        for _ in range(3):
            snapshot_out, _, _ = run_archivebox_cmd(
                ['snapshot', 'create', create_test_url()],
                data_dir=initialized_archive,
            )
            snapshot = parse_jsonl_output(snapshot_out)[0]
            run_archivebox_cmd(
                ['archiveresult', 'create', '--plugin=title'],
                stdin=json.dumps(snapshot),
                data_dir=initialized_archive,
            )

        stdout, _, code = run_archivebox_cmd(
            ['archiveresult', 'list', '--limit=2'],
            data_dir=initialized_archive,
        )

        assert code == 0
        listed = parse_jsonl_output(stdout)
        assert len(listed) == 2
class TestArchiveResultUpdate:
    """CLI tests for `archivebox archiveresult update`."""

    def test_update_status(self, initialized_archive):
        """`--status=failed` updates the piped ArchiveResult and echoes it back."""
        # Seed a snapshot and an archive result to update.
        snapshot_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        snapshot = parse_jsonl_output(snapshot_out)[0]

        created_out, _, _ = run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=title'],
            stdin=json.dumps(snapshot),
            data_dir=initialized_archive,
        )
        archive_result = next(
            rec for rec in parse_jsonl_output(created_out)
            if rec.get('type') == 'ArchiveResult'
        )

        stdout, stderr, code = run_archivebox_cmd(
            ['archiveresult', 'update', '--status=failed'],
            stdin=json.dumps(archive_result),
            data_dir=initialized_archive,
        )

        assert code == 0
        assert 'Updated 1 archive results' in stderr

        updated = parse_jsonl_output(stdout)
        assert updated[0]['status'] == 'failed'
class TestArchiveResultDelete:
    """CLI tests for `archivebox archiveresult delete`."""

    def test_delete_requires_yes(self, initialized_archive):
        """Without `--yes` the command exits 1 and mentions the flag on stderr."""
        snapshot_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        snapshot = parse_jsonl_output(snapshot_out)[0]

        created_out, _, _ = run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=title'],
            stdin=json.dumps(snapshot),
            data_dir=initialized_archive,
        )
        archive_result = next(
            rec for rec in parse_jsonl_output(created_out)
            if rec.get('type') == 'ArchiveResult'
        )

        _, stderr, code = run_archivebox_cmd(
            ['archiveresult', 'delete'],
            stdin=json.dumps(archive_result),
            data_dir=initialized_archive,
        )

        assert code == 1
        assert '--yes' in stderr

    def test_delete_with_yes(self, initialized_archive):
        """With `--yes` the piped ArchiveResult is deleted and reported."""
        snapshot_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        snapshot = parse_jsonl_output(snapshot_out)[0]

        created_out, _, _ = run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=title'],
            stdin=json.dumps(snapshot),
            data_dir=initialized_archive,
        )
        archive_result = next(
            rec for rec in parse_jsonl_output(created_out)
            if rec.get('type') == 'ArchiveResult'
        )

        _, stderr, code = run_archivebox_cmd(
            ['archiveresult', 'delete', '--yes'],
            stdin=json.dumps(archive_result),
            data_dir=initialized_archive,
        )

        assert code == 0
        assert 'Deleted 1 archive results' in stderr

View File

@ -1,261 +0,0 @@
"""
Tests for archivebox crawl CLI command.
Tests cover:
- crawl create (with URLs, from stdin, pass-through)
- crawl list (with filters)
- crawl update
- crawl delete
"""
import json
import pytest
from archivebox.tests.conftest import (
run_archivebox_cmd,
parse_jsonl_output,
assert_jsonl_contains_type,
create_test_url,
create_test_crawl_json,
)
class TestCrawlCreate:
    """CLI tests for `archivebox crawl create`."""

    def test_create_from_url_args(self, initialized_archive):
        """A URL given as an argument produces exactly one Crawl record."""
        url = create_test_url()

        stdout, stderr, code = run_archivebox_cmd(
            ['crawl', 'create', url],
            data_dir=initialized_archive,
        )

        assert code == 0, f"Command failed: {stderr}"
        assert 'Created crawl' in stderr

        # stdout carries the JSONL record for the new crawl.
        emitted = parse_jsonl_output(stdout)
        assert len(emitted) == 1
        crawl = emitted[0]
        assert crawl['type'] == 'Crawl'
        assert url in crawl['urls']

    def test_create_from_stdin_urls(self, initialized_archive):
        """Newline-separated URLs on stdin are bundled into one Crawl."""
        urls = [create_test_url() for _ in range(3)]

        stdout, stderr, code = run_archivebox_cmd(
            ['crawl', 'create'],
            stdin='\n'.join(urls),
            data_dir=initialized_archive,
        )

        assert code == 0, f"Command failed: {stderr}"

        emitted = parse_jsonl_output(stdout)
        assert len(emitted) == 1
        crawl = emitted[0]
        assert crawl['type'] == 'Crawl'
        # Every URL that went in must be part of the single crawl.
        for expected_url in urls:
            assert expected_url in crawl['urls']

    def test_create_with_depth(self, initialized_archive):
        """`--depth=2` is reflected as max_depth on the emitted record."""
        stdout, _, code = run_archivebox_cmd(
            ['crawl', 'create', '--depth=2', create_test_url()],
            data_dir=initialized_archive,
        )

        assert code == 0
        crawl = parse_jsonl_output(stdout)[0]
        assert crawl['max_depth'] == 2

    def test_create_with_tag(self, initialized_archive):
        """`--tag=test-tag` shows up in the record's tags_str."""
        stdout, _, code = run_archivebox_cmd(
            ['crawl', 'create', '--tag=test-tag', create_test_url()],
            data_dir=initialized_archive,
        )

        assert code == 0
        crawl = parse_jsonl_output(stdout)[0]
        assert 'test-tag' in crawl.get('tags_str', '')

    def test_create_pass_through_other_types(self, initialized_archive):
        """A non-Crawl record on stdin is emitted alongside the new Crawl."""
        tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'}
        url_record = {'url': create_test_url()}
        piped_input = json.dumps(tag_record) + '\n' + json.dumps(url_record)

        stdout, _, code = run_archivebox_cmd(
            ['crawl', 'create'],
            stdin=piped_input,
            data_dir=initialized_archive,
        )

        assert code == 0
        # Both the passed-through Tag and the created Crawl should be present.
        emitted_types = [rec.get('type') for rec in parse_jsonl_output(stdout)]
        assert 'Tag' in emitted_types
        assert 'Crawl' in emitted_types

    def test_create_pass_through_existing_crawl(self, initialized_archive):
        """A Crawl record that already has an id comes back with the same id."""
        # Create a crawl first so there is a record with an id to pipe back.
        first_out, _, _ = run_archivebox_cmd(
            ['crawl', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        crawl = parse_jsonl_output(first_out)[0]

        stdout, _, code = run_archivebox_cmd(
            ['crawl', 'create'],
            stdin=json.dumps(crawl),
            data_dir=initialized_archive,
        )

        assert code == 0
        emitted = parse_jsonl_output(stdout)
        assert len(emitted) == 1
        assert emitted[0]['id'] == crawl['id']
class TestCrawlList:
    """CLI tests for `archivebox crawl list`."""

    def test_list_empty(self, initialized_archive):
        """Listing a fresh archive reports zero crawls."""
        _, stderr, code = run_archivebox_cmd(
            ['crawl', 'list'],
            data_dir=initialized_archive,
        )

        assert code == 0
        assert 'Listed 0 crawls' in stderr

    def test_list_returns_created(self, initialized_archive):
        """A crawl created earlier appears in the listing."""
        url = create_test_url()
        run_archivebox_cmd(['crawl', 'create', url], data_dir=initialized_archive)

        stdout, _, code = run_archivebox_cmd(
            ['crawl', 'list'],
            data_dir=initialized_archive,
        )

        assert code == 0
        listed = parse_jsonl_output(stdout)
        assert len(listed) >= 1
        assert any(url in crawl.get('urls', '') for crawl in listed)

    def test_list_filter_by_status(self, initialized_archive):
        """`--status=queued` only returns crawls whose status is queued."""
        run_archivebox_cmd(
            ['crawl', 'create', create_test_url()],
            data_dir=initialized_archive,
        )

        stdout, _, code = run_archivebox_cmd(
            ['crawl', 'list', '--status=queued'],
            data_dir=initialized_archive,
        )

        assert code == 0
        for crawl in parse_jsonl_output(stdout):
            assert crawl['status'] == 'queued'

    def test_list_with_limit(self, initialized_archive):
        """`--limit=2` caps the output at two records."""
        # Seed three crawls so the limit actually truncates.
        for _ in range(3):
            run_archivebox_cmd(
                ['crawl', 'create', create_test_url()],
                data_dir=initialized_archive,
            )

        stdout, _, code = run_archivebox_cmd(
            ['crawl', 'list', '--limit=2'],
            data_dir=initialized_archive,
        )

        assert code == 0
        listed = parse_jsonl_output(stdout)
        assert len(listed) == 2
class TestCrawlUpdate:
    """CLI tests for `archivebox crawl update`."""

    def test_update_status(self, initialized_archive):
        """`--status=started` updates the piped Crawl and echoes it back."""
        # Seed a crawl to update.
        created_out, _, _ = run_archivebox_cmd(
            ['crawl', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        crawl = parse_jsonl_output(created_out)[0]

        stdout, stderr, code = run_archivebox_cmd(
            ['crawl', 'update', '--status=started'],
            stdin=json.dumps(crawl),
            data_dir=initialized_archive,
        )

        assert code == 0
        assert 'Updated 1 crawls' in stderr

        updated = parse_jsonl_output(stdout)
        assert updated[0]['status'] == 'started'
class TestCrawlDelete:
    """CLI tests for `archivebox crawl delete`."""

    def test_delete_requires_yes(self, initialized_archive):
        """Without `--yes` the command exits 1 and mentions the flag on stderr."""
        created_out, _, _ = run_archivebox_cmd(
            ['crawl', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        crawl = parse_jsonl_output(created_out)[0]

        _, stderr, code = run_archivebox_cmd(
            ['crawl', 'delete'],
            stdin=json.dumps(crawl),
            data_dir=initialized_archive,
        )

        assert code == 1
        assert '--yes' in stderr

    def test_delete_with_yes(self, initialized_archive):
        """With `--yes` the piped Crawl is deleted and reported."""
        created_out, _, _ = run_archivebox_cmd(
            ['crawl', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        crawl = parse_jsonl_output(created_out)[0]

        _, stderr, code = run_archivebox_cmd(
            ['crawl', 'delete', '--yes'],
            stdin=json.dumps(crawl),
            data_dir=initialized_archive,
        )

        assert code == 0
        assert 'Deleted 1 crawls' in stderr

    def test_delete_dry_run(self, initialized_archive):
        """`--dry-run` exits 0 and announces what would be deleted."""
        created_out, _, _ = run_archivebox_cmd(
            ['crawl', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        crawl = parse_jsonl_output(created_out)[0]

        _, stderr, code = run_archivebox_cmd(
            ['crawl', 'delete', '--dry-run'],
            stdin=json.dumps(crawl),
            data_dir=initialized_archive,
        )

        assert code == 0
        assert 'Would delete' in stderr
        assert 'dry run' in stderr.lower()

View File

@ -0,0 +1,270 @@
"""
Tests for CLI JSONL piping with real URL archiving.
Tests the REAL piping workflows users will run:
- archivebox crawl create URL | archivebox run
- archivebox snapshot create URL | archivebox run
- archivebox archiveresult list --status=failed | archivebox run
- Pass-through behavior (accumulating records through pipeline)
Uses module-scoped fixture for speed - init runs ONCE per test file.
Uses inline mode in orchestrator for fast processing (no subprocess overhead).
"""
import json
import pytest
from archivebox.tests.conftest import run_archivebox_cmd, parse_jsonl, create_url
class TestCrawlPipeline:
    """Covers the pipeline: archivebox crawl create URL | archivebox run"""

    def test_crawl_create_outputs_jsonl(self, shared_archive):
        """`crawl create` writes a single Crawl JSONL record to stdout."""
        url = create_url("crawl-test")

        stdout, stderr, code = run_archivebox_cmd(
            ['crawl', 'create', url],
            data_dir=shared_archive,
        )
        assert code == 0, stderr

        emitted = parse_jsonl(stdout)
        assert len(emitted) == 1
        crawl = emitted[0]
        assert crawl['type'] == 'Crawl'
        assert url in crawl['urls']

    def test_crawl_pipe_to_run(self, shared_archive):
        """Output of `crawl create` piped into `run` exits 0 and emits a Crawl."""
        # Stage 1: create the crawl.
        crawl_out, _, crawl_code = run_archivebox_cmd(
            ['crawl', 'create', create_url("pipe-to-run")],
            data_dir=shared_archive,
        )
        assert crawl_code == 0

        # Stage 2: feed the crawl record into `run`.
        run_out, run_err, run_code = run_archivebox_cmd(
            ['run'],
            stdin=crawl_out,
            data_dir=shared_archive,
            timeout=30,
        )
        assert run_code == 0, run_err

        # `run` echoes the records it processed.
        processed = parse_jsonl(run_out)
        assert len(processed) >= 1
        assert any(rec.get('type') == 'Crawl' for rec in processed)
class TestSnapshotPipeline:
    """Covers the pipeline: archivebox snapshot create URL | archivebox run"""

    def test_snapshot_from_crawl(self, shared_archive):
        """`snapshot create` accepts Crawl JSONL and emits Crawl + Snapshot."""
        url = create_url("snap-from-crawl")

        # Stage 1: create the crawl. Fail here, not downstream, if setup breaks.
        crawl_out, crawl_err, crawl_code = run_archivebox_cmd(
            ['crawl', 'create', url],
            data_dir=shared_archive,
        )
        assert crawl_code == 0, crawl_err

        # Stage 2: pipe the crawl record into snapshot create.
        stdout, stderr, code = run_archivebox_cmd(
            ['snapshot', 'create'],
            stdin=crawl_out,
            data_dir=shared_archive,
        )
        assert code == 0, stderr

        # .get() so a record without a type fails the assertion, not a KeyError.
        emitted_types = {rec.get('type') for rec in parse_jsonl(stdout)}
        # The Crawl is passed through and a Snapshot is created.
        assert 'Crawl' in emitted_types
        assert 'Snapshot' in emitted_types

    def test_snapshot_pipe_to_run(self, shared_archive):
        """Output of `snapshot create` piped into `run` emits a Snapshot."""
        url = create_url("snap-to-run")

        # Stage 1: create the snapshot.
        snapshot_out, _, snapshot_code = run_archivebox_cmd(
            ['snapshot', 'create', url],
            data_dir=shared_archive,
        )
        assert snapshot_code == 0

        # Stage 2: feed the snapshot record into `run`.
        run_out, run_err, run_code = run_archivebox_cmd(
            ['run'],
            stdin=snapshot_out,
            data_dir=shared_archive,
            timeout=30,
        )
        assert run_code == 0, run_err

        snapshots = [
            rec for rec in parse_jsonl(run_out)
            if rec.get('type') == 'Snapshot'
        ]
        assert len(snapshots) >= 1
class TestArchiveResultPipeline:
    """Covers the pipeline: snapshot | archiveresult create --plugin=X | run"""

    def test_archiveresult_from_snapshot(self, shared_archive):
        """`archiveresult create` accepts Snapshot JSONL and emits both types."""
        url = create_url("ar-from-snap")

        # Stage 1: create the snapshot. Fail here, not downstream, if setup breaks.
        snapshot_out, snapshot_err, snapshot_code = run_archivebox_cmd(
            ['snapshot', 'create', url],
            data_dir=shared_archive,
        )
        assert snapshot_code == 0, snapshot_err

        # Stage 2: pipe the snapshot record into archiveresult create.
        stdout, stderr, code = run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=headers'],
            stdin=snapshot_out,
            data_dir=shared_archive,
        )
        assert code == 0, stderr

        # .get() so a record without a type fails the assertion, not a KeyError.
        emitted_types = {rec.get('type') for rec in parse_jsonl(stdout)}
        # The Snapshot is passed through and an ArchiveResult is created.
        assert 'Snapshot' in emitted_types
        assert 'ArchiveResult' in emitted_types
class TestFullPipeline:
    """Covers: crawl create | snapshot create | archiveresult create | run"""

    def test_full_four_stage_pipeline(self, shared_archive):
        """Chain all four stages, each one consuming the previous stdout."""
        url = create_url("full-pipeline")

        # Stage 1: crawl create
        crawl_out, _, crawl_code = run_archivebox_cmd(
            ['crawl', 'create', url],
            data_dir=shared_archive,
        )
        assert crawl_code == 0

        # Stage 2: snapshot create
        snapshot_out, _, snapshot_code = run_archivebox_cmd(
            ['snapshot', 'create'],
            stdin=crawl_out,
            data_dir=shared_archive,
        )
        assert snapshot_code == 0

        # Stage 3: archiveresult create
        result_out, _, result_code = run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=headers'],
            stdin=snapshot_out,
            data_dir=shared_archive,
        )
        assert result_code == 0

        # Stage 4: run
        run_out, run_err, run_code = run_archivebox_cmd(
            ['run'],
            stdin=result_out,
            data_dir=shared_archive,
            timeout=30,
        )
        assert run_code == 0, run_err

        # The final stage must emit at least one typed record.
        final_types = set()
        for rec in parse_jsonl(run_out):
            final_types.add(rec['type'])
        assert len(final_types) >= 1
class TestPassThrough:
    """Pass-through behavior: records of unknown type are emitted unchanged."""

    def test_unknown_type_passes_through(self, shared_archive):
        """`crawl create` forwards an unknown-type record and keeps its data."""
        unknown = {'type': 'CustomType', 'id': 'test-123', 'data': 'preserved'}
        url = create_url("passthrough")
        # One JSON record followed by one plain URL line.
        piped_input = json.dumps(unknown) + '\n' + url

        stdout, _, code = run_archivebox_cmd(
            ['crawl', 'create'],
            stdin=piped_input,
            data_dir=shared_archive,
        )
        assert code == 0

        emitted = parse_jsonl(stdout)
        emitted_types = {rec['type'] for rec in emitted}
        # The custom record is forwarded and a Crawl is created for the URL.
        assert 'CustomType' in emitted_types
        assert 'Crawl' in emitted_types

        # The payload of the forwarded record must be intact.
        forwarded = next(rec for rec in emitted if rec['type'] == 'CustomType')
        assert forwarded['data'] == 'preserved'
class TestListCommands:
    """Tests for the list commands and their filters."""

    def test_crawl_list_filter_status(self, shared_archive):
        """`crawl list --status=queued` returns only queued crawls."""
        # Seed a crawl, and check it was created so the filter has data to see.
        url = create_url("list-filter")
        _, create_err, create_code = run_archivebox_cmd(
            ['crawl', 'create', url],
            data_dir=shared_archive,
        )
        assert create_code == 0, create_err

        stdout, stderr, code = run_archivebox_cmd(
            ['crawl', 'list', '--status=queued'],
            data_dir=shared_archive,
        )
        assert code == 0, stderr

        for crawl in parse_jsonl(stdout):
            assert crawl['status'] == 'queued'

    def test_snapshot_list_outputs_jsonl(self, shared_archive):
        """`snapshot list` emits JSONL where every record is a Snapshot."""
        stdout, stderr, code = run_archivebox_cmd(
            ['snapshot', 'list'],
            data_dir=shared_archive,
        )
        assert code == 0, stderr

        # Output must parse as JSONL; an empty listing is acceptable.
        for snapshot in parse_jsonl(stdout):
            assert snapshot['type'] == 'Snapshot'
class TestRunBehavior:
    """Tests for the create-or-update behavior of `run`."""

    def test_run_creates_from_url_record(self, shared_archive):
        """A Snapshot record without an id comes back from `run` with an id."""
        new_snapshot = {'url': create_url("run-create"), 'type': 'Snapshot'}

        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(new_snapshot),
            data_dir=shared_archive,
            timeout=30,
        )
        assert code == 0, stderr

        snapshots = [
            rec for rec in parse_jsonl(stdout)
            if rec.get('type') == 'Snapshot'
        ]
        assert len(snapshots) >= 1
        # The emitted snapshot must carry an id.
        assert snapshots[0].get('id')

View File

@ -1,254 +0,0 @@
"""
Tests for archivebox run CLI command.
Tests cover:
- run with stdin JSONL (Crawl, Snapshot, ArchiveResult)
- create-or-update behavior (records with/without id)
- pass-through output (for chaining)
"""
import json
import pytest
from archivebox.tests.conftest import (
run_archivebox_cmd,
parse_jsonl_output,
create_test_url,
create_test_crawl_json,
create_test_snapshot_json,
)
class TestRunWithCrawl:
    """CLI tests for `archivebox run` fed with Crawl records."""

    def test_run_with_new_crawl(self, initialized_archive):
        """A Crawl record without an id is emitted by `run` with an id."""
        new_crawl = create_test_crawl_json()

        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(new_crawl),
            data_dir=initialized_archive,
            timeout=120,
        )
        assert code == 0, f"Command failed: {stderr}"

        # The output should include the Crawl, now carrying an id.
        crawls = [
            rec for rec in parse_jsonl_output(stdout)
            if rec.get('type') == 'Crawl'
        ]
        assert len(crawls) >= 1
        assert crawls[0].get('id')

    def test_run_with_existing_crawl(self, initialized_archive):
        """A Crawl record that already has an id is accepted by `run`."""
        # Create the crawl up front so the record carries an id.
        created_out, _, _ = run_archivebox_cmd(
            ['crawl', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        crawl = parse_jsonl_output(created_out)[0]

        stdout, _, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(crawl),
            data_dir=initialized_archive,
            timeout=120,
        )
        assert code == 0

        emitted = parse_jsonl_output(stdout)
        assert len(emitted) >= 1
class TestRunWithSnapshot:
    """CLI tests for `archivebox run` fed with Snapshot records."""

    def test_run_with_new_snapshot(self, initialized_archive):
        """A Snapshot record without an id is emitted by `run` with an id."""
        new_snapshot = create_test_snapshot_json()

        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(new_snapshot),
            data_dir=initialized_archive,
            timeout=120,
        )
        assert code == 0, f"Command failed: {stderr}"

        snapshots = [
            rec for rec in parse_jsonl_output(stdout)
            if rec.get('type') == 'Snapshot'
        ]
        assert len(snapshots) >= 1
        assert snapshots[0].get('id')

    def test_run_with_existing_snapshot(self, initialized_archive):
        """A Snapshot record that already has an id is accepted by `run`."""
        # Create the snapshot up front so the record carries an id.
        created_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        snapshot = parse_jsonl_output(created_out)[0]

        stdout, _, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(snapshot),
            data_dir=initialized_archive,
            timeout=120,
        )
        assert code == 0

        emitted = parse_jsonl_output(stdout)
        assert len(emitted) >= 1

    def test_run_with_plain_url(self, initialized_archive):
        """A record holding only a `url` key (no type) is accepted by `run`."""
        bare_record = {'url': create_test_url()}

        stdout, _, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(bare_record),
            data_dir=initialized_archive,
            timeout=120,
        )
        assert code == 0

        emitted = parse_jsonl_output(stdout)
        assert len(emitted) >= 1
class TestRunWithArchiveResult:
    """CLI tests for `archivebox run` fed with ArchiveResult records."""

    def test_run_requeues_failed_archiveresult(self, initialized_archive):
        """An ArchiveResult marked failed is accepted by `run` and emitted."""
        # Seed a snapshot and an archive result for it.
        snapshot_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', create_test_url()],
            data_dir=initialized_archive,
        )
        snapshot = parse_jsonl_output(snapshot_out)[0]

        created_out, _, _ = run_archivebox_cmd(
            ['archiveresult', 'create', '--plugin=title'],
            stdin=json.dumps(snapshot),
            data_dir=initialized_archive,
        )
        archive_result = next(
            rec for rec in parse_jsonl_output(created_out)
            if rec.get('type') == 'ArchiveResult'
        )

        # Mark it failed both in the local record and via the CLI.
        archive_result['status'] = 'failed'
        run_archivebox_cmd(
            ['archiveresult', 'update', '--status=failed'],
            stdin=json.dumps(archive_result),
            data_dir=initialized_archive,
        )

        # Feeding the failed record to `run` should succeed and emit it.
        stdout, _, code = run_archivebox_cmd(
            ['run'],
            stdin=json.dumps(archive_result),
            data_dir=initialized_archive,
            timeout=120,
        )
        assert code == 0

        emitted_results = [
            rec for rec in parse_jsonl_output(stdout)
            if rec.get('type') == 'ArchiveResult'
        ]
        assert len(emitted_results) >= 1
class TestRunPassThrough:
    """Checks on what `archivebox run` writes back to stdout for chaining."""

    def test_run_passes_through_unknown_types(self, initialized_archive):
        """A record with an unrecognised type appears once in the output, data intact."""
        payload = json.dumps({'type': 'Unknown', 'id': 'fake-id', 'data': 'test'})

        out, _err, exit_code = run_archivebox_cmd(
            ['run'],
            stdin=payload,
            data_dir=initialized_archive,
        )
        assert exit_code == 0

        echoed = list(
            filter(lambda rec: rec.get('type') == 'Unknown', parse_jsonl_output(out))
        )
        assert len(echoed) == 1
        only = echoed[0]
        assert only['data'] == 'test'

    def test_run_outputs_all_processed_records(self, initialized_archive):
        """Piping a Crawl record yields at least one output record."""
        target = create_test_url()
        payload = json.dumps(create_test_crawl_json(urls=[target]))

        out, _err, exit_code = run_archivebox_cmd(
            ['run'],
            stdin=payload,
            data_dir=initialized_archive,
            timeout=120,
        )
        assert exit_code == 0

        emitted = parse_jsonl_output(out)
        # Should have at least the Crawl in output
        assert len(emitted) > 0
class TestRunMixedInput:
    """Tests for `archivebox run` with mixed record types."""

    def test_run_handles_mixed_types(self, initialized_archive):
        """Run handles mixed Crawl/Snapshot/Tag input in a single stdin stream.

        A Crawl record, a Snapshot record and a Tag record are piped as three
        JSONL lines. The output must contain all three types: Crawl and
        Snapshot because they were processed, Tag because it was passed
        through. Requiring only one of them would let the test pass even if
        `run` dropped the others.
        """
        crawl = create_test_crawl_json()
        snapshot = create_test_snapshot_json()
        unknown = {'type': 'Tag', 'id': 'fake', 'name': 'test'}

        stdin = '\n'.join([
            json.dumps(crawl),
            json.dumps(snapshot),
            json.dumps(unknown),
        ])

        stdout, stderr, code = run_archivebox_cmd(
            ['run'],
            stdin=stdin,
            data_dir=initialized_archive,
            timeout=120,
        )

        assert code == 0, f"Command failed: {stderr}"
        records = parse_jsonl_output(stdout)
        types = {r.get('type') for r in records}

        # Should have processed Crawl and Snapshot, passed through Tag
        missing = {'Crawl', 'Snapshot', 'Tag'} - types
        assert not missing, (
            f"Output is missing record types {sorted(missing)}; "
            f"got {sorted(str(t) for t in types)}. stderr: {stderr}"
        )
class TestRunEmpty:
    """Edge cases for `archivebox run` when there is nothing to work on."""

    def test_run_empty_stdin(self, initialized_archive):
        """An empty stdin stream still exits with status 0."""
        _out, _err, exit_code = run_archivebox_cmd(
            ['run'],
            stdin='',
            data_dir=initialized_archive,
        )
        assert exit_code == 0

    def test_run_no_records_to_process(self, initialized_archive):
        """Input holding only a pass-through record reports that nothing was processed."""
        payload = json.dumps({'type': 'Unknown', 'id': 'fake'})

        _out, err, exit_code = run_archivebox_cmd(
            ['run'],
            stdin=payload,
            data_dir=initialized_archive,
        )

        assert exit_code == 0
        assert 'No records to process' in err

View File

@ -1,274 +0,0 @@
"""
Tests for archivebox snapshot CLI command.
Tests cover:
- snapshot create (from URLs, from Crawl JSONL, pass-through)
- snapshot list (with filters)
- snapshot update
- snapshot delete
"""
import json
import pytest
from archivebox.tests.conftest import (
run_archivebox_cmd,
parse_jsonl_output,
assert_jsonl_contains_type,
create_test_url,
)
class TestSnapshotCreate:
    """Tests for `archivebox snapshot create`.

    Inputs are given either as URL arguments or as JSONL on stdin. Every
    exit-code assertion includes the captured stderr so CLI failures are
    diagnosable from the test report.
    """

    def test_create_from_url_args(self, initialized_archive):
        """Create snapshot from URL arguments."""
        url = create_test_url()

        stdout, stderr, code = run_archivebox_cmd(
            ['snapshot', 'create', url],
            data_dir=initialized_archive,
        )

        assert code == 0, f"Command failed: {stderr}"
        assert 'Created' in stderr

        records = parse_jsonl_output(stdout)
        assert len(records) == 1
        assert records[0]['type'] == 'Snapshot'
        assert records[0]['url'] == url

    def test_create_from_crawl_jsonl(self, initialized_archive):
        """Create snapshots from Crawl JSONL input."""
        url = create_test_url()

        # First create a crawl; verify the setup step so a failure here is
        # not misreported as an IndexError on the next line.
        stdout1, stderr1, code1 = run_archivebox_cmd(
            ['crawl', 'create', url],
            data_dir=initialized_archive,
        )
        assert code1 == 0, f"Setup (crawl create) failed: {stderr1}"
        created = parse_jsonl_output(stdout1)
        assert created, f"Setup (crawl create) produced no records: {stderr1}"
        crawl = created[0]

        # Pipe crawl to snapshot create
        stdout2, stderr, code = run_archivebox_cmd(
            ['snapshot', 'create'],
            stdin=json.dumps(crawl),
            data_dir=initialized_archive,
        )

        assert code == 0, f"Command failed: {stderr}"

        records = parse_jsonl_output(stdout2)
        # Should have the Crawl passed through and the Snapshot created
        types = [r.get('type') for r in records]
        assert 'Crawl' in types
        assert 'Snapshot' in types

        # .get() keeps a record without a 'type' key from raising KeyError;
        # the assertion above guarantees a Snapshot exists.
        snapshot = next(r for r in records if r.get('type') == 'Snapshot')
        assert snapshot['url'] == url

    def test_create_with_tag(self, initialized_archive):
        """Create snapshot with --tag flag."""
        url = create_test_url()

        stdout, stderr, code = run_archivebox_cmd(
            ['snapshot', 'create', '--tag=test-tag', url],
            data_dir=initialized_archive,
        )

        assert code == 0, f"Command failed: {stderr}"
        records = parse_jsonl_output(stdout)
        assert records, f"No records in output: {stderr}"
        assert 'test-tag' in records[0].get('tags_str', '')

    def test_create_pass_through_other_types(self, initialized_archive):
        """Pass-through records of other types unchanged."""
        tag_record = {'type': 'Tag', 'id': 'fake-tag-id', 'name': 'test'}
        url = create_test_url()
        stdin = json.dumps(tag_record) + '\n' + json.dumps({'url': url})

        stdout, stderr, code = run_archivebox_cmd(
            ['snapshot', 'create'],
            stdin=stdin,
            data_dir=initialized_archive,
        )

        assert code == 0, f"Command failed: {stderr}"
        records = parse_jsonl_output(stdout)

        types = [r.get('type') for r in records]
        assert 'Tag' in types
        assert 'Snapshot' in types

    def test_create_multiple_urls(self, initialized_archive):
        """Create snapshots from multiple URLs."""
        urls = [create_test_url() for _ in range(3)]

        stdout, stderr, code = run_archivebox_cmd(
            ['snapshot', 'create'] + urls,
            data_dir=initialized_archive,
        )

        assert code == 0, f"Command failed: {stderr}"
        records = parse_jsonl_output(stdout)
        assert len(records) == 3

        created_urls = {r['url'] for r in records}
        missing = set(urls) - created_urls
        assert not missing, f"URLs missing from output: {sorted(missing)}"
class TestSnapshotList:
    """Exercises `archivebox snapshot list` with and without filters."""

    def test_list_empty(self, initialized_archive):
        """Listing a fresh archive reports zero snapshots on stderr."""
        _out, err, exit_code = run_archivebox_cmd(
            ['snapshot', 'list'],
            data_dir=initialized_archive,
        )

        assert exit_code == 0
        assert 'Listed 0 snapshots' in err

    def test_list_returns_created(self, initialized_archive):
        """A snapshot created beforehand shows up in the listing."""
        target = create_test_url()
        run_archivebox_cmd(['snapshot', 'create', target], data_dir=initialized_archive)

        out, _err, exit_code = run_archivebox_cmd(
            ['snapshot', 'list'],
            data_dir=initialized_archive,
        )
        assert exit_code == 0

        listed = parse_jsonl_output(out)
        assert len(listed) > 0
        matching = [rec for rec in listed if rec.get('url') == target]
        assert bool(matching)

    def test_list_filter_by_status(self, initialized_archive):
        """With --status=queued, every returned record has status 'queued'."""
        target = create_test_url()
        run_archivebox_cmd(['snapshot', 'create', target], data_dir=initialized_archive)

        out, _err, exit_code = run_archivebox_cmd(
            ['snapshot', 'list', '--status=queued'],
            data_dir=initialized_archive,
        )
        assert exit_code == 0

        listed = parse_jsonl_output(out)
        assert all(rec['status'] == 'queued' for rec in listed)

    def test_list_filter_by_url_contains(self, initialized_archive):
        """The --url__icontains filter returns exactly the matching snapshot."""
        target = create_test_url(domain='unique-domain-12345.com')
        run_archivebox_cmd(['snapshot', 'create', target], data_dir=initialized_archive)

        out, _err, exit_code = run_archivebox_cmd(
            ['snapshot', 'list', '--url__icontains=unique-domain-12345'],
            data_dir=initialized_archive,
        )
        assert exit_code == 0

        listed = parse_jsonl_output(out)
        assert len(listed) == 1
        (only,) = listed
        assert 'unique-domain-12345' in only['url']

    def test_list_with_limit(self, initialized_archive):
        """With three snapshots present, --limit=2 returns exactly two records."""
        created = 0
        while created < 3:
            run_archivebox_cmd(
                ['snapshot', 'create', create_test_url()],
                data_dir=initialized_archive,
            )
            created += 1

        out, _err, exit_code = run_archivebox_cmd(
            ['snapshot', 'list', '--limit=2'],
            data_dir=initialized_archive,
        )
        assert exit_code == 0

        listed = parse_jsonl_output(out)
        assert len(listed) == 2
class TestSnapshotUpdate:
    """Exercises `archivebox snapshot update` on a snapshot piped via stdin."""

    def test_update_status(self, initialized_archive):
        """--status=started is reflected in the emitted record and the summary line."""
        target = create_test_url()
        create_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', target],
            data_dir=initialized_archive,
        )
        existing = parse_jsonl_output(create_out)[0]

        update_out, err, exit_code = run_archivebox_cmd(
            ['snapshot', 'update', '--status=started'],
            stdin=json.dumps(existing),
            data_dir=initialized_archive,
        )

        assert exit_code == 0
        assert 'Updated 1 snapshots' in err

        updated = parse_jsonl_output(update_out)
        first = updated[0]
        assert first['status'] == 'started'

    def test_update_add_tag(self, initialized_archive):
        """--tag=new-tag succeeds and reports one updated snapshot."""
        target = create_test_url()
        create_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', target],
            data_dir=initialized_archive,
        )
        existing = parse_jsonl_output(create_out)[0]

        _update_out, err, exit_code = run_archivebox_cmd(
            ['snapshot', 'update', '--tag=new-tag'],
            stdin=json.dumps(existing),
            data_dir=initialized_archive,
        )

        assert exit_code == 0
        assert 'Updated 1 snapshots' in err
class TestSnapshotDelete:
    """Exercises `archivebox snapshot delete` and its confirmation flags."""

    def test_delete_requires_yes(self, initialized_archive):
        """Without --yes the command exits 1 and mentions the flag on stderr."""
        target = create_test_url()
        create_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', target],
            data_dir=initialized_archive,
        )
        existing = parse_jsonl_output(create_out)[0]

        _out, err, exit_code = run_archivebox_cmd(
            ['snapshot', 'delete'],
            stdin=json.dumps(existing),
            data_dir=initialized_archive,
        )

        assert exit_code == 1
        assert '--yes' in err

    def test_delete_with_yes(self, initialized_archive):
        """With --yes the command succeeds and reports one deleted snapshot."""
        target = create_test_url()
        create_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', target],
            data_dir=initialized_archive,
        )
        existing = parse_jsonl_output(create_out)[0]

        _out, err, exit_code = run_archivebox_cmd(
            ['snapshot', 'delete', '--yes'],
            stdin=json.dumps(existing),
            data_dir=initialized_archive,
        )

        assert exit_code == 0
        assert 'Deleted 1 snapshots' in err

    def test_delete_dry_run(self, initialized_archive):
        """With --dry-run the command succeeds and says what it would delete."""
        target = create_test_url()
        create_out, _, _ = run_archivebox_cmd(
            ['snapshot', 'create', target],
            data_dir=initialized_archive,
        )
        existing = parse_jsonl_output(create_out)[0]

        _out, err, exit_code = run_archivebox_cmd(
            ['snapshot', 'delete', '--dry-run'],
            stdin=json.dumps(existing),
            data_dir=initialized_archive,
        )

        assert exit_code == 0
        assert 'Would delete' in err

0
archivebox/tests/test_hooks.py Executable file → Normal file
View File

View File

@ -57,14 +57,19 @@ def _run_orchestrator_process(exit_on_idle: bool) -> None:
class Orchestrator:
"""
Manages worker processes by polling queues and spawning workers as needed.
The orchestrator:
1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
2. If items exist and fewer than MAX_CONCURRENT workers are running, spawns workers
3. Monitors worker health and cleans up stale PIDs
4. Exits when all queues are empty (unless daemon mode)
Inline mode (inline=True):
- Processes items directly in the same process (no subprocess spawn)
- Much faster for small batches (avoids 2-3 sec subprocess overhead per worker)
- Useful for CLI piping and tests
"""
WORKER_TYPES: list[Type[Worker]] = [CrawlWorker, SnapshotWorker, ArchiveResultWorker]
# Configuration
@ -72,12 +77,18 @@ class Orchestrator:
IDLE_TIMEOUT: int = 3 # Exit after N idle ticks (0 = never exit)
MAX_WORKERS_PER_TYPE: int = 8 # Max workers per model type
MAX_TOTAL_WORKERS: int = 24 # Max workers across all types
def __init__(self, exit_on_idle: bool = True):
def __init__(self, exit_on_idle: bool = True, inline: bool = False):
    """
    Set up orchestrator state for the current process.

    exit_on_idle is stored as given. inline selects in-process handling of
    queue items instead of spawning workers; when it is true, POLL_INTERVAL
    and IDLE_TIMEOUT are overridden on this instance with smaller values.
    """
    # Process bookkeeping
    self.pid: int = os.getpid()
    self.pid_file = None
    self.idle_count: int = 0

    # Run-mode flags
    self.exit_on_idle = exit_on_idle
    self.inline = inline  # Process items directly instead of spawning workers

    if not inline:
        return

    # Faster polling in inline mode (set on the instance, not the class)
    self.POLL_INTERVAL = 0.1
    self.IDLE_TIMEOUT = 2
def __repr__(self) -> str:
    """Return a short display label for this orchestrator that includes its PID."""
    # NOTE(review): the [underline] tags and the escaped "\[" look like rich
    # console markup, with the backslash keeping "[pid=...]" literal. Confirm
    # against wherever this repr is printed.
    return f'[underline]Orchestrator[/underline]\\[pid={self.pid}]'
@ -169,13 +180,31 @@ class Orchestrator:
)
return None
def process_inline(self, WorkerClass: Type[Worker]) -> int:
    """
    Drain one worker type's queue in the current process.

    Builds a single WorkerClass instance with worker_id=0, then keeps
    claiming and processing items until claim_next() returns None.

    Returns the count of items that were processed.
    """
    inline_worker = WorkerClass(worker_id=0)
    handled = 0

    # Items are claimed and handled one at a time, in claim order.
    while (item := inline_worker.claim_next()) is not None:
        inline_worker.process_item(item)
        handled += 1

    return handled
def check_queues_and_spawn_workers(self) -> dict[str, int]:
"""
Check all queues and spawn workers as needed.
Returns dict of queue sizes by worker type.
"""
queue_sizes = {}
for WorkerClass in self.WORKER_TYPES:
# Get queue for this worker type
# Need to instantiate worker to get queue (for model access)
@ -183,11 +212,17 @@ class Orchestrator:
queue = worker.get_queue()
queue_count = queue.count()
queue_sizes[WorkerClass.name] = queue_count
# Spawn worker if needed
if self.should_spawn_worker(WorkerClass, queue_count):
if queue_count == 0:
continue
if self.inline:
# Process items directly (fast, no subprocess overhead)
self.process_inline(WorkerClass)
elif self.should_spawn_worker(WorkerClass, queue_count):
# Spawn worker subprocess (slow, but parallel)
self.spawn_worker(WorkerClass)
return queue_sizes
def has_pending_work(self, queue_sizes: dict[str, int]) -> bool: