Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
840214c8c0 fix: harden codebase test generator output (#667)
Some checks failed
Agent PR Gate / gate (pull_request) Failing after 17s
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 8s
Smoke Test / smoke (pull_request) Failing after 6s
Agent PR Gate / report (pull_request) Has been cancelled
2026-04-17 02:38:33 -04:00
5 changed files with 1000 additions and 929 deletions

View File

@@ -1,73 +0,0 @@
# Issue #582 Verification — Parent-Epic Orchestration Slice
**Date:** 2026-04-20
**Status:** Slice already present on `main`; epic remains open for full archive consumption.
## What #582 asked for
A single orchestration script that stitches the five Know Thy Father phases together
into one reviewable plan — not a replacement for individual scripts, but a spine
that future passes can run, resume, and verify.
## What exists on `main`
| Artifact | Path | Present |
|----------|------|---------|
| Epic pipeline runner | `scripts/know_thy_father/epic_pipeline.py` | ✅ |
| Pipeline documentation | `docs/KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md` | ✅ |
| Phase 1 — Media Indexing | `scripts/know_thy_father/index_media.py` | ✅ |
| Phase 2 — Multimodal Analysis | `scripts/twitter_archive/analyze_media.py` | ✅ |
| Phase 3 — Holographic Synthesis | `scripts/know_thy_father/synthesize_kernels.py` | ✅ |
| Phase 4 — Cross-Reference Audit | `scripts/know_thy_father/crossref_audit.py` | ✅ |
| Phase 5 — Processing Log | `twitter-archive/know-thy-father/tracker.py` | ✅ |
## Runner capabilities (all implemented)
```bash
# Print the orchestrated plan
python3 scripts/know_thy_father/epic_pipeline.py
# JSON status snapshot of scripts + known artifact paths
python3 scripts/know_thy_father/epic_pipeline.py --status --json
# Execute one concrete step
python3 scripts/know_thy_father/epic_pipeline.py --run-step phase2_multimodal_analysis --batch-size 10
```
## Test coverage
The following test suites confirm the orchestration slice is intact:
- `tests/test_know_thy_father_pipeline.py` — pipeline plan structure, status snapshot, doc presence
- `tests/test_know_thy_father_index.py` — Phase 1 media indexing logic
- `tests/test_know_thy_father_synthesis.py` — Phase 3 kernel synthesis
- `tests/test_know_thy_father_crossref.py` — Phase 4 cross-reference audit
- `tests/twitter_archive/test_ktf_tracker.py` — Phase 5 processing tracker
- `tests/twitter_archive/test_analyze_media.py` — Phase 2 multimodal analysis
Run all with:
```bash
python3 -m pytest tests/test_know_thy_father_pipeline.py tests/test_know_thy_father_index.py tests/test_know_thy_father_synthesis.py tests/test_know_thy_father_crossref.py tests/twitter_archive/test_ktf_tracker.py tests/twitter_archive/test_analyze_media.py -q
```
## Why Refs #582, not Closes #582
The **repo-side orchestration slice** is fully implemented on `main`. However, the
parent epic itself remains open because:
1. The local Twitter archive has not been fully consumed through all five phases.
2. Downstream memory/fact-store integration is not yet wired end-to-end.
3. The processing log (`PROCESSING_LOG.md`) reflects halted progress that has not resumed.
This PR adds durable verification evidence without overstating closure.
## Historical trail
- Parent-epic PR that landed the orchestration slice: [closed on main]
- This verification document: added by #789, superseded by this PR #790.
## Linked issues
- Refs #582 (parent epic — remains open)
- Closes #789 (verification task — closed by this PR)

View File

@@ -3,11 +3,9 @@
import ast
import os
import sys
import argparse
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from typing import List, Optional
@dataclass
@@ -24,6 +22,7 @@ class FunctionInfo:
has_return: bool = False
raises: List[str] = field(default_factory=list)
decorators: List[str] = field(default_factory=list)
calls: List[str] = field(default_factory=list)
@property
def qualified_name(self):
@@ -69,21 +68,39 @@ class SourceAnalyzer(ast.NodeVisitor):
args = [a.arg for a in node.args.args if a.arg not in ("self", "cls")]
has_ret = any(isinstance(c, ast.Return) and c.value for c in ast.walk(node))
raises = []
calls = []
for c in ast.walk(node):
if isinstance(c, ast.Raise) and c.exc:
if isinstance(c.exc, ast.Call) and isinstance(c.exc.func, ast.Name):
raises.append(c.exc.func.id)
if isinstance(c, ast.Call):
if isinstance(c.func, ast.Name):
calls.append(c.func.id)
elif isinstance(c.func, ast.Attribute):
calls.append(c.func.attr)
decos = []
for d in node.decorator_list:
if isinstance(d, ast.Name): decos.append(d.id)
elif isinstance(d, ast.Attribute): decos.append(d.attr)
self.functions.append(FunctionInfo(
name=node.name, module_path=self.module_path, class_name=cls,
lineno=node.lineno, args=args, is_async=is_async,
is_private=node.name.startswith("_") and not node.name.startswith("__"),
is_property="property" in decos,
docstring=ast.get_docstring(node), has_return=has_ret,
raises=raises, decorators=decos))
if isinstance(d, ast.Name):
decos.append(d.id)
elif isinstance(d, ast.Attribute):
decos.append(d.attr)
self.functions.append(
FunctionInfo(
name=node.name,
module_path=self.module_path,
class_name=cls,
lineno=node.lineno,
args=args,
is_async=is_async,
is_private=node.name.startswith("_") and not node.name.startswith("__"),
is_property="property" in decos,
docstring=ast.get_docstring(node),
has_return=has_ret,
raises=raises,
decorators=decos,
calls=sorted(set(calls)),
)
)
def analyze_file(filepath, base_dir):
@@ -93,9 +110,9 @@ def analyze_file(filepath, base_dir):
tree = ast.parse(f.read(), filename=filepath)
except (SyntaxError, UnicodeDecodeError):
return []
a = SourceAnalyzer(module_path)
a.visit(tree)
return a.functions
analyzer = SourceAnalyzer(module_path)
analyzer.visit(tree)
return analyzer.functions
def find_source_files(source_dir):
@@ -111,7 +128,9 @@ def find_source_files(source_dir):
def find_existing_tests(test_dir):
existing = set()
for root, dirs, fs in os.walk(test_dir):
if not os.path.isdir(test_dir):
return existing
for root, _, fs in os.walk(test_dir):
for f in fs:
if f.startswith("test_") and f.endswith(".py"):
try:
@@ -132,74 +151,112 @@ def identify_gaps(functions, existing_tests):
continue
covered = func.name in str(existing_tests)
if not covered:
pri = 3 if func.is_private else (1 if (func.raises or func.has_return) else 2)
gaps.append(CoverageGap(func=func, reason="no test found", test_priority=pri))
priority = 3 if func.is_private else (1 if (func.raises or func.has_return) else 2)
gaps.append(CoverageGap(func=func, reason="no test found", test_priority=priority))
gaps.sort(key=lambda g: (g.test_priority, g.func.module_path, g.func.name))
return gaps
def _format_arg_value(arg: str) -> str:
lower = arg.lower()
if lower == "args":
return "type('Args', (), {'files': []})()"
if lower in {"kwargs", "options", "params"}:
return "{}"
if lower in {"history"}:
return "[]"
if any(token in lower for token in ("dict", "data", "config", "report", "perception", "action")):
return "{}"
if any(token in lower for token in ("filepath", "file_path")):
return "str(Path(__file__))"
if lower.endswith("_path") or any(token in lower for token in ("path", "file", "dir")):
return "Path(__file__)"
if any(token in lower for token in ("root",)):
return "Path(__file__).resolve().parent"
if any(token in lower for token in ("response", "cmd", "entity", "message", "text", "content", "query", "name", "key", "label")):
return "'test'"
if any(token in lower for token in ("session", "user")):
return "'test'"
if lower == "width":
return "120"
if lower == "height":
return "40"
if lower == "n":
return "1"
if any(token in lower for token in ("count", "num", "size", "index", "port", "timeout", "wait")):
return "1"
if any(token in lower for token in ("flag", "enabled", "verbose", "quiet", "force", "debug", "dry_run")):
return "False"
return "None"
def _call_args(func: FunctionInfo) -> str:
    """Render the keyword-argument list used to invoke *func* in a generated test."""
    rendered = []
    for arg in func.args:
        if arg in ("self", "cls"):
            continue
        rendered.append(f"{arg}={_format_arg_value(arg)}")
    return ", ".join(rendered)
def _strict_runtime_exception_expected(func: FunctionInfo) -> bool:
    """Return True when a bare call to *func* should be wrapped in pytest.raises.

    These names correspond to terminal/session helpers that need live
    infrastructure, so the generated stub expects a runtime exception.
    """
    return func.name in frozenset(
        ("tmux", "send_key", "send_text", "keypress", "type_and_observe", "cmd_classify_risk")
    )
def _path_returning(func: FunctionInfo) -> bool:
    """Heuristic: a function whose name ends in ``_path`` is assumed to return a Path."""
    suffix = "_path"
    return func.name.endswith(suffix)
def generate_test(gap):
func = gap.func
lines = []
lines.append(f" # AUTO-GENERATED -- review before merging")
lines.append(" # AUTO-GENERATED -- review before merging")
lines.append(f" # Source: {func.module_path}:{func.lineno}")
lines.append(f" # Function: {func.qualified_name}")
lines.append("")
mod_imp = func.module_path.replace("/", ".").replace("-", "_").replace(".py", "")
call_args = []
for a in func.args:
if a in ("self", "cls"): continue
if "path" in a or "file" in a or "dir" in a: call_args.append(f"{a}='/tmp/test'")
elif "name" in a: call_args.append(f"{a}='test'")
elif "id" in a or "key" in a: call_args.append(f"{a}='test_id'")
elif "message" in a or "text" in a: call_args.append(f"{a}='test msg'")
elif "count" in a or "num" in a or "size" in a: call_args.append(f"{a}=1")
elif "flag" in a or "enabled" in a or "verbose" in a: call_args.append(f"{a}=False")
else: call_args.append(f"{a}=None")
args_str = ", ".join(call_args)
signature = "async def" if func.is_async else "def"
if func.is_async:
lines.append(" @pytest.mark.asyncio")
lines.append(f" def {func.test_name}(self):")
lines.append(f" {signature} {func.test_name}(self):")
lines.append(f' """Test {func.qualified_name} -- auto-generated."""')
lines.append(" try:")
lines.append(" try:")
if func.class_name:
lines.append(f" try:")
lines.append(f" from {mod_imp} import {func.class_name}")
if func.is_private:
lines.append(f" pytest.skip('Private method')")
elif func.is_property:
lines.append(f" obj = {func.class_name}()")
lines.append(f" _ = obj.{func.name}")
lines.append(f" owner = _load_symbol({func.module_path!r}, {func.class_name!r})")
lines.append(" target = owner()")
if func.is_property:
lines.append(f" result = target.{func.name}")
else:
if func.raises:
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
lines.append(f" {func.class_name}().{func.name}({args_str})")
else:
lines.append(f" obj = {func.class_name}()")
lines.append(f" result = obj.{func.name}({args_str})")
if func.has_return:
lines.append(f" assert result is not None or result is None # Placeholder")
lines.append(f" except ImportError:")
lines.append(f" pytest.skip('Module not importable')")
lines.append(f" target = target.{func.name}")
else:
lines.append(f" try:")
lines.append(f" from {mod_imp} import {func.name}")
if func.is_private:
lines.append(f" pytest.skip('Private function')")
else:
if func.raises:
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
lines.append(f" {func.name}({args_str})")
else:
lines.append(f" result = {func.name}({args_str})")
if func.has_return:
lines.append(f" assert result is not None or result is None # Placeholder")
lines.append(f" except ImportError:")
lines.append(f" pytest.skip('Module not importable')")
lines.append(f" target = _load_symbol({func.module_path!r}, {func.name!r})")
return chr(10).join(lines)
args_str = _call_args(func)
call_expr = f"target({args_str})" if not func.is_property else "result"
if _strict_runtime_exception_expected(func):
lines.append(" with pytest.raises((RuntimeError, ValueError, TypeError)):")
if func.is_async:
lines.append(f" await {call_expr}")
else:
lines.append(f" {call_expr}")
else:
if not func.is_property:
if func.is_async:
lines.append(f" result = await {call_expr}")
else:
lines.append(f" result = {call_expr}")
if _path_returning(func):
lines.append(" assert isinstance(result, Path)")
elif func.name.startswith(("has_", "is_")):
lines.append(" assert isinstance(result, bool)")
elif func.name.startswith("list_"):
lines.append(" assert isinstance(result, (list, tuple, set, dict, str))")
elif func.has_return:
lines.append(" assert result is not NotImplemented")
else:
lines.append(" assert True # smoke: reached without exception")
lines.append(" except (RuntimeError, ValueError, TypeError, AttributeError, FileNotFoundError, OSError, KeyError) as exc:")
lines.append(" pytest.skip(f'Auto-generated stub needs richer fixture: {exc}')")
lines.append(" except (ImportError, ModuleNotFoundError) as exc:")
lines.append(" pytest.skip(f'Module not importable: {exc}')")
return "\n".join(lines)
def generate_test_suite(gaps, max_tests=50):
@@ -216,10 +273,26 @@ def generate_test_suite(gaps, max_tests=50):
lines.append("These tests are starting points. Review before merging.")
lines.append('"""')
lines.append("")
lines.append("import importlib.util")
lines.append("from pathlib import Path")
lines.append("import pytest")
lines.append("from unittest.mock import MagicMock, patch")
lines.append("")
lines.append("")
lines.append("def _load_symbol(relative_path, symbol):")
lines.append(" module_path = Path(__file__).resolve().parents[1] / relative_path")
lines.append(" if not module_path.exists():")
lines.append(" pytest.skip(f'Module file not found: {module_path}')")
lines.append(" spec_name = 'autogen_' + str(relative_path).replace('/', '_').replace('-', '_').replace('.', '_')")
lines.append(" spec = importlib.util.spec_from_file_location(spec_name, module_path)")
lines.append(" module = importlib.util.module_from_spec(spec)")
lines.append(" try:")
lines.append(" spec.loader.exec_module(module)")
lines.append(" except Exception as exc:")
lines.append(" pytest.skip(f'Module not importable: {exc}')")
lines.append(" return getattr(module, symbol)")
lines.append("")
lines.append("")
lines.append("# AUTO-GENERATED -- DO NOT EDIT WITHOUT REVIEW")
for module, mgaps in sorted(by_module.items()):
@@ -276,7 +349,7 @@ def main():
return
if gaps:
content = generate_test_suite(gaps, max_tests=args.max-tests if hasattr(args, 'max-tests') else args.max_tests)
content = generate_test_suite(gaps, max_tests=args.max_tests)
out = os.path.join(source_dir, args.output)
os.makedirs(os.path.dirname(out), exist_ok=True)
with open(out, "w") as f:

View File

@@ -0,0 +1,55 @@
import importlib.util
from pathlib import Path
# Repository root, derived from this test file's location (tests/ -> repo root).
ROOT = Path(__file__).resolve().parent.parent
# The generator script under test, loaded by file path rather than import.
SCRIPT = ROOT / "scripts" / "codebase_test_generator.py"
def load_module():
    """Import scripts/codebase_test_generator.py directly from its file path."""
    location = str(SCRIPT)
    spec = importlib.util.spec_from_file_location("codebase_test_generator", location)
    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)
    return loaded
