# Mirror of https://github.com/ArchiveBox/ArchiveBox.git
# (synced 2026-02-20; ~134 lines, 4.7 KiB, Python)
import os
import sqlite3
import subprocess
from contextlib import closing
from pathlib import Path
def _find_snapshot_dir(data_dir: Path, snapshot_id: str) -> Path | None:
|
|
candidates = {snapshot_id}
|
|
if len(snapshot_id) == 32:
|
|
hyphenated = f"{snapshot_id[:8]}-{snapshot_id[8:12]}-{snapshot_id[12:16]}-{snapshot_id[16:20]}-{snapshot_id[20:]}"
|
|
candidates.add(hyphenated)
|
|
elif len(snapshot_id) == 36 and '-' in snapshot_id:
|
|
candidates.add(snapshot_id.replace('-', ''))
|
|
|
|
for needle in candidates:
|
|
for path in data_dir.rglob(needle):
|
|
if path.is_dir():
|
|
return path
|
|
return None
|
|
|
|
|
|
def _find_html_with_text(root: Path, needle: str) -> list[Path]:
    """Return every readable HTML file under ``root`` that contains ``needle``.

    Matches ``*.htm*`` recursively (``.htm``, ``.html``, etc.). Files that
    cannot be read are silently skipped, and decoding errors are ignored so
    partially-binary HTML output still gets scanned.
    """
    matches: list[Path] = []
    for candidate in root.rglob("*.htm*"):
        if not candidate.is_file():
            continue
        try:
            content = candidate.read_text(errors="ignore")
        except Exception:
            # An unreadable output file (permissions, races) is not fatal here.
            continue
        if needle in content:
            matches.append(candidate)
    return matches
def _subprocess_env(tmp_path: Path) -> dict:
    """Build the environment for archivebox subprocesses.

    Uses a short /tmp-based TMP_DIR because unix-socket paths derived from
    pytest's deeply nested tmp_path can exceed the OS socket-path length limit.
    """
    tmp_short = Path("/tmp") / f"abx-{tmp_path.name}"
    tmp_short.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["TMP_DIR"] = str(tmp_short)
    env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
    return env


def _verify_index_db(db_path: Path) -> str:
    """Assert the SQLite index recorded a fully successful example.com snapshot.

    Checks the snapshot row/title, that no archive results failed, and that
    the binary-install workers all ran and exited cleanly. The connection is
    closed even when an assertion fails mid-way (fix: the original leaked the
    connection on assertion failure).

    Returns:
        The snapshot id, for locating on-disk outputs.
    """
    with closing(sqlite3.connect(db_path)) as conn:
        c = conn.cursor()

        snapshot_row = c.execute(
            "SELECT id, url, title FROM core_snapshot WHERE url = ?",
            ("https://example.com",),
        ).fetchone()
        assert snapshot_row is not None, "Snapshot for https://example.com not found in DB"
        snapshot_id, _url, snapshot_title = snapshot_row
        assert snapshot_title and "Example Domain" in snapshot_title, (
            f"Expected title to contain Example Domain, got: {snapshot_title}"
        )

        failed_results = c.execute(
            "SELECT COUNT(*) FROM core_archiveresult WHERE snapshot_id = ? AND status = 'failed'",
            (snapshot_id,),
        ).fetchone()[0]
        assert failed_results == 0, "Some archive results failed for example.com snapshot"

        binary_workers = c.execute(
            "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary'"
        ).fetchone()[0]
        assert binary_workers > 0, "Expected BinaryWorker to run installs via BinaryMachine"

        failed_binary_workers = c.execute(
            "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'binary' "
            "AND exit_code IS NOT NULL AND exit_code != 0"
        ).fetchone()[0]
        assert failed_binary_workers == 0, "BinaryWorker reported non-zero exit codes"

        queued_binaries = c.execute(
            "SELECT name FROM machine_binary WHERE status != 'installed'"
        ).fetchall()
        assert not queued_binaries, f"Some binaries did not install: {queued_binaries}"

        return snapshot_id


def _verify_filesystem_outputs(data_dir: Path, snapshot_id: str) -> None:
    """Assert the extractor outputs exist on disk and contain real content."""
    snapshot_dir = _find_snapshot_dir(data_dir, str(snapshot_id))
    assert snapshot_dir is not None, "Snapshot output directory not found"

    title_path = snapshot_dir / "title" / "title.txt"
    assert title_path.exists(), f"Missing title output: {title_path}"
    assert "Example Domain" in title_path.read_text(errors="ignore")

    # HTML extractors may write either a plain "<name>/" directory or a
    # "<prefix>_<name>/" directory, so check both layouts for each extractor.
    html_sources = []
    for candidate in ("wget", "singlefile", "dom"):
        for candidate_dir in (snapshot_dir / candidate, *snapshot_dir.glob(f"*_{candidate}")):
            if candidate_dir.exists():
                html_sources.extend(_find_html_with_text(candidate_dir, "Example Domain"))
    assert len(html_sources) >= 2, (
        "Expected HTML outputs from multiple extractors to contain Example Domain "
        f"(found {len(html_sources)})."
    )

    # Count text-extractor outputs (readability + htmltotext, both layouts)
    # that actually contain the page text.
    text_hits = sum(
        1
        for path in (
            *snapshot_dir.glob("*_readability/content.txt"),
            snapshot_dir / "readability" / "content.txt",
            *snapshot_dir.glob("*_htmltotext/htmltotext.txt"),
            snapshot_dir / "htmltotext" / "htmltotext.txt",
        )
        if path.exists() and "Example Domain" in path.read_text(errors="ignore")
    )
    assert text_hits >= 2, (
        "Expected multiple text extractors to contain Example Domain "
        f"(readability/htmltotext hits={text_hits})."
    )


def test_add_real_world_example_domain(tmp_path):
    """End-to-end smoke test: archive https://example.com with real subprocesses.

    Runs `archivebox init` and `archivebox add` in tmp_path, then verifies the
    SQLite index, the worker/binary bookkeeping, and the extractor outputs on
    disk. Requires `archivebox` on PATH and network access.
    """
    os.chdir(tmp_path)
    env = _subprocess_env(tmp_path)

    init = subprocess.run(
        ["archivebox", "init"],
        capture_output=True,
        text=True,
        timeout=120,
        env=env,
    )
    assert init.returncode == 0, f"archivebox init failed: {init.stderr}"

    result = subprocess.run(
        ["archivebox", "add", "https://example.com"],
        capture_output=True,
        text=True,
        timeout=900,  # real network fetch + extractor runs can be slow
        env=env,
    )
    assert result.returncode == 0, (
        "archivebox add failed.\n"
        f"stdout:\n{result.stdout}\n"
        f"stderr:\n{result.stderr}"
    )

    snapshot_id = _verify_index_db(tmp_path / "index.sqlite3")
    _verify_filesystem_outputs(tmp_path, snapshot_id)