Files
timmy-home/tests/test_know_thy_father_pipeline.py
Alexander Whitestone 89dfa1e5de
Some checks failed
Smoke Test / smoke (pull_request) Failing after 23s
feat: add Know Thy Father epic orchestrator (#582)
2026-04-15 01:52:58 -04:00

77 lines
3.5 KiB
Python

from pathlib import Path
import importlib.util
import unittest
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "know_thy_father" / "epic_pipeline.py"
DOC_PATH = ROOT / "docs" / "KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md"
def load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class TestKnowThyFatherEpicPipeline(unittest.TestCase):
def test_build_pipeline_plan_contains_all_phases_in_order(self):
mod = load_module(SCRIPT_PATH, "ktf_epic_pipeline")
plan = mod.build_pipeline_plan(batch_size=10)
self.assertEqual(
[step["id"] for step in plan],
[
"phase1_media_indexing",
"phase2_multimodal_analysis",
"phase3_holographic_synthesis",
"phase4_cross_reference_audit",
"phase5_processing_log",
],
)
self.assertIn("scripts/know_thy_father/index_media.py", plan[0]["command"])
self.assertIn("scripts/twitter_archive/analyze_media.py --batch 10", plan[1]["command"])
self.assertIn("scripts/know_thy_father/synthesize_kernels.py", plan[2]["command"])
self.assertIn("scripts/know_thy_father/crossref_audit.py", plan[3]["command"])
self.assertIn("twitter-archive/know-thy-father/tracker.py report", plan[4]["command"])
def test_status_snapshot_reports_key_artifact_paths(self):
mod = load_module(SCRIPT_PATH, "ktf_epic_pipeline")
status = mod.build_status_snapshot(ROOT)
self.assertIn("phase1_media_indexing", status)
self.assertIn("phase2_multimodal_analysis", status)
self.assertIn("phase3_holographic_synthesis", status)
self.assertIn("phase4_cross_reference_audit", status)
self.assertIn("phase5_processing_log", status)
self.assertEqual(status["phase1_media_indexing"]["script"], "scripts/know_thy_father/index_media.py")
self.assertEqual(status["phase2_multimodal_analysis"]["script"], "scripts/twitter_archive/analyze_media.py")
self.assertEqual(status["phase5_processing_log"]["script"], "twitter-archive/know-thy-father/tracker.py")
self.assertTrue(status["phase1_media_indexing"]["script_exists"])
self.assertTrue(status["phase2_multimodal_analysis"]["script_exists"])
self.assertTrue(status["phase3_holographic_synthesis"]["script_exists"])
self.assertTrue(status["phase4_cross_reference_audit"]["script_exists"])
self.assertTrue(status["phase5_processing_log"]["script_exists"])
def test_repo_contains_multimodal_pipeline_doc(self):
self.assertTrue(DOC_PATH.exists(), "missing committed Know Thy Father pipeline doc")
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# Know Thy Father — Multimodal Media Consumption Pipeline",
"scripts/know_thy_father/index_media.py",
"scripts/twitter_archive/analyze_media.py --batch 10",
"scripts/know_thy_father/synthesize_kernels.py",
"scripts/know_thy_father/crossref_audit.py",
"twitter-archive/know-thy-father/tracker.py report",
"Refs #582",
]
for snippet in required:
self.assertIn(snippet, text)
if __name__ == "__main__":
unittest.main()