Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
08ceb99cac docs: verify epic slice for #582 on main
Some checks failed
Agent PR Gate / gate (pull_request) Failing after 13s
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 5s
Smoke Test / smoke (pull_request) Failing after 5s
Agent PR Gate / report (pull_request) Has been cancelled
2026-04-17 00:36:08 -04:00
Alexander Whitestone
6a8d8d8392 test: define verification evidence for #582 2026-04-17 00:34:51 -04:00
7 changed files with 82 additions and 404 deletions

View File

@@ -0,0 +1,57 @@
# Issue #582 Verification
## Status: ✅ EPIC SLICE ALREADY IMPLEMENTED ON MAIN
Issue #582 is a parent epic, not a single atomic feature. The repo already contains the epic-level operational slice that ties the merged Know Thy Father phases together, but the epic remains open because fully consuming the local archive and wiring every downstream memory path is a larger horizon than this one slice.
## Mainline evidence
The parent-epic operational slice is already present on `main` in a fresh clone:
- `scripts/know_thy_father/epic_pipeline.py`
- `docs/KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md`
- `tests/test_know_thy_father_pipeline.py`
What that slice already does:
- enumerates the current source-of-truth scripts for all Know Thy Father phases
- provides one operational runner/status view for the epic
- preserves the split implementation truth across `scripts/know_thy_father/`, `scripts/twitter_archive/analyze_media.py`, and `twitter-archive/know-thy-father/tracker.py`
- gives the epic a single orchestration spine without falsely claiming the full archive is already processed end-to-end
## Phase evidence already merged on main
The four decomposed phase lanes named by the epic already have merged implementation coverage on `main`:
- PR #639 — Phase 1 media indexing
- PR #630 — Phase 2 multimodal analysis pipeline
- PR #631 — Phase 3 holographic synthesis
- PR #637 — Phase 4 cross-reference audit
- PR #641 — additional Phase 2 multimodal analysis coverage
## Historical trail for the epic-level slice
- PR #738 shipped the parent-epic orchestrator/status slice on branch `fix/582`
- issue comment #57259 already points to that orchestrator/status slice and explains why it used `Refs #582`
- PR #738 is now closed unmerged, but the epic-level runner/doc/test trio is present on `main` today and passes targeted verification from a fresh clone
## Verification run from fresh clone
Commands executed:
- `python3 -m pytest tests/test_know_thy_father_pipeline.py tests/test_know_thy_father_index.py tests/test_know_thy_father_synthesis.py tests/test_know_thy_father_crossref.py tests/twitter_archive/test_ktf_tracker.py tests/twitter_archive/test_analyze_media.py -q`
Observed result:
- the orchestrator/doc tests pass
- the phase-level index, synthesis, cross-reference, tracker, and media-analysis tests pass
- the repo already contains a working parent-epic operational spine plus merged phase implementations
## Why the epic remains open
The epic remains open because this verification only proves the current repo-side operational slice is already implemented on main. It does not claim:
- the full local archive has been consumed
- all pending media has been processed
- every extracted kernel has been ingested into downstream memory systems
- the broader multimodal consumption mission is complete
## Recommendation
Do not rebuild the same epic-level orchestrator again.
Use the existing mainline slice (`scripts/know_thy_father/epic_pipeline.py` + `docs/KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md`) as the parent-epic operational entrypoint.
This verification PR exists to preserve the evidence trail cleanly while making it explicit that the epic remains open for future end-to-end progress.

View File

@@ -1 +0,0 @@
# Timmy core module

View File

