ArchiveBox_ArchiveBox/old/TODO_Process_cleanup_unification.md

10 KiB

Process Model Integration Plan

Current Architecture

Hook Execution Flow

Orchestrator
  ├─> CrawlWorker
  │     └─> Crawl.run() [state machine @started.enter]
  │           └─> run_hook() for on_Crawl__* hooks
  │                 └─> subprocess.Popen (NOT using Process model)
  │
  └─> SnapshotWorker
        └─> Snapshot.run() [planned - doesn't exist yet]
              └─> ArchiveResult.run() [state machine @started.enter]
                    └─> run_hook() for on_Snapshot__* hooks
                          └─> subprocess.Popen (NOT using Process model)

Problem

  1. No Process tracking: run_hook() uses subprocess.Popen directly, never creates Process records
  2. Orphaned Process model: Process model has .launch(), .wait(), .terminate() methods that are NEVER used
  3. Manual process management: SnapshotWorker manually uses psutil for waiting/killing
  4. Duplicate logic: Process model and run_hook() both do subprocess management independently

Unified Architecture

Goal

Make Process model the single source of truth for all subprocess operations:

  • Hook execution
  • PID tracking
  • stdout/stderr capture
  • Process lifecycle (launch, wait, terminate)

Design

# hooks.py - Thin wrapper
def run_hook(...) -> Process:
    """
    Run a hook using Process model (THIN WRAPPER).

    Returns Process model instance for tracking and control.
    """
    from archivebox.machine.models import Process

    # Build command
    cmd = build_hook_cmd(script, kwargs)

    # Use Process.launch() - handles everything
    process = Process.objects.create(
        machine=Machine.current(),
        process_type=Process.TypeChoices.HOOK,
        pwd=str(output_dir),
        cmd=cmd,
        env=build_hook_env(config),
        timeout=timeout,
    )

    # Launch subprocess
    process.launch(background=is_background_hook(script.name))

    return process  # Return Process, not dict


# worker.py - Use Process methods
class SnapshotWorker:
    def _run_hook(self, hook_path, ar) -> Process:
        """Fork hook using Process model."""
        process = run_hook(
            hook_path,
            ar.create_output_dir(),
            self.snapshot.config,
            url=self.snapshot.url,
            snapshot_id=str(self.snapshot.id),
        )

        # Link ArchiveResult to Process
        ar.process = process
        ar.save()

        return process

    def _wait_for_hook(self, process, ar):
        """Wait using Process.wait() method."""
        exit_code = process.wait(timeout=None)

        # Update AR from hook output
        ar.update_from_output()
        ar.status = ar.StatusChoices.SUCCEEDED if exit_code == 0 else ar.StatusChoices.FAILED
        ar.save()

    def on_shutdown(self):
        """
        Terminate all background hooks in parallel with per-plugin timeouts.

        Phase 1: Send SIGTERM to all in parallel (polite request to wrap up)
        Phase 2: Wait for all in parallel, respecting individual plugin timeouts
        Phase 3: SIGKILL any that exceed their timeout

        Each plugin has its own timeout (SCREENSHOT_TIMEOUT=60, YTDLP_TIMEOUT=300, etc.)
        Some hooks (consolelog, responses) exit immediately on SIGTERM.
        Others (ytdlp, wget) need their full timeout to finish actual work.
        """
        # Send SIGTERM to all processes in parallel
        for hook_name, process in self.background_processes.items():
            os.kill(process.pid, signal.SIGTERM)

        # Build per-process deadlines based on plugin-specific timeouts
        deadlines = {
            name: (proc, time.time() + max(0, proc.timeout - (time.time() - proc.started_at.timestamp())))
            for name, proc in self.background_processes.items()
        }

        # Poll all processes in parallel - no head-of-line blocking
        still_running = set(deadlines.keys())
        while still_running:
            time.sleep(0.1)
            for name in list(still_running):
                proc, deadline = deadlines[name]
                if not proc.is_running():
                    still_running.remove(name)
                elif time.time() >= deadline:
                    os.kill(proc.pid, signal.SIGKILL)  # Timeout exceeded
                    still_running.remove(name)


# models.py - Process becomes active
class Process:
    def launch(self, background=False):
        """Spawn subprocess and track it."""
        with open(self.stdout_file, 'w') as out, open(self.stderr_file, 'w') as err:
            proc = subprocess.Popen(
                self.cmd,
                cwd=self.pwd,
                stdout=out,
                stderr=err,
                env=self._build_env(),
            )

            self.pid = proc.pid
            self.started_at = timezone.now()
            self.status = self.StatusChoices.RUNNING
            self.save()

            if not background:
                # Foreground - wait inline
                proc.wait()
                self.exit_code = proc.returncode
                self.ended_at = timezone.now()
                self.status = self.StatusChoices.EXITED
                self.save()

        return self

    def wait(self, timeout=None):
        """Wait for process to exit, polling DB."""
        while True:
            self.refresh_from_db()
            if self.status == self.StatusChoices.EXITED:
                return self.exit_code

            # Check via psutil if Process died without updating DB
            if not self.is_running():
                self._reap()  # Update status from OS
                return self.exit_code

            time.sleep(0.1)

    def terminate(self, sig=signal.SIGTERM):
        """Gracefully terminate: SIGTERM → wait → SIGKILL."""
        if not self.is_running():
            return True

        os.kill(self.pid, sig)

        # Wait for graceful shutdown
        for _ in range(50):  # 5 seconds
            if not self.is_running():
                self._reap()
                return True
            time.sleep(0.1)

        # Escalate to SIGKILL
        os.kill(self.pid, signal.SIGKILL)
        self._reap()
        return True

Migration Steps

Step 1: Update Process.launch() (DONE - already exists)

Process model already has .launch(), .wait(), .terminate() methods implemented in machine/models.py:1295-1593

Step 2: Refactor run_hook() to use Process.launch()

File: archivebox/hooks.py

Change signature from:

def run_hook(...) -> HookResult:  # Returns dict

To:

def run_hook(...) -> Process:  # Returns Process model

Implementation:

def run_hook(script, output_dir, config, timeout=None, **kwargs) -> Process:
    from archivebox.machine.models import Process, Machine

    # Build command
    cmd = build_hook_cmd(script, kwargs)
    env = build_hook_env(config)
    is_bg = is_background_hook(script.name)

    # Create Process record
    process = Process.objects.create(
        machine=Machine.current(),
        process_type=Process.TypeChoices.HOOK,
        pwd=str(output_dir),
        cmd=cmd,
        env=env,
        timeout=timeout or 120,
    )

    # Launch subprocess
    process.launch(background=is_bg)

    return process

Step 3: Update SnapshotWorker to use Process methods

File: archivebox/workers/worker.py

Replace manual psutil code with Process model methods (shown above in Design section).

Step 4: Update ArchiveResult.run() to use new run_hook()

File: archivebox/core/models.py:2559

Change from:

result = run_hook(...)  # Returns HookResult dict
if result is None:
    is_bg_hook = True

To:

process = run_hook(...)  # Returns Process
self.process = process
self.save()

if process.status == Process.StatusChoices.RUNNING:
    # Background hook - still running
    return
else:
    # Foreground hook - completed
    self.update_from_output()

Step 5: Update Crawl.run() similarly

File: archivebox/crawls/models.py:374

Same pattern as ArchiveResult.run()

Benefits

1. Single Source of Truth

  • Process model owns ALL subprocess operations
  • No duplicate logic between run_hook(), Process, and workers
  • Consistent PID tracking, stdout/stderr handling

2. Proper Hierarchy

Process.parent_id creates tree:
Orchestrator (PID 1000)
  └─> CrawlWorker (PID 1001, parent=1000)
        └─> on_Crawl__01_chrome.js (PID 1010, parent=1001)
  └─> SnapshotWorker (PID 1020, parent=1000)
        └─> on_Snapshot__50_wget.py (PID 1021, parent=1020)
        └─> on_Snapshot__63_ytdlp.bg.py (PID 1022, parent=1020)

3. Better Observability

  • Query all hook processes: snapshot.process_set.all()
  • Count running: Process.objects.filter(status='running').count()
  • Track resource usage via Process.get_memory_info()

4. Cleaner Code

  • SnapshotWorker._wait_for_hook: 25 lines → 8 lines
  • SnapshotWorker.on_shutdown: 12 lines → 7 lines
  • run_hook(): ~200 lines → ~50 lines
  • Total: ~100 LoC saved

Risks & Mitigation

Risk 1: Breaking existing run_hook() callers

Mitigation: Two-phase rollout

  1. Phase 1: Add run_hook_v2() that returns Process
  2. Phase 2: Migrate callers to run_hook_v2()
  3. Phase 3: Rename run_hook → run_hook_legacy, run_hook_v2 → run_hook

Risk 2: Background hook tracking changes

Mitigation:

  • Process.launch(background=True) handles async launches
  • Process.wait() already polls for completion
  • Behavior identical to current subprocess.Popen

Risk 3: Performance overhead (extra DB writes)

Mitigation:

  • Process records already being created (just not used)
  • Batch updates where possible
  • Monitor via metrics

Timeline

Immediate (This PR)

  • State machine fixes (completed)
  • Step advancement optimization (completed)
  • Document unified architecture (this file)

Next PR (Process Integration)

  1. Add run_hook_v2() returning Process
  2. Update SnapshotWorker to use Process methods
  3. Migrate ArchiveResult.run() and Crawl.run()
  4. Deprecate old run_hook()

Future

  • Remove run_hook_legacy after migration complete
  • Add Process.get_tree() for hierarchy visualization
  • Add ProcessMachine state machine for lifecycle management