Compare commits

...

13 Commits

Author SHA1 Message Date
kimi
735bfc7820 refactor: break up create_timmy() into testable helpers
All checks were successful
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Successful in 1m16s
Extract three focused helpers from the 131-line create_timmy():
- _build_tools_list(): assembles toolkit + optional MCP servers
- _build_prompt(): builds system prompt with memory context
- _create_ollama_agent(): constructs the Agno Agent

create_timmy() now delegates to these helpers, making each
concern independently testable. Added 8 unit tests for the
new helpers.

Fixes #512

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 19:53:53 -04:00
7c823ab59c refactor: break up think_once() into testable helpers (#518)
All checks were successful
Tests / lint (push) Successful in 4s
Tests / test (push) Successful in 1m19s
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:43:26 -04:00
9f2728f529 refactor: break up lifespan() into testable helpers (#515)
All checks were successful
Tests / lint (push) Successful in 4s
Tests / test (push) Successful in 1m8s
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:30:32 -04:00
cd3dc5d989 refactor: break up CascadeRouter.complete() into focused helpers (#510)
All checks were successful
Tests / lint (push) Successful in 7s
Tests / test (push) Successful in 1m13s
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:24:36 -04:00
e4de539bf3 fix: extract ollama_url normalization into shared utility (#508)
All checks were successful
Tests / lint (push) Successful in 2s
Tests / test (push) Successful in 1m26s
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:18:22 -04:00
b2057f72e1 [loop-cycle] refactor: break up run_agentic_loop into testable helpers (#504) (#509)
All checks were successful
Tests / lint (push) Successful in 3s
Tests / test (push) Successful in 1m20s
2026-03-19 19:15:38 -04:00
5f52dd54c0 [loop-cycle-932] fix: add logging to bare except Exception blocks (#484) (#501)
All checks were successful
Tests / lint (push) Successful in 3s
Tests / test (push) Successful in 1m37s
2026-03-19 19:05:02 -04:00
9ceffd61d1 [loop-cycle-544] fix: use settings.ollama_url fallback in _call_ollama (#490) (#498)
All checks were successful
Tests / lint (push) Successful in 3s
Tests / test (push) Successful in 1m8s
2026-03-19 16:18:39 -04:00
015d858be5 fix: auto-detect issue number in cycle retro from git branch (#495)
All checks were successful
Tests / lint (push) Successful in 2s
Tests / test (push) Successful in 1m19s
## Summary
- `cycle_retro.py` now auto-detects issue number from the git branch name (e.g. `kimi/issue-492` → `492`) when `--issue` is not provided
- `backfill_retro.py` now skips the PR number suffix Gitea appends to titles so it does not confuse PR numbers with issue numbers
- Added tests for both fixes

Fixes #492

Co-authored-by: kimi <kimi@localhost>
Reviewed-on: http://localhost:3000/rockachopa/Timmy-time-dashboard/pulls/495
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 16:13:35 -04:00
b6d0b5f999 feat: epoch turnover notation for loopstat cycles ⟳WW.D:NNN (#496)
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Has been cancelled
2026-03-19 16:12:10 -04:00
d70e4f810a fix: use settings.ollama_url instead of hardcoded fallback in cascade router (#491)
All checks were successful
Tests / lint (push) Successful in 3s
Tests / test (push) Successful in 1m20s
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 16:02:20 -04:00
7f20742fcf fix: replace hardcoded secret placeholder in CSRF middleware docstring (#488)
All checks were successful
Tests / lint (push) Successful in 4s
Tests / test (push) Successful in 1m11s
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 15:52:29 -04:00
15eb7c3b45 [loop-cycle-538] refactor: remove dead airllm provider from cascade router (#459) (#481)
All checks were successful
Tests / lint (push) Successful in 3s
Tests / test (push) Successful in 1m28s
2026-03-19 15:44:10 -04:00
21 changed files with 1305 additions and 486 deletions


@@ -54,19 +54,6 @@ providers:
       context_window: 2048
       capabilities: [text, vision, streaming]

-  # Secondary: Local AirLLM (if installed)
-  - name: airllm-local
-    type: airllm
-    enabled: false  # Enable if pip install airllm
-    priority: 2
-    models:
-      - name: 70b
-        default: true
-        capabilities: [text, tools, json, streaming]
-      - name: 8b
-        capabilities: [text, tools, json, streaming]
-      - name: 405b
-        capabilities: [text, tools, json, streaming]
-
   # Tertiary: OpenAI (if API key available)
   - name: openai-backup


@@ -94,12 +94,17 @@ def extract_cycle_number(title: str) -> int | None:
     return int(m.group(1)) if m else None

-def extract_issue_number(title: str, body: str) -> int | None:
-    # Try body first (usually has "closes #N")
+def extract_issue_number(title: str, body: str, pr_number: int | None = None) -> int | None:
+    """Extract the issue number from PR body/title, ignoring the PR number itself.
+
+    Gitea appends "(#N)" to PR titles where N is the PR number — skip that
+    so we don't confuse it with the linked issue.
+    """
     for text in [body or "", title]:
-        m = ISSUE_RE.search(text)
-        if m:
-            return int(m.group(1))
+        for m in ISSUE_RE.finditer(text):
+            num = int(m.group(1))
+            if num != pr_number:
+                return num
     return None

@@ -140,7 +145,7 @@ def main():
         else:
             cycle_counter = max(cycle_counter, cycle)

-        issue = extract_issue_number(title, body)
+        issue = extract_issue_number(title, body, pr_number=pr_num)
         issue_type = classify_pr(title, body)
         duration = estimate_duration(pr)
         diff = get_pr_diff_stats(token, pr_num)


@@ -4,11 +4,26 @@
 Called after each cycle completes (success or failure).
 Appends a structured entry to .loop/retro/cycles.jsonl.

+EPOCH NOTATION (turnover system):
+Each cycle carries a symbolic epoch tag alongside the raw integer:
+    ⟳WW.D:NNN
+      ⟳    turnover glyph — marks epoch-aware cycles
+      WW   ISO week-of-year (01–53)
+      D    ISO weekday (1=Mon … 7=Sun)
+      NNN  daily cycle counter, zero-padded, resets at midnight UTC
+Example: ⟳12.3:042 — Week 12, Wednesday, 42nd cycle of the day.
+The raw `cycle` integer is preserved for backward compatibility.
+The `epoch` field carries the symbolic notation.
+
 SUCCESS DEFINITION:
 A cycle is only "success" if BOTH conditions are met:
   1. The hermes process exited cleanly (exit code 0)
   2. Main is green (smoke test passes on main after merge)
 A cycle that merges a PR but leaves main red is a FAILURE.
 The --main-green flag records the smoke test result.

@@ -29,6 +44,8 @@ from __future__ import annotations

 import argparse
 import json
+import re
+import subprocess
 import sys
 from datetime import datetime, timezone
 from pathlib import Path
@@ -36,10 +53,68 @@ from pathlib import Path

 REPO_ROOT = Path(__file__).resolve().parent.parent
 RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
 SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
+EPOCH_COUNTER_FILE = REPO_ROOT / ".loop" / "retro" / ".epoch_counter"

 # How many recent entries to include in rolling summary
 SUMMARY_WINDOW = 50

+# Branch patterns that encode an issue number, e.g. kimi/issue-492
+BRANCH_ISSUE_RE = re.compile(r"issue[/-](\d+)", re.IGNORECASE)
+
+
+def detect_issue_from_branch() -> int | None:
+    """Try to extract an issue number from the current git branch name."""
+    try:
+        branch = subprocess.check_output(
+            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
+            stderr=subprocess.DEVNULL,
+            text=True,
+        ).strip()
+    except (subprocess.CalledProcessError, FileNotFoundError):
+        return None
+    m = BRANCH_ISSUE_RE.search(branch)
+    return int(m.group(1)) if m else None
+
+
+# ── Epoch turnover ────────────────────────────────────────────────────────
+
+def _epoch_tag(now: datetime | None = None) -> tuple[str, dict]:
+    """Generate the symbolic epoch tag and advance the daily counter.
+
+    Returns (epoch_string, epoch_parts) where epoch_parts is a dict with
+    week, weekday, daily_n for structured storage.
+
+    The daily counter persists in .epoch_counter as a two-line file:
+        line 1: ISO date (YYYY-MM-DD) of the current epoch day
+        line 2: integer count
+    When the date rolls over, the counter resets to 1.
+    """
+    if now is None:
+        now = datetime.now(timezone.utc)
+    iso_cal = now.isocalendar()  # (year, week, weekday)
+    week = iso_cal[1]
+    weekday = iso_cal[2]
+    today_str = now.strftime("%Y-%m-%d")
+
+    # Read / reset daily counter
+    daily_n = 1
+    EPOCH_COUNTER_FILE.parent.mkdir(parents=True, exist_ok=True)
+    if EPOCH_COUNTER_FILE.exists():
+        try:
+            lines = EPOCH_COUNTER_FILE.read_text().strip().splitlines()
+            if len(lines) == 2 and lines[0] == today_str:
+                daily_n = int(lines[1]) + 1
+        except (ValueError, IndexError):
+            pass  # corrupt file — reset
+
+    # Persist
+    EPOCH_COUNTER_FILE.write_text(f"{today_str}\n{daily_n}\n")
+
+    tag = f"\u27f3{week:02d}.{weekday}:{daily_n:03d}"
+    parts = {"week": week, "weekday": weekday, "daily_n": daily_n}
+    return tag, parts
+

 def parse_args() -> argparse.Namespace:
     p = argparse.ArgumentParser(description="Log a cycle retrospective")
@@ -123,8 +198,30 @@ def update_summary() -> None:
             issue_failures[e["issue"]] = issue_failures.get(e["issue"], 0) + 1
     quarantine_candidates = {k: v for k, v in issue_failures.items() if v >= 2}

+    # Epoch turnover stats — cycles per week/day from epoch-tagged entries
+    epoch_entries = [e for e in recent if e.get("epoch")]
+    by_week: dict[int, int] = {}
+    by_weekday: dict[int, int] = {}
+    for e in epoch_entries:
+        w = e.get("epoch_week")
+        d = e.get("epoch_weekday")
+        if w is not None:
+            by_week[w] = by_week.get(w, 0) + 1
+        if d is not None:
+            by_weekday[d] = by_weekday.get(d, 0) + 1
+
+    # Current epoch — latest entry's epoch tag
+    current_epoch = epoch_entries[-1].get("epoch", "") if epoch_entries else ""
+
+    # Weekday names for display
+    weekday_glyphs = {1: "Mon", 2: "Tue", 3: "Wed", 4: "Thu",
+                      5: "Fri", 6: "Sat", 7: "Sun"}
+    by_weekday_named = {weekday_glyphs.get(k, str(k)): v
+                        for k, v in sorted(by_weekday.items())}
+
     summary = {
         "updated_at": datetime.now(timezone.utc).isoformat(),
+        "current_epoch": current_epoch,
         "window": len(recent),
         "measured_cycles": len(measured),
         "total_cycles": len(entries),
@@ -136,9 +233,12 @@ def update_summary() -> None:
         "total_lines_removed": sum(e.get("lines_removed", 0) for e in recent),
         "total_prs_merged": sum(1 for e in recent if e.get("pr")),
         "by_type": type_stats,
+        "by_week": dict(sorted(by_week.items())),
+        "by_weekday": by_weekday_named,
         "quarantine_candidates": quarantine_candidates,
         "recent_failures": [
-            {"cycle": e["cycle"], "issue": e.get("issue"), "reason": e.get("reason", "")}
+            {"cycle": e["cycle"], "epoch": e.get("epoch", ""),
+             "issue": e.get("issue"), "reason": e.get("reason", "")}
             for e in failures[-5:]
         ],
     }
@@ -149,6 +249,10 @@ def update_summary() -> None:
 def main() -> None:
     args = parse_args()

+    # Auto-detect issue from branch when not explicitly provided
+    if args.issue is None:
+        args.issue = detect_issue_from_branch()
+
     # Reject idle cycles — no issue and no duration means nothing happened
     if not args.issue and args.duration == 0:
         print(f"[retro] Cycle {args.cycle} skipped — idle (no issue, no duration)")
@@ -157,9 +261,17 @@ def main() -> None:
     # A cycle is only truly successful if hermes exited clean AND main is green
     truly_success = args.success and args.main_green

+    # Generate epoch turnover tag
+    now = datetime.now(timezone.utc)
+    epoch_tag, epoch_parts = _epoch_tag(now)
+
     entry = {
-        "timestamp": datetime.now(timezone.utc).isoformat(),
+        "timestamp": now.isoformat(),
         "cycle": args.cycle,
+        "epoch": epoch_tag,
+        "epoch_week": epoch_parts["week"],
+        "epoch_weekday": epoch_parts["weekday"],
+        "epoch_daily_n": epoch_parts["daily_n"],
         "issue": args.issue,
         "type": args.type,
         "success": truly_success,
@@ -184,7 +296,7 @@ def main() -> None:
     update_summary()

     status = "✓ SUCCESS" if args.success else "✗ FAILURE"
-    print(f"[retro] Cycle {args.cycle} {status}", end="")
+    print(f"[retro] {epoch_tag} Cycle {args.cycle} {status}", end="")
     if args.issue:
         print(f" (#{args.issue} {args.type})", end="")
     if args.duration:

scripts/loop_introspect.py — new file, 407 lines

@@ -0,0 +1,407 @@
#!/usr/bin/env python3
"""Loop introspection — the self-improvement engine.

Analyzes retro data across time windows to detect trends, extract patterns,
and produce structured recommendations. Output is consumed by deep_triage
and injected into the loop prompt context.

This is the piece that closes the feedback loop:
    cycle_retro → introspect → deep_triage → loop behavior changes

Run:    python3 scripts/loop_introspect.py
Output: .loop/retro/insights.json (structured insights + recommendations)
        Prints human-readable summary to stdout.

Called by: deep_triage.sh (before the LLM triage), timmy-loop.sh (every 50 cycles)
"""
from __future__ import annotations

import json
import sys
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent
CYCLES_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
DEEP_TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "deep-triage.jsonl"
TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
INSIGHTS_FILE = REPO_ROOT / ".loop" / "retro" / "insights.json"
# ── Helpers ──────────────────────────────────────────────────────────────

def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file, skipping bad lines."""
    if not path.exists():
        return []
    entries = []
    for line in path.read_text().strip().splitlines():
        try:
            entries.append(json.loads(line))
        except (json.JSONDecodeError, ValueError):
            continue
    return entries


def parse_ts(ts_str: str) -> datetime | None:
    """Parse an ISO timestamp, tolerating missing tz."""
    if not ts_str:
        return None
    try:
        dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt
    except (ValueError, TypeError):
        return None


def window(entries: list[dict], days: int) -> list[dict]:
    """Filter entries to the last N days."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    result = []
    for e in entries:
        ts = parse_ts(e.get("timestamp", ""))
        if ts and ts >= cutoff:
            result.append(e)
    return result
# ── Analysis functions ───────────────────────────────────────────────────

def compute_trends(cycles: list[dict]) -> dict:
    """Compare recent window (last 7d) vs older window (7-14d ago)."""
    recent = window(cycles, 7)
    older = window(cycles, 14)
    # Remove recent from older to get the 7-14d window
    recent_set = {(e.get("cycle"), e.get("timestamp")) for e in recent}
    older = [e for e in older if (e.get("cycle"), e.get("timestamp")) not in recent_set]

    def stats(entries):
        if not entries:
            return {"count": 0, "success_rate": None, "avg_duration": None,
                    "lines_net": 0, "prs_merged": 0}
        successes = sum(1 for e in entries if e.get("success"))
        durations = [e["duration"] for e in entries if e.get("duration", 0) > 0]
        return {
            "count": len(entries),
            "success_rate": round(successes / len(entries), 3) if entries else None,
            "avg_duration": round(sum(durations) / len(durations)) if durations else None,
            "lines_net": sum(e.get("lines_added", 0) - e.get("lines_removed", 0) for e in entries),
            "prs_merged": sum(1 for e in entries if e.get("pr")),
        }

    recent_stats = stats(recent)
    older_stats = stats(older)
    trend = {
        "recent_7d": recent_stats,
        "previous_7d": older_stats,
        "velocity_change": None,
        "success_rate_change": None,
        "duration_change": None,
    }
    if recent_stats["count"] and older_stats["count"]:
        trend["velocity_change"] = recent_stats["count"] - older_stats["count"]
    if recent_stats["success_rate"] is not None and older_stats["success_rate"] is not None:
        trend["success_rate_change"] = round(
            recent_stats["success_rate"] - older_stats["success_rate"], 3
        )
    if recent_stats["avg_duration"] is not None and older_stats["avg_duration"] is not None:
        trend["duration_change"] = recent_stats["avg_duration"] - older_stats["avg_duration"]
    return trend
def type_analysis(cycles: list[dict]) -> dict:
    """Per-type success rates and durations."""
    by_type: dict[str, list[dict]] = defaultdict(list)
    for c in cycles:
        by_type[c.get("type", "unknown")].append(c)
    result = {}
    for t, entries in by_type.items():
        durations = [e["duration"] for e in entries if e.get("duration", 0) > 0]
        successes = sum(1 for e in entries if e.get("success"))
        result[t] = {
            "count": len(entries),
            "success_rate": round(successes / len(entries), 3) if entries else 0,
            "avg_duration": round(sum(durations) / len(durations)) if durations else 0,
            "max_duration": max(durations) if durations else 0,
        }
    return result
def repeat_failures(cycles: list[dict]) -> list[dict]:
    """Issues that have failed multiple times — quarantine candidates."""
    failures: dict[int, list] = defaultdict(list)
    for c in cycles:
        if not c.get("success") and c.get("issue"):
            failures[c["issue"]].append({
                "cycle": c.get("cycle"),
                "reason": c.get("reason", ""),
                "duration": c.get("duration", 0),
            })
    # Only issues with 2+ failures
    return [
        {"issue": k, "failure_count": len(v), "attempts": v}
        for k, v in sorted(failures.items(), key=lambda x: -len(x[1]))
        if len(v) >= 2
    ]
def duration_outliers(cycles: list[dict], threshold_multiple: float = 3.0) -> list[dict]:
    """Cycles that took way longer than average — something went wrong."""
    durations = [c["duration"] for c in cycles if c.get("duration", 0) > 0]
    if len(durations) < 5:
        return []
    avg = sum(durations) / len(durations)
    threshold = avg * threshold_multiple
    outliers = []
    for c in cycles:
        dur = c.get("duration", 0)
        if dur > threshold:
            outliers.append({
                "cycle": c.get("cycle"),
                "issue": c.get("issue"),
                "type": c.get("type"),
                "duration": dur,
                "avg_duration": round(avg),
                "multiple": round(dur / avg, 1) if avg > 0 else 0,
                "reason": c.get("reason", ""),
            })
    return outliers
def triage_effectiveness(deep_triages: list[dict]) -> dict:
    """How well is the deep triage performing?"""
    if not deep_triages:
        return {"runs": 0, "note": "No deep triage data yet"}
    total_reviewed = sum(d.get("issues_reviewed", 0) for d in deep_triages)
    total_refined = sum(len(d.get("issues_refined", [])) for d in deep_triages)
    total_created = sum(len(d.get("issues_created", [])) for d in deep_triages)
    total_closed = sum(len(d.get("issues_closed", [])) for d in deep_triages)
    timmy_available = sum(1 for d in deep_triages if d.get("timmy_available"))

    # Extract Timmy's feedback themes
    timmy_themes = []
    for d in deep_triages:
        fb = d.get("timmy_feedback", "")
        if fb:
            timmy_themes.append(fb[:200])

    return {
        "runs": len(deep_triages),
        "total_reviewed": total_reviewed,
        "total_refined": total_refined,
        "total_created": total_created,
        "total_closed": total_closed,
        "timmy_consultation_rate": round(timmy_available / len(deep_triages), 2),
        "timmy_recent_feedback": timmy_themes[-1] if timmy_themes else "",
        "timmy_feedback_history": timmy_themes,
    }
def generate_recommendations(
    trends: dict,
    types: dict,
    repeats: list,
    outliers: list,
    triage_eff: dict,
) -> list[dict]:
    """Produce actionable recommendations from the analysis."""
    recs = []

    # 1. Success rate declining?
    src = trends.get("success_rate_change")
    if src is not None and src < -0.1:
        recs.append({
            "severity": "high",
            "category": "reliability",
            "finding": f"Success rate dropped {abs(src)*100:.0f}pp in the last 7 days",
            "recommendation": "Review recent failures. Are issues poorly scoped? "
                              "Is main unstable? Check if triage is producing bad work items.",
        })

    # 2. Velocity dropping?
    vc = trends.get("velocity_change")
    if vc is not None and vc < -5:
        recs.append({
            "severity": "medium",
            "category": "throughput",
            "finding": f"Velocity dropped by {abs(vc)} cycles vs previous week",
            "recommendation": "Check for loop stalls, long-running cycles, or queue starvation.",
        })

    # 3. Duration creep?
    dc = trends.get("duration_change")
    if dc is not None and dc > 120:  # 2+ minutes longer
        recs.append({
            "severity": "medium",
            "category": "efficiency",
            "finding": f"Average cycle duration increased by {dc}s vs previous week",
            "recommendation": "Issues may be growing in scope. Enforce tighter decomposition "
                              "in deep triage. Check if tests are getting slower.",
        })

    # 4. Type-specific problems
    for t, info in types.items():
        if info["count"] >= 3 and info["success_rate"] < 0.5:
            recs.append({
                "severity": "high",
                "category": "type_reliability",
                "finding": f"'{t}' issues fail {(1-info['success_rate'])*100:.0f}% of the time "
                           f"({info['count']} attempts)",
                "recommendation": f"'{t}' issues need better scoping or different approach. "
                                  f"Consider: tighter acceptance criteria, smaller scope, "
                                  f"or delegating to Kimi with more context.",
            })
        if info["avg_duration"] > 600 and info["count"] >= 3:  # >10 min avg
            recs.append({
                "severity": "medium",
                "category": "type_efficiency",
                "finding": f"'{t}' issues average {info['avg_duration']//60}m{info['avg_duration']%60}s "
                           f"(max {info['max_duration']//60}m)",
                "recommendation": f"Break '{t}' issues into smaller pieces. Target <5 min per cycle.",
            })

    # 5. Repeat failures
    for rf in repeats[:3]:
        recs.append({
            "severity": "high",
            "category": "repeat_failure",
            "finding": f"Issue #{rf['issue']} has failed {rf['failure_count']} times",
            "recommendation": "Quarantine or rewrite this issue. Repeated failure = "
                              "bad scope or missing prerequisite.",
        })

    # 6. Outliers
    if len(outliers) > 2:
        recs.append({
            "severity": "medium",
            "category": "outliers",
            "finding": f"{len(outliers)} cycles took {outliers[0].get('multiple', '?')}x+ "
                       f"longer than average",
            "recommendation": "Long cycles waste resources. Add timeout enforcement or "
                              "break complex issues earlier.",
        })

    # 7. Code growth
    recent = trends.get("recent_7d", {})
    net = recent.get("lines_net", 0)
    if net > 500:
        recs.append({
            "severity": "low",
            "category": "code_health",
            "finding": f"Net +{net} lines added in the last 7 days",
            "recommendation": "Lines of code is a liability. Balance feature work with "
                              "refactoring. Target net-zero or negative line growth.",
        })

    # 8. Triage health
    if triage_eff.get("runs", 0) == 0:
        recs.append({
            "severity": "high",
            "category": "triage",
            "finding": "Deep triage has never run",
            "recommendation": "Enable deep triage (every 20 cycles). The loop needs "
                              "LLM-driven issue refinement to stay effective.",
        })

    # No recommendations = things are healthy
    if not recs:
        recs.append({
            "severity": "info",
            "category": "health",
            "finding": "No significant issues detected",
            "recommendation": "System is healthy. Continue current patterns.",
        })

    return recs
# ── Main ─────────────────────────────────────────────────────────────────

def main() -> None:
    cycles = load_jsonl(CYCLES_FILE)
    deep_triages = load_jsonl(DEEP_TRIAGE_FILE)

    if not cycles:
        print("[introspect] No cycle data found. Nothing to analyze.")
        return

    # Run all analyses
    trends = compute_trends(cycles)
    types = type_analysis(cycles)
    repeats = repeat_failures(cycles)
    outliers = duration_outliers(cycles)
    triage_eff = triage_effectiveness(deep_triages)
    recommendations = generate_recommendations(trends, types, repeats, outliers, triage_eff)

    insights = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_cycles_analyzed": len(cycles),
        "trends": trends,
        "by_type": types,
        "repeat_failures": repeats[:5],
        "duration_outliers": outliers[:5],
        "triage_effectiveness": triage_eff,
        "recommendations": recommendations,
    }

    # Write insights
    INSIGHTS_FILE.parent.mkdir(parents=True, exist_ok=True)
    INSIGHTS_FILE.write_text(json.dumps(insights, indent=2) + "\n")

    # Current epoch from latest entry
    latest_epoch = ""
    for c in reversed(cycles):
        if c.get("epoch"):
            latest_epoch = c["epoch"]
            break

    # Human-readable output
    header = f"[introspect] Analyzed {len(cycles)} cycles"
    if latest_epoch:
        header += f" · current epoch: {latest_epoch}"
    print(header)

    print(f"\n TRENDS (7d vs previous 7d):")
    r7 = trends["recent_7d"]
    p7 = trends["previous_7d"]
    print(f" Cycles: {r7['count']:>3d} (was {p7['count']})")
    if r7["success_rate"] is not None:
        arrow = "↑" if (trends["success_rate_change"] or 0) > 0 else "↓" if (trends["success_rate_change"] or 0) < 0 else ""
        print(f" Success rate: {r7['success_rate']*100:>4.0f}% {arrow}")
    if r7["avg_duration"] is not None:
        print(f" Avg duration: {r7['avg_duration']//60}m{r7['avg_duration']%60:02d}s")
    print(f" PRs merged: {r7['prs_merged']:>3d} (was {p7['prs_merged']})")
    print(f" Lines net: {r7['lines_net']:>+5d}")

    print(f"\n BY TYPE:")
    for t, info in sorted(types.items(), key=lambda x: -x[1]["count"]):
        print(f" {t:12s} n={info['count']:>2d} "
              f"ok={info['success_rate']*100:>3.0f}% "
              f"avg={info['avg_duration']//60}m{info['avg_duration']%60:02d}s")

    if repeats:
        print(f"\n REPEAT FAILURES:")
        for rf in repeats[:3]:
            print(f" #{rf['issue']} failed {rf['failure_count']}x")

    print(f"\n RECOMMENDATIONS ({len(recommendations)}):")
    for i, rec in enumerate(recommendations, 1):
        sev = {"high": "🔴", "medium": "🟡", "low": "🟢", "info": " "}.get(rec["severity"], "?")
        print(f" {sev} {rec['finding']}")
        print(f"{rec['recommendation']}")

    print(f"\n Written to: {INSIGHTS_FILE}")


if __name__ == "__main__":
    main()
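The outlier rule in duration_outliers() can be demonstrated on synthetic durations. A minimal restatement of just the thresholding step (same rule: at least 5 samples, flag anything over threshold_multiple times the mean):

```python
def outlier_cycles(durations: list[int], threshold_multiple: float = 3.0) -> list[int]:
    """Return durations exceeding threshold_multiple x the mean, given 5+ samples."""
    if len(durations) < 5:
        return []  # too little data to call anything an outlier
    avg = sum(durations) / len(durations)
    return [d for d in durations if d > avg * threshold_multiple]
```

With [60, 60, 60, 60, 60, 900] the mean is 200s, the threshold 600s, so only the 900s cycle is flagged.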


@@ -10,6 +10,11 @@ from pydantic_settings import BaseSettings, SettingsConfigDict

 APP_START_TIME: _datetime = _datetime.now(UTC)

+def normalize_ollama_url(url: str) -> str:
+    """Replace localhost with 127.0.0.1 to avoid IPv6 resolution delays."""
+    return url.replace("localhost", "127.0.0.1")
+
+
 class Settings(BaseSettings):
     """Central configuration — all env-var access goes through this class."""
@@ -19,6 +24,11 @@ class Settings(BaseSettings):
     # Ollama host — override with OLLAMA_URL env var or .env file
     ollama_url: str = "http://localhost:11434"

+    @property
+    def normalized_ollama_url(self) -> str:
+        """Return ollama_url with localhost replaced by 127.0.0.1."""
+        return normalize_ollama_url(self.ollama_url)
+
     # LLM model passed to Agno/Ollama — override with OLLAMA_MODEL
     # qwen3:30b is the primary model — better reasoning and tool calling
     # than llama3.1:8b-instruct while still running locally on modest hardware.
@@ -392,7 +402,7 @@ def check_ollama_model_available(model_name: str) -> bool:
     import json
     import urllib.request

-    url = settings.ollama_url.replace("localhost", "127.0.0.1")
+    url = settings.normalized_ollama_url
     req = urllib.request.Request(
         f"{url}/api/tags",
         method="GET",
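The extracted utility is a one-line replace; restated here so its behavior can be checked directly:

```python
def normalize_ollama_url(url: str) -> str:
    """Replace localhost with 127.0.0.1 to avoid IPv6 resolution delays."""
    return url.replace("localhost", "127.0.0.1")
```

Note the plain substring replace rewrites "localhost" anywhere in the URL, not just the host part; for a bare host:port URL like the Ollama default that is fine.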


@@ -329,33 +329,35 @@ async def _discord_token_watcher() -> None:
logger.warning("Discord auto-start failed: %s", exc) logger.warning("Discord auto-start failed: %s", exc)
@asynccontextmanager def _startup_init() -> None:
async def lifespan(app: FastAPI): """Validate config and enable event persistence."""
"""Application lifespan manager with non-blocking startup."""
# Validate security config (no-op in test mode)
from config import validate_startup from config import validate_startup
validate_startup() validate_startup()
# Enable event persistence (unified EventBus + swarm event_log)
from infrastructure.events.bus import init_event_bus_persistence from infrastructure.events.bus import init_event_bus_persistence
init_event_bus_persistence() init_event_bus_persistence()
# Create all background tasks without waiting for them
briefing_task = asyncio.create_task(_briefing_scheduler())
thinking_task = asyncio.create_task(_thinking_scheduler())
loop_qa_task = asyncio.create_task(_loop_qa_scheduler())
presence_task = asyncio.create_task(_presence_watcher())
# Initialize Spark Intelligence engine
from spark.engine import get_spark_engine from spark.engine import get_spark_engine
if get_spark_engine().enabled: if get_spark_engine().enabled:
logger.info("Spark Intelligence active — event capture enabled") logger.info("Spark Intelligence active — event capture enabled")
# Auto-prune old vector store memories on startup
def _startup_background_tasks() -> list[asyncio.Task]:
"""Spawn all recurring background tasks (non-blocking)."""
return [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
]
def _startup_pruning() -> None:
    """Auto-prune old memories, thoughts, and events on startup."""
    if settings.memory_prune_days > 0:
        try:
            from timmy.memory_system import prune_memories
@@ -373,7 +375,6 @@ async def lifespan(app: FastAPI):
        except Exception as exc:
            logger.debug("Memory auto-prune skipped: %s", exc)
-    # Auto-prune old thoughts on startup
    if settings.thoughts_prune_days > 0:
        try:
            from timmy.thinking import thinking_engine
@@ -391,7 +392,6 @@ async def lifespan(app: FastAPI):
        except Exception as exc:
            logger.debug("Thought auto-prune skipped: %s", exc)
-    # Auto-prune old system events on startup
    if settings.events_prune_days > 0:
        try:
            from swarm.event_log import prune_old_events
@@ -409,7 +409,6 @@ async def lifespan(app: FastAPI):
        except Exception as exc:
            logger.debug("Event auto-prune skipped: %s", exc)
-    # Warn if memory vault exceeds size limit
    if settings.memory_vault_max_mb > 0:
        try:
            vault_path = Path(settings.repo_root) / "memory" / "notes"
@@ -425,37 +424,18 @@ async def lifespan(app: FastAPI):
        except Exception as exc:
            logger.debug("Vault size check skipped: %s", exc)
-    # Start Workshop presence heartbeat with WS relay
-    from dashboard.routes.world import broadcast_world_state
-    from timmy.workshop_state import WorkshopHeartbeat
-    workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
-    await workshop_heartbeat.start()
-    # Start chat integrations in background
-    chat_task = asyncio.create_task(_start_chat_integrations_background())
-    # Register session logger with error capture (breaks infrastructure → timmy circular dep)
-    try:
-        from infrastructure.error_capture import register_error_recorder
-        from timmy.session_logger import get_session_logger
-        register_error_recorder(get_session_logger().record_error)
-    except Exception:
-        pass
-    logger.info("✓ Dashboard ready for requests")
-    yield
-    # Cleanup on shutdown
+async def _shutdown_cleanup(
+    bg_tasks: list[asyncio.Task],
+    workshop_heartbeat,
+) -> None:
+    """Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
    from integrations.chat_bridge.vendors.discord import discord_bot
    from integrations.telegram_bot.bot import telegram_bot
    await discord_bot.stop()
    await telegram_bot.stop()
    # Close MCP tool server sessions
    try:
        from timmy.mcp_tools import close_mcp_sessions
@@ -465,13 +445,42 @@ async def lifespan(app: FastAPI):
    await workshop_heartbeat.stop()
-    for task in [briefing_task, thinking_task, chat_task, loop_qa_task, presence_task]:
-        if task:
-            task.cancel()
-            try:
-                await task
-            except asyncio.CancelledError:
-                pass
+    for task in bg_tasks:
+        task.cancel()
+        try:
+            await task
+        except asyncio.CancelledError:
+            pass
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager with non-blocking startup."""
    _startup_init()
    bg_tasks = _startup_background_tasks()
    _startup_pruning()
    # Start Workshop presence heartbeat with WS relay
    from dashboard.routes.world import broadcast_world_state
    from timmy.workshop_state import WorkshopHeartbeat
    workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
    await workshop_heartbeat.start()
    # Register session logger with error capture
    try:
        from infrastructure.error_capture import register_error_recorder
        from timmy.session_logger import get_session_logger
        register_error_recorder(get_session_logger().record_error)
    except Exception:
        logger.debug("Failed to register error recorder")
    logger.info("✓ Dashboard ready for requests")
    yield
    await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
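The lifespan shape follows the standard `asynccontextmanager` pattern: everything before `yield` runs at startup, everything after it at shutdown. A runnable sketch of that ordering, using a hypothetical event log in place of the real startup and cleanup helpers:

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []

@asynccontextmanager
async def lifespan_sketch(app=None):
    # Startup phase: runs before the app starts serving.
    events.append("startup")
    yield
    # Shutdown phase: runs after the server stops accepting requests.
    events.append("shutdown")

async def main() -> list[str]:
    async with lifespan_sketch():
        # The framework serves requests inside the context.
        events.append("serving")
    return events
```

FastAPI enters this context once when the server boots and exits it once on shutdown, which is why the background-task cancellation belongs after the `yield`.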
app = FastAPI(

@@ -100,7 +100,7 @@ class CSRFMiddleware(BaseHTTPMiddleware):
    ...
    Usage:
-        app.add_middleware(CSRFMiddleware, secret="your-secret-key")
+        app.add_middleware(CSRFMiddleware, secret=settings.csrf_secret)
    Attributes:
        secret: Secret key for token signing (optional, for future use).

@@ -91,7 +91,7 @@ async def chat_agent(request: Request, message: str = Form(...)):
        thinking_engine.record_user_input()
    except Exception:
-        pass
+        logger.debug("Failed to record user input for thinking engine")
    timestamp = datetime.now().strftime("%H:%M:%S")
    response_text = None

@@ -85,7 +85,7 @@ async def api_chat(request: Request):
        thinking_engine.record_user_input()
    except Exception:
-        pass
+        logger.debug("Failed to record user input for thinking engine")
    timestamp = datetime.now().strftime("%H:%M:%S")

@@ -65,7 +65,7 @@ def _check_ollama_sync() -> DependencyStatus:
    try:
        import urllib.request

-        url = settings.ollama_url.replace("localhost", "127.0.0.1")
+        url = settings.normalized_ollama_url
        req = urllib.request.Request(
            f"{url}/api/tags",
            method="GET",

@@ -166,7 +166,7 @@ async def api_briefing_status():
        if cached:
            last_generated = cached.generated_at.isoformat()
    except Exception:
-        pass
+        logger.debug("Failed to read briefing cache")
    return JSONResponse(
        {
@@ -190,6 +190,7 @@ async def api_memory_status():
        stats = get_memory_stats()
        indexed_files = stats.get("total_entries", 0)
    except Exception:
+        logger.debug("Failed to get memory stats")
        indexed_files = 0
    return JSONResponse(
@@ -215,7 +216,7 @@ async def api_swarm_status():
        ).fetchone()
        pending_tasks = row["cnt"] if row else 0
    except Exception:
-        pass
+        logger.debug("Failed to count pending tasks")
    return JSONResponse(
        {

@@ -221,7 +221,7 @@ async def _heartbeat(websocket: WebSocket) -> None:
        await asyncio.sleep(_HEARTBEAT_INTERVAL)
        await websocket.send_text(json.dumps({"type": "ping"}))
    except Exception:
-        pass  # connection gone — receive loop will clean up
+        logger.debug("Heartbeat stopped — connection gone")

@router.websocket("/ws")
@@ -250,7 +250,7 @@ async def world_ws(websocket: WebSocket) -> None:
        raw = await websocket.receive_text()
        await _handle_client_message(raw)
    except Exception:
-        pass
+        logger.debug("WebSocket receive loop ended")
    finally:
        ping_task.cancel()
        if websocket in _ws_clients:
@@ -265,6 +265,7 @@ async def _broadcast(message: str) -> None:
        try:
            await ws.send_text(message)
        except Exception:
+            logger.debug("Pruning dead WebSocket client")
            dead.append(ws)
    for ws in dead:
        if ws in _ws_clients:
@@ -340,7 +341,7 @@ async def _bark_and_broadcast(visitor_text: str) -> None:
        pip_familiar.on_event("visitor_spoke")
    except Exception:
-        pass  # Pip is optional
+        logger.debug("Pip familiar notification failed (optional)")
    _refresh_ground(visitor_text)
    _tick_commitments()

@@ -13,7 +13,7 @@ import logging
from dataclasses import dataclass, field
from enum import Enum, auto

-from config import settings
+from config import normalize_ollama_url, settings

logger = logging.getLogger(__name__)
@@ -307,7 +307,7 @@ class MultiModalManager:
            import json
            import urllib.request

-            url = self.ollama_url.replace("localhost", "127.0.0.1")
+            url = normalize_ollama_url(self.ollama_url)
            req = urllib.request.Request(
                f"{url}/api/tags",
                method="GET",
@@ -462,7 +462,7 @@ class MultiModalManager:
        logger.info("Pulling model: %s", model_name)
-        url = self.ollama_url.replace("localhost", "127.0.0.1")
+        url = normalize_ollama_url(self.ollama_url)
        req = urllib.request.Request(
            f"{url}/api/pull",
            method="POST",
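This diff replaces scattered `.replace("localhost", "127.0.0.1")` calls with one shared helper. A plausible sketch of such a utility (the real `normalize_ollama_url` in `config` may do more, e.g. scheme defaulting):

```python
def normalize_ollama_url(url: str) -> str:
    """Rewrite localhost to 127.0.0.1 and strip a trailing slash.

    Hypothetical sketch of the shared utility; avoids slow IPv6-first
    resolution of "localhost" on some systems by pinning IPv4.
    """
    return url.replace("localhost", "127.0.0.1").rstrip("/")
```

Centralizing the rewrite means every caller (health checks, model pulls, chat requests) normalizes the URL identically.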

@@ -18,6 +18,8 @@ from enum import Enum
from pathlib import Path
from typing import Any

+from config import settings

try:
    import yaml
except ImportError:
@@ -100,7 +102,7 @@ class Provider:
    """LLM provider configuration and state."""

    name: str
-    type: str  # ollama, openai, anthropic, airllm
+    type: str  # ollama, openai, anthropic
    enabled: bool
    priority: int
    url: str | None = None
@@ -301,22 +303,13 @@ class CascadeRouter:
                # Can't check without requests, assume available
                return True
            try:
-                url = provider.url or "http://localhost:11434"
+                url = provider.url or settings.ollama_url
                response = requests.get(f"{url}/api/tags", timeout=5)
                return response.status_code == 200
            except Exception as exc:
                logger.debug("Ollama provider check error: %s", exc)
                return False
-        elif provider.type == "airllm":
-            # Check if airllm is installed
-            try:
-                import importlib.util
-                return importlib.util.find_spec("airllm") is not None
-            except (ImportError, ModuleNotFoundError):
-                return False
        elif provider.type in ("openai", "anthropic", "grok"):
            # Check if API key is set
            return provider.api_key is not None and provider.api_key != ""
@@ -395,6 +388,101 @@ class CascadeRouter:
        return None
def _select_model(
    self, provider: Provider, model: str | None, content_type: ContentType
) -> tuple[str | None, bool]:
    """Select the best model for the request, with vision fallback.

    Returns:
        Tuple of (selected_model, is_fallback_model).
    """
    selected_model = model or provider.get_default_model()
    is_fallback = False
    if content_type != ContentType.TEXT and selected_model:
        if provider.type == "ollama" and self._mm_manager:
            from infrastructure.models.multimodal import ModelCapability

            if content_type == ContentType.VISION:
                supports = self._mm_manager.model_supports(
                    selected_model, ModelCapability.VISION
                )
                if not supports:
                    fallback = self._get_fallback_model(provider, selected_model, content_type)
                    if fallback:
                        logger.info(
                            "Model %s doesn't support vision, falling back to %s",
                            selected_model,
                            fallback,
                        )
                        selected_model = fallback
                        is_fallback = True
                    else:
                        logger.warning(
                            "No vision-capable model found on %s, trying anyway",
                            provider.name,
                        )
    return selected_model, is_fallback
async def _attempt_with_retry(
    self,
    provider: Provider,
    messages: list[dict],
    model: str | None,
    temperature: float,
    max_tokens: int | None,
    content_type: ContentType,
) -> dict:
    """Try a provider with retries, returning the result dict.

    Raises:
        RuntimeError: If all retry attempts fail. Error strings collected
            during retries are returned via the exception message.
    """
    errors: list[str] = []
    for attempt in range(self.config.max_retries_per_provider):
        try:
            return await self._try_provider(
                provider=provider,
                messages=messages,
                model=model,
                temperature=temperature,
                max_tokens=max_tokens,
                content_type=content_type,
            )
        except Exception as exc:
            error_msg = str(exc)
            logger.warning(
                "Provider %s attempt %d failed: %s",
                provider.name,
                attempt + 1,
                error_msg,
            )
            errors.append(f"{provider.name}: {error_msg}")
            if attempt < self.config.max_retries_per_provider - 1:
                await asyncio.sleep(self.config.retry_delay_seconds)
    raise RuntimeError("; ".join(errors))
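The retry loop reduces to a generic sketch (names hypothetical; the real method threads provider and message arguments through `_try_provider` and reads retry limits from config):

```python
import asyncio

async def attempt_with_retry(fn, max_retries: int, delay_s: float = 0.0):
    """Call fn() up to max_retries times, collecting errors; raise if all fail."""
    errors: list[str] = []
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception as exc:
            errors.append(str(exc))
            # Sleep between attempts, but not after the final one
            if attempt < max_retries - 1:
                await asyncio.sleep(delay_s)
    raise RuntimeError("; ".join(errors))
```

Raising a single `RuntimeError` carrying every collected error message is what lets the caller in `complete()` append one string per provider and keep cascading.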
def _is_provider_available(self, provider: Provider) -> bool:
    """Check if a provider should be tried (enabled + circuit breaker)."""
    if not provider.enabled:
        logger.debug("Skipping %s (disabled)", provider.name)
        return False
    if provider.status == ProviderStatus.UNHEALTHY:
        if self._can_close_circuit(provider):
            provider.circuit_state = CircuitState.HALF_OPEN
            provider.half_open_calls = 0
            logger.info("Circuit breaker half-open for %s", provider.name)
        else:
            logger.debug("Skipping %s (circuit open)", provider.name)
            return False
    return True
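The enabled-plus-circuit-breaker gate can be illustrated with a stripped-down sketch (`Breaker` is a hypothetical stand-in for the provider's circuit state; the real router also supports a half-open probe state):

```python
from dataclasses import dataclass

@dataclass
class Breaker:
    """Minimal circuit-breaker state: opens after `threshold` failures."""
    failures: int = 0
    threshold: int = 3

    @property
    def open(self) -> bool:
        return self.failures >= self.threshold

def is_available(enabled: bool, breaker: Breaker) -> bool:
    """A provider is tried only if it is enabled and its circuit is not open."""
    return enabled and not breaker.open
```

Extracting this predicate is what lets `complete()` collapse two nested skip branches into a single `continue`.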
async def complete(
    self,
    messages: list[dict],
@@ -421,7 +509,6 @@ class CascadeRouter:
    Raises:
        RuntimeError: If all providers fail
    """
-    # Detect content type for multi-modal routing
    content_type = self._detect_content_type(messages)
    if content_type != ContentType.TEXT:
        logger.debug("Detected %s content, selecting appropriate model", content_type.value)
@@ -429,93 +516,34 @@ class CascadeRouter:
    errors = []
    for provider in self.providers:
-        # Skip disabled providers
-        if not provider.enabled:
-            logger.debug("Skipping %s (disabled)", provider.name)
+        if not self._is_provider_available(provider):
            continue
-        # Skip unhealthy providers (circuit breaker)
-        if provider.status == ProviderStatus.UNHEALTHY:
-            # Check if circuit breaker can close
-            if self._can_close_circuit(provider):
-                provider.circuit_state = CircuitState.HALF_OPEN
-                provider.half_open_calls = 0
-                logger.info("Circuit breaker half-open for %s", provider.name)
-            else:
-                logger.debug("Skipping %s (circuit open)", provider.name)
-                continue
-        # Determine which model to use
-        selected_model = model or provider.get_default_model()
-        is_fallback_model = False
-        # For non-text content, check if model supports it
-        if content_type != ContentType.TEXT and selected_model:
-            if provider.type == "ollama" and self._mm_manager:
-                from infrastructure.models.multimodal import ModelCapability
-                # Check if selected model supports the required capability
-                if content_type == ContentType.VISION:
-                    supports = self._mm_manager.model_supports(
-                        selected_model, ModelCapability.VISION
-                    )
-                    if not supports:
-                        # Find fallback model
-                        fallback = self._get_fallback_model(
-                            provider, selected_model, content_type
-                        )
-                        if fallback:
-                            logger.info(
-                                "Model %s doesn't support vision, falling back to %s",
-                                selected_model,
-                                fallback,
-                            )
-                            selected_model = fallback
-                            is_fallback_model = True
-                        else:
-                            logger.warning(
-                                "No vision-capable model found on %s, trying anyway",
-                                provider.name,
-                            )
-        # Try this provider
-        for attempt in range(self.config.max_retries_per_provider):
-            try:
-                result = await self._try_provider(
-                    provider=provider,
-                    messages=messages,
-                    model=selected_model,
-                    temperature=temperature,
-                    max_tokens=max_tokens,
-                    content_type=content_type,
-                )
-                # Success! Update metrics and return
-                self._record_success(provider, result.get("latency_ms", 0))
-                return {
-                    "content": result["content"],
-                    "provider": provider.name,
-                    "model": result.get(
-                        "model", selected_model or provider.get_default_model()
-                    ),
-                    "latency_ms": result.get("latency_ms", 0),
-                    "is_fallback_model": is_fallback_model,
-                }
-            except Exception as exc:
-                error_msg = str(exc)
-                logger.warning(
-                    "Provider %s attempt %d failed: %s", provider.name, attempt + 1, error_msg
-                )
-                errors.append(f"{provider.name}: {error_msg}")
-                if attempt < self.config.max_retries_per_provider - 1:
-                    await asyncio.sleep(self.config.retry_delay_seconds)
-        # All retries failed for this provider
-        self._record_failure(provider)
+        selected_model, is_fallback_model = self._select_model(provider, model, content_type)
+        try:
+            result = await self._attempt_with_retry(
+                provider,
+                messages,
+                selected_model,
+                temperature,
+                max_tokens,
+                content_type,
+            )
+        except RuntimeError as exc:
+            errors.append(str(exc))
+            self._record_failure(provider)
+            continue
+        self._record_success(provider, result.get("latency_ms", 0))
+        return {
+            "content": result["content"],
+            "provider": provider.name,
+            "model": result.get("model", selected_model or provider.get_default_model()),
+            "latency_ms": result.get("latency_ms", 0),
+            "is_fallback_model": is_fallback_model,
+        }
-    # All providers failed
    raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
async def _try_provider(
@@ -581,7 +609,7 @@ class CascadeRouter:
    """Call Ollama API with multi-modal support."""
    import aiohttp

-    url = f"{provider.url}/api/chat"
+    url = f"{provider.url or settings.ollama_url}/api/chat"
    # Transform messages for Ollama format (including images)
    transformed_messages = self._transform_messages_for_ollama(messages)

@@ -63,7 +63,7 @@ def _pull_model(model_name: str) -> bool:
    logger.info("Pulling model: %s", model_name)
-    url = settings.ollama_url.replace("localhost", "127.0.0.1")
+    url = settings.normalized_ollama_url
    req = urllib.request.Request(
        f"{url}/api/pull",
        method="POST",
@@ -197,6 +197,113 @@ def _resolve_backend(requested: str | None) -> str:
    return "ollama"
def _build_tools_list(use_tools: bool, skip_mcp: bool) -> list:
    """Build the Agno tools list (toolkit + optional MCP servers).

    Args:
        use_tools: Whether the model supports tool calling.
        skip_mcp: If True, omit MCP tool servers.

    Returns:
        List of Toolkit / MCPTools, possibly empty.
    """
    if not use_tools:
        logger.info("Tools disabled (model too small for reliable tool calling)")
        return []
    toolkit = create_full_toolkit()
    tools_list: list = [toolkit]
    # Add MCP tool servers (lazy-connected on first arun()).
    # Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
    # scopes that conflict with asyncio background task cancellation (#72).
    if not skip_mcp:
        try:
            from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools

            gitea_mcp = create_gitea_mcp_tools()
            if gitea_mcp:
                tools_list.append(gitea_mcp)
            fs_mcp = create_filesystem_mcp_tools()
            if fs_mcp:
                tools_list.append(fs_mcp)
        except Exception as exc:
            logger.debug("MCP tools unavailable: %s", exc)
    return tools_list
def _build_prompt(use_tools: bool, session_id: str) -> str:
    """Build the full system prompt with optional memory context.

    Args:
        use_tools: Whether tools are enabled (affects prompt tier and context budget).
        session_id: Session identifier for the prompt.

    Returns:
        Complete system prompt string.
    """
    base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
    try:
        from timmy.memory_system import memory_system

        memory_context = memory_system.get_system_context()
        if memory_context:
            # Truncate if too long — smaller budget for small models
            # since the expanded prompt (roster, guardrails) uses more tokens
            max_context = 2000 if not use_tools else 8000
            if len(memory_context) > max_context:
                memory_context = memory_context[:max_context] + "\n... [truncated]"
            return (
                f"{base_prompt}\n\n"
                f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
                f"{memory_context}"
            )
    except Exception as exc:
        logger.warning("Failed to load memory context: %s", exc)
    return base_prompt
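The truncation budget in `_build_prompt` is small enough to test in isolation. A sketch mirroring that branch (constants copied from the diff; the function name is hypothetical):

```python
def truncate_context(context: str, use_tools: bool) -> str:
    """Apply the smaller budget for small (no-tool) models, as in _build_prompt."""
    # Small models get a 2000-char budget because their expanded
    # prompt (roster, guardrails) already consumes more tokens.
    max_context = 8000 if use_tools else 2000
    if len(context) > max_context:
        return context[:max_context] + "\n... [truncated]"
    return context
```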
def _create_ollama_agent(
    model_name: str,
    db_file: str,
    tools_list: list,
    full_prompt: str,
    use_tools: bool,
) -> Agent:
    """Construct the Agno Agent with an Ollama model.

    Args:
        model_name: Resolved Ollama model name.
        db_file: SQLite file for conversation memory.
        tools_list: Pre-built tools list (may be empty).
        full_prompt: Complete system prompt.
        use_tools: Whether tools are enabled.

    Returns:
        Configured Agno Agent.
    """
    model_kwargs = {}
    if settings.ollama_num_ctx > 0:
        model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
    return Agent(
        name="Agent",
        model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
        db=SqliteDb(db_file=db_file),
        description=full_prompt,
        add_history_to_context=True,
        num_history_runs=20,
        markdown=False,
        tools=tools_list if tools_list else None,
        tool_call_limit=settings.max_agent_steps if use_tools else None,
        telemetry=settings.telemetry_enabled,
    )
def create_timmy(
    db_file: str = "timmy.db",
    backend: str | None = None,
@@ -238,16 +345,12 @@ def create_timmy(
        return TimmyAirLLMAgent(model_size=size)

    # Default: Ollama via Agno.
-    # Resolve model with automatic pulling and fallback
    model_name, is_fallback = _resolve_model_with_fallback(
        requested_model=None,
        require_vision=False,
        auto_pull=True,
    )
-    # If Ollama is completely unreachable, fail loudly.
-    # Sovereignty: never silently send data to a cloud API.
-    # Use --backend claude explicitly if you want cloud inference.
    if not _check_model_available(model_name):
        logger.error(
            "Ollama unreachable and no local models available. "
@@ -258,74 +361,9 @@ def create_timmy(
        logger.info("Using fallback model %s (requested was unavailable)", model_name)

    use_tools = _model_supports_tools(model_name)
-    # Conditionally include tools — small models get none
-    toolkit = create_full_toolkit() if use_tools else None
-    if not use_tools:
-        logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
-    # Build the tools list — Agno accepts a list of Toolkit / MCPTools
-    tools_list: list = []
-    if toolkit:
-        tools_list.append(toolkit)
-    # Add MCP tool servers (lazy-connected on first arun()).
-    # Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
-    # scopes that conflict with asyncio background task cancellation (#72).
-    if use_tools and not skip_mcp:
-        try:
-            from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
-            gitea_mcp = create_gitea_mcp_tools()
-            if gitea_mcp:
-                tools_list.append(gitea_mcp)
-            fs_mcp = create_filesystem_mcp_tools()
-            if fs_mcp:
-                tools_list.append(fs_mcp)
-        except Exception as exc:
-            logger.debug("MCP tools unavailable: %s", exc)
-    # Select prompt tier based on tool capability
-    base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
-    # Try to load memory context
-    try:
-        from timmy.memory_system import memory_system
-        memory_context = memory_system.get_system_context()
-        if memory_context:
-            # Truncate if too long — smaller budget for small models
-            # since the expanded prompt (roster, guardrails) uses more tokens
-            max_context = 2000 if not use_tools else 8000
-            if len(memory_context) > max_context:
-                memory_context = memory_context[:max_context] + "\n... [truncated]"
-            full_prompt = (
-                f"{base_prompt}\n\n"
-                f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
-                f"{memory_context}"
-            )
-        else:
-            full_prompt = base_prompt
-    except Exception as exc:
-        logger.warning("Failed to load memory context: %s", exc)
-        full_prompt = base_prompt
-    model_kwargs = {}
-    if settings.ollama_num_ctx > 0:
-        model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
-    agent = Agent(
-        name="Agent",
-        model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
-        db=SqliteDb(db_file=db_file),
-        description=full_prompt,
-        add_history_to_context=True,
-        num_history_runs=20,
-        markdown=False,
-        tools=tools_list if tools_list else None,
-        tool_call_limit=settings.max_agent_steps if use_tools else None,
-        telemetry=settings.telemetry_enabled,
-    )
+    tools_list = _build_tools_list(use_tools, skip_mcp)
+    full_prompt = _build_prompt(use_tools, session_id)
+    agent = _create_ollama_agent(model_name, db_file, tools_list, full_prompt, use_tools)
    _warmup_model(model_name)
    return agent

@@ -95,6 +95,126 @@ def _parse_steps(plan_text: str) -> list[str]:
    return [line.strip() for line in plan_text.strip().splitlines() if line.strip()]
# ---------------------------------------------------------------------------
# Extracted helpers
# ---------------------------------------------------------------------------
def _extract_content(run_result) -> str:
    """Extract text content from an agent run result."""
    return run_result.content if hasattr(run_result, "content") else str(run_result)

def _clean(text: str) -> str:
    """Clean a model response using the session's response cleaner."""
    from timmy.session import _clean_response

    return _clean_response(text)
async def _plan_task(
    agent, task: str, session_id: str, max_steps: int
) -> tuple[list[str], bool] | str:
    """Run the planning phase — returns (steps, was_truncated) or an error string."""
    plan_prompt = (
        f"Break this task into numbered steps (max {max_steps}). "
        f"Return ONLY a numbered list, nothing else.\n\n"
        f"Task: {task}"
    )
    try:
        plan_run = await asyncio.to_thread(
            agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
        )
        plan_text = _extract_content(plan_run)
    except Exception as exc:  # broad catch intentional: agent.run can raise any error
        logger.error("Agentic loop: planning failed: %s", exc)
        return f"Planning failed: {exc}"
    steps = _parse_steps(plan_text)
    if not steps:
        return "Planning produced no steps."
    planned_count = len(steps)
    steps = steps[:max_steps]
    return steps, planned_count > len(steps)
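`_plan_task`'s parse-and-cap contract can be sketched as a pure function (name hypothetical; `_parse_steps` is inlined here so the sketch is self-contained):

```python
def parse_and_truncate(plan_text: str, max_steps: int) -> tuple[list[str], bool]:
    """Parse numbered-list model output into steps and cap at max_steps.

    Returns (steps, was_truncated), mirroring _plan_task's return contract.
    """
    # Keep non-empty lines, stripped — same shape as _parse_steps
    steps = [line.strip() for line in plan_text.strip().splitlines() if line.strip()]
    planned = len(steps)
    steps = steps[:max_steps]
    return steps, planned > len(steps)
```

Tracking `was_truncated` at parse time is what later lets `_summarize` mark the run "partial" even when every executed step succeeded.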
async def _execute_step(
    agent,
    task: str,
    step_desc: str,
    step_num: int,
    total_steps: int,
    recent_results: list[str],
    session_id: str,
) -> AgenticStep:
    """Execute a single step, returning an AgenticStep."""
    step_start = time.monotonic()
    context = (
        f"Task: {task}\n"
        f"Step {step_num}/{total_steps}: {step_desc}\n"
        f"Recent progress: {recent_results[-2:] if recent_results else []}\n\n"
        f"Execute this step and report what you did."
    )
    step_run = await asyncio.to_thread(
        agent.run, context, stream=False, session_id=f"{session_id}_step{step_num}"
    )
    step_result = _clean(_extract_content(step_run))
    return AgenticStep(
        step_num=step_num,
        description=step_desc,
        result=step_result,
        status="completed",
        duration_ms=int((time.monotonic() - step_start) * 1000),
    )
async def _adapt_step(
    agent,
    step_desc: str,
    step_num: int,
    error: Exception,
    step_start: float,
    session_id: str,
) -> AgenticStep:
    """Attempt adaptation after a step failure."""
    adapt_prompt = (
        f"Step {step_num} failed with error: {error}\n"
        f"Original step was: {step_desc}\n"
        f"Adapt the plan and try an alternative approach for this step."
    )
    adapt_run = await asyncio.to_thread(
        agent.run, adapt_prompt, stream=False, session_id=f"{session_id}_adapt{step_num}"
    )
    adapt_result = _clean(_extract_content(adapt_run))
    return AgenticStep(
        step_num=step_num,
        description=f"[Adapted] {step_desc}",
        result=adapt_result,
        status="adapted",
        duration_ms=int((time.monotonic() - step_start) * 1000),
    )
def _summarize(result: AgenticResult, total_steps: int, was_truncated: bool) -> None:
    """Fill in summary and final status on the result object (mutates in place)."""
    completed = sum(1 for s in result.steps if s.status == "completed")
    adapted = sum(1 for s in result.steps if s.status == "adapted")
    failed = sum(1 for s in result.steps if s.status == "failed")
    parts = [f"Completed {completed}/{total_steps} steps"]
    if adapted:
        parts.append(f"{adapted} adapted")
    if failed:
        parts.append(f"{failed} failed")
    result.summary = f"{result.task}: {', '.join(parts)}."
    if was_truncated or len(result.steps) < total_steps or failed:
        result.status = "partial"
    else:
        result.status = "completed"
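The partial-vs-completed rule in `_summarize` can be isolated into a hypothetical helper that mirrors the status logic above:

```python
def final_status(statuses: list[str], total_steps: int, was_truncated: bool) -> str:
    """A run is "partial" if the plan was truncated, any step is missing,
    or any step failed; adapted steps alone do not downgrade the status."""
    failed = sum(1 for s in statuses if s == "failed")
    if was_truncated or len(statuses) < total_steps or failed:
        return "partial"
    return "completed"
```

Note the deliberate asymmetry: an "adapted" step counts toward the summary text but not against the final status, so a run that recovered from every failure still reads as completed.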
# ---------------------------------------------------------------------------
# Core loop
# ---------------------------------------------------------------------------
@@ -125,88 +245,41 @@ async def run_agentic_loop(
    task_id = str(uuid.uuid4())[:8]
    start_time = time.monotonic()
    agent = _get_loop_agent()
    result = AgenticResult(task_id=task_id, task=task, summary="")

-    # ── Phase 1: Planning ──────────────────────────────────────────────────
-    plan_prompt = (
-        f"Break this task into numbered steps (max {max_steps}). "
-        f"Return ONLY a numbered list, nothing else.\n\n"
-        f"Task: {task}"
-    )
-    try:
-        plan_run = await asyncio.to_thread(
-            agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
-        )
-        plan_text = plan_run.content if hasattr(plan_run, "content") else str(plan_run)
-    except Exception as exc:  # broad catch intentional: agent.run can raise any error
-        logger.error("Agentic loop: planning failed: %s", exc)
+    # Phase 1: Planning
+    plan = await _plan_task(agent, task, session_id, max_steps)
+    if isinstance(plan, str):
        result.status = "failed"
-        result.summary = f"Planning failed: {exc}"
+        result.summary = plan
        result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
        return result
-    steps = _parse_steps(plan_text)
-    if not steps:
-        result.status = "failed"
-        result.summary = "Planning produced no steps."
-        result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
-        return result
-    # Enforce max_steps — track if we truncated
-    planned_steps = len(steps)
-    steps = steps[:max_steps]
+    steps, was_truncated = plan
    total_steps = len(steps)
-    was_truncated = planned_steps > total_steps
-    # Broadcast plan
    await _broadcast_progress(
        "agentic.plan_ready",
-        {
-            "task_id": task_id,
-            "task": task,
-            "steps": steps,
-            "total": total_steps,
-        },
+        {"task_id": task_id, "task": task, "steps": steps, "total": total_steps},
    )

-    # ── Phase 2: Execution ─────────────────────────────────────────────────
+    # Phase 2: Execution
    completed_results: list[str] = []
    for i, step_desc in enumerate(steps, 1):
        step_start = time.monotonic()
-        recent = completed_results[-2:] if completed_results else []
-        context = (
-            f"Task: {task}\n"
-            f"Step {i}/{total_steps}: {step_desc}\n"
-            f"Recent progress: {recent}\n\n"
-            f"Execute this step and report what you did."
-        )
        try:
-            step_run = await asyncio.to_thread(
-                agent.run, context, stream=False, session_id=f"{session_id}_step{i}"
-            )
-            step_result = step_run.content if hasattr(step_run, "content") else str(step_run)
-            # Clean the response
-            from timmy.session import _clean_response
-            step_result = _clean_response(step_result)
-            step = AgenticStep(
-                step_num=i,
-                description=step_desc,
-                result=step_result,
-                status="completed",
-                duration_ms=int((time.monotonic() - step_start) * 1000),
-            )
+            step = await _execute_step(
+                agent,
+                task,
+                step_desc,
+                i,
+                total_steps,
+                completed_results,
+                session_id,
+            )
            result.steps.append(step)
-            completed_results.append(f"Step {i}: {step_result[:200]}")
-            # Broadcast progress
+            completed_results.append(f"Step {i}: {step.result[:200]}")
            await _broadcast_progress(
                "agentic.step_complete",
                {
@@ -214,46 +287,18 @@ async def run_agentic_loop(
                    "step": i,
                    "total": total_steps,
                    "description": step_desc,
-                    "result": step_result[:200],
+                    "result": step.result[:200],
                },
            )
            if on_progress:
                await on_progress(step_desc, i, total_steps)
        except Exception as exc:  # broad catch intentional: agent.run can raise any error
            logger.warning("Agentic loop step %d failed: %s", i, exc)
-            # ── Adaptation: ask model to adapt ─────────────────────────────
-            adapt_prompt = (
-                f"Step {i} failed with error: {exc}\n"
-                f"Original step was: {step_desc}\n"
-                f"Adapt the plan and try an alternative approach for this step."
-            )
            try:
-                adapt_run = await asyncio.to_thread(
-                    agent.run,
-                    adapt_prompt,
-                    stream=False,
-                    session_id=f"{session_id}_adapt{i}",
-                )
-                adapt_result = (
-                    adapt_run.content if hasattr(adapt_run, "content") else str(adapt_run)
-                )
-                from timmy.session import _clean_response
-                adapt_result = _clean_response(adapt_result)
-                step = AgenticStep(
-                    step_num=i,
-                    description=f"[Adapted] {step_desc}",
-                    result=adapt_result,
-                    status="adapted",
-                    duration_ms=int((time.monotonic() - step_start) * 1000),
-                )
+                step = await _adapt_step(agent, step_desc, i, exc, step_start, session_id)
                result.steps.append(step)
-                completed_results.append(f"Step {i} (adapted): {adapt_result[:200]}")
+                completed_results.append(f"Step {i} (adapted): {step.result[:200]}")
                await _broadcast_progress(
                    "agentic.step_adapted",
                    {
@@ -262,46 +307,26 @@ async def run_agentic_loop(
                        "total": total_steps,
                        "description": step_desc,
                        "error": str(exc),
-                        "adaptation": adapt_result[:200],
+                        "adaptation": step.result[:200],
                    },
                )
                if on_progress:
                    await on_progress(f"[Adapted] {step_desc}", i, total_steps)
-            except Exception as adapt_exc:  # broad catch intentional
+            except Exception as adapt_exc:  # broad catch intentional: agent.run can raise any error
                logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
step = AgenticStep( result.steps.append(
step_num=i, AgenticStep(
description=step_desc, step_num=i,
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}", description=step_desc,
status="failed", result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
duration_ms=int((time.monotonic() - step_start) * 1000), status="failed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
) )
result.steps.append(step)
completed_results.append(f"Step {i}: FAILED") completed_results.append(f"Step {i}: FAILED")
# ── Phase 3: Summary ─────────────────────────────────────────────────── # Phase 3: Summary
completed_count = sum(1 for s in result.steps if s.status == "completed") _summarize(result, total_steps, was_truncated)
adapted_count = sum(1 for s in result.steps if s.status == "adapted")
failed_count = sum(1 for s in result.steps if s.status == "failed")
parts = [f"Completed {completed_count}/{total_steps} steps"]
if adapted_count:
parts.append(f"{adapted_count} adapted")
if failed_count:
parts.append(f"{failed_count} failed")
result.summary = f"{task}: {', '.join(parts)}."
# Determine final status
if was_truncated:
result.status = "partial"
elif len(result.steps) < total_steps:
result.status = "partial"
elif any(s.status == "failed" for s in result.steps):
result.status = "partial"
else:
result.status = "completed"
result.total_duration_ms = int((time.monotonic() - start_time) * 1000) result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
await _broadcast_progress( await _broadcast_progress(


@@ -636,7 +636,7 @@ class HotMemory:
             if len(lines) > 1:
                 return "\n".join(lines)
         except Exception:
-            pass
+            logger.debug("DB context read failed, falling back to file")
         # Fallback to file if DB unavailable
         if self.path.exists():


@@ -232,6 +232,90 @@ class ThinkingEngine:
             return False  # Disabled — never idle
         return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)

+    def _build_thinking_context(self) -> tuple[str, str, list["Thought"]]:
+        """Assemble the context needed for a thinking cycle.
+
+        Returns:
+            (memory_context, system_context, recent_thoughts)
+        """
+        memory_context = self._load_memory_context()
+        system_context = self._gather_system_snapshot()
+        recent_thoughts = self.get_recent_thoughts(limit=5)
+        return memory_context, system_context, recent_thoughts
+
+    async def _generate_novel_thought(
+        self,
+        prompt: str | None,
+        memory_context: str,
+        system_context: str,
+        recent_thoughts: list["Thought"],
+    ) -> tuple[str | None, str]:
+        """Run the dedup-retry loop to produce a novel thought.
+
+        Returns:
+            (content, seed_type) — content is None if no novel thought produced.
+        """
+        seed_type: str = "freeform"
+        for attempt in range(self._MAX_DEDUP_RETRIES + 1):
+            if prompt:
+                seed_type = "prompted"
+                seed_context = f"Journal prompt: {prompt}"
+            else:
+                seed_type, seed_context = self._gather_seed()
+            continuity = self._build_continuity_context()
+            full_prompt = _THINKING_PROMPT.format(
+                memory_context=memory_context,
+                system_context=system_context,
+                seed_context=seed_context,
+                continuity_context=continuity,
+            )
+            try:
+                raw = await self._call_agent(full_prompt)
+            except Exception as exc:
+                logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
+                return None, seed_type
+            if not raw or not raw.strip():
+                logger.debug("Thinking cycle produced empty response, skipping")
+                return None, seed_type
+            content = raw.strip()
+            # Dedup: reject thoughts too similar to recent ones
+            if not self._is_too_similar(content, recent_thoughts):
+                return content, seed_type  # Good — novel thought
+            if attempt < self._MAX_DEDUP_RETRIES:
+                logger.info(
+                    "Thought too similar to recent (attempt %d/%d), retrying with new seed",
+                    attempt + 1,
+                    self._MAX_DEDUP_RETRIES + 1,
+                )
+            else:
+                logger.warning(
+                    "Thought still repetitive after %d retries, discarding",
+                    self._MAX_DEDUP_RETRIES + 1,
+                )
+                return None, seed_type
+        return None, seed_type
+
+    async def _process_thinking_result(self, thought: "Thought") -> None:
+        """Run all post-hooks after a thought is stored."""
+        self._maybe_check_memory()
+        await self._maybe_distill()
+        await self._maybe_file_issues()
+        await self._check_workspace()
+        self._maybe_check_memory_status()
+        self._update_memory(thought)
+        self._log_event(thought)
+        self._write_journal(thought)
+        await self._broadcast(thought)
+
     async def think_once(self, prompt: str | None = None) -> Thought | None:
         """Execute one thinking cycle.
@@ -257,91 +341,21 @@ class ThinkingEngine:
             )
             return None

-        memory_context = self._load_memory_context()
-        system_context = self._gather_system_snapshot()
-        recent_thoughts = self.get_recent_thoughts(limit=5)
-
-        content: str | None = None
-        seed_type: str = "freeform"
-        for attempt in range(self._MAX_DEDUP_RETRIES + 1):
-            if prompt:
-                seed_type = "prompted"
-                seed_context = f"Journal prompt: {prompt}"
-            else:
-                seed_type, seed_context = self._gather_seed()
-            continuity = self._build_continuity_context()
-            full_prompt = _THINKING_PROMPT.format(
-                memory_context=memory_context,
-                system_context=system_context,
-                seed_context=seed_context,
-                continuity_context=continuity,
-            )
-            try:
-                raw = await self._call_agent(full_prompt)
-            except Exception as exc:
-                logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
-                return None
-            if not raw or not raw.strip():
-                logger.debug("Thinking cycle produced empty response, skipping")
-                return None
-            content = raw.strip()
-            # Dedup: reject thoughts too similar to recent ones
-            if not self._is_too_similar(content, recent_thoughts):
-                break  # Good — novel thought
-            if attempt < self._MAX_DEDUP_RETRIES:
-                logger.info(
-                    "Thought too similar to recent (attempt %d/%d), retrying with new seed",
-                    attempt + 1,
-                    self._MAX_DEDUP_RETRIES + 1,
-                )
-                content = None  # Will retry
-            else:
-                logger.warning(
-                    "Thought still repetitive after %d retries, discarding",
-                    self._MAX_DEDUP_RETRIES + 1,
-                )
-                return None
+        memory_context, system_context, recent_thoughts = self._build_thinking_context()
+        content, seed_type = await self._generate_novel_thought(
+            prompt,
+            memory_context,
+            system_context,
+            recent_thoughts,
+        )
         if not content:
             return None

         thought = self._store_thought(content, seed_type)
         self._last_thought_id = thought.id
-
-        # Post-hook: check memory status periodically
-        self._maybe_check_memory()
-        # Post-hook: distill facts from recent thoughts periodically
-        await self._maybe_distill()
-        # Post-hook: file Gitea issues for actionable observations
-        await self._maybe_file_issues()
-        # Post-hook: check workspace for new messages from Hermes
-        await self._check_workspace()
-        # Post-hook: proactive memory status audit
-        self._maybe_check_memory_status()
-        # Post-hook: update MEMORY.md with latest reflection
-        self._update_memory(thought)
-        # Log to swarm event system
-        self._log_event(thought)
-        # Append to daily journal file
-        self._write_journal(thought)
-        # Broadcast to WebSocket clients
-        await self._broadcast(thought)
+        await self._process_thinking_result(thought)

         logger.info(
             "Thought [%s] (%s): %s",

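`_generate_novel_thought` leans on `_is_too_similar`, which this diff never shows. One plausible implementation using only the stdlib — the metric (`difflib.SequenceMatcher`) and the 0.8 cutoff are pure assumptions, not the project's actual code:

```python
from difflib import SequenceMatcher

SIMILARITY_THRESHOLD = 0.8  # assumed cutoff, not taken from the source

def is_too_similar(content: str, recent: list[str],
                   threshold: float = SIMILARITY_THRESHOLD) -> bool:
    """Return True if content is a near-duplicate of any recent thought."""
    for prev in recent:
        # Case-insensitive character-level similarity ratio in [0, 1]
        if SequenceMatcher(None, content.lower(), prev.lower()).ratio() >= threshold:
            return True
    return False
```

A check like this is cheap enough to run on every cycle, which is what makes the dedup-retry loop in `_generate_novel_thought` practical.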

@@ -2,7 +2,7 @@
 import time
 from pathlib import Path
-from unittest.mock import AsyncMock, MagicMock, patch
+from unittest.mock import AsyncMock, patch

 import pytest
 import yaml
@@ -489,34 +489,6 @@ class TestProviderAvailabilityCheck:
             assert router._check_provider_available(provider) is False

-    def test_check_airllm_installed(self):
-        """Test AirLLM when installed."""
-        router = CascadeRouter(config_path=Path("/nonexistent"))
-        provider = Provider(
-            name="airllm",
-            type="airllm",
-            enabled=True,
-            priority=1,
-        )
-        with patch("importlib.util.find_spec", return_value=MagicMock()):
-            assert router._check_provider_available(provider) is True
-
-    def test_check_airllm_not_installed(self):
-        """Test AirLLM when not installed."""
-        router = CascadeRouter(config_path=Path("/nonexistent"))
-        provider = Provider(
-            name="airllm",
-            type="airllm",
-            enabled=True,
-            priority=1,
-        )
-        with patch("importlib.util.find_spec", return_value=None):
-            assert router._check_provider_available(provider) is False
-
 class TestCascadeRouterReload:
     """Test hot-reload of providers.yaml."""


@@ -0,0 +1,86 @@
+"""Tests for scripts/cycle_retro.py issue auto-detection."""
+
+from __future__ import annotations
+
+# Import the module under test — it's a script, so we import the helpers directly
+import importlib
+import subprocess
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+SCRIPTS_DIR = Path(__file__).resolve().parent.parent.parent / "scripts"
+
+
+@pytest.fixture(autouse=True)
+def _add_scripts_to_path(monkeypatch):
+    monkeypatch.syspath_prepend(str(SCRIPTS_DIR))
+
+
+@pytest.fixture()
+def mod():
+    """Import cycle_retro as a module."""
+    return importlib.import_module("cycle_retro")
+
+
+class TestDetectIssueFromBranch:
+    def test_kimi_issue_branch(self, mod):
+        with patch.object(subprocess, "check_output", return_value="kimi/issue-492\n"):
+            assert mod.detect_issue_from_branch() == 492
+
+    def test_plain_issue_branch(self, mod):
+        with patch.object(subprocess, "check_output", return_value="issue-123\n"):
+            assert mod.detect_issue_from_branch() == 123
+
+    def test_issue_slash_number(self, mod):
+        with patch.object(subprocess, "check_output", return_value="fix/issue/55\n"):
+            assert mod.detect_issue_from_branch() == 55
+
+    def test_no_issue_in_branch(self, mod):
+        with patch.object(subprocess, "check_output", return_value="main\n"):
+            assert mod.detect_issue_from_branch() is None
+
+    def test_feature_branch(self, mod):
+        with patch.object(subprocess, "check_output", return_value="feature/add-widget\n"):
+            assert mod.detect_issue_from_branch() is None
+
+    def test_git_not_available(self, mod):
+        with patch.object(subprocess, "check_output", side_effect=FileNotFoundError):
+            assert mod.detect_issue_from_branch() is None
+
+    def test_git_fails(self, mod):
+        with patch.object(
+            subprocess,
+            "check_output",
+            side_effect=subprocess.CalledProcessError(1, "git"),
+        ):
+            assert mod.detect_issue_from_branch() is None
+
+class TestBackfillExtractIssueNumber:
+    """Tests for backfill_retro.extract_issue_number PR-number filtering."""
+
+    @pytest.fixture()
+    def backfill(self):
+        return importlib.import_module("backfill_retro")
+
+    def test_body_has_issue(self, backfill):
+        assert backfill.extract_issue_number("fix: foo (#491)", "Fixes #490", pr_number=491) == 490
+
+    def test_title_skips_pr_number(self, backfill):
+        assert backfill.extract_issue_number("fix: foo (#491)", "", pr_number=491) is None
+
+    def test_title_with_issue_and_pr(self, backfill):
+        # [loop-cycle-538] refactor: ... (#459) (#481)
+        assert (
+            backfill.extract_issue_number(
+                "[loop-cycle-538] refactor: remove dead airllm (#459) (#481)",
+                "",
+                pr_number=481,
+            )
+            == 459
+        )
+
+    def test_no_pr_number_provided(self, backfill):
+        assert backfill.extract_issue_number("fix: foo (#491)", "") == 491


@@ -454,3 +454,127 @@ def test_no_hardcoded_fallback_constants_in_agent():
     assert not hasattr(agent_mod, "VISION_MODEL_FALLBACKS"), (
         "Hardcoded VISION_MODEL_FALLBACKS still exists — use settings.vision_fallback_models"
     )
+
+
+# ── _build_tools_list helper ─────────────────────────────────────────────────
+def test_build_tools_list_returns_empty_when_no_tools():
+    """When use_tools=False, _build_tools_list returns an empty list."""
+    from timmy.agent import _build_tools_list
+
+    result = _build_tools_list(use_tools=False, skip_mcp=False)
+    assert result == []
+
+
+def test_build_tools_list_includes_toolkit():
+    """When use_tools=True, _build_tools_list includes the toolkit."""
+    mock_toolkit = MagicMock()
+    with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
+        from timmy.agent import _build_tools_list
+
+        result = _build_tools_list(use_tools=True, skip_mcp=True)
+    assert mock_toolkit in result
+
+
+def test_build_tools_list_adds_mcp_when_not_skipped():
+    """When skip_mcp=False, _build_tools_list attempts MCP tools."""
+    mock_toolkit = MagicMock()
+    mock_gitea = MagicMock()
+    with (
+        patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
+        patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=mock_gitea),
+        patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None),
+    ):
+        from timmy.agent import _build_tools_list
+
+        result = _build_tools_list(use_tools=True, skip_mcp=False)
+    assert mock_toolkit in result
+    assert mock_gitea in result
+
+
+# ── _build_prompt helper ─────────────────────────────────────────────────────
+def test_build_prompt_returns_base_when_no_memory():
+    """_build_prompt returns base prompt when memory context is empty."""
+    with patch("timmy.memory_system.memory_system") as mock_mem:
+        mock_mem.get_system_context.return_value = ""
+        from timmy.agent import _build_prompt
+
+        result = _build_prompt(use_tools=True, session_id="test")
+    assert "Timmy" in result
+
+
+def test_build_prompt_appends_memory_context():
+    """_build_prompt appends memory context when available."""
+    with patch("timmy.memory_system.memory_system") as mock_mem:
+        mock_mem.get_system_context.return_value = "User likes pizza"
+        from timmy.agent import _build_prompt
+
+        result = _build_prompt(use_tools=True, session_id="test")
+    assert "User likes pizza" in result
+    assert "GROUNDED CONTEXT" in result
+
+
+def test_build_prompt_truncates_long_memory_for_small_models():
+    """_build_prompt truncates memory for small models (use_tools=False)."""
+    long_context = "x" * 5000
+    with patch("timmy.memory_system.memory_system") as mock_mem:
+        mock_mem.get_system_context.return_value = long_context
+        from timmy.agent import _build_prompt
+
+        result = _build_prompt(use_tools=False, session_id="test")
+    # Max context is 2000 for small models + truncation marker
+    assert "[truncated]" in result
+
+
+# ── _create_ollama_agent helper ──────────────────────────────────────────────
+def test_create_ollama_agent_passes_correct_kwargs():
+    """_create_ollama_agent passes the expected kwargs to Agent()."""
+    with (
+        patch("timmy.agent.Agent") as MockAgent,
+        patch("timmy.agent.Ollama"),
+        patch("timmy.agent.SqliteDb"),
+    ):
+        from timmy.agent import _create_ollama_agent
+
+        _create_ollama_agent(
+            model_name="test-model",
+            db_file="test.db",
+            tools_list=[MagicMock()],
+            full_prompt="Test prompt",
+            use_tools=True,
+        )
+        kwargs = MockAgent.call_args.kwargs
+    assert kwargs["description"] == "Test prompt"
+    assert kwargs["tools"] is not None
+
+
+def test_create_ollama_agent_none_tools_when_empty():
+    """_create_ollama_agent passes tools=None when tools_list is empty."""
+    with (
+        patch("timmy.agent.Agent") as MockAgent,
+        patch("timmy.agent.Ollama"),
+        patch("timmy.agent.SqliteDb"),
+    ):
+        from timmy.agent import _create_ollama_agent
+
+        _create_ollama_agent(
+            model_name="test-model",
+            db_file="test.db",
+            tools_list=[],
+            full_prompt="Test prompt",
+            use_tools=False,
+        )
+        kwargs = MockAgent.call_args.kwargs
+    assert kwargs["tools"] is None