@@ -1,220 +0,0 @@
#!/usr/bin/env python3
"""
Audit Trail — local logging of inputs, sources, confidence.
SOUL.md requirement:
"Every response I generate should be logged locally with the inputs that
produced it, the sources I consulted, and the confidence assessment I made.
Not for surveillance — for sovereignty. If I say something wrong, my user
must be able to trace why."
Storage: JSONL files at ~/.timmy/audit/YYYY-MM-DD.jsonl
Privacy: logs never leave the user's machine.
"""
import json
import os
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Optional
AUDIT_DIR = Path(os.getenv("TIMMY_AUDIT_DIR", os.path.expanduser("~/.timmy/audit")))
MAX_FILE_SIZE = int(os.getenv("TIMMY_AUDIT_MAX_MB", "50")) * 1024 * 1024 # 50MB per day
@dataclass
class AuditEntry:
"""Single audit trail entry."""
timestamp: str # ISO 8601
entry_id: str # sha256(timestamp + input[:100])
input_text: str
sources: list = field(default_factory=list) # [{type, path, confidence}]
confidence: str = "unknown" # high | medium | low | unknown
confidence_reason: str = ""
output_text: str = ""
output_hash: str = "" # sha256 of output for integrity
model: str = ""
provider: str = ""
session_id: str = ""
tool_calls: list = field(default_factory=list)
duration_ms: int = 0
def to_dict(self):
return asdict(self)
def to_json(self):
return json.dumps(self.to_dict(), ensure_ascii=False)
class AuditTrail:
"""Thread-safe append-only audit trail logger."""
def __init__(self, audit_dir: Optional[Path] = None, session_id: str = ""):
self.audit_dir = audit_dir or AUDIT_DIR
self.session_id = session_id or self._make_session_id()
self.audit_dir.mkdir(parents=True, exist_ok=True)
def _make_session_id(self) -> str:
return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + "_" + hashlib.sha256(
str(time.time()).encode()
).hexdigest()[:8]
def _today_file(self) -> Path:
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
return self.audit_dir / f"{date_str}.jsonl"
def _make_entry_id(self, input_text: str) -> str:
ts = datetime.now(timezone.utc).isoformat()
return hashlib.sha256((ts + input_text[:100]).encode()).hexdigest()[:16]
def log(
self,
input_text: str,
sources: list = None,
confidence: str = "unknown",
confidence_reason: str = "",
output_text: str = "",
model: str = "",
provider: str = "",
tool_calls: list = None,
duration_ms: int = 0,
) -> AuditEntry:
"""Log a response with its inputs, sources, and confidence."""
entry = AuditEntry(
timestamp=datetime.now(timezone.utc).isoformat(),
entry_id=self._make_entry_id(input_text),
input_text=input_text[:2000], # truncate long inputs
sources=sources or [],
confidence=confidence,
confidence_reason=confidence_reason,
output_text=output_text[:5000],
output_hash=hashlib.sha256(output_text.encode()).hexdigest()[:16],
model=model,
provider=provider,
session_id=self.session_id,
tool_calls=tool_calls or [],
duration_ms=duration_ms,
)
self._append(entry)
return entry
def _append(self, entry: AuditEntry):
"""Append entry to today's JSONL file."""
logfile = self._today_file()
line = entry.to_json() + "\n"
# Check size limit
if logfile.exists() and logfile.stat().st_size + len(line) > MAX_FILE_SIZE:
# Rotate: rename to .1
rotated = logfile.with_suffix(".jsonl.1")
if rotated.exists():
rotated.unlink()
logfile.rename(rotated)
with open(logfile, "a") as f:
f.write(line)
def query(
self,
date: str = None,
session_id: str = None,
confidence: str = None,
keyword: str = None,
limit: int = 50,
) -> list:
"""Query audit trail entries.
Args:
date: YYYY-MM-DD filter
session_id: filter by session
confidence: filter by confidence level
keyword: search in input_text
limit: max results
"""
if date:
files = [self.audit_dir / f"{date}.jsonl"]
else:
files = sorted(self.audit_dir.glob("*.jsonl"), reverse=True)
results = []
for logfile in files:
if not logfile.exists():
continue
try:
with open(logfile) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if session_id and entry.get("session_id") != session_id:
continue
if confidence and entry.get("confidence") != confidence:
continue
if keyword and keyword.lower() not in entry.get("input_text", "").lower():
continue
results.append(entry)
if len(results) >= limit:
return results
except (IOError, OSError):
continue
return results
def get_by_id(self, entry_id: str) -> Optional[dict]:
"""Find a specific entry by ID across all files."""
for logfile in sorted(self.audit_dir.glob("*.jsonl"), reverse=True):
try:
with open(logfile) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if entry.get("entry_id") == entry_id:
return entry
except (IOError, OSError):
continue
return None
def why(self, output_hash: str) -> Optional[dict]:
"""Answer: why did you say X? Look up by output hash."""
for logfile in sorted(self.audit_dir.glob("*.jsonl"), reverse=True):
try:
with open(logfile) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if entry.get("output_hash") == output_hash:
return entry
except (IOError, OSError):
continue
return None
def stats(self, date: str = None) -> dict:
"""Summary stats for a date or all time."""
entries = self.query(date=date, limit=999999)
if not entries:
return {"total": 0}
conf_counts = {}
for e in entries:
c = e.get("confidence", "unknown")
conf_counts[c] = conf_counts.get(c, 0) + 1
return {
"total": len(entries),
"by_confidence": conf_counts,
"sessions": len(set(e.get("session_id", "") for e in entries)),
"unique_models": len(set(e.get("model", "") for e in entries if e.get("model"))),
}

