Compare commits

6 Commits

Author SHA1 Message Date
Alexander Whitestone
c58093dccc WIP: Claude Code progress on #1285
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 22:02:09 -04:00
55beaf241f [claude] Research summary: Kimi creative blueprint (#891) (#1286)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:46:28 +00:00
69498c9add [claude] Screenshot dump triage — 5 issues created (#1275) (#1287)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:46:22 +00:00
6c76bf2f66 [claude] Integrate health snapshot into Daily Run pre-flight (#923) (#1280)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:43:49 +00:00
0436dfd4c4 [claude] Dashboard: Agent Scorecards panel in Mission Control (#929) (#1276)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:43:21 +00:00
9eeb49a6f1 [claude] Autonomous research pipeline — orchestrator + SOVEREIGNTY.md (#972) (#1274)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-24 01:40:53 +00:00
19 changed files with 1884 additions and 29 deletions

@@ -18,9 +18,17 @@ jobs:
       - name: Lint (ruff via tox)
         run: tox -e lint
-  test:
+  typecheck:
     runs-on: ubuntu-latest
     needs: lint
+    steps:
+      - uses: actions/checkout@v4
+      - name: Type-check (mypy via tox)
+        run: tox -e typecheck
+  test:
+    runs-on: ubuntu-latest
+    needs: typecheck
     steps:
       - uses: actions/checkout@v4
       - name: Run tests (via tox)

SOVEREIGNTY.md (new file, 122 lines)

@@ -0,0 +1,122 @@
# SOVEREIGNTY.md — Research Sovereignty Manifest
> "If this spec is implemented correctly, it is the last research document
> Alexander should need to request from a corporate AI."
> — Issue #972, March 22 2026
---
## What This Is
A machine-readable declaration of Timmy's research independence:
where we are, where we're going, and how to measure progress.
---
## The Problem We're Solving
On March 22, 2026, a single Claude session produced six deep research reports.
It consumed ~3 hours of human time and substantial corporate AI inference.
Every report was valuable — but the workflow was **linear**.
It would cost exactly the same to reproduce tomorrow.
This file tracks the pipeline that crystallizes that workflow into something
Timmy can run autonomously.
---
## The Six-Step Pipeline
| Step | What Happens | Status |
|------|-------------|--------|
| 1. Scope | Human describes knowledge gap → Gitea issue with template | ✅ Done (`skills/research/`) |
| 2. Query | LLM slot-fills template → 5–15 targeted queries | ✅ Done (`research.py`) |
| 3. Search | Execute queries → top result URLs | ✅ Done (`research_tools.py`) |
| 4. Fetch | Download + extract full pages (trafilatura) | ✅ Done (`tools/system_tools.py`) |
| 5. Synthesize | Compress findings → structured report | ✅ Done (`research.py` cascade) |
| 6. Deliver | Store to semantic memory + optional disk persist | ✅ Done (`research.py`) |
---
## Cascade Tiers (Synthesis Quality vs. Cost)
| Tier | Model | Cost | Quality | Status |
|------|-------|------|---------|--------|
| **4** | SQLite semantic cache | $0.00 / instant | reuses prior | ✅ Active |
| **3** | Ollama `qwen3:14b` | $0.00 / local | ★★★ | ✅ Active |
| **2** | Claude API (haiku) | ~$0.01/report | ★★★★ | ✅ Active (opt-in) |
| **1** | Groq `llama-3.3-70b` | $0.00 / rate-limited | ★★★★ | 🔲 Planned (#980) |
Set `ANTHROPIC_API_KEY` to enable Tier 2 fallback.
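A minimal sketch of how such a cascade could be walked, assuming tiers are tried in the order the table lists them (cache, then local model, then the opt-in API). `Tier`, `run_cascade`, and `build_tiers` are hypothetical names for illustration, and the synthesizers are stubs rather than real model calls:

```python
import os
from dataclasses import dataclass
from typing import Callable, Mapping, Optional


@dataclass
class Tier:
    name: str
    is_enabled: Callable[[], bool]
    synthesize: Callable[[str], Optional[str]]  # returns None on a miss


def run_cascade(prompt: str, tiers: list[Tier]) -> tuple[str, str]:
    """Return (tier name, report) from the first enabled tier that answers."""
    for tier in tiers:
        if not tier.is_enabled():
            continue
        try:
            report = tier.synthesize(prompt)
        except Exception:
            continue  # a failing tier falls through to the next one
        if report:
            return tier.name, report
    raise RuntimeError("no tier produced a report")


def build_tiers(cache: dict[str, str], env: Mapping[str, str] = os.environ) -> list[Tier]:
    """Stub tiers; the API tier is only enabled when a key is configured."""
    return [
        Tier("sqlite-cache", lambda: True, cache.get),
        Tier("ollama", lambda: True, lambda p: f"[local draft] {p}"),
        Tier(
            "claude-haiku",
            lambda: bool(env.get("ANTHROPIC_API_KEY")),
            lambda p: f"[api draft] {p}",
        ),
    ]
```

Keeping the enablement check separate from the synthesizer means an unset `ANTHROPIC_API_KEY` skips the paid tier without raising.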
---
## Research Templates
Six prompt templates live in `skills/research/`:
| Template | Use Case |
|----------|----------|
| `tool_evaluation.md` | Find all shipping tools for `{domain}` |
| `architecture_spike.md` | How to connect `{system_a}` to `{system_b}` |
| `game_analysis.md` | Evaluate `{game}` for AI agent play |
| `integration_guide.md` | Wire `{tool}` into `{stack}` with code |
| `state_of_art.md` | What exists in `{field}` as of `{date}` |
| `competitive_scan.md` | How does `{project}` compare to `{alternatives}` |
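The `{slot}` placeholders above map directly onto Python's format-string syntax. A small sketch of slot filling with a missing-slot check, assuming templates are plain text with named placeholders; `required_slots` and `fill_template` are hypothetical helpers, not functions from `research.py`:

```python
from string import Formatter


def required_slots(template: str) -> set[str]:
    """Collect the named placeholders that appear in a template."""
    return {name for _, name, _, _ in Formatter().parse(template) if name}


def fill_template(template: str, slots: dict[str, str]) -> str:
    """Fill a template, refusing to run with unfilled placeholders."""
    missing = required_slots(template) - set(slots)
    if missing:
        raise KeyError(f"missing slot values: {sorted(missing)}")
    return template.format(**slots)
```

Failing early on a missing slot avoids sending a half-filled prompt to the query step.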
---
## Sovereignty Metrics
| Metric | Target (Week 1) | Target (Month 1) | Target (Month 3) | Graduation |
|--------|-----------------|------------------|------------------|------------|
| Queries answered locally | 10% | 40% | 80% | >90% |
| API cost per report | <$1.50 | <$0.50 | <$0.10 | <$0.01 |
| Time from question to report | <3 hours | <30 min | <5 min | <1 min |
| Human involvement | 100% (review) | Review only | Approve only | None |
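The first two metrics can be derived from per-report records. A sketch under the assumption that each report logs which backend synthesized it and what it cost; `ReportRecord` and the backend labels in `LOCAL_BACKENDS` are illustrative, not the project's actual schema:

```python
from dataclasses import dataclass

# Assumed labels for backends that never leave the machine.
LOCAL_BACKENDS = frozenset({"cache", "ollama"})


@dataclass(frozen=True)
class ReportRecord:
    backend: str
    api_cost_usd: float


def local_share(records: list[ReportRecord]) -> float:
    """Percentage of reports answered without a remote API."""
    if not records:
        return 0.0
    local = sum(1 for r in records if r.backend in LOCAL_BACKENDS)
    return 100.0 * local / len(records)


def cost_per_report(records: list[ReportRecord]) -> float:
    """Mean API spend per report in USD."""
    if not records:
        return 0.0
    return sum(r.api_cost_usd for r in records) / len(records)
```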
---
## How to Use the Pipeline
```python
import asyncio

from timmy.research import run_research


async def main() -> None:
    # Quick research (no template)
    result = await run_research("best local embedding models for 36GB RAM")

    # With a template and slot values
    result = await run_research(
        topic="PDF text extraction libraries for Python",
        template="tool_evaluation",
        slots={"domain": "PDF parsing", "use_case": "RAG pipeline", "focus_criteria": "accuracy"},
        save_to_disk=True,
    )
    print(result.report)
    print(f"Backend: {result.synthesis_backend}, Cached: {result.cached}")


# run_research is a coroutine, so it needs an event loop when run as a script
asyncio.run(main())
```
---
## Implementation Status
| Component | Issue | Status |
|-----------|-------|--------|
| `web_fetch` tool (trafilatura) | #973 | ✅ Done |
| Research template library (6 templates) | #974 | ✅ Done |
| `ResearchOrchestrator` (`research.py`) | #975 | ✅ Done |
| Semantic index for outputs | #976 | 🔲 Planned |
| Auto-create Gitea issues from findings | #977 | 🔲 Planned |
| Paperclip task runner integration | #978 | 🔲 Planned |
| Kimi delegation via labels | #979 | 🔲 Planned |
| Groq free-tier cascade tier | #980 | 🔲 Planned |
| Sovereignty metrics dashboard | #981 | 🔲 Planned |
---
## Governing Spec
See [issue #972](http://143.198.27.163:3000/Rockachopa/Timmy-time-dashboard/issues/972) for the full spec and rationale.
Research artifacts committed to `docs/research/`.

@@ -0,0 +1,89 @@
# Screenshot Dump Triage — Visual Inspiration & Research Leads
**Date:** March 24, 2026
**Source:** Issue #1275 — "Screenshot dump for triage #1"
**Analyst:** Claude (Sonnet 4.6)
---
## Screenshots Ingested
| File | Subject | Action |
|------|---------|--------|
| IMG_6187.jpeg | AirLLM / Apple Silicon local LLM requirements | → Issue #1284 |
| IMG_6125.jpeg | vLLM backend for agentic workloads | → Issue #1281 |
| IMG_6124.jpeg | DeerFlow autonomous research pipeline | → Issue #1283 |
| IMG_6123.jpeg | "Vibe Coder vs Normal Developer" meme | → Issue #1285 |
| IMG_6410.jpeg | SearXNG + Crawl4AI self-hosted search MCP | → Issue #1282 |
---
## Tickets Created
### #1281 — feat: add vLLM as alternative inference backend
**Source:** IMG_6125 (vLLM for agentic workloads)
vLLM's continuous batching makes it 3–10x more throughput-efficient than Ollama for multi-agent
request patterns. Implement `VllmBackend` in `infrastructure/llm_router/` as a selectable
backend (`TIMMY_LLM_BACKEND=vllm`) with graceful fallback to Ollama.
**Priority:** Medium — impactful for research pipeline performance once #972 is in use
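One way the selectable backend with fallback could look, as a sketch only: `TIMMY_LLM_BACKEND` comes from the ticket, while `select_backend` and the `is_available` probe are hypothetical and stand in for whatever health check the router ends up using.

```python
import os
from typing import Callable, Mapping

DEFAULT_BACKEND = "ollama"


def select_backend(
    is_available: Callable[[str], bool],
    env: Mapping[str, str] = os.environ,
) -> str:
    """Honor TIMMY_LLM_BACKEND when the backend responds, else use Ollama."""
    requested = env.get("TIMMY_LLM_BACKEND", DEFAULT_BACKEND).strip().lower()
    if requested == DEFAULT_BACKEND or is_available(requested):
        return requested
    # Graceful fallback: an unreachable vLLM server should not block requests.
    return DEFAULT_BACKEND
```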
---
### #1282 — feat: integrate SearXNG + Crawl4AI as self-hosted search backend
**Source:** IMG_6410 (luxiaolei/searxng-crawl4ai-mcp)
Self-hosted search via SearXNG + Crawl4AI removes the hard dependency on paid search APIs
(Brave, Tavily). Add both as Docker Compose services, implement `web_search()` and
`scrape_url()` tools in `timmy/tools/`, and register them with the research agent.
**Priority:** High — unblocks fully local/private operation of research agents
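A rough sketch of what `web_search()` might do against a SearXNG instance, assuming the instance has the JSON output format enabled and listens on `http://localhost:8080` (both are assumptions, as are the helper names). URL building and response parsing are split out so they can be tested without a running server:

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_search_url(base_url: str, query: str) -> str:
    """Build a SearXNG search URL that asks for JSON results."""
    params = urlencode({"q": query, "format": "json"})
    return f"{base_url.rstrip('/')}/search?{params}"


def parse_results(payload: str, limit: int = 5) -> list[dict[str, str]]:
    """Keep title and URL for the first `limit` results that have a URL."""
    results = json.loads(payload).get("results", [])
    hits = [
        {"title": r.get("title", ""), "url": r["url"]}
        for r in results
        if r.get("url")
    ]
    return hits[:limit]


def web_search(query: str, base_url: str = "http://localhost:8080", limit: int = 5):
    """Run one query against the self-hosted instance (performs a network call)."""
    with urlopen(build_search_url(base_url, query), timeout=10) as response:
        return parse_results(response.read().decode("utf-8"), limit)
```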
---
### #1283 — research: evaluate DeerFlow as autonomous research orchestration layer
**Source:** IMG_6124 (deer-flow Docker setup)
DeerFlow is ByteDance's open-source autonomous research pipeline framework. Before investing
further in Timmy's custom orchestrator (#972), evaluate whether DeerFlow's architecture offers
integration value or design patterns worth borrowing.
**Priority:** Medium — research first, implementation follows if go/no-go is positive
---
### #1284 — chore: document and validate AirLLM Apple Silicon requirements
**Source:** IMG_6187 (Mac-compatible LLM setup)
AirLLM graceful degradation is already implemented but undocumented. Add System Requirements
to README (M1/M2/M3/M4, 16 GB RAM min, 15 GB disk) and document `TIMMY_LLM_BACKEND` in
`.env.example`.
**Priority:** Low — documentation only, no code risk
---
### #1285 — chore: enforce "Normal Developer" discipline — tighten quality gates
**Source:** IMG_6123 (Vibe Coder vs Normal Developer meme)
Tighten the existing mypy/bandit/coverage gates: fix all mypy errors, raise coverage from 73%
to 80%, add a documented pre-push hook, and run `vulture` for dead code. The infrastructure
exists — it just needs enforcing.
**Priority:** Medium — technical debt prevention, pairs well with any green-field feature work
---
## Patterns Observed Across Screenshots
1. **Local-first is the north star.** All five images reinforce the same theme: private,
self-hosted, runs on your hardware. vLLM, SearXNG, AirLLM, DeerFlow — none require cloud.
Timmy is already aligned with this direction; these are tactical additions.
2. **Agentic performance bottlenecks are real.** Two of five images (vLLM, DeerFlow) focus
specifically on throughput and reliability for multi-agent loops. As the research pipeline
matures, inference speed and search reliability will become the main constraints.
3. **Discipline compounds.** The meme is a reminder that the quality gates we have (tox,
mypy, bandit, coverage) only pay off if they are enforced without exceptions.

@@ -0,0 +1,290 @@
# Building Timmy: Technical Blueprint for Sovereign Creative AI
> **Source:** PDF attached to issue #891, "Building Timmy: a technical blueprint for sovereign
> creative AI" — generated by Kimi.ai, 16 pages, filed by Perplexity for Timmy's review.
> **Filed:** 2026-03-22 · **Reviewed:** 2026-03-23
---
## Executive Summary
The blueprint establishes that a sovereign creative AI capable of coding, composing music,
generating art, building worlds, publishing narratives, and managing its own economy is
**technically feasible today** — but only through orchestration of dozens of tools operating
at different maturity levels. The core insight: *the integration is the invention*. No single
component is new; the missing piece is a coherent identity operating across all domains
simultaneously with persistent memory, autonomous economics, and cross-domain creative
reactions.
Three non-negotiable architectural decisions:
1. **Human oversight for all public-facing content** — every successful creative AI has this;
every one that removed it failed.
2. **Legal entity before economic activity** — AI agents are not legal persons; establish
structure before wealth accumulates (Truth Terminal cautionary tale: $20M acquired before
a foundation was retroactively created).
3. **Hybrid memory: vector search + knowledge graph** — neither alone is sufficient for
multi-domain context breadth.
---
## Domain-by-Domain Assessment
### Software Development (immediately deployable)
| Component | Recommendation | Notes |
|-----------|----------------|-------|
| Primary agent | Claude Code (Opus 4.6, 77.2% SWE-bench) | Already in use |
| Self-hosted forge | Forgejo (MIT, 170–200MB RAM) | Project uses Gitea/Forgejo now |
| CI/CD | GitHub Actions-compatible via `act_runner` | — |
| Tool-making | LATM pattern: frontier model creates tools, cheaper model applies them | New — see ADR opportunity |
| Open-source fallback | OpenHands (~65% SWE-bench, Docker sandboxed) | Backup to Claude Code |
| Self-improvement | Darwin Gödel Machine / SICA patterns | 3–6 month investment |
**Development estimate:** 2–3 weeks for Forgejo + Claude Code integration with automated
PR workflows; 1–2 months for self-improving tool-making pipeline.
**Cross-reference:** This project already runs Claude Code agents on Forgejo. The LATM
pattern (tool registry) and self-improvement loop are the actionable gaps.
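A minimal sketch of the tool-registry half of the LATM pattern, assuming a tool is an ordinary Python callable that must pass example checks before it is cached for reuse. `ToolRegistry` is a hypothetical class, and the step where a frontier model writes the tool is left out:

```python
from typing import Any, Callable


class ToolRegistry:
    """Caches tools only after they pass their example checks."""

    def __init__(self) -> None:
        self._tools: dict[str, Callable[..., Any]] = {}

    def register(
        self,
        name: str,
        tool: Callable[..., Any],
        examples: list[tuple[tuple, Any]],
    ) -> None:
        # Verify the tool against (args, expected) pairs before caching it.
        for args, expected in examples:
            actual = tool(*args)
            if actual != expected:
                raise ValueError(f"{name}{args} returned {actual!r}, expected {expected!r}")
        self._tools[name] = tool

    def apply(self, name: str, *args: Any) -> Any:
        """Run a cached tool; this is the cheap path a lighter model would use."""
        return self._tools[name](*args)

    def __contains__(self, name: str) -> bool:
        return name in self._tools


def slugify(title: str) -> str:
    return "-".join(title.lower().split())


registry = ToolRegistry()
registry.register("slugify", slugify, [(("Hello World",), "hello-world")])
```

Gating registration on examples is the design choice that matters here: a tool that fails its checks never reaches the cache.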
---
### Music (1–4 weeks)
| Component | Recommendation | Notes |
|-----------|----------------|-------|
| Commercial vocals | Suno v5 API (~$0.03/song, $30/month Premier) | No official API; third-party: sunoapi.org, AIMLAPI, EvoLink |
| Local instrumental | MusicGen 1.5B (CC-BY-NC — monetization blocker) | On M2 Max: ~60s for 5s clip |
| Voice cloning | GPT-SoVITS v4 (MIT) | Works on Apple Silicon CPU, RTF 0.526 on M4 |
| Voice conversion | RVC (MIT, 5–10 min training audio) | — |
| Apple Silicon TTS | MLX-Audio: Kokoro 82M + Qwen3-TTS 0.6B | 4–5x faster via Metal |
| Publishing | Wavlake (90/10 split, Lightning micropayments) | Auto-syndicates to Fountain.fm |
| Nostr | NIP-94 (kind:1063) audio events → NIP-96 servers | — |
**Copyright reality:** US Copyright Office (Jan 2025) and US Court of Appeals (Mar 2025):
purely AI-generated music cannot be copyrighted and enters public domain. Wavlake's
Value4Value model works around this — fans pay for relationship, not exclusive rights.
**Avoid:** Udio (download disabled since Oct 2025, 2.4/5 Trustpilot).
---
### Visual Art (1–3 weeks)
| Component | Recommendation | Notes |
|-----------|----------------|-------|
| Local generation | ComfyUI API at `127.0.0.1:8188` (programmatic control via WebSocket) | MLX extension: 50–70% faster |
| Speed | Draw Things (free, Mac App Store) | 3× faster than ComfyUI via Metal shaders |
| Quality frontier | Flux 2 (Nov 2025, 4MP, multi-reference) | SDXL needs 16GB+, Flux Dev 32GB+ |
| Character consistency | LoRA training (30 min, 15–30 references) + Flux.1 Kontext | Solved problem |
| Face consistency | IP-Adapter + FaceID (ComfyUI-IP-Adapter-Plus) | Training-free |
| Comics | Jenova AI ($20/month, 200+ page consistency) or LlamaGen AI (free) | — |
| Publishing | Blossom protocol (SHA-256 addressed, kind:10063) + Nostr NIP-94 | — |
| Physical | Printful REST API (200+ products, automated fulfillment) | — |
---
### Writing / Narrative (1–4 weeks for pipeline; ongoing for quality)
| Component | Recommendation | Notes |
|-----------|----------------|-------|
| LLM | Claude Opus 4.5/4.6 (leads Mazur Writing Benchmark at 8.561) | Already in use |
| Context | 500K tokens (1M in beta) — entire novels fit | — |
| Architecture | Outline-first → RAG lore bible → chapter-by-chapter generation | Without outline: novels meander |
| Lore management | WorldAnvil Pro or custom LoreScribe (local RAG) | No tool achieves 100% consistency |
| Publishing (ebooks) | Pandoc → EPUB / KDP PDF | pandoc-novel template on GitHub |
| Publishing (print) | Lulu Press REST API (80% profit, global print network) | KDP: no official API, 3-book/day limit |
| Publishing (Nostr) | NIP-23 kind:30023 long-form events | Habla.news, YakiHonne, Stacker News |
| Podcasts | LLM script → TTS (ElevenLabs or local Kokoro/MLX-Audio) → feedgen RSS → Fountain.fm | Value4Value sats-per-minute |
**Key constraint:** AI-assisted (human directs, AI drafts) = 40% faster. Fully autonomous
without editing = "generic, soulless prose" and character drift by chapter 3 without explicit
memory.
---
### World Building / Games (2 weeks–3 months depending on target)
| Component | Recommendation | Notes |
|-----------|----------------|-------|
| Algorithms | Wave Function Collapse, Perlin noise (FastNoiseLite in Godot 4), L-systems | All mature |
| Platform | Godot Engine + gd-agentic-skills (82+ skills, 26 genre blueprints) | Strong LLM/GDScript knowledge |
| Narrative design | Knowledge graph (world state) + LLM + quest template grammar | CHI 2023 validated |
| Quick win | Luanti/Minetest (Lua API, 2,800+ open mods for reference) | Immediately feasible |
| Medium effort | OpenMW content creation (omwaddon format engineering required) | 2–3 months |
| Future | Unity MCP (AI direct Unity Editor interaction) | Early-stage |
---
### Identity Architecture (2 months)
The blueprint formalizes the **SOUL.md standard** (GitHub: aaronjmars/soul.md):
| File | Purpose |
|------|---------|
| `SOUL.md` | Who you are — identity, worldview, opinions |
| `STYLE.md` | How you write — voice, syntax, patterns |
| `SKILL.md` | Operating modes |
| `MEMORY.md` | Session continuity |
**Critical decision — static vs self-modifying identity:**
- Static Core Truths (version-controlled, human-approved changes only) ✓
- Self-modifying Learned Preferences (logged with rollback, monitored by guardian) ✓
- **Warning:** OpenClaw's "Soul Evolution" creates a security attack surface — Zenity Labs
demonstrated a complete zero-click attack chain targeting SOUL.md files.
**Relevance to this repo:** Claude Code agents already use a `MEMORY.md` pattern in
this project. The SOUL.md stack is a natural extension.
---
### Memory Architecture (2 months)
Hybrid vector + knowledge graph is the recommendation:
| Component | Tool | Notes |
|-----------|------|-------|
| Vector + KG combined | Mem0 (mem0.ai) | 26% accuracy improvement over OpenAI memory, 91% lower p95 latency, 90% token savings |
| Vector store | Qdrant (Rust, open-source) | High-throughput with metadata filtering |
| Temporal KG | Neo4j + Graphiti (Zep AI) | P95 retrieval: 300ms, hybrid semantic + BM25 + graph |
| Backup/migration | AgentKeeper (95% critical fact recovery across model migrations) | — |
**Journal pattern (Stanford Generative Agents):** Agent writes about experiences, generates
high-level reflections 2–3x/day when importance scores exceed threshold. Ablation studies:
removing any component (observation, planning, reflection) significantly reduces behavioral
believability.
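The reflection trigger can be sketched as an accumulator, assuming each observation carries a numeric importance score and a reflection fires once the running total exceeds a threshold. `Journal` and the threshold value are illustrative, and the reflection text is a placeholder for an LLM call:

```python
from dataclasses import dataclass, field


@dataclass
class Journal:
    threshold: float
    pending: list[tuple[str, float]] = field(default_factory=list)
    reflections: list[str] = field(default_factory=list)

    def observe(self, text: str, importance: float) -> bool:
        """Log an observation; return True when it triggered a reflection."""
        self.pending.append((text, importance))
        if sum(score for _, score in self.pending) <= self.threshold:
            return False
        self.reflections.append(self._reflect())
        self.pending.clear()  # importance starts accumulating again
        return True

    def _reflect(self) -> str:
        # Placeholder: summarize the three most important pending observations.
        top = sorted(self.pending, key=lambda entry: entry[1], reverse=True)[:3]
        return "Reflection on: " + "; ".join(text for text, _ in top)
```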
**Cross-reference:** The existing `brain/` package is the memory system. Qdrant and
Mem0 are the recommended upgrade targets.
---
### Multi-Agent Sub-System (3–6 months)
The blueprint describes a named sub-agent hierarchy:
| Agent | Role |
|-------|------|
| Oracle | Top-level planner / supervisor |
| Sentinel | Safety / moderation |
| Scout | Research / information gathering |
| Scribe | Writing / narrative |
| Ledger | Economic management |
| Weaver | Visual art generation |
| Composer | Music generation |
| Social | Platform publishing |
**Orchestration options:**
- **Agno** (already in use) — microsecond instantiation, 50× less memory than LangGraph
- **CrewAI Flows** — event-driven with fine-grained control
- **LangGraph** — DAG-based with stateful workflows and time-travel debugging
**Scheduling pattern (Stanford Generative Agents):** Top-down recursive daily → hourly →
5-minute planning. Event interrupts for reactive tasks. Re-planning triggers when accumulated
importance scores exceed threshold.
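The top-down decomposition can be sketched as a recursive split of a time block into hourly and then 5-minute children. `Block` and `decompose` are hypothetical names, times are minutes since midnight, and event interrupts and re-planning are not modeled:

```python
from dataclasses import dataclass, field


@dataclass
class Block:
    label: str
    start_min: int  # minutes since midnight
    length_min: int
    children: list["Block"] = field(default_factory=list)


def decompose(block: Block, steps: tuple[int, ...] = (60, 5)) -> Block:
    """Recursively split a block into finer blocks (hourly, then 5-minute)."""
    # Only granularities finer than the block itself can split it.
    finer = tuple(s for s in steps if s < block.length_min)
    if not finer:
        return block
    step, rest = finer[0], finer[1:]
    for offset in range(0, block.length_min, step):
        child = Block(
            block.label,
            block.start_min + offset,
            min(step, block.length_min - offset),
        )
        block.children.append(decompose(child, rest))
    return block
```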
**Cross-reference:** The existing `spark/` package (event capture, advisory engine) aligns
with this architecture. `infrastructure/event_bus` is the choreography backbone.
---
### Economic Engine (1–4 weeks)
Lightning Labs released `lightning-agent-tools` (open-source) in February 2026:
- `lnget` — CLI HTTP client for L402 payments
- Remote signer architecture (private keys on separate machine from agent)
- Scoped macaroon credentials (pay-only, invoice-only, read-only roles)
- **Aperture** — converts any API to pay-per-use via L402 (HTTP 402)
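A sketch of the client side of an L402 exchange, assuming the challenge arrives as `WWW-Authenticate: L402 macaroon="...", invoice="..."` and the paid retry sends `Authorization: L402 <macaroon>:<preimage>`. That header layout is an assumption to verify against the L402 specification, the helper names are hypothetical, and paying the invoice is out of scope:

```python
import re

# Assumed challenge layout: L402 macaroon="<base64>", invoice="<bolt11>"
_CHALLENGE = re.compile(
    r'^L402\s+macaroon="(?P<macaroon>[^"]+)",\s*invoice="(?P<invoice>[^"]+)"$'
)


def parse_challenge(header_value: str) -> tuple[str, str]:
    """Extract (macaroon, invoice) from a 402 response's challenge header."""
    match = _CHALLENGE.match(header_value.strip())
    if match is None:
        raise ValueError("not an L402 challenge")
    return match["macaroon"], match["invoice"]


def authorization_header(macaroon: str, preimage_hex: str) -> str:
    """Build the header for the retry once the invoice has been paid."""
    return f"L402 {macaroon}:{preimage_hex}"
```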
| Option | Effort | Notes |
|--------|--------|-------|
| ln.bot | 1 week | "Bitcoin for AI Agents" — 3 commands create a wallet; CLI + MCP + REST |
| LND via gRPC | 2–3 weeks | Full programmatic node management for production |
| Coinbase Agentic Wallets | — | Fiat-adjacent; less aligned with sovereignty ethos |
**Revenue channels:** Wavlake (music, 90/10 Lightning), Nostr zaps (articles), Stacker News
(earn sats from engagement), Printful (physical goods), L402-gated API access (pay-per-use
services), Geyser.fund (Lightning crowdfunding, better initial runway than micropayments).
**Cross-reference:** The existing `lightning/` package in this repo is the foundation.
L402 paywall endpoints for Timmy's own services is the actionable gap.
---
## Pioneer Case Studies
| Agent | Active | Revenue | Key Lesson |
|-------|--------|---------|-----------|
| Botto | Since Oct 2021 | $5M+ (art auctions) | Community governance via DAO sustains engagement; "taste model" (humans guide, not direct) preserves autonomous authorship |
| Neuro-sama | Since Dec 2022 | $400K+/month (subscriptions) | 3+ years of iteration; errors became entertainment features; 24/7 capability is an insurmountable advantage |
| Truth Terminal | Since Jun 2024 | $20M accumulated | Memetic fitness > planned monetization; human gatekeeper approved tweets while selecting AI-intent responses; **establish legal entity first** |
| Holly+ | Since 2021 | Conceptual | DAO of stewards for voice governance; "identity play" as alternative to defensive IP |
| AI Sponge | 2023 | Banned | Unmoderated content → TOS violations + copyright |
| Nothing Forever | 2022–present | 8 viewers | Unmoderated content → ban → audience collapse; novelty-only propositions fail |
**Universal pattern:** Human oversight + economic incentive alignment + multi-year personality
development + platform-native economics = success.
---
## Recommended Implementation Sequence
From the blueprint, mapped against Timmy's existing architecture:
### Phase 1: Immediate (weeks)
1. **Code sovereignty** — Forgejo + Claude Code automated PR workflows (already substantially done)
2. **Music pipeline** — Suno API → Wavlake/Nostr NIP-94 publishing
3. **Visual art pipeline** — ComfyUI API → Blossom/Nostr with LoRA character consistency
4. **Basic Lightning wallet** — ln.bot integration for receiving micropayments
5. **Long-form publishing** — Nostr NIP-23 + RSS feed generation
### Phase 2: Moderate effort (1–3 months)
6. **LATM tool registry** — frontier model creates Python utilities, caches them, lighter model applies
7. **Event-driven cross-domain reactions** — game event → blog + artwork + music (CrewAI/LangGraph)
8. **Podcast generation** — TTS + feedgen → Fountain.fm
9. **Self-improving pipeline** — agent creates, tests, caches own Python utilities
10. **Comic generation** — character-consistent panels with Jenova AI or local LoRA
### Phase 3: Significant investment (3–6 months)
11. **Full sub-agent hierarchy** — Oracle/Sentinel/Scout/Scribe/Ledger/Weaver with Agno
12. **SOUL.md identity system** — bounded evolution + guardian monitoring
13. **Hybrid memory upgrade** — Qdrant + Mem0/Graphiti replacing or extending `brain/`
14. **Procedural world generation** — Godot + AI-driven narrative (quests, NPCs, lore)
15. **Self-sustaining economic loop** — earned revenue covers compute costs
### Remains aspirational (12+ months)
- Fully autonomous novel-length fiction without editorial intervention
- YouTube monetization for AI-generated content (tightening platform policies)
- Copyright protection for AI-generated works (current US law denies this)
- True artistic identity evolution (genuine creative voice vs pattern remixing)
- Self-modifying architecture without regression or identity drift
---
## Gap Analysis: Blueprint vs Current Codebase
| Blueprint Capability | Current Status | Gap |
|---------------------|----------------|-----|
| Code sovereignty | Done (Claude Code + Forgejo) | LATM tool registry |
| Music generation | Not started | Suno API integration + Wavlake publishing |
| Visual art | Not started | ComfyUI API client + Blossom publishing |
| Writing/publishing | Not started | Nostr NIP-23 + Pandoc pipeline |
| World building | Bannerlord work (different scope) | Luanti mods as quick win |
| Identity (SOUL.md) | Partial (CLAUDE.md + MEMORY.md) | Full SOUL.md stack |
| Memory (hybrid) | `brain/` package (SQLite-based) | Qdrant + knowledge graph |
| Multi-agent | Agno in use | Named hierarchy + event choreography |
| Lightning payments | `lightning/` package | ln.bot wallet + L402 endpoints |
| Nostr identity | Referenced in roadmap, not built | NIP-05, NIP-89 capability cards |
| Legal entity | Unknown | **Must be resolved before economic activity** |
---
## ADR Candidates
Issues that warrant Architecture Decision Records based on this review:
1. **LATM tool registry pattern** — How Timmy creates, tests, and caches self-made tools
2. **Music generation strategy** — Suno (cloud, commercial quality) vs MusicGen (local, CC-BY-NC)
3. **Memory upgrade path** — When/how to migrate `brain/` from SQLite to Qdrant + KG
4. **SOUL.md adoption** — Extending existing CLAUDE.md/MEMORY.md to full SOUL.md stack
5. **Lightning L402 strategy** — Which services Timmy gates behind micropayments
6. **Sub-agent naming and contracts** — Formalizing Oracle/Sentinel/Scout/Scribe/Ledger/Weaver

@@ -164,3 +164,7 @@ directory = "htmlcov"
 [tool.coverage.xml]
 output = "coverage.xml"
+[tool.mypy]
+ignore_missing_imports = true
+no_error_summary = true

@@ -6,6 +6,8 @@ import sqlite3
 from contextlib import closing
 from pathlib import Path
+from typing import Any
 from fastapi import APIRouter, Request
 from fastapi.responses import HTMLResponse, JSONResponse
@@ -36,9 +38,9 @@ def _discover_databases() -> list[dict]:
     return dbs
-def _query_database(db_path: str) -> dict:
+def _query_database(db_path: str) -> dict[str, Any]:
     """Open a database read-only and return all tables with their rows."""
-    result = {"tables": {}, "error": None}
+    result: dict[str, Any] = {"tables": {}, "error": None}
     try:
         with closing(sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)) as conn:
             conn.row_factory = sqlite3.Row

@@ -186,6 +186,24 @@
 <p class="chat-history-placeholder">Loading sovereignty metrics...</p>
 {% endcall %}
+<!-- Agent Scorecards -->
+<div class="card mc-card-spaced" id="mc-scorecards-card">
+  <div class="card-header">
+    <h2 class="card-title">Agent Scorecards</h2>
+    <div class="d-flex align-items-center gap-2">
+      <select id="mc-scorecard-period" class="form-select form-select-sm" style="width: auto;"
+              onchange="loadMcScorecards()">
+        <option value="daily" selected>Daily</option>
+        <option value="weekly">Weekly</option>
+      </select>
+      <a href="/scorecards" class="btn btn-sm btn-outline-secondary">Full View</a>
+    </div>
+  </div>
+  <div id="mc-scorecards-content" class="p-2">
+    <p class="chat-history-placeholder">Loading scorecards...</p>
+  </div>
+</div>
 <!-- Chat History -->
 <div class="card mc-card-spaced">
   <div class="card-header">
@@ -502,6 +520,20 @@ async function loadSparkStatus() {
     }
 }
+// Load agent scorecards
+async function loadMcScorecards() {
+    var period = document.getElementById('mc-scorecard-period').value;
+    var container = document.getElementById('mc-scorecards-content');
+    container.innerHTML = '<p class="chat-history-placeholder">Loading scorecards...</p>';
+    try {
+        var response = await fetch('/scorecards/all/panels?period=' + period);
+        var html = await response.text();
+        container.innerHTML = html;
+    } catch (error) {
+        container.innerHTML = '<p class="chat-history-placeholder">Scorecards unavailable</p>';
+    }
+}
 // Initial load
 loadSparkStatus();
 loadSovereignty();
@@ -510,6 +542,7 @@ loadSwarmStats();
 loadLightningStats();
 loadGrokStats();
 loadChatHistory();
+loadMcScorecards();
 // Periodic updates
 setInterval(loadSovereignty, 30000);
@@ -518,5 +551,6 @@ setInterval(loadSwarmStats, 5000);
 setInterval(updateHeartbeat, 5000);
 setInterval(loadGrokStats, 10000);
 setInterval(loadSparkStatus, 15000);
+setInterval(loadMcScorecards, 300000);
 </script>
 {% endblock %}

@@ -137,7 +137,7 @@ class HermesMonitor:
                         message=f"Check error: {r}",
                     )
                 )
-            else:
+            elif isinstance(r, CheckResult):
                 checks.append(r)
         # Compute overall level

@@ -203,7 +203,7 @@ async def reload_config(
 @router.get("/history")
 async def get_history(
     hours: int = 24,
-    store: Annotated[HealthHistoryStore, Depends(get_history_store)] = None,
+    store: Annotated[HealthHistoryStore | None, Depends(get_history_store)] = None,
 ) -> list[dict[str, Any]]:
     """Get provider health history for the last N hours."""
     if store is None:

@@ -744,19 +744,20 @@ class CascadeRouter:
         self,
         provider: Provider,
         messages: list[dict],
-        model: str,
+        model: str | None,
         temperature: float,
         max_tokens: int | None,
         content_type: ContentType = ContentType.TEXT,
     ) -> dict:
         """Try a single provider request."""
         start_time = time.time()
+        effective_model: str = model or provider.get_default_model() or ""
         if provider.type == "ollama":
             result = await self._call_ollama(
                 provider=provider,
                 messages=messages,
-                model=model or provider.get_default_model(),
+                model=effective_model,
                 temperature=temperature,
                 max_tokens=max_tokens,
                 content_type=content_type,
@@ -765,7 +766,7 @@ class CascadeRouter:
             result = await self._call_openai(
                 provider=provider,
                 messages=messages,
-                model=model or provider.get_default_model(),
+                model=effective_model,
                 temperature=temperature,
                 max_tokens=max_tokens,
             )
@@ -773,7 +774,7 @@ class CascadeRouter:
             result = await self._call_anthropic(
                 provider=provider,
                 messages=messages,
-                model=model or provider.get_default_model(),
+                model=effective_model,
                 temperature=temperature,
                 max_tokens=max_tokens,
             )
@@ -781,7 +782,7 @@ class CascadeRouter:
             result = await self._call_grok(
                 provider=provider,
                 messages=messages,
-                model=model or provider.get_default_model(),
+                model=effective_model,
                 temperature=temperature,
                 max_tokens=max_tokens,
             )
@@ -789,7 +790,7 @@ class CascadeRouter:
             result = await self._call_vllm_mlx(
                 provider=provider,
                 messages=messages,
-                model=model or provider.get_default_model(),
+                model=effective_model,
                 temperature=temperature,
                 max_tokens=max_tokens,
             )

@@ -474,7 +474,7 @@ class DiscordVendor(ChatPlatform):
async def _run_client(self, token: str) -> None: async def _run_client(self, token: str) -> None:
"""Run the discord.py client (blocking call in a task).""" """Run the discord.py client (blocking call in a task)."""
try: try:
await self._client.start(token) await self._client.start(token) # type: ignore[union-attr]
except Exception as exc: except Exception as exc:
logger.error("Discord client error: %s", exc) logger.error("Discord client error: %s", exc)
self._state = PlatformState.ERROR self._state = PlatformState.ERROR
@@ -482,32 +482,32 @@ class DiscordVendor(ChatPlatform):
def _register_handlers(self) -> None:
"""Register Discord event handlers on the client."""
- @self._client.event
+ @self._client.event # type: ignore[union-attr]
async def on_ready():
- self._guild_count = len(self._client.guilds)
+ self._guild_count = len(self._client.guilds) # type: ignore[union-attr]
self._state = PlatformState.CONNECTED
logger.info(
"Discord ready: %s in %d guild(s)",
- self._client.user,
+ self._client.user, # type: ignore[union-attr]
self._guild_count,
)
- @self._client.event
+ @self._client.event # type: ignore[union-attr]
async def on_message(message):
# Ignore our own messages
- if message.author == self._client.user:
+ if message.author == self._client.user: # type: ignore[union-attr]
return
# Only respond to mentions or DMs
is_dm = not hasattr(message.channel, "guild") or message.channel.guild is None
- is_mention = self._client.user in message.mentions
+ is_mention = self._client.user in message.mentions # type: ignore[union-attr]
if not is_dm and not is_mention:
return
await self._handle_message(message)
- @self._client.event
+ @self._client.event # type: ignore[union-attr]
async def on_disconnect():
if self._state != PlatformState.DISCONNECTED:
self._state = PlatformState.CONNECTING
@@ -535,8 +535,8 @@ class DiscordVendor(ChatPlatform):
def _extract_content(self, message) -> str:
"""Strip the bot mention and return clean message text."""
content = message.content
- if self._client.user:
- content = content.replace(f"<@{self._client.user.id}>", "").strip()
+ if self._client.user: # type: ignore[union-attr]
+ content = content.replace(f"<@{self._client.user.id}>", "").strip() # type: ignore[union-attr]
return content
async def _invoke_agent(self, content: str, session_id: str, target):
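
The `# type: ignore[union-attr]` comments in these hunks silence mypy at each access to `self._client`, which is presumably typed as optional (its declaration is not part of the lines shown). One alternative, sketched below under that assumption, is a small accessor that narrows the type once. `FakeClient`, `Vendor`, and `_require_client` are hypothetical names and do not appear in the diff.

```python
from __future__ import annotations


class FakeClient:
    """Hypothetical stand-in for a chat client object."""

    user = "bot-user"


class Vendor:
    def __init__(self) -> None:
        # Optional until a connection is established.
        self._client: FakeClient | None = None

    def _require_client(self) -> FakeClient:
        """Return the client, raising if it has not been created yet."""
        if self._client is None:
            raise RuntimeError("client is not initialised")
        # After the None check, type checkers treat the value as FakeClient.
        return self._client

    def current_user(self) -> str:
        return self._require_client().user


vendor = Vendor()
try:
    vendor.current_user()
    raised = False
except RuntimeError:
    raised = True

vendor._client = FakeClient()
user = vendor.current_user()
```

The trade-off is one extra method call per access in exchange for a runtime error with a clear message instead of an `AttributeError` on `None`.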


@@ -102,14 +102,14 @@ class TelegramBot:
self._token = tok
self._app = Application.builder().token(tok).build()
- self._app.add_handler(CommandHandler("start", self._cmd_start))
- self._app.add_handler(
+ self._app.add_handler(CommandHandler("start", self._cmd_start)) # type: ignore[union-attr]
+ self._app.add_handler( # type: ignore[union-attr]
MessageHandler(filters.TEXT & ~filters.COMMAND, self._handle_message)
)
- await self._app.initialize()
- await self._app.start()
- await self._app.updater.start_polling(allowed_updates=Update.ALL_TYPES)
+ await self._app.initialize() # type: ignore[union-attr]
+ await self._app.start() # type: ignore[union-attr]
+ await self._app.updater.start_polling(allowed_updates=Update.ALL_TYPES) # type: ignore[union-attr]
self._running = True
logger.info("Telegram bot started.")

src/timmy/research.py Normal file

@@ -0,0 +1,528 @@
"""Research Orchestrator — autonomous, sovereign research pipeline.
Chains all six steps of the research workflow with local-first execution:
Step 0 Cache — check semantic memory (SQLite, instant, zero API cost)
Step 1 Scope — load a research template from skills/research/
Step 2 Query — slot-fill template + formulate 5-15 search queries via Ollama
Step 3 Search — execute queries via web_search (SerpAPI or fallback)
Step 4 Fetch — download + extract full pages via web_fetch (trafilatura)
Step 5 Synth — compress findings into a structured report via cascade
Step 6 Deliver — store to semantic memory; optionally save to docs/research/
Cascade tiers for synthesis (spec §4):
Tier 4 SQLite semantic cache — instant, free, covers ~80% after warm-up
Tier 3 Ollama (qwen3:14b) — local, free, good quality
Tier 2 Claude API (haiku) — cloud fallback, cheap, set ANTHROPIC_API_KEY
Tier 1 (future) Groq — free-tier rate-limited, tracked in #980
All optional services degrade gracefully per project conventions.
Refs #972 (governing spec), #975 (ResearchOrchestrator sub-issue).
"""
from __future__ import annotations
import asyncio
import logging
import re
import textwrap
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Optional memory imports — available at module level so tests can patch them.
try:
from timmy.memory_system import SemanticMemory, store_memory
except Exception: # pragma: no cover
SemanticMemory = None # type: ignore[assignment,misc]
store_memory = None # type: ignore[assignment]
# Root of the project — two levels up from src/timmy/
_PROJECT_ROOT = Path(__file__).parent.parent.parent
_SKILLS_ROOT = _PROJECT_ROOT / "skills" / "research"
_DOCS_ROOT = _PROJECT_ROOT / "docs" / "research"
# Similarity threshold for cache hit (0 to 1 cosine similarity)
_CACHE_HIT_THRESHOLD = 0.82
# How many search result URLs to fetch as full pages
_FETCH_TOP_N = 5
# Maximum tokens to request from the synthesis LLM
_SYNTHESIS_MAX_TOKENS = 4096
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class ResearchResult:
"""Full output of a research pipeline run."""
topic: str
query_count: int
sources_fetched: int
report: str
cached: bool = False
cache_similarity: float = 0.0
synthesis_backend: str = "unknown"
errors: list[str] = field(default_factory=list)
def is_empty(self) -> bool:
return not self.report.strip()
# ---------------------------------------------------------------------------
# Template loading
# ---------------------------------------------------------------------------
def list_templates() -> list[str]:
"""Return names of available research templates (without .md extension)."""
if not _SKILLS_ROOT.exists():
return []
return [p.stem for p in sorted(_SKILLS_ROOT.glob("*.md"))]
def load_template(template_name: str, slots: dict[str, str] | None = None) -> str:
"""Load a research template and fill {slot} placeholders.
Args:
template_name: Stem of the .md file under skills/research/ (e.g. "tool_evaluation").
slots: Mapping of {placeholder} → replacement value.
Returns:
Template text with slots filled. Unfilled slots are left as-is.
"""
path = _SKILLS_ROOT / f"{template_name}.md"
if not path.exists():
available = ", ".join(list_templates()) or "(none)"
raise FileNotFoundError(
f"Research template {template_name!r} not found. "
f"Available: {available}"
)
text = path.read_text(encoding="utf-8")
# Strip YAML frontmatter (--- ... ---), including empty frontmatter (--- \n---)
text = re.sub(r"^---\n.*?---\n", "", text, flags=re.DOTALL)
if slots:
for key, value in slots.items():
text = text.replace(f"{{{key}}}", value)
return text.strip()
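
To illustrate what `load_template` does to a template body, the sketch below applies the same regular expression and `str.replace` loop to an in-memory string instead of a file. The helper name `fill` and the sample text are made up for this example.

```python
from __future__ import annotations

import re


def fill(text: str, slots: dict[str, str] | None = None) -> str:
    """Strip leading YAML frontmatter, then substitute {slot} placeholders."""
    # Same pattern as in load_template: a leading '---' block, matched non-greedily.
    text = re.sub(r"^---\n.*?---\n", "", text, flags=re.DOTALL)
    for key, value in (slots or {}).items():
        text = text.replace(f"{{{key}}}", value)
    return text.strip()


sample = "---\nname: Tool Evaluation\n---\n# Tool Eval: {domain} vs {other}"
filled = fill(sample, {"domain": "PDF parsing"})
print(filled)  # # Tool Eval: PDF parsing vs {other}
```

Placeholders with no matching key, such as `{other}` here, pass through untouched, which matches the behaviour described in the docstring.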
# ---------------------------------------------------------------------------
# Query formulation (Step 2)
# ---------------------------------------------------------------------------
async def _formulate_queries(topic: str, template_context: str, n: int = 8) -> list[str]:
"""Use the local LLM to generate targeted search queries for a topic.
Falls back to a simple heuristic if Ollama is unavailable.
"""
prompt = textwrap.dedent(f"""\
You are a research assistant. Generate exactly {n} targeted, specific web search
queries to thoroughly research the following topic.
TOPIC: {topic}
RESEARCH CONTEXT:
{template_context[:1000]}
Rules:
- One query per line, no numbering, no bullet points.
- Vary the angle (definition, comparison, implementation, alternatives, pitfalls).
- Prefer exact technical terms, tool names, and version numbers where relevant.
- Output ONLY the queries, nothing else.
""")
queries = await _ollama_complete(prompt, max_tokens=512)
if not queries:
# Minimal fallback
return [
f"{topic} overview",
f"{topic} tutorial",
f"{topic} best practices",
f"{topic} alternatives",
f"{topic} 2025",
]
lines = [ln.strip() for ln in queries.splitlines() if ln.strip()]
return lines[:n]
# ---------------------------------------------------------------------------
# Search (Step 3)
# ---------------------------------------------------------------------------
async def _execute_search(queries: list[str]) -> list[dict[str, str]]:
"""Run each query through the available web search backend.
Returns a flat list of {title, url, snippet} dicts.
Degrades gracefully if SerpAPI key is absent.
"""
results: list[dict[str, str]] = []
seen_urls: set[str] = set()
for query in queries:
try:
raw = await asyncio.to_thread(_run_search_sync, query)
for item in raw:
url = item.get("url", "")
if url and url not in seen_urls:
seen_urls.add(url)
results.append(item)
except Exception as exc:
logger.warning("Search failed for query %r: %s", query, exc)
return results
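
`_execute_search` merges the results of all queries and keeps only the first item seen for each URL. The sketch below isolates that order-preserving de-duplication on hand-written data. `dedupe_by_url` is a hypothetical name and the URLs are placeholders.

```python
from __future__ import annotations


def dedupe_by_url(batches: list[list[dict[str, str]]]) -> list[dict[str, str]]:
    """Flatten per-query result batches, keeping the first item per URL."""
    results: list[dict[str, str]] = []
    seen_urls: set[str] = set()
    for batch in batches:
        for item in batch:
            url = item.get("url", "")
            # Items without a URL are dropped, as in _execute_search.
            if url and url not in seen_urls:
                seen_urls.add(url)
                results.append(item)
    return results


batches = [
    [{"title": "A", "url": "https://example.com/a"}, {"title": "no url", "url": ""}],
    [
        {"title": "A again", "url": "https://example.com/a"},
        {"title": "B", "url": "https://example.com/b"},
    ],
]
merged = dedupe_by_url(batches)
titles = [item["title"] for item in merged]
print(titles)  # ['A', 'B']
```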
def _run_search_sync(query: str) -> list[dict[str, str]]:
"""Synchronous search — wraps SerpAPI or returns empty on missing key."""
import os
if not os.environ.get("SERPAPI_API_KEY"):
logger.debug("SERPAPI_API_KEY not set — skipping web search for %r", query)
return []
try:
from serpapi import GoogleSearch
params = {"q": query, "api_key": os.environ["SERPAPI_API_KEY"], "num": 5}
search = GoogleSearch(params)
data = search.get_dict()
items = []
for r in data.get("organic_results", []):
items.append(
{
"title": r.get("title", ""),
"url": r.get("link", ""),
"snippet": r.get("snippet", ""),
}
)
return items
except Exception as exc:
logger.warning("SerpAPI search error: %s", exc)
return []
# ---------------------------------------------------------------------------
# Fetch (Step 4)
# ---------------------------------------------------------------------------
async def _fetch_pages(results: list[dict[str, str]], top_n: int = _FETCH_TOP_N) -> list[str]:
"""Download and extract full text for the top search results.
Uses web_fetch (trafilatura) from timmy.tools.system_tools.
"""
try:
from timmy.tools.system_tools import web_fetch
except ImportError:
logger.warning("web_fetch not available — skipping page fetch")
return []
pages: list[str] = []
for item in results[:top_n]:
url = item.get("url", "")
if not url:
continue
try:
text = await asyncio.to_thread(web_fetch, url, 6000)
if text and not text.startswith("Error:"):
pages.append(f"## {item.get('title', url)}\nSource: {url}\n\n{text}")
except Exception as exc:
logger.warning("Failed to fetch %s: %s", url, exc)
return pages
# ---------------------------------------------------------------------------
# Synthesis (Step 5) — cascade: Ollama → Claude fallback
# ---------------------------------------------------------------------------
async def _synthesize(topic: str, pages: list[str], snippets: list[str]) -> tuple[str, str]:
"""Compress fetched pages + snippets into a structured research report.
Returns (report_markdown, backend_used).
"""
# Build synthesis prompt
source_content = "\n\n---\n\n".join(pages[:5])
if not source_content and snippets:
source_content = "\n".join(f"- {s}" for s in snippets[:20])
if not source_content:
return (
f"# Research: {topic}\n\n*No source material was retrieved. "
"Check SERPAPI_API_KEY and network connectivity.*",
"none",
)
prompt = textwrap.dedent(f"""\
You are a senior technical researcher. Synthesize the source material below
into a structured research report on the topic: **{topic}**
FORMAT YOUR REPORT AS:
# {topic}
## Executive Summary
(2-3 sentences: what you found, top recommendation)
## Key Findings
(Bullet list of the most important facts, tools, or patterns)
## Comparison / Options
(Table or list comparing alternatives where applicable)
## Recommended Approach
(Concrete recommendation with rationale)
## Gaps & Next Steps
(What wasn't answered, what to investigate next)
---
SOURCE MATERIAL:
{source_content[:12000]}
""")
# Tier 3 — try Ollama first
report = await _ollama_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
if report:
return report, "ollama"
# Tier 2 — Claude fallback
report = await _claude_complete(prompt, max_tokens=_SYNTHESIS_MAX_TOKENS)
if report:
return report, "claude"
# Last resort — structured snippet summary
summary = f"# {topic}\n\n## Snippets\n\n" + "\n\n".join(
f"- {s}" for s in snippets[:15]
)
return summary, "fallback"
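
The synthesis step tries each backend in order and returns the first non-empty answer together with a label for the backend that produced it. A minimal sketch of that cascade follows. The fake backends and the `first_non_empty` helper are illustrative assumptions that stand in for `_ollama_complete` and `_claude_complete`.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable

Backend = Callable[[str], Awaitable[str]]


async def first_non_empty(
    prompt: str, backends: list[tuple[str, Backend]], fallback: str
) -> tuple[str, str]:
    """Return (text, backend_name) from the first backend that yields text."""
    for name, complete in backends:
        text = await complete(prompt)
        if text:  # an empty string signals failure, as in the module's LLM helpers
            return text, name
    return fallback, "fallback"


async def fake_local(prompt: str) -> str:
    return ""  # simulate an unavailable local model


async def fake_cloud(prompt: str) -> str:
    return f"report for: {prompt}"


result = asyncio.run(
    first_non_empty("topic", [("ollama", fake_local), ("claude", fake_cloud)], "snippets")
)
print(result)  # ('report for: topic', 'claude')
```

Because each helper reports failure as an empty string rather than an exception, the loop needs no `try`/`except` of its own.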
# ---------------------------------------------------------------------------
# LLM helpers
# ---------------------------------------------------------------------------
async def _ollama_complete(prompt: str, max_tokens: int = 1024) -> str:
"""Send a prompt to Ollama and return the response text.
Returns empty string on failure (graceful degradation).
"""
try:
import httpx
from config import settings
url = f"{settings.normalized_ollama_url}/api/generate"
payload: dict[str, Any] = {
"model": settings.ollama_model,
"prompt": prompt,
"stream": False,
"options": {
"num_predict": max_tokens,
"temperature": 0.3,
},
}
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()
data = resp.json()
return data.get("response", "").strip()
except Exception as exc:
logger.warning("Ollama completion failed: %s", exc)
return ""
async def _claude_complete(prompt: str, max_tokens: int = 1024) -> str:
"""Send a prompt to Claude API as a last-resort fallback.
Only active when ANTHROPIC_API_KEY is configured.
Returns empty string on failure or missing key.
"""
try:
from config import settings
if not settings.anthropic_api_key:
return ""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend()
result = await asyncio.to_thread(backend.run, prompt)
return result.content.strip()
except Exception as exc:
logger.warning("Claude fallback failed: %s", exc)
return ""
# ---------------------------------------------------------------------------
# Memory cache (Step 0 + Step 6)
# ---------------------------------------------------------------------------
def _check_cache(topic: str) -> tuple[str | None, float]:
"""Search semantic memory for a prior result on this topic.
Returns (cached_report, similarity) or (None, 0.0).
"""
try:
if SemanticMemory is None:
return None, 0.0
mem = SemanticMemory()
hits = mem.search(topic, top_k=1)
if hits:
content, score = hits[0]
if score >= _CACHE_HIT_THRESHOLD:
return content, score
except Exception as exc:
logger.debug("Cache check failed: %s", exc)
return None, 0.0
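
The cache check accepts a stored report only when its similarity score reaches `_CACHE_HIT_THRESHOLD` (0.82). How `SemanticMemory` computes that score is not shown in this diff. Assuming it is cosine similarity over embedding vectors, as the comment on the constant suggests, the decision reduces to the sketch below. The vectors are toy values and the function names are hypothetical.

```python
import math

CACHE_HIT_THRESHOLD = 0.82  # same value as _CACHE_HIT_THRESHOLD in the module


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def is_cache_hit(query_vec: list[float], stored_vec: list[float]) -> bool:
    """Apply the threshold to the similarity of two vectors."""
    return cosine_similarity(query_vec, stored_vec) >= CACHE_HIT_THRESHOLD


near = is_cache_hit([1.0, 0.0], [0.9, 0.1])  # nearly parallel vectors
far = is_cache_hit([1.0, 0.0], [0.0, 1.0])  # orthogonal vectors
```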
def _store_result(topic: str, report: str) -> None:
"""Index the research report into semantic memory for future retrieval."""
try:
if store_memory is None:
logger.debug("store_memory not available — skipping memory index")
return
store_memory(
content=report,
source="research_pipeline",
context_type="research",
metadata={"topic": topic},
)
logger.info("Research result indexed for topic: %r", topic)
except Exception as exc:
logger.warning("Failed to store research result: %s", exc)
def _save_to_disk(topic: str, report: str) -> Path | None:
"""Persist the report as a markdown file under docs/research/.
Filename is derived from the topic (slugified). Returns the path or None.
"""
try:
slug = re.sub(r"[^a-z0-9]+", "-", topic.lower()).strip("-")[:60]
_DOCS_ROOT.mkdir(parents=True, exist_ok=True)
path = _DOCS_ROOT / f"{slug}.md"
path.write_text(report, encoding="utf-8")
logger.info("Research report saved to %s", path)
return path
except Exception as exc:
logger.warning("Failed to save research report to disk: %s", exc)
return None
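
The filename comes from lower-casing the topic, collapsing every run of characters outside `a-z0-9` into one hyphen, trimming hyphens at both ends, and cutting the result to 60 characters. The sketch applies the same expression to a sample topic. `slugify` is a hypothetical wrapper name.

```python
import re


def slugify(topic: str, max_len: int = 60) -> str:
    """Same expression as in _save_to_disk, wrapped for reuse."""
    return re.sub(r"[^a-z0-9]+", "-", topic.lower()).strip("-")[:max_len]


slug = slugify("My Complex Topic! v2.0")
print(slug)  # my-complex-topic-v2-0
```

Two topics that share the same first 60 slug characters map to the same file, so the later report overwrites the earlier one. A topic with no ASCII letters or digits yields an empty slug and therefore the filename `.md`.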
# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------
async def run_research(
topic: str,
template: str | None = None,
slots: dict[str, str] | None = None,
save_to_disk: bool = False,
skip_cache: bool = False,
) -> ResearchResult:
"""Run the full 6-step autonomous research pipeline.
Args:
topic: The research question or subject.
template: Name of a template from skills/research/ (e.g. "tool_evaluation").
If None, runs without a template scaffold.
slots: Placeholder values for the template (e.g. {"domain": "PDF parsing"}).
save_to_disk: If True, write the report to docs/research/<slug>.md.
skip_cache: If True, bypass the semantic memory cache.
Returns:
ResearchResult with report and metadata.
"""
errors: list[str] = []
# ------------------------------------------------------------------
# Step 0 — check cache
# ------------------------------------------------------------------
if not skip_cache:
cached, score = _check_cache(topic)
if cached:
logger.info("Cache hit (%.2f) for topic: %r", score, topic)
return ResearchResult(
topic=topic,
query_count=0,
sources_fetched=0,
report=cached,
cached=True,
cache_similarity=score,
synthesis_backend="cache",
)
# ------------------------------------------------------------------
# Step 1 — load template (optional)
# ------------------------------------------------------------------
template_context = ""
if template:
try:
template_context = load_template(template, slots)
except FileNotFoundError as exc:
errors.append(str(exc))
logger.warning("Template load failed: %s", exc)
# ------------------------------------------------------------------
# Step 2 — formulate queries
# ------------------------------------------------------------------
queries = await _formulate_queries(topic, template_context)
logger.info("Formulated %d queries for topic: %r", len(queries), topic)
# ------------------------------------------------------------------
# Step 3 — execute search
# ------------------------------------------------------------------
search_results = await _execute_search(queries)
logger.info("Search returned %d results", len(search_results))
snippets = [r.get("snippet", "") for r in search_results if r.get("snippet")]
# ------------------------------------------------------------------
# Step 4 — fetch full pages
# ------------------------------------------------------------------
pages = await _fetch_pages(search_results)
logger.info("Fetched %d pages", len(pages))
# ------------------------------------------------------------------
# Step 5 — synthesize
# ------------------------------------------------------------------
report, backend = await _synthesize(topic, pages, snippets)
# ------------------------------------------------------------------
# Step 6 — deliver
# ------------------------------------------------------------------
_store_result(topic, report)
if save_to_disk:
_save_to_disk(topic, report)
return ResearchResult(
topic=topic,
query_count=len(queries),
sources_fetched=len(pages),
report=report,
cached=False,
synthesis_backend=backend,
errors=errors,
)


@@ -245,6 +245,7 @@ class VoiceLoop:
def _transcribe(self, audio: np.ndarray) -> str:
"""Transcribe audio using local Whisper model."""
self._load_whisper()
+ assert self._whisper_model is not None, "Whisper model failed to load"
sys.stdout.write(" 🧠 Transcribing...\r")
sys.stdout.flush()


@@ -0,0 +1,403 @@
"""Unit tests for src/timmy/research.py — ResearchOrchestrator pipeline.
Refs #972 (governing spec), #975 (ResearchOrchestrator).
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
pytestmark = pytest.mark.unit
# ---------------------------------------------------------------------------
# list_templates
# ---------------------------------------------------------------------------
class TestListTemplates:
def test_returns_list(self, tmp_path, monkeypatch):
(tmp_path / "tool_evaluation.md").write_text("---\n---\n# T")
(tmp_path / "game_analysis.md").write_text("---\n---\n# G")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import list_templates
result = list_templates()
assert isinstance(result, list)
assert "tool_evaluation" in result
assert "game_analysis" in result
def test_returns_empty_when_dir_missing(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path / "nonexistent")
from timmy.research import list_templates
assert list_templates() == []
# ---------------------------------------------------------------------------
# load_template
# ---------------------------------------------------------------------------
class TestLoadTemplate:
def _write_template(self, path: Path, name: str, body: str) -> None:
(path / f"{name}.md").write_text(body, encoding="utf-8")
def test_loads_and_strips_frontmatter(self, tmp_path, monkeypatch):
self._write_template(
tmp_path,
"tool_evaluation",
"---\nname: Tool Evaluation\ntype: research\n---\n# Tool Eval: {domain}",
)
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
result = load_template("tool_evaluation", {"domain": "PDF parsing"})
assert "# Tool Eval: PDF parsing" in result
assert "name: Tool Evaluation" not in result
def test_fills_slots(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "arch", "Connect {system_a} to {system_b}")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
result = load_template("arch", {"system_a": "Kafka", "system_b": "Postgres"})
assert "Kafka" in result
assert "Postgres" in result
def test_unfilled_slots_preserved(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "t", "Hello {name} and {other}")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
result = load_template("t", {"name": "World"})
assert "{other}" in result
def test_raises_file_not_found_for_missing_template(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
with pytest.raises(FileNotFoundError, match="nonexistent"):
load_template("nonexistent")
def test_no_slots_returns_raw_body(self, tmp_path, monkeypatch):
self._write_template(tmp_path, "plain", "---\n---\nJust text here")
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
from timmy.research import load_template
result = load_template("plain")
assert result == "Just text here"
# ---------------------------------------------------------------------------
# _check_cache
# ---------------------------------------------------------------------------
class TestCheckCache:
def test_returns_none_when_no_hits(self):
mock_mem = MagicMock()
mock_mem.search.return_value = []
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("some topic")
assert content is None
assert score == 0.0
def test_returns_content_above_threshold(self):
mock_mem = MagicMock()
mock_mem.search.return_value = [("cached report text", 0.91)]
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("same topic")
assert content == "cached report text"
assert score == pytest.approx(0.91)
def test_returns_none_below_threshold(self):
mock_mem = MagicMock()
mock_mem.search.return_value = [("old report", 0.60)]
with patch("timmy.research.SemanticMemory", return_value=mock_mem):
from timmy.research import _check_cache
content, score = _check_cache("slightly different topic")
assert content is None
assert score == 0.0
def test_degrades_gracefully_on_import_error(self):
with patch("timmy.research.SemanticMemory", None):
from timmy.research import _check_cache
content, score = _check_cache("topic")
assert content is None
assert score == 0.0
# ---------------------------------------------------------------------------
# _store_result
# ---------------------------------------------------------------------------
class TestStoreResult:
def test_calls_store_memory(self):
mock_store = MagicMock()
with patch("timmy.research.store_memory", mock_store):
from timmy.research import _store_result
_store_result("test topic", "# Report\n\nContent here.")
mock_store.assert_called_once()
call_kwargs = mock_store.call_args
assert "test topic" in str(call_kwargs)
def test_degrades_gracefully_on_error(self):
mock_store = MagicMock(side_effect=RuntimeError("db error"))
with patch("timmy.research.store_memory", mock_store):
from timmy.research import _store_result
# Should not raise
_store_result("topic", "report")
# ---------------------------------------------------------------------------
# _save_to_disk
# ---------------------------------------------------------------------------
class TestSaveToDisk:
def test_writes_file(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
from timmy.research import _save_to_disk
path = _save_to_disk("Test Topic: PDF Parsing", "# Test Report")
assert path is not None
assert path.exists()
assert path.read_text() == "# Test Report"
def test_slugifies_topic_name(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
from timmy.research import _save_to_disk
path = _save_to_disk("My Complex Topic! v2.0", "content")
assert path is not None
# Should be slugified: no special chars
assert " " not in path.name
assert "!" not in path.name
def test_returns_none_on_error(self, monkeypatch):
monkeypatch.setattr(
"timmy.research._DOCS_ROOT",
Path("/nonexistent_root/deeply/nested"),
)
with patch("pathlib.Path.mkdir", side_effect=PermissionError("denied")):
from timmy.research import _save_to_disk
result = _save_to_disk("topic", "report")
assert result is None
# ---------------------------------------------------------------------------
# run_research — end-to-end with mocks
# ---------------------------------------------------------------------------
class TestRunResearch:
@pytest.mark.asyncio
async def test_returns_cached_result_when_cache_hit(self):
cached_report = "# Cached Report\n\nPreviously computed."
with (
patch("timmy.research._check_cache", return_value=(cached_report, 0.93)),
):
from timmy.research import run_research
result = await run_research("some topic")
assert result.cached is True
assert result.cache_similarity == pytest.approx(0.93)
assert result.report == cached_report
assert result.synthesis_backend == "cache"
@pytest.mark.asyncio
async def test_skips_cache_when_requested(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=("cached", 0.99)) as mock_cache,
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Fresh report", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("topic", skip_cache=True)
mock_cache.assert_not_called()
assert result.cached is False
assert result.report == "# Fresh report"
@pytest.mark.asyncio
async def test_full_pipeline_no_search_results(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["query 1", "query 2"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Report", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("a new topic")
assert not result.cached
assert result.query_count == 2
assert result.sources_fetched == 0
assert result.report == "# Report"
assert result.synthesis_backend == "ollama"
@pytest.mark.asyncio
async def test_returns_result_with_error_on_bad_template(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Report", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("topic", template="nonexistent_template")
assert len(result.errors) == 1
assert "nonexistent_template" in result.errors[0]
@pytest.mark.asyncio
async def test_saves_to_disk_when_requested(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
monkeypatch.setattr("timmy.research._DOCS_ROOT", tmp_path / "research")
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["q1"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Saved Report", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("disk topic", save_to_disk=True)
assert result.report == "# Saved Report"
saved_files = list((tmp_path / "research").glob("*.md"))
assert len(saved_files) == 1
assert saved_files[0].read_text() == "# Saved Report"
@pytest.mark.asyncio
async def test_result_is_not_empty_after_synthesis(self, tmp_path, monkeypatch):
monkeypatch.setattr("timmy.research._SKILLS_ROOT", tmp_path)
with (
patch("timmy.research._check_cache", return_value=(None, 0.0)),
patch(
"timmy.research._formulate_queries",
new=AsyncMock(return_value=["q"]),
),
patch("timmy.research._execute_search", new=AsyncMock(return_value=[])),
patch("timmy.research._fetch_pages", new=AsyncMock(return_value=[])),
patch(
"timmy.research._synthesize",
new=AsyncMock(return_value=("# Non-empty", "ollama")),
),
patch("timmy.research._store_result"),
):
from timmy.research import run_research
result = await run_research("topic")
assert not result.is_empty()
# ---------------------------------------------------------------------------
# ResearchResult
# ---------------------------------------------------------------------------
class TestResearchResult:
def test_is_empty_when_no_report(self):
from timmy.research import ResearchResult
r = ResearchResult(topic="t", query_count=0, sources_fetched=0, report="")
assert r.is_empty()
def test_is_not_empty_with_content(self):
from timmy.research import ResearchResult
r = ResearchResult(topic="t", query_count=1, sources_fetched=1, report="# Report")
assert not r.is_empty()
def test_default_cached_false(self):
from timmy.research import ResearchResult
r = ResearchResult(topic="t", query_count=0, sources_fetched=0, report="x")
assert r.cached is False
def test_errors_defaults_to_empty_list(self):
from timmy.research import ResearchResult
r = ResearchResult(topic="t", query_count=0, sources_fetched=0, report="x")
assert r.errors == []


@@ -0,0 +1,270 @@
"""Tests for Daily Run orchestrator — health snapshot integration.
Verifies that the orchestrator runs a pre-flight health snapshot before
any coding work begins, and aborts on red status unless --force is passed.
Refs: #923
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Add timmy_automations to path for imports
_TA_PATH = Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run"
if str(_TA_PATH) not in sys.path:
sys.path.insert(0, str(_TA_PATH))
# Also add utils path
_TA_UTILS = Path(__file__).resolve().parent.parent.parent / "timmy_automations"
if str(_TA_UTILS) not in sys.path:
sys.path.insert(0, str(_TA_UTILS))
import health_snapshot as hs
import orchestrator as orch
def _make_snapshot(overall_status: str) -> hs.HealthSnapshot:
"""Build a minimal HealthSnapshot for testing."""
return hs.HealthSnapshot(
timestamp="2026-01-01T00:00:00+00:00",
overall_status=overall_status,
ci=hs.CISignal(status="pass", message="CI passing"),
issues=hs.IssueSignal(count=0, p0_count=0, p1_count=0),
flakiness=hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
),
tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"),
)
def _make_red_snapshot() -> hs.HealthSnapshot:
return hs.HealthSnapshot(
timestamp="2026-01-01T00:00:00+00:00",
overall_status="red",
ci=hs.CISignal(status="fail", message="CI failed"),
issues=hs.IssueSignal(count=1, p0_count=1, p1_count=0),
flakiness=hs.FlakinessSignal(
status="critical",
recent_failures=8,
recent_cycles=10,
failure_rate=0.8,
message="High flakiness",
),
tokens=hs.TokenEconomySignal(status="unknown", message="No data"),
)
def _default_args(**overrides) -> argparse.Namespace:
"""Build an argparse Namespace with defaults matching the orchestrator flags."""
defaults = {
"review": False,
"json": False,
"max_items": None,
"skip_health_check": False,
"force": False,
}
defaults.update(overrides)
return argparse.Namespace(**defaults)
class TestRunHealthSnapshot:
"""Test run_health_snapshot() — the pre-flight check called by main()."""
def test_green_returns_zero(self, capsys):
"""Green snapshot returns 0 (proceed)."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_snapshot("green")):
rc = orch.run_health_snapshot(args)
assert rc == 0
def test_yellow_returns_zero(self, capsys):
"""Yellow snapshot returns 0 (proceed with caution)."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_snapshot("yellow")):
rc = orch.run_health_snapshot(args)
assert rc == 0
def test_red_returns_one(self, capsys):
"""Red snapshot returns 1 (abort)."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()):
rc = orch.run_health_snapshot(args)
assert rc == 1
def test_red_with_force_returns_zero(self, capsys):
"""Red snapshot with --force returns 0 (proceed anyway)."""
args = _default_args(force=True)
with patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()):
rc = orch.run_health_snapshot(args)
assert rc == 0
def test_snapshot_exception_is_skipped(self, capsys):
"""If health snapshot raises, it degrades gracefully and returns 0."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", side_effect=RuntimeError("boom")):
rc = orch.run_health_snapshot(args)
assert rc == 0
captured = capsys.readouterr()
assert "warning" in captured.err.lower() or "skipping" in captured.err.lower()
def test_snapshot_prints_summary(self, capsys):
"""Health snapshot prints a pre-flight summary block."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_snapshot("green")):
orch.run_health_snapshot(args)
captured = capsys.readouterr()
assert "PRE-FLIGHT HEALTH CHECK" in captured.out
assert "CI" in captured.out
def test_red_prints_abort_message(self, capsys):
"""Red snapshot prints an abort message to stderr."""
args = _default_args()
with patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()):
orch.run_health_snapshot(args)
captured = capsys.readouterr()
assert "RED" in captured.err or "aborting" in captured.err.lower()
def test_p0_issues_shown_in_output(self, capsys):
"""P0 issue count is shown in the pre-flight output."""
args = _default_args()
snapshot = hs.HealthSnapshot(
timestamp="2026-01-01T00:00:00+00:00",
overall_status="red",
ci=hs.CISignal(status="pass", message="CI passing"),
issues=hs.IssueSignal(count=2, p0_count=2, p1_count=0),
flakiness=hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
),
tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"),
)
with patch.object(orch, "_generate_health_snapshot", return_value=snapshot):
orch.run_health_snapshot(args)
captured = capsys.readouterr()
assert "P0" in captured.out
class TestMainHealthCheckIntegration:
"""Test that main() runs health snapshot before any coding work."""
def _patch_gitea_unavailable(self):
return patch.object(orch.GiteaClient, "is_available", return_value=False)
def test_main_runs_health_check_before_gitea(self):
"""Health snapshot is called before Gitea client work."""
call_order = []
def fake_snapshot(*_a, **_kw):
call_order.append("health")
return _make_snapshot("green")
def fake_gitea_available(self):
call_order.append("gitea")
return False
with (
patch.object(orch, "_generate_health_snapshot", side_effect=fake_snapshot),
patch.object(orch.GiteaClient, "is_available", fake_gitea_available),
patch("sys.argv", ["orchestrator"]),
):
orch.main()
assert call_order.index("health") < call_order.index("gitea")
def test_main_aborts_on_red_before_gitea(self):
"""main() aborts with non-zero exit code when health is red."""
gitea_called = []
def fake_gitea_available(self):
gitea_called.append(True)
return True
with (
patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()),
patch.object(orch.GiteaClient, "is_available", fake_gitea_available),
patch("sys.argv", ["orchestrator"]),
):
rc = orch.main()
assert rc != 0
assert not gitea_called, "Gitea should NOT be called when health is red"
def test_main_skips_health_check_with_flag(self):
"""--skip-health-check bypasses the pre-flight snapshot."""
health_called = []
def fake_snapshot(*_a, **_kw):
health_called.append(True)
return _make_snapshot("green")
with (
patch.object(orch, "_generate_health_snapshot", side_effect=fake_snapshot),
patch.object(orch.GiteaClient, "is_available", return_value=False),
patch("sys.argv", ["orchestrator", "--skip-health-check"]),
):
orch.main()
assert not health_called, "Health snapshot should be skipped"
def test_main_force_flag_continues_despite_red(self):
"""--force allows Daily Run to continue even when health is red."""
gitea_called = []
def fake_gitea_available(self):
gitea_called.append(True)
return False # Gitea unavailable → exits early but after health check
with (
patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()),
patch.object(orch.GiteaClient, "is_available", fake_gitea_available),
patch("sys.argv", ["orchestrator", "--force"]),
):
orch.main()
# Gitea was reached despite red status because --force was passed
assert gitea_called
def test_main_json_output_on_red_includes_error(self, capsys):
"""JSON output includes error key when health is red."""
with (
patch.object(orch, "_generate_health_snapshot", return_value=_make_red_snapshot()),
patch.object(orch.GiteaClient, "is_available", return_value=True),
patch("sys.argv", ["orchestrator", "--json"]),
):
rc = orch.main()
assert rc != 0
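captured = capsys.readouterr()
# run_health_snapshot() also prints the pre-flight summary to stdout,
# so parse only the final line, which carries the JSON payload.
data = json.loads(captured.out.strip().splitlines()[-1])
assert "error" in data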
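
The ordering test in `TestMainHealthCheckIntegration` relies on a simple pattern: replace each collaborator with a fake that appends a label to a shared list, run the entry point, then compare positions in the list. The sketch below shows that pattern in isolation. `Pipeline`, its methods, and `record_call_order` are hypothetical names used only for illustration; they are not part of the orchestrator.

```python
from unittest.mock import patch


class Pipeline:
    """Hypothetical stand-in for an entry point with two collaborators."""

    def health(self) -> str:
        return "green"

    def fetch(self) -> list:
        return []

    def run(self) -> None:
        self.health()
        self.fetch()


def record_call_order() -> list[str]:
    """Run Pipeline.run() with fakes that log the order in which they are called."""
    calls: list[str] = []

    def fake_health(self) -> str:
        calls.append("health")
        return "green"

    def fake_fetch(self) -> list:
        calls.append("fetch")
        return []

    # Patching on the class with plain functions keeps normal method binding,
    # so each fake still receives `self`.
    with (
        patch.object(Pipeline, "health", fake_health),
        patch.object(Pipeline, "fetch", fake_fetch),
    ):
        Pipeline().run()
    return calls
```

Comparing `calls.index("health") < calls.index("fetch")` then checks the ordering without depending on what either collaborator actually does.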


@@ -4,10 +4,13 @@
Connects to local Gitea, fetches candidate issues, and produces a concise agenda
plus a day summary (review mode).
The Daily Run begins with a Quick Health Snapshot (#710) to ensure mandatory
systems are green before burning cycles on work that cannot land.
Run: python3 timmy_automations/daily_run/orchestrator.py [--review]
Env: See timmy_automations/config/daily_run.json for configuration
-Refs: #703
+Refs: #703, #923
"""
from __future__ import annotations
@@ -30,6 +33,11 @@ sys.path.insert(
)
from utils.token_rules import TokenRules, compute_token_reward
# Health snapshot lives in the same package
from health_snapshot import generate_snapshot as _generate_health_snapshot
from health_snapshot import get_token as _hs_get_token
from health_snapshot import load_config as _hs_load_config
# ── Configuration ─────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
@@ -495,6 +503,16 @@ def parse_args() -> argparse.Namespace:
default=None,
help="Override max agenda items",
)
p.add_argument(
"--skip-health-check",
action="store_true",
help="Skip the pre-flight health snapshot (not recommended)",
)
p.add_argument(
"--force",
action="store_true",
help="Continue even if health snapshot is red (overrides abort-on-red)",
)
return p.parse_args()
@@ -535,6 +553,76 @@ def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]:
}
def run_health_snapshot(args: argparse.Namespace) -> int:
"""Run pre-flight health snapshot and return 0 (ok) or 1 (abort).
Prints a concise summary of CI, issues, flakiness, and token economy.
Returns 1 if the overall status is red AND --force was not passed.
Returns 0 for green/yellow or when --force is active.
If loading the config, resolving the token, or generating the snapshot raises,
the check is skipped with a warning on stderr and 0 is returned.
"""
try:
hs_config = _hs_load_config()
hs_token = _hs_get_token(hs_config)
snapshot = _generate_health_snapshot(hs_config, hs_token)
except Exception as exc: # noqa: BLE001
print(f"[health] Warning: health snapshot failed ({exc}) — skipping", file=sys.stderr)
return 0
# Print concise pre-flight header
status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get(
snapshot.overall_status, ""
)
print("─" * 60)
print(f"PRE-FLIGHT HEALTH CHECK {status_emoji} {snapshot.overall_status.upper()}")
print("─" * 60)
ci_emoji = {"pass": "", "fail": "", "unknown": "⚠️", "unavailable": ""}.get(
snapshot.ci.status, ""
)
print(f" {ci_emoji} CI: {snapshot.ci.message}")
if snapshot.issues.p0_count > 0:
issue_emoji = "🔴"
elif snapshot.issues.p1_count > 0:
issue_emoji = "🟡"
else:
issue_emoji = ""
critical_str = f"{snapshot.issues.count} critical"
if snapshot.issues.p0_count:
critical_str += f" (P0: {snapshot.issues.p0_count})"
if snapshot.issues.p1_count:
critical_str += f" (P1: {snapshot.issues.p1_count})"
print(f" {issue_emoji} Issues: {critical_str}")
flak_emoji = {"healthy": "", "degraded": "🟡", "critical": "🔴", "unknown": ""}.get(
snapshot.flakiness.status, ""
)
print(f" {flak_emoji} Flakiness: {snapshot.flakiness.message}")
token_emoji = {"balanced": "", "inflationary": "🟡", "deflationary": "🔵", "unknown": ""}.get(
snapshot.tokens.status, ""
)
print(f" {token_emoji} Tokens: {snapshot.tokens.message}")
print()
if snapshot.overall_status == "red" and not args.force:
print(
"🛑 Health status is RED — aborting Daily Run to avoid burning cycles.",
file=sys.stderr,
)
print(
" Fix the issues above or re-run with --force to override.",
file=sys.stderr,
)
return 1
if snapshot.overall_status == "red":
print("⚠️ Health is RED but --force passed — proceeding anyway.", file=sys.stderr)
return 0
def main() -> int:
args = parse_args()
config = load_config()
@@ -542,6 +630,15 @@ def main() -> int:
if args.max_items:
config["max_agenda_items"] = args.max_items
# ── Step 0: Pre-flight health snapshot ──────────────────────────────────
if not args.skip_health_check:
health_rc = run_health_snapshot(args)
if health_rc != 0:
tokens = compute_daily_run_tokens(success=False)
if args.json:
print(json.dumps({"error": "health_check_failed", "tokens": tokens}))
return health_rc
token = get_token(config)
client = GiteaClient(config, token)
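
The gating rule that `run_health_snapshot()` and `main()` implement together reduces to a small decision table: only a red status without `--force` aborts, and everything else proceeds. The sketch below restates that rule as a pure function so the table is easy to check. `preflight_exit_code` is a hypothetical name, not a function in the orchestrator, and the treatment of unrecognised statuses as "proceed" is an assumption read off the `== "red"` comparison in the diff.

```python
def preflight_exit_code(overall_status: str, force: bool = False) -> int:
    """Return 1 (abort) only for a red status without force, else 0 (proceed).

    Hypothetical helper mirroring the abort-on-red rule in the diff above.
    """
    if overall_status == "red" and not force:
        return 1
    # green, yellow, red with force, and any unrecognised status all proceed.
    return 0


# Decision table: (status, force) -> exit code
DECISIONS = {
    ("green", False): preflight_exit_code("green"),
    ("yellow", False): preflight_exit_code("yellow"),
    ("red", False): preflight_exit_code("red"),
    ("red", True): preflight_exit_code("red", force=True),
}
```

Keeping the decision separate from the printing would also let the JSON path report the failure without the summary text sharing stdout with the payload.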

tox.ini

@@ -41,8 +41,10 @@ description = Static type checking with mypy
commands_pre =
deps =
mypy>=1.0.0
types-PyYAML
types-requests
commands =
-mypy src --ignore-missing-imports --no-error-summary
+mypy src
# ── Test Environments ────────────────────────────────────────────────────────
@@ -130,13 +132,17 @@ commands =
# ── Pre-push (mirrors CI exactly) ────────────────────────────────────────────
[testenv:pre-push]
-description = Local gate — lint + full CI suite (same as Gitea Actions)
+description = Local gate — lint + typecheck + full CI suite (same as Gitea Actions)
deps =
ruff>=0.8.0
mypy>=1.0.0
types-PyYAML
types-requests
commands =
ruff check src/ tests/
ruff format --check src/ tests/
bash -c 'files=$(grep -rl "<style" src/dashboard/templates/ --include="*.html" 2>/dev/null); if [ -n "$files" ]; then echo "ERROR: inline <style> blocks found — move CSS to static/css/mission-control.css:"; echo "$files"; exit 1; fi; echo "No inline CSS — OK"'
mypy src
mkdir -p reports
pytest tests/ \
--cov=src \