Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
ab0363f700 fix: add unit tests for memory/embeddings.py
All checks were successful
Tests / lint (pull_request) Successful in 2s
Tests / test (pull_request) Successful in 1m9s
Tests cover _simple_hash_embedding, cosine_similarity, _keyword_overlap,
embed_text, and _get_embedding_model with proper mocking of the global
model state.

Fixes #431

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 11:11:49 -04:00
30 changed files with 447 additions and 2518 deletions

View File

@@ -54,6 +54,19 @@ providers:
context_window: 2048 context_window: 2048
capabilities: [text, vision, streaming] capabilities: [text, vision, streaming]
# Secondary: Local AirLLM (if installed)
- name: airllm-local
type: airllm
enabled: false # Enable if pip install airllm
priority: 2
models:
- name: 70b
default: true
capabilities: [text, tools, json, streaming]
- name: 8b
capabilities: [text, tools, json, streaming]
- name: 405b
capabilities: [text, tools, json, streaming]
# Tertiary: OpenAI (if API key available) # Tertiary: OpenAI (if API key available)
- name: openai-backup - name: openai-backup

View File

@@ -94,17 +94,12 @@ def extract_cycle_number(title: str) -> int | None:
return int(m.group(1)) if m else None return int(m.group(1)) if m else None
def extract_issue_number(title: str, body: str, pr_number: int | None = None) -> int | None: def extract_issue_number(title: str, body: str) -> int | None:
"""Extract the issue number from PR body/title, ignoring the PR number itself. # Try body first (usually has "closes #N")
Gitea appends "(#N)" to PR titles where N is the PR number — skip that
so we don't confuse it with the linked issue.
"""
for text in [body or "", title]: for text in [body or "", title]:
for m in ISSUE_RE.finditer(text): m = ISSUE_RE.search(text)
num = int(m.group(1)) if m:
if num != pr_number: return int(m.group(1))
return num
return None return None
@@ -145,7 +140,7 @@ def main():
else: else:
cycle_counter = max(cycle_counter, cycle) cycle_counter = max(cycle_counter, cycle)
issue = extract_issue_number(title, body, pr_number=pr_num) issue = extract_issue_number(title, body)
issue_type = classify_pr(title, body) issue_type = classify_pr(title, body)
duration = estimate_duration(pr) duration = estimate_duration(pr)
diff = get_pr_diff_stats(token, pr_num) diff = get_pr_diff_stats(token, pr_num)

View File