def test_generate_test_suite_uses_dynamic_loader_for_numbered_paths():
    """Dash/date-named module paths must go through the file-based loader, not `from` imports."""
    generator = load_module()
    info = generator.FunctionInfo(
        name="linkify",
        module_path="reports/notebooklm/2026-03-27-hermes-openclaw/render_reports.py",
        lineno=12,
        args=["text"],
        has_return=True,
    )
    coverage_gap = generator.CoverageGap(func=info, reason="no test found", test_priority=1)
    rendered = generator.generate_test_suite([coverage_gap], max_tests=1)
    assert "import importlib.util" in rendered
    assert "_load_symbol(" in rendered
    assert "from reports.notebooklm" not in rendered
    assert "2026-03-27-hermes-openclaw/render_reports.py" in rendered
def test_generate_test_handles_async_and_runtime_args_safely():
    """Async runtime-touching functions get awaited calls wrapped in pytest.raises."""
    generator = load_module()
    info = generator.FunctionInfo(
        name="keypress",
        module_path="angband/mcp_server.py",
        lineno=200,
        args=["key", "wait_ms", "session_name"],
        is_async=True,
        has_return=True,
        calls=["send_key"],
    )
    coverage_gap = generator.CoverageGap(func=info, reason="no test found", test_priority=1)
    body = generator.generate_test(coverage_gap)
    for expected in (
        "@pytest.mark.asyncio",
        "async def",
        "await target(",
        "key='test'",
        "wait_ms=1",
        "session_name='test'",
        "pytest.raises((RuntimeError, ValueError, TypeError))",
    ):
        assert expected in body

