ArchiveBox_ArchiveBox/archivebox/tests/test_cli_add_interrupt.py
2026-01-19 01:00:53 -08:00

134 lines
4.0 KiB
Python

import os
import signal
import sqlite3
import subprocess
import sys
import time
from pathlib import Path
def _run(cmd, data_dir: Path, env: dict, timeout: int = 120):
    """Run ``cmd`` to completion from inside ``data_dir`` and return the result.

    Args:
        cmd: Argument list handed to ``subprocess.run``.
        data_dir: Working directory for the child process.
        env: Complete environment mapping for the child process.
        timeout: Seconds to allow before ``subprocess.run`` raises
            ``subprocess.TimeoutExpired``.

    Returns:
        The ``subprocess.CompletedProcess``; stdout and stderr are captured
        and decoded to ``str``.
    """
    run_options = {
        "cwd": data_dir,
        "env": env,
        "capture_output": True,
        "text": True,
        "timeout": timeout,
    }
    return subprocess.run(cmd, **run_options)
def _make_env(data_dir: Path) -> dict:
    """Return a copy of the current environment with the test overrides applied.

    The copy points ``DATA_DIR`` at ``data_dir``, turns colour and progress
    output off, restricts ``PLUGINS`` to ``title,favicon`` and sets every
    ``SAVE_*`` flag listed below.  ``os.environ`` itself is not modified.
    """
    # Only the two cheap extractors stay on, so the run is quick while still
    # going through real hooks.
    switched_on = ("SAVE_TITLE", "SAVE_FAVICON")
    switched_off = (
        "SAVE_WGET",
        "SAVE_WARC",
        "SAVE_PDF",
        "SAVE_SCREENSHOT",
        "SAVE_DOM",
        "SAVE_SINGLEFILE",
        "SAVE_READABILITY",
        "SAVE_MERCURY",
        "SAVE_GIT",
        "SAVE_YTDLP",
        "SAVE_HEADERS",
        "SAVE_HTMLTOTEXT",
    )
    overrides = {
        "DATA_DIR": str(data_dir),
        "USE_COLOR": "False",
        "SHOW_PROGRESS": "False",
        "ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS": "true",
        "PLUGINS": "title,favicon",
    }
    overrides.update(dict.fromkeys(switched_on, "True"))
    overrides.update(dict.fromkeys(switched_off, "False"))

    env = os.environ.copy()
    env.update(overrides)
    return env
def _count_running_processes(db_path: Path, where: str) -> int:
    """Count ``machine_process`` rows with ``status = 'running'`` matching ``where``.

    Args:
        db_path: Path to the SQLite database file.
        where: Raw SQL condition AND-ed onto the status filter.  It is
            interpolated into the query verbatim, so pass only trusted,
            test-authored fragments.

    Returns:
        The matching row count, or ``0`` if every one of the 50 attempts
        raised ``sqlite3.OperationalError``.
    """
    query = f"SELECT COUNT(*) FROM machine_process WHERE status = 'running' AND {where}"
    for _ in range(50):
        try:
            conn = sqlite3.connect(db_path, timeout=1)
            try:
                return conn.execute(query).fetchone()[0]
            finally:
                # Close on the failure path too; otherwise every retry would
                # leave another open handle on the database file.
                conn.close()
        except sqlite3.OperationalError:
            # Back off briefly and retry instead of failing the caller.
            time.sleep(0.1)
    return 0
def _wait_for_count(db_path: Path, where: str, target: int, timeout: int = 20) -> bool:
    """Poll until at least ``target`` running processes match ``where``.

    Args:
        db_path: Path to the SQLite database file.
        where: SQL condition forwarded to ``_count_running_processes``.
        target: Minimum running-process count that counts as success.
        timeout: Seconds to keep polling before giving up.

    Returns:
        ``True`` as soon as the count reaches ``target``; ``False`` if the
        deadline passes first (including when ``timeout`` is zero or negative).
    """
    # Monotonic clock: a wall-clock adjustment (NTP step, DST) during the wait
    # must not shorten or stretch the timeout.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if _count_running_processes(db_path, where) >= target:
            return True
        time.sleep(0.1)
    return False
def test_add_parents_workers_to_orchestrator(tmp_path):
    """After ``archivebox add``, a crawl worker row is parented to the orchestrator.

    Initialises a fresh data dir, adds one URL, then checks in
    ``index.sqlite3`` that at least one ``machine_process`` row with
    ``process_type = 'worker'`` and ``worker_type = 'crawl'`` has the most
    recently created orchestrator row as its ``parent_id``.
    """
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    env = _make_env(data_dir)

    init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
    assert init.returncode == 0, init.stderr

    add = _run([sys.executable, "-m", "archivebox", "add", "https://example.com"], data_dir, env, timeout=120)
    assert add.returncode == 0, add.stderr

    conn = sqlite3.connect(data_dir / "index.sqlite3")
    try:
        cur = conn.cursor()
        orchestrator = cur.execute(
            "SELECT id FROM machine_process WHERE process_type = 'orchestrator' ORDER BY created_at DESC LIMIT 1"
        ).fetchone()
        assert orchestrator is not None
        orchestrator_id = orchestrator[0]

        worker_count = cur.execute(
            "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'crawl' "
            "AND parent_id = ?",
            (orchestrator_id,),
        ).fetchone()[0]
    finally:
        # Release the database handle even when an assertion above fails.
        conn.close()

    assert worker_count >= 1, "Expected crawl worker to be parented to orchestrator"
def test_add_interrupt_cleans_orphaned_processes(tmp_path):
    """Interrupting ``archivebox add`` leaves no worker/hook rows marked running.

    Starts ``archivebox add`` in the background, waits until the database
    shows at least one running worker, sends SIGINT, waits up to 30 seconds
    for the command to exit, then polls for up to another 30 seconds until no
    ``machine_process`` row of type ``worker`` or ``hook`` is still ``running``.
    """
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    env = _make_env(data_dir)

    init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
    assert init.returncode == 0, init.stderr

    db_path = data_dir / "index.sqlite3"
    # The child's output goes to a file rather than a pipe: nothing reads it
    # while the test polls, and an undrained pipe can fill up and block the
    # child, making wait() hang.  The log stays in tmp_path for post-mortems.
    log_path = tmp_path / "archivebox_add.log"
    with log_path.open("w") as log_file:
        proc = subprocess.Popen(
            [sys.executable, "-m", "archivebox", "add", "https://example.com"],
            cwd=data_dir,
            env=env,
            stdout=log_file,
            stderr=subprocess.STDOUT,
        )
        try:
            saw_worker = _wait_for_count(db_path, "process_type = 'worker'", 1, timeout=20)
            assert saw_worker, "Expected at least one worker to start before interrupt"

            proc.send_signal(signal.SIGINT)
            proc.wait(timeout=30)
        finally:
            # Never leave the CLI running behind a failed assertion or a
            # wait() timeout.
            if proc.poll() is None:
                proc.kill()
                proc.wait()

    # Wait for workers/hooks to be cleaned up
    still_running = "process_type IN ('worker','hook')"
    deadline = time.monotonic() + 30
    running = _count_running_processes(db_path, still_running)
    while running != 0 and time.monotonic() < deadline:
        time.sleep(0.2)
        running = _count_running_processes(db_path, still_running)

    assert running == 0, (
        "Expected no running worker/hook processes after interrupt"
    )