#!/usr/bin/env python3

"""
archivebox run [--daemon] [--crawl-id=...] [--snapshot-id=...]

Unified command for processing queued work.

Modes:
- With stdin JSONL: Process piped records, exit when complete
- Without stdin (TTY): Run orchestrator in foreground until killed
- --crawl-id: Run orchestrator for specific crawl only
- --snapshot-id: Run worker for specific snapshot only (internal use)

Examples:
    # Run orchestrator in foreground
    archivebox run

    # Run as daemon (don't exit on idle)
    archivebox run --daemon

    # Process specific records (pipe any JSONL type, exits when done)
    archivebox snapshot list --status=queued | archivebox run
    archivebox archiveresult list --status=failed | archivebox run
    archivebox crawl list --status=queued | archivebox run

    # Mixed types work too
    cat mixed_records.jsonl | archivebox run

    # Run orchestrator for specific crawl (shows live progress for that crawl)
    archivebox run --crawl-id=019b7e90-04d0-73ed-adec-aad9cfcd863e

    # Run worker for specific snapshot (internal use by orchestrator)
    archivebox run --snapshot-id=019b7e90-5a8e-712c-9877-2c70eebe80ad
"""

__package__ = 'archivebox.cli'
__command__ = 'archivebox run'

import sys

import rich_click as click
from rich import print as rprint


def process_stdin_records() -> int:
    """
    Process JSONL records from stdin.

    Create-or-update behavior:
    - Records WITHOUT id: Create via Model.from_json(), then queue
    - Records WITH id: Look up the existing record, re-queue it for processing

    Outputs JSONL of all processed records (for chaining).

    Handles any record type: Crawl, Snapshot, ArchiveResult, Binary.
    Auto-cascades: Crawl → Snapshots → ArchiveResults.

    Returns exit code (0 = success, 1 = error).
    """
    from django.utils import timezone

    from archivebox.misc.jsonl import read_stdin, write_record, TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY
    from archivebox.base_models.models import get_or_create_system_user_pk
    from archivebox.core.models import Snapshot, ArchiveResult
    from archivebox.crawls.models import Crawl
    from archivebox.machine.models import Binary
    from archivebox.workers.orchestrator import Orchestrator

    records = list(read_stdin())
    is_tty = sys.stdout.isatty()

    if not records:
        return 0  # Nothing to process

    created_by_id = get_or_create_system_user_pk()
    queued_count = 0
    output_records = []

    for record in records:
        record_type = record.get('type', '')
        record_id = record.get('id')

        try:
            if record_type == TYPE_CRAWL:
                if record_id:
                    # Existing crawl - re-queue
                    try:
                        crawl = Crawl.objects.get(id=record_id)
                    except Crawl.DoesNotExist:
                        crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
                else:
                    # New crawl - create it
                    crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})

                if crawl:
                    crawl.retry_at = timezone.now()
                    if crawl.status not in [Crawl.StatusChoices.SEALED]:
                        crawl.status = Crawl.StatusChoices.QUEUED
                    crawl.save()
                    output_records.append(crawl.to_json())
                    queued_count += 1

            elif record_type == TYPE_SNAPSHOT or (record.get('url') and not record_type):
                if record_id:
                    # Existing snapshot - re-queue
                    try:
                        snapshot = Snapshot.objects.get(id=record_id)
                    except Snapshot.DoesNotExist:
                        snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
                else:
                    # New snapshot - create it
                    snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})

                if snapshot:
                    snapshot.retry_at = timezone.now()
                    if snapshot.status not in [Snapshot.StatusChoices.SEALED]:
                        snapshot.status = Snapshot.StatusChoices.QUEUED
                    snapshot.save()
                    output_records.append(snapshot.to_json())
                    queued_count += 1

            elif record_type == TYPE_ARCHIVERESULT:
                if record_id:
                    # Existing archiveresult - re-queue
                    try:
                        archiveresult = ArchiveResult.objects.get(id=record_id)
                    except ArchiveResult.DoesNotExist:
                        archiveresult = ArchiveResult.from_json(record)
                else:
                    # New archiveresult - create it
                    archiveresult = ArchiveResult.from_json(record)

                if archiveresult:
                    archiveresult.retry_at = timezone.now()
                    if archiveresult.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED, ArchiveResult.StatusChoices.BACKOFF]:
                        archiveresult.status = ArchiveResult.StatusChoices.QUEUED
                    archiveresult.save()
                    output_records.append(archiveresult.to_json())
                    queued_count += 1

            elif record_type == TYPE_BINARY:
                # Binary records - create or update and queue for installation
                if record_id:
                    # Existing binary - re-queue
                    try:
                        binary = Binary.objects.get(id=record_id)
                    except Binary.DoesNotExist:
                        binary = Binary.from_json(record)
                else:
                    # New binary - create it
                    binary = Binary.from_json(record)

                if binary:
                    binary.retry_at = timezone.now()
                    if binary.status != Binary.StatusChoices.INSTALLED:
                        binary.status = Binary.StatusChoices.QUEUED
                    binary.save()
                    output_records.append(binary.to_json())
                    queued_count += 1

            else:
                # Unknown type - pass through
                output_records.append(record)

        except Exception as e:
            rprint(f'[yellow]Error processing record: {e}[/yellow]', file=sys.stderr)
            continue

    # Output all processed records (for chaining)
    if not is_tty:
        for rec in output_records:
            write_record(rec)

    if queued_count == 0:
        rprint('[yellow]No records to process[/yellow]', file=sys.stderr)
        return 0

    rprint(f'[blue]Processing {queued_count} records...[/blue]', file=sys.stderr)

    # Run orchestrator until all queued work is done
    orchestrator = Orchestrator(exit_on_idle=True)
    orchestrator.runloop()

    return 0
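
# NOTE: A hedged chaining sketch for process_stdin_records() above (hypothetical
# shell pipeline, composed from the module docstring examples): when stdout is
# not a TTY, every processed record is re-emitted as JSONL, so runs compose:
#
#   archivebox crawl list --status=queued | archivebox run > processed.jsonl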


def run_orchestrator(daemon: bool = False) -> int:
    """
    Run the orchestrator process.

    The orchestrator:
    1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
    2. Spawns worker processes when there is work to do
    3. Monitors worker health and restarts failed workers
    4. Exits when all queues are empty (unless --daemon)

    Args:
        daemon: Run forever (don't exit when idle)

    Returns exit code (0 = success, 1 = error).
    """
    from archivebox.workers.orchestrator import Orchestrator

    if Orchestrator.is_running():
        rprint('[yellow]Orchestrator is already running[/yellow]', file=sys.stderr)
        return 0

    try:
        orchestrator = Orchestrator(exit_on_idle=not daemon)
        orchestrator.runloop()
        return 0
    except KeyboardInterrupt:
        return 0
    except Exception as e:
        rprint(f'[red]Orchestrator error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
        return 1


def run_snapshot_worker(snapshot_id: str) -> int:
    """
    Run a SnapshotWorker for a specific snapshot.

    Args:
        snapshot_id: Snapshot UUID to process

    Returns exit code (0 = success, 1 = error).
    """
    from archivebox.workers.worker import _run_snapshot_worker

    try:
        _run_snapshot_worker(snapshot_id=snapshot_id, worker_id=0)
        return 0
    except KeyboardInterrupt:
        return 0
    except Exception as e:
        rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
        import traceback
        traceback.print_exc()
        return 1


@click.command()
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
@click.option('--crawl-id', help="Run orchestrator for specific crawl only")
@click.option('--snapshot-id', help="Run worker for specific snapshot only")
@click.option('--binary-id', help="Run worker for specific binary only")
@click.option('--worker-type', help="Run worker of specific type (binary)")
def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str, worker_type: str):
    """
    Process queued work.

    Modes:
    - No args + stdin piped: Process piped JSONL records
    - No args + TTY: Run orchestrator for all work
    - --crawl-id: Run orchestrator for that crawl only
    - --snapshot-id: Run worker for that snapshot only
    - --binary-id: Run worker for that binary only
    - --worker-type: Run a worker of that type (currently only 'binary')
    """
    # Snapshot worker mode
    if snapshot_id:
        sys.exit(run_snapshot_worker(snapshot_id))

    # Binary worker mode (specific binary)
    if binary_id:
        from archivebox.workers.worker import BinaryWorker
        try:
            worker = BinaryWorker(binary_id=binary_id, worker_id=0)
            worker.runloop()
            sys.exit(0)
        except KeyboardInterrupt:
            sys.exit(0)
        except Exception as e:
            rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
            import traceback
            traceback.print_exc()
            sys.exit(1)

    # Worker type mode (daemon - processes all pending items)
    if worker_type:
        if worker_type == 'binary':
            from archivebox.workers.worker import BinaryWorker
            try:
                worker = BinaryWorker(worker_id=0)  # No binary_id = daemon mode
                worker.runloop()
                sys.exit(0)
            except KeyboardInterrupt:
                sys.exit(0)
            except Exception as e:
                rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
                import traceback
                traceback.print_exc()
                sys.exit(1)
        else:
            rprint(f'[red]Unknown worker type: {worker_type}[/red]', file=sys.stderr)
            sys.exit(1)

    # Crawl worker mode
    if crawl_id:
        from archivebox.workers.worker import CrawlWorker
        try:
            worker = CrawlWorker(crawl_id=crawl_id, worker_id=0)
            worker.runloop()
            sys.exit(0)
        except KeyboardInterrupt:
            sys.exit(0)
        except Exception as e:
            rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
            import traceback
            traceback.print_exc()
            sys.exit(1)

    # Check if stdin has data (non-TTY means piped input)
    if not sys.stdin.isatty():
        sys.exit(process_stdin_records())
    else:
        sys.exit(run_orchestrator(daemon=daemon))


if __name__ == '__main__':
    main()