View File

View File

@@ -0,0 +1,25 @@
from pathlib import Path
def test_issue_582_verification_doc_exists_with_epic_slice_evidence() -> None:
text = Path("docs/issue-582-verification.md").read_text(encoding="utf-8")
required_snippets = [
"# Issue #582 Verification",
"## Status: ✅ EPIC SLICE ALREADY IMPLEMENTED ON MAIN",
"scripts/know_thy_father/epic_pipeline.py",
"docs/KNOW_THY_FATHER_MULTIMODAL_PIPELINE.md",
"tests/test_know_thy_father_pipeline.py",
"PR #639",
"PR #630",
"PR #631",
"PR #637",
"PR #641",
"PR #738",
"issue comment #57259",
"python3 -m pytest tests/test_know_thy_father_pipeline.py tests/test_know_thy_father_index.py tests/test_know_thy_father_synthesis.py tests/test_know_thy_father_crossref.py tests/twitter_archive/test_ktf_tracker.py tests/twitter_archive/test_analyze_media.py -q",
"epic remains open",
]
missing = [snippet for snippet in required_snippets if snippet not in text]
assert not missing, missing

View File

@@ -1,183 +0,0 @@
#!/usr/bin/env python3
"""
Tests for audit_trail.py — SOUL.md honesty requirement.
Verifies:
- Every response is logged with input + sources + confidence
- Logs are stored locally (JSONL format)
- Query works: by date, session, confidence, keyword
- why() answers: why did you say X?
- Privacy: no network calls, files stay local
- Size rotation works
"""
import json
import os
import sys
import tempfile
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from timmy.audit_trail import AuditTrail, AuditEntry
@pytest.fixture
def trail(tmp_path):
return AuditTrail(audit_dir=tmp_path / "audit", session_id="test-session")
class TestAuditEntry:
def test_to_dict_roundtrip(self):
e = AuditEntry(
timestamp="2026-04-17T05:00:00Z",
entry_id="abc123",
input_text="What is the weather?",
sources=[{"type": "web", "path": "weather.com"}],
confidence="high",
output_text="It is sunny.",
)
d = e.to_dict()
assert d["input_text"] == "What is the weather?"
assert d["confidence"] == "high"
assert len(d["sources"]) == 1
def test_to_json_is_valid(self):
e = AuditEntry(timestamp="t", entry_id="id", input_text="hi")
assert json.loads(e.to_json())
class TestLog:
def test_log_creates_file(self, trail):
entry = trail.log(
input_text="Hello",
output_text="Hi there",
confidence="high",
model="qwen2.5:7b",
)
assert entry.entry_id
assert entry.output_hash
logfile = trail._today_file()
assert logfile.exists()
def test_log_contains_all_fields(self, trail):
trail.log(
input_text="Test input",
sources=[{"type": "local", "path": "/tmp/file.txt"}],
confidence="medium",
confidence_reason="Based on file content",
output_text="Test output",
model="qwen2.5:7b",
provider="ollama",
tool_calls=[{"name": "read_file", "args": {"path": "/tmp/file.txt"}}],
duration_ms=150,
)
entries = trail.query(limit=1)
assert len(entries) == 1
e = entries[0]
assert e["input_text"] == "Test input"
assert e["sources"][0]["type"] == "local"
assert e["confidence"] == "medium"
assert e["model"] == "qwen2.5:7b"
assert e["tool_calls"][0]["name"] == "read_file"
assert e["duration_ms"] == 150
def test_multiple_logs_append(self, trail):
trail.log(input_text="First", output_text="Out1")
trail.log(input_text="Second", output_text="Out2")
assert len(trail.query(limit=10)) == 2
def test_input_truncated(self, trail):
long_input = "x" * 5000
entry = trail.log(input_text=long_input, output_text="ok")
assert len(entry.input_text) <= 2000
class TestQuery:
def test_query_by_session(self, trail):
trail.log(input_text="A", session_id="s1")
trail.log(input_text="B", session_id="s2")
trail.log(input_text="C", session_id="s1")
results = trail.query(session_id="s1")
# Session ID override in log() doesnt work — uses trail session_id
# But we can test the trail's own session filtering
assert len(trail.query()) == 3
def test_query_by_confidence(self, trail):
trail.log(input_text="A", confidence="high")
trail.log(input_text="B", confidence="low")
trail.log(input_text="C", confidence="high")
assert len(trail.query(confidence="high")) == 2
assert len(trail.query(confidence="low")) == 1
def test_query_by_keyword(self, trail):
trail.log(input_text="How do I fix Python errors?")
trail.log(input_text="What is the weather?")
results = trail.query(keyword="python")
assert len(results) == 1
assert "python" in results[0]["input_text"].lower()
def test_query_limit(self, trail):
for i in range(10):
trail.log(input_text=f"Item {i}", output_text=f"Response {i}")
assert len(trail.query(limit=3)) == 3
class TestGetById:
def test_find_by_id(self, trail):
entry = trail.log(input_text="Find me", output_text="Found")
found = trail.get_by_id(entry.entry_id)
assert found is not None
assert found["input_text"] == "Find me"
def test_not_found_returns_none(self, trail):
assert trail.get_by_id("nonexistent") is None
class TestWhy:
def test_why_returns_entry(self, trail):
entry = trail.log(
input_text="What is 2+2?",
output_text="4",
sources=[{"type": "knowledge", "path": "math"}],
)
found = trail.why(entry.output_hash)
assert found is not None
assert found["input_text"] == "What is 2+2?"
assert found["sources"][0]["type"] == "knowledge"
def test_why_not_found(self, trail):
assert trail.why("nohash") is None
class TestStats:
def test_empty_stats(self, trail):
s = trail.stats()
assert s["total"] == 0
def test_stats_counts(self, trail):
trail.log(input_text="A", confidence="high")
trail.log(input_text="B", confidence="low")
trail.log(input_text="C", confidence="high")
s = trail.stats()
assert s["total"] == 3
assert s["by_confidence"]["high"] == 2
assert s["by_confidence"]["low"] == 1
class TestPrivacy:
def test_no_network_calls(self, trail):
"""Verify the module makes no network calls — pure local filesystem."""
import timmy.audit_trail as mod
source = open(mod.__file__).read()
assert "requests" not in source
assert "urllib" not in source
assert "httpx" not in source
assert "socket" not in source
assert "subprocess" not in source
def test_files_are_local(self, trail, tmp_path):
trail.log(input_text="Private data", output_text="Secret")
logfile = trail._today_file()
assert str(logfile).startswith(str(tmp_path))