#!/usr/bin/env python3
"""
archivebox run [--daemon] [--crawl-id=...] [--snapshot-id=...]
Unified command for processing queued work.
Modes:
- With stdin JSONL: Process piped records, exit when complete
- Without stdin (TTY): Run orchestrator in foreground until killed
- --crawl-id: Run orchestrator for specific crawl only
- --snapshot-id: Run worker for specific snapshot only (internal use)
Examples:
# Run orchestrator in foreground
archivebox run
# Run as daemon (don't exit on idle)
archivebox run --daemon
# Process specific records (pipe any JSONL type, exits when done)
archivebox snapshot list --status=queued | archivebox run
archivebox archiveresult list --status=failed | archivebox run
archivebox crawl list --status=queued | archivebox run
# Mixed types work too
cat mixed_records.jsonl | archivebox run
# Run orchestrator for specific crawl (shows live progress for that crawl)
archivebox run --crawl-id=019b7e90-04d0-73ed-adec-aad9cfcd863e
# Run worker for specific snapshot (internal use by orchestrator)
archivebox run --snapshot-id=019b7e90-5a8e-712c-9877-2c70eebe80ad
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox run'

import sys

import rich_click as click
from rich import print as rprint
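
# NOTE: Django models are imported lazily inside each function below (rather than
# at module import time), presumably so that CLI startup and `archivebox run --help`
# work before Django settings are configured.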

def process_stdin_records() -> int:
    """
    Process JSONL records from stdin.

    Create-or-update behavior:
    - Records WITHOUT id: Create via Model.from_json(), then queue
    - Records WITH id: Lookup existing, re-queue for processing

    Outputs JSONL of all processed records (for chaining).
    Handles any record type: Crawl, Snapshot, ArchiveResult, Binary.
    Auto-cascades: Crawl → Snapshots → ArchiveResults.

    Returns exit code (0 = success, 1 = error).
    """
    from django.utils import timezone

    from archivebox.misc.jsonl import read_stdin, write_record, TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY
    from archivebox.base_models.models import get_or_create_system_user_pk
    from archivebox.core.models import Snapshot, ArchiveResult
    from archivebox.crawls.models import Crawl
    from archivebox.machine.models import Binary
    from archivebox.workers.orchestrator import Orchestrator

    records = list(read_stdin())
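    # Each record is one parsed JSONL object; the exact field set depends on each
    # model's to_json()/from_json() implementation, but a hypothetical Snapshot
    # record might look like:
    #   {"type": "Snapshot", "id": "019b7e90-...", "url": "https://example.com", "status": "queued"}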
    is_tty = sys.stdout.isatty()

    if not records:
        return 0  # Nothing to process

    created_by_id = get_or_create_system_user_pk()
    queued_count = 0
    output_records = []

    for record in records:
        record_type = record.get('type', '')
        record_id = record.get('id')

        try:
            if record_type == TYPE_CRAWL:
                if record_id:
                    # Existing crawl - re-queue
                    try:
                        crawl = Crawl.objects.get(id=record_id)
                    except Crawl.DoesNotExist:
                        crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
                else:
                    # New crawl - create it
                    crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})

                if crawl:
                    crawl.retry_at = timezone.now()
                    if crawl.status not in [Crawl.StatusChoices.SEALED]:
                        crawl.status = Crawl.StatusChoices.QUEUED
                    crawl.save()
                    output_records.append(crawl.to_json())
                    queued_count += 1
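            # Records with a 'url' but no explicit 'type' are treated as Snapshots
            # below, so a plain JSONL list of URL objects can be piped straight in.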
            elif record_type == TYPE_SNAPSHOT or (record.get('url') and not record_type):
                if record_id:
                    # Existing snapshot - re-queue
                    try:
                        snapshot = Snapshot.objects.get(id=record_id)
                    except Snapshot.DoesNotExist:
                        snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
                else:
                    # New snapshot - create it
                    snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})

                if snapshot:
                    snapshot.retry_at = timezone.now()
                    if snapshot.status not in [Snapshot.StatusChoices.SEALED]:
                        snapshot.status = Snapshot.StatusChoices.QUEUED
                    snapshot.save()
                    output_records.append(snapshot.to_json())
                    queued_count += 1

            elif record_type == TYPE_ARCHIVERESULT:
                if record_id:
                    # Existing archiveresult - re-queue
                    try:
                        archiveresult = ArchiveResult.objects.get(id=record_id)
                    except ArchiveResult.DoesNotExist:
                        archiveresult = ArchiveResult.from_json(record)
                else:
                    # New archiveresult - create it
                    archiveresult = ArchiveResult.from_json(record)

                if archiveresult:
                    archiveresult.retry_at = timezone.now()
                    if archiveresult.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED, ArchiveResult.StatusChoices.BACKOFF]:
                        archiveresult.status = ArchiveResult.StatusChoices.QUEUED
                    archiveresult.save()
                    output_records.append(archiveresult.to_json())
                    queued_count += 1
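            # Only failed/skipped/backoff results are flipped back to QUEUED above;
            # succeeded results are re-saved and passed through without being re-queued
            # (presumably to avoid re-running extractors that already completed).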
            elif record_type == TYPE_BINARY:
                # Binary records - create or update and queue for installation
                if record_id:
                    # Existing binary - re-queue
                    try:
                        binary = Binary.objects.get(id=record_id)
                    except Binary.DoesNotExist:
                        binary = Binary.from_json(record)
                else:
                    # New binary - create it
                    binary = Binary.from_json(record)

                if binary:
                    binary.retry_at = timezone.now()
                    if binary.status != Binary.StatusChoices.INSTALLED:
                        binary.status = Binary.StatusChoices.QUEUED
                    binary.save()
                    output_records.append(binary.to_json())
                    queued_count += 1
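            # Unrecognized record types fall through to the else below and are passed
            # through verbatim, so mixed pipelines don't silently drop records.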
            else:
                # Unknown type - pass through
                output_records.append(record)
        except Exception as e:
            rprint(f'[yellow]Error processing record: {e}[/yellow]', file=sys.stderr)
            continue

    # Output all processed records (for chaining)
    if not is_tty:
        for rec in output_records:
            write_record(rec)

    if queued_count == 0:
        rprint('[yellow]No records to process[/yellow]', file=sys.stderr)
        return 0

    rprint(f'[blue]Processing {queued_count} records...[/blue]', file=sys.stderr)
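
    # exit_on_idle=True makes the runloop return once every queue is drained,
    # which is what gives piped mode its "exits when done" behavior.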
    # Run orchestrator until all queued work is done
    orchestrator = Orchestrator(exit_on_idle=True)
    orchestrator.runloop()
    return 0


def run_orchestrator(daemon: bool = False) -> int:
    """
    Run the orchestrator process.

    The orchestrator:
    1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
    2. Spawns worker processes when there is work to do
    3. Monitors worker health and restarts failed workers
    4. Exits when all queues are empty (unless --daemon)

    Args:
        daemon: Run forever (don't exit when idle)

    Returns exit code (0 = success, 1 = error).
    """
    from archivebox.workers.orchestrator import Orchestrator

    if Orchestrator.is_running():
        rprint('[yellow]Orchestrator is already running[/yellow]', file=sys.stderr)
        return 0

    try:
        orchestrator = Orchestrator(exit_on_idle=not daemon)
        orchestrator.runloop()
        return 0
    except KeyboardInterrupt:
        return 0
    except Exception as e:
        rprint(f'[red]Orchestrator error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
        return 1


def run_snapshot_worker(snapshot_id: str) -> int:
    """
    Run a SnapshotWorker for a specific snapshot.

    Args:
        snapshot_id: Snapshot UUID to process

    Returns exit code (0 = success, 1 = error).
    """
    from archivebox.workers.worker import _run_snapshot_worker

    try:
        _run_snapshot_worker(snapshot_id=snapshot_id, worker_id=0)
        return 0
    except KeyboardInterrupt:
        return 0
    except Exception as e:
        rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
        import traceback
        traceback.print_exc()
        return 1

@click.command()
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
@click.option('--crawl-id', help="Run orchestrator for specific crawl only")
@click.option('--snapshot-id', help="Run worker for specific snapshot only")
@click.option('--binary-id', help="Run worker for specific binary only")
@click.option('--worker-type', help="Run worker of specific type (binary)")
def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str, worker_type: str):
"""
Process queued work.
Modes:
- No args + stdin piped: Process piped JSONL records
- No args + TTY: Run orchestrator for all work
- --crawl-id: Run orchestrator for that crawl only
- --snapshot-id: Run worker for that snapshot only
- --binary-id: Run worker for that binary only
"""
    # Snapshot worker mode
    if snapshot_id:
        sys.exit(run_snapshot_worker(snapshot_id))

    # Binary worker mode (specific binary)
    if binary_id:
        from archivebox.workers.worker import BinaryWorker
        try:
            worker = BinaryWorker(binary_id=binary_id, worker_id=0)
            worker.runloop()
            sys.exit(0)
        except KeyboardInterrupt:
            sys.exit(0)
        except Exception as e:
            rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
            import traceback
            traceback.print_exc()
            sys.exit(1)

    # Worker type mode (daemon - processes all pending items)
    if worker_type:
        if worker_type == 'binary':
            from archivebox.workers.worker import BinaryWorker
            try:
                worker = BinaryWorker(worker_id=0)  # No binary_id = daemon mode
                worker.runloop()
                sys.exit(0)
            except KeyboardInterrupt:
                sys.exit(0)
            except Exception as e:
                rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
                import traceback
                traceback.print_exc()
                sys.exit(1)
        else:
            rprint(f'[red]Unknown worker type: {worker_type}[/red]', file=sys.stderr)
            sys.exit(1)

    # Crawl worker mode
    if crawl_id:
        from archivebox.workers.worker import CrawlWorker
        try:
            worker = CrawlWorker(crawl_id=crawl_id, worker_id=0)
            worker.runloop()
            sys.exit(0)
        except KeyboardInterrupt:
            sys.exit(0)
        except Exception as e:
            rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
            import traceback
            traceback.print_exc()
            sys.exit(1)

    # Check if stdin has data (non-TTY means piped input)
    if not sys.stdin.isatty():
        sys.exit(process_stdin_records())
    else:
        sys.exit(run_orchestrator(daemon=daemon))

if __name__ == '__main__':
    main()