Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
909b88af56 fix: use prior active window baseline for #749
Some checks failed
Agent PR Gate / gate (pull_request) Failing after 12s
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 5s
Smoke Test / smoke (pull_request) Failing after 5s
Agent PR Gate / report (pull_request) Has been cancelled
2026-04-17 00:19:50 -04:00
Alexander Whitestone
f9f342cee7 test: capture sparse baseline fallback for #749 2026-04-17 00:17:21 -04:00
8 changed files with 24 additions and 411 deletions

View File

@@ -12,8 +12,8 @@ The predictor reads two data sources:
 2. **Heartbeat logs** (`heartbeat/ticks_*.jsonl`) — Gitea availability,
    local inference health
-It compares a **recent window** (last N hours) against a **baseline window**
-(previous N hours) to detect surges and degradation.
+It compares a **recent window** (last N hours of activity) against the **previous active window**
+(previous N hours ending at the most recent event before the current window) so sparse telemetry still yields a meaningful baseline.
## Output Contract
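For intuition, this is roughly the scenario the new unit test in this PR exercises (exact event times are illustrative): with a 6-hour horizon, suppose 3 events land around hours 0–2 and 6 events around hours 12–17. The recent window covers the last 6 hours and holds the 6 newest events, i.e. 1.0 events/hour. The old baseline window (hours 5–11) is empty, so the surge ratio was meaningless; the new logic instead anchors the baseline on the last event before the recent window and picks up the 3 earlier events, i.e. 0.5 events/hour and a surge ratio of 2.0.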

View File

@@ -90,13 +90,19 @@ def compute_rates(
     latest = max(_parse_ts(r["timestamp"]) for r in rows)
     recent_cutoff = latest - timedelta(hours=horizon_hours)
     baseline_cutoff = latest - timedelta(hours=horizon_hours * 2)
     recent = [r for r in rows if _parse_ts(r["timestamp"]) >= recent_cutoff]
-    baseline = [
-        r for r in rows
-        if baseline_cutoff <= _parse_ts(r["timestamp"]) < recent_cutoff
-    ]
+    earlier = [r for r in rows if _parse_ts(r["timestamp"]) < recent_cutoff]
+    if earlier:
+        previous_latest = max(_parse_ts(r["timestamp"]) for r in earlier)
+        previous_cutoff = previous_latest - timedelta(hours=horizon_hours)
+        baseline = [
+            r for r in earlier
+            if _parse_ts(r["timestamp"]) >= previous_cutoff
+        ]
+    else:
+        baseline = []
     recent_rate = len(recent) / max(horizon_hours, 1)
     baseline_rate = (

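A minimal, self-contained sketch of the window-selection idea in the hunk above. `split_windows` and `_parse_ts` here are illustrative stand-ins rather than the module's actual API; only the selection logic mirrors the change.

```python
from datetime import datetime, timedelta, timezone


def _parse_ts(ts):
    # Stand-in for the module's timestamp parser.
    return datetime.fromisoformat(ts)


def split_windows(rows, horizon_hours):
    """Split rows into (recent, baseline) using the previous *active* window as baseline."""
    latest = max(_parse_ts(r["timestamp"]) for r in rows)
    recent_cutoff = latest - timedelta(hours=horizon_hours)
    recent = [r for r in rows if _parse_ts(r["timestamp"]) >= recent_cutoff]
    earlier = [r for r in rows if _parse_ts(r["timestamp"]) < recent_cutoff]
    if not earlier:  # no prior activity at all -> empty baseline
        return recent, []
    previous_latest = max(_parse_ts(r["timestamp"]) for r in earlier)
    previous_cutoff = previous_latest - timedelta(hours=horizon_hours)
    return recent, [r for r in earlier if _parse_ts(r["timestamp"]) >= previous_cutoff]


# Sparse telemetry: 3 events early on, a quiet gap, then 6 events in the last 6 hours.
base = datetime(2026, 4, 17, tzinfo=timezone.utc)
rows = [{"timestamp": (base + timedelta(hours=h)).isoformat()} for h in (0, 1, 2, 12, 13, 14, 15, 16, 17)]
recent, baseline = split_windows(rows, horizon_hours=6)
print(len(recent) / 6, len(baseline) / 6)  # 1.0 0.5 -> surge ratio of 2.0
```

Anchoring the baseline on the last pre-window event rather than on wall-clock time is what keeps the denominator meaningful when telemetry arrives in bursts.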
View File

@@ -1 +0,0 @@
# Timmy core module

View File

@@ -1,220 +0,0 @@
#!/usr/bin/env python3
"""
Audit Trail — local logging of inputs, sources, confidence.
SOUL.md requirement:
"Every response I generate should be logged locally with the inputs that
produced it, the sources I consulted, and the confidence assessment I made.
Not for surveillance — for sovereignty. If I say something wrong, my user
must be able to trace why."
Storage: JSONL files at ~/.timmy/audit/YYYY-MM-DD.jsonl
Privacy: logs never leave the user's machine.
"""
import json
import os
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Optional
AUDIT_DIR = Path(os.getenv("TIMMY_AUDIT_DIR", os.path.expanduser("~/.timmy/audit")))
MAX_FILE_SIZE = int(os.getenv("TIMMY_AUDIT_MAX_MB", "50")) * 1024 * 1024 # 50MB per day
@dataclass
class AuditEntry:
    """Single audit trail entry."""

    timestamp: str  # ISO 8601
    entry_id: str  # sha256(timestamp + input[:100])
    input_text: str
    sources: list = field(default_factory=list)  # [{type, path, confidence}]
    confidence: str = "unknown"  # high | medium | low | unknown
    confidence_reason: str = ""
    output_text: str = ""
    output_hash: str = ""  # sha256 of output for integrity
    model: str = ""
    provider: str = ""
    session_id: str = ""
    tool_calls: list = field(default_factory=list)
    duration_ms: int = 0

    def to_dict(self):
        return asdict(self)

    def to_json(self):
        return json.dumps(self.to_dict(), ensure_ascii=False)


class AuditTrail:
    """Thread-safe append-only audit trail logger."""

    def __init__(self, audit_dir: Optional[Path] = None, session_id: str = ""):
        self.audit_dir = audit_dir or AUDIT_DIR
        self.session_id = session_id or self._make_session_id()
        self.audit_dir.mkdir(parents=True, exist_ok=True)

    def _make_session_id(self) -> str:
        return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + "_" + hashlib.sha256(
            str(time.time()).encode()
        ).hexdigest()[:8]

    def _today_file(self) -> Path:
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        return self.audit_dir / f"{date_str}.jsonl"

    def _make_entry_id(self, input_text: str) -> str:
        ts = datetime.now(timezone.utc).isoformat()
        return hashlib.sha256((ts + input_text[:100]).encode()).hexdigest()[:16]

    def log(
        self,
        input_text: str,
        sources: list = None,
        confidence: str = "unknown",
        confidence_reason: str = "",
        output_text: str = "",
        model: str = "",
        provider: str = "",
        tool_calls: list = None,
        duration_ms: int = 0,
    ) -> AuditEntry:
        """Log a response with its inputs, sources, and confidence."""
        entry = AuditEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            entry_id=self._make_entry_id(input_text),
            input_text=input_text[:2000],  # truncate long inputs
            sources=sources or [],
            confidence=confidence,
            confidence_reason=confidence_reason,
            output_text=output_text[:5000],
            output_hash=hashlib.sha256(output_text.encode()).hexdigest()[:16],
            model=model,
            provider=provider,
            session_id=self.session_id,
            tool_calls=tool_calls or [],
            duration_ms=duration_ms,
        )
        self._append(entry)
        return entry

    def _append(self, entry: AuditEntry):
        """Append entry to today's JSONL file."""
        logfile = self._today_file()
        line = entry.to_json() + "\n"
        # Check size limit
        if logfile.exists() and logfile.stat().st_size + len(line) > MAX_FILE_SIZE:
            # Rotate: rename to .1
            rotated = logfile.with_suffix(".jsonl.1")
            if rotated.exists():
                rotated.unlink()
            logfile.rename(rotated)
        with open(logfile, "a") as f:
            f.write(line)

    def query(
        self,
        date: str = None,
        session_id: str = None,
        confidence: str = None,
        keyword: str = None,
        limit: int = 50,
    ) -> list:
        """Query audit trail entries.

        Args:
            date: YYYY-MM-DD filter
            session_id: filter by session
            confidence: filter by confidence level
            keyword: search in input_text
            limit: max results
        """
        if date:
            files = [self.audit_dir / f"{date}.jsonl"]
        else:
            files = sorted(self.audit_dir.glob("*.jsonl"), reverse=True)
        results = []
        for logfile in files:
            if not logfile.exists():
                continue
            try:
                with open(logfile) as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            entry = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if session_id and entry.get("session_id") != session_id:
                            continue
                        if confidence and entry.get("confidence") != confidence:
                            continue
                        if keyword and keyword.lower() not in entry.get("input_text", "").lower():
                            continue
                        results.append(entry)
                        if len(results) >= limit:
                            return results
            except (IOError, OSError):
                continue
        return results

    def get_by_id(self, entry_id: str) -> Optional[dict]:
        """Find a specific entry by ID across all files."""
        for logfile in sorted(self.audit_dir.glob("*.jsonl"), reverse=True):
            try:
                with open(logfile) as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            entry = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if entry.get("entry_id") == entry_id:
                            return entry
            except (IOError, OSError):
                continue
        return None

    def why(self, output_hash: str) -> Optional[dict]:
        """Answer: why did you say X? Look up by output hash."""
        for logfile in sorted(self.audit_dir.glob("*.jsonl"), reverse=True):
            try:
                with open(logfile) as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            entry = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        if entry.get("output_hash") == output_hash:
                            return entry
            except (IOError, OSError):
                continue
        return None

    def stats(self, date: str = None) -> dict:
        """Summary stats for a date or all time."""
        entries = self.query(date=date, limit=999999)
        if not entries:
            return {"total": 0}
        conf_counts = {}
        for e in entries:
            c = e.get("confidence", "unknown")
            conf_counts[c] = conf_counts.get(c, 0) + 1
        return {
            "total": len(entries),
            "by_confidence": conf_counts,
            "sessions": len(set(e.get("session_id", "") for e in entries)),
            "unique_models": len(set(e.get("model", "") for e in entries if e.get("model"))),
        }
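Since this module is removed wholesale in this commit, a minimal usage sketch of the API it provided may help; paths, prompt text, and model names below are illustrative, while the calls themselves are the ones defined above (the import path matches the deleted tests).

```python
from pathlib import Path

from timmy.audit_trail import AuditTrail

trail = AuditTrail(audit_dir=Path("/tmp/timmy-audit-demo"))

# Log one response with its provenance and confidence.
entry = trail.log(
    input_text="What changed in PR #749?",
    sources=[{"type": "local", "path": "docs/predictor.md", "confidence": "high"}],
    confidence="high",
    confidence_reason="Answer taken directly from the repo docs",
    output_text="The baseline window now falls back to the previous active window.",
    model="qwen2.5:7b",
    provider="ollama",
)

# Trace it back later: by id, by output hash ("why did you say X?"), or in aggregate.
print(trail.get_by_id(entry.entry_id)["confidence"])  # "high"
print(trail.why(entry.output_hash)["input_text"])      # the original question
print(trail.stats())                                   # {"total": 1, "by_confidence": ...}
```

Everything stays on the local filesystem under `audit_dir`, one JSONL file per UTC day, rotated once a file exceeds `TIMMY_AUDIT_MAX_MB`.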

View File

View File

@@ -99,6 +99,17 @@ class TestComputeRates:
         _, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
         assert surge < 1.5
+
+    def test_falls_back_to_prior_activity_when_previous_window_is_empty(self):
+        baseline = _make_metrics(3, base_hour=0)
+        recent = _make_metrics(6, base_hour=12)
+        rows = baseline + recent
+        recent_rate, baseline_rate, surge, _, _ = compute_rates(rows, horizon_hours=6)
+        assert recent_rate == 1.0
+        assert baseline_rate == 0.5
+        assert surge == 2.0
# ── Caller Analysis ──────────────────────────────────────────────────────────

View File

@@ -1,183 +0,0 @@
#!/usr/bin/env python3
"""
Tests for audit_trail.py — SOUL.md honesty requirement.
Verifies:
- Every response is logged with input + sources + confidence
- Logs are stored locally (JSONL format)
- Query works: by date, session, confidence, keyword
- why() answers: why did you say X?
- Privacy: no network calls, files stay local
- Size rotation works
"""
import json
import os
import sys
import tempfile
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from timmy.audit_trail import AuditTrail, AuditEntry
@pytest.fixture
def trail(tmp_path):
    return AuditTrail(audit_dir=tmp_path / "audit", session_id="test-session")


class TestAuditEntry:
    def test_to_dict_roundtrip(self):
        e = AuditEntry(
            timestamp="2026-04-17T05:00:00Z",
            entry_id="abc123",
            input_text="What is the weather?",
            sources=[{"type": "web", "path": "weather.com"}],
            confidence="high",
            output_text="It is sunny.",
        )
        d = e.to_dict()
        assert d["input_text"] == "What is the weather?"
        assert d["confidence"] == "high"
        assert len(d["sources"]) == 1

    def test_to_json_is_valid(self):
        e = AuditEntry(timestamp="t", entry_id="id", input_text="hi")
        assert json.loads(e.to_json())


class TestLog:
    def test_log_creates_file(self, trail):
        entry = trail.log(
            input_text="Hello",
            output_text="Hi there",
            confidence="high",
            model="qwen2.5:7b",
        )
        assert entry.entry_id
        assert entry.output_hash
        logfile = trail._today_file()
        assert logfile.exists()

    def test_log_contains_all_fields(self, trail):
        trail.log(
            input_text="Test input",
            sources=[{"type": "local", "path": "/tmp/file.txt"}],
            confidence="medium",
            confidence_reason="Based on file content",
            output_text="Test output",
            model="qwen2.5:7b",
            provider="ollama",
            tool_calls=[{"name": "read_file", "args": {"path": "/tmp/file.txt"}}],
            duration_ms=150,
        )
        entries = trail.query(limit=1)
        assert len(entries) == 1
        e = entries[0]
        assert e["input_text"] == "Test input"
        assert e["sources"][0]["type"] == "local"
        assert e["confidence"] == "medium"
        assert e["model"] == "qwen2.5:7b"
        assert e["tool_calls"][0]["name"] == "read_file"
        assert e["duration_ms"] == 150

    def test_multiple_logs_append(self, trail):
        trail.log(input_text="First", output_text="Out1")
        trail.log(input_text="Second", output_text="Out2")
        assert len(trail.query(limit=10)) == 2

    def test_input_truncated(self, trail):
        long_input = "x" * 5000
        entry = trail.log(input_text=long_input, output_text="ok")
        assert len(entry.input_text) <= 2000


class TestQuery:
    def test_query_by_session(self, trail):
        # log() accepts no per-call session_id override — every entry is stamped
        # with the trail's own session_id, so we exercise filtering against that.
        trail.log(input_text="A")
        trail.log(input_text="B")
        trail.log(input_text="C")
        assert len(trail.query(session_id="test-session")) == 3
        assert len(trail.query()) == 3
    def test_query_by_confidence(self, trail):
        trail.log(input_text="A", confidence="high")
        trail.log(input_text="B", confidence="low")
        trail.log(input_text="C", confidence="high")
        assert len(trail.query(confidence="high")) == 2
        assert len(trail.query(confidence="low")) == 1

    def test_query_by_keyword(self, trail):
        trail.log(input_text="How do I fix Python errors?")
        trail.log(input_text="What is the weather?")
        results = trail.query(keyword="python")
        assert len(results) == 1
        assert "python" in results[0]["input_text"].lower()

    def test_query_limit(self, trail):
        for i in range(10):
            trail.log(input_text=f"Item {i}", output_text=f"Response {i}")
        assert len(trail.query(limit=3)) == 3


class TestGetById:
    def test_find_by_id(self, trail):
        entry = trail.log(input_text="Find me", output_text="Found")
        found = trail.get_by_id(entry.entry_id)
        assert found is not None
        assert found["input_text"] == "Find me"

    def test_not_found_returns_none(self, trail):
        assert trail.get_by_id("nonexistent") is None


class TestWhy:
    def test_why_returns_entry(self, trail):
        entry = trail.log(
            input_text="What is 2+2?",
            output_text="4",
            sources=[{"type": "knowledge", "path": "math"}],
        )
        found = trail.why(entry.output_hash)
        assert found is not None
        assert found["input_text"] == "What is 2+2?"
        assert found["sources"][0]["type"] == "knowledge"

    def test_why_not_found(self, trail):
        assert trail.why("nohash") is None


class TestStats:
    def test_empty_stats(self, trail):
        s = trail.stats()
        assert s["total"] == 0

    def test_stats_counts(self, trail):
        trail.log(input_text="A", confidence="high")
        trail.log(input_text="B", confidence="low")
        trail.log(input_text="C", confidence="high")
        s = trail.stats()
        assert s["total"] == 3
        assert s["by_confidence"]["high"] == 2
        assert s["by_confidence"]["low"] == 1


class TestPrivacy:
    def test_no_network_calls(self, trail):
        """Verify the module makes no network calls — pure local filesystem."""
        import timmy.audit_trail as mod

        source = open(mod.__file__).read()
        assert "requests" not in source
        assert "urllib" not in source
        assert "httpx" not in source
        assert "socket" not in source
        assert "subprocess" not in source

    def test_files_are_local(self, trail, tmp_path):
        trail.log(input_text="Private data", output_text="Secret")
        logfile = trail._today_file()
        assert str(logfile).startswith(str(tmp_path))