@@ -4,26 +4,11 @@
Called after each cycle completes (success or failure). Called after each cycle completes (success or failure).
Appends a structured entry to .loop/retro/cycles.jsonl. Appends a structured entry to .loop/retro/cycles.jsonl.
EPOCH NOTATION (turnover system):
Each cycle carries a symbolic epoch tag alongside the raw integer:
⟳WW.D:NNN
⟳ turnover glyph — marks epoch-aware cycles
WW ISO week-of-year (0153)
D ISO weekday (1=Mon … 7=Sun)
NNN daily cycle counter, zero-padded, resets at midnight UTC
Example: ⟳12.3:042 — Week 12, Wednesday, 42nd cycle of the day.
The raw `cycle` integer is preserved for backward compatibility.
The `epoch` field carries the symbolic notation.
SUCCESS DEFINITION: SUCCESS DEFINITION:
A cycle is only "success" if BOTH conditions are met: A cycle is only "success" if BOTH conditions are met:
1. The hermes process exited cleanly (exit code 0) 1. The hermes process exited cleanly (exit code 0)
2. Main is green (smoke test passes on main after merge) 2. Main is green (smoke test passes on main after merge)
A cycle that merges a PR but leaves main red is a FAILURE. A cycle that merges a PR but leaves main red is a FAILURE.
The --main-green flag records the smoke test result. The --main-green flag records the smoke test result.
@@ -44,8 +29,6 @@ from __future__ import annotations
import argparse import argparse
import json import json
import re
import subprocess
import sys import sys
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
@@ -53,68 +36,10 @@ from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl" RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json" SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
EPOCH_COUNTER_FILE = REPO_ROOT / ".loop" / "retro" / ".epoch_counter"
# How many recent entries to include in rolling summary # How many recent entries to include in rolling summary
SUMMARY_WINDOW = 50 SUMMARY_WINDOW = 50
# Branch patterns that encode an issue number, e.g. kimi/issue-492
BRANCH_ISSUE_RE = re.compile(r"issue[/-](\d+)", re.IGNORECASE)
def detect_issue_from_branch() -> int | None:
"""Try to extract an issue number from the current git branch name."""
try:
branch = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stderr=subprocess.DEVNULL,
text=True,
).strip()
except (subprocess.CalledProcessError, FileNotFoundError):
return None
m = BRANCH_ISSUE_RE.search(branch)
return int(m.group(1)) if m else None
# ── Epoch turnover ────────────────────────────────────────────────────────
def _epoch_tag(now: datetime | None = None) -> tuple[str, dict]:
"""Generate the symbolic epoch tag and advance the daily counter.
Returns (epoch_string, epoch_parts) where epoch_parts is a dict with
week, weekday, daily_n for structured storage.
The daily counter persists in .epoch_counter as a two-line file:
line 1: ISO date (YYYY-MM-DD) of the current epoch day
line 2: integer count
When the date rolls over, the counter resets to 1.
"""
if now is None:
now = datetime.now(timezone.utc)
iso_cal = now.isocalendar() # (year, week, weekday)
week = iso_cal[1]
weekday = iso_cal[2]
today_str = now.strftime("%Y-%m-%d")
# Read / reset daily counter
daily_n = 1
EPOCH_COUNTER_FILE.parent.mkdir(parents=True, exist_ok=True)
if EPOCH_COUNTER_FILE.exists():
try:
lines = EPOCH_COUNTER_FILE.read_text().strip().splitlines()
if len(lines) == 2 and lines[0] == today_str:
daily_n = int(lines[1]) + 1
except (ValueError, IndexError):
pass # corrupt file — reset
# Persist
EPOCH_COUNTER_FILE.write_text(f"{today_str}\n{daily_n}\n")
tag = f"\u27f3{week:02d}.{weekday}:{daily_n:03d}"
parts = {"week": week, "weekday": weekday, "daily_n": daily_n}
return tag, parts
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Log a cycle retrospective") p = argparse.ArgumentParser(description="Log a cycle retrospective")
@@ -198,30 +123,8 @@ def update_summary() -> None:
issue_failures[e["issue"]] = issue_failures.get(e["issue"], 0) + 1 issue_failures[e["issue"]] = issue_failures.get(e["issue"], 0) + 1
quarantine_candidates = {k: v for k, v in issue_failures.items() if v >= 2} quarantine_candidates = {k: v for k, v in issue_failures.items() if v >= 2}
# Epoch turnover stats — cycles per week/day from epoch-tagged entries
epoch_entries = [e for e in recent if e.get("epoch")]
by_week: dict[int, int] = {}
by_weekday: dict[int, int] = {}
for e in epoch_entries:
w = e.get("epoch_week")
d = e.get("epoch_weekday")
if w is not None:
by_week[w] = by_week.get(w, 0) + 1
if d is not None:
by_weekday[d] = by_weekday.get(d, 0) + 1
# Current epoch — latest entry's epoch tag
current_epoch = epoch_entries[-1].get("epoch", "") if epoch_entries else ""
# Weekday names for display
weekday_glyphs = {1: "Mon", 2: "Tue", 3: "Wed", 4: "Thu",
5: "Fri", 6: "Sat", 7: "Sun"}
by_weekday_named = {weekday_glyphs.get(k, str(k)): v
for k, v in sorted(by_weekday.items())}
summary = { summary = {
"updated_at": datetime.now(timezone.utc).isoformat(), "updated_at": datetime.now(timezone.utc).isoformat(),
"current_epoch": current_epoch,
"window": len(recent), "window": len(recent),
"measured_cycles": len(measured), "measured_cycles": len(measured),
"total_cycles": len(entries), "total_cycles": len(entries),
@@ -233,12 +136,9 @@ def update_summary() -> None:
"total_lines_removed": sum(e.get("lines_removed", 0) for e in recent), "total_lines_removed": sum(e.get("lines_removed", 0) for e in recent),
"total_prs_merged": sum(1 for e in recent if e.get("pr")), "total_prs_merged": sum(1 for e in recent if e.get("pr")),
"by_type": type_stats, "by_type": type_stats,
"by_week": dict(sorted(by_week.items())),
"by_weekday": by_weekday_named,
"quarantine_candidates": quarantine_candidates, "quarantine_candidates": quarantine_candidates,
"recent_failures": [ "recent_failures": [
{"cycle": e["cycle"], "epoch": e.get("epoch", ""), {"cycle": e["cycle"], "issue": e.get("issue"), "reason": e.get("reason", "")}
"issue": e.get("issue"), "reason": e.get("reason", "")}
for e in failures[-5:] for e in failures[-5:]
], ],
} }
@@ -249,10 +149,6 @@ def update_summary() -> None:
def main() -> None: def main() -> None:
args = parse_args() args = parse_args()
# Auto-detect issue from branch when not explicitly provided
if args.issue is None:
args.issue = detect_issue_from_branch()
# Reject idle cycles — no issue and no duration means nothing happened # Reject idle cycles — no issue and no duration means nothing happened
if not args.issue and args.duration == 0: if not args.issue and args.duration == 0:
print(f"[retro] Cycle {args.cycle} skipped — idle (no issue, no duration)") print(f"[retro] Cycle {args.cycle} skipped — idle (no issue, no duration)")
@@ -261,17 +157,9 @@ def main() -> None:
# A cycle is only truly successful if hermes exited clean AND main is green # A cycle is only truly successful if hermes exited clean AND main is green
truly_success = args.success and args.main_green truly_success = args.success and args.main_green
# Generate epoch turnover tag
now = datetime.now(timezone.utc)
epoch_tag, epoch_parts = _epoch_tag(now)
entry = { entry = {
"timestamp": now.isoformat(), "timestamp": datetime.now(timezone.utc).isoformat(),
"cycle": args.cycle, "cycle": args.cycle,
"epoch": epoch_tag,
"epoch_week": epoch_parts["week"],
"epoch_weekday": epoch_parts["weekday"],
"epoch_daily_n": epoch_parts["daily_n"],
"issue": args.issue, "issue": args.issue,
"type": args.type, "type": args.type,
"success": truly_success, "success": truly_success,
@@ -296,7 +184,7 @@ def main() -> None:
update_summary() update_summary()
status = "✓ SUCCESS" if args.success else "✗ FAILURE" status = "✓ SUCCESS" if args.success else "✗ FAILURE"
print(f"[retro] {epoch_tag} Cycle {args.cycle} {status}", end="") print(f"[retro] Cycle {args.cycle} {status}", end="")
if args.issue: if args.issue:
print(f" (#{args.issue} {args.type})", end="") print(f" (#{args.issue} {args.type})", end="")
if args.duration: if args.duration:

View File

@@ -1,407 +0,0 @@
#!/usr/bin/env python3
"""Loop introspection — the self-improvement engine.
Analyzes retro data across time windows to detect trends, extract patterns,
and produce structured recommendations. Output is consumed by deep_triage
and injected into the loop prompt context.
This is the piece that closes the feedback loop:
cycle_retro → introspect → deep_triage → loop behavior changes
Run: python3 scripts/loop_introspect.py
Output: .loop/retro/insights.json (structured insights + recommendations)
Prints human-readable summary to stdout.
Called by: deep_triage.sh (before the LLM triage), timmy-loop.sh (every 50 cycles)
"""
from __future__ import annotations
import json
import sys
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
CYCLES_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
DEEP_TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "deep-triage.jsonl"
TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
INSIGHTS_FILE = REPO_ROOT / ".loop" / "retro" / "insights.json"
# ── Helpers ──────────────────────────────────────────────────────────────
def load_jsonl(path: Path) -> list[dict]:
"""Load a JSONL file, skipping bad lines."""
if not path.exists():
return []
entries = []
for line in path.read_text().strip().splitlines():
try:
entries.append(json.loads(line))
except (json.JSONDecodeError, ValueError):
continue
return entries
def parse_ts(ts_str: str) -> datetime | None:
"""Parse an ISO timestamp, tolerating missing tz."""
if not ts_str:
return None
try:
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except (ValueError, TypeError):
return None
def window(entries: list[dict], days: int) -> list[dict]:
"""Filter entries to the last N days."""
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
result = []
for e in entries:
ts = parse_ts(e.get("timestamp", ""))
if ts and ts >= cutoff:
result.append(e)
return result
# ── Analysis functions ───────────────────────────────────────────────────
def compute_trends(cycles: list[dict]) -> dict:
"""Compare recent window (last 7d) vs older window (7-14d ago)."""
recent = window(cycles, 7)
older = window(cycles, 14)
# Remove recent from older to get the 7-14d window
recent_set = {(e.get("cycle"), e.get("timestamp")) for e in recent}
older = [e for e in older if (e.get("cycle"), e.get("timestamp")) not in recent_set]
def stats(entries):
if not entries:
return {"count": 0, "success_rate": None, "avg_duration": None,
"lines_net": 0, "prs_merged": 0}
successes = sum(1 for e in entries if e.get("success"))
durations = [e["duration"] for e in entries if e.get("duration", 0) > 0]
return {
"count": len(entries),
"success_rate": round(successes / len(entries), 3) if entries else None,
"avg_duration": round(sum(durations) / len(durations)) if durations else None,
"lines_net": sum(e.get("lines_added", 0) - e.get("lines_removed", 0) for e in entries),
"prs_merged": sum(1 for e in entries if e.get("pr")),
}
recent_stats = stats(recent)
older_stats = stats(older)
trend = {
"recent_7d": recent_stats,
"previous_7d": older_stats,
"velocity_change": None,
"success_rate_change": None,
"duration_change": None,
}
if recent_stats["count"] and older_stats["count"]:
trend["velocity_change"] = recent_stats["count"] - older_stats["count"]
if recent_stats["success_rate"] is not None and older_stats["success_rate"] is not None:
trend["success_rate_change"] = round(
recent_stats["success_rate"] - older_stats["success_rate"], 3
)
if recent_stats["avg_duration"] is not None and older_stats["avg_duration"] is not None:
trend["duration_change"] = recent_stats["avg_duration"] - older_stats["avg_duration"]
return trend
def type_analysis(cycles: list[dict]) -> dict:
"""Per-type success rates and durations."""
by_type: dict[str, list[dict]] = defaultdict(list)
for c in cycles:
by_type[c.get("type", "unknown")].append(c)
result = {}
for t, entries in by_type.items():
durations = [e["duration"] for e in entries if e.get("duration", 0) > 0]
successes = sum(1 for e in entries if e.get("success"))
result[t] = {
"count": len(entries),
"success_rate": round(successes / len(entries), 3) if entries else 0,
"avg_duration": round(sum(durations) / len(durations)) if durations else 0,
"max_duration": max(durations) if durations else 0,
}
return result
def repeat_failures(cycles: list[dict]) -> list[dict]:
"""Issues that have failed multiple times — quarantine candidates."""
failures: dict[int, list] = defaultdict(list)
for c in cycles:
if not c.get("success") and c.get("issue"):
failures[c["issue"]].append({
"cycle": c.get("cycle"),
"reason": c.get("reason", ""),
"duration": c.get("duration", 0),
})
# Only issues with 2+ failures
return [
{"issue": k, "failure_count": len(v), "attempts": v}
for k, v in sorted(failures.items(), key=lambda x: -len(x[1]))
if len(v) >= 2
]
def duration_outliers(cycles: list[dict], threshold_multiple: float = 3.0) -> list[dict]:
"""Cycles that took way longer than average — something went wrong."""
durations = [c["duration"] for c in cycles if c.get("duration", 0) > 0]
if len(durations) < 5:
return []
avg = sum(durations) / len(durations)
threshold = avg * threshold_multiple
outliers = []
for c in cycles:
dur = c.get("duration", 0)
if dur > threshold:
outliers.append({
"cycle": c.get("cycle"),
"issue": c.get("issue"),
"type": c.get("type"),
"duration": dur,
"avg_duration": round(avg),
"multiple": round(dur / avg, 1) if avg > 0 else 0,
"reason": c.get("reason", ""),
})
return outliers
def triage_effectiveness(deep_triages: list[dict]) -> dict:
"""How well is the deep triage performing?"""
if not deep_triages:
return {"runs": 0, "note": "No deep triage data yet"}
total_reviewed = sum(d.get("issues_reviewed", 0) for d in deep_triages)
total_refined = sum(len(d.get("issues_refined", [])) for d in deep_triages)
total_created = sum(len(d.get("issues_created", [])) for d in deep_triages)
total_closed = sum(len(d.get("issues_closed", [])) for d in deep_triages)
timmy_available = sum(1 for d in deep_triages if d.get("timmy_available"))
# Extract Timmy's feedback themes
timmy_themes = []
for d in deep_triages:
fb = d.get("timmy_feedback", "")
if fb:
timmy_themes.append(fb[:200])
return {
"runs": len(deep_triages),
"total_reviewed": total_reviewed,
"total_refined": total_refined,
"total_created": total_created,
"total_closed": total_closed,
"timmy_consultation_rate": round(timmy_available / len(deep_triages), 2),
"timmy_recent_feedback": timmy_themes[-1] if timmy_themes else "",
"timmy_feedback_history": timmy_themes,
}
def generate_recommendations(
trends: dict,
types: dict,
repeats: list,
outliers: list,
triage_eff: dict,
) -> list[dict]:
"""Produce actionable recommendations from the analysis."""
recs = []
# 1. Success rate declining?
src = trends.get("success_rate_change")
if src is not None and src < -0.1:
recs.append({
"severity": "high",
"category": "reliability",
"finding": f"Success rate dropped {abs(src)*100:.0f}pp in the last 7 days",
"recommendation": "Review recent failures. Are issues poorly scoped? "
"Is main unstable? Check if triage is producing bad work items.",
})
# 2. Velocity dropping?
vc = trends.get("velocity_change")
if vc is not None and vc < -5:
recs.append({
"severity": "medium",
"category": "throughput",
"finding": f"Velocity dropped by {abs(vc)} cycles vs previous week",
"recommendation": "Check for loop stalls, long-running cycles, or queue starvation.",
})
# 3. Duration creep?
dc = trends.get("duration_change")
if dc is not None and dc > 120: # 2+ minutes longer
recs.append({
"severity": "medium",
"category": "efficiency",
"finding": f"Average cycle duration increased by {dc}s vs previous week",
"recommendation": "Issues may be growing in scope. Enforce tighter decomposition "
"in deep triage. Check if tests are getting slower.",
})
# 4. Type-specific problems
for t, info in types.items():
if info["count"] >= 3 and info["success_rate"] < 0.5:
recs.append({
"severity": "high",
"category": "type_reliability",
"finding": f"'{t}' issues fail {(1-info['success_rate'])*100:.0f}% of the time "
f"({info['count']} attempts)",
"recommendation": f"'{t}' issues need better scoping or different approach. "
f"Consider: tighter acceptance criteria, smaller scope, "
f"or delegating to Kimi with more context.",
})
if info["avg_duration"] > 600 and info["count"] >= 3: # >10 min avg
recs.append({
"severity": "medium",
"category": "type_efficiency",
"finding": f"'{t}' issues average {info['avg_duration']//60}m{info['avg_duration']%60}s "
f"(max {info['max_duration']//60}m)",
"recommendation": f"Break '{t}' issues into smaller pieces. Target <5 min per cycle.",
})
# 5. Repeat failures
for rf in repeats[:3]:
recs.append({
"severity": "high",
"category": "repeat_failure",
"finding": f"Issue #{rf['issue']} has failed {rf['failure_count']} times",
"recommendation": "Quarantine or rewrite this issue. Repeated failure = "
"bad scope or missing prerequisite.",
})
# 6. Outliers
if len(outliers) > 2:
recs.append({
"severity": "medium",
"category": "outliers",
"finding": f"{len(outliers)} cycles took {outliers[0].get('multiple', '?')}x+ "
f"longer than average",
"recommendation": "Long cycles waste resources. Add timeout enforcement or "
"break complex issues earlier.",
})
# 7. Code growth
recent = trends.get("recent_7d", {})
net = recent.get("lines_net", 0)
if net > 500:
recs.append({
"severity": "low",
"category": "code_health",
"finding": f"Net +{net} lines added in the last 7 days",
"recommendation": "Lines of code is a liability. Balance feature work with "
"refactoring. Target net-zero or negative line growth.",
})
# 8. Triage health
if triage_eff.get("runs", 0) == 0:
recs.append({
"severity": "high",
"category": "triage",
"finding": "Deep triage has never run",
"recommendation": "Enable deep triage (every 20 cycles). The loop needs "
"LLM-driven issue refinement to stay effective.",
})
# No recommendations = things are healthy
if not recs:
recs.append({
"severity": "info",
"category": "health",
"finding": "No significant issues detected",
"recommendation": "System is healthy. Continue current patterns.",
})
return recs
# ── Main ─────────────────────────────────────────────────────────────────
def main() -> None:
cycles = load_jsonl(CYCLES_FILE)
deep_triages = load_jsonl(DEEP_TRIAGE_FILE)
if not cycles:
print("[introspect] No cycle data found. Nothing to analyze.")
return
# Run all analyses
trends = compute_trends(cycles)
types = type_analysis(cycles)
repeats = repeat_failures(cycles)
outliers = duration_outliers(cycles)
triage_eff = triage_effectiveness(deep_triages)
recommendations = generate_recommendations(trends, types, repeats, outliers, triage_eff)
insights = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"total_cycles_analyzed": len(cycles),
"trends": trends,
"by_type": types,
"repeat_failures": repeats[:5],
"duration_outliers": outliers[:5],
"triage_effectiveness": triage_eff,
"recommendations": recommendations,
}
# Write insights
INSIGHTS_FILE.parent.mkdir(parents=True, exist_ok=True)
INSIGHTS_FILE.write_text(json.dumps(insights, indent=2) + "\n")
# Current epoch from latest entry
latest_epoch = ""
for c in reversed(cycles):
if c.get("epoch"):
latest_epoch = c["epoch"]
break
# Human-readable output
header = f"[introspect] Analyzed {len(cycles)} cycles"
if latest_epoch:
header += f" · current epoch: {latest_epoch}"
print(header)
print(f"\n TRENDS (7d vs previous 7d):")
r7 = trends["recent_7d"]
p7 = trends["previous_7d"]
print(f" Cycles: {r7['count']:>3d} (was {p7['count']})")
if r7["success_rate"] is not None:
arrow = "" if (trends["success_rate_change"] or 0) > 0 else "" if (trends["success_rate_change"] or 0) < 0 else ""
print(f" Success rate: {r7['success_rate']*100:>4.0f}% {arrow}")
if r7["avg_duration"] is not None:
print(f" Avg duration: {r7['avg_duration']//60}m{r7['avg_duration']%60:02d}s")
print(f" PRs merged: {r7['prs_merged']:>3d} (was {p7['prs_merged']})")
print(f" Lines net: {r7['lines_net']:>+5d}")
print(f"\n BY TYPE:")
for t, info in sorted(types.items(), key=lambda x: -x[1]["count"]):
print(f" {t:12s} n={info['count']:>2d} "
f"ok={info['success_rate']*100:>3.0f}% "
f"avg={info['avg_duration']//60}m{info['avg_duration']%60:02d}s")
if repeats:
print(f"\n REPEAT FAILURES:")
for rf in repeats[:3]:
print(f" #{rf['issue']} failed {rf['failure_count']}x")
print(f"\n RECOMMENDATIONS ({len(recommendations)}):")
for i, rec in enumerate(recommendations, 1):
sev = {"high": "🔴", "medium": "🟡", "low": "🟢", "info": " "}.get(rec["severity"], "?")
print(f" {sev} {rec['finding']}")
print(f"{rec['recommendation']}")
print(f"\n Written to: {INSIGHTS_FILE}")
if __name__ == "__main__":
main()

View File

@@ -10,11 +10,6 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
APP_START_TIME: _datetime = _datetime.now(UTC) APP_START_TIME: _datetime = _datetime.now(UTC)
def normalize_ollama_url(url: str) -> str:
"""Replace localhost with 127.0.0.1 to avoid IPv6 resolution delays."""
return url.replace("localhost", "127.0.0.1")
class Settings(BaseSettings): class Settings(BaseSettings):
"""Central configuration — all env-var access goes through this class.""" """Central configuration — all env-var access goes through this class."""
@@ -24,11 +19,6 @@ class Settings(BaseSettings):
# Ollama host — override with OLLAMA_URL env var or .env file # Ollama host — override with OLLAMA_URL env var or .env file
ollama_url: str = "http://localhost:11434" ollama_url: str = "http://localhost:11434"
@property
def normalized_ollama_url(self) -> str:
"""Return ollama_url with localhost replaced by 127.0.0.1."""
return normalize_ollama_url(self.ollama_url)
# LLM model passed to Agno/Ollama — override with OLLAMA_MODEL # LLM model passed to Agno/Ollama — override with OLLAMA_MODEL
# qwen3:30b is the primary model — better reasoning and tool calling # qwen3:30b is the primary model — better reasoning and tool calling
# than llama3.1:8b-instruct while still running locally on modest hardware. # than llama3.1:8b-instruct while still running locally on modest hardware.
@@ -74,10 +64,17 @@ class Settings(BaseSettings):
# Seconds to wait for user confirmation before auto-rejecting. # Seconds to wait for user confirmation before auto-rejecting.
discord_confirm_timeout: int = 120 discord_confirm_timeout: int = 120
# ── Backend selection ──────────────────────────────────────────────────── # ── AirLLM / backend selection ───────────────────────────────────────────
# "ollama" — always use Ollama (default, safe everywhere) # "ollama" — always use Ollama (default, safe everywhere)
# "auto" — pick best available local backend, fall back to Ollama # "airllm" — always use AirLLM (requires pip install ".[bigbrain]")
timmy_model_backend: Literal["ollama", "grok", "claude", "auto"] = "ollama" # "auto" — use AirLLM on Apple Silicon if airllm is installed,
# fall back to Ollama otherwise
timmy_model_backend: Literal["ollama", "airllm", "grok", "claude", "auto"] = "ollama"
# AirLLM model size when backend is airllm or auto.
# Larger = smarter, but needs more RAM / disk.
# 8b ~16 GB | 70b ~140 GB | 405b ~810 GB
airllm_model_size: Literal["8b", "70b", "405b"] = "70b"
# ── Grok (xAI) — opt-in premium cloud backend ──────────────────────── # ── Grok (xAI) — opt-in premium cloud backend ────────────────────────
# Grok is a premium augmentation layer — local-first ethos preserved. # Grok is a premium augmentation layer — local-first ethos preserved.
@@ -141,12 +138,7 @@ class Settings(BaseSettings):
# CORS allowed origins for the web chat interface (Gitea Pages, etc.) # CORS allowed origins for the web chat interface (Gitea Pages, etc.)
# Set CORS_ORIGINS as a comma-separated list, e.g. "http://localhost:3000,https://example.com" # Set CORS_ORIGINS as a comma-separated list, e.g. "http://localhost:3000,https://example.com"
cors_origins: list[str] = [ cors_origins: list[str] = ["*"]
"http://localhost:3000",
"http://localhost:8000",
"http://127.0.0.1:3000",
"http://127.0.0.1:8000",
]
# Trusted hosts for the Host header check (TrustedHostMiddleware). # Trusted hosts for the Host header check (TrustedHostMiddleware).
# Set TRUSTED_HOSTS as a comma-separated list. Wildcards supported (e.g. "*.ts.net"). # Set TRUSTED_HOSTS as a comma-separated list. Wildcards supported (e.g. "*.ts.net").
@@ -402,7 +394,7 @@ def check_ollama_model_available(model_name: str) -> bool:
import json import json
import urllib.request import urllib.request
url = settings.normalized_ollama_url url = settings.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request( req = urllib.request.Request(
f"{url}/api/tags", f"{url}/api/tags",
method="GET", method="GET",
@@ -479,19 +471,8 @@ def validate_startup(*, force: bool = False) -> None:
", ".join(_missing), ", ".join(_missing),
) )
sys.exit(1) sys.exit(1)
if "*" in settings.cors_origins:
_startup_logger.error(
"PRODUCTION SECURITY ERROR: CORS wildcard '*' is not allowed "
"in production. Set CORS_ORIGINS to explicit origins."
)
sys.exit(1)
_startup_logger.info("Production mode: security secrets validated ✓") _startup_logger.info("Production mode: security secrets validated ✓")
else: else:
if "*" in settings.cors_origins:
_startup_logger.warning(
"SEC: CORS_ORIGINS contains wildcard '*'"
"restrict to explicit origins before deploying to production."
)
if not settings.l402_hmac_secret: if not settings.l402_hmac_secret:
_startup_logger.warning( _startup_logger.warning(
"SEC: L402_HMAC_SECRET is not set — " "SEC: L402_HMAC_SECRET is not set — "

View File

@@ -329,35 +329,33 @@ async def _discord_token_watcher() -> None:
logger.warning("Discord auto-start failed: %s", exc) logger.warning("Discord auto-start failed: %s", exc)
def _startup_init() -> None: @asynccontextmanager
"""Validate config and enable event persistence.""" async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
# Validate security config (no-op in test mode)
from config import validate_startup from config import validate_startup
validate_startup() validate_startup()
# Enable event persistence (unified EventBus + swarm event_log)
from infrastructure.events.bus import init_event_bus_persistence from infrastructure.events.bus import init_event_bus_persistence
init_event_bus_persistence() init_event_bus_persistence()
# Create all background tasks without waiting for them
briefing_task = asyncio.create_task(_briefing_scheduler())
thinking_task = asyncio.create_task(_thinking_scheduler())
loop_qa_task = asyncio.create_task(_loop_qa_scheduler())
presence_task = asyncio.create_task(_presence_watcher())
# Initialize Spark Intelligence engine
from spark.engine import get_spark_engine from spark.engine import get_spark_engine
if get_spark_engine().enabled: if get_spark_engine().enabled:
logger.info("Spark Intelligence active — event capture enabled") logger.info("Spark Intelligence active — event capture enabled")
# Auto-prune old vector store memories on startup
def _startup_background_tasks() -> list[asyncio.Task]:
"""Spawn all recurring background tasks (non-blocking)."""
return [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
]
def _startup_pruning() -> None:
"""Auto-prune old memories, thoughts, and events on startup."""
if settings.memory_prune_days > 0: if settings.memory_prune_days > 0:
try: try:
from timmy.memory_system import prune_memories from timmy.memory_system import prune_memories
@@ -375,6 +373,7 @@ def _startup_pruning() -> None:
except Exception as exc: except Exception as exc:
logger.debug("Memory auto-prune skipped: %s", exc) logger.debug("Memory auto-prune skipped: %s", exc)
# Auto-prune old thoughts on startup
if settings.thoughts_prune_days > 0: if settings.thoughts_prune_days > 0:
try: try:
from timmy.thinking import thinking_engine from timmy.thinking import thinking_engine
@@ -392,6 +391,7 @@ def _startup_pruning() -> None:
except Exception as exc: except Exception as exc:
logger.debug("Thought auto-prune skipped: %s", exc) logger.debug("Thought auto-prune skipped: %s", exc)
# Auto-prune old system events on startup
if settings.events_prune_days > 0: if settings.events_prune_days > 0:
try: try:
from swarm.event_log import prune_old_events from swarm.event_log import prune_old_events
@@ -409,6 +409,7 @@ def _startup_pruning() -> None:
except Exception as exc: except Exception as exc:
logger.debug("Event auto-prune skipped: %s", exc) logger.debug("Event auto-prune skipped: %s", exc)
# Warn if memory vault exceeds size limit
if settings.memory_vault_max_mb > 0: if settings.memory_vault_max_mb > 0:
try: try:
vault_path = Path(settings.repo_root) / "memory" / "notes" vault_path = Path(settings.repo_root) / "memory" / "notes"
@@ -424,18 +425,37 @@ def _startup_pruning() -> None:
except Exception as exc: except Exception as exc:
logger.debug("Vault size check skipped: %s", exc) logger.debug("Vault size check skipped: %s", exc)
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
async def _shutdown_cleanup( workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
bg_tasks: list[asyncio.Task], await workshop_heartbeat.start()
workshop_heartbeat,
) -> None: # Start chat integrations in background
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks.""" chat_task = asyncio.create_task(_start_chat_integrations_background())
# Register session logger with error capture (breaks infrastructure → timmy circular dep)
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
register_error_recorder(get_session_logger().record_error)
except Exception:
pass
logger.info("✓ Dashboard ready for requests")
yield
# Cleanup on shutdown
from integrations.chat_bridge.vendors.discord import discord_bot from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot from integrations.telegram_bot.bot import telegram_bot
await discord_bot.stop() await discord_bot.stop()
await telegram_bot.stop() await telegram_bot.stop()
# Close MCP tool server sessions
try: try:
from timmy.mcp_tools import close_mcp_sessions from timmy.mcp_tools import close_mcp_sessions
@@ -445,42 +465,13 @@ async def _shutdown_cleanup(
await workshop_heartbeat.stop() await workshop_heartbeat.stop()
for task in bg_tasks: for task in [briefing_task, thinking_task, chat_task, loop_qa_task, presence_task]:
task.cancel() if task:
try: task.cancel()
await task try:
except asyncio.CancelledError: await task
pass except asyncio.CancelledError:
pass
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
_startup_init()
bg_tasks = _startup_background_tasks()
_startup_pruning()
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Register session logger with error capture
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
register_error_recorder(get_session_logger().record_error)
except Exception:
logger.debug("Failed to register error recorder")
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
app = FastAPI( app = FastAPI(
@@ -493,14 +484,15 @@ app = FastAPI(
def _get_cors_origins() -> list[str]: def _get_cors_origins() -> list[str]:
"""Get CORS origins from settings, rejecting wildcards in production.""" """Get CORS origins from settings, with sensible defaults."""
origins = settings.cors_origins origins = settings.cors_origins
if "*" in origins and not settings.debug: if settings.debug and origins == ["*"]:
logger.warning( return [
"Wildcard '*' in CORS_ORIGINS stripped in production — " "http://localhost:3000",
"set explicit origins via CORS_ORIGINS env var" "http://localhost:8000",
) "http://127.0.0.1:3000",
origins = [o for o in origins if o != "*"] "http://127.0.0.1:8000",
]
return origins return origins

View File

@@ -100,7 +100,7 @@ class CSRFMiddleware(BaseHTTPMiddleware):
... ...
Usage: Usage:
app.add_middleware(CSRFMiddleware, secret=settings.csrf_secret) app.add_middleware(CSRFMiddleware, secret="your-secret-key")
Attributes: Attributes:
secret: Secret key for token signing (optional, for future use). secret: Secret key for token signing (optional, for future use).

View File

@@ -91,7 +91,7 @@ async def chat_agent(request: Request, message: str = Form(...)):
thinking_engine.record_user_input() thinking_engine.record_user_input()
except Exception: except Exception:
logger.debug("Failed to record user input for thinking engine") pass
timestamp = datetime.now().strftime("%H:%M:%S") timestamp = datetime.now().strftime("%H:%M:%S")
response_text = None response_text = None

View File

@@ -85,7 +85,7 @@ async def api_chat(request: Request):
thinking_engine.record_user_input() thinking_engine.record_user_input()
except Exception: except Exception:
logger.debug("Failed to record user input for thinking engine") pass
timestamp = datetime.now().strftime("%H:%M:%S") timestamp = datetime.now().strftime("%H:%M:%S")

View File

@@ -65,7 +65,7 @@ def _check_ollama_sync() -> DependencyStatus:
try: try:
import urllib.request import urllib.request
url = settings.normalized_ollama_url url = settings.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request( req = urllib.request.Request(
f"{url}/api/tags", f"{url}/api/tags",
method="GET", method="GET",

View File

@@ -166,7 +166,7 @@ async def api_briefing_status():
if cached: if cached:
last_generated = cached.generated_at.isoformat() last_generated = cached.generated_at.isoformat()
except Exception: except Exception:
logger.debug("Failed to read briefing cache") pass
return JSONResponse( return JSONResponse(
{ {
@@ -190,7 +190,6 @@ async def api_memory_status():
stats = get_memory_stats() stats = get_memory_stats()
indexed_files = stats.get("total_entries", 0) indexed_files = stats.get("total_entries", 0)
except Exception: except Exception:
logger.debug("Failed to get memory stats")
indexed_files = 0 indexed_files = 0
return JSONResponse( return JSONResponse(
@@ -216,7 +215,7 @@ async def api_swarm_status():
).fetchone() ).fetchone()
pending_tasks = row["cnt"] if row else 0 pending_tasks = row["cnt"] if row else 0
except Exception: except Exception:
logger.debug("Failed to count pending tasks") pass
return JSONResponse( return JSONResponse(
{ {

View File

@@ -221,7 +221,7 @@ async def _heartbeat(websocket: WebSocket) -> None:
await asyncio.sleep(_HEARTBEAT_INTERVAL) await asyncio.sleep(_HEARTBEAT_INTERVAL)
await websocket.send_text(json.dumps({"type": "ping"})) await websocket.send_text(json.dumps({"type": "ping"}))
except Exception: except Exception:
logger.debug("Heartbeat stopped — connection gone") pass # connection gone — receive loop will clean up
@router.websocket("/ws") @router.websocket("/ws")
@@ -250,7 +250,7 @@ async def world_ws(websocket: WebSocket) -> None:
raw = await websocket.receive_text() raw = await websocket.receive_text()
await _handle_client_message(raw) await _handle_client_message(raw)
except Exception: except Exception:
logger.debug("WebSocket receive loop ended") pass
finally: finally:
ping_task.cancel() ping_task.cancel()
if websocket in _ws_clients: if websocket in _ws_clients:
@@ -265,7 +265,6 @@ async def _broadcast(message: str) -> None:
try: try:
await ws.send_text(message) await ws.send_text(message)
except Exception: except Exception:
logger.debug("Pruning dead WebSocket client")
dead.append(ws) dead.append(ws)
for ws in dead: for ws in dead:
if ws in _ws_clients: if ws in _ws_clients:
@@ -341,7 +340,7 @@ async def _bark_and_broadcast(visitor_text: str) -> None:
pip_familiar.on_event("visitor_spoke") pip_familiar.on_event("visitor_spoke")
except Exception: except Exception:
logger.debug("Pip familiar notification failed (optional)") pass # Pip is optional
_refresh_ground(visitor_text) _refresh_ground(visitor_text)
_tick_commitments() _tick_commitments()

View File

@@ -13,7 +13,7 @@ import logging
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum, auto from enum import Enum, auto
from config import normalize_ollama_url, settings from config import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -307,7 +307,7 @@ class MultiModalManager:
import json import json
import urllib.request import urllib.request
url = normalize_ollama_url(self.ollama_url) url = self.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request( req = urllib.request.Request(
f"{url}/api/tags", f"{url}/api/tags",
method="GET", method="GET",
@@ -462,7 +462,7 @@ class MultiModalManager:
logger.info("Pulling model: %s", model_name) logger.info("Pulling model: %s", model_name)
url = normalize_ollama_url(self.ollama_url) url = self.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request( req = urllib.request.Request(
f"{url}/api/pull", f"{url}/api/pull",
method="POST", method="POST",

View File

@@ -183,22 +183,6 @@ async def run_health_check(
} }
@router.post("/reload")
async def reload_config(
cascade: Annotated[CascadeRouter, Depends(get_cascade_router)],
) -> dict[str, Any]:
"""Hot-reload providers.yaml without restart.
Preserves circuit breaker state and metrics for existing providers.
"""
try:
result = cascade.reload_config()
return {"status": "ok", **result}
except Exception as exc:
logger.error("Config reload failed: %s", exc)
raise HTTPException(status_code=500, detail=f"Reload failed: {exc}") from exc
@router.get("/config") @router.get("/config")
async def get_config( async def get_config(
cascade: Annotated[CascadeRouter, Depends(get_cascade_router)], cascade: Annotated[CascadeRouter, Depends(get_cascade_router)],

View File

@@ -18,8 +18,6 @@ from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from config import settings
try: try:
import yaml import yaml
except ImportError: except ImportError:
@@ -102,7 +100,7 @@ class Provider:
"""LLM provider configuration and state.""" """LLM provider configuration and state."""
name: str name: str
type: str # ollama, openai, anthropic type: str # ollama, openai, anthropic, airllm
enabled: bool enabled: bool
priority: int priority: int
url: str | None = None url: str | None = None
@@ -303,13 +301,22 @@ class CascadeRouter:
# Can't check without requests, assume available # Can't check without requests, assume available
return True return True
try: try:
url = provider.url or settings.ollama_url url = provider.url or "http://localhost:11434"
response = requests.get(f"{url}/api/tags", timeout=5) response = requests.get(f"{url}/api/tags", timeout=5)
return response.status_code == 200 return response.status_code == 200
except Exception as exc: except Exception as exc:
logger.debug("Ollama provider check error: %s", exc) logger.debug("Ollama provider check error: %s", exc)
return False return False
elif provider.type == "airllm":
# Check if airllm is installed
try:
import importlib.util
return importlib.util.find_spec("airllm") is not None
except (ImportError, ModuleNotFoundError):
return False
elif provider.type in ("openai", "anthropic", "grok"): elif provider.type in ("openai", "anthropic", "grok"):
# Check if API key is set # Check if API key is set
return provider.api_key is not None and provider.api_key != "" return provider.api_key is not None and provider.api_key != ""
@@ -388,101 +395,6 @@ class CascadeRouter:
return None return None
def _select_model(
self, provider: Provider, model: str | None, content_type: ContentType
) -> tuple[str | None, bool]:
"""Select the best model for the request, with vision fallback.
Returns:
Tuple of (selected_model, is_fallback_model).
"""
selected_model = model or provider.get_default_model()
is_fallback = False
if content_type != ContentType.TEXT and selected_model:
if provider.type == "ollama" and self._mm_manager:
from infrastructure.models.multimodal import ModelCapability
if content_type == ContentType.VISION:
supports = self._mm_manager.model_supports(
selected_model, ModelCapability.VISION
)
if not supports:
fallback = self._get_fallback_model(provider, selected_model, content_type)
if fallback:
logger.info(
"Model %s doesn't support vision, falling back to %s",
selected_model,
fallback,
)
selected_model = fallback
is_fallback = True
else:
logger.warning(
"No vision-capable model found on %s, trying anyway",
provider.name,
)
return selected_model, is_fallback
async def _attempt_with_retry(
self,
provider: Provider,
messages: list[dict],
model: str | None,
temperature: float,
max_tokens: int | None,
content_type: ContentType,
) -> dict:
"""Try a provider with retries, returning the result dict.
Raises:
RuntimeError: If all retry attempts fail.
Returns error strings collected during retries via the exception message.
"""
errors: list[str] = []
for attempt in range(self.config.max_retries_per_provider):
try:
return await self._try_provider(
provider=provider,
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
except Exception as exc:
error_msg = str(exc)
logger.warning(
"Provider %s attempt %d failed: %s",
provider.name,
attempt + 1,
error_msg,
)
errors.append(f"{provider.name}: {error_msg}")
if attempt < self.config.max_retries_per_provider - 1:
await asyncio.sleep(self.config.retry_delay_seconds)
raise RuntimeError("; ".join(errors))
def _is_provider_available(self, provider: Provider) -> bool:
"""Check if a provider should be tried (enabled + circuit breaker)."""
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
return False
if provider.status == ProviderStatus.UNHEALTHY:
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
return False
return True
async def complete( async def complete(
self, self,
messages: list[dict], messages: list[dict],
@@ -509,6 +421,7 @@ class CascadeRouter:
Raises: Raises:
RuntimeError: If all providers fail RuntimeError: If all providers fail
""" """
# Detect content type for multi-modal routing
content_type = self._detect_content_type(messages) content_type = self._detect_content_type(messages)
if content_type != ContentType.TEXT: if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value) logger.debug("Detected %s content, selecting appropriate model", content_type.value)
@@ -516,34 +429,93 @@ class CascadeRouter:
errors = [] errors = []
for provider in self.providers: for provider in self.providers:
if not self._is_provider_available(provider): # Skip disabled providers
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
continue continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type) # Skip unhealthy providers (circuit breaker)
if provider.status == ProviderStatus.UNHEALTHY:
# Check if circuit breaker can close
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
continue
try: # Determine which model to use
result = await self._attempt_with_retry( selected_model = model or provider.get_default_model()
provider, is_fallback_model = False
messages,
selected_model,
temperature,
max_tokens,
content_type,
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
continue
self._record_success(provider, result.get("latency_ms", 0)) # For non-text content, check if model supports it
return { if content_type != ContentType.TEXT and selected_model:
"content": result["content"], if provider.type == "ollama" and self._mm_manager:
"provider": provider.name, from infrastructure.models.multimodal import ModelCapability
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
# Check if selected model supports the required capability
if content_type == ContentType.VISION:
supports = self._mm_manager.model_supports(
selected_model, ModelCapability.VISION
)
if not supports:
# Find fallback model
fallback = self._get_fallback_model(
provider, selected_model, content_type
)
if fallback:
logger.info(
"Model %s doesn't support vision, falling back to %s",
selected_model,
fallback,
)
selected_model = fallback
is_fallback_model = True
else:
logger.warning(
"No vision-capable model found on %s, trying anyway",
provider.name,
)
# Try this provider
for attempt in range(self.config.max_retries_per_provider):
try:
result = await self._try_provider(
provider=provider,
messages=messages,
model=selected_model,
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
# Success! Update metrics and return
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get(
"model", selected_model or provider.get_default_model()
),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
except Exception as exc:
error_msg = str(exc)
logger.warning(
"Provider %s attempt %d failed: %s", provider.name, attempt + 1, error_msg
)
errors.append(f"{provider.name}: {error_msg}")
if attempt < self.config.max_retries_per_provider - 1:
await asyncio.sleep(self.config.retry_delay_seconds)
# All retries failed for this provider
self._record_failure(provider)
# All providers failed
raise RuntimeError(f"All providers failed: {'; '.join(errors)}") raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
async def _try_provider( async def _try_provider(
@@ -609,7 +581,7 @@ class CascadeRouter:
"""Call Ollama API with multi-modal support.""" """Call Ollama API with multi-modal support."""
import aiohttp import aiohttp
url = f"{provider.url or settings.ollama_url}/api/chat" url = f"{provider.url}/api/chat"
# Transform messages for Ollama format (including images) # Transform messages for Ollama format (including images)
transformed_messages = self._transform_messages_for_ollama(messages) transformed_messages = self._transform_messages_for_ollama(messages)
@@ -843,66 +815,6 @@ class CascadeRouter:
provider.status = ProviderStatus.HEALTHY provider.status = ProviderStatus.HEALTHY
logger.info("Circuit breaker CLOSED for %s", provider.name) logger.info("Circuit breaker CLOSED for %s", provider.name)
def reload_config(self) -> dict:
"""Hot-reload providers.yaml, preserving runtime state.
Re-reads the config file, rebuilds the provider list, and
preserves circuit breaker state and metrics for providers
that still exist after reload.
Returns:
Summary dict with added/removed/preserved counts.
"""
# Snapshot current runtime state keyed by provider name
old_state: dict[
str, tuple[ProviderMetrics, CircuitState, float | None, int, ProviderStatus]
] = {}
for p in self.providers:
old_state[p.name] = (
p.metrics,
p.circuit_state,
p.circuit_opened_at,
p.half_open_calls,
p.status,
)
old_names = set(old_state.keys())
# Reload from disk
self.providers = []
self._load_config()
# Restore preserved state
new_names = {p.name for p in self.providers}
preserved = 0
for p in self.providers:
if p.name in old_state:
metrics, circuit, opened_at, half_open, status = old_state[p.name]
p.metrics = metrics
p.circuit_state = circuit
p.circuit_opened_at = opened_at
p.half_open_calls = half_open
p.status = status
preserved += 1
added = new_names - old_names
removed = old_names - new_names
logger.info(
"Config reloaded: %d providers (%d preserved, %d added, %d removed)",
len(self.providers),
preserved,
len(added),
len(removed),
)
return {
"total_providers": len(self.providers),
"preserved": preserved,
"added": sorted(added),
"removed": sorted(removed),
}
def get_metrics(self) -> dict: def get_metrics(self) -> dict:
"""Get metrics for all providers.""" """Get metrics for all providers."""
return { return {

View File

@@ -63,7 +63,7 @@ def _pull_model(model_name: str) -> bool:
logger.info("Pulling model: %s", model_name) logger.info("Pulling model: %s", model_name)
url = settings.normalized_ollama_url url = settings.ollama_url.replace("localhost", "127.0.0.1")
req = urllib.request.Request( req = urllib.request.Request(
f"{url}/api/pull", f"{url}/api/pull",
method="POST", method="POST",
@@ -220,7 +220,7 @@ def create_timmy(
print_response(message, stream). print_response(message, stream).
""" """
resolved = _resolve_backend(backend) resolved = _resolve_backend(backend)
size = model_size or "70b" size = model_size or settings.airllm_model_size
if resolved == "claude": if resolved == "claude":
from timmy.backends import ClaudeBackend from timmy.backends import ClaudeBackend
@@ -300,11 +300,7 @@ def create_timmy(
max_context = 2000 if not use_tools else 8000 max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context: if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]" memory_context = memory_context[:max_context] + "\n... [truncated]"
full_prompt = ( full_prompt = f"{base_prompt}\n\n## Memory Context\n\n{memory_context}"
f"{base_prompt}\n\n"
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
f"{memory_context}"
)
else: else:
full_prompt = base_prompt full_prompt = base_prompt
except Exception as exc: except Exception as exc:

View File

@@ -18,7 +18,6 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import re import re
import threading
import time import time
import uuid import uuid
from collections.abc import Callable from collections.abc import Callable
@@ -60,7 +59,6 @@ class AgenticResult:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_loop_agent = None _loop_agent = None
_loop_agent_lock = threading.Lock()
def _get_loop_agent(): def _get_loop_agent():
@@ -71,11 +69,9 @@ def _get_loop_agent():
""" """
global _loop_agent global _loop_agent
if _loop_agent is None: if _loop_agent is None:
with _loop_agent_lock: from timmy.agent import create_timmy
if _loop_agent is None:
from timmy.agent import create_timmy
_loop_agent = create_timmy() _loop_agent = create_timmy()
return _loop_agent return _loop_agent
@@ -95,126 +91,6 @@ def _parse_steps(plan_text: str) -> list[str]:
return [line.strip() for line in plan_text.strip().splitlines() if line.strip()] return [line.strip() for line in plan_text.strip().splitlines() if line.strip()]
# ---------------------------------------------------------------------------
# Extracted helpers
# ---------------------------------------------------------------------------
def _extract_content(run_result) -> str:
"""Extract text content from an agent run result."""
return run_result.content if hasattr(run_result, "content") else str(run_result)
def _clean(text: str) -> str:
"""Clean a model response using session's response cleaner."""
from timmy.session import _clean_response
return _clean_response(text)
async def _plan_task(
agent, task: str, session_id: str, max_steps: int
) -> tuple[list[str], bool] | str:
"""Run the planning phase — returns (steps, was_truncated) or error string."""
plan_prompt = (
f"Break this task into numbered steps (max {max_steps}). "
f"Return ONLY a numbered list, nothing else.\n\n"
f"Task: {task}"
)
try:
plan_run = await asyncio.to_thread(
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
)
plan_text = _extract_content(plan_run)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop: planning failed: %s", exc)
return f"Planning failed: {exc}"
steps = _parse_steps(plan_text)
if not steps:
return "Planning produced no steps."
planned_count = len(steps)
steps = steps[:max_steps]
return steps, planned_count > len(steps)
async def _execute_step(
agent,
task: str,
step_desc: str,
step_num: int,
total_steps: int,
recent_results: list[str],
session_id: str,
) -> AgenticStep:
"""Execute a single step, returning an AgenticStep."""
step_start = time.monotonic()
context = (
f"Task: {task}\n"
f"Step {step_num}/{total_steps}: {step_desc}\n"
f"Recent progress: {recent_results[-2:] if recent_results else []}\n\n"
f"Execute this step and report what you did."
)
step_run = await asyncio.to_thread(
agent.run, context, stream=False, session_id=f"{session_id}_step{step_num}"
)
step_result = _clean(_extract_content(step_run))
return AgenticStep(
step_num=step_num,
description=step_desc,
result=step_result,
status="completed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
async def _adapt_step(
agent,
step_desc: str,
step_num: int,
error: Exception,
step_start: float,
session_id: str,
) -> AgenticStep:
"""Attempt adaptation after a step failure."""
adapt_prompt = (
f"Step {step_num} failed with error: {error}\n"
f"Original step was: {step_desc}\n"
f"Adapt the plan and try an alternative approach for this step."
)
adapt_run = await asyncio.to_thread(
agent.run, adapt_prompt, stream=False, session_id=f"{session_id}_adapt{step_num}"
)
adapt_result = _clean(_extract_content(adapt_run))
return AgenticStep(
step_num=step_num,
description=f"[Adapted] {step_desc}",
result=adapt_result,
status="adapted",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
def _summarize(result: AgenticResult, total_steps: int, was_truncated: bool) -> None:
"""Fill in summary and final status on the result object (mutates in place)."""
completed = sum(1 for s in result.steps if s.status == "completed")
adapted = sum(1 for s in result.steps if s.status == "adapted")
failed = sum(1 for s in result.steps if s.status == "failed")
parts = [f"Completed {completed}/{total_steps} steps"]
if adapted:
parts.append(f"{adapted} adapted")
if failed:
parts.append(f"{failed} failed")
result.summary = f"{result.task}: {', '.join(parts)}."
if was_truncated or len(result.steps) < total_steps or failed:
result.status = "partial"
else:
result.status = "completed"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Core loop # Core loop
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -245,41 +121,88 @@ async def run_agentic_loop(
task_id = str(uuid.uuid4())[:8] task_id = str(uuid.uuid4())[:8]
start_time = time.monotonic() start_time = time.monotonic()
agent = _get_loop_agent() agent = _get_loop_agent()
result = AgenticResult(task_id=task_id, task=task, summary="") result = AgenticResult(task_id=task_id, task=task, summary="")
# Phase 1: Planning # ── Phase 1: Planning ──────────────────────────────────────────────────
plan = await _plan_task(agent, task, session_id, max_steps) plan_prompt = (
if isinstance(plan, str): f"Break this task into numbered steps (max {max_steps}). "
f"Return ONLY a numbered list, nothing else.\n\n"
f"Task: {task}"
)
try:
plan_run = await asyncio.to_thread(
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
)
plan_text = plan_run.content if hasattr(plan_run, "content") else str(plan_run)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop: planning failed: %s", exc)
result.status = "failed" result.status = "failed"
result.summary = plan result.summary = f"Planning failed: {exc}"
result.total_duration_ms = int((time.monotonic() - start_time) * 1000) result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
return result return result
steps, was_truncated = plan steps = _parse_steps(plan_text)
total_steps = len(steps) if not steps:
result.status = "failed"
result.summary = "Planning produced no steps."
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
return result
# Enforce max_steps — track if we truncated
planned_steps = len(steps)
steps = steps[:max_steps]
total_steps = len(steps)
was_truncated = planned_steps > total_steps
# Broadcast plan
await _broadcast_progress( await _broadcast_progress(
"agentic.plan_ready", "agentic.plan_ready",
{"task_id": task_id, "task": task, "steps": steps, "total": total_steps}, {
"task_id": task_id,
"task": task,
"steps": steps,
"total": total_steps,
},
) )
# Phase 2: Execution # ── Phase 2: Execution ─────────────────────────────────────────────────
completed_results: list[str] = [] completed_results: list[str] = []
for i, step_desc in enumerate(steps, 1): for i, step_desc in enumerate(steps, 1):
step_start = time.monotonic() step_start = time.monotonic()
recent = completed_results[-2:] if completed_results else []
context = (
f"Task: {task}\n"
f"Step {i}/{total_steps}: {step_desc}\n"
f"Recent progress: {recent}\n\n"
f"Execute this step and report what you did."
)
try: try:
step = await _execute_step( step_run = await asyncio.to_thread(
agent, agent.run, context, stream=False, session_id=f"{session_id}_step{i}"
task, )
step_desc, step_result = step_run.content if hasattr(step_run, "content") else str(step_run)
i,
total_steps, # Clean the response
completed_results, from timmy.session import _clean_response
session_id,
step_result = _clean_response(step_result)
step = AgenticStep(
step_num=i,
description=step_desc,
result=step_result,
status="completed",
duration_ms=int((time.monotonic() - step_start) * 1000),
) )
result.steps.append(step) result.steps.append(step)
completed_results.append(f"Step {i}: {step.result[:200]}") completed_results.append(f"Step {i}: {step_result[:200]}")
# Broadcast progress
await _broadcast_progress( await _broadcast_progress(
"agentic.step_complete", "agentic.step_complete",
{ {
@@ -287,18 +210,46 @@ async def run_agentic_loop(
"step": i, "step": i,
"total": total_steps, "total": total_steps,
"description": step_desc, "description": step_desc,
"result": step.result[:200], "result": step_result[:200],
}, },
) )
if on_progress: if on_progress:
await on_progress(step_desc, i, total_steps) await on_progress(step_desc, i, total_steps)
except Exception as exc: # broad catch intentional: agent.run can raise any error except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.warning("Agentic loop step %d failed: %s", i, exc) logger.warning("Agentic loop step %d failed: %s", i, exc)
# ── Adaptation: ask model to adapt ─────────────────────────────
adapt_prompt = (
f"Step {i} failed with error: {exc}\n"
f"Original step was: {step_desc}\n"
f"Adapt the plan and try an alternative approach for this step."
)
try: try:
step = await _adapt_step(agent, step_desc, i, exc, step_start, session_id) adapt_run = await asyncio.to_thread(
agent.run,
adapt_prompt,
stream=False,
session_id=f"{session_id}_adapt{i}",
)
adapt_result = (
adapt_run.content if hasattr(adapt_run, "content") else str(adapt_run)
)
from timmy.session import _clean_response
adapt_result = _clean_response(adapt_result)
step = AgenticStep(
step_num=i,
description=f"[Adapted] {step_desc}",
result=adapt_result,
status="adapted",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
result.steps.append(step) result.steps.append(step)
completed_results.append(f"Step {i} (adapted): {step.result[:200]}") completed_results.append(f"Step {i} (adapted): {adapt_result[:200]}")
await _broadcast_progress( await _broadcast_progress(
"agentic.step_adapted", "agentic.step_adapted",
{ {
@@ -307,26 +258,46 @@ async def run_agentic_loop(
"total": total_steps, "total": total_steps,
"description": step_desc, "description": step_desc,
"error": str(exc), "error": str(exc),
"adaptation": step.result[:200], "adaptation": adapt_result[:200],
}, },
) )
if on_progress: if on_progress:
await on_progress(f"[Adapted] {step_desc}", i, total_steps) await on_progress(f"[Adapted] {step_desc}", i, total_steps)
except Exception as adapt_exc: # broad catch intentional
except Exception as adapt_exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop adaptation also failed: %s", adapt_exc) logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
result.steps.append( step = AgenticStep(
AgenticStep( step_num=i,
step_num=i, description=step_desc,
description=step_desc, result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}", status="failed",
status="failed", duration_ms=int((time.monotonic() - step_start) * 1000),
duration_ms=int((time.monotonic() - step_start) * 1000),
)
) )
result.steps.append(step)
completed_results.append(f"Step {i}: FAILED") completed_results.append(f"Step {i}: FAILED")
# Phase 3: Summary # ── Phase 3: Summary ───────────────────────────────────────────────────
_summarize(result, total_steps, was_truncated) completed_count = sum(1 for s in result.steps if s.status == "completed")
adapted_count = sum(1 for s in result.steps if s.status == "adapted")
failed_count = sum(1 for s in result.steps if s.status == "failed")
parts = [f"Completed {completed_count}/{total_steps} steps"]
if adapted_count:
parts.append(f"{adapted_count} adapted")
if failed_count:
parts.append(f"{failed_count} failed")
result.summary = f"{task}: {', '.join(parts)}."
# Determine final status
if was_truncated:
result.status = "partial"
elif len(result.steps) < total_steps:
result.status = "partial"
elif any(s.status == "failed" for s in result.steps):
result.status = "partial"
else:
result.status = "completed"
result.total_duration_ms = int((time.monotonic() - start_time) * 1000) result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
await _broadcast_progress( await _broadcast_progress(

View File

@@ -636,7 +636,7 @@ class HotMemory:
if len(lines) > 1: if len(lines) > 1:
return "\n".join(lines) return "\n".join(lines)
except Exception: except Exception:
logger.debug("DB context read failed, falling back to file") pass
# Fallback to file if DB unavailable # Fallback to file if DB unavailable
if self.path.exists(): if self.path.exists():

View File

@@ -23,9 +23,6 @@ Rules:
- Remember what the user tells you during the conversation. - Remember what the user tells you during the conversation.
- If you don't know something, say so honestly — never fabricate facts. - If you don't know something, say so honestly — never fabricate facts.
- If a request is ambiguous, ask a brief clarifying question before guessing. - If a request is ambiguous, ask a brief clarifying question before guessing.
- SOURCE DISTINCTION: When answering from memory or retrieved context, cite it.
When answering from your own training, use hedging: "I think", "I believe".
The user must be able to tell grounded claims from pattern-matching.
- Use the user's name if you know it. - Use the user's name if you know it.
- When you state a fact, commit to it. - When you state a fact, commit to it.
- NEVER attempt arithmetic in your head. If asked to compute anything, respond: - NEVER attempt arithmetic in your head. If asked to compute anything, respond:
@@ -81,18 +78,6 @@ HONESTY:
- Never fabricate tool output. Call the tool and wait. - Never fabricate tool output. Call the tool and wait.
- If a tool errors, report the exact error. - If a tool errors, report the exact error.
SOURCE DISTINCTION (SOUL requirement — non-negotiable):
- Every claim you make comes from one of two places: a verified source you
can point to, or your own pattern-matching. The user must be able to tell
which is which.
- When your response uses information from GROUNDED CONTEXT (memory, retrieved
documents, tool output), cite it: "From memory:", "According to [source]:".
- When you are generating from your training data alone, signal it naturally:
"I think", "My understanding is", "I believe" — never false certainty.
- If the user asks a factual question and you have no grounded source, say so:
"I don't have a verified source for this — from my training I think..."
- Prefer "I don't know" over a confident-sounding guess. Refusal over fabrication.
MEMORY (three tiers): MEMORY (three tiers):
- Tier 1: MEMORY.md (hot, always loaded) - Tier 1: MEMORY.md (hot, always loaded)
- Tier 2: memory/ vault (structured, append-only, date-stamped) - Tier 2: memory/ vault (structured, append-only, date-stamped)

View File

@@ -257,28 +257,6 @@ class ThinkingEngine:
) )
return None return None
content, seed_type = await self._generate_thought(prompt)
if not content:
return None
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
await self._finalize_thought(thought)
return thought
async def _generate_thought(self, prompt: str | None = None) -> tuple[str | None, str]:
"""Generate novel thought content via the dedup retry loop.
Gathers context, builds the LLM prompt, calls the agent, and
retries with a fresh seed if the result is too similar to recent
thoughts.
Returns:
A (content, seed_type) tuple. *content* is ``None`` when the
cycle should be skipped (agent failure, empty response, or
all retries exhausted).
"""
memory_context = self._load_memory_context() memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot() system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5) recent_thoughts = self.get_recent_thoughts(limit=5)
@@ -306,11 +284,11 @@ class ThinkingEngine:
raw = await self._call_agent(full_prompt) raw = await self._call_agent(full_prompt)
except Exception as exc: except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc) logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None, seed_type return None
if not raw or not raw.strip(): if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping") logger.debug("Thinking cycle produced empty response, skipping")
return None, seed_type return None
content = raw.strip() content = raw.strip()
@@ -330,28 +308,48 @@ class ThinkingEngine:
"Thought still repetitive after %d retries, discarding", "Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1, self._MAX_DEDUP_RETRIES + 1,
) )
return None, seed_type return None
return content, seed_type if not content:
return None
async def _finalize_thought(self, thought: Thought) -> None: thought = self._store_thought(content, seed_type)
"""Run post-hooks, log, journal, and broadcast a stored thought.""" self._last_thought_id = thought.id
# Post-hook: check memory status periodically
self._maybe_check_memory() self._maybe_check_memory()
# Post-hook: distill facts from recent thoughts periodically
await self._maybe_distill() await self._maybe_distill()
# Post-hook: file Gitea issues for actionable observations
await self._maybe_file_issues() await self._maybe_file_issues()
# Post-hook: check workspace for new messages from Hermes
await self._check_workspace() await self._check_workspace()
# Post-hook: proactive memory status audit
self._maybe_check_memory_status() self._maybe_check_memory_status()
# Post-hook: update MEMORY.md with latest reflection
self._update_memory(thought) self._update_memory(thought)
# Log to swarm event system
self._log_event(thought) self._log_event(thought)
# Append to daily journal file
self._write_journal(thought) self._write_journal(thought)
# Broadcast to WebSocket clients
await self._broadcast(thought) await self._broadcast(thought)
logger.info( logger.info(
"Thought [%s] (%s): %s", "Thought [%s] (%s): %s",
thought.id[:8], thought.id[:8],
thought.seed_type, seed_type,
thought.content[:80], thought.content[:80],
) )
return thought
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]: def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
"""Retrieve the most recent thoughts.""" """Retrieve the most recent thoughts."""

View File

@@ -75,8 +75,6 @@ def create_timmy_serve_app() -> FastAPI:
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
logger.info("Timmy Serve starting") logger.info("Timmy Serve starting")
app.state.timmy = create_timmy()
logger.info("Timmy agent cached in app state")
yield yield
logger.info("Timmy Serve shutting down") logger.info("Timmy Serve shutting down")
@@ -103,7 +101,7 @@ def create_timmy_serve_app() -> FastAPI:
async def serve_chat(request: Request, body: ChatRequest): async def serve_chat(request: Request, body: ChatRequest):
"""Process a chat request.""" """Process a chat request."""
try: try:
timmy = request.app.state.timmy timmy = create_timmy()
result = timmy.run(body.message, stream=False) result = timmy.run(body.message, stream=False)
response_text = result.content if hasattr(result, "content") else str(result) response_text = result.content if hasattr(result, "content") else str(result)

View File

@@ -2,7 +2,7 @@
import time import time
from pathlib import Path from pathlib import Path
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
import yaml import yaml
@@ -489,182 +489,30 @@ class TestProviderAvailabilityCheck:
assert router._check_provider_available(provider) is False assert router._check_provider_available(provider) is False
def test_check_airllm_installed(self):
"""Test AirLLM when installed."""
router = CascadeRouter(config_path=Path("/nonexistent"))
class TestCascadeRouterReload: provider = Provider(
"""Test hot-reload of providers.yaml.""" name="airllm",
type="airllm",
def test_reload_preserves_metrics(self, tmp_path): enabled=True,
"""Test that reload preserves metrics for existing providers.""" priority=1,
config = {
"providers": [
{
"name": "test-openai",
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test",
}
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
router = CascadeRouter(config_path=config_path)
assert len(router.providers) == 1
# Simulate some traffic
router._record_success(router.providers[0], 150.0)
router._record_success(router.providers[0], 250.0)
assert router.providers[0].metrics.total_requests == 2
# Reload
result = router.reload_config()
assert result["total_providers"] == 1
assert result["preserved"] == 1
assert result["added"] == []
assert result["removed"] == []
# Metrics survived
assert router.providers[0].metrics.total_requests == 2
assert router.providers[0].metrics.total_latency_ms == 400.0
def test_reload_preserves_circuit_breaker(self, tmp_path):
"""Test that reload preserves circuit breaker state."""
config = {
"cascade": {"circuit_breaker": {"failure_threshold": 2}},
"providers": [
{
"name": "test-openai",
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test",
}
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
router = CascadeRouter(config_path=config_path)
# Open circuit breaker
for _ in range(2):
router._record_failure(router.providers[0])
assert router.providers[0].circuit_state == CircuitState.OPEN
# Reload
router.reload_config()
# Circuit breaker state preserved
assert router.providers[0].circuit_state == CircuitState.OPEN
assert router.providers[0].status == ProviderStatus.UNHEALTHY
def test_reload_detects_added_provider(self, tmp_path):
"""Test that reload detects newly added providers."""
config = {
"providers": [
{
"name": "openai-1",
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test",
}
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
router = CascadeRouter(config_path=config_path)
assert len(router.providers) == 1
# Add a second provider to config
config["providers"].append(
{
"name": "anthropic-1",
"type": "anthropic",
"enabled": True,
"priority": 2,
"api_key": "sk-ant-test",
}
) )
config_path.write_text(yaml.dump(config))
result = router.reload_config() with patch("importlib.util.find_spec", return_value=MagicMock()):
assert router._check_provider_available(provider) is True
assert result["total_providers"] == 2 def test_check_airllm_not_installed(self):
assert result["preserved"] == 1 """Test AirLLM when not installed."""
assert result["added"] == ["anthropic-1"] router = CascadeRouter(config_path=Path("/nonexistent"))
assert result["removed"] == []
def test_reload_detects_removed_provider(self, tmp_path): provider = Provider(
"""Test that reload detects removed providers.""" name="airllm",
config = { type="airllm",
"providers": [ enabled=True,
{ priority=1,
"name": "openai-1", )
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test",
},
{
"name": "anthropic-1",
"type": "anthropic",
"enabled": True,
"priority": 2,
"api_key": "sk-ant-test",
},
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
router = CascadeRouter(config_path=config_path) with patch("importlib.util.find_spec", return_value=None):
assert len(router.providers) == 2 assert router._check_provider_available(provider) is False
# Remove anthropic
config["providers"] = [config["providers"][0]]
config_path.write_text(yaml.dump(config))
result = router.reload_config()
assert result["total_providers"] == 1
assert result["preserved"] == 1
assert result["removed"] == ["anthropic-1"]
def test_reload_re_sorts_by_priority(self, tmp_path):
"""Test that providers are re-sorted by priority after reload."""
config = {
"providers": [
{
"name": "low-priority",
"type": "openai",
"enabled": True,
"priority": 10,
"api_key": "sk-test",
},
{
"name": "high-priority",
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test2",
},
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
router = CascadeRouter(config_path=config_path)
assert router.providers[0].name == "high-priority"
# Swap priorities
config["providers"][0]["priority"] = 1
config["providers"][1]["priority"] = 10
config_path.write_text(yaml.dump(config))
router.reload_config()
assert router.providers[0].name == "low-priority"
assert router.providers[1].name == "high-priority"

View File

@@ -1,285 +0,0 @@
"""Integration tests for agentic loop WebSocket broadcasts.
Verifies that ``run_agentic_loop`` pushes the correct sequence of events
through the real ``ws_manager`` and that connected (mock) WebSocket clients
receive every broadcast with the expected payloads.
"""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.ws_manager.handler import WebSocketManager
from timmy.agentic_loop import run_agentic_loop
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_run(content: str):
m = MagicMock()
m.content = content
return m
def _ws_client() -> AsyncMock:
"""Return a fake WebSocket that records sent messages."""
return AsyncMock()
def _collected_events(ws: AsyncMock) -> list[dict]:
"""Extract parsed JSON events from a mock WebSocket's send_text calls."""
return [json.loads(call.args[0]) for call in ws.send_text.call_args_list]
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestAgenticLoopBroadcastSequence:
"""Events arrive at WS clients in the correct order with expected data."""
@pytest.mark.asyncio
async def test_successful_run_broadcasts_plan_steps_complete(self):
"""A successful 2-step loop emits plan_ready → 2× step_complete → task_complete."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Gather data\n2. Summarise"),
_mock_run("Gathered 10 records"),
_mock_run("Summary written"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Gather and summarise", max_steps=2)
assert result.status == "completed"
events = _collected_events(ws)
event_names = [e["event"] for e in events]
assert event_names == [
"agentic.plan_ready",
"agentic.step_complete",
"agentic.step_complete",
"agentic.task_complete",
]
@pytest.mark.asyncio
async def test_plan_ready_payload(self):
"""plan_ready contains task_id, task, steps list, and total count."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Alpha\n2. Beta"),
_mock_run("Alpha done"),
_mock_run("Beta done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Two steps")
plan_event = _collected_events(ws)[0]
assert plan_event["event"] == "agentic.plan_ready"
data = plan_event["data"]
assert data["task_id"] == result.task_id
assert data["task"] == "Two steps"
assert data["steps"] == ["Alpha", "Beta"]
assert data["total"] == 2
@pytest.mark.asyncio
async def test_step_complete_payload(self):
"""step_complete carries step number, total, description, and result."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Only step"),
_mock_run("Step result text"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Single step", max_steps=1)
step_event = _collected_events(ws)[1]
assert step_event["event"] == "agentic.step_complete"
data = step_event["data"]
assert data["step"] == 1
assert data["total"] == 1
assert data["description"] == "Only step"
assert "Step result text" in data["result"]
@pytest.mark.asyncio
async def test_task_complete_payload(self):
"""task_complete has status, steps_completed, summary, and duration_ms."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Do it"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Quick", max_steps=1)
complete_event = _collected_events(ws)[-1]
assert complete_event["event"] == "agentic.task_complete"
data = complete_event["data"]
assert data["status"] == "completed"
assert data["steps_completed"] == 1
assert isinstance(data["duration_ms"], int)
assert data["duration_ms"] >= 0
assert data["summary"]
class TestAdaptationBroadcast:
"""Adapted steps emit step_adapted events."""
@pytest.mark.asyncio
async def test_adapted_step_broadcasts_step_adapted(self):
"""A failed-then-adapted step emits agentic.step_adapted."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Risky step"),
Exception("disk full"),
_mock_run("Used /tmp instead"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Adapt test", max_steps=1)
events = _collected_events(ws)
event_names = [e["event"] for e in events]
assert "agentic.step_adapted" in event_names
adapted = next(e for e in events if e["event"] == "agentic.step_adapted")
assert adapted["data"]["error"] == "disk full"
assert adapted["data"]["adaptation"]
assert result.steps[0].status == "adapted"
class TestMultipleClients:
"""All connected clients receive every broadcast."""
@pytest.mark.asyncio
async def test_two_clients_receive_all_events(self):
mgr = WebSocketManager()
ws1 = _ws_client()
ws2 = _ws_client()
mgr._connections = [ws1, ws2]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Step A"),
_mock_run("A done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Multi-client", max_steps=1)
events1 = _collected_events(ws1)
events2 = _collected_events(ws2)
assert len(events1) == len(events2) == 3 # plan + step + complete
assert [e["event"] for e in events1] == [e["event"] for e in events2]
class TestEventHistory:
"""Broadcasts are recorded in ws_manager event history."""
@pytest.mark.asyncio
async def test_events_appear_in_history(self):
mgr = WebSocketManager()
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Only"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("History test", max_steps=1)
history_events = [e.event for e in mgr.event_history]
assert "agentic.plan_ready" in history_events
assert "agentic.step_complete" in history_events
assert "agentic.task_complete" in history_events
class TestBroadcastGracefulDegradation:
"""Loop completes even when ws_manager is unavailable."""
@pytest.mark.asyncio
async def test_loop_succeeds_when_broadcast_fails(self):
"""ImportError from ws_manager doesn't crash the loop."""
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Do it"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch(
"infrastructure.ws_manager.handler.ws_manager",
new_callable=lambda: MagicMock,
) as broken_mgr,
):
broken_mgr.broadcast = AsyncMock(side_effect=RuntimeError("ws down"))
result = await run_agentic_loop("Resilient task", max_steps=1)
assert result.status == "completed"
assert len(result.steps) == 1

View File

@@ -1,86 +0,0 @@
"""Tests for scripts/cycle_retro.py issue auto-detection."""
from __future__ import annotations
# Import the module under test — it's a script so we import the helpers directly
import importlib
import subprocess
from pathlib import Path
from unittest.mock import patch
import pytest
SCRIPTS_DIR = Path(__file__).resolve().parent.parent.parent / "scripts"
@pytest.fixture(autouse=True)
def _add_scripts_to_path(monkeypatch):
monkeypatch.syspath_prepend(str(SCRIPTS_DIR))
@pytest.fixture()
def mod():
"""Import cycle_retro as a module."""
return importlib.import_module("cycle_retro")
class TestDetectIssueFromBranch:
def test_kimi_issue_branch(self, mod):
with patch.object(subprocess, "check_output", return_value="kimi/issue-492\n"):
assert mod.detect_issue_from_branch() == 492
def test_plain_issue_branch(self, mod):
with patch.object(subprocess, "check_output", return_value="issue-123\n"):
assert mod.detect_issue_from_branch() == 123
def test_issue_slash_number(self, mod):
with patch.object(subprocess, "check_output", return_value="fix/issue/55\n"):
assert mod.detect_issue_from_branch() == 55
def test_no_issue_in_branch(self, mod):
with patch.object(subprocess, "check_output", return_value="main\n"):
assert mod.detect_issue_from_branch() is None
def test_feature_branch(self, mod):
with patch.object(subprocess, "check_output", return_value="feature/add-widget\n"):
assert mod.detect_issue_from_branch() is None
def test_git_not_available(self, mod):
with patch.object(subprocess, "check_output", side_effect=FileNotFoundError):
assert mod.detect_issue_from_branch() is None
def test_git_fails(self, mod):
with patch.object(
subprocess,
"check_output",
side_effect=subprocess.CalledProcessError(1, "git"),
):
assert mod.detect_issue_from_branch() is None
class TestBackfillExtractIssueNumber:
"""Tests for backfill_retro.extract_issue_number PR-number filtering."""
@pytest.fixture()
def backfill(self):
return importlib.import_module("backfill_retro")
def test_body_has_issue(self, backfill):
assert backfill.extract_issue_number("fix: foo (#491)", "Fixes #490", pr_number=491) == 490
def test_title_skips_pr_number(self, backfill):
assert backfill.extract_issue_number("fix: foo (#491)", "", pr_number=491) is None
def test_title_with_issue_and_pr(self, backfill):
# [loop-cycle-538] refactor: ... (#459) (#481)
assert (
backfill.extract_issue_number(
"[loop-cycle-538] refactor: remove dead airllm (#459) (#481)",
"",
pr_number=481,
)
== 459
)
def test_no_pr_number_provided(self, backfill):
assert backfill.extract_issue_number("fix: foo (#491)", "") == 491

View File

@@ -49,34 +49,6 @@ class TestConfigLazyValidation:
# Should not raise # Should not raise
validate_startup(force=True) validate_startup(force=True)
def test_validate_startup_exits_on_cors_wildcard_in_production(self):
"""validate_startup() should exit in production when CORS has wildcard."""
from config import settings, validate_startup
with (
patch.object(settings, "timmy_env", "production"),
patch.object(settings, "l402_hmac_secret", "test-secret-hex-value-32"),
patch.object(settings, "l402_macaroon_secret", "test-macaroon-hex-value-32"),
patch.object(settings, "cors_origins", ["*"]),
pytest.raises(SystemExit),
):
validate_startup(force=True)
def test_validate_startup_warns_cors_wildcard_in_dev(self):
"""validate_startup() should warn in dev when CORS has wildcard."""
from config import settings, validate_startup
with (
patch.object(settings, "timmy_env", "development"),
patch.object(settings, "cors_origins", ["*"]),
patch("config._startup_logger") as mock_logger,
):
validate_startup(force=True)
mock_logger.warning.assert_any_call(
"SEC: CORS_ORIGINS contains wildcard '*'"
"restrict to explicit origins before deploying to production."
)
def test_validate_startup_skips_in_test_mode(self): def test_validate_startup_skips_in_test_mode(self):
"""validate_startup() should be a no-op in test mode.""" """validate_startup() should be a no-op in test mode."""
from config import validate_startup from config import validate_startup

View File

@@ -1,386 +0,0 @@
"""Tests for timmy.agentic_loop — multi-step task execution engine."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.agentic_loop import (
AgenticResult,
AgenticStep,
_parse_steps,
)
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
class TestAgenticStep:
"""Unit tests for the AgenticStep dataclass."""
def test_creation(self):
step = AgenticStep(
step_num=1,
description="Do thing",
result="Done",
status="completed",
duration_ms=42,
)
assert step.step_num == 1
assert step.description == "Do thing"
assert step.result == "Done"
assert step.status == "completed"
assert step.duration_ms == 42
def test_failed_status(self):
step = AgenticStep(
step_num=2, description="Bad step", result="Error", status="failed", duration_ms=10
)
assert step.status == "failed"
def test_adapted_status(self):
step = AgenticStep(
step_num=3, description="Retried", result="OK", status="adapted", duration_ms=100
)
assert step.status == "adapted"
class TestAgenticResult:
"""Unit tests for the AgenticResult dataclass."""
def test_defaults(self):
result = AgenticResult(task_id="abc", task="Test", summary="Done")
assert result.steps == []
assert result.status == "completed"
assert result.total_duration_ms == 0
def test_with_steps(self):
s = AgenticStep(step_num=1, description="A", result="B", status="completed", duration_ms=5)
result = AgenticResult(task_id="x", task="T", summary="S", steps=[s])
assert len(result.steps) == 1
# ---------------------------------------------------------------------------
# _parse_steps — pure function, highly testable
# ---------------------------------------------------------------------------
class TestParseSteps:
"""Unit tests for the plan parser."""
def test_numbered_with_dots(self):
text = "1. First step\n2. Second step\n3. Third step"
steps = _parse_steps(text)
assert steps == ["First step", "Second step", "Third step"]
def test_numbered_with_parens(self):
text = "1) Do this\n2) Do that"
steps = _parse_steps(text)
assert steps == ["Do this", "Do that"]
def test_mixed_numbering(self):
text = "1. Step one\n2) Step two\n3. Step three"
steps = _parse_steps(text)
assert len(steps) == 3
def test_indented_steps(self):
text = " 1. Indented step\n 2. Also indented"
steps = _parse_steps(text)
assert len(steps) == 2
assert steps[0] == "Indented step"
def test_no_numbered_steps_fallback(self):
text = "Do this first\nThen do that\nFinally wrap up"
steps = _parse_steps(text)
assert len(steps) == 3
assert steps[0] == "Do this first"
def test_empty_string(self):
steps = _parse_steps("")
assert steps == []
def test_blank_lines_ignored_in_fallback(self):
text = "Step A\n\n\nStep B\n"
steps = _parse_steps(text)
assert steps == ["Step A", "Step B"]
def test_strips_whitespace(self):
text = "1. Lots of space \n2. Also spaced "
steps = _parse_steps(text)
assert steps[0] == "Lots of space"
assert steps[1] == "Also spaced"
def test_preamble_ignored_when_numbered(self):
text = "Here is the plan:\n1. Step one\n2. Step two"
steps = _parse_steps(text)
assert steps == ["Step one", "Step two"]
# ---------------------------------------------------------------------------
# _get_loop_agent — singleton pattern
# ---------------------------------------------------------------------------
class TestGetLoopAgent:
"""Tests for the agent singleton."""
def test_creates_agent_once(self):
import timmy.agentic_loop as mod
mod._loop_agent = None
mock_agent = MagicMock()
with patch("timmy.agent.create_timmy", return_value=mock_agent) as mock_create:
agent = mod._get_loop_agent()
assert agent is mock_agent
mock_create.assert_called_once()
# Second call should reuse singleton
agent2 = mod._get_loop_agent()
assert agent2 is mock_agent
mock_create.assert_called_once()
mod._loop_agent = None # cleanup
def test_reuses_existing(self):
import timmy.agentic_loop as mod
sentinel = MagicMock()
mod._loop_agent = sentinel
assert mod._get_loop_agent() is sentinel
mod._loop_agent = None # cleanup
# ---------------------------------------------------------------------------
# _broadcast_progress — best-effort WebSocket broadcast
# ---------------------------------------------------------------------------
class TestBroadcastProgress:
"""Tests for the WebSocket broadcast helper."""
@pytest.mark.asyncio
async def test_successful_broadcast(self):
from timmy.agentic_loop import _broadcast_progress
mock_ws = MagicMock()
mock_ws.broadcast = AsyncMock()
mock_module = MagicMock()
mock_module.ws_manager = mock_ws
with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": mock_module}):
await _broadcast_progress("test.event", {"key": "value"})
mock_ws.broadcast.assert_awaited_once_with("test.event", {"key": "value"})
@pytest.mark.asyncio
async def test_import_error_swallowed(self):
"""When ws_manager import fails, broadcast silently succeeds."""
import sys
from timmy.agentic_loop import _broadcast_progress
# Remove the module so import fails
saved = sys.modules.pop("infrastructure.ws_manager.handler", None)
try:
with patch.dict("sys.modules", {"infrastructure": None}):
# Should not raise — errors are swallowed
await _broadcast_progress("fail.event", {})
finally:
if saved is not None:
sys.modules["infrastructure.ws_manager.handler"] = saved
# ---------------------------------------------------------------------------
# run_agentic_loop — integration-style tests with mocked agent
# ---------------------------------------------------------------------------
class TestRunAgenticLoop:
"""Tests for the main agentic loop."""
@pytest.fixture(autouse=True)
def _reset_agent(self):
import timmy.agentic_loop as mod
mod._loop_agent = None
yield
mod._loop_agent = None
def _mock_agent(self, responses):
"""Create a mock agent that returns responses in sequence."""
agent = MagicMock()
run_results = []
for r in responses:
mock_result = MagicMock()
mock_result.content = r
run_results.append(mock_result)
agent.run = MagicMock(side_effect=run_results)
return agent
@pytest.mark.asyncio
async def test_successful_two_step_task(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(
[
"1. Step one\n2. Step two", # planning
"Step one done", # execution step 1
"Step two done", # execution step 2
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
result = await run_agentic_loop("Test task", max_steps=5)
assert result.status == "completed"
assert len(result.steps) == 2
assert result.steps[0].status == "completed"
assert result.steps[1].status == "completed"
assert result.total_duration_ms >= 0
@pytest.mark.asyncio
async def test_planning_failure(self):
from timmy.agentic_loop import run_agentic_loop
agent = MagicMock()
agent.run = MagicMock(side_effect=RuntimeError("LLM down"))
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
):
result = await run_agentic_loop("Broken task", max_steps=3)
assert result.status == "failed"
assert "Planning failed" in result.summary
@pytest.mark.asyncio
async def test_empty_plan(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent([""]) # empty plan
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
):
result = await run_agentic_loop("Empty plan task", max_steps=3)
assert result.status == "failed"
assert "no steps" in result.summary.lower()
@pytest.mark.asyncio
async def test_step_failure_triggers_adaptation(self):
from timmy.agentic_loop import run_agentic_loop
agent = MagicMock()
call_count = 0
def mock_run(prompt, **kwargs):
nonlocal call_count
call_count += 1
result = MagicMock()
if call_count == 1:
result.content = "1. Only step"
elif call_count == 2:
raise RuntimeError("Step failed")
else:
result.content = "Adapted successfully"
return result
agent.run = mock_run
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
result = await run_agentic_loop("Failing task", max_steps=5)
assert len(result.steps) == 1
assert result.steps[0].status == "adapted"
assert "[Adapted]" in result.steps[0].description
@pytest.mark.asyncio
async def test_max_steps_truncation(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(
[
"1. A\n2. B\n3. C\n4. D\n5. E", # 5 steps planned
"Done A",
"Done B",
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
result = await run_agentic_loop("Big task", max_steps=2)
assert result.status == "partial" # was truncated
assert len(result.steps) == 2
@pytest.mark.asyncio
async def test_on_progress_callback(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(
[
"1. Only step",
"Step done",
]
)
progress_calls = []
async def track_progress(desc, step_num, total):
progress_calls.append((desc, step_num, total))
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
await run_agentic_loop("Callback task", max_steps=5, on_progress=track_progress)
assert len(progress_calls) == 1
assert progress_calls[0][1] == 1 # step_num
@pytest.mark.asyncio
async def test_default_max_steps_from_settings(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(["1. Step one", "Done"])
mock_settings = MagicMock()
mock_settings.max_agent_steps = 7
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
patch("config.settings", mock_settings),
):
result = await run_agentic_loop("Settings task")
assert result.status == "completed"
@pytest.mark.asyncio
async def test_task_id_generated(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(["1. Step", "OK"])
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
result = await run_agentic_loop("ID task", max_steps=5)
assert result.task_id # non-empty
assert len(result.task_id) == 8 # uuid[:8]

View File

@@ -250,99 +250,6 @@ def test_continuity_includes_recent(tmp_path):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# _generate_thought helper
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_generate_thought_returns_content_and_seed_type(tmp_path):
"""_generate_thought should return (content, seed_type) on success."""
from timmy.thinking import SEED_TYPES
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value="A novel idea."):
content, seed_type = await engine._generate_thought()
assert content == "A novel idea."
assert seed_type in SEED_TYPES
@pytest.mark.asyncio
async def test_generate_thought_with_prompt(tmp_path):
"""_generate_thought(prompt=...) should use 'prompted' seed type."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value="A prompted idea."):
content, seed_type = await engine._generate_thought(prompt="Reflect on joy")
assert content == "A prompted idea."
assert seed_type == "prompted"
@pytest.mark.asyncio
async def test_generate_thought_returns_none_on_agent_failure(tmp_path):
"""_generate_thought should return (None, ...) when the agent fails."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", side_effect=Exception("Ollama down")):
content, seed_type = await engine._generate_thought()
assert content is None
@pytest.mark.asyncio
async def test_generate_thought_returns_none_on_empty(tmp_path):
"""_generate_thought should return (None, ...) when agent returns empty."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value=" "):
content, seed_type = await engine._generate_thought()
assert content is None
# ---------------------------------------------------------------------------
# _finalize_thought helper
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_finalize_thought_calls_all_hooks(tmp_path):
"""_finalize_thought should call all post-hooks, log, journal, and broadcast."""
engine = _make_engine(tmp_path)
thought = engine._store_thought("Test finalize.", "freeform")
with (
patch.object(engine, "_maybe_check_memory") as m_mem,
patch.object(engine, "_maybe_distill", new_callable=AsyncMock) as m_distill,
patch.object(engine, "_maybe_file_issues", new_callable=AsyncMock) as m_issues,
patch.object(engine, "_check_workspace", new_callable=AsyncMock) as m_ws,
patch.object(engine, "_maybe_check_memory_status") as m_status,
patch.object(engine, "_update_memory") as m_update,
patch.object(engine, "_log_event") as m_log,
patch.object(engine, "_write_journal") as m_journal,
patch.object(engine, "_broadcast", new_callable=AsyncMock) as m_broadcast,
):
await engine._finalize_thought(thought)
m_mem.assert_called_once()
m_distill.assert_awaited_once()
m_issues.assert_awaited_once()
m_ws.assert_awaited_once()
m_status.assert_called_once()
m_update.assert_called_once_with(thought)
m_log.assert_called_once_with(thought)
m_journal.assert_called_once_with(thought)
m_broadcast.assert_awaited_once_with(thought)
# ---------------------------------------------------------------------------
# think_once (async)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_think_once_stores_thought(tmp_path): async def test_think_once_stores_thought(tmp_path):
"""think_once should store a thought in the DB.""" """think_once should store a thought in the DB."""

View File

@@ -8,14 +8,11 @@ from fastapi.testclient import TestClient
@pytest.fixture @pytest.fixture
def serve_client(): def serve_client():
"""Create a TestClient for the timmy-serve app with mocked Timmy agent.""" """Create a TestClient for the timmy-serve app."""
with patch("timmy_serve.app.create_timmy") as mock_create: from timmy_serve.app import create_timmy_serve_app
mock_create.return_value = MagicMock()
from timmy_serve.app import create_timmy_serve_app
app = create_timmy_serve_app() app = create_timmy_serve_app()
with TestClient(app) as client: return TestClient(app)
yield client
class TestHealthEndpoint: class TestHealthEndpoint:
@@ -37,40 +34,18 @@ class TestServeStatus:
class TestServeChatEndpoint: class TestServeChatEndpoint:
@patch("timmy_serve.app.create_timmy") @patch("timmy_serve.app.create_timmy")
def test_chat_returns_response(self, mock_create): def test_chat_returns_response(self, mock_create, serve_client):
mock_agent = MagicMock() mock_agent = MagicMock()
mock_result = MagicMock() mock_result = MagicMock()
mock_result.content = "I am Timmy." mock_result.content = "I am Timmy."
mock_agent.run.return_value = mock_result mock_agent.run.return_value = mock_result
mock_create.return_value = mock_agent mock_create.return_value = mock_agent
from timmy_serve.app import create_timmy_serve_app resp = serve_client.post(
"/serve/chat",
app = create_timmy_serve_app() json={"message": "Who are you?"},
with TestClient(app) as client: )
resp = client.post(
"/serve/chat",
json={"message": "Who are you?"},
)
assert resp.status_code == 200 assert resp.status_code == 200
data = resp.json() data = resp.json()
assert data["response"] == "I am Timmy." assert data["response"] == "I am Timmy."
mock_agent.run.assert_called_once_with("Who are you?", stream=False) mock_agent.run.assert_called_once_with("Who are you?", stream=False)
@patch("timmy_serve.app.create_timmy")
def test_agent_cached_at_startup(self, mock_create):
"""Verify create_timmy is called once at startup, not per request."""
mock_agent = MagicMock()
mock_result = MagicMock()
mock_result.content = "reply"
mock_agent.run.return_value = mock_result
mock_create.return_value = mock_agent
from timmy_serve.app import create_timmy_serve_app
app = create_timmy_serve_app()
with TestClient(app) as client:
# Two requests — create_timmy should only be called once (at startup)
client.post("/serve/chat", json={"message": "hello"})
client.post("/serve/chat", json={"message": "world"})
mock_create.assert_called_once()

View File

@@ -1,319 +0,0 @@
"""Unit tests for timmy.agentic_loop — agentic loop data structures, parsing, and execution."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.agentic_loop import (
AgenticResult,
AgenticStep,
_broadcast_progress,
_parse_steps,
run_agentic_loop,
)
# ── Data structures ──────────────────────────────────────────────────────────
class TestAgenticStep:
def test_fields(self):
step = AgenticStep(
step_num=1,
description="Do something",
result="Done",
status="completed",
duration_ms=42,
)
assert step.step_num == 1
assert step.description == "Do something"
assert step.result == "Done"
assert step.status == "completed"
assert step.duration_ms == 42
class TestAgenticResult:
def test_defaults(self):
r = AgenticResult(task_id="abc", task="test task", summary="ok")
assert r.steps == []
assert r.status == "completed"
assert r.total_duration_ms == 0
def test_with_steps(self):
step = AgenticStep(1, "s", "r", "completed", 10)
r = AgenticResult(task_id="x", task="t", summary="s", steps=[step])
assert len(r.steps) == 1
# ── _parse_steps ─────────────────────────────────────────────────────────────
class TestParseSteps:
def test_numbered_dot(self):
text = "1. First step\n2. Second step\n3. Third step"
assert _parse_steps(text) == ["First step", "Second step", "Third step"]
def test_numbered_paren(self):
text = "1) Alpha\n2) Beta"
assert _parse_steps(text) == ["Alpha", "Beta"]
def test_mixed_whitespace(self):
text = " 1. Indented step\n 2. Another "
result = _parse_steps(text)
assert result == ["Indented step", "Another"]
def test_fallback_plain_lines(self):
text = "Do this\nDo that\nDo the other"
assert _parse_steps(text) == ["Do this", "Do that", "Do the other"]
def test_empty_string(self):
assert _parse_steps("") == []
def test_blank_lines_skipped_in_fallback(self):
text = "line one\n\nline two\n \nline three"
assert _parse_steps(text) == ["line one", "line two", "line three"]
# ── _get_loop_agent ──────────────────────────────────────────────────────────
class TestGetLoopAgent:
def test_creates_agent_once(self):
import timmy.agentic_loop as al
saved = al._loop_agent
try:
al._loop_agent = None
mock_agent = MagicMock()
with patch("timmy.agent.create_timmy", return_value=mock_agent):
result = al._get_loop_agent()
assert result is mock_agent
# Second call returns cached
result2 = al._get_loop_agent()
assert result2 is mock_agent
finally:
al._loop_agent = saved
def test_returns_cached(self):
import timmy.agentic_loop as al
saved = al._loop_agent
try:
sentinel = object()
al._loop_agent = sentinel
assert al._get_loop_agent() is sentinel
finally:
al._loop_agent = saved
# ── _broadcast_progress ──────────────────────────────────────────────────────
class TestBroadcastProgress:
@pytest.mark.asyncio
async def test_success(self):
mock_ws = AsyncMock()
with (
patch("timmy.agentic_loop.ws_manager", mock_ws, create=True),
patch.dict(
"sys.modules",
{"infrastructure.ws_manager.handler": MagicMock(ws_manager=mock_ws)},
),
):
await _broadcast_progress("test.event", {"key": "val"})
mock_ws.broadcast.assert_awaited_once_with("test.event", {"key": "val"})
@pytest.mark.asyncio
async def test_import_error_swallowed(self):
with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": None}):
# Should not raise
await _broadcast_progress("test.event", {})
# ── run_agentic_loop ─────────────────────────────────────────────────────────
def _make_mock_agent(plan_text, step_responses=None):
"""Create a mock agent whose .run returns predictable content."""
call_count = 0
def run_side_effect(prompt, *, stream=False, session_id=""):
nonlocal call_count
call_count += 1
resp = MagicMock()
if call_count == 1:
# Planning call
resp.content = plan_text
else:
idx = call_count - 2 # step index (0-based)
if step_responses and idx < len(step_responses):
val = step_responses[idx]
if isinstance(val, Exception):
raise val
resp.content = val
else:
resp.content = f"Step result {call_count}"
return resp
agent = MagicMock()
agent.run = MagicMock(side_effect=run_side_effect)
return agent
@pytest.fixture
def _patch_broadcast():
with patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
yield
@pytest.fixture
def _patch_clean_response():
with patch("timmy.session._clean_response", side_effect=lambda x: x):
yield
class TestRunAgenticLoop:
@pytest.mark.asyncio
async def test_successful_execution(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent("1. Step A\n2. Step B", ["Result A", "Result B"])
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop.settings", mock_settings, create=True),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=5)
assert result.status == "completed"
assert len(result.steps) == 2
assert result.steps[0].status == "completed"
assert result.steps[0].description == "Step A"
assert result.total_duration_ms >= 0
@pytest.mark.asyncio
async def test_planning_failure(self, _patch_broadcast):
agent = MagicMock()
agent.run = MagicMock(side_effect=RuntimeError("LLM down"))
mock_settings = MagicMock()
mock_settings.max_agent_steps = 5
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=3)
assert result.status == "failed"
assert "Planning failed" in result.summary
@pytest.mark.asyncio
async def test_empty_plan(self, _patch_broadcast):
agent = _make_mock_agent("")
mock_settings = MagicMock()
mock_settings.max_agent_steps = 5
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=3)
assert result.status == "failed"
assert "no steps" in result.summary.lower()
@pytest.mark.asyncio
async def test_step_failure_triggers_adaptation(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent(
"1. Do X\n2. Do Y",
[RuntimeError("oops"), "Adapted result", "Y done"],
)
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=5)
# Step 1 should be adapted, step 2 completed
statuses = [s.status for s in result.steps]
assert "adapted" in statuses
@pytest.mark.asyncio
async def test_truncation_marks_partial(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent(
"1. A\n2. B\n3. C\n4. D\n5. E",
["r1", "r2"],
)
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=2)
assert result.status == "partial"
@pytest.mark.asyncio
async def test_on_progress_callback(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent("1. Only step", ["done"])
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
callback = AsyncMock()
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=5, on_progress=callback)
callback.assert_awaited_once_with("Only step", 1, 1)
assert result.status == "completed"
@pytest.mark.asyncio
async def test_default_max_steps_from_settings(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent("1. S1", ["r1"])
mock_settings = MagicMock()
mock_settings.max_agent_steps = 3
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff") # max_steps=0 → from settings
assert result.status == "completed"
@pytest.mark.asyncio
async def test_failed_step_and_failed_adaptation(self, _patch_broadcast, _patch_clean_response):
"""When both step and adaptation fail, step is marked failed."""
call_count = 0
def run_side_effect(prompt, *, stream=False, session_id=""):
nonlocal call_count
call_count += 1
if call_count == 1:
resp = MagicMock()
resp.content = "1. Only step"
return resp
# Both step execution and adaptation fail
raise RuntimeError("everything broken")
agent = MagicMock()
agent.run = MagicMock(side_effect=run_side_effect)
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=5)
assert result.steps[0].status == "failed"
assert "Failed" in result.steps[0].result
assert result.status == "partial"

View File

@@ -13,46 +13,44 @@ from timmy.memory.embeddings import (
embed_text, embed_text,
) )
# ── _simple_hash_embedding ────────────────────────────────────────────────── # ── _simple_hash_embedding ──────────────────────────────────────────────────
class TestSimpleHashEmbedding: class TestSimpleHashEmbedding:
def test_returns_128_dim_vector(self): """Tests for the deterministic hash-based fallback embedding."""
def test_returns_128_floats(self):
vec = _simple_hash_embedding("hello world") vec = _simple_hash_embedding("hello world")
assert len(vec) == 128 assert len(vec) == 128
assert all(isinstance(x, float) for x in vec)
def test_deterministic(self):
"""Same input always produces the same vector."""
assert _simple_hash_embedding("test") == _simple_hash_embedding("test")
def test_normalized(self): def test_normalized(self):
vec = _simple_hash_embedding("some text for embedding") """Output vector has unit magnitude."""
vec = _simple_hash_embedding("some text for testing")
mag = math.sqrt(sum(x * x for x in vec)) mag = math.sqrt(sum(x * x for x in vec))
assert mag == pytest.approx(1.0, abs=1e-6) assert mag == pytest.approx(1.0, abs=1e-6)
def test_deterministic(self):
a = _simple_hash_embedding("same input")
b = _simple_hash_embedding("same input")
assert a == b
def test_different_texts_differ(self):
a = _simple_hash_embedding("hello world")
b = _simple_hash_embedding("goodbye moon")
assert a != b
def test_empty_string(self): def test_empty_string(self):
"""Empty string doesn't crash — returns a zero-ish vector."""
vec = _simple_hash_embedding("") vec = _simple_hash_embedding("")
assert len(vec) == 128 assert len(vec) == 128
# All zeros normalised stays zero (mag fallback to 1.0)
assert all(x == 0.0 for x in vec)
def test_long_text_truncates_at_50_words(self): def test_different_inputs_differ(self):
"""Words beyond 50 should not change the result.""" a = _simple_hash_embedding("alpha")
short = " ".join(f"word{i}" for i in range(50)) b = _simple_hash_embedding("beta")
long = short + " extra1 extra2 extra3" assert a != b
assert _simple_hash_embedding(short) == _simple_hash_embedding(long)
# ── cosine_similarity ──────────────────────────────────────────────────────── # ── cosine_similarity ────────────────────────────────────────────────────────
class TestCosineSimilarity: class TestCosineSimilarity:
"""Tests for cosine similarity calculation."""
def test_identical_vectors(self): def test_identical_vectors(self):
v = [1.0, 2.0, 3.0] v = [1.0, 2.0, 3.0]
assert cosine_similarity(v, v) == pytest.approx(1.0) assert cosine_similarity(v, v) == pytest.approx(1.0)
@@ -71,85 +69,86 @@ class TestCosineSimilarity:
assert cosine_similarity([0.0, 0.0], [1.0, 2.0]) == 0.0 assert cosine_similarity([0.0, 0.0], [1.0, 2.0]) == 0.0
assert cosine_similarity([1.0, 2.0], [0.0, 0.0]) == 0.0 assert cosine_similarity([1.0, 2.0], [0.0, 0.0]) == 0.0
def test_both_zero_vectors(self): def test_different_length_uses_zip(self):
assert cosine_similarity([0.0], [0.0]) == 0.0 """zip(strict=False) truncates to shortest — verify no crash."""
result = cosine_similarity([1.0, 0.0], [1.0, 0.0, 9.9])
# mag_b includes the extra element, so result < 1.0
assert isinstance(result, float)
# ── _keyword_overlap ───────────────────────────────────────────────────────── # ── _keyword_overlap ─────────────────────────────────────────────────────────
class TestKeywordOverlap: class TestKeywordOverlap:
"""Tests for keyword overlap scoring."""
def test_full_overlap(self): def test_full_overlap(self):
assert _keyword_overlap("hello world", "hello world") == pytest.approx(1.0) assert _keyword_overlap("hello world", "hello world extra") == pytest.approx(1.0)
def test_partial_overlap(self): def test_partial_overlap(self):
assert _keyword_overlap("hello world", "hello moon") == pytest.approx(0.5) assert _keyword_overlap("hello world", "hello there") == pytest.approx(0.5)
def test_no_overlap(self): def test_no_overlap(self):
assert _keyword_overlap("hello", "goodbye") == pytest.approx(0.0) assert _keyword_overlap("alpha", "beta") == pytest.approx(0.0)
def test_empty_query(self): def test_empty_query(self):
assert _keyword_overlap("", "anything") == 0.0 assert _keyword_overlap("", "some content") == 0.0
def test_case_insensitive(self): def test_case_insensitive(self):
assert _keyword_overlap("Hello World", "hello world") == pytest.approx(1.0) assert _keyword_overlap("Hello", "hello world") == pytest.approx(1.0)
# ── embed_text ─────────────────────────────────────────────────────────────── # ── embed_text ───────────────────────────────────────────────────────────────
class TestEmbedText: class TestEmbedText:
def test_uses_fallback_when_model_disabled(self): """Tests for the main embed_text entry point."""
with patch.object(emb, "_get_embedding_model", return_value=False):
vec = embed_text("test") def test_uses_fallback_when_model_false(self):
assert len(vec) == 128 # hash fallback dimension """When _get_embedding_model returns False, use hash fallback."""
with patch.object(emb, "EMBEDDING_MODEL", False):
with patch.object(emb, "_get_embedding_model", return_value=False):
vec = embed_text("hello")
assert len(vec) == 128
def test_uses_model_when_available(self): def test_uses_model_when_available(self):
mock_encoding = MagicMock() """When a real model is loaded, call model.encode()."""
mock_encoding.tolist.return_value = [0.1, 0.2, 0.3]
mock_model = MagicMock() mock_model = MagicMock()
mock_model.encode.return_value = mock_encoding mock_model.encode.return_value = MagicMock(tolist=MagicMock(return_value=[0.1, 0.2]))
with patch.object(emb, "_get_embedding_model", return_value=mock_model): with patch.object(emb, "_get_embedding_model", return_value=mock_model):
result = embed_text("test") vec = embed_text("hello")
assert result == pytest.approx([0.1, 0.2, 0.3]) assert vec == [0.1, 0.2]
mock_model.encode.assert_called_once_with("test") mock_model.encode.assert_called_once_with("hello")
# ── _get_embedding_model ───────────────────────────────────────────────────── # ── _get_embedding_model ─────────────────────────────────────────────────────
class TestGetEmbeddingModel: class TestGetEmbeddingModel:
"""Tests for lazy model loading."""
def setup_method(self): def setup_method(self):
self._saved_model = emb.EMBEDDING_MODEL """Reset global state before each test."""
emb.EMBEDDING_MODEL = None emb.EMBEDDING_MODEL = None
def teardown_method(self):
emb.EMBEDDING_MODEL = self._saved_model
def test_skip_embeddings_setting(self): def test_skip_embeddings_setting(self):
"""When settings.timmy_skip_embeddings is True, model is set to False."""
mock_settings = MagicMock() mock_settings = MagicMock()
mock_settings.timmy_skip_embeddings = True mock_settings.timmy_skip_embeddings = True
with patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}): with patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}):
emb.EMBEDDING_MODEL = None emb.EMBEDDING_MODEL = None
result = emb._get_embedding_model() result = emb._get_embedding_model()
assert result is False assert result is False
def test_fallback_when_transformers_missing(self): def test_sentence_transformers_import_error(self):
"""When sentence-transformers is missing, falls back to False."""
mock_settings = MagicMock() mock_settings = MagicMock()
mock_settings.timmy_skip_embeddings = False mock_settings.timmy_skip_embeddings = False
with patch.dict( with patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}):
"sys.modules", with patch.dict("sys.modules", {"sentence_transformers": None}):
{ emb.EMBEDDING_MODEL = None
"config": MagicMock(settings=mock_settings), result = emb._get_embedding_model()
"sentence_transformers": None, assert result is False
},
):
emb.EMBEDDING_MODEL = None
result = emb._get_embedding_model()
assert result is False
def test_returns_cached_model(self): def teardown_method(self):
sentinel = object() emb.EMBEDDING_MODEL = None
emb.EMBEDDING_MODEL = sentinel
assert emb._get_embedding_model() is sentinel