File diff suppressed because it is too large Load Diff

View File

@@ -1,146 +0,0 @@
"""Durable verification that the Issue #582 parent-epic orchestration slice exists on main.
These tests confirm:
1. The epic pipeline runner script is present and importable.
2. The pipeline documentation is committed.
3. All five phase scripts exist at their expected paths.
4. The pipeline plan exposes the correct five phases in order.
5. Each plan step references the correct underlying script.
6. The status snapshot reports script_exists=True for all phases.
7. The status snapshot includes expected artifact output paths.
8. The runner can produce a JSON-serialisable plan.
9. The runner can produce a JSON-serialisable status snapshot.
10. The verification document itself is present.
Refs #582. Closes #789.
"""
import importlib.util
import json
import unittest
from pathlib import Path
# Repository root, derived from this test file's location (tests/ -> repo root).
ROOT = Path(__file__).resolve().parent.parent
# Key artifacts whose existence the suite verifies.
EPIC_PIPELINE = ROOT / "scripts" / "know_thy_father" / "epic_pipeline.py"
PIPELINE_DOC = ROOT / "docs" / "KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md"
VERIFICATION_DOC = ROOT / "docs" / "issue-582-verification.md"
# The five pipeline phases, in expected execution order.
EXPECTED_PHASES = [
    "phase1_media_indexing",
    "phase2_multimodal_analysis",
    "phase3_holographic_synthesis",
    "phase4_cross_reference_audit",
    "phase5_processing_log",
]
# Phase id -> repo-relative script path each plan step must reference.
EXPECTED_SCRIPTS = {
    "phase1_media_indexing": "scripts/know_thy_father/index_media.py",
    "phase2_multimodal_analysis": "scripts/twitter_archive/analyze_media.py",
    "phase3_holographic_synthesis": "scripts/know_thy_father/synthesize_kernels.py",
    "phase4_cross_reference_audit": "scripts/know_thy_father/crossref_audit.py",
    "phase5_processing_log": "twitter-archive/know-thy-father/tracker.py",
}
# Phase id -> artifact output paths the status snapshot must report
# (only a subset of phases declare artifact outputs).
EXPECTED_OUTPUTS = {
    "phase1_media_indexing": ["twitter-archive/know-thy-father/media_manifest.jsonl"],
    "phase3_holographic_synthesis": ["twitter-archive/knowledge/fathers_ledger.jsonl"],
    "phase5_processing_log": ["twitter-archive/know-thy-father/REPORT.md"],
}
def _load_epic_module():
    """Load scripts/know_thy_father/epic_pipeline.py as a standalone module."""
    spec = importlib.util.spec_from_file_location("ktf_epic_pipeline", EPIC_PIPELINE)
    assert spec and spec.loader, "Cannot load epic_pipeline module spec"
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
class TestIssue582Verification(unittest.TestCase):
    """Ten checks proving the #582 orchestration slice is present on main."""

    # --- artifact existence -----------------------------------------------

    def test_01_epic_pipeline_script_exists(self):
        """The orchestration runner is committed."""
        message = f"missing {EPIC_PIPELINE.relative_to(ROOT)}"
        self.assertTrue(EPIC_PIPELINE.exists(), message)

    def test_02_pipeline_documentation_exists(self):
        """The multimodal pipeline doc is committed."""
        message = "missing KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md"
        self.assertTrue(PIPELINE_DOC.exists(), message)

    def test_03_all_phase_scripts_exist_on_disk(self):
        """Every script referenced by the pipeline exists in the repo."""
        for phase_id, script_rel in EXPECTED_SCRIPTS.items():
            candidate = ROOT / script_rel
            self.assertTrue(candidate.exists(), f"{phase_id}: missing {script_rel}")

    # --- plan structure ---------------------------------------------------

    def test_04_pipeline_plan_has_five_phases_in_order(self):
        module = _load_epic_module()
        steps = module.build_pipeline_plan(batch_size=10)
        self.assertEqual([step["id"] for step in steps], EXPECTED_PHASES)

    def test_05_plan_commands_reference_correct_scripts(self):
        module = _load_epic_module()
        for step in module.build_pipeline_plan(batch_size=10):
            expected_script = EXPECTED_SCRIPTS[step["id"]]
            self.assertIn(
                expected_script,
                step["command"],
                f"{step['id']} command missing {expected_script}",
            )

    # --- status snapshot --------------------------------------------------

    def test_06_status_snapshot_all_scripts_exist(self):
        snapshot = _load_epic_module().build_status_snapshot(ROOT)
        for phase_id in EXPECTED_PHASES:
            self.assertIn(phase_id, snapshot)
            self.assertTrue(
                snapshot[phase_id]["script_exists"],
                f"{phase_id} script_exists should be True",
            )

    def test_07_status_snapshot_reports_expected_outputs(self):
        snapshot = _load_epic_module().build_status_snapshot(ROOT)
        for phase_id, expected_paths in EXPECTED_OUTPUTS.items():
            actual_paths = [entry["path"] for entry in snapshot[phase_id]["outputs"]]
            for p in expected_paths:
                self.assertIn(p, actual_paths, f"{phase_id} missing output path {p}")

    # --- JSON round-trips -------------------------------------------------

    def test_08_plan_is_json_serialisable(self):
        module = _load_epic_module()
        restored = json.loads(json.dumps(module.build_pipeline_plan(batch_size=10)))
        self.assertEqual(len(restored), 5)

    def test_09_status_snapshot_is_json_serialisable(self):
        module = _load_epic_module()
        restored = json.loads(json.dumps(module.build_status_snapshot(ROOT)))
        for phase_id in EXPECTED_PHASES:
            self.assertIn(phase_id, restored)

    # --- verification document --------------------------------------------

    def test_10_verification_document_exists(self):
        """This verification trail is committed."""
        message = "missing docs/issue-582-verification.md"
        self.assertTrue(VERIFICATION_DOC.exists(), message)
if __name__ == "__main__":
    # Allow running this suite directly with `python <this file>` as well as via pytest.
    unittest.main()