# Process Model Integration Plan

## Current Architecture

### Hook Execution Flow

```
Orchestrator
├─> CrawlWorker
│     └─> Crawl.run() [state machine @started.enter]
│           └─> run_hook() for on_Crawl__* hooks
│                 └─> subprocess.Popen (NOT using Process model)
│
└─> SnapshotWorker
      └─> Snapshot.run() [planned - doesn't exist yet]
            └─> ArchiveResult.run() [state machine @started.enter]
                  └─> run_hook() for on_Snapshot__* hooks
                        └─> subprocess.Popen (NOT using Process model)
```
## Problem

- **No Process tracking:** `run_hook()` uses `subprocess.Popen` directly and never creates Process records
- **Orphaned Process model:** the Process model has `.launch()`, `.wait()`, and `.terminate()` methods that are NEVER used
- **Manual process management:** SnapshotWorker manually uses psutil for waiting and killing
- **Duplicate logic:** the Process model and `run_hook()` both do subprocess management independently
## Unified Architecture

### Goal

Make the Process model the single source of truth for all subprocess operations:

- Hook execution
- PID tracking
- stdout/stderr capture
- Process lifecycle (launch, wait, terminate)
### Design

```python
# hooks.py - Thin wrapper
def run_hook(...) -> Process:
    """
    Run a hook using the Process model (THIN WRAPPER).
    Returns a Process model instance for tracking and control.
    """
    from archivebox.machine.models import Machine, Process

    # Build command
    cmd = build_hook_cmd(script, kwargs)

    # Use Process.launch() - handles everything
    process = Process.objects.create(
        machine=Machine.current(),
        process_type=Process.TypeChoices.HOOK,
        pwd=str(output_dir),
        cmd=cmd,
        env=build_hook_env(config),
        timeout=timeout,
    )

    # Launch subprocess
    process.launch(background=is_background_hook(script.name))
    return process  # Return Process, not dict
```
```python
# worker.py - Use Process methods
import os
import signal
import time


class SnapshotWorker:
    def _run_hook(self, hook_path, ar) -> 'Process':
        """Fork hook using the Process model."""
        process = run_hook(
            hook_path,
            ar.create_output_dir(),
            self.snapshot.config,
            url=self.snapshot.url,
            snapshot_id=str(self.snapshot.id),
        )
        # Link ArchiveResult to Process
        ar.process = process
        ar.save()
        return process

    def _wait_for_hook(self, process, ar):
        """Wait using the Process.wait() method."""
        exit_code = process.wait(timeout=None)

        # Update AR from hook output
        ar.update_from_output()
        ar.status = ar.StatusChoices.SUCCEEDED if exit_code == 0 else ar.StatusChoices.FAILED
        ar.save()

    def on_shutdown(self):
        """
        Terminate all background hooks in parallel with per-plugin timeouts.

        Phase 1: Send SIGTERM to all in parallel (polite request to wrap up)
        Phase 2: Wait for all in parallel, respecting individual plugin timeouts
        Phase 3: SIGKILL any that exceed their timeout

        Each plugin has its own timeout (SCREENSHOT_TIMEOUT=60, YTDLP_TIMEOUT=300, etc.)
        Some hooks (consolelog, responses) exit immediately on SIGTERM.
        Others (ytdlp, wget) need their full timeout to finish actual work.
        """
        # Send SIGTERM to all processes in parallel
        for hook_name, process in self.background_processes.items():
            if process.is_running():
                os.kill(process.pid, signal.SIGTERM)

        # Build per-process deadlines based on plugin-specific timeouts
        now = time.time()
        deadlines = {
            name: (proc, now + max(0, proc.timeout - (now - proc.started_at.timestamp())))
            for name, proc in self.background_processes.items()
        }

        # Poll all processes in parallel - no head-of-line blocking
        still_running = set(deadlines.keys())
        while still_running:
            time.sleep(0.1)
            for name in list(still_running):
                proc, deadline = deadlines[name]
                if not proc.is_running():
                    still_running.remove(name)
                elif time.time() >= deadline:
                    os.kill(proc.pid, signal.SIGKILL)  # Timeout exceeded
                    still_running.remove(name)
```
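The three-phase shutdown above can be sketched as a standalone function over plain `subprocess.Popen` handles (an assumption made for illustration, so the logic runs without Process rows or a DB; each hook carries its own timeout, as in the plan):

```python
# Standalone sketch of the three-phase shutdown: SIGTERM all, poll all
# against per-hook deadlines, SIGKILL any stragglers. Uses Popen handles
# instead of Process model rows (an assumption for illustration).
import signal
import subprocess
import time


def shutdown_all(procs: dict) -> None:
    """procs maps hook name -> (Popen, timeout_seconds)."""
    # Phase 1: polite SIGTERM to every hook at once
    for popen, _timeout in procs.values():
        if popen.poll() is None:
            popen.send_signal(signal.SIGTERM)

    # Phase 2: poll all in parallel against per-hook deadlines
    deadlines = {name: time.time() + t for name, (_p, t) in procs.items()}
    still_running = set(procs)
    while still_running:
        time.sleep(0.05)
        for name in list(still_running):
            popen, _timeout = procs[name]
            if popen.poll() is not None:
                still_running.remove(name)  # exited gracefully
            elif time.time() >= deadlines[name]:
                popen.kill()                # Phase 3: SIGKILL on deadline
                popen.wait()
                still_running.remove(name)
```

Because every process is polled in the same loop, a hook that ignores SIGTERM only delays itself until its own deadline, not the others.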
```python
# models.py - Process becomes active
class Process:
    def launch(self, background=False):
        """Spawn subprocess and track it."""
        with open(self.stdout_file, 'w') as out, open(self.stderr_file, 'w') as err:
            proc = subprocess.Popen(
                self.cmd,
                cwd=self.pwd,
                stdout=out,
                stderr=err,
                env=self._build_env(),
            )

        self.pid = proc.pid
        self.started_at = timezone.now()
        self.status = self.StatusChoices.RUNNING
        self.save()

        if not background:
            # Foreground - wait inline
            proc.wait()
            self.exit_code = proc.returncode
            self.ended_at = timezone.now()
            self.status = self.StatusChoices.EXITED
            self.save()

        return self

    def wait(self, timeout=None):
        """Wait for the process to exit, polling the DB."""
        deadline = (time.time() + timeout) if timeout else None
        while True:
            self.refresh_from_db()
            if self.status == self.StatusChoices.EXITED:
                return self.exit_code

            # Check via psutil in case the process died without updating the DB
            if not self.is_running():
                self._reap()  # Update status from OS
                return self.exit_code

            if deadline and time.time() >= deadline:
                raise TimeoutError(f'Process {self.pid} still running after {timeout}s')

            time.sleep(0.1)

    def terminate(self, sig=signal.SIGTERM):
        """Gracefully terminate: SIGTERM → wait → SIGKILL."""
        if not self.is_running():
            return True

        os.kill(self.pid, sig)

        # Wait for graceful shutdown
        for _ in range(50):  # 5 seconds
            if not self.is_running():
                self._reap()
                return True
            time.sleep(0.1)

        # Escalate to SIGKILL
        os.kill(self.pid, signal.SIGKILL)
        self._reap()
        return True
```
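`is_running()` is used throughout these sketches but never shown. A minimal psutil-free stand-in (assuming POSIX; the real implementation likely uses psutil) probes the PID with signal 0, which checks existence and permissions without delivering anything:

```python
# Hypothetical psutil-free liveness check (POSIX assumption).
import os


def pid_is_running(pid: int) -> bool:
    """Return True if a process with this PID currently exists."""
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)  # signal 0 = existence check only, nothing delivered
    except ProcessLookupError:
        return False     # no such process
    except PermissionError:
        return True      # exists, but owned by another user
    return True
```

Note that a zombie (exited but unreaped) child still counts as existing, which is why the `Process.wait()` sketch pairs the liveness check with a `_reap()` step.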
## Migration Steps

### Step 1: Update Process.launch() (DONE - already exists)

The Process model already has `.launch()`, `.wait()`, and `.terminate()` methods implemented in `machine/models.py:1295-1593`.
### Step 2: Refactor run_hook() to use Process.launch()

File: `archivebox/hooks.py`

Change signature from:

```python
def run_hook(...) -> HookResult:  # Returns dict
```

To:

```python
def run_hook(...) -> Process:  # Returns Process model
```

Implementation:

```python
def run_hook(script, output_dir, config, timeout=None, **kwargs) -> Process:
    from archivebox.machine.models import Process, Machine

    # Build command
    cmd = build_hook_cmd(script, kwargs)
    env = build_hook_env(config)
    is_bg = is_background_hook(script.name)

    # Create Process record
    process = Process.objects.create(
        machine=Machine.current(),
        process_type=Process.TypeChoices.HOOK,
        pwd=str(output_dir),
        cmd=cmd,
        env=env,
        timeout=timeout or 120,
    )

    # Launch subprocess
    process.launch(background=is_bg)
    return process
```
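The implementation leans on `build_hook_cmd()`, `build_hook_env()`, and `is_background_hook()`, which are not shown in this plan. Hypothetical sketches follow; the `.bg.` filename marker is inferred from hook names like `on_Snapshot__63_ytdlp.bg.py` used elsewhere in this document, and the real helpers in `archivebox/hooks.py` may differ:

```python
# Hypothetical sketches of the helper functions referenced above.
import json
import sys
from pathlib import Path


def is_background_hook(name: str) -> bool:
    """Background hooks opt in via a `.bg.` marker in the filename (assumption)."""
    return '.bg.' in Path(name).name


def build_hook_cmd(script: Path, kwargs: dict) -> list:
    """Interpreter + script + --key=value args, as an argv list (assumption:
    .py scripts run under the current Python, everything else under node)."""
    interpreter = [sys.executable] if script.suffix == '.py' else ['node']
    return interpreter + [str(script)] + [f'--{k}={v}' for k, v in kwargs.items()]


def build_hook_env(config: dict) -> dict:
    """Flatten config into string env vars for the child process."""
    return {k.upper(): v if isinstance(v, str) else json.dumps(v)
            for k, v in config.items()}
```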
### Step 3: Update SnapshotWorker to use Process methods

File: `archivebox/workers/worker.py`

Replace the manual psutil code with Process model methods (shown above in the Design section).
### Step 4: Update ArchiveResult.run() to use new run_hook()

File: `archivebox/core/models.py:2559`

Change from:

```python
result = run_hook(...)  # Returns HookResult dict
if result is None:
    is_bg_hook = True
```

To:

```python
process = run_hook(...)  # Returns Process
self.process = process
self.save()

if process.status == Process.StatusChoices.RUNNING:
    # Background hook - still running
    return
else:
    # Foreground hook - completed
    self.update_from_output()
```
### Step 5: Update Crawl.run() similarly

File: `archivebox/crawls/models.py:374`

Same pattern as `ArchiveResult.run()`.
## Benefits

### 1. Single Source of Truth

- Process model owns ALL subprocess operations
- No duplicate logic between `run_hook()`, Process, and workers
- Consistent PID tracking and stdout/stderr handling
### 2. Proper Hierarchy

`Process.parent_id` creates a tree:

```
Orchestrator (PID 1000)
├─> CrawlWorker (PID 1001, parent=1000)
│     └─> on_Crawl__01_chrome.js (PID 1010, parent=1001)
└─> SnapshotWorker (PID 1020, parent=1000)
      ├─> on_Snapshot__50_wget.py (PID 1021, parent=1020)
      └─> on_Snapshot__63_ytdlp.bg.py (PID 1022, parent=1020)
```
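The planned `Process.get_tree()` (listed under Future, below) could render this view by following `parent_id` links. A hypothetical sketch over plain dict rows rather than ORM objects:

```python
# Hypothetical sketch: render a process tree from parent_id links.
# Rows are plain dicts here; a real Process.get_tree() would walk ORM rows.
def render_tree(rows: list, parent_id=None, depth: int = 0) -> list:
    """Return one indented line per process, depth-first."""
    lines = []
    for row in rows:
        if row['parent_id'] == parent_id:
            lines.append('  ' * depth + f"└─> {row['cmd']} (PID {row['pid']})")
            lines.extend(render_tree(rows, row['pid'], depth + 1))
    return lines
```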
### 3. Better Observability

- Query all hook processes: `snapshot.process_set.all()`
- Count running processes: `Process.objects.filter(status='running').count()`
- Track resource usage via `Process.get_memory_info()`
### 4. Cleaner Code

- `SnapshotWorker._wait_for_hook`: 25 lines → 8 lines
- `SnapshotWorker.on_shutdown`: 12 lines → 7 lines
- `run_hook()`: ~200 lines → ~50 lines
- Total: ~100 LoC saved
## Risks & Mitigation

### Risk 1: Breaking existing run_hook() callers

Mitigation: phased rollout

- Phase 1: Add `run_hook_v2()` that returns Process
- Phase 2: Migrate callers to `run_hook_v2()`
- Phase 3: Rename `run_hook` → `run_hook_legacy`, `run_hook_v2` → `run_hook`
### Risk 2: Background hook tracking changes

Mitigation:

- `Process.launch(background=True)` handles async launches
- `Process.wait()` already polls for completion
- Behavior is identical to the current `subprocess.Popen` usage

### Risk 3: Performance overhead (extra DB writes)

Mitigation:

- Process records are already being created (just not used)
- Batch updates where possible
- Monitor via metrics
## Timeline

### Immediate (This PR)

- State machine fixes (completed)
- Step advancement optimization (completed)
- Document unified architecture (this file)

### Next PR (Process Integration)

- Add `run_hook_v2()` returning Process
- Update SnapshotWorker to use Process methods
- Migrate `ArchiveResult.run()` and `Crawl.run()`
- Deprecate old `run_hook()`

### Future

- Remove `run_hook_legacy` after migration is complete
- Add `Process.get_tree()` for hierarchy visualization
- Add a ProcessMachine state machine for lifecycle management