ArchiveBox_ArchiveBox/archivebox/tests/test_real_world_add.py
2026-01-19 01:00:53 -08:00

134 lines
4.7 KiB
Python

import os
import sqlite3
import subprocess
from pathlib import Path
def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
candidates = {snapshot_id}
if len(snapshot_id) == 32:
hyphenated = f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}"
candidates.add(hyphenated)
elif len(snapshot_id) == 36 and '-' in snapshot_id:
candidates.add(snapshot_id.replace('-', ''))
for needle in candidates:
for path in data_dir.rglob(needle):
if path.is_dir():
return path
return None
def _find_html_with_text(root: Path, needle: str) -> list[Path]:
hits: list[Path] = []
for path in root.rglob("*.htm*"):
if not path.is_file():
continue
try:
if needle in path.read_text(errors="ignore"):
hits.append(path)
except Exception:
continue
return hits
def test_add_real_world_example_domain(tmp_path):
    """End-to-end smoke test: ``archivebox add https://example.com``.

    Runs ``archivebox init`` then ``add`` in a fresh data dir, and verifies:
      * the Snapshot row exists in the sqlite index with the expected title
      * no ArchiveResult failed, the binary workers ran and exited cleanly,
        and every tracked binary reached 'installed' status
      * on disk, the title extractor output and at least two HTML extractor
        outputs (wget/singlefile/dom) plus two text extractor outputs
        (readability/htmltotext) contain "Example Domain"

    Requires the ``archivebox`` CLI on PATH and network access; uses the
    pytest ``tmp_path`` fixture as the data dir.
    """
    # archivebox operates on the current working directory as its data dir.
    os.chdir(tmp_path)
    # Short TMP_DIR under /tmp: pytest tmp paths can exceed the unix-socket
    # path length limit; ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS is the fallback.
    tmp_short = Path("/tmp") / f"abx-{tmp_path.name}"
    tmp_short.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["TMP_DIR"] = str(tmp_short)
    env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"

    init = subprocess.run(
        ["archivebox", "init"],
        capture_output=True,
        text=True,
        timeout=120,
        env=env,
    )
    assert init.returncode == 0, f"archivebox init failed: {init.stderr}"

    result = subprocess.run(
        ["archivebox", "add", "https://example.com"],
        capture_output=True,
        text=True,
        timeout=900,
        env=env,
    )
    assert result.returncode == 0, (
        "archivebox add failed.\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )

    # ---- Database-level assertions ------------------------------------
    conn = sqlite3.connect(tmp_path / "index.sqlite3")
    try:
        c = conn.cursor()
        snapshot_row = c.execute(
            "SELECT id, url, title FROM core_snapshot WHERE url = ?",
            ("https://example.com",),
        ).fetchone()
        assert snapshot_row is not None, "Snapshot for https://example.com not found in DB"
        snapshot_id, snapshot_url, snapshot_title = snapshot_row
        assert snapshot_title and "Example Domain" in snapshot_title, (
            f"Expected title to contain Example Domain, got: {snapshot_title}"
        )
        failed_results = c.execute(
            "SELECT COUNT(*) FROM core_archiveresult WHERE snapshot_id = ? AND status = 'failed'",
            (snapshot_id,),
        ).fetchone()[0]
        assert failed_results == 0, "Some archive results failed for example.com snapshot"
        binary_workers = c.execute(
            "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary'"
        ).fetchone()[0]
        assert binary_workers > 0, "Expected BinaryWorker to run installs via BinaryMachine"
        failed_binary_workers = c.execute(
            "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary' "
            "AND exit_code IS NOT NULL AND exit_code != 0"
        ).fetchone()[0]
        assert failed_binary_workers == 0, "BinaryWorker reported non-zero exit codes"
        queued_binaries = c.execute(
            "SELECT name FROM machine_binary WHERE status != 'installed'"
        ).fetchall()
        assert not queued_binaries, f"Some binaries did not install: {queued_binaries}"
    finally:
        # Close even when an assertion above fails, so a failing test
        # doesn't leak an open handle to the on-disk index.
        conn.close()

    # ---- Filesystem-level assertions ----------------------------------
    snapshot_dir = _find_snapshot_dir(tmp_path, str(snapshot_id))
    assert snapshot_dir is not None, "Snapshot output directory not found"

    title_path = snapshot_dir / "title" / "title.txt"
    assert title_path.exists(), f"Missing title output: {title_path}"
    assert "Example Domain" in title_path.read_text(errors="ignore")

    # HTML extractor outputs may live in either "<name>/" or "*_<name>/" dirs
    # depending on the ArchiveBox version's layout.
    html_sources = []
    for candidate in ("wget", "singlefile", "dom"):
        for candidate_dir in (snapshot_dir / candidate, *snapshot_dir.glob(f"*_{candidate}")):
            if candidate_dir.exists():
                html_sources.extend(_find_html_with_text(candidate_dir, "Example Domain"))
    assert len(html_sources) >= 2, (
        "Expected HTML outputs from multiple extractors to contain Example Domain "
        f"(found {len(html_sources)})."
    )

    # Text extractor outputs: count how many of readability/htmltotext
    # produced a file mentioning the page text.
    text_hits = 0
    for path in (
        *snapshot_dir.glob("*_readability/content.txt"),
        snapshot_dir / "readability" / "content.txt",
    ):
        if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
            text_hits += 1
    for path in (
        *snapshot_dir.glob("*_htmltotext/htmltotext.txt"),
        snapshot_dir / "htmltotext" / "htmltotext.txt",
    ):
        if path.exists() and "Example Domain" in path.read_text(errors="ignore"):
            text_hits += 1
    assert text_hits >= 2, (
        "Expected multiple text extractors to contain Example Domain "
        f"(readability/htmltotext hits={text_hits})."
    )