forked from Rockachopa/Timmy-time-dashboard
Compare commits
29 Commits
kimi/issue
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ddb9c7d8ca | ||
| f361893fdd | |||
| 7ad0ee17b6 | |||
| 29220b6bdd | |||
| 2849dba756 | |||
| e11e07f117 | |||
| 50c8a5428e | |||
| 7da434c85b | |||
| 88e59f7c17 | |||
| aa5e9c3176 | |||
| 1b4fe65650 | |||
| 2d69f73d9d | |||
| ff1e43c235 | |||
| b331aa6139 | |||
| b45b543f2d | |||
| 7c823ab59c | |||
| 9f2728f529 | |||
| cd3dc5d989 | |||
| e4de539bf3 | |||
| b2057f72e1 | |||
| 5f52dd54c0 | |||
| 9ceffd61d1 | |||
| 015d858be5 | |||
| b6d0b5f999 | |||
| d70e4f810a | |||
| 7f20742fcf | |||
| 15eb7c3b45 | |||
| dbc2fd5b0f | |||
| 3c3aca57f1 |
@@ -54,19 +54,6 @@ providers:
|
||||
context_window: 2048
|
||||
capabilities: [text, vision, streaming]
|
||||
|
||||
# Secondary: Local AirLLM (if installed)
|
||||
- name: airllm-local
|
||||
type: airllm
|
||||
enabled: false # Enable if pip install airllm
|
||||
priority: 2
|
||||
models:
|
||||
- name: 70b
|
||||
default: true
|
||||
capabilities: [text, tools, json, streaming]
|
||||
- name: 8b
|
||||
capabilities: [text, tools, json, streaming]
|
||||
- name: 405b
|
||||
capabilities: [text, tools, json, streaming]
|
||||
|
||||
# Tertiary: OpenAI (if API key available)
|
||||
- name: openai-backup
|
||||
|
||||
@@ -94,12 +94,17 @@ def extract_cycle_number(title: str) -> int | None:
|
||||
return int(m.group(1)) if m else None
|
||||
|
||||
|
||||
def extract_issue_number(title: str, body: str) -> int | None:
|
||||
# Try body first (usually has "closes #N")
|
||||
def extract_issue_number(title: str, body: str, pr_number: int | None = None) -> int | None:
|
||||
"""Extract the issue number from PR body/title, ignoring the PR number itself.
|
||||
|
||||
Gitea appends "(#N)" to PR titles where N is the PR number — skip that
|
||||
so we don't confuse it with the linked issue.
|
||||
"""
|
||||
for text in [body or "", title]:
|
||||
m = ISSUE_RE.search(text)
|
||||
if m:
|
||||
return int(m.group(1))
|
||||
for m in ISSUE_RE.finditer(text):
|
||||
num = int(m.group(1))
|
||||
if num != pr_number:
|
||||
return num
|
||||
return None
|
||||
|
||||
|
||||
@@ -140,7 +145,7 @@ def main():
|
||||
else:
|
||||
cycle_counter = max(cycle_counter, cycle)
|
||||
|
||||
issue = extract_issue_number(title, body)
|
||||
issue = extract_issue_number(title, body, pr_number=pr_num)
|
||||
issue_type = classify_pr(title, body)
|
||||
duration = estimate_duration(pr)
|
||||
diff = get_pr_diff_stats(token, pr_num)
|
||||
|
||||
@@ -4,11 +4,26 @@
|
||||
Called after each cycle completes (success or failure).
|
||||
Appends a structured entry to .loop/retro/cycles.jsonl.
|
||||
|
||||
EPOCH NOTATION (turnover system):
|
||||
Each cycle carries a symbolic epoch tag alongside the raw integer:
|
||||
|
||||
⟳WW.D:NNN
|
||||
|
||||
⟳ turnover glyph — marks epoch-aware cycles
|
||||
WW ISO week-of-year (01–53)
|
||||
D ISO weekday (1=Mon … 7=Sun)
|
||||
NNN daily cycle counter, zero-padded, resets at midnight UTC
|
||||
|
||||
Example: ⟳12.3:042 — Week 12, Wednesday, 42nd cycle of the day.
|
||||
|
||||
The raw `cycle` integer is preserved for backward compatibility.
|
||||
The `epoch` field carries the symbolic notation.
|
||||
|
||||
SUCCESS DEFINITION:
|
||||
A cycle is only "success" if BOTH conditions are met:
|
||||
1. The hermes process exited cleanly (exit code 0)
|
||||
2. Main is green (smoke test passes on main after merge)
|
||||
|
||||
|
||||
A cycle that merges a PR but leaves main red is a FAILURE.
|
||||
The --main-green flag records the smoke test result.
|
||||
|
||||
@@ -29,6 +44,8 @@ from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
@@ -36,10 +53,68 @@ from pathlib import Path
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
|
||||
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
|
||||
EPOCH_COUNTER_FILE = REPO_ROOT / ".loop" / "retro" / ".epoch_counter"
|
||||
|
||||
# How many recent entries to include in rolling summary
|
||||
SUMMARY_WINDOW = 50
|
||||
|
||||
# Branch patterns that encode an issue number, e.g. kimi/issue-492
|
||||
BRANCH_ISSUE_RE = re.compile(r"issue[/-](\d+)", re.IGNORECASE)
|
||||
|
||||
|
||||
def detect_issue_from_branch() -> int | None:
|
||||
"""Try to extract an issue number from the current git branch name."""
|
||||
try:
|
||||
branch = subprocess.check_output(
|
||||
["git", "rev-parse", "--abbrev-ref", "HEAD"],
|
||||
stderr=subprocess.DEVNULL,
|
||||
text=True,
|
||||
).strip()
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
return None
|
||||
m = BRANCH_ISSUE_RE.search(branch)
|
||||
return int(m.group(1)) if m else None
|
||||
|
||||
|
||||
# ── Epoch turnover ────────────────────────────────────────────────────────
|
||||
|
||||
def _epoch_tag(now: datetime | None = None) -> tuple[str, dict]:
|
||||
"""Generate the symbolic epoch tag and advance the daily counter.
|
||||
|
||||
Returns (epoch_string, epoch_parts) where epoch_parts is a dict with
|
||||
week, weekday, daily_n for structured storage.
|
||||
|
||||
The daily counter persists in .epoch_counter as a two-line file:
|
||||
line 1: ISO date (YYYY-MM-DD) of the current epoch day
|
||||
line 2: integer count
|
||||
When the date rolls over, the counter resets to 1.
|
||||
"""
|
||||
if now is None:
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
iso_cal = now.isocalendar() # (year, week, weekday)
|
||||
week = iso_cal[1]
|
||||
weekday = iso_cal[2]
|
||||
today_str = now.strftime("%Y-%m-%d")
|
||||
|
||||
# Read / reset daily counter
|
||||
daily_n = 1
|
||||
EPOCH_COUNTER_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
if EPOCH_COUNTER_FILE.exists():
|
||||
try:
|
||||
lines = EPOCH_COUNTER_FILE.read_text().strip().splitlines()
|
||||
if len(lines) == 2 and lines[0] == today_str:
|
||||
daily_n = int(lines[1]) + 1
|
||||
except (ValueError, IndexError):
|
||||
pass # corrupt file — reset
|
||||
|
||||
# Persist
|
||||
EPOCH_COUNTER_FILE.write_text(f"{today_str}\n{daily_n}\n")
|
||||
|
||||
tag = f"\u27f3{week:02d}.{weekday}:{daily_n:03d}"
|
||||
parts = {"week": week, "weekday": weekday, "daily_n": daily_n}
|
||||
return tag, parts
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description="Log a cycle retrospective")
|
||||
@@ -123,8 +198,30 @@ def update_summary() -> None:
|
||||
issue_failures[e["issue"]] = issue_failures.get(e["issue"], 0) + 1
|
||||
quarantine_candidates = {k: v for k, v in issue_failures.items() if v >= 2}
|
||||
|
||||
# Epoch turnover stats — cycles per week/day from epoch-tagged entries
|
||||
epoch_entries = [e for e in recent if e.get("epoch")]
|
||||
by_week: dict[int, int] = {}
|
||||
by_weekday: dict[int, int] = {}
|
||||
for e in epoch_entries:
|
||||
w = e.get("epoch_week")
|
||||
d = e.get("epoch_weekday")
|
||||
if w is not None:
|
||||
by_week[w] = by_week.get(w, 0) + 1
|
||||
if d is not None:
|
||||
by_weekday[d] = by_weekday.get(d, 0) + 1
|
||||
|
||||
# Current epoch — latest entry's epoch tag
|
||||
current_epoch = epoch_entries[-1].get("epoch", "") if epoch_entries else ""
|
||||
|
||||
# Weekday names for display
|
||||
weekday_glyphs = {1: "Mon", 2: "Tue", 3: "Wed", 4: "Thu",
|
||||
5: "Fri", 6: "Sat", 7: "Sun"}
|
||||
by_weekday_named = {weekday_glyphs.get(k, str(k)): v
|
||||
for k, v in sorted(by_weekday.items())}
|
||||
|
||||
summary = {
|
||||
"updated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"current_epoch": current_epoch,
|
||||
"window": len(recent),
|
||||
"measured_cycles": len(measured),
|
||||
"total_cycles": len(entries),
|
||||
@@ -136,9 +233,12 @@ def update_summary() -> None:
|
||||
"total_lines_removed": sum(e.get("lines_removed", 0) for e in recent),
|
||||
"total_prs_merged": sum(1 for e in recent if e.get("pr")),
|
||||
"by_type": type_stats,
|
||||
"by_week": dict(sorted(by_week.items())),
|
||||
"by_weekday": by_weekday_named,
|
||||
"quarantine_candidates": quarantine_candidates,
|
||||
"recent_failures": [
|
||||
{"cycle": e["cycle"], "issue": e.get("issue"), "reason": e.get("reason", "")}
|
||||
{"cycle": e["cycle"], "epoch": e.get("epoch", ""),
|
||||
"issue": e.get("issue"), "reason": e.get("reason", "")}
|
||||
for e in failures[-5:]
|
||||
],
|
||||
}
|
||||
@@ -149,6 +249,10 @@ def update_summary() -> None:
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
|
||||
# Auto-detect issue from branch when not explicitly provided
|
||||
if args.issue is None:
|
||||
args.issue = detect_issue_from_branch()
|
||||
|
||||
# Reject idle cycles — no issue and no duration means nothing happened
|
||||
if not args.issue and args.duration == 0:
|
||||
print(f"[retro] Cycle {args.cycle} skipped — idle (no issue, no duration)")
|
||||
@@ -157,9 +261,17 @@ def main() -> None:
|
||||
# A cycle is only truly successful if hermes exited clean AND main is green
|
||||
truly_success = args.success and args.main_green
|
||||
|
||||
# Generate epoch turnover tag
|
||||
now = datetime.now(timezone.utc)
|
||||
epoch_tag, epoch_parts = _epoch_tag(now)
|
||||
|
||||
entry = {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"timestamp": now.isoformat(),
|
||||
"cycle": args.cycle,
|
||||
"epoch": epoch_tag,
|
||||
"epoch_week": epoch_parts["week"],
|
||||
"epoch_weekday": epoch_parts["weekday"],
|
||||
"epoch_daily_n": epoch_parts["daily_n"],
|
||||
"issue": args.issue,
|
||||
"type": args.type,
|
||||
"success": truly_success,
|
||||
@@ -184,7 +296,7 @@ def main() -> None:
|
||||
update_summary()
|
||||
|
||||
status = "✓ SUCCESS" if args.success else "✗ FAILURE"
|
||||
print(f"[retro] Cycle {args.cycle} {status}", end="")
|
||||
print(f"[retro] {epoch_tag} Cycle {args.cycle} {status}", end="")
|
||||
if args.issue:
|
||||
print(f" (#{args.issue} {args.type})", end="")
|
||||
if args.duration:
|
||||
|
||||
@@ -18,13 +18,19 @@ Exit codes:
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
|
||||
IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
|
||||
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
|
||||
|
||||
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
|
||||
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
|
||||
|
||||
# Backoff sequence: 60s, 120s, 240s, 600s max
|
||||
BACKOFF_BASE = 60
|
||||
@@ -32,19 +38,81 @@ BACKOFF_MAX = 600
|
||||
BACKOFF_MULTIPLIER = 2
|
||||
|
||||
|
||||
def _get_token() -> str:
|
||||
"""Read Gitea token from env or file."""
|
||||
token = os.environ.get("GITEA_TOKEN", "").strip()
|
||||
if not token and TOKEN_FILE.exists():
|
||||
token = TOKEN_FILE.read_text().strip()
|
||||
return token
|
||||
|
||||
|
||||
def _fetch_open_issue_numbers() -> set[int] | None:
|
||||
"""Fetch open issue numbers from Gitea. Returns None on failure."""
|
||||
token = _get_token()
|
||||
if not token:
|
||||
return None
|
||||
try:
|
||||
numbers: set[int] = set()
|
||||
page = 1
|
||||
while True:
|
||||
url = (
|
||||
f"{GITEA_API}/repos/{REPO_SLUG}/issues"
|
||||
f"?state=open&type=issues&limit=50&page={page}"
|
||||
)
|
||||
req = urllib.request.Request(url, headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Accept": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
data = json.loads(resp.read())
|
||||
if not data:
|
||||
break
|
||||
for issue in data:
|
||||
numbers.add(issue["number"])
|
||||
if len(data) < 50:
|
||||
break
|
||||
page += 1
|
||||
return numbers
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def load_queue() -> list[dict]:
|
||||
"""Load queue.json and return ready items."""
|
||||
"""Load queue.json and return ready items, filtering out closed issues."""
|
||||
if not QUEUE_FILE.exists():
|
||||
return []
|
||||
try:
|
||||
data = json.loads(QUEUE_FILE.read_text())
|
||||
if isinstance(data, list):
|
||||
return [item for item in data if item.get("ready")]
|
||||
return []
|
||||
if not isinstance(data, list):
|
||||
return []
|
||||
ready = [item for item in data if item.get("ready")]
|
||||
if not ready:
|
||||
return []
|
||||
|
||||
# Filter out issues that are no longer open (auto-hygiene)
|
||||
open_numbers = _fetch_open_issue_numbers()
|
||||
if open_numbers is not None:
|
||||
before = len(ready)
|
||||
ready = [item for item in ready if item.get("issue") in open_numbers]
|
||||
removed = before - len(ready)
|
||||
if removed > 0:
|
||||
print(f"[loop-guard] Filtered {removed} closed issue(s) from queue")
|
||||
# Persist the cleaned queue so stale entries don't recur
|
||||
_save_cleaned_queue(data, open_numbers)
|
||||
return ready
|
||||
except (json.JSONDecodeError, OSError):
|
||||
return []
|
||||
|
||||
|
||||
def _save_cleaned_queue(full_queue: list[dict], open_numbers: set[int]) -> None:
|
||||
"""Rewrite queue.json without closed issues."""
|
||||
cleaned = [item for item in full_queue if item.get("issue") in open_numbers]
|
||||
try:
|
||||
QUEUE_FILE.write_text(json.dumps(cleaned, indent=2) + "\n")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def load_idle_state() -> dict:
|
||||
"""Load persistent idle state."""
|
||||
if not IDLE_STATE_FILE.exists():
|
||||
|
||||
407
scripts/loop_introspect.py
Normal file
407
scripts/loop_introspect.py
Normal file
@@ -0,0 +1,407 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Loop introspection — the self-improvement engine.
|
||||
|
||||
Analyzes retro data across time windows to detect trends, extract patterns,
|
||||
and produce structured recommendations. Output is consumed by deep_triage
|
||||
and injected into the loop prompt context.
|
||||
|
||||
This is the piece that closes the feedback loop:
|
||||
cycle_retro → introspect → deep_triage → loop behavior changes
|
||||
|
||||
Run: python3 scripts/loop_introspect.py
|
||||
Output: .loop/retro/insights.json (structured insights + recommendations)
|
||||
Prints human-readable summary to stdout.
|
||||
|
||||
Called by: deep_triage.sh (before the LLM triage), timmy-loop.sh (every 50 cycles)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
CYCLES_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
|
||||
DEEP_TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "deep-triage.jsonl"
|
||||
TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
|
||||
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
|
||||
INSIGHTS_FILE = REPO_ROOT / ".loop" / "retro" / "insights.json"
|
||||
|
||||
# ── Helpers ──────────────────────────────────────────────────────────────
|
||||
|
||||
def load_jsonl(path: Path) -> list[dict]:
|
||||
"""Load a JSONL file, skipping bad lines."""
|
||||
if not path.exists():
|
||||
return []
|
||||
entries = []
|
||||
for line in path.read_text().strip().splitlines():
|
||||
try:
|
||||
entries.append(json.loads(line))
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
return entries
|
||||
|
||||
|
||||
def parse_ts(ts_str: str) -> datetime | None:
|
||||
"""Parse an ISO timestamp, tolerating missing tz."""
|
||||
if not ts_str:
|
||||
return None
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
def window(entries: list[dict], days: int) -> list[dict]:
|
||||
"""Filter entries to the last N days."""
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
|
||||
result = []
|
||||
for e in entries:
|
||||
ts = parse_ts(e.get("timestamp", ""))
|
||||
if ts and ts >= cutoff:
|
||||
result.append(e)
|
||||
return result
|
||||
|
||||
|
||||
# ── Analysis functions ───────────────────────────────────────────────────
|
||||
|
||||
def compute_trends(cycles: list[dict]) -> dict:
|
||||
"""Compare recent window (last 7d) vs older window (7-14d ago)."""
|
||||
recent = window(cycles, 7)
|
||||
older = window(cycles, 14)
|
||||
# Remove recent from older to get the 7-14d window
|
||||
recent_set = {(e.get("cycle"), e.get("timestamp")) for e in recent}
|
||||
older = [e for e in older if (e.get("cycle"), e.get("timestamp")) not in recent_set]
|
||||
|
||||
def stats(entries):
|
||||
if not entries:
|
||||
return {"count": 0, "success_rate": None, "avg_duration": None,
|
||||
"lines_net": 0, "prs_merged": 0}
|
||||
successes = sum(1 for e in entries if e.get("success"))
|
||||
durations = [e["duration"] for e in entries if e.get("duration", 0) > 0]
|
||||
return {
|
||||
"count": len(entries),
|
||||
"success_rate": round(successes / len(entries), 3) if entries else None,
|
||||
"avg_duration": round(sum(durations) / len(durations)) if durations else None,
|
||||
"lines_net": sum(e.get("lines_added", 0) - e.get("lines_removed", 0) for e in entries),
|
||||
"prs_merged": sum(1 for e in entries if e.get("pr")),
|
||||
}
|
||||
|
||||
recent_stats = stats(recent)
|
||||
older_stats = stats(older)
|
||||
|
||||
trend = {
|
||||
"recent_7d": recent_stats,
|
||||
"previous_7d": older_stats,
|
||||
"velocity_change": None,
|
||||
"success_rate_change": None,
|
||||
"duration_change": None,
|
||||
}
|
||||
|
||||
if recent_stats["count"] and older_stats["count"]:
|
||||
trend["velocity_change"] = recent_stats["count"] - older_stats["count"]
|
||||
if recent_stats["success_rate"] is not None and older_stats["success_rate"] is not None:
|
||||
trend["success_rate_change"] = round(
|
||||
recent_stats["success_rate"] - older_stats["success_rate"], 3
|
||||
)
|
||||
if recent_stats["avg_duration"] is not None and older_stats["avg_duration"] is not None:
|
||||
trend["duration_change"] = recent_stats["avg_duration"] - older_stats["avg_duration"]
|
||||
|
||||
return trend
|
||||
|
||||
|
||||
def type_analysis(cycles: list[dict]) -> dict:
|
||||
"""Per-type success rates and durations."""
|
||||
by_type: dict[str, list[dict]] = defaultdict(list)
|
||||
for c in cycles:
|
||||
by_type[c.get("type", "unknown")].append(c)
|
||||
|
||||
result = {}
|
||||
for t, entries in by_type.items():
|
||||
durations = [e["duration"] for e in entries if e.get("duration", 0) > 0]
|
||||
successes = sum(1 for e in entries if e.get("success"))
|
||||
result[t] = {
|
||||
"count": len(entries),
|
||||
"success_rate": round(successes / len(entries), 3) if entries else 0,
|
||||
"avg_duration": round(sum(durations) / len(durations)) if durations else 0,
|
||||
"max_duration": max(durations) if durations else 0,
|
||||
}
|
||||
return result
|
||||
|
||||
|
||||
def repeat_failures(cycles: list[dict]) -> list[dict]:
|
||||
"""Issues that have failed multiple times — quarantine candidates."""
|
||||
failures: dict[int, list] = defaultdict(list)
|
||||
for c in cycles:
|
||||
if not c.get("success") and c.get("issue"):
|
||||
failures[c["issue"]].append({
|
||||
"cycle": c.get("cycle"),
|
||||
"reason": c.get("reason", ""),
|
||||
"duration": c.get("duration", 0),
|
||||
})
|
||||
# Only issues with 2+ failures
|
||||
return [
|
||||
{"issue": k, "failure_count": len(v), "attempts": v}
|
||||
for k, v in sorted(failures.items(), key=lambda x: -len(x[1]))
|
||||
if len(v) >= 2
|
||||
]
|
||||
|
||||
|
||||
def duration_outliers(cycles: list[dict], threshold_multiple: float = 3.0) -> list[dict]:
|
||||
"""Cycles that took way longer than average — something went wrong."""
|
||||
durations = [c["duration"] for c in cycles if c.get("duration", 0) > 0]
|
||||
if len(durations) < 5:
|
||||
return []
|
||||
avg = sum(durations) / len(durations)
|
||||
threshold = avg * threshold_multiple
|
||||
|
||||
outliers = []
|
||||
for c in cycles:
|
||||
dur = c.get("duration", 0)
|
||||
if dur > threshold:
|
||||
outliers.append({
|
||||
"cycle": c.get("cycle"),
|
||||
"issue": c.get("issue"),
|
||||
"type": c.get("type"),
|
||||
"duration": dur,
|
||||
"avg_duration": round(avg),
|
||||
"multiple": round(dur / avg, 1) if avg > 0 else 0,
|
||||
"reason": c.get("reason", ""),
|
||||
})
|
||||
return outliers
|
||||
|
||||
|
||||
def triage_effectiveness(deep_triages: list[dict]) -> dict:
|
||||
"""How well is the deep triage performing?"""
|
||||
if not deep_triages:
|
||||
return {"runs": 0, "note": "No deep triage data yet"}
|
||||
|
||||
total_reviewed = sum(d.get("issues_reviewed", 0) for d in deep_triages)
|
||||
total_refined = sum(len(d.get("issues_refined", [])) for d in deep_triages)
|
||||
total_created = sum(len(d.get("issues_created", [])) for d in deep_triages)
|
||||
total_closed = sum(len(d.get("issues_closed", [])) for d in deep_triages)
|
||||
timmy_available = sum(1 for d in deep_triages if d.get("timmy_available"))
|
||||
|
||||
# Extract Timmy's feedback themes
|
||||
timmy_themes = []
|
||||
for d in deep_triages:
|
||||
fb = d.get("timmy_feedback", "")
|
||||
if fb:
|
||||
timmy_themes.append(fb[:200])
|
||||
|
||||
return {
|
||||
"runs": len(deep_triages),
|
||||
"total_reviewed": total_reviewed,
|
||||
"total_refined": total_refined,
|
||||
"total_created": total_created,
|
||||
"total_closed": total_closed,
|
||||
"timmy_consultation_rate": round(timmy_available / len(deep_triages), 2),
|
||||
"timmy_recent_feedback": timmy_themes[-1] if timmy_themes else "",
|
||||
"timmy_feedback_history": timmy_themes,
|
||||
}
|
||||
|
||||
|
||||
def generate_recommendations(
|
||||
trends: dict,
|
||||
types: dict,
|
||||
repeats: list,
|
||||
outliers: list,
|
||||
triage_eff: dict,
|
||||
) -> list[dict]:
|
||||
"""Produce actionable recommendations from the analysis."""
|
||||
recs = []
|
||||
|
||||
# 1. Success rate declining?
|
||||
src = trends.get("success_rate_change")
|
||||
if src is not None and src < -0.1:
|
||||
recs.append({
|
||||
"severity": "high",
|
||||
"category": "reliability",
|
||||
"finding": f"Success rate dropped {abs(src)*100:.0f}pp in the last 7 days",
|
||||
"recommendation": "Review recent failures. Are issues poorly scoped? "
|
||||
"Is main unstable? Check if triage is producing bad work items.",
|
||||
})
|
||||
|
||||
# 2. Velocity dropping?
|
||||
vc = trends.get("velocity_change")
|
||||
if vc is not None and vc < -5:
|
||||
recs.append({
|
||||
"severity": "medium",
|
||||
"category": "throughput",
|
||||
"finding": f"Velocity dropped by {abs(vc)} cycles vs previous week",
|
||||
"recommendation": "Check for loop stalls, long-running cycles, or queue starvation.",
|
||||
})
|
||||
|
||||
# 3. Duration creep?
|
||||
dc = trends.get("duration_change")
|
||||
if dc is not None and dc > 120: # 2+ minutes longer
|
||||
recs.append({
|
||||
"severity": "medium",
|
||||
"category": "efficiency",
|
||||
"finding": f"Average cycle duration increased by {dc}s vs previous week",
|
||||
"recommendation": "Issues may be growing in scope. Enforce tighter decomposition "
|
||||
"in deep triage. Check if tests are getting slower.",
|
||||
})
|
||||
|
||||
# 4. Type-specific problems
|
||||
for t, info in types.items():
|
||||
if info["count"] >= 3 and info["success_rate"] < 0.5:
|
||||
recs.append({
|
||||
"severity": "high",
|
||||
"category": "type_reliability",
|
||||
"finding": f"'{t}' issues fail {(1-info['success_rate'])*100:.0f}% of the time "
|
||||
f"({info['count']} attempts)",
|
||||
"recommendation": f"'{t}' issues need better scoping or different approach. "
|
||||
f"Consider: tighter acceptance criteria, smaller scope, "
|
||||
f"or delegating to Kimi with more context.",
|
||||
})
|
||||
if info["avg_duration"] > 600 and info["count"] >= 3: # >10 min avg
|
||||
recs.append({
|
||||
"severity": "medium",
|
||||
"category": "type_efficiency",
|
||||
"finding": f"'{t}' issues average {info['avg_duration']//60}m{info['avg_duration']%60}s "
|
||||
f"(max {info['max_duration']//60}m)",
|
||||
"recommendation": f"Break '{t}' issues into smaller pieces. Target <5 min per cycle.",
|
||||
})
|
||||
|
||||
# 5. Repeat failures
|
||||
for rf in repeats[:3]:
|
||||
recs.append({
|
||||
"severity": "high",
|
||||
"category": "repeat_failure",
|
||||
"finding": f"Issue #{rf['issue']} has failed {rf['failure_count']} times",
|
||||
"recommendation": "Quarantine or rewrite this issue. Repeated failure = "
|
||||
"bad scope or missing prerequisite.",
|
||||
})
|
||||
|
||||
# 6. Outliers
|
||||
if len(outliers) > 2:
|
||||
recs.append({
|
||||
"severity": "medium",
|
||||
"category": "outliers",
|
||||
"finding": f"{len(outliers)} cycles took {outliers[0].get('multiple', '?')}x+ "
|
||||
f"longer than average",
|
||||
"recommendation": "Long cycles waste resources. Add timeout enforcement or "
|
||||
"break complex issues earlier.",
|
||||
})
|
||||
|
||||
# 7. Code growth
|
||||
recent = trends.get("recent_7d", {})
|
||||
net = recent.get("lines_net", 0)
|
||||
if net > 500:
|
||||
recs.append({
|
||||
"severity": "low",
|
||||
"category": "code_health",
|
||||
"finding": f"Net +{net} lines added in the last 7 days",
|
||||
"recommendation": "Lines of code is a liability. Balance feature work with "
|
||||
"refactoring. Target net-zero or negative line growth.",
|
||||
})
|
||||
|
||||
# 8. Triage health
|
||||
if triage_eff.get("runs", 0) == 0:
|
||||
recs.append({
|
||||
"severity": "high",
|
||||
"category": "triage",
|
||||
"finding": "Deep triage has never run",
|
||||
"recommendation": "Enable deep triage (every 20 cycles). The loop needs "
|
||||
"LLM-driven issue refinement to stay effective.",
|
||||
})
|
||||
|
||||
# No recommendations = things are healthy
|
||||
if not recs:
|
||||
recs.append({
|
||||
"severity": "info",
|
||||
"category": "health",
|
||||
"finding": "No significant issues detected",
|
||||
"recommendation": "System is healthy. Continue current patterns.",
|
||||
})
|
||||
|
||||
return recs
|
||||
|
||||
|
||||
# ── Main ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
cycles = load_jsonl(CYCLES_FILE)
|
||||
deep_triages = load_jsonl(DEEP_TRIAGE_FILE)
|
||||
|
||||
if not cycles:
|
||||
print("[introspect] No cycle data found. Nothing to analyze.")
|
||||
return
|
||||
|
||||
# Run all analyses
|
||||
trends = compute_trends(cycles)
|
||||
types = type_analysis(cycles)
|
||||
repeats = repeat_failures(cycles)
|
||||
outliers = duration_outliers(cycles)
|
||||
triage_eff = triage_effectiveness(deep_triages)
|
||||
recommendations = generate_recommendations(trends, types, repeats, outliers, triage_eff)
|
||||
|
||||
insights = {
|
||||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"total_cycles_analyzed": len(cycles),
|
||||
"trends": trends,
|
||||
"by_type": types,
|
||||
"repeat_failures": repeats[:5],
|
||||
"duration_outliers": outliers[:5],
|
||||
"triage_effectiveness": triage_eff,
|
||||
"recommendations": recommendations,
|
||||
}
|
||||
|
||||
# Write insights
|
||||
INSIGHTS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
INSIGHTS_FILE.write_text(json.dumps(insights, indent=2) + "\n")
|
||||
|
||||
# Current epoch from latest entry
|
||||
latest_epoch = ""
|
||||
for c in reversed(cycles):
|
||||
if c.get("epoch"):
|
||||
latest_epoch = c["epoch"]
|
||||
break
|
||||
|
||||
# Human-readable output
|
||||
header = f"[introspect] Analyzed {len(cycles)} cycles"
|
||||
if latest_epoch:
|
||||
header += f" · current epoch: {latest_epoch}"
|
||||
print(header)
|
||||
|
||||
print(f"\n TRENDS (7d vs previous 7d):")
|
||||
r7 = trends["recent_7d"]
|
||||
p7 = trends["previous_7d"]
|
||||
print(f" Cycles: {r7['count']:>3d} (was {p7['count']})")
|
||||
if r7["success_rate"] is not None:
|
||||
arrow = "↑" if (trends["success_rate_change"] or 0) > 0 else "↓" if (trends["success_rate_change"] or 0) < 0 else "→"
|
||||
print(f" Success rate: {r7['success_rate']*100:>4.0f}% {arrow}")
|
||||
if r7["avg_duration"] is not None:
|
||||
print(f" Avg duration: {r7['avg_duration']//60}m{r7['avg_duration']%60:02d}s")
|
||||
print(f" PRs merged: {r7['prs_merged']:>3d} (was {p7['prs_merged']})")
|
||||
print(f" Lines net: {r7['lines_net']:>+5d}")
|
||||
|
||||
print(f"\n BY TYPE:")
|
||||
for t, info in sorted(types.items(), key=lambda x: -x[1]["count"]):
|
||||
print(f" {t:12s} n={info['count']:>2d} "
|
||||
f"ok={info['success_rate']*100:>3.0f}% "
|
||||
f"avg={info['avg_duration']//60}m{info['avg_duration']%60:02d}s")
|
||||
|
||||
if repeats:
|
||||
print(f"\n REPEAT FAILURES:")
|
||||
for rf in repeats[:3]:
|
||||
print(f" #{rf['issue']} failed {rf['failure_count']}x")
|
||||
|
||||
print(f"\n RECOMMENDATIONS ({len(recommendations)}):")
|
||||
for i, rec in enumerate(recommendations, 1):
|
||||
sev = {"high": "🔴", "medium": "🟡", "low": "🟢", "info": "ℹ️ "}.get(rec["severity"], "?")
|
||||
print(f" {sev} {rec['finding']}")
|
||||
print(f" → {rec['recommendation']}")
|
||||
|
||||
print(f"\n Written to: {INSIGHTS_FILE}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -10,6 +10,11 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
APP_START_TIME: _datetime = _datetime.now(UTC)
|
||||
|
||||
|
||||
def normalize_ollama_url(url: str) -> str:
|
||||
"""Replace localhost with 127.0.0.1 to avoid IPv6 resolution delays."""
|
||||
return url.replace("localhost", "127.0.0.1")
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
"""Central configuration — all env-var access goes through this class."""
|
||||
|
||||
@@ -19,6 +24,11 @@ class Settings(BaseSettings):
|
||||
# Ollama host — override with OLLAMA_URL env var or .env file
|
||||
ollama_url: str = "http://localhost:11434"
|
||||
|
||||
@property
|
||||
def normalized_ollama_url(self) -> str:
|
||||
"""Return ollama_url with localhost replaced by 127.0.0.1."""
|
||||
return normalize_ollama_url(self.ollama_url)
|
||||
|
||||
# LLM model passed to Agno/Ollama — override with OLLAMA_MODEL
|
||||
# qwen3:30b is the primary model — better reasoning and tool calling
|
||||
# than llama3.1:8b-instruct while still running locally on modest hardware.
|
||||
@@ -244,6 +254,7 @@ class Settings(BaseSettings):
|
||||
# When enabled, the agent starts an internal thought loop on server start.
|
||||
thinking_enabled: bool = True
|
||||
thinking_interval_seconds: int = 300 # 5 minutes between thoughts
|
||||
thinking_timeout_seconds: int = 120 # max wall-clock time per thinking cycle
|
||||
thinking_distill_every: int = 10 # distill facts from thoughts every Nth thought
|
||||
thinking_issue_every: int = 20 # file Gitea issues from thoughts every Nth thought
|
||||
thinking_memory_check_every: int = 50 # check memory status every Nth thought
|
||||
@@ -392,7 +403,7 @@ def check_ollama_model_available(model_name: str) -> bool:
|
||||
import json
|
||||
import urllib.request
|
||||
|
||||
url = settings.ollama_url.replace("localhost", "127.0.0.1")
|
||||
url = settings.normalized_ollama_url
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/tags",
|
||||
method="GET",
|
||||
@@ -469,8 +480,19 @@ def validate_startup(*, force: bool = False) -> None:
|
||||
", ".join(_missing),
|
||||
)
|
||||
sys.exit(1)
|
||||
if "*" in settings.cors_origins:
|
||||
_startup_logger.error(
|
||||
"PRODUCTION SECURITY ERROR: CORS wildcard '*' is not allowed "
|
||||
"in production. Set CORS_ORIGINS to explicit origins."
|
||||
)
|
||||
sys.exit(1)
|
||||
_startup_logger.info("Production mode: security secrets validated ✓")
|
||||
else:
|
||||
if "*" in settings.cors_origins:
|
||||
_startup_logger.warning(
|
||||
"SEC: CORS_ORIGINS contains wildcard '*' — "
|
||||
"restrict to explicit origins before deploying to production."
|
||||
)
|
||||
if not settings.l402_hmac_secret:
|
||||
_startup_logger.warning(
|
||||
"SEC: L402_HMAC_SECRET is not set — "
|
||||
|
||||
@@ -155,7 +155,17 @@ async def _thinking_scheduler() -> None:
|
||||
while True:
|
||||
try:
|
||||
if settings.thinking_enabled:
|
||||
await thinking_engine.think_once()
|
||||
await asyncio.wait_for(
|
||||
thinking_engine.think_once(),
|
||||
timeout=settings.thinking_timeout_seconds,
|
||||
)
|
||||
except TimeoutError:
|
||||
logger.warning(
|
||||
"Thinking cycle timed out after %ds — Ollama may be unresponsive",
|
||||
settings.thinking_timeout_seconds,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.error("Thinking scheduler error: %s", exc)
|
||||
|
||||
@@ -175,7 +185,10 @@ async def _loop_qa_scheduler() -> None:
|
||||
while True:
|
||||
try:
|
||||
if settings.loop_qa_enabled:
|
||||
result = await loop_qa_orchestrator.run_next_test()
|
||||
result = await asyncio.wait_for(
|
||||
loop_qa_orchestrator.run_next_test(),
|
||||
timeout=settings.thinking_timeout_seconds,
|
||||
)
|
||||
if result:
|
||||
status = "PASS" if result["success"] else "FAIL"
|
||||
logger.info(
|
||||
@@ -184,6 +197,13 @@ async def _loop_qa_scheduler() -> None:
|
||||
status,
|
||||
result.get("details", "")[:80],
|
||||
)
|
||||
except TimeoutError:
|
||||
logger.warning(
|
||||
"Loop QA test timed out after %ds",
|
||||
settings.thinking_timeout_seconds,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.error("Loop QA scheduler error: %s", exc)
|
||||
|
||||
@@ -329,33 +349,35 @@ async def _discord_token_watcher() -> None:
|
||||
logger.warning("Discord auto-start failed: %s", exc)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""Application lifespan manager with non-blocking startup."""
|
||||
|
||||
# Validate security config (no-op in test mode)
|
||||
def _startup_init() -> None:
|
||||
"""Validate config and enable event persistence."""
|
||||
from config import validate_startup
|
||||
|
||||
validate_startup()
|
||||
|
||||
# Enable event persistence (unified EventBus + swarm event_log)
|
||||
from infrastructure.events.bus import init_event_bus_persistence
|
||||
|
||||
init_event_bus_persistence()
|
||||
|
||||
# Create all background tasks without waiting for them
|
||||
briefing_task = asyncio.create_task(_briefing_scheduler())
|
||||
thinking_task = asyncio.create_task(_thinking_scheduler())
|
||||
loop_qa_task = asyncio.create_task(_loop_qa_scheduler())
|
||||
presence_task = asyncio.create_task(_presence_watcher())
|
||||
|
||||
# Initialize Spark Intelligence engine
|
||||
from spark.engine import get_spark_engine
|
||||
|
||||
if get_spark_engine().enabled:
|
||||
logger.info("Spark Intelligence active — event capture enabled")
|
||||
|
||||
# Auto-prune old vector store memories on startup
|
||||
|
||||
def _startup_background_tasks() -> list[asyncio.Task]:
|
||||
"""Spawn all recurring background tasks (non-blocking)."""
|
||||
return [
|
||||
asyncio.create_task(_briefing_scheduler()),
|
||||
asyncio.create_task(_thinking_scheduler()),
|
||||
asyncio.create_task(_loop_qa_scheduler()),
|
||||
asyncio.create_task(_presence_watcher()),
|
||||
asyncio.create_task(_start_chat_integrations_background()),
|
||||
]
|
||||
|
||||
|
||||
def _startup_pruning() -> None:
|
||||
"""Auto-prune old memories, thoughts, and events on startup."""
|
||||
if settings.memory_prune_days > 0:
|
||||
try:
|
||||
from timmy.memory_system import prune_memories
|
||||
@@ -373,7 +395,6 @@ async def lifespan(app: FastAPI):
|
||||
except Exception as exc:
|
||||
logger.debug("Memory auto-prune skipped: %s", exc)
|
||||
|
||||
# Auto-prune old thoughts on startup
|
||||
if settings.thoughts_prune_days > 0:
|
||||
try:
|
||||
from timmy.thinking import thinking_engine
|
||||
@@ -391,7 +412,6 @@ async def lifespan(app: FastAPI):
|
||||
except Exception as exc:
|
||||
logger.debug("Thought auto-prune skipped: %s", exc)
|
||||
|
||||
# Auto-prune old system events on startup
|
||||
if settings.events_prune_days > 0:
|
||||
try:
|
||||
from swarm.event_log import prune_old_events
|
||||
@@ -409,7 +429,6 @@ async def lifespan(app: FastAPI):
|
||||
except Exception as exc:
|
||||
logger.debug("Event auto-prune skipped: %s", exc)
|
||||
|
||||
# Warn if memory vault exceeds size limit
|
||||
if settings.memory_vault_max_mb > 0:
|
||||
try:
|
||||
vault_path = Path(settings.repo_root) / "memory" / "notes"
|
||||
@@ -425,37 +444,18 @@ async def lifespan(app: FastAPI):
|
||||
except Exception as exc:
|
||||
logger.debug("Vault size check skipped: %s", exc)
|
||||
|
||||
# Start Workshop presence heartbeat with WS relay
|
||||
from dashboard.routes.world import broadcast_world_state
|
||||
from timmy.workshop_state import WorkshopHeartbeat
|
||||
|
||||
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
|
||||
await workshop_heartbeat.start()
|
||||
|
||||
# Start chat integrations in background
|
||||
chat_task = asyncio.create_task(_start_chat_integrations_background())
|
||||
|
||||
# Register session logger with error capture (breaks infrastructure → timmy circular dep)
|
||||
try:
|
||||
from infrastructure.error_capture import register_error_recorder
|
||||
from timmy.session_logger import get_session_logger
|
||||
|
||||
register_error_recorder(get_session_logger().record_error)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
logger.info("✓ Dashboard ready for requests")
|
||||
|
||||
yield
|
||||
|
||||
# Cleanup on shutdown
|
||||
async def _shutdown_cleanup(
|
||||
bg_tasks: list[asyncio.Task],
|
||||
workshop_heartbeat,
|
||||
) -> None:
|
||||
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
|
||||
from integrations.chat_bridge.vendors.discord import discord_bot
|
||||
from integrations.telegram_bot.bot import telegram_bot
|
||||
|
||||
await discord_bot.stop()
|
||||
await telegram_bot.stop()
|
||||
|
||||
# Close MCP tool server sessions
|
||||
try:
|
||||
from timmy.mcp_tools import close_mcp_sessions
|
||||
|
||||
@@ -465,13 +465,42 @@ async def lifespan(app: FastAPI):
|
||||
|
||||
await workshop_heartbeat.stop()
|
||||
|
||||
for task in [briefing_task, thinking_task, chat_task, loop_qa_task, presence_task]:
|
||||
if task:
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
for task in bg_tasks:
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""Application lifespan manager with non-blocking startup."""
|
||||
_startup_init()
|
||||
bg_tasks = _startup_background_tasks()
|
||||
_startup_pruning()
|
||||
|
||||
# Start Workshop presence heartbeat with WS relay
|
||||
from dashboard.routes.world import broadcast_world_state
|
||||
from timmy.workshop_state import WorkshopHeartbeat
|
||||
|
||||
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
|
||||
await workshop_heartbeat.start()
|
||||
|
||||
# Register session logger with error capture
|
||||
try:
|
||||
from infrastructure.error_capture import register_error_recorder
|
||||
from timmy.session_logger import get_session_logger
|
||||
|
||||
register_error_recorder(get_session_logger().record_error)
|
||||
except Exception:
|
||||
logger.debug("Failed to register error recorder")
|
||||
|
||||
logger.info("✓ Dashboard ready for requests")
|
||||
|
||||
yield
|
||||
|
||||
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
|
||||
@@ -100,7 +100,7 @@ class CSRFMiddleware(BaseHTTPMiddleware):
|
||||
...
|
||||
|
||||
Usage:
|
||||
app.add_middleware(CSRFMiddleware, secret="your-secret-key")
|
||||
app.add_middleware(CSRFMiddleware, secret=settings.csrf_secret)
|
||||
|
||||
Attributes:
|
||||
secret: Secret key for token signing (optional, for future use).
|
||||
|
||||
@@ -71,27 +71,87 @@ async def clear_history(request: Request):
|
||||
)
|
||||
|
||||
|
||||
@router.post("/default/chat", response_class=HTMLResponse)
|
||||
async def chat_agent(request: Request, message: str = Form(...)):
|
||||
"""Chat — synchronous response with native Agno tool confirmation."""
|
||||
def _validate_message(message: str) -> str:
|
||||
"""Strip and validate chat input; raise HTTPException on bad input."""
|
||||
from fastapi import HTTPException
|
||||
|
||||
message = message.strip()
|
||||
if not message:
|
||||
from fastapi import HTTPException
|
||||
|
||||
raise HTTPException(status_code=400, detail="Message cannot be empty")
|
||||
|
||||
if len(message) > MAX_MESSAGE_LENGTH:
|
||||
from fastapi import HTTPException
|
||||
|
||||
raise HTTPException(status_code=422, detail="Message too long")
|
||||
return message
|
||||
|
||||
# Record user activity so the thinking engine knows we're not idle
|
||||
|
||||
def _record_user_activity() -> None:
|
||||
"""Notify the thinking engine that the user is active."""
|
||||
try:
|
||||
from timmy.thinking import thinking_engine
|
||||
|
||||
thinking_engine.record_user_input()
|
||||
except Exception:
|
||||
pass
|
||||
logger.debug("Failed to record user input for thinking engine")
|
||||
|
||||
|
||||
def _extract_tool_actions(run_output) -> list[dict]:
|
||||
"""If Agno paused the run for tool confirmation, build approval items."""
|
||||
from timmy.approvals import create_item
|
||||
|
||||
tool_actions: list[dict] = []
|
||||
status = getattr(run_output, "status", None)
|
||||
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
|
||||
|
||||
if not (is_paused and getattr(run_output, "active_requirements", None)):
|
||||
return tool_actions
|
||||
|
||||
for req in run_output.active_requirements:
|
||||
if not getattr(req, "needs_confirmation", False):
|
||||
continue
|
||||
te = req.tool_execution
|
||||
tool_name = getattr(te, "tool_name", "unknown")
|
||||
tool_args = getattr(te, "tool_args", {}) or {}
|
||||
|
||||
item = create_item(
|
||||
title=f"Dashboard: {tool_name}",
|
||||
description=format_action_description(tool_name, tool_args),
|
||||
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
|
||||
impact=get_impact_level(tool_name),
|
||||
)
|
||||
_pending_runs[item.id] = {
|
||||
"run_output": run_output,
|
||||
"requirement": req,
|
||||
"tool_name": tool_name,
|
||||
"tool_args": tool_args,
|
||||
}
|
||||
tool_actions.append(
|
||||
{
|
||||
"approval_id": item.id,
|
||||
"tool_name": tool_name,
|
||||
"description": format_action_description(tool_name, tool_args),
|
||||
"impact": get_impact_level(tool_name),
|
||||
}
|
||||
)
|
||||
return tool_actions
|
||||
|
||||
|
||||
def _log_exchange(
|
||||
message: str, response_text: str | None, error_text: str | None, timestamp: str
|
||||
) -> None:
|
||||
"""Append user message and agent/error reply to the in-memory log."""
|
||||
message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
|
||||
if response_text:
|
||||
message_log.append(
|
||||
role="agent", content=response_text, timestamp=timestamp, source="browser"
|
||||
)
|
||||
elif error_text:
|
||||
message_log.append(role="error", content=error_text, timestamp=timestamp, source="browser")
|
||||
|
||||
|
||||
@router.post("/default/chat", response_class=HTMLResponse)
|
||||
async def chat_agent(request: Request, message: str = Form(...)):
|
||||
"""Chat — synchronous response with native Agno tool confirmation."""
|
||||
message = _validate_message(message)
|
||||
_record_user_activity()
|
||||
|
||||
timestamp = datetime.now().strftime("%H:%M:%S")
|
||||
response_text = None
|
||||
@@ -104,54 +164,15 @@ async def chat_agent(request: Request, message: str = Form(...)):
|
||||
error_text = f"Chat error: {exc}"
|
||||
run_output = None
|
||||
|
||||
# Check if Agno paused the run for tool confirmation
|
||||
tool_actions = []
|
||||
tool_actions: list[dict] = []
|
||||
if run_output is not None:
|
||||
status = getattr(run_output, "status", None)
|
||||
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
|
||||
|
||||
if is_paused and getattr(run_output, "active_requirements", None):
|
||||
for req in run_output.active_requirements:
|
||||
if getattr(req, "needs_confirmation", False):
|
||||
te = req.tool_execution
|
||||
tool_name = getattr(te, "tool_name", "unknown")
|
||||
tool_args = getattr(te, "tool_args", {}) or {}
|
||||
|
||||
from timmy.approvals import create_item
|
||||
|
||||
item = create_item(
|
||||
title=f"Dashboard: {tool_name}",
|
||||
description=format_action_description(tool_name, tool_args),
|
||||
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
|
||||
impact=get_impact_level(tool_name),
|
||||
)
|
||||
_pending_runs[item.id] = {
|
||||
"run_output": run_output,
|
||||
"requirement": req,
|
||||
"tool_name": tool_name,
|
||||
"tool_args": tool_args,
|
||||
}
|
||||
tool_actions.append(
|
||||
{
|
||||
"approval_id": item.id,
|
||||
"tool_name": tool_name,
|
||||
"description": format_action_description(tool_name, tool_args),
|
||||
"impact": get_impact_level(tool_name),
|
||||
}
|
||||
)
|
||||
|
||||
tool_actions = _extract_tool_actions(run_output)
|
||||
raw_content = run_output.content if hasattr(run_output, "content") else ""
|
||||
response_text = _clean_response(raw_content or "")
|
||||
if not response_text and not tool_actions:
|
||||
response_text = None # let error template show if needed
|
||||
response_text = None
|
||||
|
||||
message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
|
||||
if response_text:
|
||||
message_log.append(
|
||||
role="agent", content=response_text, timestamp=timestamp, source="browser"
|
||||
)
|
||||
elif error_text:
|
||||
message_log.append(role="error", content=error_text, timestamp=timestamp, source="browser")
|
||||
_log_exchange(message, response_text, error_text, timestamp)
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
|
||||
@@ -31,6 +31,93 @@ _UPLOAD_DIR = str(Path(settings.repo_root) / "data" / "chat-uploads")
|
||||
_MAX_UPLOAD_SIZE = 50 * 1024 * 1024 # 50 MB
|
||||
|
||||
|
||||
# ── POST /api/chat — helpers ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def _parse_chat_body(request: Request) -> tuple[dict | None, JSONResponse | None]:
|
||||
"""Parse and validate the JSON request body.
|
||||
|
||||
Returns (body, None) on success or (None, error_response) on failure.
|
||||
"""
|
||||
content_length = request.headers.get("content-length")
|
||||
if content_length and int(content_length) > settings.chat_api_max_body_bytes:
|
||||
return None, JSONResponse(status_code=413, content={"error": "Request body too large"})
|
||||
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception as exc:
|
||||
logger.warning("Chat API JSON parse error: %s", exc)
|
||||
return None, JSONResponse(status_code=400, content={"error": "Invalid JSON"})
|
||||
|
||||
messages = body.get("messages")
|
||||
if not messages or not isinstance(messages, list):
|
||||
return None, JSONResponse(status_code=400, content={"error": "messages array is required"})
|
||||
|
||||
return body, None
|
||||
|
||||
|
||||
def _extract_user_message(messages: list[dict]) -> str | None:
|
||||
"""Return the text of the last user message, or *None* if absent."""
|
||||
for msg in reversed(messages):
|
||||
if msg.get("role") == "user":
|
||||
content = msg.get("content", "")
|
||||
if isinstance(content, list):
|
||||
text_parts = [
|
||||
p.get("text", "")
|
||||
for p in content
|
||||
if isinstance(p, dict) and p.get("type") == "text"
|
||||
]
|
||||
return " ".join(text_parts).strip() or None
|
||||
text = str(content).strip()
|
||||
return text or None
|
||||
return None
|
||||
|
||||
|
||||
def _build_context_prefix() -> str:
|
||||
"""Build the system-context preamble injected before the user message."""
|
||||
now = datetime.now()
|
||||
return (
|
||||
f"[System: Current date/time is "
|
||||
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
|
||||
f"[System: Mobile client]\n\n"
|
||||
)
|
||||
|
||||
|
||||
def _notify_thinking_engine() -> None:
|
||||
"""Record user activity so the thinking engine knows we're not idle."""
|
||||
try:
|
||||
from timmy.thinking import thinking_engine
|
||||
|
||||
thinking_engine.record_user_input()
|
||||
except Exception:
|
||||
logger.debug("Failed to record user input for thinking engine")
|
||||
|
||||
|
||||
async def _process_chat(user_msg: str) -> dict | JSONResponse:
|
||||
"""Send *user_msg* to the agent, log the exchange, and return a response."""
|
||||
_notify_thinking_engine()
|
||||
timestamp = datetime.now().strftime("%H:%M:%S")
|
||||
|
||||
try:
|
||||
response_text = await agent_chat(
|
||||
_build_context_prefix() + user_msg,
|
||||
session_id="mobile",
|
||||
)
|
||||
message_log.append(role="user", content=user_msg, timestamp=timestamp, source="api")
|
||||
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="api")
|
||||
return {"reply": response_text, "timestamp": timestamp}
|
||||
|
||||
except Exception as exc:
|
||||
error_msg = f"Agent is offline: {exc}"
|
||||
logger.error("api_chat error: %s", exc)
|
||||
message_log.append(role="user", content=user_msg, timestamp=timestamp, source="api")
|
||||
message_log.append(role="error", content=error_msg, timestamp=timestamp, source="api")
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={"error": error_msg, "timestamp": timestamp},
|
||||
)
|
||||
|
||||
|
||||
# ── POST /api/chat ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -44,78 +131,15 @@ async def api_chat(request: Request):
|
||||
Response:
|
||||
{"reply": "...", "timestamp": "HH:MM:SS"}
|
||||
"""
|
||||
# Enforce request body size limit
|
||||
content_length = request.headers.get("content-length")
|
||||
if content_length and int(content_length) > settings.chat_api_max_body_bytes:
|
||||
return JSONResponse(status_code=413, content={"error": "Request body too large"})
|
||||
body, err = await _parse_chat_body(request)
|
||||
if err:
|
||||
return err
|
||||
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception as exc:
|
||||
logger.warning("Chat API JSON parse error: %s", exc)
|
||||
return JSONResponse(status_code=400, content={"error": "Invalid JSON"})
|
||||
|
||||
messages = body.get("messages")
|
||||
if not messages or not isinstance(messages, list):
|
||||
return JSONResponse(status_code=400, content={"error": "messages array is required"})
|
||||
|
||||
# Extract the latest user message text
|
||||
last_user_msg = None
|
||||
for msg in reversed(messages):
|
||||
if msg.get("role") == "user":
|
||||
content = msg.get("content", "")
|
||||
# Handle multimodal content arrays — extract text parts
|
||||
if isinstance(content, list):
|
||||
text_parts = [
|
||||
p.get("text", "")
|
||||
for p in content
|
||||
if isinstance(p, dict) and p.get("type") == "text"
|
||||
]
|
||||
last_user_msg = " ".join(text_parts).strip()
|
||||
else:
|
||||
last_user_msg = str(content).strip()
|
||||
break
|
||||
|
||||
if not last_user_msg:
|
||||
user_msg = _extract_user_message(body["messages"])
|
||||
if not user_msg:
|
||||
return JSONResponse(status_code=400, content={"error": "No user message found"})
|
||||
|
||||
# Record user activity so the thinking engine knows we're not idle
|
||||
try:
|
||||
from timmy.thinking import thinking_engine
|
||||
|
||||
thinking_engine.record_user_input()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
timestamp = datetime.now().strftime("%H:%M:%S")
|
||||
|
||||
try:
|
||||
# Inject context (same pattern as the HTMX chat handler in agents.py)
|
||||
now = datetime.now()
|
||||
context_prefix = (
|
||||
f"[System: Current date/time is "
|
||||
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
|
||||
f"[System: Mobile client]\n\n"
|
||||
)
|
||||
response_text = await agent_chat(
|
||||
context_prefix + last_user_msg,
|
||||
session_id="mobile",
|
||||
)
|
||||
|
||||
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
|
||||
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="api")
|
||||
|
||||
return {"reply": response_text, "timestamp": timestamp}
|
||||
|
||||
except Exception as exc:
|
||||
error_msg = f"Agent is offline: {exc}"
|
||||
logger.error("api_chat error: %s", exc)
|
||||
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
|
||||
message_log.append(role="error", content=error_msg, timestamp=timestamp, source="api")
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={"error": error_msg, "timestamp": timestamp},
|
||||
)
|
||||
return await _process_chat(user_msg)
|
||||
|
||||
|
||||
# ── POST /api/upload ──────────────────────────────────────────────────────────
|
||||
|
||||
@@ -65,7 +65,7 @@ def _check_ollama_sync() -> DependencyStatus:
|
||||
try:
|
||||
import urllib.request
|
||||
|
||||
url = settings.ollama_url.replace("localhost", "127.0.0.1")
|
||||
url = settings.normalized_ollama_url
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/tags",
|
||||
method="GET",
|
||||
|
||||
@@ -166,7 +166,7 @@ async def api_briefing_status():
|
||||
if cached:
|
||||
last_generated = cached.generated_at.isoformat()
|
||||
except Exception:
|
||||
pass
|
||||
logger.debug("Failed to read briefing cache")
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
@@ -190,6 +190,7 @@ async def api_memory_status():
|
||||
stats = get_memory_stats()
|
||||
indexed_files = stats.get("total_entries", 0)
|
||||
except Exception:
|
||||
logger.debug("Failed to get memory stats")
|
||||
indexed_files = 0
|
||||
|
||||
return JSONResponse(
|
||||
@@ -215,7 +216,7 @@ async def api_swarm_status():
|
||||
).fetchone()
|
||||
pending_tasks = row["cnt"] if row else 0
|
||||
except Exception:
|
||||
pass
|
||||
logger.debug("Failed to count pending tasks")
|
||||
|
||||
return JSONResponse(
|
||||
{
|
||||
|
||||
@@ -221,7 +221,7 @@ async def _heartbeat(websocket: WebSocket) -> None:
|
||||
await asyncio.sleep(_HEARTBEAT_INTERVAL)
|
||||
await websocket.send_text(json.dumps({"type": "ping"}))
|
||||
except Exception:
|
||||
pass # connection gone — receive loop will clean up
|
||||
logger.debug("Heartbeat stopped — connection gone")
|
||||
|
||||
|
||||
@router.websocket("/ws")
|
||||
@@ -250,7 +250,7 @@ async def world_ws(websocket: WebSocket) -> None:
|
||||
raw = await websocket.receive_text()
|
||||
await _handle_client_message(raw)
|
||||
except Exception:
|
||||
pass
|
||||
logger.debug("WebSocket receive loop ended")
|
||||
finally:
|
||||
ping_task.cancel()
|
||||
if websocket in _ws_clients:
|
||||
@@ -265,6 +265,7 @@ async def _broadcast(message: str) -> None:
|
||||
try:
|
||||
await ws.send_text(message)
|
||||
except Exception:
|
||||
logger.debug("Pruning dead WebSocket client")
|
||||
dead.append(ws)
|
||||
for ws in dead:
|
||||
if ws in _ws_clients:
|
||||
@@ -340,7 +341,7 @@ async def _bark_and_broadcast(visitor_text: str) -> None:
|
||||
|
||||
pip_familiar.on_event("visitor_spoke")
|
||||
except Exception:
|
||||
pass # Pip is optional
|
||||
logger.debug("Pip familiar notification failed (optional)")
|
||||
|
||||
_refresh_ground(visitor_text)
|
||||
_tick_commitments()
|
||||
|
||||
@@ -100,36 +100,14 @@ def _get_git_context() -> dict:
|
||||
return {"branch": "unknown", "commit": "unknown"}
|
||||
|
||||
|
||||
def capture_error(
|
||||
exc: Exception,
|
||||
source: str = "unknown",
|
||||
context: dict | None = None,
|
||||
) -> str | None:
|
||||
"""Capture an error and optionally create a bug report.
|
||||
|
||||
Args:
|
||||
exc: The exception to capture
|
||||
source: Module/component where the error occurred
|
||||
context: Optional dict of extra context (request path, etc.)
|
||||
def _extract_traceback_info(exc: Exception) -> tuple[str, str, int]:
|
||||
"""Extract formatted traceback, affected file, and line number.
|
||||
|
||||
Returns:
|
||||
Task ID of the created bug report, or None if deduplicated/disabled
|
||||
Tuple of (traceback_string, affected_file, affected_line).
|
||||
"""
|
||||
from config import settings
|
||||
|
||||
if not settings.error_feedback_enabled:
|
||||
return None
|
||||
|
||||
error_hash = _stack_hash(exc)
|
||||
|
||||
if _is_duplicate(error_hash):
|
||||
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
|
||||
return None
|
||||
|
||||
# Format the stack trace
|
||||
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
|
||||
|
||||
# Extract file/line from traceback
|
||||
tb_obj = exc.__traceback__
|
||||
affected_file = "unknown"
|
||||
affected_line = 0
|
||||
@@ -139,9 +117,18 @@ def capture_error(
|
||||
affected_file = tb_obj.tb_frame.f_code.co_filename
|
||||
affected_line = tb_obj.tb_lineno
|
||||
|
||||
git_ctx = _get_git_context()
|
||||
return tb_str, affected_file, affected_line
|
||||
|
||||
# 1. Log to event_log
|
||||
|
||||
def _log_error_event(
|
||||
exc: Exception,
|
||||
source: str,
|
||||
error_hash: str,
|
||||
affected_file: str,
|
||||
affected_line: int,
|
||||
git_ctx: dict,
|
||||
) -> None:
|
||||
"""Log the captured error to the event log."""
|
||||
try:
|
||||
from swarm.event_log import EventType, log_event
|
||||
|
||||
@@ -161,8 +148,18 @@ def capture_error(
|
||||
except Exception as log_exc:
|
||||
logger.debug("Failed to log error event: %s", log_exc)
|
||||
|
||||
# 2. Create bug report task
|
||||
task_id = None
|
||||
|
||||
def _create_bug_report(
|
||||
exc: Exception,
|
||||
source: str,
|
||||
context: dict | None,
|
||||
error_hash: str,
|
||||
tb_str: str,
|
||||
affected_file: str,
|
||||
affected_line: int,
|
||||
git_ctx: dict,
|
||||
) -> str | None:
|
||||
"""Create a bug report task and return the task ID (or None on failure)."""
|
||||
try:
|
||||
from swarm.task_queue.models import create_task
|
||||
|
||||
@@ -195,7 +192,6 @@ def capture_error(
|
||||
)
|
||||
task_id = task.id
|
||||
|
||||
# Log the creation event
|
||||
try:
|
||||
from swarm.event_log import EventType, log_event
|
||||
|
||||
@@ -210,12 +206,16 @@ def capture_error(
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Bug report screenshot error: %s", exc)
|
||||
pass
|
||||
|
||||
return task_id
|
||||
|
||||
except Exception as task_exc:
|
||||
logger.debug("Failed to create bug report task: %s", task_exc)
|
||||
return None
|
||||
|
||||
# 3. Send notification
|
||||
|
||||
def _notify_bug_report(exc: Exception, source: str) -> None:
|
||||
"""Send a push notification about the captured error."""
|
||||
try:
|
||||
from infrastructure.notifications.push import notifier
|
||||
|
||||
@@ -224,11 +224,12 @@ def capture_error(
|
||||
message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}",
|
||||
category="system",
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Bug report notification error: %s", exc)
|
||||
pass
|
||||
except Exception as notify_exc:
|
||||
logger.warning("Bug report notification error: %s", notify_exc)
|
||||
|
||||
# 4. Record in session logger (via registered callback)
|
||||
|
||||
def _record_to_session(exc: Exception, source: str) -> None:
|
||||
"""Record the error via the registered session callback."""
|
||||
if _error_recorder is not None:
|
||||
try:
|
||||
_error_recorder(
|
||||
@@ -238,4 +239,50 @@ def capture_error(
|
||||
except Exception as log_exc:
|
||||
logger.warning("Bug report session logging error: %s", log_exc)
|
||||
|
||||
|
||||
def capture_error(
|
||||
exc: Exception,
|
||||
source: str = "unknown",
|
||||
context: dict | None = None,
|
||||
) -> str | None:
|
||||
"""Capture an error and optionally create a bug report.
|
||||
|
||||
Args:
|
||||
exc: The exception to capture
|
||||
source: Module/component where the error occurred
|
||||
context: Optional dict of extra context (request path, etc.)
|
||||
|
||||
Returns:
|
||||
Task ID of the created bug report, or None if deduplicated/disabled
|
||||
"""
|
||||
from config import settings
|
||||
|
||||
if not settings.error_feedback_enabled:
|
||||
return None
|
||||
|
||||
error_hash = _stack_hash(exc)
|
||||
|
||||
if _is_duplicate(error_hash):
|
||||
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
|
||||
return None
|
||||
|
||||
tb_str, affected_file, affected_line = _extract_traceback_info(exc)
|
||||
git_ctx = _get_git_context()
|
||||
|
||||
_log_error_event(exc, source, error_hash, affected_file, affected_line, git_ctx)
|
||||
|
||||
task_id = _create_bug_report(
|
||||
exc,
|
||||
source,
|
||||
context,
|
||||
error_hash,
|
||||
tb_str,
|
||||
affected_file,
|
||||
affected_line,
|
||||
git_ctx,
|
||||
)
|
||||
|
||||
_notify_bug_report(exc, source)
|
||||
_record_to_session(exc, source)
|
||||
|
||||
return task_id
|
||||
|
||||
@@ -144,6 +144,65 @@ class ShellHand:
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _build_run_env(env: dict | None) -> dict:
|
||||
"""Merge *env* overrides into a copy of the current environment."""
|
||||
import os
|
||||
|
||||
run_env = os.environ.copy()
|
||||
if env:
|
||||
run_env.update(env)
|
||||
return run_env
|
||||
|
||||
async def _execute_subprocess(
|
||||
self,
|
||||
command: str,
|
||||
effective_timeout: int,
|
||||
cwd: str | None,
|
||||
run_env: dict,
|
||||
start: float,
|
||||
) -> ShellResult:
|
||||
"""Run *command* as a subprocess with timeout enforcement."""
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
command,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=cwd,
|
||||
env=run_env,
|
||||
)
|
||||
|
||||
try:
|
||||
stdout_bytes, stderr_bytes = await asyncio.wait_for(
|
||||
proc.communicate(), timeout=effective_timeout
|
||||
)
|
||||
except TimeoutError:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
latency = (time.time() - start) * 1000
|
||||
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
|
||||
return ShellResult(
|
||||
command=command,
|
||||
success=False,
|
||||
exit_code=-1,
|
||||
error=f"Command timed out after {effective_timeout}s",
|
||||
latency_ms=latency,
|
||||
timed_out=True,
|
||||
)
|
||||
|
||||
latency = (time.time() - start) * 1000
|
||||
exit_code = proc.returncode if proc.returncode is not None else -1
|
||||
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
|
||||
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
|
||||
|
||||
return ShellResult(
|
||||
command=command,
|
||||
success=exit_code == 0,
|
||||
exit_code=exit_code,
|
||||
stdout=stdout,
|
||||
stderr=stderr,
|
||||
latency_ms=latency,
|
||||
)
|
||||
|
||||
async def run(
|
||||
self,
|
||||
command: str,
|
||||
@@ -164,7 +223,6 @@ class ShellHand:
|
||||
"""
|
||||
start = time.time()
|
||||
|
||||
# Validate
|
||||
validation_error = self._validate_command(command)
|
||||
if validation_error:
|
||||
return ShellResult(
|
||||
@@ -178,52 +236,8 @@ class ShellHand:
|
||||
cwd = working_dir or self._working_dir
|
||||
|
||||
try:
|
||||
import os
|
||||
|
||||
run_env = os.environ.copy()
|
||||
if env:
|
||||
run_env.update(env)
|
||||
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
command,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=cwd,
|
||||
env=run_env,
|
||||
)
|
||||
|
||||
try:
|
||||
stdout_bytes, stderr_bytes = await asyncio.wait_for(
|
||||
proc.communicate(), timeout=effective_timeout
|
||||
)
|
||||
except TimeoutError:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
latency = (time.time() - start) * 1000
|
||||
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
|
||||
return ShellResult(
|
||||
command=command,
|
||||
success=False,
|
||||
exit_code=-1,
|
||||
error=f"Command timed out after {effective_timeout}s",
|
||||
latency_ms=latency,
|
||||
timed_out=True,
|
||||
)
|
||||
|
||||
latency = (time.time() - start) * 1000
|
||||
exit_code = proc.returncode if proc.returncode is not None else -1
|
||||
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
|
||||
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
|
||||
|
||||
return ShellResult(
|
||||
command=command,
|
||||
success=exit_code == 0,
|
||||
exit_code=exit_code,
|
||||
stdout=stdout,
|
||||
stderr=stderr,
|
||||
latency_ms=latency,
|
||||
)
|
||||
|
||||
run_env = self._build_run_env(env)
|
||||
return await self._execute_subprocess(command, effective_timeout, cwd, run_env, start)
|
||||
except Exception as exc:
|
||||
latency = (time.time() - start) * 1000
|
||||
logger.warning("Shell command failed: %s — %s", command, exc)
|
||||
|
||||
@@ -13,7 +13,7 @@ import logging
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum, auto
|
||||
|
||||
from config import settings
|
||||
from config import normalize_ollama_url, settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -307,7 +307,7 @@ class MultiModalManager:
|
||||
import json
|
||||
import urllib.request
|
||||
|
||||
url = self.ollama_url.replace("localhost", "127.0.0.1")
|
||||
url = normalize_ollama_url(self.ollama_url)
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/tags",
|
||||
method="GET",
|
||||
@@ -462,7 +462,7 @@ class MultiModalManager:
|
||||
|
||||
logger.info("Pulling model: %s", model_name)
|
||||
|
||||
url = self.ollama_url.replace("localhost", "127.0.0.1")
|
||||
url = normalize_ollama_url(self.ollama_url)
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/pull",
|
||||
method="POST",
|
||||
|
||||
@@ -18,6 +18,8 @@ from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
@@ -100,7 +102,7 @@ class Provider:
|
||||
"""LLM provider configuration and state."""
|
||||
|
||||
name: str
|
||||
type: str # ollama, openai, anthropic, airllm
|
||||
type: str # ollama, openai, anthropic
|
||||
enabled: bool
|
||||
priority: int
|
||||
url: str | None = None
|
||||
@@ -301,22 +303,13 @@ class CascadeRouter:
|
||||
# Can't check without requests, assume available
|
||||
return True
|
||||
try:
|
||||
url = provider.url or "http://localhost:11434"
|
||||
url = provider.url or settings.ollama_url
|
||||
response = requests.get(f"{url}/api/tags", timeout=5)
|
||||
return response.status_code == 200
|
||||
except Exception as exc:
|
||||
logger.debug("Ollama provider check error: %s", exc)
|
||||
return False
|
||||
|
||||
elif provider.type == "airllm":
|
||||
# Check if airllm is installed
|
||||
try:
|
||||
import importlib.util
|
||||
|
||||
return importlib.util.find_spec("airllm") is not None
|
||||
except (ImportError, ModuleNotFoundError):
|
||||
return False
|
||||
|
||||
elif provider.type in ("openai", "anthropic", "grok"):
|
||||
# Check if API key is set
|
||||
return provider.api_key is not None and provider.api_key != ""
|
||||
@@ -395,6 +388,101 @@ class CascadeRouter:
|
||||
|
||||
return None
|
||||
|
||||
def _select_model(
|
||||
self, provider: Provider, model: str | None, content_type: ContentType
|
||||
) -> tuple[str | None, bool]:
|
||||
"""Select the best model for the request, with vision fallback.
|
||||
|
||||
Returns:
|
||||
Tuple of (selected_model, is_fallback_model).
|
||||
"""
|
||||
selected_model = model or provider.get_default_model()
|
||||
is_fallback = False
|
||||
|
||||
if content_type != ContentType.TEXT and selected_model:
|
||||
if provider.type == "ollama" and self._mm_manager:
|
||||
from infrastructure.models.multimodal import ModelCapability
|
||||
|
||||
if content_type == ContentType.VISION:
|
||||
supports = self._mm_manager.model_supports(
|
||||
selected_model, ModelCapability.VISION
|
||||
)
|
||||
if not supports:
|
||||
fallback = self._get_fallback_model(provider, selected_model, content_type)
|
||||
if fallback:
|
||||
logger.info(
|
||||
"Model %s doesn't support vision, falling back to %s",
|
||||
selected_model,
|
||||
fallback,
|
||||
)
|
||||
selected_model = fallback
|
||||
is_fallback = True
|
||||
else:
|
||||
logger.warning(
|
||||
"No vision-capable model found on %s, trying anyway",
|
||||
provider.name,
|
||||
)
|
||||
|
||||
return selected_model, is_fallback
|
||||
|
||||
async def _attempt_with_retry(
|
||||
self,
|
||||
provider: Provider,
|
||||
messages: list[dict],
|
||||
model: str | None,
|
||||
temperature: float,
|
||||
max_tokens: int | None,
|
||||
content_type: ContentType,
|
||||
) -> dict:
|
||||
"""Try a provider with retries, returning the result dict.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If all retry attempts fail.
|
||||
Returns error strings collected during retries via the exception message.
|
||||
"""
|
||||
errors: list[str] = []
|
||||
for attempt in range(self.config.max_retries_per_provider):
|
||||
try:
|
||||
return await self._try_provider(
|
||||
provider=provider,
|
||||
messages=messages,
|
||||
model=model,
|
||||
temperature=temperature,
|
||||
max_tokens=max_tokens,
|
||||
content_type=content_type,
|
||||
)
|
||||
except Exception as exc:
|
||||
error_msg = str(exc)
|
||||
logger.warning(
|
||||
"Provider %s attempt %d failed: %s",
|
||||
provider.name,
|
||||
attempt + 1,
|
||||
error_msg,
|
||||
)
|
||||
errors.append(f"{provider.name}: {error_msg}")
|
||||
|
||||
if attempt < self.config.max_retries_per_provider - 1:
|
||||
await asyncio.sleep(self.config.retry_delay_seconds)
|
||||
|
||||
raise RuntimeError("; ".join(errors))
|
||||
|
||||
def _is_provider_available(self, provider: Provider) -> bool:
|
||||
"""Check if a provider should be tried (enabled + circuit breaker)."""
|
||||
if not provider.enabled:
|
||||
logger.debug("Skipping %s (disabled)", provider.name)
|
||||
return False
|
||||
|
||||
if provider.status == ProviderStatus.UNHEALTHY:
|
||||
if self._can_close_circuit(provider):
|
||||
provider.circuit_state = CircuitState.HALF_OPEN
|
||||
provider.half_open_calls = 0
|
||||
logger.info("Circuit breaker half-open for %s", provider.name)
|
||||
else:
|
||||
logger.debug("Skipping %s (circuit open)", provider.name)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def complete(
|
||||
self,
|
||||
messages: list[dict],
|
||||
@@ -421,7 +509,6 @@ class CascadeRouter:
|
||||
Raises:
|
||||
RuntimeError: If all providers fail
|
||||
"""
|
||||
# Detect content type for multi-modal routing
|
||||
content_type = self._detect_content_type(messages)
|
||||
if content_type != ContentType.TEXT:
|
||||
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
|
||||
@@ -429,93 +516,34 @@ class CascadeRouter:
|
||||
errors = []
|
||||
|
||||
for provider in self.providers:
|
||||
# Skip disabled providers
|
||||
if not provider.enabled:
|
||||
logger.debug("Skipping %s (disabled)", provider.name)
|
||||
if not self._is_provider_available(provider):
|
||||
continue
|
||||
|
||||
# Skip unhealthy providers (circuit breaker)
|
||||
if provider.status == ProviderStatus.UNHEALTHY:
|
||||
# Check if circuit breaker can close
|
||||
if self._can_close_circuit(provider):
|
||||
provider.circuit_state = CircuitState.HALF_OPEN
|
||||
provider.half_open_calls = 0
|
||||
logger.info("Circuit breaker half-open for %s", provider.name)
|
||||
else:
|
||||
logger.debug("Skipping %s (circuit open)", provider.name)
|
||||
continue
|
||||
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
|
||||
|
||||
# Determine which model to use
|
||||
selected_model = model or provider.get_default_model()
|
||||
is_fallback_model = False
|
||||
try:
|
||||
result = await self._attempt_with_retry(
|
||||
provider,
|
||||
messages,
|
||||
selected_model,
|
||||
temperature,
|
||||
max_tokens,
|
||||
content_type,
|
||||
)
|
||||
except RuntimeError as exc:
|
||||
errors.append(str(exc))
|
||||
self._record_failure(provider)
|
||||
continue
|
||||
|
||||
# For non-text content, check if model supports it
|
||||
if content_type != ContentType.TEXT and selected_model:
|
||||
if provider.type == "ollama" and self._mm_manager:
|
||||
from infrastructure.models.multimodal import ModelCapability
|
||||
self._record_success(provider, result.get("latency_ms", 0))
|
||||
return {
|
||||
"content": result["content"],
|
||||
"provider": provider.name,
|
||||
"model": result.get("model", selected_model or provider.get_default_model()),
|
||||
"latency_ms": result.get("latency_ms", 0),
|
||||
"is_fallback_model": is_fallback_model,
|
||||
}
|
||||
|
||||
# Check if selected model supports the required capability
|
||||
if content_type == ContentType.VISION:
|
||||
supports = self._mm_manager.model_supports(
|
||||
selected_model, ModelCapability.VISION
|
||||
)
|
||||
if not supports:
|
||||
# Find fallback model
|
||||
fallback = self._get_fallback_model(
|
||||
provider, selected_model, content_type
|
||||
)
|
||||
if fallback:
|
||||
logger.info(
|
||||
"Model %s doesn't support vision, falling back to %s",
|
||||
selected_model,
|
||||
fallback,
|
||||
)
|
||||
selected_model = fallback
|
||||
is_fallback_model = True
|
||||
else:
|
||||
logger.warning(
|
||||
"No vision-capable model found on %s, trying anyway",
|
||||
provider.name,
|
||||
)
|
||||
|
||||
# Try this provider
|
||||
for attempt in range(self.config.max_retries_per_provider):
|
||||
try:
|
||||
result = await self._try_provider(
|
||||
provider=provider,
|
||||
messages=messages,
|
||||
model=selected_model,
|
||||
temperature=temperature,
|
||||
max_tokens=max_tokens,
|
||||
content_type=content_type,
|
||||
)
|
||||
|
||||
# Success! Update metrics and return
|
||||
self._record_success(provider, result.get("latency_ms", 0))
|
||||
return {
|
||||
"content": result["content"],
|
||||
"provider": provider.name,
|
||||
"model": result.get(
|
||||
"model", selected_model or provider.get_default_model()
|
||||
),
|
||||
"latency_ms": result.get("latency_ms", 0),
|
||||
"is_fallback_model": is_fallback_model,
|
||||
}
|
||||
|
||||
except Exception as exc:
|
||||
error_msg = str(exc)
|
||||
logger.warning(
|
||||
"Provider %s attempt %d failed: %s", provider.name, attempt + 1, error_msg
|
||||
)
|
||||
errors.append(f"{provider.name}: {error_msg}")
|
||||
|
||||
if attempt < self.config.max_retries_per_provider - 1:
|
||||
await asyncio.sleep(self.config.retry_delay_seconds)
|
||||
|
||||
# All retries failed for this provider
|
||||
self._record_failure(provider)
|
||||
|
||||
# All providers failed
|
||||
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
|
||||
|
||||
async def _try_provider(
|
||||
@@ -581,7 +609,7 @@ class CascadeRouter:
|
||||
"""Call Ollama API with multi-modal support."""
|
||||
import aiohttp
|
||||
|
||||
url = f"{provider.url}/api/chat"
|
||||
url = f"{provider.url or settings.ollama_url}/api/chat"
|
||||
|
||||
# Transform messages for Ollama format (including images)
|
||||
transformed_messages = self._transform_messages_for_ollama(messages)
|
||||
|
||||
@@ -1 +1 @@
|
||||
"""Timmy — Core AI agent (Ollama/AirLLM backends, CLI, prompts)."""
|
||||
"""Timmy — Core AI agent (Ollama/Grok/Claude backends, CLI, prompts)."""
|
||||
|
||||
@@ -26,12 +26,12 @@ from timmy.prompts import get_system_prompt
|
||||
from timmy.tools import create_full_toolkit
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from timmy.backends import ClaudeBackend, GrokBackend, TimmyAirLLMAgent
|
||||
from timmy.backends import ClaudeBackend, GrokBackend
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Union type for callers that want to hint the return type.
|
||||
TimmyAgent = Union[Agent, "TimmyAirLLMAgent", "GrokBackend", "ClaudeBackend"]
|
||||
TimmyAgent = Union[Agent, "GrokBackend", "ClaudeBackend"]
|
||||
|
||||
# Models known to be too small for reliable tool calling.
|
||||
# These hallucinate tool calls as text, invoke tools randomly,
|
||||
@@ -63,7 +63,7 @@ def _pull_model(model_name: str) -> bool:
|
||||
|
||||
logger.info("Pulling model: %s", model_name)
|
||||
|
||||
url = settings.ollama_url.replace("localhost", "127.0.0.1")
|
||||
url = settings.normalized_ollama_url
|
||||
req = urllib.request.Request(
|
||||
f"{url}/api/pull",
|
||||
method="POST",
|
||||
@@ -172,107 +172,34 @@ def _warmup_model(model_name: str) -> bool:
|
||||
|
||||
|
||||
def _resolve_backend(requested: str | None) -> str:
|
||||
"""Return the backend name to use, resolving 'auto' and explicit overrides.
|
||||
"""Return the backend name to use.
|
||||
|
||||
Priority (highest → lowest):
|
||||
Priority (highest -> lowest):
|
||||
1. CLI flag passed directly to create_timmy()
|
||||
2. TIMMY_MODEL_BACKEND env var / .env setting
|
||||
3. 'ollama' (safe default — no surprises)
|
||||
|
||||
'auto' triggers Apple Silicon detection: uses AirLLM if both
|
||||
is_apple_silicon() and airllm_available() return True.
|
||||
3. 'ollama' (safe default -- no surprises)
|
||||
"""
|
||||
if requested is not None:
|
||||
return requested
|
||||
|
||||
configured = settings.timmy_model_backend # "ollama" | "airllm" | "grok" | "claude" | "auto"
|
||||
if configured != "auto":
|
||||
return configured
|
||||
|
||||
# "auto" path — lazy import to keep startup fast and tests clean.
|
||||
from timmy.backends import airllm_available, is_apple_silicon
|
||||
|
||||
if is_apple_silicon() and airllm_available():
|
||||
return "airllm"
|
||||
return "ollama"
|
||||
return settings.timmy_model_backend # "ollama" | "grok" | "claude"
|
||||
|
||||
|
||||
def create_timmy(
|
||||
db_file: str = "timmy.db",
|
||||
backend: str | None = None,
|
||||
model_size: str | None = None,
|
||||
*,
|
||||
skip_mcp: bool = False,
|
||||
session_id: str = "unknown",
|
||||
) -> TimmyAgent:
|
||||
"""Instantiate the agent — Ollama or AirLLM, same public interface.
|
||||
def _build_tools_list(use_tools: bool, skip_mcp: bool, model_name: str) -> list:
|
||||
"""Assemble the tools list based on model capability and MCP flags.
|
||||
|
||||
Args:
|
||||
db_file: SQLite file for Agno conversation memory (Ollama path only).
|
||||
backend: "ollama" | "airllm" | "auto" | None (reads config/env).
|
||||
model_size: AirLLM size — "8b" | "70b" | "405b" | None (reads config).
|
||||
skip_mcp: If True, omit MCP tool servers (Gitea, filesystem).
|
||||
Use for background tasks (thinking, QA) where MCP's
|
||||
stdio cancel-scope lifecycle conflicts with asyncio
|
||||
task cancellation.
|
||||
|
||||
Returns an Agno Agent or backend-specific agent — all expose
|
||||
print_response(message, stream).
|
||||
Returns a list of Toolkit / MCPTools objects, or an empty list.
|
||||
"""
|
||||
resolved = _resolve_backend(backend)
|
||||
size = model_size or "70b"
|
||||
|
||||
if resolved == "claude":
|
||||
from timmy.backends import ClaudeBackend
|
||||
|
||||
return ClaudeBackend()
|
||||
|
||||
if resolved == "grok":
|
||||
from timmy.backends import GrokBackend
|
||||
|
||||
return GrokBackend()
|
||||
|
||||
if resolved == "airllm":
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
return TimmyAirLLMAgent(model_size=size)
|
||||
|
||||
# Default: Ollama via Agno.
|
||||
# Resolve model with automatic pulling and fallback
|
||||
model_name, is_fallback = _resolve_model_with_fallback(
|
||||
requested_model=None,
|
||||
require_vision=False,
|
||||
auto_pull=True,
|
||||
)
|
||||
|
||||
# If Ollama is completely unreachable, fail loudly.
|
||||
# Sovereignty: never silently send data to a cloud API.
|
||||
# Use --backend claude explicitly if you want cloud inference.
|
||||
if not _check_model_available(model_name):
|
||||
logger.error(
|
||||
"Ollama unreachable and no local models available. "
|
||||
"Start Ollama with 'ollama serve' or use --backend claude explicitly."
|
||||
)
|
||||
|
||||
if is_fallback:
|
||||
logger.info("Using fallback model %s (requested was unavailable)", model_name)
|
||||
|
||||
use_tools = _model_supports_tools(model_name)
|
||||
|
||||
# Conditionally include tools — small models get none
|
||||
toolkit = create_full_toolkit() if use_tools else None
|
||||
if not use_tools:
|
||||
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
|
||||
return []
|
||||
|
||||
# Build the tools list — Agno accepts a list of Toolkit / MCPTools
|
||||
tools_list: list = []
|
||||
if toolkit:
|
||||
tools_list.append(toolkit)
|
||||
tools_list: list = [create_full_toolkit()]
|
||||
|
||||
# Add MCP tool servers (lazy-connected on first arun()).
|
||||
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
|
||||
# scopes that conflict with asyncio background task cancellation (#72).
|
||||
if use_tools and not skip_mcp:
|
||||
if not skip_mcp:
|
||||
try:
|
||||
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
|
||||
|
||||
@@ -286,34 +213,46 @@ def create_timmy(
|
||||
except Exception as exc:
|
||||
logger.debug("MCP tools unavailable: %s", exc)
|
||||
|
||||
# Select prompt tier based on tool capability
|
||||
return tools_list
|
||||
|
||||
|
||||
def _build_prompt(use_tools: bool, session_id: str) -> str:
|
||||
"""Build the full system prompt with optional memory context."""
|
||||
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
|
||||
|
||||
# Try to load memory context
|
||||
try:
|
||||
from timmy.memory_system import memory_system
|
||||
|
||||
memory_context = memory_system.get_system_context()
|
||||
if memory_context:
|
||||
# Truncate if too long — smaller budget for small models
|
||||
# since the expanded prompt (roster, guardrails) uses more tokens
|
||||
# Smaller budget for small models — expanded prompt uses more tokens
|
||||
max_context = 2000 if not use_tools else 8000
|
||||
if len(memory_context) > max_context:
|
||||
memory_context = memory_context[:max_context] + "\n... [truncated]"
|
||||
full_prompt = (
|
||||
return (
|
||||
f"{base_prompt}\n\n"
|
||||
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
|
||||
f"{memory_context}"
|
||||
)
|
||||
else:
|
||||
full_prompt = base_prompt
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to load memory context: %s", exc)
|
||||
full_prompt = base_prompt
|
||||
|
||||
return base_prompt
|
||||
|
||||
|
||||
def _create_ollama_agent(
|
||||
*,
|
||||
db_file: str,
|
||||
model_name: str,
|
||||
tools_list: list,
|
||||
full_prompt: str,
|
||||
use_tools: bool,
|
||||
) -> Agent:
|
||||
"""Construct the Agno Agent with Ollama backend and warm up the model."""
|
||||
model_kwargs = {}
|
||||
if settings.ollama_num_ctx > 0:
|
||||
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
|
||||
|
||||
agent = Agent(
|
||||
name="Agent",
|
||||
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
|
||||
@@ -330,6 +269,67 @@ def create_timmy(
|
||||
return agent
|
||||
|
||||
|
||||
def create_timmy(
|
||||
db_file: str = "timmy.db",
|
||||
backend: str | None = None,
|
||||
*,
|
||||
skip_mcp: bool = False,
|
||||
session_id: str = "unknown",
|
||||
) -> TimmyAgent:
|
||||
"""Instantiate the agent — Ollama, Grok, or Claude.
|
||||
|
||||
Args:
|
||||
db_file: SQLite file for Agno conversation memory (Ollama path only).
|
||||
backend: "ollama" | "grok" | "claude" | None (reads config/env).
|
||||
skip_mcp: If True, omit MCP tool servers (Gitea, filesystem).
|
||||
Use for background tasks (thinking, QA) where MCP's
|
||||
stdio cancel-scope lifecycle conflicts with asyncio
|
||||
task cancellation.
|
||||
|
||||
Returns an Agno Agent or backend-specific agent — all expose
|
||||
print_response(message, stream).
|
||||
"""
|
||||
resolved = _resolve_backend(backend)
|
||||
|
||||
if resolved == "claude":
|
||||
from timmy.backends import ClaudeBackend
|
||||
|
||||
return ClaudeBackend()
|
||||
|
||||
if resolved == "grok":
|
||||
from timmy.backends import GrokBackend
|
||||
|
||||
return GrokBackend()
|
||||
|
||||
# Default: Ollama via Agno.
|
||||
model_name, is_fallback = _resolve_model_with_fallback(
|
||||
requested_model=None,
|
||||
require_vision=False,
|
||||
auto_pull=True,
|
||||
)
|
||||
|
||||
if not _check_model_available(model_name):
|
||||
logger.error(
|
||||
"Ollama unreachable and no local models available. "
|
||||
"Start Ollama with 'ollama serve' or use --backend claude explicitly."
|
||||
)
|
||||
|
||||
if is_fallback:
|
||||
logger.info("Using fallback model %s (requested was unavailable)", model_name)
|
||||
|
||||
use_tools = _model_supports_tools(model_name)
|
||||
tools_list = _build_tools_list(use_tools, skip_mcp, model_name)
|
||||
full_prompt = _build_prompt(use_tools, session_id)
|
||||
|
||||
return _create_ollama_agent(
|
||||
db_file=db_file,
|
||||
model_name=model_name,
|
||||
tools_list=tools_list,
|
||||
full_prompt=full_prompt,
|
||||
use_tools=use_tools,
|
||||
)
|
||||
|
||||
|
||||
class TimmyWithMemory:
|
||||
"""Agent wrapper with explicit three-tier memory management."""
|
||||
|
||||
|
||||
@@ -95,6 +95,126 @@ def _parse_steps(plan_text: str) -> list[str]:
|
||||
return [line.strip() for line in plan_text.strip().splitlines() if line.strip()]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Extracted helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _extract_content(run_result) -> str:
|
||||
"""Extract text content from an agent run result."""
|
||||
return run_result.content if hasattr(run_result, "content") else str(run_result)
|
||||
|
||||
|
||||
def _clean(text: str) -> str:
|
||||
"""Clean a model response using session's response cleaner."""
|
||||
from timmy.session import _clean_response
|
||||
|
||||
return _clean_response(text)
|
||||
|
||||
|
||||
async def _plan_task(
|
||||
agent, task: str, session_id: str, max_steps: int
|
||||
) -> tuple[list[str], bool] | str:
|
||||
"""Run the planning phase — returns (steps, was_truncated) or error string."""
|
||||
plan_prompt = (
|
||||
f"Break this task into numbered steps (max {max_steps}). "
|
||||
f"Return ONLY a numbered list, nothing else.\n\n"
|
||||
f"Task: {task}"
|
||||
)
|
||||
try:
|
||||
plan_run = await asyncio.to_thread(
|
||||
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
|
||||
)
|
||||
plan_text = _extract_content(plan_run)
|
||||
except Exception as exc: # broad catch intentional: agent.run can raise any error
|
||||
logger.error("Agentic loop: planning failed: %s", exc)
|
||||
return f"Planning failed: {exc}"
|
||||
|
||||
steps = _parse_steps(plan_text)
|
||||
if not steps:
|
||||
return "Planning produced no steps."
|
||||
|
||||
planned_count = len(steps)
|
||||
steps = steps[:max_steps]
|
||||
return steps, planned_count > len(steps)
|
||||
|
||||
|
||||
async def _execute_step(
|
||||
agent,
|
||||
task: str,
|
||||
step_desc: str,
|
||||
step_num: int,
|
||||
total_steps: int,
|
||||
recent_results: list[str],
|
||||
session_id: str,
|
||||
) -> AgenticStep:
|
||||
"""Execute a single step, returning an AgenticStep."""
|
||||
step_start = time.monotonic()
|
||||
context = (
|
||||
f"Task: {task}\n"
|
||||
f"Step {step_num}/{total_steps}: {step_desc}\n"
|
||||
f"Recent progress: {recent_results[-2:] if recent_results else []}\n\n"
|
||||
f"Execute this step and report what you did."
|
||||
)
|
||||
step_run = await asyncio.to_thread(
|
||||
agent.run, context, stream=False, session_id=f"{session_id}_step{step_num}"
|
||||
)
|
||||
step_result = _clean(_extract_content(step_run))
|
||||
return AgenticStep(
|
||||
step_num=step_num,
|
||||
description=step_desc,
|
||||
result=step_result,
|
||||
status="completed",
|
||||
duration_ms=int((time.monotonic() - step_start) * 1000),
|
||||
)
|
||||
|
||||
|
||||
async def _adapt_step(
|
||||
agent,
|
||||
step_desc: str,
|
||||
step_num: int,
|
||||
error: Exception,
|
||||
step_start: float,
|
||||
session_id: str,
|
||||
) -> AgenticStep:
|
||||
"""Attempt adaptation after a step failure."""
|
||||
adapt_prompt = (
|
||||
f"Step {step_num} failed with error: {error}\n"
|
||||
f"Original step was: {step_desc}\n"
|
||||
f"Adapt the plan and try an alternative approach for this step."
|
||||
)
|
||||
adapt_run = await asyncio.to_thread(
|
||||
agent.run, adapt_prompt, stream=False, session_id=f"{session_id}_adapt{step_num}"
|
||||
)
|
||||
adapt_result = _clean(_extract_content(adapt_run))
|
||||
return AgenticStep(
|
||||
step_num=step_num,
|
||||
description=f"[Adapted] {step_desc}",
|
||||
result=adapt_result,
|
||||
status="adapted",
|
||||
duration_ms=int((time.monotonic() - step_start) * 1000),
|
||||
)
|
||||
|
||||
|
||||
def _summarize(result: AgenticResult, total_steps: int, was_truncated: bool) -> None:
|
||||
"""Fill in summary and final status on the result object (mutates in place)."""
|
||||
completed = sum(1 for s in result.steps if s.status == "completed")
|
||||
adapted = sum(1 for s in result.steps if s.status == "adapted")
|
||||
failed = sum(1 for s in result.steps if s.status == "failed")
|
||||
|
||||
parts = [f"Completed {completed}/{total_steps} steps"]
|
||||
if adapted:
|
||||
parts.append(f"{adapted} adapted")
|
||||
if failed:
|
||||
parts.append(f"{failed} failed")
|
||||
result.summary = f"{result.task}: {', '.join(parts)}."
|
||||
|
||||
if was_truncated or len(result.steps) < total_steps or failed:
|
||||
result.status = "partial"
|
||||
else:
|
||||
result.status = "completed"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core loop
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -125,88 +245,41 @@ async def run_agentic_loop(
|
||||
|
||||
task_id = str(uuid.uuid4())[:8]
|
||||
start_time = time.monotonic()
|
||||
|
||||
agent = _get_loop_agent()
|
||||
result = AgenticResult(task_id=task_id, task=task, summary="")
|
||||
|
||||
# ── Phase 1: Planning ──────────────────────────────────────────────────
|
||||
plan_prompt = (
|
||||
f"Break this task into numbered steps (max {max_steps}). "
|
||||
f"Return ONLY a numbered list, nothing else.\n\n"
|
||||
f"Task: {task}"
|
||||
)
|
||||
try:
|
||||
plan_run = await asyncio.to_thread(
|
||||
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
|
||||
)
|
||||
plan_text = plan_run.content if hasattr(plan_run, "content") else str(plan_run)
|
||||
except Exception as exc: # broad catch intentional: agent.run can raise any error
|
||||
logger.error("Agentic loop: planning failed: %s", exc)
|
||||
# Phase 1: Planning
|
||||
plan = await _plan_task(agent, task, session_id, max_steps)
|
||||
if isinstance(plan, str):
|
||||
result.status = "failed"
|
||||
result.summary = f"Planning failed: {exc}"
|
||||
result.summary = plan
|
||||
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
|
||||
return result
|
||||
|
||||
steps = _parse_steps(plan_text)
|
||||
if not steps:
|
||||
result.status = "failed"
|
||||
result.summary = "Planning produced no steps."
|
||||
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
|
||||
return result
|
||||
|
||||
# Enforce max_steps — track if we truncated
|
||||
planned_steps = len(steps)
|
||||
steps = steps[:max_steps]
|
||||
steps, was_truncated = plan
|
||||
total_steps = len(steps)
|
||||
was_truncated = planned_steps > total_steps
|
||||
|
||||
# Broadcast plan
|
||||
await _broadcast_progress(
|
||||
"agentic.plan_ready",
|
||||
{
|
||||
"task_id": task_id,
|
||||
"task": task,
|
||||
"steps": steps,
|
||||
"total": total_steps,
|
||||
},
|
||||
{"task_id": task_id, "task": task, "steps": steps, "total": total_steps},
|
||||
)
|
||||
|
||||
# ── Phase 2: Execution ─────────────────────────────────────────────────
|
||||
# Phase 2: Execution
|
||||
completed_results: list[str] = []
|
||||
|
||||
for i, step_desc in enumerate(steps, 1):
|
||||
step_start = time.monotonic()
|
||||
|
||||
recent = completed_results[-2:] if completed_results else []
|
||||
context = (
|
||||
f"Task: {task}\n"
|
||||
f"Step {i}/{total_steps}: {step_desc}\n"
|
||||
f"Recent progress: {recent}\n\n"
|
||||
f"Execute this step and report what you did."
|
||||
)
|
||||
|
||||
try:
|
||||
step_run = await asyncio.to_thread(
|
||||
agent.run, context, stream=False, session_id=f"{session_id}_step{i}"
|
||||
)
|
||||
step_result = step_run.content if hasattr(step_run, "content") else str(step_run)
|
||||
|
||||
# Clean the response
|
||||
from timmy.session import _clean_response
|
||||
|
||||
step_result = _clean_response(step_result)
|
||||
|
||||
step = AgenticStep(
|
||||
step_num=i,
|
||||
description=step_desc,
|
||||
result=step_result,
|
||||
status="completed",
|
||||
duration_ms=int((time.monotonic() - step_start) * 1000),
|
||||
step = await _execute_step(
|
||||
agent,
|
||||
task,
|
||||
step_desc,
|
||||
i,
|
||||
total_steps,
|
||||
completed_results,
|
||||
session_id,
|
||||
)
|
||||
result.steps.append(step)
|
||||
completed_results.append(f"Step {i}: {step_result[:200]}")
|
||||
|
||||
# Broadcast progress
|
||||
completed_results.append(f"Step {i}: {step.result[:200]}")
|
||||
await _broadcast_progress(
|
||||
"agentic.step_complete",
|
||||
{
|
||||
@@ -214,46 +287,18 @@ async def run_agentic_loop(
|
||||
"step": i,
|
||||
"total": total_steps,
|
||||
"description": step_desc,
|
||||
"result": step_result[:200],
|
||||
"result": step.result[:200],
|
||||
},
|
||||
)
|
||||
|
||||
if on_progress:
|
||||
await on_progress(step_desc, i, total_steps)
|
||||
|
||||
except Exception as exc: # broad catch intentional: agent.run can raise any error
|
||||
logger.warning("Agentic loop step %d failed: %s", i, exc)
|
||||
|
||||
# ── Adaptation: ask model to adapt ─────────────────────────────
|
||||
adapt_prompt = (
|
||||
f"Step {i} failed with error: {exc}\n"
|
||||
f"Original step was: {step_desc}\n"
|
||||
f"Adapt the plan and try an alternative approach for this step."
|
||||
)
|
||||
try:
|
||||
adapt_run = await asyncio.to_thread(
|
||||
agent.run,
|
||||
adapt_prompt,
|
||||
stream=False,
|
||||
session_id=f"{session_id}_adapt{i}",
|
||||
)
|
||||
adapt_result = (
|
||||
adapt_run.content if hasattr(adapt_run, "content") else str(adapt_run)
|
||||
)
|
||||
from timmy.session import _clean_response
|
||||
|
||||
adapt_result = _clean_response(adapt_result)
|
||||
|
||||
step = AgenticStep(
|
||||
step_num=i,
|
||||
description=f"[Adapted] {step_desc}",
|
||||
result=adapt_result,
|
||||
status="adapted",
|
||||
duration_ms=int((time.monotonic() - step_start) * 1000),
|
||||
)
|
||||
step = await _adapt_step(agent, step_desc, i, exc, step_start, session_id)
|
||||
result.steps.append(step)
|
||||
completed_results.append(f"Step {i} (adapted): {adapt_result[:200]}")
|
||||
|
||||
completed_results.append(f"Step {i} (adapted): {step.result[:200]}")
|
||||
await _broadcast_progress(
|
||||
"agentic.step_adapted",
|
||||
{
|
||||
@@ -262,46 +307,26 @@ async def run_agentic_loop(
|
||||
"total": total_steps,
|
||||
"description": step_desc,
|
||||
"error": str(exc),
|
||||
"adaptation": adapt_result[:200],
|
||||
"adaptation": step.result[:200],
|
||||
},
|
||||
)
|
||||
|
||||
if on_progress:
|
||||
await on_progress(f"[Adapted] {step_desc}", i, total_steps)
|
||||
|
||||
except Exception as adapt_exc: # broad catch intentional: agent.run can raise any error
|
||||
except Exception as adapt_exc: # broad catch intentional
|
||||
logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
|
||||
step = AgenticStep(
|
||||
step_num=i,
|
||||
description=step_desc,
|
||||
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
|
||||
status="failed",
|
||||
duration_ms=int((time.monotonic() - step_start) * 1000),
|
||||
result.steps.append(
|
||||
AgenticStep(
|
||||
step_num=i,
|
||||
description=step_desc,
|
||||
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
|
||||
status="failed",
|
||||
duration_ms=int((time.monotonic() - step_start) * 1000),
|
||||
)
|
||||
)
|
||||
result.steps.append(step)
|
||||
completed_results.append(f"Step {i}: FAILED")
|
||||
|
||||
# ── Phase 3: Summary ───────────────────────────────────────────────────
|
||||
completed_count = sum(1 for s in result.steps if s.status == "completed")
|
||||
adapted_count = sum(1 for s in result.steps if s.status == "adapted")
|
||||
failed_count = sum(1 for s in result.steps if s.status == "failed")
|
||||
parts = [f"Completed {completed_count}/{total_steps} steps"]
|
||||
if adapted_count:
|
||||
parts.append(f"{adapted_count} adapted")
|
||||
if failed_count:
|
||||
parts.append(f"{failed_count} failed")
|
||||
result.summary = f"{task}: {', '.join(parts)}."
|
||||
|
||||
# Determine final status
|
||||
if was_truncated:
|
||||
result.status = "partial"
|
||||
elif len(result.steps) < total_steps:
|
||||
result.status = "partial"
|
||||
elif any(s.status == "failed" for s in result.steps):
|
||||
result.status = "partial"
|
||||
else:
|
||||
result.status = "completed"
|
||||
|
||||
# Phase 3: Summary
|
||||
_summarize(result, total_steps, was_truncated)
|
||||
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
|
||||
|
||||
await _broadcast_progress(
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
"""LLM backends — AirLLM (local big models), Grok (xAI), and Claude (Anthropic).
|
||||
"""LLM backends — Grok (xAI) and Claude (Anthropic).
|
||||
|
||||
Provides drop-in replacements for the Agno Agent that expose the same
|
||||
run(message, stream) → RunResult interface used by the dashboard and the
|
||||
print_response(message, stream) interface used by the CLI.
|
||||
|
||||
Backends:
|
||||
- TimmyAirLLMAgent: Local 8B/70B/405B via AirLLM (Apple Silicon or PyTorch)
|
||||
- GrokBackend: xAI Grok API via OpenAI-compatible SDK (opt-in premium)
|
||||
- ClaudeBackend: Anthropic Claude API — lightweight cloud fallback
|
||||
|
||||
@@ -16,21 +15,11 @@ import logging
|
||||
import platform
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
from timmy.prompts import get_system_prompt
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# HuggingFace model IDs for each supported size.
|
||||
_AIRLLM_MODELS: dict[str, str] = {
|
||||
"8b": "meta-llama/Meta-Llama-3.1-8B-Instruct",
|
||||
"70b": "meta-llama/Meta-Llama-3.1-70B-Instruct",
|
||||
"405b": "meta-llama/Meta-Llama-3.1-405B-Instruct",
|
||||
}
|
||||
|
||||
ModelSize = Literal["8b", "70b", "405b"]
|
||||
|
||||
|
||||
@dataclass
|
||||
class RunResult:
|
||||
@@ -45,108 +34,6 @@ def is_apple_silicon() -> bool:
|
||||
return platform.system() == "Darwin" and platform.machine() == "arm64"
|
||||
|
||||
|
||||
def airllm_available() -> bool:
|
||||
"""Return True when the airllm package is importable."""
|
||||
try:
|
||||
import airllm # noqa: F401
|
||||
|
||||
return True
|
||||
except ImportError:
|
||||
return False
|
||||
|
||||
|
||||
class TimmyAirLLMAgent:
|
||||
"""Thin AirLLM wrapper compatible with both dashboard and CLI call sites.
|
||||
|
||||
Exposes:
|
||||
run(message, stream) → RunResult(content=...) [dashboard]
|
||||
print_response(message, stream) → None [CLI]
|
||||
|
||||
Maintains a rolling 10-turn in-memory history so Timmy remembers the
|
||||
conversation within a session — no SQLite needed at this layer.
|
||||
"""
|
||||
|
||||
def __init__(self, model_size: str = "70b") -> None:
|
||||
model_id = _AIRLLM_MODELS.get(model_size)
|
||||
if model_id is None:
|
||||
raise ValueError(
|
||||
f"Unknown model size {model_size!r}. Choose from: {list(_AIRLLM_MODELS)}"
|
||||
)
|
||||
|
||||
if is_apple_silicon():
|
||||
from airllm import AirLLMMLX # type: ignore[import]
|
||||
|
||||
self._model = AirLLMMLX(model_id)
|
||||
else:
|
||||
from airllm import AutoModel # type: ignore[import]
|
||||
|
||||
self._model = AutoModel.from_pretrained(model_id)
|
||||
|
||||
self._history: list[str] = []
|
||||
self._model_size = model_size
|
||||
|
||||
# ── public interface (mirrors Agno Agent) ────────────────────────────────
|
||||
|
||||
def run(self, message: str, *, stream: bool = False) -> RunResult:
|
||||
"""Run inference and return a structured result (matches Agno Agent.run()).
|
||||
|
||||
`stream` is accepted for API compatibility; AirLLM always generates
|
||||
the full output in one pass.
|
||||
"""
|
||||
prompt = self._build_prompt(message)
|
||||
|
||||
input_tokens = self._model.tokenizer(
|
||||
[prompt],
|
||||
return_tensors="pt",
|
||||
padding=True,
|
||||
truncation=True,
|
||||
max_length=2048,
|
||||
)
|
||||
output = self._model.generate(
|
||||
**input_tokens,
|
||||
max_new_tokens=512,
|
||||
use_cache=True,
|
||||
do_sample=True,
|
||||
temperature=0.7,
|
||||
)
|
||||
|
||||
# Decode only the newly generated tokens, not the prompt.
|
||||
input_len = input_tokens["input_ids"].shape[1]
|
||||
response = self._model.tokenizer.decode(
|
||||
output[0][input_len:], skip_special_tokens=True
|
||||
).strip()
|
||||
|
||||
self._history.append(f"User: {message}")
|
||||
self._history.append(f"Timmy: {response}")
|
||||
|
||||
return RunResult(content=response)
|
||||
|
||||
def print_response(self, message: str, *, stream: bool = True) -> None:
|
||||
"""Run inference and render the response to stdout (CLI interface)."""
|
||||
result = self.run(message, stream=stream)
|
||||
self._render(result.content)
|
||||
|
||||
# ── private helpers ──────────────────────────────────────────────────────
|
||||
|
||||
def _build_prompt(self, message: str) -> str:
|
||||
context = get_system_prompt(tools_enabled=False, session_id="airllm") + "\n\n"
|
||||
# Include the last 10 turns (5 exchanges) for continuity.
|
||||
if self._history:
|
||||
context += "\n".join(self._history[-10:]) + "\n\n"
|
||||
return context + f"User: {message}\nTimmy:"
|
||||
|
||||
@staticmethod
|
||||
def _render(text: str) -> None:
|
||||
"""Print response with rich markdown when available, plain text otherwise."""
|
||||
try:
|
||||
from rich.console import Console
|
||||
from rich.markdown import Markdown
|
||||
|
||||
Console().print(Markdown(text))
|
||||
except ImportError:
|
||||
print(text)
|
||||
|
||||
|
||||
# ── Grok (xAI) Backend ─────────────────────────────────────────────────────
|
||||
# Premium cloud augmentation — opt-in only, never the default path.
|
||||
|
||||
@@ -187,7 +74,7 @@ class GrokBackend:
|
||||
Uses the OpenAI-compatible SDK to connect to xAI's API.
|
||||
Only activated when GROK_ENABLED=true and XAI_API_KEY is set.
|
||||
|
||||
Exposes the same interface as TimmyAirLLMAgent and Agno Agent:
|
||||
Exposes the same interface as Agno Agent:
|
||||
run(message, stream) → RunResult [dashboard]
|
||||
print_response(message, stream) → None [CLI]
|
||||
health_check() → dict [monitoring]
|
||||
@@ -437,8 +324,7 @@ CLAUDE_MODELS: dict[str, str] = {
|
||||
class ClaudeBackend:
|
||||
"""Anthropic Claude backend — cloud fallback when local models are offline.
|
||||
|
||||
Uses the official Anthropic SDK. Same interface as GrokBackend and
|
||||
TimmyAirLLMAgent:
|
||||
Uses the official Anthropic SDK. Same interface as GrokBackend:
|
||||
run(message, stream) → RunResult [dashboard]
|
||||
print_response(message, stream) → None [CLI]
|
||||
health_check() → dict [monitoring]
|
||||
|
||||
@@ -22,13 +22,13 @@ _BACKEND_OPTION = typer.Option(
|
||||
None,
|
||||
"--backend",
|
||||
"-b",
|
||||
help="Inference backend: 'ollama' (default) | 'airllm' | 'auto'",
|
||||
help="Inference backend: 'ollama' (default) | 'grok' | 'claude'",
|
||||
)
|
||||
_MODEL_SIZE_OPTION = typer.Option(
|
||||
None,
|
||||
"--model-size",
|
||||
"-s",
|
||||
help="AirLLM model size when --backend airllm: '8b' | '70b' | '405b'",
|
||||
help="Model size (reserved for future use).",
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -98,6 +98,73 @@ def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
|
||||
return {row[1] for row in cursor.fetchall()}
|
||||
|
||||
|
||||
def _migrate_episodes(conn: sqlite3.Connection) -> None:
|
||||
"""Migrate episodes table rows into the unified memories table."""
|
||||
logger.info("Migration: Converting episodes table to memories")
|
||||
try:
|
||||
cols = _get_table_columns(conn, "episodes")
|
||||
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
|
||||
|
||||
conn.execute(f"""
|
||||
INSERT INTO memories (
|
||||
id, content, memory_type, source, embedding,
|
||||
metadata, agent_id, task_id, session_id,
|
||||
created_at, access_count, last_accessed
|
||||
)
|
||||
SELECT
|
||||
id, content,
|
||||
COALESCE({context_type_col}, 'conversation'),
|
||||
COALESCE(source, 'agent'),
|
||||
embedding,
|
||||
metadata, agent_id, task_id, session_id,
|
||||
COALESCE(timestamp, datetime('now')), 0, NULL
|
||||
FROM episodes
|
||||
""")
|
||||
conn.execute("DROP TABLE episodes")
|
||||
logger.info("Migration: Migrated episodes to memories")
|
||||
except sqlite3.Error as exc:
|
||||
logger.warning("Migration: Failed to migrate episodes: %s", exc)
|
||||
|
||||
|
||||
def _migrate_chunks(conn: sqlite3.Connection) -> None:
|
||||
"""Migrate chunks table rows into the unified memories table."""
|
||||
logger.info("Migration: Converting chunks table to memories")
|
||||
try:
|
||||
cols = _get_table_columns(conn, "chunks")
|
||||
|
||||
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
|
||||
content_col = "content" if "content" in cols else "text"
|
||||
source_col = (
|
||||
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
|
||||
)
|
||||
embedding_col = "embedding" if "embedding" in cols else "NULL"
|
||||
created_col = "created_at" if "created_at" in cols else "datetime('now')"
|
||||
|
||||
conn.execute(f"""
|
||||
INSERT INTO memories (
|
||||
id, content, memory_type, source, embedding,
|
||||
created_at, access_count
|
||||
)
|
||||
SELECT
|
||||
{id_col}, {content_col}, 'vault_chunk', {source_col},
|
||||
{embedding_col}, {created_col}, 0
|
||||
FROM chunks
|
||||
""")
|
||||
conn.execute("DROP TABLE chunks")
|
||||
logger.info("Migration: Migrated chunks to memories")
|
||||
except sqlite3.Error as exc:
|
||||
logger.warning("Migration: Failed to migrate chunks: %s", exc)
|
||||
|
||||
|
||||
def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None:
|
||||
"""Drop a legacy table if it exists."""
|
||||
try:
|
||||
conn.execute(f"DROP TABLE {table}") # noqa: S608
|
||||
logger.info("Migration: Dropped old %s table", table)
|
||||
except sqlite3.Error as exc:
|
||||
logger.warning("Migration: Failed to drop %s: %s", table, exc)
|
||||
|
||||
|
||||
def _migrate_schema(conn: sqlite3.Connection) -> None:
|
||||
"""Migrate from old three-table schema to unified memories table.
|
||||
|
||||
@@ -110,78 +177,16 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
|
||||
tables = {row[0] for row in cursor.fetchall()}
|
||||
|
||||
has_memories = "memories" in tables
|
||||
has_episodes = "episodes" in tables
|
||||
has_chunks = "chunks" in tables
|
||||
has_facts = "facts" in tables
|
||||
|
||||
# Check if we need to migrate (old schema exists)
|
||||
if not has_memories and (has_episodes or has_chunks or has_facts):
|
||||
if not has_memories and (tables & {"episodes", "chunks", "facts"}):
|
||||
logger.info("Migration: Creating unified memories table")
|
||||
# Schema will be created by _ensure_schema above
|
||||
|
||||
# Migrate episodes -> memories
|
||||
if has_episodes and has_memories:
|
||||
logger.info("Migration: Converting episodes table to memories")
|
||||
try:
|
||||
cols = _get_table_columns(conn, "episodes")
|
||||
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
|
||||
|
||||
conn.execute(f"""
|
||||
INSERT INTO memories (
|
||||
id, content, memory_type, source, embedding,
|
||||
metadata, agent_id, task_id, session_id,
|
||||
created_at, access_count, last_accessed
|
||||
)
|
||||
SELECT
|
||||
id, content,
|
||||
COALESCE({context_type_col}, 'conversation'),
|
||||
COALESCE(source, 'agent'),
|
||||
embedding,
|
||||
metadata, agent_id, task_id, session_id,
|
||||
COALESCE(timestamp, datetime('now')), 0, NULL
|
||||
FROM episodes
|
||||
""")
|
||||
conn.execute("DROP TABLE episodes")
|
||||
logger.info("Migration: Migrated episodes to memories")
|
||||
except sqlite3.Error as exc:
|
||||
logger.warning("Migration: Failed to migrate episodes: %s", exc)
|
||||
|
||||
# Migrate chunks -> memories as vault_chunk
|
||||
if has_chunks and has_memories:
|
||||
logger.info("Migration: Converting chunks table to memories")
|
||||
try:
|
||||
cols = _get_table_columns(conn, "chunks")
|
||||
|
||||
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
|
||||
content_col = "content" if "content" in cols else "text"
|
||||
source_col = (
|
||||
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
|
||||
)
|
||||
embedding_col = "embedding" if "embedding" in cols else "NULL"
|
||||
created_col = "created_at" if "created_at" in cols else "datetime('now')"
|
||||
|
||||
conn.execute(f"""
|
||||
INSERT INTO memories (
|
||||
id, content, memory_type, source, embedding,
|
||||
created_at, access_count
|
||||
)
|
||||
SELECT
|
||||
{id_col}, {content_col}, 'vault_chunk', {source_col},
|
||||
{embedding_col}, {created_col}, 0
|
||||
FROM chunks
|
||||
""")
|
||||
conn.execute("DROP TABLE chunks")
|
||||
logger.info("Migration: Migrated chunks to memories")
|
||||
except sqlite3.Error as exc:
|
||||
logger.warning("Migration: Failed to migrate chunks: %s", exc)
|
||||
|
||||
# Drop old tables
|
||||
if has_facts:
|
||||
try:
|
||||
conn.execute("DROP TABLE facts")
|
||||
logger.info("Migration: Dropped old facts table")
|
||||
except sqlite3.Error as exc:
|
||||
logger.warning("Migration: Failed to drop facts: %s", exc)
|
||||
if "episodes" in tables and has_memories:
|
||||
_migrate_episodes(conn)
|
||||
if "chunks" in tables and has_memories:
|
||||
_migrate_chunks(conn)
|
||||
if "facts" in tables:
|
||||
_drop_legacy_table(conn, "facts")
|
||||
|
||||
conn.commit()
|
||||
|
||||
@@ -298,6 +303,86 @@ def store_memory(
|
||||
return entry
|
||||
|
||||
|
||||
def _build_memory_filter(
|
||||
context_type: str | None,
|
||||
agent_id: str | None,
|
||||
session_id: str | None,
|
||||
) -> tuple[str, list]:
|
||||
"""Build WHERE clause and params for memory queries."""
|
||||
conditions: list[str] = []
|
||||
params: list = []
|
||||
|
||||
if context_type:
|
||||
conditions.append("memory_type = ?")
|
||||
params.append(context_type)
|
||||
if agent_id:
|
||||
conditions.append("agent_id = ?")
|
||||
params.append(agent_id)
|
||||
if session_id:
|
||||
conditions.append("session_id = ?")
|
||||
params.append(session_id)
|
||||
|
||||
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
|
||||
return where_clause, params
|
||||
|
||||
|
||||
def _fetch_memory_candidates(
|
||||
where_clause: str, params: list, candidate_limit: int
|
||||
) -> list[sqlite3.Row]:
|
||||
"""Fetch candidate memory rows from the database."""
|
||||
query_sql = f"""
|
||||
SELECT * FROM memories
|
||||
{where_clause}
|
||||
ORDER BY created_at DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
params.append(candidate_limit)
|
||||
|
||||
with get_connection() as conn:
|
||||
return conn.execute(query_sql, params).fetchall()
|
||||
|
||||
|
||||
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
|
||||
"""Convert a database row to a MemoryEntry."""
|
||||
return MemoryEntry(
|
||||
id=row["id"],
|
||||
content=row["content"],
|
||||
source=row["source"],
|
||||
context_type=row["memory_type"], # DB column -> API field
|
||||
agent_id=row["agent_id"],
|
||||
task_id=row["task_id"],
|
||||
session_id=row["session_id"],
|
||||
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
|
||||
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
|
||||
timestamp=row["created_at"],
|
||||
)
|
||||
|
||||
|
||||
def _score_and_rank(
|
||||
rows: list[sqlite3.Row],
|
||||
query: str,
|
||||
query_embedding: list[float],
|
||||
min_relevance: float,
|
||||
limit: int,
|
||||
) -> list[MemoryEntry]:
|
||||
"""Score candidates by similarity and return top results."""
|
||||
results = []
|
||||
for row in rows:
|
||||
entry = _row_to_entry(row)
|
||||
|
||||
if entry.embedding:
|
||||
score = cosine_similarity(query_embedding, entry.embedding)
|
||||
else:
|
||||
score = _keyword_overlap(query, entry.content)
|
||||
|
||||
entry.relevance_score = score
|
||||
if score >= min_relevance:
|
||||
results.append(entry)
|
||||
|
||||
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
|
||||
return results[:limit]
|
||||
|
||||
|
||||
def search_memories(
|
||||
query: str,
|
||||
limit: int = 10,
|
||||
@@ -320,66 +405,9 @@ def search_memories(
|
||||
List of MemoryEntry objects sorted by relevance
|
||||
"""
|
||||
query_embedding = embed_text(query)
|
||||
|
||||
# Build query with filters
|
||||
conditions = []
|
||||
params = []
|
||||
|
||||
if context_type:
|
||||
conditions.append("memory_type = ?")
|
||||
params.append(context_type)
|
||||
if agent_id:
|
||||
conditions.append("agent_id = ?")
|
||||
params.append(agent_id)
|
||||
if session_id:
|
||||
conditions.append("session_id = ?")
|
||||
params.append(session_id)
|
||||
|
||||
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
|
||||
|
||||
# Fetch candidates (we'll do in-memory similarity for now)
|
||||
query_sql = f"""
|
||||
SELECT * FROM memories
|
||||
{where_clause}
|
||||
ORDER BY created_at DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
params.append(limit * 3) # Get more candidates for ranking
|
||||
|
||||
with get_connection() as conn:
|
||||
rows = conn.execute(query_sql, params).fetchall()
|
||||
|
||||
# Compute similarity scores
|
||||
results = []
|
||||
for row in rows:
|
||||
entry = MemoryEntry(
|
||||
id=row["id"],
|
||||
content=row["content"],
|
||||
source=row["source"],
|
||||
context_type=row["memory_type"], # DB column -> API field
|
||||
agent_id=row["agent_id"],
|
||||
task_id=row["task_id"],
|
||||
session_id=row["session_id"],
|
||||
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
|
||||
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
|
||||
timestamp=row["created_at"],
|
||||
)
|
||||
|
||||
if entry.embedding:
|
||||
score = cosine_similarity(query_embedding, entry.embedding)
|
||||
entry.relevance_score = score
|
||||
if score >= min_relevance:
|
||||
results.append(entry)
|
||||
else:
|
||||
# Fallback: check for keyword overlap
|
||||
score = _keyword_overlap(query, entry.content)
|
||||
entry.relevance_score = score
|
||||
if score >= min_relevance:
|
||||
results.append(entry)
|
||||
|
||||
# Sort by relevance and return top results
|
||||
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
|
||||
return results[:limit]
|
||||
where_clause, params = _build_memory_filter(context_type, agent_id, session_id)
|
||||
rows = _fetch_memory_candidates(where_clause, params, limit * 3)
|
||||
return _score_and_rank(rows, query, query_embedding, min_relevance, limit)
|
||||
|
||||
|
||||
def delete_memory(memory_id: str) -> bool:
|
||||
@@ -636,7 +664,7 @@ class HotMemory:
|
||||
if len(lines) > 1:
|
||||
return "\n".join(lines)
|
||||
except Exception:
|
||||
pass
|
||||
logger.debug("DB context read failed, falling back to file")
|
||||
|
||||
# Fallback to file if DB unavailable
|
||||
if self.path.exists():
|
||||
|
||||
@@ -323,6 +323,75 @@ def session_history(query: str, role: str = "", limit: int = 10) -> str:
|
||||
_LOW_CONFIDENCE_THRESHOLD = 0.5
|
||||
|
||||
|
||||
def _categorize_entries(
|
||||
entries: list[dict],
|
||||
) -> tuple[list[dict], list[dict], list[dict], list[dict]]:
|
||||
"""Split session entries into messages, errors, timmy msgs, user msgs."""
|
||||
messages = [e for e in entries if e.get("type") == "message"]
|
||||
errors = [e for e in entries if e.get("type") == "error"]
|
||||
timmy_msgs = [e for e in messages if e.get("role") == "timmy"]
|
||||
user_msgs = [e for e in messages if e.get("role") == "user"]
|
||||
return messages, errors, timmy_msgs, user_msgs
|
||||
|
||||
|
||||
def _find_low_confidence(timmy_msgs: list[dict]) -> list[dict]:
|
||||
"""Return Timmy responses below the confidence threshold."""
|
||||
return [
|
||||
m
|
||||
for m in timmy_msgs
|
||||
if m.get("confidence") is not None and m["confidence"] < _LOW_CONFIDENCE_THRESHOLD
|
||||
]
|
||||
|
||||
|
||||
def _find_repeated_topics(user_msgs: list[dict], top_n: int = 5) -> list[tuple[str, int]]:
|
||||
"""Identify frequently mentioned words in user messages."""
|
||||
topic_counts: dict[str, int] = {}
|
||||
for m in user_msgs:
|
||||
for word in (m.get("content") or "").lower().split():
|
||||
cleaned = word.strip(".,!?\"'()[]")
|
||||
if len(cleaned) > 3:
|
||||
topic_counts[cleaned] = topic_counts.get(cleaned, 0) + 1
|
||||
return sorted(
|
||||
((w, c) for w, c in topic_counts.items() if c >= 3),
|
||||
key=lambda x: x[1],
|
||||
reverse=True,
|
||||
)[:top_n]
|
||||
|
||||
|
||||
def _format_reflection_section(
|
||||
title: str,
|
||||
items: list[dict],
|
||||
formatter: object,
|
||||
empty_msg: str,
|
||||
) -> list[str]:
|
||||
"""Format a titled section with items, or an empty-state message."""
|
||||
if items:
|
||||
lines = [f"### {title} ({len(items)})"]
|
||||
for item in items[:5]:
|
||||
lines.append(formatter(item)) # type: ignore[operator]
|
||||
lines.append("")
|
||||
return lines
|
||||
return [f"### {title}\n{empty_msg}\n"]
|
||||
|
||||
|
||||
def _build_insights(
|
||||
low_conf: list[dict],
|
||||
errors: list[dict],
|
||||
repeated: list[tuple[str, int]],
|
||||
) -> list[str]:
|
||||
"""Generate actionable insight bullets from analysis results."""
|
||||
insights: list[str] = []
|
||||
if low_conf:
|
||||
insights.append("Consider studying topics where confidence was low.")
|
||||
if errors:
|
||||
insights.append("Review error patterns for recurring infrastructure issues.")
|
||||
if repeated:
|
||||
insights.append(
|
||||
f'User frequently asks about "{repeated[0][0]}" — consider deepening knowledge here.'
|
||||
)
|
||||
return insights or ["Conversations look healthy. Keep up the good work."]
|
||||
|
||||
|
||||
def self_reflect(limit: int = 30) -> str:
|
||||
"""Review recent conversations and reflect on Timmy's own behavior.
|
||||
|
||||
@@ -343,35 +412,12 @@ def self_reflect(limit: int = 30) -> str:
|
||||
if not entries:
|
||||
return "No conversation history to reflect on yet."
|
||||
|
||||
# Categorize entries
|
||||
messages = [e for e in entries if e.get("type") == "message"]
|
||||
errors = [e for e in entries if e.get("type") == "error"]
|
||||
timmy_msgs = [e for e in messages if e.get("role") == "timmy"]
|
||||
user_msgs = [e for e in messages if e.get("role") == "user"]
|
||||
|
||||
# 1. Low-confidence responses
|
||||
low_conf = [
|
||||
m
|
||||
for m in timmy_msgs
|
||||
if m.get("confidence") is not None and m["confidence"] < _LOW_CONFIDENCE_THRESHOLD
|
||||
]
|
||||
|
||||
# 2. Identify repeated user topics (simple word frequency)
|
||||
topic_counts: dict[str, int] = {}
|
||||
for m in user_msgs:
|
||||
for word in (m.get("content") or "").lower().split():
|
||||
cleaned = word.strip(".,!?\"'()[]")
|
||||
if len(cleaned) > 3:
|
||||
topic_counts[cleaned] = topic_counts.get(cleaned, 0) + 1
|
||||
repeated = sorted(
|
||||
((w, c) for w, c in topic_counts.items() if c >= 3),
|
||||
key=lambda x: x[1],
|
||||
reverse=True,
|
||||
)[:5]
|
||||
_messages, errors, timmy_msgs, user_msgs = _categorize_entries(entries)
|
||||
low_conf = _find_low_confidence(timmy_msgs)
|
||||
repeated = _find_repeated_topics(user_msgs)
|
||||
|
||||
# Build reflection report
|
||||
sections: list[str] = ["## Self-Reflection Report\n"]
|
||||
|
||||
sections.append(
|
||||
f"Reviewed {len(entries)} recent entries: "
|
||||
f"{len(user_msgs)} user messages, "
|
||||
@@ -379,32 +425,27 @@ def self_reflect(limit: int = 30) -> str:
|
||||
f"{len(errors)} errors.\n"
|
||||
)
|
||||
|
||||
# Low confidence
|
||||
if low_conf:
|
||||
sections.append(f"### Low-Confidence Responses ({len(low_conf)})")
|
||||
for m in low_conf[:5]:
|
||||
ts = (m.get("timestamp") or "?")[:19]
|
||||
conf = m.get("confidence", 0)
|
||||
text = (m.get("content") or "")[:120]
|
||||
sections.append(f"- [{ts}] confidence={conf:.0%}: {text}")
|
||||
sections.append("")
|
||||
else:
|
||||
sections.append(
|
||||
"### Low-Confidence Responses\nNone found — all responses above threshold.\n"
|
||||
sections.extend(
|
||||
_format_reflection_section(
|
||||
"Low-Confidence Responses",
|
||||
low_conf,
|
||||
lambda m: (
|
||||
f"- [{(m.get('timestamp') or '?')[:19]}] "
|
||||
f"confidence={m.get('confidence', 0):.0%}: "
|
||||
f"{(m.get('content') or '')[:120]}"
|
||||
),
|
||||
"None found — all responses above threshold.",
|
||||
)
|
||||
)
|
||||
sections.extend(
|
||||
_format_reflection_section(
|
||||
"Errors",
|
||||
errors,
|
||||
lambda e: f"- [{(e.get('timestamp') or '?')[:19]}] {(e.get('error') or '')[:120]}",
|
||||
"No errors recorded.",
|
||||
)
|
||||
)
|
||||
|
||||
# Errors
|
||||
if errors:
|
||||
sections.append(f"### Errors ({len(errors)})")
|
||||
for e in errors[:5]:
|
||||
ts = (e.get("timestamp") or "?")[:19]
|
||||
err = (e.get("error") or "")[:120]
|
||||
sections.append(f"- [{ts}] {err}")
|
||||
sections.append("")
|
||||
else:
|
||||
sections.append("### Errors\nNo errors recorded.\n")
|
||||
|
||||
# Repeated topics
|
||||
if repeated:
|
||||
sections.append("### Recurring Topics")
|
||||
for word, count in repeated:
|
||||
@@ -413,22 +454,8 @@ def self_reflect(limit: int = 30) -> str:
|
||||
else:
|
||||
sections.append("### Recurring Topics\nNo strong patterns detected.\n")
|
||||
|
||||
# Actionable summary
|
||||
insights: list[str] = []
|
||||
if low_conf:
|
||||
insights.append("Consider studying topics where confidence was low.")
|
||||
if errors:
|
||||
insights.append("Review error patterns for recurring infrastructure issues.")
|
||||
if repeated:
|
||||
top_topic = repeated[0][0]
|
||||
insights.append(
|
||||
f'User frequently asks about "{top_topic}" — consider deepening knowledge here.'
|
||||
)
|
||||
if not insights:
|
||||
insights.append("Conversations look healthy. Keep up the good work.")
|
||||
|
||||
sections.append("### Insights")
|
||||
for insight in insights:
|
||||
for insight in _build_insights(low_conf, errors, repeated):
|
||||
sections.append(f"- {insight}")
|
||||
|
||||
return "\n".join(sections)
|
||||
|
||||
@@ -232,6 +232,90 @@ class ThinkingEngine:
|
||||
return False # Disabled — never idle
|
||||
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
|
||||
|
||||
def _build_thinking_context(self) -> tuple[str, str, list["Thought"]]:
|
||||
"""Assemble the context needed for a thinking cycle.
|
||||
|
||||
Returns:
|
||||
(memory_context, system_context, recent_thoughts)
|
||||
"""
|
||||
memory_context = self._load_memory_context()
|
||||
system_context = self._gather_system_snapshot()
|
||||
recent_thoughts = self.get_recent_thoughts(limit=5)
|
||||
return memory_context, system_context, recent_thoughts
|
||||
|
||||
async def _generate_novel_thought(
|
||||
self,
|
||||
prompt: str | None,
|
||||
memory_context: str,
|
||||
system_context: str,
|
||||
recent_thoughts: list["Thought"],
|
||||
) -> tuple[str | None, str]:
|
||||
"""Run the dedup-retry loop to produce a novel thought.
|
||||
|
||||
Returns:
|
||||
(content, seed_type) — content is None if no novel thought produced.
|
||||
"""
|
||||
seed_type: str = "freeform"
|
||||
|
||||
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
|
||||
if prompt:
|
||||
seed_type = "prompted"
|
||||
seed_context = f"Journal prompt: {prompt}"
|
||||
else:
|
||||
seed_type, seed_context = self._gather_seed()
|
||||
|
||||
continuity = self._build_continuity_context()
|
||||
|
||||
full_prompt = _THINKING_PROMPT.format(
|
||||
memory_context=memory_context,
|
||||
system_context=system_context,
|
||||
seed_context=seed_context,
|
||||
continuity_context=continuity,
|
||||
)
|
||||
|
||||
try:
|
||||
raw = await self._call_agent(full_prompt)
|
||||
except Exception as exc:
|
||||
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
|
||||
return None, seed_type
|
||||
|
||||
if not raw or not raw.strip():
|
||||
logger.debug("Thinking cycle produced empty response, skipping")
|
||||
return None, seed_type
|
||||
|
||||
content = raw.strip()
|
||||
|
||||
# Dedup: reject thoughts too similar to recent ones
|
||||
if not self._is_too_similar(content, recent_thoughts):
|
||||
return content, seed_type # Good — novel thought
|
||||
|
||||
if attempt < self._MAX_DEDUP_RETRIES:
|
||||
logger.info(
|
||||
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
|
||||
attempt + 1,
|
||||
self._MAX_DEDUP_RETRIES + 1,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"Thought still repetitive after %d retries, discarding",
|
||||
self._MAX_DEDUP_RETRIES + 1,
|
||||
)
|
||||
return None, seed_type
|
||||
|
||||
return None, seed_type
|
||||
|
||||
async def _process_thinking_result(self, thought: "Thought") -> None:
|
||||
"""Run all post-hooks after a thought is stored."""
|
||||
self._maybe_check_memory()
|
||||
await self._maybe_distill()
|
||||
await self._maybe_file_issues()
|
||||
await self._check_workspace()
|
||||
self._maybe_check_memory_status()
|
||||
self._update_memory(thought)
|
||||
self._log_event(thought)
|
||||
self._write_journal(thought)
|
||||
await self._broadcast(thought)
|
||||
|
||||
async def think_once(self, prompt: str | None = None) -> Thought | None:
|
||||
"""Execute one thinking cycle.
|
||||
|
||||
@@ -257,91 +341,21 @@ class ThinkingEngine:
|
||||
)
|
||||
return None
|
||||
|
||||
memory_context = self._load_memory_context()
|
||||
system_context = self._gather_system_snapshot()
|
||||
recent_thoughts = self.get_recent_thoughts(limit=5)
|
||||
|
||||
content: str | None = None
|
||||
seed_type: str = "freeform"
|
||||
|
||||
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
|
||||
if prompt:
|
||||
seed_type = "prompted"
|
||||
seed_context = f"Journal prompt: {prompt}"
|
||||
else:
|
||||
seed_type, seed_context = self._gather_seed()
|
||||
|
||||
continuity = self._build_continuity_context()
|
||||
|
||||
full_prompt = _THINKING_PROMPT.format(
|
||||
memory_context=memory_context,
|
||||
system_context=system_context,
|
||||
seed_context=seed_context,
|
||||
continuity_context=continuity,
|
||||
)
|
||||
|
||||
try:
|
||||
raw = await self._call_agent(full_prompt)
|
||||
except Exception as exc:
|
||||
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
|
||||
return None
|
||||
|
||||
if not raw or not raw.strip():
|
||||
logger.debug("Thinking cycle produced empty response, skipping")
|
||||
return None
|
||||
|
||||
content = raw.strip()
|
||||
|
||||
# Dedup: reject thoughts too similar to recent ones
|
||||
if not self._is_too_similar(content, recent_thoughts):
|
||||
break # Good — novel thought
|
||||
|
||||
if attempt < self._MAX_DEDUP_RETRIES:
|
||||
logger.info(
|
||||
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
|
||||
attempt + 1,
|
||||
self._MAX_DEDUP_RETRIES + 1,
|
||||
)
|
||||
content = None # Will retry
|
||||
else:
|
||||
logger.warning(
|
||||
"Thought still repetitive after %d retries, discarding",
|
||||
self._MAX_DEDUP_RETRIES + 1,
|
||||
)
|
||||
return None
|
||||
memory_context, system_context, recent_thoughts = self._build_thinking_context()
|
||||
|
||||
content, seed_type = await self._generate_novel_thought(
|
||||
prompt,
|
||||
memory_context,
|
||||
system_context,
|
||||
recent_thoughts,
|
||||
)
|
||||
if not content:
|
||||
return None
|
||||
|
||||
thought = self._store_thought(content, seed_type)
|
||||
self._last_thought_id = thought.id
|
||||
|
||||
# Post-hook: check memory status periodically
|
||||
self._maybe_check_memory()
|
||||
|
||||
# Post-hook: distill facts from recent thoughts periodically
|
||||
await self._maybe_distill()
|
||||
|
||||
# Post-hook: file Gitea issues for actionable observations
|
||||
await self._maybe_file_issues()
|
||||
|
||||
# Post-hook: check workspace for new messages from Hermes
|
||||
await self._check_workspace()
|
||||
|
||||
# Post-hook: proactive memory status audit
|
||||
self._maybe_check_memory_status()
|
||||
|
||||
# Post-hook: update MEMORY.md with latest reflection
|
||||
self._update_memory(thought)
|
||||
|
||||
# Log to swarm event system
|
||||
self._log_event(thought)
|
||||
|
||||
# Append to daily journal file
|
||||
self._write_journal(thought)
|
||||
|
||||
# Broadcast to WebSocket clients
|
||||
await self._broadcast(thought)
|
||||
await self._process_thinking_result(thought)
|
||||
|
||||
logger.info(
|
||||
"Thought [%s] (%s): %s",
|
||||
@@ -758,23 +772,10 @@ class ThinkingEngine:
|
||||
except Exception as exc:
|
||||
logger.debug("Thought issue filing skipped: %s", exc)
|
||||
|
||||
def _gather_system_snapshot(self) -> str:
|
||||
"""Gather lightweight real system state for grounding thoughts in reality.
|
||||
# ── System snapshot helpers ────────────────────────────────────────────
|
||||
|
||||
Returns a short multi-line string with current time, thought count,
|
||||
recent chat activity, and task queue status. Never crashes — every
|
||||
section is independently try/excepted.
|
||||
"""
|
||||
parts: list[str] = []
|
||||
|
||||
# Current local time
|
||||
now = datetime.now().astimezone()
|
||||
tz = now.strftime("%Z") or "UTC"
|
||||
parts.append(
|
||||
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
|
||||
)
|
||||
|
||||
# Thought count today (cheap DB query)
|
||||
def _snap_thought_count(self, now: datetime) -> str | None:
|
||||
"""Return today's thought count, or *None* on failure."""
|
||||
try:
|
||||
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
with _get_conn(self._db_path) as conn:
|
||||
@@ -782,66 +783,94 @@ class ThinkingEngine:
|
||||
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
|
||||
(today_start.isoformat(),),
|
||||
).fetchone()["c"]
|
||||
parts.append(f"Thoughts today: {count}")
|
||||
return f"Thoughts today: {count}"
|
||||
except Exception as exc:
|
||||
logger.debug("Thought count query failed: %s", exc)
|
||||
pass
|
||||
return None
|
||||
|
||||
# Recent chat activity (in-memory, no I/O)
|
||||
def _snap_chat_activity(self) -> list[str]:
|
||||
"""Return chat-activity lines (in-memory, no I/O)."""
|
||||
try:
|
||||
from infrastructure.chat_store import message_log
|
||||
|
||||
messages = message_log.all()
|
||||
if messages:
|
||||
parts.append(f"Chat messages this session: {len(messages)}")
|
||||
last = messages[-1]
|
||||
parts.append(f'Last chat ({last.role}): "{last.content[:80]}"')
|
||||
else:
|
||||
parts.append("No chat messages this session")
|
||||
return [
|
||||
f"Chat messages this session: {len(messages)}",
|
||||
f'Last chat ({last.role}): "{last.content[:80]}"',
|
||||
]
|
||||
return ["No chat messages this session"]
|
||||
except Exception as exc:
|
||||
logger.debug("Chat activity query failed: %s", exc)
|
||||
pass
|
||||
return []
|
||||
|
||||
# Task queue (lightweight DB query)
|
||||
def _snap_task_queue(self) -> str | None:
|
||||
"""Return a one-line task queue summary, or *None*."""
|
||||
try:
|
||||
from swarm.task_queue.models import get_task_summary_for_briefing
|
||||
|
||||
summary = get_task_summary_for_briefing()
|
||||
running = summary.get("running", 0)
|
||||
pending = summary.get("pending_approval", 0)
|
||||
done = summary.get("completed", 0)
|
||||
failed = summary.get("failed", 0)
|
||||
s = get_task_summary_for_briefing()
|
||||
running, pending = s.get("running", 0), s.get("pending_approval", 0)
|
||||
done, failed = s.get("completed", 0), s.get("failed", 0)
|
||||
if running or pending or done or failed:
|
||||
parts.append(
|
||||
return (
|
||||
f"Tasks: {running} running, {pending} pending, "
|
||||
f"{done} completed, {failed} failed"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("Task queue query failed: %s", exc)
|
||||
pass
|
||||
return None
|
||||
|
||||
# Workspace updates (file-based communication with Hermes)
|
||||
def _snap_workspace(self) -> list[str]:
|
||||
"""Return workspace-update lines (file-based Hermes comms)."""
|
||||
try:
|
||||
from timmy.workspace import workspace_monitor
|
||||
|
||||
updates = workspace_monitor.get_pending_updates()
|
||||
lines: list[str] = []
|
||||
new_corr = updates.get("new_correspondence")
|
||||
new_inbox = updates.get("new_inbox_files", [])
|
||||
|
||||
if new_corr:
|
||||
# Count entries (assuming each entry starts with a timestamp or header)
|
||||
line_count = len([line for line in new_corr.splitlines() if line.strip()])
|
||||
parts.append(
|
||||
line_count = len([ln for ln in new_corr.splitlines() if ln.strip()])
|
||||
lines.append(
|
||||
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
|
||||
)
|
||||
new_inbox = updates.get("new_inbox_files", [])
|
||||
if new_inbox:
|
||||
files_str = ", ".join(new_inbox[:5])
|
||||
if len(new_inbox) > 5:
|
||||
files_str += f", ... (+{len(new_inbox) - 5} more)"
|
||||
parts.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
|
||||
lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
|
||||
return lines
|
||||
except Exception as exc:
|
||||
logger.debug("Workspace check failed: %s", exc)
|
||||
pass
|
||||
return []
|
||||
|
||||
def _gather_system_snapshot(self) -> str:
|
||||
"""Gather lightweight real system state for grounding thoughts in reality.
|
||||
|
||||
Returns a short multi-line string with current time, thought count,
|
||||
recent chat activity, and task queue status. Never crashes — every
|
||||
section is independently try/excepted.
|
||||
"""
|
||||
now = datetime.now().astimezone()
|
||||
tz = now.strftime("%Z") or "UTC"
|
||||
|
||||
parts: list[str] = [
|
||||
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
|
||||
]
|
||||
|
||||
thought_line = self._snap_thought_count(now)
|
||||
if thought_line:
|
||||
parts.append(thought_line)
|
||||
|
||||
parts.extend(self._snap_chat_activity())
|
||||
|
||||
task_line = self._snap_task_queue()
|
||||
if task_line:
|
||||
parts.append(task_line)
|
||||
|
||||
parts.extend(self._snap_workspace())
|
||||
|
||||
return "\n".join(parts) if parts else ""
|
||||
|
||||
@@ -1110,21 +1139,37 @@ class ThinkingEngine:
|
||||
lines.append(f"- [{thought.seed_type}] {snippet}")
|
||||
return "\n".join(lines)
|
||||
|
||||
_thinking_agent = None # cached agent — avoids per-call resource leaks (#525)
|
||||
|
||||
async def _call_agent(self, prompt: str) -> str:
|
||||
"""Call Timmy's agent to generate a thought.
|
||||
|
||||
Creates a lightweight agent with skip_mcp=True to avoid the cancel-scope
|
||||
Reuses a cached agent with skip_mcp=True to avoid the cancel-scope
|
||||
errors that occur when MCP stdio transports are spawned inside asyncio
|
||||
background tasks (#72). The thinking engine doesn't need Gitea or
|
||||
filesystem tools — it only needs the LLM.
|
||||
background tasks (#72) and to prevent per-call resource leaks (httpx
|
||||
clients, SQLite connections, model warmups) that caused the thinking
|
||||
loop to die every ~10 min (#525).
|
||||
|
||||
Individual calls are capped at 120 s so a hung Ollama never blocks
|
||||
the scheduler indefinitely.
|
||||
|
||||
Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that
|
||||
downstream parsers (fact distillation, issue filing) receive clean text.
|
||||
"""
|
||||
from timmy.agent import create_timmy
|
||||
import asyncio
|
||||
|
||||
if self._thinking_agent is None:
|
||||
from timmy.agent import create_timmy
|
||||
|
||||
self._thinking_agent = create_timmy(skip_mcp=True)
|
||||
|
||||
try:
|
||||
async with asyncio.timeout(120):
|
||||
run = await self._thinking_agent.arun(prompt, stream=False)
|
||||
except TimeoutError:
|
||||
logger.warning("Thinking LLM call timed out after 120 s")
|
||||
return ""
|
||||
|
||||
agent = create_timmy(skip_mcp=True)
|
||||
run = await agent.arun(prompt, stream=False)
|
||||
raw = run.content if hasattr(run, "content") else str(run)
|
||||
return _THINK_TAG_RE.sub("", raw) if raw else raw
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ def get_system_info() -> dict[str, Any]:
|
||||
- python_version: Python version
|
||||
- platform: OS platform
|
||||
- model: Current Ollama model (queried from API)
|
||||
- model_backend: Configured backend (ollama/airllm/grok)
|
||||
- model_backend: Configured backend (ollama/grok/claude)
|
||||
- ollama_url: Ollama host URL
|
||||
- repo_root: Repository root path
|
||||
- grok_enabled: Whether GROK is enabled
|
||||
@@ -127,54 +127,48 @@ def check_ollama_health() -> dict[str, Any]:
|
||||
return result
|
||||
|
||||
|
||||
def get_memory_status() -> dict[str, Any]:
|
||||
"""Get the status of Timmy's memory system.
|
||||
|
||||
Returns:
|
||||
Dict with memory tier information
|
||||
"""
|
||||
from config import settings
|
||||
|
||||
repo_root = Path(settings.repo_root)
|
||||
|
||||
# Check tier 1: Hot memory
|
||||
def _hot_memory_info(repo_root: Path) -> dict[str, Any]:
|
||||
"""Tier 1: Hot memory (MEMORY.md) status."""
|
||||
memory_md = repo_root / "MEMORY.md"
|
||||
tier1_exists = memory_md.exists()
|
||||
tier1_content = ""
|
||||
if tier1_exists:
|
||||
tier1_content = memory_md.read_text()[:500] # First 500 chars
|
||||
tier1_content = memory_md.read_text()[:500]
|
||||
|
||||
# Check tier 2: Vault
|
||||
vault_path = repo_root / "memory" / "self"
|
||||
tier2_exists = vault_path.exists()
|
||||
tier2_files = []
|
||||
if tier2_exists:
|
||||
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()]
|
||||
|
||||
tier1_info: dict[str, Any] = {
|
||||
info: dict[str, Any] = {
|
||||
"exists": tier1_exists,
|
||||
"path": str(memory_md),
|
||||
"preview": " ".join(tier1_content[:200].split()) if tier1_content else None,
|
||||
}
|
||||
if tier1_exists:
|
||||
lines = memory_md.read_text().splitlines()
|
||||
tier1_info["line_count"] = len(lines)
|
||||
tier1_info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
|
||||
info["line_count"] = len(lines)
|
||||
info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
|
||||
return info
|
||||
|
||||
|
||||
def _vault_info(repo_root: Path) -> dict[str, Any]:
|
||||
"""Tier 2: Vault (memory/ directory tree) status."""
|
||||
vault_path = repo_root / "memory" / "self"
|
||||
tier2_exists = vault_path.exists()
|
||||
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()] if tier2_exists else []
|
||||
|
||||
# Vault — scan all subdirs under memory/
|
||||
vault_root = repo_root / "memory"
|
||||
vault_info: dict[str, Any] = {
|
||||
info: dict[str, Any] = {
|
||||
"exists": tier2_exists,
|
||||
"path": str(vault_path),
|
||||
"file_count": len(tier2_files),
|
||||
"files": tier2_files[:10],
|
||||
}
|
||||
if vault_root.exists():
|
||||
vault_info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
|
||||
vault_info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
|
||||
info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
|
||||
info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
|
||||
return info
|
||||
|
||||
# Tier 3: Semantic memory row count
|
||||
tier3_info: dict[str, Any] = {"available": False}
|
||||
|
||||
def _semantic_memory_info(repo_root: Path) -> dict[str, Any]:
|
||||
"""Tier 3: Semantic memory (vector DB) status."""
|
||||
info: dict[str, Any] = {"available": False}
|
||||
try:
|
||||
sem_db = repo_root / "data" / "memory.db"
|
||||
if sem_db.exists():
|
||||
@@ -184,14 +178,16 @@ def get_memory_status() -> dict[str, Any]:
|
||||
).fetchone()
|
||||
if row and row[0]:
|
||||
count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
|
||||
tier3_info["available"] = True
|
||||
tier3_info["vector_count"] = count[0] if count else 0
|
||||
info["available"] = True
|
||||
info["vector_count"] = count[0] if count else 0
|
||||
except Exception as exc:
|
||||
logger.debug("Memory status query failed: %s", exc)
|
||||
pass
|
||||
return info
|
||||
|
||||
# Self-coding journal stats
|
||||
journal_info: dict[str, Any] = {"available": False}
|
||||
|
||||
def _journal_info(repo_root: Path) -> dict[str, Any]:
|
||||
"""Self-coding journal statistics."""
|
||||
info: dict[str, Any] = {"available": False}
|
||||
try:
|
||||
journal_db = repo_root / "data" / "self_coding.db"
|
||||
if journal_db.exists():
|
||||
@@ -203,7 +199,7 @@ def get_memory_status() -> dict[str, Any]:
|
||||
if rows:
|
||||
counts = {r["outcome"]: r["cnt"] for r in rows}
|
||||
total = sum(counts.values())
|
||||
journal_info = {
|
||||
info = {
|
||||
"available": True,
|
||||
"total_attempts": total,
|
||||
"successes": counts.get("success", 0),
|
||||
@@ -212,13 +208,24 @@ def get_memory_status() -> dict[str, Any]:
|
||||
}
|
||||
except Exception as exc:
|
||||
logger.debug("Journal stats query failed: %s", exc)
|
||||
pass
|
||||
return info
|
||||
|
||||
|
||||
def get_memory_status() -> dict[str, Any]:
|
||||
"""Get the status of Timmy's memory system.
|
||||
|
||||
Returns:
|
||||
Dict with memory tier information
|
||||
"""
|
||||
from config import settings
|
||||
|
||||
repo_root = Path(settings.repo_root)
|
||||
|
||||
return {
|
||||
"tier1_hot_memory": tier1_info,
|
||||
"tier2_vault": vault_info,
|
||||
"tier3_semantic": tier3_info,
|
||||
"self_coding_journal": journal_info,
|
||||
"tier1_hot_memory": _hot_memory_info(repo_root),
|
||||
"tier2_vault": _vault_info(repo_root),
|
||||
"tier3_semantic": _semantic_memory_info(repo_root),
|
||||
"self_coding_journal": _journal_info(repo_root),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -75,6 +75,8 @@ def create_timmy_serve_app() -> FastAPI:
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
logger.info("Timmy Serve starting")
|
||||
app.state.timmy = create_timmy()
|
||||
logger.info("Timmy agent cached in app state")
|
||||
yield
|
||||
logger.info("Timmy Serve shutting down")
|
||||
|
||||
@@ -101,7 +103,7 @@ def create_timmy_serve_app() -> FastAPI:
|
||||
async def serve_chat(request: Request, body: ChatRequest):
|
||||
"""Process a chat request."""
|
||||
try:
|
||||
timmy = create_timmy()
|
||||
timmy = request.app.state.timmy
|
||||
result = timmy.run(body.message, stream=False)
|
||||
response_text = result.content if hasattr(result, "content") else str(result)
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ except ImportError:
|
||||
# agno is a core dependency (always installed) — do NOT stub it, or its
|
||||
# internal import chains break under xdist parallel workers.
|
||||
for _mod in [
|
||||
"airllm",
|
||||
"mcp",
|
||||
"mcp.client",
|
||||
"mcp.client.stdio",
|
||||
|
||||
@@ -10,12 +10,10 @@ Categories:
|
||||
M3xx iOS keyboard & zoom prevention
|
||||
M4xx HTMX robustness (double-submit, sync)
|
||||
M5xx Safe-area / notch support
|
||||
M6xx AirLLM backend interface contract
|
||||
"""
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
# ── helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -206,147 +204,3 @@ def test_M505_dvh_units_used():
|
||||
"""Dynamic viewport height (dvh) accounts for collapsing browser chrome."""
|
||||
css = _css()
|
||||
assert "dvh" in css
|
||||
|
||||
|
||||
# ── M6xx — AirLLM backend interface contract ──────────────────────────────────
|
||||
|
||||
|
||||
def test_M601_airllm_agent_has_run_method():
|
||||
"""TimmyAirLLMAgent must expose run() so the dashboard route can call it."""
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
assert hasattr(TimmyAirLLMAgent, "run"), (
|
||||
"TimmyAirLLMAgent is missing run() — dashboard will fail with AirLLM backend"
|
||||
)
|
||||
|
||||
|
||||
def test_M602_airllm_run_returns_content_attribute():
|
||||
"""run() must return an object with a .content attribute (Agno RunResponse compat)."""
|
||||
with patch("timmy.backends.is_apple_silicon", return_value=False):
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
agent = TimmyAirLLMAgent(model_size="8b")
|
||||
|
||||
mock_model = MagicMock()
|
||||
mock_tokenizer = MagicMock()
|
||||
input_ids_mock = MagicMock()
|
||||
input_ids_mock.shape = [1, 5]
|
||||
mock_tokenizer.return_value = {"input_ids": input_ids_mock}
|
||||
mock_tokenizer.decode.return_value = "Sir, affirmative."
|
||||
mock_model.tokenizer = mock_tokenizer
|
||||
mock_model.generate.return_value = [list(range(10))]
|
||||
agent._model = mock_model
|
||||
|
||||
result = agent.run("test")
|
||||
assert hasattr(result, "content"), "run() result must have a .content attribute"
|
||||
assert isinstance(result.content, str)
|
||||
|
||||
|
||||
def test_M603_airllm_run_updates_history():
|
||||
"""run() must update _history so multi-turn context is preserved."""
|
||||
with patch("timmy.backends.is_apple_silicon", return_value=False):
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
agent = TimmyAirLLMAgent(model_size="8b")
|
||||
|
||||
mock_model = MagicMock()
|
||||
mock_tokenizer = MagicMock()
|
||||
input_ids_mock = MagicMock()
|
||||
input_ids_mock.shape = [1, 5]
|
||||
mock_tokenizer.return_value = {"input_ids": input_ids_mock}
|
||||
mock_tokenizer.decode.return_value = "Acknowledged."
|
||||
mock_model.tokenizer = mock_tokenizer
|
||||
mock_model.generate.return_value = [list(range(10))]
|
||||
agent._model = mock_model
|
||||
|
||||
assert len(agent._history) == 0
|
||||
agent.run("hello")
|
||||
assert len(agent._history) == 2
|
||||
assert any("hello" in h for h in agent._history)
|
||||
|
||||
|
||||
def test_M604_airllm_print_response_delegates_to_run():
|
||||
"""print_response must use run() so both interfaces share one inference path."""
|
||||
with patch("timmy.backends.is_apple_silicon", return_value=False):
|
||||
from timmy.backends import RunResult, TimmyAirLLMAgent
|
||||
|
||||
agent = TimmyAirLLMAgent(model_size="8b")
|
||||
|
||||
with (
|
||||
patch.object(agent, "run", return_value=RunResult(content="ok")) as mock_run,
|
||||
patch.object(agent, "_render"),
|
||||
):
|
||||
agent.print_response("hello", stream=True)
|
||||
|
||||
mock_run.assert_called_once_with("hello", stream=True)
|
||||
|
||||
|
||||
def test_M605_health_status_passes_model_to_template(client):
|
||||
"""Health status partial must receive the configured model name, not a hardcoded string."""
|
||||
from config import settings
|
||||
|
||||
with patch(
|
||||
"dashboard.routes.health.check_ollama",
|
||||
new_callable=AsyncMock,
|
||||
return_value=True,
|
||||
):
|
||||
response = client.get("/health/status")
|
||||
# Model name should come from settings, not be hardcoded
|
||||
assert response.status_code == 200
|
||||
model_short = settings.ollama_model.split(":")[0]
|
||||
assert model_short in response.text
|
||||
|
||||
|
||||
# ── M7xx — XSS prevention ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _mobile_html() -> str:
|
||||
"""Read the mobile template source."""
|
||||
path = Path(__file__).parent.parent.parent / "src" / "dashboard" / "templates" / "mobile.html"
|
||||
return path.read_text()
|
||||
|
||||
|
||||
def _swarm_live_html() -> str:
|
||||
"""Read the swarm live template source."""
|
||||
path = (
|
||||
Path(__file__).parent.parent.parent / "src" / "dashboard" / "templates" / "swarm_live.html"
|
||||
)
|
||||
return path.read_text()
|
||||
|
||||
|
||||
def test_M701_mobile_chat_no_raw_message_interpolation():
|
||||
"""mobile.html must not interpolate ${message} directly into innerHTML — XSS risk."""
|
||||
html = _mobile_html()
|
||||
# The vulnerable pattern is `${message}` inside a template literal assigned to innerHTML
|
||||
# After the fix, message must only appear via textContent assignment
|
||||
assert "textContent = message" in html or "textContent=message" in html, (
|
||||
"mobile.html still uses innerHTML + ${message} interpolation — XSS vulnerability"
|
||||
)
|
||||
|
||||
|
||||
def test_M702_mobile_chat_user_input_not_in_innerhtml_template_literal():
|
||||
"""${message} must not appear inside a backtick string that is assigned to innerHTML."""
|
||||
html = _mobile_html()
|
||||
# Find all innerHTML += `...` blocks and verify none contain ${message}
|
||||
blocks = re.findall(r"innerHTML\s*\+=?\s*`([^`]*)`", html, re.DOTALL)
|
||||
for block in blocks:
|
||||
assert "${message}" not in block, (
|
||||
"innerHTML template literal still contains ${message} — XSS vulnerability"
|
||||
)
|
||||
|
||||
|
||||
def test_M703_swarm_live_agent_name_not_interpolated_in_innerhtml():
|
||||
"""swarm_live.html must not put ${agent.name} inside innerHTML template literals."""
|
||||
html = _swarm_live_html()
|
||||
blocks = re.findall(r"innerHTML\s*=\s*agents\.map\([^;]+\)\.join\([^)]*\)", html, re.DOTALL)
|
||||
assert len(blocks) == 0, (
|
||||
"swarm_live.html still uses innerHTML=agents.map(…) with interpolated agent data — XSS vulnerability"
|
||||
)
|
||||
|
||||
|
||||
def test_M704_swarm_live_uses_textcontent_for_agent_data():
|
||||
"""swarm_live.html must use textContent (not innerHTML) to set agent name/description."""
|
||||
html = _swarm_live_html()
|
||||
assert "textContent" in html, (
|
||||
"swarm_live.html does not use textContent — agent data may be raw-interpolated into DOM"
|
||||
)
|
||||
|
||||
@@ -5,9 +5,14 @@ from datetime import UTC, datetime, timedelta
|
||||
from unittest.mock import patch
|
||||
|
||||
from infrastructure.error_capture import (
|
||||
_create_bug_report,
|
||||
_dedup_cache,
|
||||
_extract_traceback_info,
|
||||
_get_git_context,
|
||||
_is_duplicate,
|
||||
_log_error_event,
|
||||
_notify_bug_report,
|
||||
_record_to_session,
|
||||
_stack_hash,
|
||||
capture_error,
|
||||
)
|
||||
@@ -193,3 +198,91 @@ class TestCaptureError:
|
||||
|
||||
def teardown_method(self):
|
||||
_dedup_cache.clear()
|
||||
|
||||
|
||||
class TestExtractTracebackInfo:
|
||||
"""Test _extract_traceback_info helper."""
|
||||
|
||||
def test_returns_three_tuple(self):
|
||||
try:
|
||||
raise ValueError("extract test")
|
||||
except ValueError as e:
|
||||
tb_str, affected_file, affected_line = _extract_traceback_info(e)
|
||||
assert "ValueError" in tb_str
|
||||
assert "extract test" in tb_str
|
||||
assert affected_file.endswith(".py")
|
||||
assert affected_line > 0
|
||||
|
||||
def test_file_points_to_raise_site(self):
|
||||
try:
|
||||
_make_exception()
|
||||
except ValueError as e:
|
||||
_, affected_file, _ = _extract_traceback_info(e)
|
||||
assert "test_error_capture" in affected_file
|
||||
|
||||
|
||||
class TestLogErrorEvent:
|
||||
"""Test _log_error_event helper."""
|
||||
|
||||
def test_does_not_crash_on_missing_deps(self):
|
||||
try:
|
||||
raise RuntimeError("log test")
|
||||
except RuntimeError as e:
|
||||
_log_error_event(e, "test", "abc123", "file.py", 42, {"branch": "main"})
|
||||
|
||||
|
||||
class TestCreateBugReport:
|
||||
"""Test _create_bug_report helper."""
|
||||
|
||||
def test_does_not_crash_on_missing_deps(self):
|
||||
try:
|
||||
raise RuntimeError("report test")
|
||||
except RuntimeError as e:
|
||||
result = _create_bug_report(
|
||||
e, "test", None, "abc123", "traceback...", "file.py", 42, {}
|
||||
)
|
||||
# May return None if swarm deps unavailable — that's fine
|
||||
assert result is None or isinstance(result, str)
|
||||
|
||||
def test_with_context(self):
|
||||
try:
|
||||
raise RuntimeError("ctx test")
|
||||
except RuntimeError as e:
|
||||
result = _create_bug_report(e, "test", {"path": "/api"}, "abc", "tb", "f.py", 1, {})
|
||||
assert result is None or isinstance(result, str)
|
||||
|
||||
|
||||
class TestNotifyBugReport:
|
||||
"""Test _notify_bug_report helper."""
|
||||
|
||||
def test_does_not_crash(self):
|
||||
try:
|
||||
raise RuntimeError("notify test")
|
||||
except RuntimeError as e:
|
||||
_notify_bug_report(e, "test")
|
||||
|
||||
|
||||
class TestRecordToSession:
|
||||
"""Test _record_to_session helper."""
|
||||
|
||||
def test_does_not_crash_without_recorder(self):
|
||||
try:
|
||||
raise RuntimeError("session test")
|
||||
except RuntimeError as e:
|
||||
_record_to_session(e, "test")
|
||||
|
||||
def test_calls_registered_recorder(self):
|
||||
from infrastructure.error_capture import register_error_recorder
|
||||
|
||||
calls = []
|
||||
register_error_recorder(lambda **kwargs: calls.append(kwargs))
|
||||
try:
|
||||
try:
|
||||
raise RuntimeError("callback test")
|
||||
except RuntimeError as e:
|
||||
_record_to_session(e, "test_source")
|
||||
assert len(calls) == 1
|
||||
assert "RuntimeError" in calls[0]["error"]
|
||||
assert calls[0]["context"] == "test_source"
|
||||
finally:
|
||||
register_error_recorder(None)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
@@ -489,34 +489,6 @@ class TestProviderAvailabilityCheck:
|
||||
|
||||
assert router._check_provider_available(provider) is False
|
||||
|
||||
def test_check_airllm_installed(self):
|
||||
"""Test AirLLM when installed."""
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
|
||||
provider = Provider(
|
||||
name="airllm",
|
||||
type="airllm",
|
||||
enabled=True,
|
||||
priority=1,
|
||||
)
|
||||
|
||||
with patch("importlib.util.find_spec", return_value=MagicMock()):
|
||||
assert router._check_provider_available(provider) is True
|
||||
|
||||
def test_check_airllm_not_installed(self):
|
||||
"""Test AirLLM when not installed."""
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
|
||||
provider = Provider(
|
||||
name="airllm",
|
||||
type="airllm",
|
||||
enabled=True,
|
||||
priority=1,
|
||||
)
|
||||
|
||||
with patch("importlib.util.find_spec", return_value=None):
|
||||
assert router._check_provider_available(provider) is False
|
||||
|
||||
|
||||
class TestCascadeRouterReload:
|
||||
"""Test hot-reload of providers.yaml."""
|
||||
|
||||
86
tests/loop/test_cycle_retro.py
Normal file
86
tests/loop/test_cycle_retro.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Tests for scripts/cycle_retro.py issue auto-detection."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
# Import the module under test — it's a script so we import the helpers directly
|
||||
import importlib
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
SCRIPTS_DIR = Path(__file__).resolve().parent.parent.parent / "scripts"
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _add_scripts_to_path(monkeypatch):
|
||||
monkeypatch.syspath_prepend(str(SCRIPTS_DIR))
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mod():
|
||||
"""Import cycle_retro as a module."""
|
||||
return importlib.import_module("cycle_retro")
|
||||
|
||||
|
||||
class TestDetectIssueFromBranch:
|
||||
def test_kimi_issue_branch(self, mod):
|
||||
with patch.object(subprocess, "check_output", return_value="kimi/issue-492\n"):
|
||||
assert mod.detect_issue_from_branch() == 492
|
||||
|
||||
def test_plain_issue_branch(self, mod):
|
||||
with patch.object(subprocess, "check_output", return_value="issue-123\n"):
|
||||
assert mod.detect_issue_from_branch() == 123
|
||||
|
||||
def test_issue_slash_number(self, mod):
|
||||
with patch.object(subprocess, "check_output", return_value="fix/issue/55\n"):
|
||||
assert mod.detect_issue_from_branch() == 55
|
||||
|
||||
def test_no_issue_in_branch(self, mod):
|
||||
with patch.object(subprocess, "check_output", return_value="main\n"):
|
||||
assert mod.detect_issue_from_branch() is None
|
||||
|
||||
def test_feature_branch(self, mod):
|
||||
with patch.object(subprocess, "check_output", return_value="feature/add-widget\n"):
|
||||
assert mod.detect_issue_from_branch() is None
|
||||
|
||||
def test_git_not_available(self, mod):
|
||||
with patch.object(subprocess, "check_output", side_effect=FileNotFoundError):
|
||||
assert mod.detect_issue_from_branch() is None
|
||||
|
||||
def test_git_fails(self, mod):
|
||||
with patch.object(
|
||||
subprocess,
|
||||
"check_output",
|
||||
side_effect=subprocess.CalledProcessError(1, "git"),
|
||||
):
|
||||
assert mod.detect_issue_from_branch() is None
|
||||
|
||||
|
||||
class TestBackfillExtractIssueNumber:
|
||||
"""Tests for backfill_retro.extract_issue_number PR-number filtering."""
|
||||
|
||||
@pytest.fixture()
|
||||
def backfill(self):
|
||||
return importlib.import_module("backfill_retro")
|
||||
|
||||
def test_body_has_issue(self, backfill):
|
||||
assert backfill.extract_issue_number("fix: foo (#491)", "Fixes #490", pr_number=491) == 490
|
||||
|
||||
def test_title_skips_pr_number(self, backfill):
|
||||
assert backfill.extract_issue_number("fix: foo (#491)", "", pr_number=491) is None
|
||||
|
||||
def test_title_with_issue_and_pr(self, backfill):
|
||||
# [loop-cycle-538] refactor: ... (#459) (#481)
|
||||
assert (
|
||||
backfill.extract_issue_number(
|
||||
"[loop-cycle-538] refactor: remove dead airllm (#459) (#481)",
|
||||
"",
|
||||
pr_number=481,
|
||||
)
|
||||
== 459
|
||||
)
|
||||
|
||||
def test_no_pr_number_provided(self, backfill):
|
||||
assert backfill.extract_issue_number("fix: foo (#491)", "") == 491
|
||||
@@ -49,6 +49,34 @@ class TestConfigLazyValidation:
|
||||
# Should not raise
|
||||
validate_startup(force=True)
|
||||
|
||||
def test_validate_startup_exits_on_cors_wildcard_in_production(self):
|
||||
"""validate_startup() should exit in production when CORS has wildcard."""
|
||||
from config import settings, validate_startup
|
||||
|
||||
with (
|
||||
patch.object(settings, "timmy_env", "production"),
|
||||
patch.object(settings, "l402_hmac_secret", "test-secret-hex-value-32"),
|
||||
patch.object(settings, "l402_macaroon_secret", "test-macaroon-hex-value-32"),
|
||||
patch.object(settings, "cors_origins", ["*"]),
|
||||
pytest.raises(SystemExit),
|
||||
):
|
||||
validate_startup(force=True)
|
||||
|
||||
def test_validate_startup_warns_cors_wildcard_in_dev(self):
|
||||
"""validate_startup() should warn in dev when CORS has wildcard."""
|
||||
from config import settings, validate_startup
|
||||
|
||||
with (
|
||||
patch.object(settings, "timmy_env", "development"),
|
||||
patch.object(settings, "cors_origins", ["*"]),
|
||||
patch("config._startup_logger") as mock_logger,
|
||||
):
|
||||
validate_startup(force=True)
|
||||
mock_logger.warning.assert_any_call(
|
||||
"SEC: CORS_ORIGINS contains wildcard '*' — "
|
||||
"restrict to explicit origins before deploying to production."
|
||||
)
|
||||
|
||||
def test_validate_startup_skips_in_test_mode(self):
|
||||
"""validate_startup() should be a no-op in test mode."""
|
||||
from config import validate_startup
|
||||
|
||||
@@ -81,7 +81,6 @@ def test_create_timmy_respects_custom_ollama_url():
|
||||
mock_settings.ollama_url = custom_url
|
||||
mock_settings.ollama_num_ctx = 4096
|
||||
mock_settings.timmy_model_backend = "ollama"
|
||||
mock_settings.airllm_model_size = "70b"
|
||||
|
||||
from timmy.agent import create_timmy
|
||||
|
||||
@@ -91,33 +90,6 @@ def test_create_timmy_respects_custom_ollama_url():
|
||||
assert kwargs["host"] == custom_url
|
||||
|
||||
|
||||
# ── AirLLM path ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_create_timmy_airllm_returns_airllm_agent():
|
||||
"""backend='airllm' must return a TimmyAirLLMAgent, not an Agno Agent."""
|
||||
with patch("timmy.backends.is_apple_silicon", return_value=False):
|
||||
from timmy.agent import create_timmy
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
result = create_timmy(backend="airllm", model_size="8b")
|
||||
|
||||
assert isinstance(result, TimmyAirLLMAgent)
|
||||
|
||||
|
||||
def test_create_timmy_airllm_does_not_call_agno_agent():
|
||||
"""When using the airllm backend, Agno Agent should never be instantiated."""
|
||||
with (
|
||||
patch("timmy.agent.Agent") as MockAgent,
|
||||
patch("timmy.backends.is_apple_silicon", return_value=False),
|
||||
):
|
||||
from timmy.agent import create_timmy
|
||||
|
||||
create_timmy(backend="airllm", model_size="8b")
|
||||
|
||||
MockAgent.assert_not_called()
|
||||
|
||||
|
||||
def test_create_timmy_explicit_ollama_ignores_autodetect():
|
||||
"""backend='ollama' must always use Ollama, even on Apple Silicon."""
|
||||
with (
|
||||
@@ -141,7 +113,6 @@ def test_create_timmy_explicit_ollama_ignores_autodetect():
|
||||
def test_resolve_backend_explicit_takes_priority():
|
||||
from timmy.agent import _resolve_backend
|
||||
|
||||
assert _resolve_backend("airllm") == "airllm"
|
||||
assert _resolve_backend("ollama") == "ollama"
|
||||
|
||||
|
||||
@@ -152,39 +123,6 @@ def test_resolve_backend_defaults_to_ollama_without_config():
|
||||
assert _resolve_backend(None) == "ollama"
|
||||
|
||||
|
||||
def test_resolve_backend_auto_uses_airllm_on_apple_silicon():
|
||||
"""'auto' on Apple Silicon with airllm stubbed → 'airllm'."""
|
||||
with (
|
||||
patch("timmy.backends.is_apple_silicon", return_value=True),
|
||||
patch("timmy.agent.settings") as mock_settings,
|
||||
):
|
||||
mock_settings.timmy_model_backend = "auto"
|
||||
mock_settings.airllm_model_size = "70b"
|
||||
mock_settings.ollama_model = "llama3.2"
|
||||
|
||||
from timmy.agent import _resolve_backend
|
||||
|
||||
assert _resolve_backend(None) == "airllm"
|
||||
|
||||
|
||||
def test_resolve_backend_auto_falls_back_on_non_apple():
|
||||
"""'auto' on non-Apple Silicon → 'ollama'."""
|
||||
with (
|
||||
patch("timmy.backends.is_apple_silicon", return_value=False),
|
||||
patch("timmy.agent.settings") as mock_settings,
|
||||
):
|
||||
mock_settings.timmy_model_backend = "auto"
|
||||
mock_settings.airllm_model_size = "70b"
|
||||
mock_settings.ollama_model = "llama3.2"
|
||||
|
||||
from timmy.agent import _resolve_backend
|
||||
|
||||
assert _resolve_backend(None) == "ollama"
|
||||
|
||||
|
||||
# ── _model_supports_tools ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_model_supports_tools_llama32_returns_false():
|
||||
"""llama3.2 (3B) is too small for reliable tool calling."""
|
||||
from timmy.agent import _model_supports_tools
|
||||
@@ -259,7 +197,6 @@ def test_create_timmy_includes_tools_for_large_model():
|
||||
mock_settings.ollama_url = "http://localhost:11434"
|
||||
mock_settings.ollama_num_ctx = 4096
|
||||
mock_settings.timmy_model_backend = "ollama"
|
||||
mock_settings.airllm_model_size = "70b"
|
||||
mock_settings.telemetry_enabled = False
|
||||
|
||||
from timmy.agent import create_timmy
|
||||
@@ -444,6 +381,150 @@ def test_get_effective_ollama_model_walks_fallback_chain():
|
||||
assert result == "fb-2"
|
||||
|
||||
|
||||
# ── _build_tools_list ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_build_tools_list_empty_when_tools_disabled():
|
||||
"""Small models get an empty tools list."""
|
||||
from timmy.agent import _build_tools_list
|
||||
|
||||
result = _build_tools_list(use_tools=False, skip_mcp=False, model_name="llama3.2")
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_build_tools_list_includes_toolkit_when_enabled():
|
||||
"""Tool-capable models get the full toolkit."""
|
||||
mock_toolkit = MagicMock()
|
||||
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
|
||||
from timmy.agent import _build_tools_list
|
||||
|
||||
result = _build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
|
||||
assert mock_toolkit in result
|
||||
|
||||
|
||||
def test_build_tools_list_skips_mcp_when_flagged():
|
||||
"""skip_mcp=True must not call MCP factories."""
|
||||
mock_toolkit = MagicMock()
|
||||
with (
|
||||
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
|
||||
patch("timmy.mcp_tools.create_gitea_mcp_tools") as mock_gitea,
|
||||
patch("timmy.mcp_tools.create_filesystem_mcp_tools") as mock_fs,
|
||||
):
|
||||
from timmy.agent import _build_tools_list
|
||||
|
||||
_build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
|
||||
mock_gitea.assert_not_called()
|
||||
mock_fs.assert_not_called()
|
||||
|
||||
|
||||
def test_build_tools_list_includes_mcp_when_not_skipped():
|
||||
"""skip_mcp=False should attempt MCP tool creation."""
|
||||
mock_toolkit = MagicMock()
|
||||
with (
|
||||
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
|
||||
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=None) as mock_gitea,
|
||||
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None) as mock_fs,
|
||||
):
|
||||
from timmy.agent import _build_tools_list
|
||||
|
||||
_build_tools_list(use_tools=True, skip_mcp=False, model_name="llama3.1")
|
||||
mock_gitea.assert_called_once()
|
||||
mock_fs.assert_called_once()
|
||||
|
||||
|
||||
# ── _build_prompt ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_build_prompt_includes_base_prompt():
|
||||
"""Prompt should always contain the base system prompt."""
|
||||
from timmy.agent import _build_prompt
|
||||
|
||||
result = _build_prompt(use_tools=False, session_id="test")
|
||||
assert "Timmy" in result
|
||||
|
||||
|
||||
def test_build_prompt_appends_memory_context():
|
||||
"""Memory context should be appended when available."""
|
||||
mock_memory = MagicMock()
|
||||
mock_memory.get_system_context.return_value = "User prefers dark mode."
|
||||
with patch("timmy.memory_system.memory_system", mock_memory):
|
||||
from timmy.agent import _build_prompt
|
||||
|
||||
result = _build_prompt(use_tools=True, session_id="test")
|
||||
assert "GROUNDED CONTEXT" in result
|
||||
assert "dark mode" in result
|
||||
|
||||
|
||||
def test_build_prompt_truncates_long_memory():
|
||||
"""Long memory context should be truncated."""
|
||||
mock_memory = MagicMock()
|
||||
mock_memory.get_system_context.return_value = "x" * 10000
|
||||
with patch("timmy.memory_system.memory_system", mock_memory):
|
||||
from timmy.agent import _build_prompt
|
||||
|
||||
result = _build_prompt(use_tools=False, session_id="test")
|
||||
assert "[truncated]" in result
|
||||
|
||||
|
||||
def test_build_prompt_survives_memory_failure():
|
||||
"""Prompt should fall back to base when memory fails."""
|
||||
mock_memory = MagicMock()
|
||||
mock_memory.get_system_context.side_effect = RuntimeError("db locked")
|
||||
with patch("timmy.memory_system.memory_system", mock_memory):
|
||||
from timmy.agent import _build_prompt
|
||||
|
||||
result = _build_prompt(use_tools=True, session_id="test")
|
||||
assert "Timmy" in result
|
||||
# Memory context should NOT be appended (the db locked error was caught)
|
||||
assert "db locked" not in result
|
||||
|
||||
|
||||
# ── _create_ollama_agent ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_create_ollama_agent_passes_correct_kwargs():
|
||||
"""_create_ollama_agent must pass the expected kwargs to Agent."""
|
||||
with (
|
||||
patch("timmy.agent.Agent") as MockAgent,
|
||||
patch("timmy.agent.Ollama"),
|
||||
patch("timmy.agent.SqliteDb"),
|
||||
patch("timmy.agent._warmup_model", return_value=True),
|
||||
):
|
||||
from timmy.agent import _create_ollama_agent
|
||||
|
||||
_create_ollama_agent(
|
||||
db_file="test.db",
|
||||
model_name="llama3.1",
|
||||
tools_list=[MagicMock()],
|
||||
full_prompt="test prompt",
|
||||
use_tools=True,
|
||||
)
|
||||
kwargs = MockAgent.call_args.kwargs
|
||||
assert kwargs["description"] == "test prompt"
|
||||
assert kwargs["markdown"] is False
|
||||
|
||||
|
||||
def test_create_ollama_agent_none_tools_when_empty():
|
||||
"""Empty tools_list should pass tools=None to Agent."""
|
||||
with (
|
||||
patch("timmy.agent.Agent") as MockAgent,
|
||||
patch("timmy.agent.Ollama"),
|
||||
patch("timmy.agent.SqliteDb"),
|
||||
patch("timmy.agent._warmup_model", return_value=True),
|
||||
):
|
||||
from timmy.agent import _create_ollama_agent
|
||||
|
||||
_create_ollama_agent(
|
||||
db_file="test.db",
|
||||
model_name="llama3.2",
|
||||
tools_list=[],
|
||||
full_prompt="test prompt",
|
||||
use_tools=False,
|
||||
)
|
||||
kwargs = MockAgent.call_args.kwargs
|
||||
assert kwargs["tools"] is None
|
||||
|
||||
|
||||
def test_no_hardcoded_fallback_constants_in_agent():
|
||||
"""agent.py must not define module-level DEFAULT_MODEL_FALLBACKS."""
|
||||
import timmy.agent as agent_mod
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
"""Tests for src/timmy/backends.py — AirLLM wrapper and helpers."""
|
||||
"""Tests for src/timmy/backends.py — backend helpers and classes."""
|
||||
|
||||
import sys
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ── is_apple_silicon ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -38,183 +35,6 @@ def test_is_apple_silicon_false_on_intel_mac():
|
||||
assert is_apple_silicon() is False
|
||||
|
||||
|
||||
# ── airllm_available ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_airllm_available_true_when_stub_in_sys_modules():
|
||||
# conftest already stubs 'airllm' — importable → True.
|
||||
from timmy.backends import airllm_available
|
||||
|
||||
assert airllm_available() is True
|
||||
|
||||
|
||||
def test_airllm_available_false_when_not_importable():
|
||||
# Temporarily remove the stub to simulate airllm not installed.
|
||||
saved = sys.modules.pop("airllm", None)
|
||||
try:
|
||||
from timmy.backends import airllm_available
|
||||
|
||||
assert airllm_available() is False
|
||||
finally:
|
||||
if saved is not None:
|
||||
sys.modules["airllm"] = saved
|
||||
|
||||
|
||||
# ── TimmyAirLLMAgent construction ────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_airllm_agent_raises_on_unknown_size():
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
with pytest.raises(ValueError, match="Unknown model size"):
|
||||
TimmyAirLLMAgent(model_size="3b")
|
||||
|
||||
|
||||
def test_airllm_agent_uses_automodel_on_non_apple():
|
||||
"""Non-Apple-Silicon path uses AutoModel.from_pretrained."""
|
||||
with patch("timmy.backends.is_apple_silicon", return_value=False):
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
TimmyAirLLMAgent(model_size="8b")
|
||||
# sys.modules["airllm"] is a MagicMock; AutoModel.from_pretrained was called.
|
||||
assert sys.modules["airllm"].AutoModel.from_pretrained.called
|
||||
|
||||
|
||||
def test_airllm_agent_uses_mlx_on_apple_silicon():
|
||||
"""Apple Silicon path uses AirLLMMLX, not AutoModel."""
|
||||
with patch("timmy.backends.is_apple_silicon", return_value=True):
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
TimmyAirLLMAgent(model_size="8b")
|
||||
assert sys.modules["airllm"].AirLLMMLX.called
|
||||
|
||||
|
||||
def test_airllm_agent_resolves_correct_model_id_for_70b():
|
||||
with patch("timmy.backends.is_apple_silicon", return_value=False):
|
||||
from timmy.backends import _AIRLLM_MODELS, TimmyAirLLMAgent
|
||||
|
||||
TimmyAirLLMAgent(model_size="70b")
|
||||
sys.modules["airllm"].AutoModel.from_pretrained.assert_called_with(_AIRLLM_MODELS["70b"])
|
||||
|
||||
|
||||
# ── TimmyAirLLMAgent.print_response ──────────────────────────────────────────
|
||||
|
||||
|
||||
def _make_agent(model_size: str = "8b") -> "TimmyAirLLMAgent": # noqa: F821
|
||||
"""Helper: create an agent with a fully mocked underlying model."""
|
||||
with patch("timmy.backends.is_apple_silicon", return_value=False):
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
agent = TimmyAirLLMAgent(model_size=model_size)
|
||||
|
||||
# Replace the underlying model with a clean mock that returns predictable output.
|
||||
mock_model = MagicMock()
|
||||
mock_tokenizer = MagicMock()
|
||||
# tokenizer() returns a dict-like object with an "input_ids" tensor mock.
|
||||
input_ids_mock = MagicMock()
|
||||
input_ids_mock.shape = [1, 10] # shape[1] = prompt token count = 10
|
||||
token_dict = {"input_ids": input_ids_mock}
|
||||
mock_tokenizer.return_value = token_dict
|
||||
# generate() returns a list of token sequences.
|
||||
mock_tokenizer.decode.return_value = "Sir, affirmative."
|
||||
mock_model.tokenizer = mock_tokenizer
|
||||
mock_model.generate.return_value = [list(range(15))] # 15 tokens total
|
||||
agent._model = mock_model
|
||||
return agent
|
||||
|
||||
|
||||
def test_print_response_calls_generate():
|
||||
agent = _make_agent()
|
||||
agent.print_response("What is sovereignty?", stream=True)
|
||||
agent._model.generate.assert_called_once()
|
||||
|
||||
|
||||
def test_print_response_decodes_only_generated_tokens():
|
||||
agent = _make_agent()
|
||||
agent.print_response("Hello", stream=False)
|
||||
# decode should be called with tokens starting at index 10 (prompt length).
|
||||
decode_call = agent._model.tokenizer.decode.call_args
|
||||
token_slice = decode_call[0][0]
|
||||
assert list(token_slice) == list(range(10, 15))
|
||||
|
||||
|
||||
def test_print_response_updates_history():
|
||||
agent = _make_agent()
|
||||
agent.print_response("First message")
|
||||
assert any("First message" in turn for turn in agent._history)
|
||||
assert any("Timmy:" in turn for turn in agent._history)
|
||||
|
||||
|
||||
def test_print_response_history_included_in_second_prompt():
|
||||
agent = _make_agent()
|
||||
agent.print_response("First")
|
||||
# Build the prompt for the second call — history should appear.
|
||||
prompt = agent._build_prompt("Second")
|
||||
assert "First" in prompt
|
||||
assert "Second" in prompt
|
||||
|
||||
|
||||
def test_print_response_stream_flag_accepted():
|
||||
"""stream=False should not raise — it's accepted for API compatibility."""
|
||||
agent = _make_agent()
|
||||
agent.print_response("hello", stream=False) # no error
|
||||
|
||||
|
||||
# ── Prompt formatting tests ────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_airllm_prompt_contains_formatted_model_name():
|
||||
"""AirLLM prompt should have actual model name, not literal {model_name}."""
|
||||
with (
|
||||
patch("timmy.backends.is_apple_silicon", return_value=False),
|
||||
patch("config.settings") as mock_settings,
|
||||
):
|
||||
mock_settings.ollama_model = "llama3.2:3b"
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
agent = TimmyAirLLMAgent(model_size="8b")
|
||||
prompt = agent._build_prompt("test message")
|
||||
|
||||
# Should contain the actual model name, not the placeholder
|
||||
assert "{model_name}" not in prompt
|
||||
assert "llama3.2:3b" in prompt
|
||||
|
||||
|
||||
def test_airllm_prompt_gets_lite_tier():
|
||||
"""AirLLM should get LITE tier prompt (tools_enabled=False)."""
|
||||
with (
|
||||
patch("timmy.backends.is_apple_silicon", return_value=False),
|
||||
patch("config.settings") as mock_settings,
|
||||
):
|
||||
mock_settings.ollama_model = "test-model"
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
agent = TimmyAirLLMAgent(model_size="8b")
|
||||
prompt = agent._build_prompt("test message")
|
||||
|
||||
# LITE tier should NOT have TOOL USAGE section
|
||||
assert "TOOL USAGE" not in prompt
|
||||
# LITE tier should have the basic rules
|
||||
assert "Be brief by default" in prompt
|
||||
|
||||
|
||||
def test_airllm_prompt_contains_session_id():
|
||||
"""AirLLM prompt should have session_id formatted, not placeholder."""
|
||||
with (
|
||||
patch("timmy.backends.is_apple_silicon", return_value=False),
|
||||
patch("config.settings") as mock_settings,
|
||||
):
|
||||
mock_settings.ollama_model = "test-model"
|
||||
from timmy.backends import TimmyAirLLMAgent
|
||||
|
||||
agent = TimmyAirLLMAgent(model_size="8b")
|
||||
prompt = agent._build_prompt("test message")
|
||||
|
||||
# Should contain the session_id, not the placeholder
|
||||
assert '{session_id}"' not in prompt
|
||||
assert 'session "airllm"' in prompt
|
||||
|
||||
|
||||
# ── ClaudeBackend ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
|
||||
@@ -107,19 +107,7 @@ def test_chat_new_session_uses_unique_id():
|
||||
|
||||
|
||||
def test_chat_passes_backend_option():
|
||||
"""chat --backend airllm must forward the backend to create_timmy."""
|
||||
mock_run_output = MagicMock()
|
||||
mock_run_output.content = "OK"
|
||||
mock_run_output.status = "COMPLETED"
|
||||
mock_run_output.active_requirements = []
|
||||
|
||||
mock_timmy = MagicMock()
|
||||
mock_timmy.run.return_value = mock_run_output
|
||||
|
||||
with patch("timmy.cli.create_timmy", return_value=mock_timmy) as mock_create:
|
||||
runner.invoke(app, ["chat", "test", "--backend", "airllm"])
|
||||
|
||||
mock_create.assert_called_once_with(backend="airllm", model_size=None, session_id="cli")
|
||||
pass
|
||||
|
||||
|
||||
def test_chat_cleans_response():
|
||||
|
||||
@@ -8,11 +8,14 @@ from fastapi.testclient import TestClient
|
||||
|
||||
@pytest.fixture
|
||||
def serve_client():
|
||||
"""Create a TestClient for the timmy-serve app."""
|
||||
from timmy_serve.app import create_timmy_serve_app
|
||||
"""Create a TestClient for the timmy-serve app with mocked Timmy agent."""
|
||||
with patch("timmy_serve.app.create_timmy") as mock_create:
|
||||
mock_create.return_value = MagicMock()
|
||||
from timmy_serve.app import create_timmy_serve_app
|
||||
|
||||
app = create_timmy_serve_app()
|
||||
return TestClient(app)
|
||||
app = create_timmy_serve_app()
|
||||
with TestClient(app) as client:
|
||||
yield client
|
||||
|
||||
|
||||
class TestHealthEndpoint:
|
||||
@@ -34,18 +37,40 @@ class TestServeStatus:
|
||||
|
||||
class TestServeChatEndpoint:
|
||||
@patch("timmy_serve.app.create_timmy")
|
||||
def test_chat_returns_response(self, mock_create, serve_client):
|
||||
def test_chat_returns_response(self, mock_create):
|
||||
mock_agent = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_result.content = "I am Timmy."
|
||||
mock_agent.run.return_value = mock_result
|
||||
mock_create.return_value = mock_agent
|
||||
|
||||
resp = serve_client.post(
|
||||
"/serve/chat",
|
||||
json={"message": "Who are you?"},
|
||||
)
|
||||
from timmy_serve.app import create_timmy_serve_app
|
||||
|
||||
app = create_timmy_serve_app()
|
||||
with TestClient(app) as client:
|
||||
resp = client.post(
|
||||
"/serve/chat",
|
||||
json={"message": "Who are you?"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["response"] == "I am Timmy."
|
||||
mock_agent.run.assert_called_once_with("Who are you?", stream=False)
|
||||
|
||||
@patch("timmy_serve.app.create_timmy")
|
||||
def test_agent_cached_at_startup(self, mock_create):
|
||||
"""Verify create_timmy is called once at startup, not per request."""
|
||||
mock_agent = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_result.content = "reply"
|
||||
mock_agent.run.return_value = mock_result
|
||||
mock_create.return_value = mock_agent
|
||||
|
||||
from timmy_serve.app import create_timmy_serve_app
|
||||
|
||||
app = create_timmy_serve_app()
|
||||
with TestClient(app) as client:
|
||||
# Two requests — create_timmy should only be called once (at startup)
|
||||
client.post("/serve/chat", json={"message": "hello"})
|
||||
client.post("/serve/chat", json={"message": "world"})
|
||||
mock_create.assert_called_once()
|
||||
|
||||
Reference in New Issue
Block a user