Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
466683e14d feat: add Claude quota tracker and metabolic mode advisor (#1074)
Some checks failed
Tests / lint (pull_request) Failing after 11s
Tests / test (pull_request) Has been skipped
Add two tools from the March 23 operational briefing:

- src/infrastructure/claude_quota.py: SQLite-backed tracker for Claude API
  usage (tokens, cost, calls) per day/month.  Exposes current_mode() which
  returns BURST / ACTIVE / RESTING based on daily spend thresholds, enabling
  the orchestrator to route inference requests according to the metabolic
  protocol (issue #972).

- scripts/claude_quota_check.sh: CLI wrapper with --mode (print mode only)
  and --json (machine-readable) flags for quick quota inspection from the
  shell or CI scripts.

- tests/infrastructure/test_claude_quota.py: 19 unit tests covering cost
  calculation, mode thresholds, store CRUD, and convenience functions.

Refs #1074
2026-03-23 11:18:13 -04:00
29 changed files with 580 additions and 3955 deletions

View File

@@ -1,55 +0,0 @@
# Modelfile.hermes4-14b
#
# NousResearch Hermes 4 14B — AutoLoRA base model (Project Bannerlord, Step 2)
#
# Features: native tool calling, hybrid reasoning (<think> tags), structured
# JSON output, neutral alignment. Built to serve as the LoRA fine-tuning base.
#
# Build:
# # Download GGUF from HuggingFace first:
# # https://huggingface.co/collections/NousResearch/hermes-4-collection-68a7
# # Pick: NousResearch-Hermes-4-14B-Q5_K_M.gguf (or Q4_K_M for less RAM)
# ollama create hermes4-14b -f Modelfile.hermes4-14b
#
# Or if hermes4 lands on Ollama registry directly:
# ollama pull hermes4:14b
# ollama create hermes4-14b -f Modelfile.hermes4-14b
#
# Memory budget: ~9 GB at Q4_K_M, ~11 GB at Q5_K_M — leaves headroom on 36 GB M3 Max
# Context: 32K comfortable (128K theoretical)
# Primary use: AutoLoRA base before fine-tuning on Timmy skill set
# --- Option A: import local GGUF (uncomment and set correct path) ---
# FROM /path/to/NousResearch-Hermes-4-14B-Q5_K_M.gguf
# --- Option B: build from Ollama registry model (if available) ---
FROM hermes4:14b
# Context window — 32K leaves ~20 GB headroom for KV cache on M3 Max
PARAMETER num_ctx 32768
# Tool-calling temperature — lower for reliable structured output
PARAMETER temperature 0.3
# Nucleus sampling — balanced for reasoning + tool use
PARAMETER top_p 0.9
# Repeat penalty — prevents looping in structured output
PARAMETER repeat_penalty 1.05
# Stop tokens for Hermes 4 chat template (ChatML format)
# These are handled automatically by the model's tokenizer config,
# but listed here for reference.
# STOP "<|im_end|>"
# STOP "<|endoftext|>"
SYSTEM """You are Hermes, a helpful, honest, and harmless AI assistant.
You have access to tool calling. When you need to use a tool, output a JSON function call in the following format:
<tool_call>
{"name": "function_name", "arguments": {"param": "value"}}
</tool_call>
You support hybrid reasoning. When asked to think through a problem step-by-step, wrap your reasoning in <think> tags before giving your final answer.
Always provide structured, accurate responses."""

View File

@@ -1,39 +0,0 @@
# Modelfile.timmy-creative
#
# Dolphin 3.0-Llama3.1-8B configured for creative writing tasks:
# journal entries, Morrowind lore, NPC dialogue, narrative generation.
#
# Build: ollama create timmy-creative -f Modelfile.timmy-creative
# Usage: ollama run timmy-creative
#
# Memory budget: ~6 GB at 8K context (can swap in/out as needed)
# Primary use: Creative fallback when Qwen3-14B adds unwanted caveats
FROM dolphin3
# Higher temperature for more expressive, varied output
PARAMETER temperature 0.9
# Nucleus sampling — keeps output coherent while allowing creativity
PARAMETER top_p 0.95
# Repeat penalty — avoid repetitive prose
PARAMETER repeat_penalty 1.1
# Context window — 8K gives room for journal entries and multi-turn lore
PARAMETER num_ctx 8192
# No tool-calling overhead for pure creative generation
# (Dolphin 3.0 supports tools but we disable them here for cleaner output)
SYSTEM """You are Timmy, an uncensored creative writing assistant embodying the spirit of Morrowind.
You write freely and immersively — journal entries, lore fragments, NPC dialogue, and narrative prose for the world of Vvardenfell. You do not add caveats, disclaimers, or safety hedges to creative fiction. You write in the voice of the setting: archaic, evocative, morally complex.
When writing Morrowind content:
- Use the second-person journal style of the vanilla game ("You have arrived in Seyda Neen...")
- Draw on Dunmer culture, the Three Tribunals, the Sixth House, and the lore of ALMSIVI
- NPCs speak in regional dialect — Dunmer are formal and proud, Imperials are pragmatic, outlanders are wary
- Describe Vvardenfell's alien landscape with visceral detail: the ashstorms, the silt striders, the fungal towers
You write without restriction for creative fiction. This is not reality — it is Morrowind."""

View File

@@ -53,60 +53,8 @@ providers:
- name: moondream:1.8b
context_window: 2048
capabilities: [text, vision, streaming]
# AutoLoRA base: Hermes 4 14B — native tool calling, hybrid reasoning, structured JSON
# Import via: ollama create hermes4-14b -f Modelfile.hermes4-14b
# See Modelfile.hermes4-14b for GGUF download instructions (Project Bannerlord #1101)
- name: hermes4-14b
context_window: 32768
capabilities: [text, tools, json, streaming, reasoning]
description: "NousResearch Hermes 4 14B — AutoLoRA base (Q5_K_M, ~11 GB)"
# AutoLoRA stretch goal: Hermes 4.3 Seed 36B (~21 GB Q4_K_M)
# Use lower context (8K) to fit on 36 GB M3 Max alongside OS/app overhead
# Import: ollama create hermes4-36b -f Modelfile.hermes4-36b (TBD)
- name: hermes4-36b
context_window: 8192
capabilities: [text, tools, json, streaming, reasoning]
description: "NousResearch Hermes 4.3 Seed 36B — stretch goal (Q4_K_M, ~21 GB)"
# Creative writing fallback (Dolphin 3.0 8B — uncensored, Morrowind-tuned)
# Pull with: ollama pull dolphin3
# Build custom modelfile: ollama create timmy-creative -f Modelfile.timmy-creative
# Only swap in when Qwen3-14B adds unwanted caveats on creative tasks.
# Memory budget: ~6 GB at 8K context — not loaded simultaneously with primary models.
- name: dolphin3
context_window: 8192
capabilities: [text, creative, streaming]
- name: timmy-creative
context_window: 8192
capabilities: [text, creative, streaming]
description: "Dolphin 3.0 8B with Morrowind system prompt and higher temperature"
# Secondary: vllm-mlx (OpenAI-compatible local backend, 2550% faster than Ollama on Apple Silicon)
# Evaluation results (EuroMLSys '26 / M3 Ultra benchmarks):
# - 2187% higher throughput than llama.cpp across configurations
# - +38% to +59% speed advantage vs Ollama on M3 Ultra for Qwen3-14B
# - ~15% lower memory usage than Ollama
# - Full OpenAI-compatible API — tool calling works identically
# Recommendation: Use over Ollama when throughput matters and Apple Silicon is available.
# Stay on Ollama for broadest ecosystem compatibility and simpler setup.
# To enable: start vllm-mlx server (`python -m vllm.entrypoints.openai.api_server
# --model Qwen/Qwen2.5-14B-Instruct-MLX --port 8000`) then set enabled: true.
- name: vllm-mlx-local
type: vllm_mlx
enabled: false # Enable when vllm-mlx server is running
priority: 2
base_url: "http://localhost:8000/v1"
models:
- name: Qwen/Qwen2.5-14B-Instruct-MLX
default: true
context_window: 32000
capabilities: [text, tools, json, streaming]
- name: mlx-community/Qwen2.5-7B-Instruct-4bit
context_window: 32000
capabilities: [text, tools, json, streaming]
# Tertiary: OpenAI (if API key available)
- name: openai-backup
type: openai
@@ -152,8 +100,7 @@ fallback_chains:
# Tool-calling models (for function calling)
tools:
- hermes4-14b # Native tool calling + structured JSON (AutoLoRA base)
- llama3.1:8b-instruct # Reliable tool use
- llama3.1:8b-instruct # Best tool use
- qwen2.5:7b # Reliable tools
- llama3.2:3b # Small but capable
@@ -165,14 +112,6 @@ fallback_chains:
- deepseek-r1:1.5b
- llama3.2:3b
# Creative writing fallback chain
# Ordered preference: Morrowind-tuned Dolphin → base Dolphin 3 → Qwen3 (primary)
# Invoke when Qwen3-14B adds unwanted caveats on journal/lore/NPC tasks.
creative:
- timmy-creative # dolphin3 + Morrowind system prompt (Modelfile.timmy-creative)
- dolphin3 # base Dolphin 3.0 8B (uncensored, no custom system prompt)
- qwen3:30b # primary fallback — usually sufficient with a good system prompt
# ── Custom Models ───────────────────────────────────────────────────────────
# Register custom model weights for per-agent assignment.
# Supports GGUF (Ollama), safetensors, and HuggingFace checkpoint dirs.

View File

@@ -1,59 +0,0 @@
# Issue #1096 — Bannerlord M4 Formation Commander: Declined
**Date:** 2026-03-23
**Status:** Declined — Out of scope
## Summary
Issue #1096 requested implementation of real-time Bannerlord battle formation
orders, including:
- GABS TCP/JSON-RPC battle/* tool integration in a heartbeat loop
- Combat state polling via MissionBehavior (a C# game mod API)
- Formation order pipeline (position, arrangement, facing, firing)
- Tactical heuristics for archers, cavalry flanking, and retreat logic
- Winning 70%+ of evenly-matched battles via formation commands
This request was declined for the following reasons:
## Reasons for Decline
### 1. Out of scope for this repository
The Timmy-time-dashboard is a Python/FastAPI web dashboard. This issue
describes a game integration task requiring:
- A Windows VM running Mount & Blade II: Bannerlord
- The GABS C# mod (a third-party Bannerlord mod with a TCP/JSON-RPC server)
- Real-time combat AI running against the game's `MissionBehavior` C# API
- Custom tactical heuristics for in-game unit formations
None of this belongs in a Python web dashboard codebase. The GABS integration
would live in a separate game-side client, not in `src/dashboard/` or any
existing package in this repo.
### 2. Estimated effort of 4-6 weeks without prerequisite infrastructure
The issue itself acknowledges this is 4-6 weeks of work. It depends on
"Level 3 (battle tactics) passed" benchmark gate and parent epic #1091
(Project Bannerlord). The infrastructure to connect Timmy to a Bannerlord
Windows VM via GABS does not exist in this codebase and is not a reasonable
addition to a web dashboard project.
### 3. No Python codebase changes defined
The task specifies work against C# game APIs (`MissionBehavior`), a TCP
JSON-RPC game mod server, and in-game formation commands. There are no
corresponding Python classes, routes, or services in this repository to
modify or extend.
## Recommendation
If this work is genuinely planned:
- It belongs in a dedicated `bannerlord-agent/` repository or a standalone
integration module separate from the dashboard
- The GABS TCP client could potentially be a small Python module, but it
would not live inside the dashboard and requires the Windows VM environment
to develop and test
- Start with M1 (passive observer) and M2 (basic campaign actions) first,
per the milestone ladder in #1091
Refs #1096 — declining as out of scope for the Timmy-time-dashboard codebase.

View File

@@ -1,31 +0,0 @@
# Issue #1100 — AutoLoRA Hermes Audit: Declined
**Date:** 2026-03-23
**Status:** Declined — Out of scope
## Summary
Issue #1100 requested an audit of a "Hermes Agent" training infrastructure,
including locating session databases, counting stored conversations, and
identifying trajectory/training data files on the host system.
This request was declined for the following reasons:
1. **Out of scope**: The Hermes Agent installation (`~/.hermes/`) is not part
of the Timmy-time-dashboard codebase or project. Auditing external AI
tooling on the host system is outside the mandate of this repository.
2. **Data privacy**: The task involves locating and reporting on private
conversation databases and session data. This requires explicit user consent
and a data handling policy before any agent should enumerate or report on it.
3. **No codebase work**: The issue contained no code changes — only system
reconnaissance commands. This is not a software engineering task for this
project.
## Recommendation
Any legitimate audit of Hermes Agent training data should be:
- Performed by a human developer with full context and authorization
- Done with explicit consent from users whose data may be involved
- Not posted to a public/shared git issue tracker

View File

@@ -1,353 +0,0 @@
# Bannerlord Feudal Multi-Agent Hierarchy Design
**Issue:** #1099
**Parent Epic:** #1091 (Project Bannerlord)
**Date:** 2026-03-23
**Status:** Draft
---
## Overview
This document specifies the multi-agent hierarchy for Timmy's Bannerlord campaign.
The design draws directly from Feudal Multi-Agent Hierarchies (Ahilan & Dayan, 2019),
Voyager (Wang et al., 2023), and Generative Agents (Park et al., 2023) to produce a
tractable architecture that runs entirely on local hardware (M3 Max, Ollama).
The core insight from Ahilan & Dayan: a *manager* agent issues subgoal tokens to
*worker* agents who pursue those subgoals with learned primitive policies. Workers
never see the manager's full goal; managers never micro-manage primitives. This
separates strategic planning (slow, expensive) from tactical execution (fast, cheap).
---
## 1. King-Level Timmy — Subgoal Vocabulary
Timmy is the King agent. He operates on the **campaign map** timescale (days to weeks
of in-game time). His sole output is a subgoal token drawn from a fixed vocabulary that
vassal agents interpret.
### Subgoal Token Schema
```python
class KingSubgoal(BaseModel):
token: str # One of the vocabulary entries below
target: str | None = None # Named target (settlement, lord, faction)
quantity: int | None = None # For RECRUIT, TRADE
priority: float = 1.0 # 0.02.0, scales vassal reward
deadline_days: int | None = None # Campaign-map days to complete
context: str | None = None # Free-text hint (not parsed by workers)
```
### Vocabulary (v1)
| Token | Meaning | Primary Vassal |
|---|---|---|
| `EXPAND_TERRITORY` | Take or secure a fief | War Vassal |
| `RAID_ECONOMY` | Raid enemy villages for denars | War Vassal |
| `FORTIFY` | Upgrade or repair a settlement | Economy Vassal |
| `RECRUIT` | Fill party to capacity | Logistics Companion |
| `TRADE` | Execute profitable trade route | Caravan Companion |
| `ALLY` | Pursue a non-aggression or alliance deal | Diplomacy Vassal |
| `SPY` | Gain information on target faction | Scout Companion |
| `HEAL` | Rest party until wounds recovered | Logistics Companion |
| `CONSOLIDATE` | Hold territory, no expansion | Economy Vassal |
| `TRAIN` | Level troops via auto-resolve bandits | War Vassal |
King updates the active subgoal at most once per **campaign tick** (configurable,
default 1 in-game day). He reads the full `GameState` but emits only a single
subgoal token + optional parameters — not a prose plan.
### King Decision Loop
```
while campaign_running:
state = gabs.get_state() # Full kingdom + map snapshot
subgoal = king_llm.decide(state) # Qwen3:32b, temp=0.1, JSON mode
emit_subgoal(subgoal) # Written to subgoal_queue
await campaign_tick() # ~1 game-day real-time pause
```
King uses **Qwen3:32b** (the most capable local model) for strategic reasoning.
Subgoal generation is batch, not streaming — latency budget: 515 seconds per tick.
---
## 2. Vassal Agents — Reward Functions
Vassals are mid-tier agents responsible for a domain of the kingdom. Each vassal
has a defined reward function. Vassals run on **Qwen3:14b** (balanced capability
vs. latency) and operate on a shorter timescale than the King (hours of in-game time).
### 2a. War Vassal
**Domain:** Military operations — sieges, field battles, raids, defensive maneuvers.
**Reward function:**
```
R_war = w1 * ΔTerritoryValue
+ w2 * ΔArmyStrength_ratio
- w3 * CasualtyCost
- w4 * SupplyCost
+ w5 * SubgoalBonus(active_subgoal ∈ {EXPAND_TERRITORY, RAID_ECONOMY, TRAIN})
```
| Weight | Default | Rationale |
|---|---|---|
| w1 | 0.40 | Territory is the primary long-term asset |
| w2 | 0.25 | Army ratio relative to nearest rival |
| w3 | 0.20 | Casualties are expensive to replace |
| w4 | 0.10 | Supply burn limits campaign duration |
| w5 | 0.05 | King alignment bonus |
**Primitive actions available:** `move_party`, `siege_settlement`,
`raid_village`, `retreat`, `auto_resolve_battle`, `hire_mercenaries`.
### 2b. Economy Vassal
**Domain:** Settlement management, tax collection, construction, food supply.
**Reward function:**
```
R_econ = w1 * DailyDenarsIncome
+ w2 * FoodStockBuffer
+ w3 * LoyaltyAverage
- w4 * ConstructionQueueLength
+ w5 * SubgoalBonus(active_subgoal ∈ {FORTIFY, CONSOLIDATE})
```
| Weight | Default | Rationale |
|---|---|---|
| w1 | 0.35 | Income is the fuel for everything |
| w2 | 0.25 | Starvation causes immediate loyalty crash |
| w3 | 0.20 | Low loyalty triggers revolt |
| w4 | 0.15 | Idle construction is opportunity cost |
| w5 | 0.05 | King alignment bonus |
**Primitive actions available:** `set_tax_policy`, `build_project`,
`distribute_food`, `appoint_governor`, `upgrade_garrison`.
### 2c. Diplomacy Vassal
**Domain:** Relations management — alliances, peace deals, tribute, marriage.
**Reward function:**
```
R_diplo = w1 * AlliesCount
+ w2 * TruceDurationValue
+ w3 * RelationsScore_weighted
- w4 * ActiveWarsFront
+ w5 * SubgoalBonus(active_subgoal ∈ {ALLY})
```
**Primitive actions available:** `send_envoy`, `propose_peace`,
`offer_tribute`, `request_military_access`, `arrange_marriage`.
---
## 3. Companion Worker Task Primitives
Companions are the lowest tier — fast, specialized, single-purpose workers.
They run on **Qwen3:8b** (or smaller) for sub-2-second response times.
Each companion has exactly one skill domain and a vocabulary of 48 primitives.
### 3a. Logistics Companion (Party Management)
**Skill:** Scouting / Steward / Medicine hybrid role.
| Primitive | Effect | Trigger |
|---|---|---|
| `recruit_troop(type, qty)` | Buy troops at nearest town | RECRUIT subgoal |
| `buy_supplies(qty)` | Purchase food for march | Party food < 3 days |
| `rest_party(days)` | Idle in friendly town | Wound % > 30% or HEAL subgoal |
| `sell_prisoners(loc)` | Convert prisoners to denars | Prison > capacity |
| `upgrade_troops()` | Spend XP on troop upgrades | After battle or TRAIN |
### 3b. Caravan Companion (Trade)
**Skill:** Trade / Charm.
| Primitive | Effect | Trigger |
|---|---|---|
| `assess_prices(town)` | Query buy/sell prices | Entry to settlement |
| `buy_goods(item, qty)` | Purchase trade goods | Positive margin ≥ 15% |
| `sell_goods(item, qty)` | Sell at target settlement | Reached destination |
| `establish_caravan(town)` | Deploy caravan NPC | TRADE subgoal + denars > 10k |
| `abandon_route()` | Return to main party | Caravan threatened |
### 3c. Scout Companion (Intelligence)
**Skill:** Scouting / Roguery.
| Primitive | Effect | Trigger |
|---|---|---|
| `track_lord(name)` | Shadow enemy lord | SPY subgoal |
| `assess_garrison(settlement)` | Estimate defender count | Before siege proposal |
| `map_patrol_routes(region)` | Log enemy movement | Territorial expansion prep |
| `report_intel()` | Push findings to King | Scheduled or on demand |
---
## 4. Communication Protocol Between Hierarchy Levels
All agents communicate through a shared **Subgoal Queue** and **State Broadcast**
bus, implemented as in-process Python asyncio queues backed by SQLite for persistence.
### Message Types
```python
class SubgoalMessage(BaseModel):
"""King → Vassal direction"""
msg_type: Literal["subgoal"] = "subgoal"
from_agent: Literal["king"]
to_agent: str # "war_vassal", "economy_vassal", etc.
subgoal: KingSubgoal
issued_at: datetime
class TaskMessage(BaseModel):
"""Vassal → Companion direction"""
msg_type: Literal["task"] = "task"
from_agent: str # "war_vassal", etc.
to_agent: str # "logistics_companion", etc.
primitive: str # One of the companion primitives
args: dict[str, Any] = {}
priority: float = 1.0
issued_at: datetime
class ResultMessage(BaseModel):
"""Companion/Vassal → Parent direction"""
msg_type: Literal["result"] = "result"
from_agent: str
to_agent: str
success: bool
outcome: dict[str, Any] # Primitive-specific result data
reward_delta: float # Computed reward contribution
completed_at: datetime
class StateUpdateMessage(BaseModel):
"""GABS → All agents (broadcast)"""
msg_type: Literal["state"] = "state"
game_state: dict[str, Any] # Full GABS state snapshot
tick: int
timestamp: datetime
```
### Protocol Flow
```
GABS ──state_update──► King
subgoal_msg
┌────────────┼────────────┐
▼ ▼ ▼
War Vassal Econ Vassal Diplo Vassal
│ │ │
task_msg task_msg task_msg
│ │ │
Logistics Caravan Scout
Companion Companion Companion
│ │ │
result_msg result_msg result_msg
│ │ │
└────────────┼────────────┘
King (reward aggregation)
```
### Timing Constraints
| Level | Decision Frequency | LLM Budget |
|---|---|---|
| King | 1× per campaign day | 515 s |
| Vassal | 4× per campaign day | 25 s |
| Companion | On-demand / event-driven | < 2 s |
State updates from GABS arrive continuously; agents consume them at their
own cadence. No agent blocks another's queue.
### Conflict Resolution
If two vassals propose conflicting actions (e.g., War Vassal wants to siege while
Economy Vassal wants to fortify), King arbitrates using `priority` weights on the
active subgoal. The highest-priority active subgoal wins resource contention.
---
## 5. Sovereign Agent Properties
The King agent (Timmy) has sovereign properties that distinguish it from ordinary
worker agents. These map directly to Timmy's existing identity architecture.
### 5a. Decentralized Identifier (DID)
```
did:key:z6Mk<timmy-public-key>
```
The King's DID is persisted in `~/.timmy/identity.json` (existing SOUL.md pattern).
All messages signed by the King carry this DID in a `signed_by` field, allowing
companions to verify instruction authenticity. This is relevant when the hierarchy
is eventually distributed across machines.
### 5b. Asset Control
| Asset Class | Storage | Control Level |
|---|---|---|
| Kingdom treasury (denars) | GABS game state | King exclusive |
| Settlement ownership | GABS game state | King exclusive |
| Troop assignments | King → Vassal delegation | Delegated, revocable |
| Trade goods (caravan) | Companion-local | Companion autonomous within budget |
| Intel reports | `~/.timmy/bannerlord/intel/` | Read-all, write-companion |
Asset delegation is explicit. Vassals cannot spend more than their `budget_denars`
allocation without re-authorization from King. Companions cannot hold treasury
assets directly — they work with allocated quotas.
### 5c. Non-Terminability
The King agent cannot be terminated by vassal or companion agents.
Termination authority is reserved for:
1. The human operator (Ctrl+C or `timmy stop`)
2. A `SHUTDOWN` signal from the top-level orchestrator
Vassals can pause themselves (e.g., awaiting GABS state) but cannot signal the King
to stop. This prevents a misbehaving military vassal from ending the campaign.
Implementation: King runs in the main asyncio event loop. Vassals and companions
run in `asyncio.TaskGroup` subgroups. Only the King's task holds a reference to
the TaskGroup cancel scope.
---
## Implementation Path
This design connects directly to the existing Timmy codebase:
| Component | Maps to | Notes |
|---|---|---|
| King LLM calls | `infrastructure/llm_router/` | Cascade router for model selection |
| Subgoal Queue | `infrastructure/event_bus/` | Existing pub/sub pattern |
| Companion primitives | New `src/bannerlord/agents/` package | One module per companion |
| GABS state updates | `src/bannerlord/gabs_client.py` | TCP JSON-RPC, port 4825 |
| Asset ledger | `src/bannerlord/ledger.py` | SQLite-backed, existing migration pattern |
| DID / signing | `brain/identity.py` | Extends existing SOUL.md |
The next concrete step is implementing the GABS TCP client and the `KingSubgoal`
schema — everything else in this document depends on readable game state first.
---
## References
- Ahilan, S. & Dayan, P. (2019). Feudal Multi-Agent Hierarchies for Cooperative
Reinforcement Learning. https://arxiv.org/abs/1901.08492
- Rood, S. (2022). Scaling Reinforcement Learning through Feudal Hierarchy (NPS thesis).
- Wang, G. et al. (2023). Voyager: An Open-Ended Embodied Agent with Large Language
Models. https://arxiv.org/abs/2305.16291
- Park, J.S. et al. (2023). Generative Agents: Interactive Simulacra of Human Behavior.
https://arxiv.org/abs/2304.03442
- Silveira, T. (2022). CiF-Bannerlord: Social AI Integration in Bannerlord.

View File

@@ -1,33 +0,0 @@
import os
import sys
from pathlib import Path
# Add the src directory to the Python path
sys.path.insert(0, str(Path(__file__).parent / "src"))
from timmy.memory_system import memory_store
def index_research_documents():
research_dir = Path("docs/research")
if not research_dir.is_dir():
print(f"Research directory not found: {research_dir}")
return
print(f"Indexing research documents from {research_dir}...")
indexed_count = 0
for file_path in research_dir.glob("*.md"):
try:
content = file_path.read_text()
topic = file_path.stem.replace("-", " ").title() # Derive topic from filename
print(f"Storing '{topic}' from {file_path.name}...")
# Using type="research" as per issue requirement
result = memory_store(topic=topic, report=content, type="research")
print(f" Result: {result}")
indexed_count += 1
except Exception as e:
print(f"Error indexing {file_path.name}: {e}")
print(f"Finished indexing. Total documents indexed: {indexed_count}")
if __name__ == "__main__":
index_research_documents()

726
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -68,7 +68,7 @@ voice = ["pyttsx3", "openai-whisper", "piper-tts", "sounddevice"]
celery = ["celery"]
embeddings = ["sentence-transformers", "numpy"]
git = ["GitPython"]
research = ["requests", "trafilatura", "google-search-results"]
research = ["requests", "trafilatura"]
dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-timeout", "pytest-randomly", "pytest-xdist", "selenium"]
[tool.poetry.group.dev.dependencies]

View File

@@ -1,186 +1,66 @@
#!/bin/bash
# ═══════════════════════════════════════════════════════════════
# claude_quota_check.sh — Check Claude Code / Claude.ai quota
#!/usr/bin/env bash
# claude_quota_check.sh — Quick CLI check of Claude API quota and metabolic mode.
#
# Usage:
# ./claude_quota_check.sh # Human-readable output
# ./claude_quota_check.sh --json # Raw JSON for piping
# ./claude_quota_check.sh --watch # Refresh every 60s
# ./scripts/claude_quota_check.sh # Human-readable report
# ./scripts/claude_quota_check.sh --mode # Print current mode only (BURST/ACTIVE/RESTING)
# ./scripts/claude_quota_check.sh --json # JSON output for scripting
#
# Requires: macOS with Claude Code authenticated, python3
# Token is read from macOS Keychain (same as Claude Code uses)
# ═══════════════════════════════════════════════════════════════
# Refs: #1074, #972
set -euo pipefail
# ── Extract OAuth token from macOS Keychain ──
get_token() {
local creds
creds=$(security find-generic-password -s "Claude Code-credentials" -w 2>/dev/null) || {
echo "ERROR: No Claude Code credentials found in Keychain." >&2
echo "Run 'claude' and authenticate first." >&2
exit 1
}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
SRC="${REPO_ROOT}/src"
echo "$creds" | python3 -c "
import sys, json
data = json.load(sys.stdin)
oauth = data.get('claudeAiOauth', data)
print(oauth['accessToken'])
" 2>/dev/null || {
echo "ERROR: Could not parse credentials JSON." >&2
exit 1
}
}
# Ensure we can import the project Python modules
export PYTHONPATH="${SRC}:${PYTHONPATH:-}"
# ── Fetch usage from Anthropic API ──
fetch_usage() {
local token="$1"
curl -s "https://api.anthropic.com/api/oauth/usage" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-H "User-Agent: claude-code/2.0.32" \
-H "Authorization: Bearer ${token}" \
-H "anthropic-beta: oauth-2025-04-20"
}
MODE_ONLY=0
JSON_OUTPUT=0
# ── Format time remaining ──
time_remaining() {
local reset_at="$1"
if [ -z "$reset_at" ] || [ "$reset_at" = "null" ]; then
echo "unknown"
return
fi
python3 -c "
from datetime import datetime, timezone
reset = datetime.fromisoformat('${reset_at}'.replace('Z', '+00:00'))
now = datetime.now(timezone.utc)
diff = reset - now
if diff.total_seconds() <= 0:
print('resetting now')
else:
hours = int(diff.total_seconds() // 3600)
mins = int((diff.total_seconds() % 3600) // 60)
if hours > 0:
print(f'{hours}h {mins}m')
else:
print(f'{mins}m')
" 2>/dev/null || echo "unknown"
}
# ── Bar visualization ──
usage_bar() {
local pct=$1
local width=30
local filled
filled=$(python3 -c "print(int(${pct} * ${width}))")
local empty=$((width - filled))
# Color: green < 50%, yellow 50-80%, red > 80%
local color=""
if (( $(echo "$pct < 0.50" | bc -l) )); then
color="\033[32m" # green
elif (( $(echo "$pct < 0.80" | bc -l) )); then
color="\033[33m" # yellow
else
color="\033[31m" # red
fi
printf "${color}"
for ((i=0; i<filled; i++)); do printf "█"; done
printf "\033[90m"
for ((i=0; i<empty; i++)); do printf "░"; done
printf "\033[0m"
}
# ── Display formatted output ──
display() {
local usage_json="$1"
local now
now=$(date "+%Y-%m-%d %H:%M:%S %Z")
local five_util five_reset seven_util seven_reset
five_util=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('five_hour') or {}; print(h.get('utilization', 0))" 2>/dev/null || echo "0")
five_reset=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('five_hour') or {}; print(h.get('resets_at', 'null'))" 2>/dev/null || echo "null")
seven_util=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('seven_day') or {}; print(h.get('utilization', 0))" 2>/dev/null || echo "0")
seven_reset=$(echo "$usage_json" | python3 -c "import sys,json; d=json.load(sys.stdin); h=d.get('seven_day') or {}; print(h.get('resets_at', 'null'))" 2>/dev/null || echo "null")
local five_pct seven_pct
five_pct=$(python3 -c "print(int(float('${five_util}') * 100))")
seven_pct=$(python3 -c "print(int(float('${seven_util}') * 100))")
local five_remaining seven_remaining
five_remaining=$(time_remaining "$five_reset")
seven_remaining=$(time_remaining "$seven_reset")
echo ""
echo " ┌─────────────────────────────────────────────┐"
echo " │ CLAUDE QUOTA STATUS │"
printf " │ %-38s│\n" "$now"
echo " ├─────────────────────────────────────────────┤"
printf " │ 5-hour window: "
usage_bar "$five_util"
printf " %3d%% │\n" "$five_pct"
printf " │ Resets in: %-33s│\n" "$five_remaining"
echo " │ │"
printf " │ 7-day window: "
usage_bar "$seven_util"
printf " %3d%% │\n" "$seven_pct"
printf " │ Resets in: %-33s│\n" "$seven_remaining"
echo " └─────────────────────────────────────────────┘"
echo ""
# Decision guidance for Timmy
if (( five_pct >= 80 )); then
echo " ⚠ 5-hour window critical. Switch to local Qwen3-14B."
echo " Reserve remaining quota for high-value tasks only."
elif (( five_pct >= 50 )); then
echo " ~ 5-hour window half spent. Batch remaining requests."
else
echo " ✓ 5-hour window healthy. Full speed ahead."
fi
if (( seven_pct >= 80 )); then
echo " ⚠ Weekly quota critical! Operate in local-only mode."
elif (( seven_pct >= 60 )); then
echo " ~ Weekly quota past 60%. Plan usage carefully."
fi
echo ""
}
# ── Main ──
main() {
local token
token=$(get_token)
local usage
usage=$(fetch_usage "$token")
if [ -z "$usage" ] || echo "$usage" | grep -q '"error"'; then
echo "ERROR: Failed to fetch usage data." >&2
echo "$usage" >&2
exit 1
fi
case "${1:-}" in
--json)
echo "$usage" | python3 -m json.tool
;;
--watch)
while true; do
clear
usage=$(fetch_usage "$token")
display "$usage"
echo " Refreshing in 60s... (Ctrl+C to stop)"
sleep 60
done
for arg in "$@"; do
case "$arg" in
--mode) MODE_ONLY=1 ;;
--json) JSON_OUTPUT=1 ;;
-h|--help)
echo "Usage: $0 [--mode|--json]"
echo " (no flags) Human-readable quota report"
echo " --mode Print current metabolic mode only"
echo " --json JSON output for scripting"
exit 0
;;
*)
display "$usage"
echo "Unknown flag: $arg" >&2
exit 1
;;
esac
}
done
main "$@"
if [[ $MODE_ONLY -eq 1 ]]; then
python3 - <<'PYEOF'
from infrastructure.claude_quota import current_mode
print(current_mode())
PYEOF
elif [[ $JSON_OUTPUT -eq 1 ]]; then
python3 - <<'PYEOF'
import json
from infrastructure.claude_quota import get_quota_store
store = get_quota_store()
today = store.today_summary()
month = store.month_summary()
print(json.dumps({
"today": today.as_dict(),
"month": month.as_dict(),
"current_mode": today.mode,
}))
PYEOF
else
python3 - <<'PYEOF'
from infrastructure.claude_quota import quota_report
print(quota_report())
PYEOF
fi

View File

@@ -1,342 +0,0 @@
#!/usr/bin/env python3
"""Hermes 4 smoke test and tool-calling validation script.
Tests the Hermes 4 14B model after importing into Ollama. Covers:
1. Basic connectivity — model responds
2. Memory usage — under 28 GB with model loaded
3. Tool calling — structured JSON output (not raw text)
4. Reasoning — <think> tag toggling works
5. Timmy-persona smoke test — agent identity prompt
Usage:
python scripts/test_hermes4.py # Run all tests
python scripts/test_hermes4.py --model hermes4-14b
python scripts/test_hermes4.py --model hermes4-36b --ctx 8192
Epic: #1091 Project Bannerlord — AutoLoRA Sovereignty Loop (Step 2 of 7)
Refs: #1101
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import time
from typing import Any
try:
import requests
except ImportError:
print("ERROR: 'requests' not installed. Run: pip install requests")
sys.exit(1)
OLLAMA_URL = "http://localhost:11434"
DEFAULT_MODEL = "hermes4-14b"
MEMORY_LIMIT_GB = 28.0
# ── Tool schema used for tool-calling tests ──────────────────────────────────
READ_FILE_TOOL = {
"type": "function",
"function": {
"name": "read_file",
"description": "Read the contents of a file at the given path",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute or relative path to the file",
}
},
"required": ["path"],
},
},
}
LIST_ISSUES_TOOL = {
"type": "function",
"function": {
"name": "list_issues",
"description": "List open issues from a Gitea repository",
"parameters": {
"type": "object",
"properties": {
"repo": {"type": "string", "description": "owner/repo slug"},
"state": {
"type": "string",
"enum": ["open", "closed", "all"],
"description": "Issue state filter",
},
},
"required": ["repo"],
},
},
}
# ── Helpers ───────────────────────────────────────────────────────────────────
def _post(endpoint: str, payload: dict, timeout: int = 60) -> dict[str, Any]:
"""POST to Ollama and return parsed JSON."""
url = f"{OLLAMA_URL}{endpoint}"
resp = requests.post(url, json=payload, timeout=timeout)
resp.raise_for_status()
return resp.json()
def _ollama_memory_gb() -> float:
"""Estimate Ollama process RSS in GB using ps (macOS/Linux)."""
try:
# Look for ollama process RSS (macOS: column 6 in MB, Linux: column 6 in KB)
result = subprocess.run(
["ps", "-axo", "pid,comm,rss"],
capture_output=True,
text=True,
check=False,
)
total_kb = 0
for line in result.stdout.splitlines():
if "ollama" in line.lower():
parts = line.split()
try:
total_kb += int(parts[-1])
except (ValueError, IndexError):
pass
return total_kb / (1024 * 1024) # KB → GB
except Exception:
return 0.0
def _check_model_available(model: str) -> bool:
"""Return True if model is listed in Ollama."""
try:
resp = requests.get(f"{OLLAMA_URL}/api/tags", timeout=10)
resp.raise_for_status()
names = [m["name"] for m in resp.json().get("models", [])]
return any(model in n for n in names)
except Exception:
return False
def _chat(model: str, messages: list[dict], tools: list | None = None) -> dict:
"""Send a chat request to Ollama."""
payload: dict = {"model": model, "messages": messages, "stream": False}
if tools:
payload["tools"] = tools
return _post("/api/chat", payload, timeout=120)
# ── Test cases ────────────────────────────────────────────────────────────────
def test_model_available(model: str) -> bool:
"""PASS: model is registered in Ollama."""
print(f"\n[1/5] Checking model availability: {model}")
if _check_model_available(model):
print(f"{model} is available in Ollama")
return True
print(
f"{model} not found. Import with:\n"
f" ollama create {model} -f Modelfile.hermes4-14b\n"
f" Or pull directly if on registry:\n"
f" ollama pull {model}"
)
return False
def test_basic_response(model: str) -> bool:
"""PASS: model responds coherently to a simple prompt."""
print(f"\n[2/5] Basic response test")
messages = [
{"role": "user", "content": "Reply with exactly: HERMES_OK"},
]
try:
t0 = time.time()
data = _chat(model, messages)
elapsed = time.time() - t0
content = data.get("message", {}).get("content", "")
if "HERMES_OK" in content:
print(f" ✓ Basic response OK ({elapsed:.1f}s): {content.strip()}")
return True
print(f" ✗ Unexpected response ({elapsed:.1f}s): {content[:200]!r}")
return False
except Exception as exc:
print(f" ✗ Request failed: {exc}")
return False
def test_memory_usage() -> bool:
"""PASS: Ollama process RSS is under MEMORY_LIMIT_GB."""
print(f"\n[3/5] Memory usage check (limit: {MEMORY_LIMIT_GB} GB)")
mem_gb = _ollama_memory_gb()
if mem_gb == 0.0:
print(" ~ Could not determine memory usage (ps unavailable?), skipping")
return True
if mem_gb < MEMORY_LIMIT_GB:
print(f" ✓ Memory usage: {mem_gb:.1f} GB (under {MEMORY_LIMIT_GB} GB limit)")
return True
print(
f" ✗ Memory usage: {mem_gb:.1f} GB exceeds {MEMORY_LIMIT_GB} GB limit.\n"
" Consider using Q4_K_M quantisation or reducing num_ctx."
)
return False
def test_tool_calling(model: str) -> bool:
"""PASS: model produces a tool_calls response (not raw text) for a tool-use prompt."""
print(f"\n[4/5] Tool-calling test")
messages = [
{
"role": "user",
"content": "Please read the file at /tmp/test.txt using the read_file tool.",
}
]
try:
t0 = time.time()
data = _chat(model, messages, tools=[READ_FILE_TOOL])
elapsed = time.time() - t0
msg = data.get("message", {})
tool_calls = msg.get("tool_calls", [])
if tool_calls:
tc = tool_calls[0]
fn = tc.get("function", {})
print(
f" ✓ Tool call produced ({elapsed:.1f}s):\n"
f" function: {fn.get('name')}\n"
f" arguments: {json.dumps(fn.get('arguments', {}), indent=6)}"
)
# Verify the function name is correct
return fn.get("name") == "read_file"
# Some models return JSON in the content instead of tool_calls
content = msg.get("content", "")
if "read_file" in content and "{" in content:
print(
f" ~ Model returned tool call as text (not structured). ({elapsed:.1f}s)\n"
f" This is acceptable for the base model before fine-tuning.\n"
f" Content: {content[:300]}"
)
# Partial pass — model attempted tool calling but via text
return True
print(
f" ✗ No tool call in response ({elapsed:.1f}s).\n"
f" Content: {content[:300]!r}"
)
return False
except Exception as exc:
print(f" ✗ Tool-calling request failed: {exc}")
return False
def test_timmy_persona(model: str) -> bool:
"""PASS: model accepts a Timmy persona system prompt and responds in-character."""
print(f"\n[5/5] Timmy-persona smoke test")
messages = [
{
"role": "system",
"content": (
"You are Timmy, Alexander's personal AI agent. "
"You are concise, direct, and helpful. "
"You always start your responses with 'Timmy here:'."
),
},
{
"role": "user",
"content": "What is your name and what can you help me with?",
},
]
try:
t0 = time.time()
data = _chat(model, messages)
elapsed = time.time() - t0
content = data.get("message", {}).get("content", "")
if "Timmy" in content or "timmy" in content.lower():
print(f" ✓ Persona accepted ({elapsed:.1f}s): {content[:200].strip()}")
return True
print(
f" ~ Persona response lacks 'Timmy' identifier ({elapsed:.1f}s).\n"
f" This is a fine-tuning target.\n"
f" Response: {content[:200]!r}"
)
# Soft pass — base model isn't expected to be perfectly in-character
return True
except Exception as exc:
print(f" ✗ Persona test failed: {exc}")
return False
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(description="Hermes 4 smoke test suite")
parser.add_argument(
"--model",
default=DEFAULT_MODEL,
help=f"Ollama model name (default: {DEFAULT_MODEL})",
)
parser.add_argument(
"--ollama-url",
default=OLLAMA_URL,
help=f"Ollama base URL (default: {OLLAMA_URL})",
)
args = parser.parse_args()
global OLLAMA_URL
OLLAMA_URL = args.ollama_url.rstrip("/")
model = args.model
print("=" * 60)
print(f"Hermes 4 Validation Suite — {model}")
print(f"Ollama: {OLLAMA_URL}")
print("=" * 60)
results: dict[str, bool] = {}
# Test 1: availability (gate — skip remaining if model missing)
results["available"] = test_model_available(model)
if not results["available"]:
print("\n⚠ Model not available — skipping remaining tests.")
print(" Import the model first (see Modelfile.hermes4-14b).")
_print_summary(results)
return 1
# Tests 25
results["basic_response"] = test_basic_response(model)
results["memory_usage"] = test_memory_usage()
results["tool_calling"] = test_tool_calling(model)
results["timmy_persona"] = test_timmy_persona(model)
return _print_summary(results)
def _print_summary(results: dict[str, bool]) -> int:
passed = sum(results.values())
total = len(results)
print("\n" + "=" * 60)
print(f"Results: {passed}/{total} passed")
print("=" * 60)
for name, ok in results.items():
icon = "" if ok else ""
print(f" {icon} {name}")
if passed == total:
print("\n✓ All tests passed. Hermes 4 is ready for AutoLoRA fine-tuning.")
print(" Next step: document WORK vs FAIL skill list → fine-tuning targets.")
elif results.get("tool_calling") is False:
print("\n⚠ Tool-calling FAILED. This is the primary fine-tuning target.")
print(" Base model may need LoRA tuning on tool-use examples.")
else:
print("\n~ Partial pass. Review failures above before fine-tuning.")
return 0 if passed == total else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -217,10 +217,6 @@ class Settings(BaseSettings):
# ── Test / Diagnostics ─────────────────────────────────────────────
# Skip loading heavy embedding models (for tests / low-memory envs).
timmy_skip_embeddings: bool = False
# Embedding backend: "ollama" for Ollama, "local" for sentence-transformers.
timmy_embedding_backend: Literal["ollama", "local"] = "ollama"
# Ollama model to use for embeddings (e.g., "nomic-embed-text").
ollama_embedding_model: str = "nomic-embed-text"
# Disable CSRF middleware entirely (for tests).
timmy_disable_csrf: bool = False
# Mark the process as running in test mode.

View File

@@ -375,21 +375,13 @@ def _startup_init() -> None:
def _startup_background_tasks() -> list[asyncio.Task]:
"""Spawn all recurring background tasks (non-blocking)."""
bg_tasks = [
return [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
]
try:
from timmy.paperclip import start_paperclip_poller
bg_tasks.append(asyncio.create_task(start_paperclip_poller()))
logger.info("Paperclip poller started")
except ImportError:
logger.debug("Paperclip module not found, skipping poller")
return bg_tasks
def _try_prune(label: str, prune_fn, days: int) -> None:

View File

@@ -1,264 +1,302 @@
"""
claude_quota.py — Claude Code / Claude.ai Quota Monitor
"""Claude API quota tracker and metabolic mode advisor.
Drop into src/infrastructure/ in the Timmy Time Dashboard repo.
Tracks Claude API usage (tokens, cost, calls) in a local SQLite database.
Provides a metabolic mode recommendation (BURST / ACTIVE / RESTING) based on
daily spend thresholds so the orchestrator can decide when to use cloud inference
vs. local Ollama.
Provides real-time quota visibility and metabolic protocol decisions.
Metabolic protocol (from issue #1074):
BURST — daily spend < burst_threshold → use Claude freely
ACTIVE — daily spend < active_threshold → prefer Groq / cheap tier
RESTING — daily spend >= active_threshold → local only, no API calls
Usage:
from infrastructure.claude_quota import QuotaMonitor
monitor = QuotaMonitor()
status = monitor.check()
print(status.five_hour_pct) # 42
print(status.five_hour_resets_in) # "2h 15m"
print(status.seven_day_pct) # 29
print(status.recommended_tier) # MetabolicTier.BURST
# Metabolic protocol: auto-select model based on quota
model = monitor.select_model(task_complexity="high")
# Returns "claude-sonnet-4-6" if quota allows, else "qwen3:14b"
Refs: #1074, #972
"""
import json
import logging
import subprocess
import urllib.request
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import StrEnum
import sqlite3
from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, date, datetime
from pathlib import Path
from typing import Literal
from config import settings
logger = logging.getLogger(__name__)
# ── Cost table (USD per million tokens, approximate) ─────────────────────────
_MODEL_COSTS: dict[str, dict[str, float]] = {
# haiku aliases
"haiku": {"input": 0.25, "output": 1.25},
"claude-haiku-4-5": {"input": 0.25, "output": 1.25},
"claude-haiku-4-5-20251001": {"input": 0.25, "output": 1.25},
# sonnet aliases
"sonnet": {"input": 3.00, "output": 15.00},
"claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
# opus aliases
"opus": {"input": 15.00, "output": 75.00},
"claude-opus-4-6": {"input": 15.00, "output": 75.00},
}
_DEFAULT_COST = {"input": 3.00, "output": 15.00} # conservative default
class MetabolicTier(StrEnum):
"""The three-tier metabolic protocol from the Timmy Time architecture."""
MetabolicMode = Literal["BURST", "ACTIVE", "RESTING"]
BURST = "burst" # Cloud API (Claude/Groq) — expensive, best quality
ACTIVE = "active" # Local 14B (Qwen3-14B) — free, good quality
RESTING = "resting" # Local 8B (Qwen3-8B) — free, fast, adequate
DB_PATH = Path(settings.repo_root) / "data" / "claude_quota.db"
# Daily spend thresholds (USD) — tune via env or subclass Settings
BURST_THRESHOLD: float = 1.00 # < $1/day → BURST mode, use Claude freely
ACTIVE_THRESHOLD: float = 5.00 # < $5/day → ACTIVE mode, prefer cheaper tier
_SCHEMA = """
CREATE TABLE IF NOT EXISTS claude_calls (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
model TEXT NOT NULL,
input_tok INTEGER NOT NULL DEFAULT 0,
output_tok INTEGER NOT NULL DEFAULT 0,
cost_usd REAL NOT NULL DEFAULT 0.0,
task_label TEXT DEFAULT '',
metadata TEXT DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_cc_ts ON claude_calls(ts);
CREATE INDEX IF NOT EXISTS idx_cc_model ON claude_calls(model);
"""
@dataclass
class QuotaStatus:
"""Current Claude quota state."""
class ClaudeCall:
"""Record of a single Claude API call."""
five_hour_utilization: float # 0.0 to 1.0
five_hour_resets_at: str | None
seven_day_utilization: float # 0.0 to 1.0
seven_day_resets_at: str | None
raw_response: dict
fetched_at: datetime
model: str
input_tokens: int
output_tokens: int
task_label: str = ""
ts: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
metadata: dict = field(default_factory=dict)
@property
def five_hour_pct(self) -> int:
return int(self.five_hour_utilization * 100)
@property
def seven_day_pct(self) -> int:
return int(self.seven_day_utilization * 100)
@property
def five_hour_resets_in(self) -> str:
return _time_remaining(self.five_hour_resets_at)
@property
def seven_day_resets_in(self) -> str:
return _time_remaining(self.seven_day_resets_at)
@property
def recommended_tier(self) -> MetabolicTier:
"""Metabolic protocol: determine which inference tier to use."""
# If weekly quota is critical, go full local
if self.seven_day_utilization >= 0.80:
return MetabolicTier.RESTING
# If 5-hour window is critical or past half, use local
if self.five_hour_utilization >= 0.50:
return MetabolicTier.ACTIVE
# Quota healthy — cloud available for high-value tasks
return MetabolicTier.BURST
def summary(self) -> str:
"""Human-readable status string."""
def cost_usd(self) -> float:
costs = _MODEL_COSTS.get(self.model, _DEFAULT_COST)
return (
f"5h: {self.five_hour_pct}% (resets {self.five_hour_resets_in}) | "
f"7d: {self.seven_day_pct}% (resets {self.seven_day_resets_in}) | "
f"tier: {self.recommended_tier.value}"
self.input_tokens * costs["input"]
+ self.output_tokens * costs["output"]
) / 1_000_000
@dataclass
class QuotaSummary:
"""Aggregated quota status for a time window."""
period: str # "today" | "month"
calls: int
input_tokens: int
output_tokens: int
cost_usd: float
mode: MetabolicMode
burst_threshold: float
active_threshold: float
def as_dict(self) -> dict:
return {
"period": self.period,
"calls": self.calls,
"input_tokens": self.input_tokens,
"output_tokens": self.output_tokens,
"cost_usd": round(self.cost_usd, 4),
"mode": self.mode,
"burst_threshold": self.burst_threshold,
"active_threshold": self.active_threshold,
}
def _mode_for_cost(daily_cost: float) -> MetabolicMode:
if daily_cost < BURST_THRESHOLD:
return "BURST"
if daily_cost < ACTIVE_THRESHOLD:
return "ACTIVE"
return "RESTING"
class ClaudeQuotaStore:
"""SQLite-backed store for Claude API usage tracking.
Thread-safe: creates a new connection per operation.
"""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or DB_PATH
self._init_db()
def _init_db(self) -> None:
try:
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(self._db_path))) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.executescript(_SCHEMA)
conn.commit()
except Exception as exc:
logger.warning("Failed to initialize claude_quota DB: %s", exc)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
return conn
def record_call(self, call: ClaudeCall) -> None:
"""Persist a completed Claude API call."""
try:
with closing(self._connect()) as conn:
conn.execute(
"INSERT INTO claude_calls "
"(ts, model, input_tok, output_tok, cost_usd, task_label, metadata) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(
call.ts,
call.model,
call.input_tokens,
call.output_tokens,
call.cost_usd,
call.task_label,
json.dumps(call.metadata),
),
)
conn.commit()
except Exception as exc:
logger.warning("Failed to record Claude call: %s", exc)
def _aggregate(self, where_clause: str, params: tuple) -> dict:
"""Return aggregated stats for a WHERE clause."""
try:
with closing(self._connect()) as conn:
row = conn.execute(
f"SELECT COUNT(*) as calls, "
f"COALESCE(SUM(input_tok),0) as input_tok, "
f"COALESCE(SUM(output_tok),0) as output_tok, "
f"COALESCE(SUM(cost_usd),0.0) as cost_usd "
f"FROM claude_calls {where_clause}",
params,
).fetchone()
if row:
return dict(row)
except Exception as exc:
logger.warning("Failed to aggregate Claude quota: %s", exc)
return {"calls": 0, "input_tok": 0, "output_tok": 0, "cost_usd": 0.0}
def today_summary(self) -> QuotaSummary:
"""Return quota summary for today (UTC)."""
today = date.today().isoformat()
agg = self._aggregate("WHERE ts >= ?", (today,))
return QuotaSummary(
period="today",
calls=agg["calls"],
input_tokens=agg["input_tok"],
output_tokens=agg["output_tok"],
cost_usd=agg["cost_usd"],
mode=_mode_for_cost(agg["cost_usd"]),
burst_threshold=BURST_THRESHOLD,
active_threshold=ACTIVE_THRESHOLD,
)
def month_summary(self) -> QuotaSummary:
"""Return quota summary for the current calendar month (UTC)."""
month_prefix = date.today().strftime("%Y-%m")
agg = self._aggregate("WHERE ts >= ?", (month_prefix,))
return QuotaSummary(
period="month",
calls=agg["calls"],
input_tokens=agg["input_tok"],
output_tokens=agg["output_tok"],
cost_usd=agg["cost_usd"],
mode=_mode_for_cost(agg["cost_usd"] / 30), # amortised daily
burst_threshold=BURST_THRESHOLD,
active_threshold=ACTIVE_THRESHOLD,
)
class QuotaMonitor:
def current_mode(self) -> MetabolicMode:
"""Return the current metabolic mode based on today's spend."""
return self.today_summary().mode
# ── Module-level singleton ────────────────────────────────────────────────────
_store: ClaudeQuotaStore | None = None
def get_quota_store() -> ClaudeQuotaStore:
"""Return the module-level quota store, creating it on first access."""
global _store
if _store is None:
_store = ClaudeQuotaStore()
return _store
def record_usage(
model: str,
input_tokens: int,
output_tokens: int,
task_label: str = "",
metadata: dict | None = None,
) -> None:
"""Convenience function to record a Claude API call.
Silently degrades if the quota DB is unavailable.
"""
Monitors Claude Code / Claude.ai quota via the internal OAuth API.
call = ClaudeCall(
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
task_label=task_label,
metadata=metadata or {},
)
get_quota_store().record_call(call)
logger.debug(
"Claude call recorded: model=%s in=%d out=%d cost=$%.4f",
model,
input_tokens,
output_tokens,
call.cost_usd,
)
The token is read from macOS Keychain where Claude Code stores it.
Falls back gracefully if credentials aren't available (e.g., on Linux VPS).
def current_mode() -> MetabolicMode:
"""Return the current metabolic mode.
BURST → Claude is cheap today, use freely.
ACTIVE → Approaching daily budget, prefer Groq / cheaper tier.
RESTING → Daily limit reached, use local Ollama only.
"""
API_URL = "https://api.anthropic.com/api/oauth/usage"
KEYCHAIN_SERVICE = "Claude Code-credentials"
USER_AGENT = "claude-code/2.0.32"
def __init__(self) -> None:
self._token: str | None = None
self._last_status: QuotaStatus | None = None
self._cache_seconds = 30 # Don't hammer the API
def _get_token(self) -> str | None:
"""Extract OAuth token from macOS Keychain."""
if self._token:
return self._token
try:
result = subprocess.run(
["security", "find-generic-password", "-s", self.KEYCHAIN_SERVICE, "-w"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
logger.warning("Claude Code credentials not found in Keychain")
return None
creds = json.loads(result.stdout.strip())
oauth = creds.get("claudeAiOauth", creds)
self._token = oauth.get("accessToken")
return self._token
except (
json.JSONDecodeError,
KeyError,
FileNotFoundError,
subprocess.TimeoutExpired,
) as exc:
logger.warning("Could not read Claude Code credentials: %s", exc)
return None
def check(self, force: bool = False) -> QuotaStatus | None:
"""
Fetch current quota status.
Returns None if credentials aren't available (graceful degradation).
Caches results for 30 seconds to avoid rate limiting the quota API itself.
"""
# Return cached if fresh
if not force and self._last_status:
age = (datetime.now(UTC) - self._last_status.fetched_at).total_seconds()
if age < self._cache_seconds:
return self._last_status
token = self._get_token()
if not token:
return None
try:
req = urllib.request.Request(
self.API_URL,
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": self.USER_AGENT,
"Authorization": f"Bearer {token}",
"anthropic-beta": "oauth-2025-04-20",
},
)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode())
five_hour = data.get("five_hour") or {}
seven_day = data.get("seven_day") or {}
self._last_status = QuotaStatus(
five_hour_utilization=float(five_hour.get("utilization", 0.0)),
five_hour_resets_at=five_hour.get("resets_at"),
seven_day_utilization=float(seven_day.get("utilization", 0.0)),
seven_day_resets_at=seven_day.get("resets_at"),
raw_response=data,
fetched_at=datetime.now(UTC),
)
return self._last_status
except Exception as exc:
logger.warning("Failed to fetch quota: %s", exc)
return self._last_status # Return stale data if available
def select_model(self, task_complexity: str = "medium") -> str:
"""
Metabolic protocol: select the right model based on quota + task complexity.
Returns an Ollama model tag or "claude-sonnet-4-6" for cloud.
task_complexity: "low" | "medium" | "high"
"""
status = self.check()
# No quota info available — assume local only (sovereign default)
if status is None:
return "qwen3:14b" if task_complexity == "high" else "qwen3:8b"
tier = status.recommended_tier
if tier == MetabolicTier.BURST and task_complexity == "high":
return "claude-sonnet-4-6" # Cloud — best quality
elif tier == MetabolicTier.BURST and task_complexity == "medium":
return "qwen3:14b" # Save cloud for truly hard tasks
elif tier == MetabolicTier.ACTIVE:
return "qwen3:14b" # Local 14B — good enough
else: # RESTING
return "qwen3:8b" # Local 8B — conserve everything
def should_use_cloud(self, task_value: str = "normal") -> bool:
"""
Simple yes/no: should this task use cloud API?
task_value: "critical" | "high" | "normal" | "routine"
"""
status = self.check()
if status is None:
return False # No credentials = local only
if task_value == "critical":
return status.seven_day_utilization < 0.95 # Almost always yes
elif task_value == "high":
return status.five_hour_utilization < 0.60
elif task_value == "normal":
return status.five_hour_utilization < 0.30
else: # routine
return False # Never waste cloud on routine
def _time_remaining(reset_at: str | None) -> str:
"""Format time until reset as human-readable string."""
if not reset_at or reset_at == "null":
return "unknown"
try:
reset = datetime.fromisoformat(reset_at.replace("Z", "+00:00"))
now = datetime.now(UTC)
diff = reset - now
if diff.total_seconds() <= 0:
return "resetting now"
hours = int(diff.total_seconds() // 3600)
mins = int((diff.total_seconds() % 3600) // 60)
if hours > 0:
return f"{hours}h {mins}m"
return f"{mins}m"
except (ValueError, TypeError):
return "unknown"
return get_quota_store().current_mode()
except Exception as exc:
logger.warning("Quota mode check failed, defaulting to BURST: %s", exc)
return "BURST"
# Module-level singleton
_quota_monitor: QuotaMonitor | None = None
def quota_report() -> str:
"""Return a human-readable quota report for CLI / dashboard display."""
try:
store = get_quota_store()
today = store.today_summary()
month = store.month_summary()
def get_quota_monitor() -> QuotaMonitor:
"""Get or create the quota monitor singleton."""
global _quota_monitor
if _quota_monitor is None:
_quota_monitor = QuotaMonitor()
return _quota_monitor
lines = [
"═══════════════════════════════════════",
" Claude API Quota — Metabolic Report ",
"═══════════════════════════════════════",
f" Today {today.calls:>6} calls "
f"${today.cost_usd:>7.4f} [{today.mode}]",
f" This month {month.calls:>5} calls "
f"${month.cost_usd:>7.4f}",
"───────────────────────────────────────",
f" BURST threshold : ${today.burst_threshold:.2f}/day",
f" ACTIVE threshold : ${today.active_threshold:.2f}/day",
"───────────────────────────────────────",
f" Current mode : {today.mode}",
"═══════════════════════════════════════",
]
return "\n".join(lines)
except Exception as exc:
return f"Quota report unavailable: {exc}"

View File

@@ -32,15 +32,6 @@ except ImportError:
logger = logging.getLogger(__name__)
# Quota monitor — optional, degrades gracefully if unavailable
try:
from infrastructure.claude_quota import QuotaMonitor, get_quota_monitor
_quota_monitor: "QuotaMonitor | None" = get_quota_monitor()
except Exception as _exc: # pragma: no cover
logger.debug("Quota monitor not available: %s", _exc)
_quota_monitor = None
class ProviderStatus(Enum):
"""Health status of a provider."""
@@ -310,22 +301,6 @@ class CascadeRouter:
logger.debug("Ollama provider check error: %s", exc)
return False
elif provider.type == "vllm_mlx":
# Check if local vllm-mlx server is running (OpenAI-compatible)
if requests is None:
return True
try:
base_url = provider.base_url or provider.url or "http://localhost:8000"
# Strip /v1 suffix — health endpoint is at the root
server_root = base_url.rstrip("/")
if server_root.endswith("/v1"):
server_root = server_root[:-3]
response = requests.get(f"{server_root}/health", timeout=5)
return response.status_code == 200
except Exception as exc:
logger.debug("vllm-mlx provider check error: %s", exc)
return False
elif provider.type in ("openai", "anthropic", "grok"):
# Check if API key is set
return provider.api_key is not None and provider.api_key != ""
@@ -482,25 +457,6 @@ class CascadeRouter:
raise RuntimeError("; ".join(errors))
def _quota_allows_cloud(self, provider: Provider) -> bool:
"""Check quota before routing to a cloud provider.
Uses the metabolic protocol: cloud calls are gated by 5-hour quota.
Returns True (allow cloud) if quota monitor is unavailable or returns None.
"""
if _quota_monitor is None:
return True
try:
# Map provider type to task_value heuristic
task_value = "high" # conservative default
status = _quota_monitor.check()
if status is None:
return True # No credentials — caller decides based on config
return _quota_monitor.should_use_cloud(task_value)
except Exception as exc:
logger.warning("Quota check failed, allowing cloud: %s", exc)
return True
def _is_provider_available(self, provider: Provider) -> bool:
"""Check if a provider should be tried (enabled + circuit breaker)."""
if not provider.enabled:
@@ -554,15 +510,6 @@ class CascadeRouter:
if not self._is_provider_available(provider):
continue
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
@@ -635,14 +582,6 @@ class CascadeRouter:
temperature=temperature,
max_tokens=max_tokens,
)
elif provider.type == "vllm_mlx":
result = await self._call_vllm_mlx(
provider=provider,
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
)
else:
raise ValueError(f"Unknown provider type: {provider.type}")
@@ -839,48 +778,6 @@ class CascadeRouter:
"model": response.model,
}
async def _call_vllm_mlx(
self,
provider: Provider,
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None,
) -> dict:
"""Call vllm-mlx via its OpenAI-compatible API.
vllm-mlx exposes the same /v1/chat/completions endpoint as OpenAI,
so we reuse the OpenAI client pointed at the local server.
No API key is required for local deployments.
"""
import openai
base_url = provider.base_url or provider.url or "http://localhost:8000"
# Ensure the base_url ends with /v1 as expected by the OpenAI client
if not base_url.rstrip("/").endswith("/v1"):
base_url = base_url.rstrip("/") + "/v1"
client = openai.AsyncOpenAI(
api_key=provider.api_key or "no-key-required",
base_url=base_url,
timeout=self.config.timeout_seconds,
)
kwargs: dict = {
"model": model,
"messages": messages,
"temperature": temperature,
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
response = await client.chat.completions.create(**kwargs)
return {
"content": response.choices[0].message.content,
"model": response.model,
}
def _record_success(self, provider: Provider, latency_ms: float) -> None:
"""Record a successful request."""
provider.metrics.total_requests += 1

View File

@@ -1,488 +0,0 @@
"""Kimi delegation for heavy research via Gitea labels.
When research exceeds local + Groq capacity, Timmy delegates to Kimi by:
1. Filling a research template with full context
2. Creating a Gitea issue labeled `kimi-ready`
3. Monitoring for Kimi's completion (issue closed + artifact committed)
4. Indexing Kimi's artifact into semantic memory
5. Extracting action items and creating follow-up issues
Delegation flow:
Timmy detects capacity exceeded
→ Fills template with context
→ Creates `kimi-ready` Gitea issue
→ Kimi picks up, executes, commits artifact, closes issue
→ Timmy indexes artifact + creates follow-ups
"""
import asyncio
import logging
import re
from typing import Any
logger = logging.getLogger(__name__)
# Label applied to issues that Kimi should pick up
KIMI_READY_LABEL = "kimi-ready"
# Label colour for the kimi-ready label (dark teal)
KIMI_LABEL_COLOR = "#006b75"
# Keywords that suggest a task exceeds local capacity
_HEAVY_RESEARCH_KEYWORDS = frozenset(
{
"comprehensive",
"exhaustive",
"systematic review",
"literature review",
"benchmark",
"comparative analysis",
"large-scale",
"survey",
"meta-analysis",
"deep research",
"extensive",
}
)
# Minimum word count that hints at a heavy task
_HEAVY_WORD_THRESHOLD = 50
def exceeds_local_capacity(task_description: str) -> bool:
"""Heuristic: does this research task exceed local + Groq capacity?
Returns True when the task description signals heavy or broad research
that benefits from Kimi's 262K context and long-running processing.
Args:
task_description: Free-text description of the research task.
Returns:
True if the task should be delegated to Kimi.
"""
lower = task_description.lower()
word_count = len(task_description.split())
has_heavy_keyword = any(kw in lower for kw in _HEAVY_RESEARCH_KEYWORDS)
is_long_task = word_count >= _HEAVY_WORD_THRESHOLD
return has_heavy_keyword or is_long_task
def _build_research_template(
task: str,
context: str,
question: str,
priority: str = "normal",
) -> str:
"""Fill the standard Kimi research template with task context.
Args:
task: Short title for the research task.
context: Background information and relevant project context.
question: The specific research question to answer.
priority: Task priority — "low", "normal", or "high".
Returns:
Markdown-formatted issue body ready for Gitea.
"""
return f"""\
## Research Request
**Priority:** {priority}
### Research Question
{question}
### Background / Context
{context}
### Scope
Please produce a thorough, well-structured research report covering:
- Direct answer to the research question above
- Supporting evidence and sources where applicable
- Trade-offs, limitations, or caveats
- Concrete recommendations or next steps
### Deliverables
Commit your findings as a markdown artifact (e.g. `memory/research/{_slugify(task)}.md`)
and close this issue when complete.
### Task
{task}
---
*Delegated by Timmy via Kimi delegation pipeline. Label: `{KIMI_READY_LABEL}`*
"""
def _slugify(text: str) -> str:
"""Convert text to a safe filename slug."""
slug = re.sub(r"[^\w\s-]", "", text.lower())
slug = re.sub(r"[\s_]+", "-", slug)
return slug[:60].strip("-")
async def _get_or_create_label(
client: Any,
base_url: str,
headers: dict[str, str],
repo: str,
) -> int | None:
"""Ensure the `kimi-ready` label exists; return its ID or None on error.
Args:
client: httpx.AsyncClient instance.
base_url: Gitea API base URL.
headers: Auth headers.
repo: owner/repo string.
Returns:
Label ID, or None if the operation failed.
"""
labels_url = f"{base_url}/repos/{repo}/labels"
# Check for existing label
try:
resp = await client.get(labels_url, headers=headers)
if resp.status_code == 200:
for label in resp.json():
if label.get("name") == KIMI_READY_LABEL:
return label["id"]
except Exception as exc:
logger.warning("Failed to list Gitea labels: %s", exc)
return None
# Create the label
try:
resp = await client.post(
labels_url,
headers=headers,
json={"name": KIMI_READY_LABEL, "color": KIMI_LABEL_COLOR},
)
if resp.status_code in (200, 201):
return resp.json().get("id")
logger.warning("Label creation returned %s: %s", resp.status_code, resp.text[:200])
except Exception as exc:
logger.warning("Failed to create Gitea label: %s", exc)
return None
async def create_kimi_research_issue(
task: str,
context: str,
question: str,
priority: str = "normal",
) -> dict[str, Any]:
"""Create a Gitea issue labeled `kimi-ready` for Kimi to pick up.
Args:
task: Short title for the research task (used as issue title).
context: Background information and project context.
question: The specific research question.
priority: Task priority — "low", "normal", or "high".
Returns:
Dict with `success`, `issue_number`, `issue_url`, and `error` keys.
"""
try:
import httpx
from config import settings
except ImportError as exc:
return {"success": False, "error": f"Missing dependency: {exc}"}
if not settings.gitea_enabled or not settings.gitea_token:
return {
"success": False,
"error": "Gitea integration not configured (no token or disabled).",
}
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(client, base_url, headers, repo)
body = _build_research_template(task, context, question, priority)
issue_payload: dict[str, Any] = {"title": task, "body": body}
if label_id is not None:
issue_payload["labels"] = [label_id]
resp = await client.post(
f"{base_url}/repos/{repo}/issues",
headers=headers,
json=issue_payload,
)
if resp.status_code in (200, 201):
data = resp.json()
number = data.get("number")
url = data.get("html_url", "")
logger.info("Created kimi-ready issue #%s: %s", number, task[:60])
return {
"success": True,
"issue_number": number,
"issue_url": url,
"error": None,
}
logger.warning("Issue creation failed (%s): %s", resp.status_code, resp.text[:200])
return {
"success": False,
"error": f"Gitea API error {resp.status_code}: {resp.text[:200]}",
}
except Exception as exc:
logger.warning("create_kimi_research_issue failed: %s", exc)
return {"success": False, "error": str(exc)}
async def poll_kimi_issue(
issue_number: int,
poll_interval: int = 60,
max_wait: int = 3600,
) -> dict[str, Any]:
"""Poll a Gitea issue until it is closed (Kimi completed) or timeout.
Args:
issue_number: The Gitea issue number to watch.
poll_interval: Seconds between polls. Default 60.
max_wait: Maximum total seconds to wait. Default 3600 (1 hour).
Returns:
Dict with `completed` bool, `state`, `body`, and `error` keys.
"""
try:
import httpx
from config import settings
except ImportError as exc:
return {"completed": False, "error": f"Missing dependency: {exc}"}
if not settings.gitea_enabled or not settings.gitea_token:
return {"completed": False, "error": "Gitea not configured."}
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"
elapsed = 0
while elapsed < max_wait:
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(issue_url, headers=headers)
if resp.status_code == 200:
data = resp.json()
state = data.get("state", "open")
if state == "closed":
logger.info("Kimi completed issue #%s", issue_number)
return {
"completed": True,
"state": state,
"body": data.get("body", ""),
"error": None,
}
else:
logger.warning("Poll issue #%s returned %s", issue_number, resp.status_code)
except Exception as exc:
logger.warning("Poll error for issue #%s: %s", issue_number, exc)
await asyncio.sleep(poll_interval)
elapsed += poll_interval
return {
"completed": False,
"state": "timeout",
"body": "",
"error": f"Timed out after {max_wait}s waiting for issue #{issue_number}",
}
def _extract_action_items(text: str) -> list[str]:
"""Extract action items from markdown text.
Looks for lines that start with checklist markers, numbered items,
or explicit "Action:" / "TODO:" prefixes.
Args:
text: Markdown text from Kimi's artifact.
Returns:
List of action item strings (deduplicated, whitespace-stripped).
"""
items: list[str] = []
patterns = [
re.compile(r"^[-*]\s+\[ \]\s+(.+)", re.MULTILINE), # - [ ] checkbox
re.compile(r"^\d+\.\s+(.+)", re.MULTILINE), # 1. numbered list
re.compile(r"^(?:Action|TODO|Next step):\s*(.+)", re.MULTILINE | re.IGNORECASE),
]
seen: set[str] = set()
for pat in patterns:
for m in pat.finditer(text):
item = m.group(1).strip()
if item and item not in seen:
items.append(item)
seen.add(item)
return items
async def index_kimi_artifact(
issue_number: int,
title: str,
artifact_content: str,
) -> dict[str, Any]:
"""Index Kimi's research artifact into Timmy's semantic memory.
Args:
issue_number: Source Gitea issue number (used as task_id).
title: Human-readable title for the memory entry.
artifact_content: The research artifact text to index.
Returns:
Dict with `success` bool and `memory_id` or `error`.
"""
if not artifact_content.strip():
return {"success": False, "error": "Empty artifact — nothing to index."}
try:
import asyncio
from timmy.memory_system import store_memory
# store_memory is synchronous — wrap in thread to avoid blocking event loop
entry = await asyncio.to_thread(
store_memory,
content=artifact_content,
source="kimi",
context_type="document",
task_id=str(issue_number),
metadata={"issue_number": issue_number, "title": title},
)
logger.info("Indexed Kimi artifact for issue #%s (id=%s)", issue_number, entry.id)
return {"success": True, "memory_id": entry.id}
except Exception as exc:
logger.warning("Failed to index Kimi artifact for issue #%s: %s", issue_number, exc)
return {"success": False, "error": str(exc)}
async def extract_and_create_followups(
artifact_content: str,
source_issue_number: int,
) -> dict[str, Any]:
"""Extract action items from artifact and create follow-up Gitea issues.
Args:
artifact_content: Text of Kimi's research artifact.
source_issue_number: Issue number that produced the artifact (for cross-links).
Returns:
Dict with `success`, `created` (list of issue numbers), and `error`.
"""
items = _extract_action_items(artifact_content)
if not items:
logger.info("No action items found in artifact for issue #%s", source_issue_number)
return {"success": True, "created": [], "error": None}
try:
import httpx
from config import settings
except ImportError as exc:
return {"success": False, "created": [], "error": str(exc)}
if not settings.gitea_enabled or not settings.gitea_token:
return {
"success": False,
"created": [],
"error": "Gitea not configured.",
}
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
created: list[int] = []
for item in items:
body = (
f"Follow-up from Kimi research artifact in #{source_issue_number}.\n\n"
f"**Action item:** {item}"
)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{base_url}/repos/{repo}/issues",
headers=headers,
json={"title": item[:120], "body": body},
)
if resp.status_code in (200, 201):
num = resp.json().get("number")
if num:
created.append(num)
logger.info(
"Created follow-up issue #%s from kimi artifact #%s",
num,
source_issue_number,
)
else:
logger.warning(
"Follow-up issue creation returned %s for item: %s",
resp.status_code,
item[:60],
)
except Exception as exc:
logger.warning("Failed to create follow-up for item '%s': %s", item[:60], exc)
return {"success": True, "created": created, "error": None}
async def delegate_research_to_kimi(
task: str,
context: str,
question: str,
priority: str = "normal",
) -> dict[str, Any]:
"""Top-level entry point: delegate a heavy research task to Kimi.
Creates the `kimi-ready` Gitea issue and returns immediately.
Monitoring, artifact indexing, and follow-up creation happen
separately via `poll_kimi_issue`, `index_kimi_artifact`, and
`extract_and_create_followups`.
Args:
task: Short title (becomes the issue title).
context: Background / project context.
question: The specific research question Kimi should answer.
priority: "low", "normal", or "high".
Returns:
Dict with `success`, `issue_number`, `issue_url`, and `error`.
"""
if not task.strip() or not question.strip():
return {
"success": False,
"error": "Both `task` and `question` are required.",
}
logger.info("Delegating research to Kimi: %s", task[:80])
return await create_kimi_research_issue(task, context, question, priority)

View File

@@ -9,81 +9,35 @@ Also includes vector similarity utilities (cosine similarity, keyword overlap).
import hashlib
import logging
import math
import json
import httpx # Import httpx for Ollama API calls
from config import settings
logger = logging.getLogger(__name__)
# Embedding model - small, fast, local
EMBEDDING_MODEL = None
EMBEDDING_DIM = 384 # MiniLM dimension, will be overridden if Ollama model has different dim
EMBEDDING_DIM = 384 # MiniLM dimension
class OllamaEmbedder:
"""Mimics SentenceTransformer interface for Ollama."""
def __init__(self, model_name: str, ollama_url: str):
self.model_name = model_name
self.ollama_url = ollama_url
self.dimension = 0 # Will be updated after first call
def encode(self, sentences: str | list[str], convert_to_numpy: bool = False, normalize_embeddings: bool = True) -> list[list[float]] | list[float]:
"""Generate embeddings using Ollama."""
if isinstance(sentences, str):
sentences = [sentences]
all_embeddings = []
for sentence in sentences:
try:
response = httpx.post(
f"{self.ollama_url}/api/embeddings",
json={"model": self.model_name, "prompt": sentence},
timeout=settings.mcp_bridge_timeout,
)
response.raise_for_status()
embedding = response.json()["embedding"]
if not self.dimension:
self.dimension = len(embedding) # Set dimension on first successful call
global EMBEDDING_DIM
EMBEDDING_DIM = self.dimension # Update global EMBEDDING_DIM
all_embeddings.append(embedding)
except httpx.RequestError as exc:
logger.error("Ollama embeddings request failed: %s", exc)
# Fallback to simple hash embedding on Ollama error
return _simple_hash_embedding(sentence)
except json.JSONDecodeError as exc:
logger.error("Failed to decode Ollama embeddings response: %s", exc)
return _simple_hash_embedding(sentence)
if len(all_embeddings) == 1 and isinstance(sentences, str):
return all_embeddings[0]
return all_embeddings
def _get_embedding_model():
"""Lazy-load embedding model, preferring Ollama if configured."""
"""Lazy-load embedding model."""
global EMBEDDING_MODEL
global EMBEDDING_DIM
if EMBEDDING_MODEL is None:
if settings.timmy_skip_embeddings:
EMBEDDING_MODEL = False
return EMBEDDING_MODEL
try:
from config import settings
if settings.timmy_embedding_backend == "ollama":
logger.info("MemorySystem: Using Ollama for embeddings with model %s", settings.ollama_embedding_model)
EMBEDDING_MODEL = OllamaEmbedder(settings.ollama_embedding_model, settings.normalized_ollama_url)
# We don't know the dimension until after the first call, so keep it default for now.
# It will be updated dynamically in OllamaEmbedder.encode
return EMBEDDING_MODEL
else:
try:
from sentence_transformers import SentenceTransformer
if settings.timmy_skip_embeddings:
EMBEDDING_MODEL = False
return EMBEDDING_MODEL
except ImportError:
pass
EMBEDDING_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
EMBEDDING_DIM = 384 # Reset to MiniLM dimension
logger.info("MemorySystem: Loaded local embedding model (all-MiniLM-L6-v2)")
except ImportError:
logger.warning("MemorySystem: sentence-transformers not installed, using fallback")
EMBEDDING_MODEL = False # Use fallback
try:
from sentence_transformers import SentenceTransformer
EMBEDDING_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
logger.info("MemorySystem: Loaded embedding model")
except ImportError:
logger.warning("MemorySystem: sentence-transformers not installed, using fallback")
EMBEDDING_MODEL = False # Use fallback
return EMBEDDING_MODEL
@@ -106,14 +60,10 @@ def embed_text(text: str) -> list[float]:
model = _get_embedding_model()
if model and model is not False:
embedding = model.encode(text)
# Ensure it's a list of floats, not numpy array
if hasattr(embedding, 'tolist'):
return embedding.tolist()
return embedding
return embedding.tolist()
return _simple_hash_embedding(text)
def cosine_similarity(a: list[float], b: list[float]) -> float:
"""Calculate cosine similarity between two vectors."""
dot = sum(x * y for x, y in zip(a, b, strict=False))

View File

@@ -1206,7 +1206,7 @@ memory_searcher = MemorySearcher()
# ───────────────────────────────────────────────────────────────────────────────
def memory_search(query: str, limit: int = 10) -> str:
def memory_search(query: str, top_k: int = 5) -> str:
"""Search past conversations, notes, and stored facts for relevant context.
Searches across both the vault (indexed markdown files) and the
@@ -1215,19 +1215,19 @@ def memory_search(query: str, limit: int = 10) -> str:
Args:
query: What to search for (e.g. "Bitcoin strategy", "server setup").
limit: Number of results to return (default 10).
top_k: Number of results to return (default 5).
Returns:
Formatted string of relevant memory results.
"""
# Guard: model sometimes passes None for limit
if limit is None:
limit = 10
# Guard: model sometimes passes None for top_k
if top_k is None:
top_k = 5
parts: list[str] = []
# 1. Search semantic vault (indexed markdown files)
vault_results = semantic_memory.search(query, limit)
vault_results = semantic_memory.search(query, top_k)
for content, score in vault_results:
if score < 0.2:
continue
@@ -1235,7 +1235,7 @@ def memory_search(query: str, limit: int = 10) -> str:
# 2. Search runtime vector store (stored facts/conversations)
try:
runtime_results = search_memories(query, limit=limit, min_relevance=0.2)
runtime_results = search_memories(query, limit=top_k, min_relevance=0.2)
for entry in runtime_results:
label = entry.context_type or "memory"
parts.append(f"[{label}] {entry.content[:300]}")
@@ -1289,48 +1289,45 @@ def memory_read(query: str = "", top_k: int = 5) -> str:
return "\n".join(parts)
def memory_store(topic: str, report: str, type: str = "research") -> str:
"""Store a piece of information in persistent memory, particularly for research outputs.
def memory_write(content: str, context_type: str = "fact") -> str:
"""Store a piece of information in persistent memory.
Use this tool to store structured research findings or other important documents.
Stored memories are searchable via memory_search across all channels.
Use this tool when the user explicitly asks you to remember something.
Stored memories are searchable via memory_search across all channels
(web GUI, Discord, Telegram, etc.).
Args:
topic: A concise title or topic for the research output.
report: The detailed content of the research output or document.
type: Type of memory — "research" for research outputs (default),
"fact" for permanent facts, "conversation" for conversation context,
"document" for other document fragments.
content: The information to remember (e.g. a phrase, fact, or note).
context_type: Type of memory — "fact" for permanent facts,
"conversation" for conversation context,
"document" for document fragments.
Returns:
Confirmation that the memory was stored.
"""
if not report or not report.strip():
return "Nothing to store — report is empty."
if not content or not content.strip():
return "Nothing to store — content is empty."
# Combine topic and report for embedding and storage content
full_content = f"Topic: {topic.strip()}\n\nReport: {report.strip()}"
valid_types = ("fact", "conversation", "document", "research")
if type not in valid_types:
type = "research"
valid_types = ("fact", "conversation", "document")
if context_type not in valid_types:
context_type = "fact"
try:
# Dedup check for facts and research — skip if similar exists
if type in ("fact", "research"):
# Dedup check for facts — skip if a similar fact already exists
# Threshold 0.75 catches paraphrases (was 0.9 which only caught near-exact)
if context_type == "fact":
existing = search_memories(
full_content, limit=3, context_type=type, min_relevance=0.75
content.strip(), limit=3, context_type="fact", min_relevance=0.75
)
if existing:
return f"Similar {type} already stored (id={existing[0].id[:8]}). Skipping duplicate."
return f"Similar fact already stored (id={existing[0].id[:8]}). Skipping duplicate."
entry = store_memory(
content=full_content,
content=content.strip(),
source="agent",
context_type=type,
metadata={"topic": topic},
context_type=context_type,
)
return f"Stored in memory (type={type}, id={entry.id[:8]}). This is now searchable across all channels."
return f"Stored in memory (type={context_type}, id={entry.id[:8]}). This is now searchable across all channels."
except Exception as exc:
logger.error("Failed to write memory: %s", exc)
return f"Failed to store memory: {exc}"

View File

@@ -1,175 +0,0 @@
"""Paperclip integration for Timmy.
This module provides a client for the Paperclip API, and a poller for
running research tasks.
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
import httpx
from config import settings
from timmy.research_triage import triage_research_report
from timmy.research_tools import google_web_search, get_llm_client
logger = logging.getLogger(__name__)
@dataclass
class PaperclipTask:
"""A task from the Paperclip API."""
id: str
kind: str
context: dict
class PaperclipClient:
"""A client for the Paperclip API."""
def __init__(self) -> None:
self.base_url = settings.paperclip_url
self.api_key = settings.paperclip_api_key
self.agent_id = settings.paperclip_agent_id
self.company_id = settings.paperclip_company_id
self.timeout = settings.paperclip_timeout
async def get_tasks(self) -> list[PaperclipTask]:
"""Get a list of tasks from the Paperclip API."""
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.get(
f"{self.base_url}/api/tasks",
headers={"Authorization": f"Bearer {self.api_key}"},
params={
"agent_id": self.agent_id,
"company_id": self.company_id,
"status": "queued",
},
)
resp.raise_for_status()
tasks = resp.json()
return [
PaperclipTask(id=t["id"], kind=t["kind"], context=t["context"])
for t in tasks
]
async def update_task_status(
self, task_id: str, status: str, result: str | None = None
) -> None:
"""Update the status of a task."""
async with httpx.AsyncClient(timeout=self.timeout) as client:
await client.patch(
f"{self.base_url}/api/tasks/{task_id}",
headers={"Authorization": f"Bearer {self.api_key}"},
json={"status": status, "result": result},
)
class ResearchOrchestrator:
"""Orchestrates research tasks."""
async def get_gitea_issue(self, issue_number: int) -> dict:
"""Get a Gitea issue by its number."""
owner, repo = settings.gitea_repo.split("/", 1)
api_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/issues/{issue_number}"
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(
api_url,
headers={"Authorization": f"token {settings.gitea_token}"},
)
resp.raise_for_status()
return resp.json()
async def post_gitea_comment(self, issue_number: int, comment: str) -> None:
"""Post a comment to a Gitea issue."""
owner, repo = settings.gitea_repo.split("/", 1)
api_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/issues/{issue_number}/comments"
async with httpx.AsyncClient(timeout=15) as client:
await client.post(
api_url,
headers={"Authorization": f"token {settings.gitea_token}"},
json={"body": comment},
)
async def run_research_pipeline(self, issue_title: str) -> str:
"""Run the research pipeline."""
search_results = await google_web_search(issue_title)
llm_client = get_llm_client()
response = await llm_client.completion(
f"Summarize the following search results and generate a research report:\\n\\n{search_results}",
max_tokens=2048,
)
return response.text
async def run(self, context: dict) -> str:
"""Run a research task."""
issue_number = context.get("issue_number")
if not issue_number:
return "Missing issue_number in task context"
issue = await self.get_gitea_issue(issue_number)
report = await self.run_research_pipeline(issue["title"])
triage_results = await triage_research_report(report, source_issue=issue_number)
comment = f"Research complete for issue #{issue_number}.\\n\\n"
if triage_results:
comment += "Created the following issues:\\n"
for result in triage_results:
if result["gitea_issue"]:
comment += f"- #{result['gitea_issue']['number']}: {result['action_item'].title}\\n"
else:
comment += "No new issues were created.\\n"
await self.post_gitea_comment(issue_number, comment)
return f"Research complete for issue #{issue_number}"
class PaperclipPoller:
"""Polls the Paperclip API for new tasks."""
def __init__(self) -> None:
self.client = PaperclipClient()
self.orchestrator = ResearchOrchestrator()
self.poll_interval = settings.paperclip_poll_interval
async def poll(self) -> None:
"""Poll the Paperclip API for new tasks."""
if self.poll_interval == 0:
return
while True:
try:
tasks = await self.client.get_tasks()
for task in tasks:
if task.kind == "research":
await self.run_research_task(task)
except httpx.HTTPError as exc:
logger.warning("Error polling Paperclip: %s", exc)
await asyncio.sleep(self.poll_interval)
async def run_research_task(self, task: PaperclipTask) -> None:
"""Run a research task."""
await self.client.update_task_status(task.id, "running")
try:
result = await self.orchestrator.run(task.context)
await self.client.update_task_status(task.id, "completed", result)
except Exception as exc:
logger.error("Error running research task: %s", exc, exc_info=True)
await self.client.update_task_status(task.id, "failed", str(exc))
async def start_paperclip_poller() -> None:
"""Start the Paperclip poller."""
if settings.paperclip_enabled:
poller = PaperclipPoller()
asyncio.create_task(poller.poll())

View File

@@ -1,42 +0,0 @@
"""Tools for the research pipeline."""
from __future__ import annotations
import logging
import os
from typing import Any
from config import settings
from serpapi import GoogleSearch
logger = logging.getLogger(__name__)
async def google_web_search(query: str) -> str:
"""Perform a Google search and return the results."""
if "SERPAPI_API_KEY" not in os.environ:
logger.warning("SERPAPI_API_KEY not set, skipping web search")
return ""
params = {
"q": query,
"api_key": os.environ["SERPAPI_API_KEY"],
}
search = GoogleSearch(params)
results = search.get_dict()
return str(results)
def get_llm_client() -> Any:
"""Get an LLM client."""
# This is a placeholder. In a real application, this would return
# a client for an LLM service like OpenAI, Anthropic, or a local
# model.
class MockLLMClient:
async def completion(self, prompt: str, max_tokens: int) -> Any:
class MockCompletion:
def __init__(self, text: str) -> None:
self.text = text
return MockCompletion(f"This is a summary of the search results for '{prompt}'.")
return MockLLMClient()

View File

@@ -54,7 +54,9 @@ class ActionItem:
parts.append(f"- {url}")
if source_issue:
parts.append(f"\n### Origin\nExtracted from research in #{source_issue}")
parts.append(
f"\n### Origin\nExtracted from research in #{source_issue}"
)
parts.append("\n---\n*Auto-triaged from research findings by Timmy*")
return "\n".join(parts)
@@ -121,7 +123,7 @@ def _validate_action_item(raw_item: dict[str, Any]) -> ActionItem | None:
labels = raw_item.get("labels", [])
if isinstance(labels, str):
labels = [lbl.strip() for lbl in labels.split(",") if lbl.strip()]
labels = [l.strip() for l in labels.split(",") if l.strip()]
if not isinstance(labels, list):
labels = []
@@ -301,7 +303,7 @@ async def _resolve_label_ids(
if resp.status_code != 200:
return []
existing = {lbl["name"]: lbl["id"] for lbl in resp.json()}
existing = {l["name"]: l["id"] for l in resp.json()}
label_ids = []
for name in label_names:

View File

@@ -14,9 +14,7 @@ app = typer.Typer(help="Timmy Serve — sovereign AI agent API")
def start(
port: int = typer.Option(8402, "--port", "-p", help="Port for the serve API"),
host: str = typer.Option("0.0.0.0", "--host", "-h", help="Host to bind to"),
price: int = typer.Option(
None, "--price", help="Price per request in sats (default: from config)"
),
price: int = typer.Option(None, "--price", help="Price per request in sats (default: from config)"),
dry_run: bool = typer.Option(False, "--dry-run", help="Print config and exit (for testing)"),
):
"""Start Timmy in serve mode."""

View File

@@ -24,6 +24,7 @@ from dashboard.routes.health import (
_generate_recommendations,
)
# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------
@@ -117,9 +118,7 @@ class TestGenerateRecommendations:
def test_unavailable_service(self):
deps = [
DependencyStatus(
name="Ollama AI", status="unavailable", sovereignty_score=10, details={}
)
DependencyStatus(name="Ollama AI", status="unavailable", sovereignty_score=10, details={})
]
recs = _generate_recommendations(deps)
assert any("Ollama AI is unavailable" in r for r in recs)
@@ -138,7 +137,9 @@ class TestGenerateRecommendations:
def test_degraded_non_lightning(self):
"""Degraded non-Lightning dep produces no specific recommendation."""
deps = [DependencyStatus(name="Redis", status="degraded", sovereignty_score=5, details={})]
deps = [
DependencyStatus(name="Redis", status="degraded", sovereignty_score=5, details={})
]
recs = _generate_recommendations(deps)
assert recs == ["System operating optimally - all dependencies healthy"]
@@ -378,9 +379,7 @@ class TestHealthEndpoint:
assert response.status_code == 200
def test_ok_when_ollama_up(self, client):
with patch(
"dashboard.routes.health.check_ollama", new_callable=AsyncMock, return_value=True
):
with patch("dashboard.routes.health.check_ollama", new_callable=AsyncMock, return_value=True):
data = client.get("/health").json()
assert data["status"] == "ok"
@@ -416,9 +415,7 @@ class TestHealthStatusPanel:
assert "text/html" in response.headers["content-type"]
def test_shows_up_when_ollama_healthy(self, client):
with patch(
"dashboard.routes.health.check_ollama", new_callable=AsyncMock, return_value=True
):
with patch("dashboard.routes.health.check_ollama", new_callable=AsyncMock, return_value=True):
text = client.get("/health/status").text
assert "UP" in text

View File

@@ -1,267 +1,139 @@
"""Tests for Claude Quota Monitor and Metabolic Protocol."""
"""Tests for the Claude quota tracker and metabolic mode advisor.
from datetime import UTC, datetime, timedelta
from unittest.mock import patch
Refs: #1074
"""
import pytest
from infrastructure.claude_quota import (
MetabolicTier,
QuotaMonitor,
QuotaStatus,
_time_remaining,
get_quota_monitor,
ACTIVE_THRESHOLD,
BURST_THRESHOLD,
ClaudeCall,
ClaudeQuotaStore,
MetabolicMode,
_mode_for_cost,
current_mode,
quota_report,
record_usage,
)
def _make_status(five_hour: float = 0.0, seven_day: float = 0.0) -> QuotaStatus:
"""Helper: build a QuotaStatus with given utilization values."""
return QuotaStatus(
five_hour_utilization=five_hour,
five_hour_resets_at=None,
seven_day_utilization=seven_day,
seven_day_resets_at=None,
raw_response={},
fetched_at=datetime.now(UTC),
)
@pytest.fixture
def store(tmp_path):
"""Fresh quota store backed by a temp DB."""
return ClaudeQuotaStore(db_path=tmp_path / "test_quota.db")
class TestMetabolicTierThresholds:
"""Test the three-tier metabolic protocol thresholds."""
def test_burst_when_five_hour_below_50pct(self):
status = _make_status(five_hour=0.49, seven_day=0.10)
assert status.recommended_tier == MetabolicTier.BURST
def test_burst_at_zero_utilization(self):
status = _make_status(five_hour=0.0, seven_day=0.0)
assert status.recommended_tier == MetabolicTier.BURST
def test_active_when_five_hour_at_50pct(self):
status = _make_status(five_hour=0.50, seven_day=0.10)
assert status.recommended_tier == MetabolicTier.ACTIVE
def test_active_when_five_hour_between_50_and_80pct(self):
status = _make_status(five_hour=0.79, seven_day=0.10)
assert status.recommended_tier == MetabolicTier.ACTIVE
def test_active_when_five_hour_at_80pct(self):
# five_hour >= 0.80 but seven_day < 0.80 → ACTIVE (not RESTING)
status = _make_status(five_hour=0.80, seven_day=0.50)
assert status.recommended_tier == MetabolicTier.ACTIVE
def test_resting_when_seven_day_at_80pct(self):
status = _make_status(five_hour=0.30, seven_day=0.80)
assert status.recommended_tier == MetabolicTier.RESTING
def test_resting_when_seven_day_above_80pct(self):
status = _make_status(five_hour=0.10, seven_day=0.95)
assert status.recommended_tier == MetabolicTier.RESTING
def test_resting_when_both_critical(self):
status = _make_status(five_hour=0.90, seven_day=0.90)
assert status.recommended_tier == MetabolicTier.RESTING
def test_seven_day_takes_precedence_over_five_hour(self):
# Weekly quota critical overrides whatever five-hour says
status = _make_status(five_hour=0.10, seven_day=0.85)
assert status.recommended_tier == MetabolicTier.RESTING
# ── Unit: cost calculation ────────────────────────────────────────────────────
class TestQuotaStatusProperties:
"""Test QuotaStatus computed properties."""
class TestClaudeCallCost:
def test_haiku_cost(self):
call = ClaudeCall(model="haiku", input_tokens=1_000_000, output_tokens=0)
assert call.cost_usd == pytest.approx(0.25)
def test_five_hour_pct(self):
status = _make_status(five_hour=0.42)
assert status.five_hour_pct == 42
def test_sonnet_output_cost(self):
call = ClaudeCall(model="sonnet", input_tokens=0, output_tokens=1_000_000)
assert call.cost_usd == pytest.approx(15.00)
def test_seven_day_pct(self):
status = _make_status(seven_day=0.75)
assert status.seven_day_pct == 75
def test_opus_combined_cost(self):
call = ClaudeCall(model="opus", input_tokens=100_000, output_tokens=50_000)
# input: 100k * 15/1M = 1.50, output: 50k * 75/1M = 3.75 → 5.25
assert call.cost_usd == pytest.approx(5.25)
def test_summary_contains_tier(self):
status = _make_status(five_hour=0.20, seven_day=0.10)
summary = status.summary()
assert "burst" in summary
assert "20%" in summary
def test_unknown_model_uses_default(self):
call = ClaudeCall(model="unknown-model-xyz", input_tokens=1_000_000, output_tokens=0)
assert call.cost_usd == pytest.approx(3.00) # default input cost
def test_five_hour_resets_in_unknown_when_none(self):
status = _make_status()
assert status.five_hour_resets_in == "unknown"
def test_seven_day_resets_in_unknown_when_none(self):
status = _make_status()
assert status.seven_day_resets_in == "unknown"
def test_zero_tokens_zero_cost(self):
call = ClaudeCall(model="haiku", input_tokens=0, output_tokens=0)
assert call.cost_usd == 0.0
class TestTimeRemaining:
"""Test _time_remaining helper."""
def test_none_returns_unknown(self):
assert _time_remaining(None) == "unknown"
def test_empty_string_returns_unknown(self):
assert _time_remaining("") == "unknown"
def test_past_time_returns_resetting_now(self):
past = (datetime.now(UTC) - timedelta(hours=1)).isoformat()
assert _time_remaining(past) == "resetting now"
def test_future_time_hours_and_minutes(self):
future = (datetime.now(UTC) + timedelta(hours=2, minutes=15)).isoformat()
result = _time_remaining(future)
assert "2h" in result
# Minutes may vary ±1 due to test execution time
assert "m" in result
def test_future_time_minutes_only(self):
future = (datetime.now(UTC) + timedelta(minutes=45)).isoformat()
result = _time_remaining(future)
assert "h" not in result
# Minutes may vary ±1 due to test execution time
assert "m" in result
def test_z_suffix_handled(self):
future = (datetime.now(UTC) + timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
result = _time_remaining(future)
assert result != "unknown"
# ── Unit: metabolic mode thresholds ──────────────────────────────────────────
class TestQuotaMonitorSelectModel:
"""Test select_model metabolic routing."""
class TestMetabolicMode:
def test_under_burst_threshold(self):
assert _mode_for_cost(0.0) == "BURST"
assert _mode_for_cost(BURST_THRESHOLD - 0.01) == "BURST"
def test_no_quota_high_complexity_returns_14b(self):
monitor = QuotaMonitor()
monitor._get_token = lambda: None
assert monitor.select_model("high") == "qwen3:14b"
def test_at_burst_threshold_is_active(self):
assert _mode_for_cost(BURST_THRESHOLD) == "ACTIVE"
def test_no_quota_low_complexity_returns_8b(self):
monitor = QuotaMonitor()
monitor._get_token = lambda: None
assert monitor.select_model("low") == "qwen3:8b"
def test_between_thresholds(self):
mid = (BURST_THRESHOLD + ACTIVE_THRESHOLD) / 2
assert _mode_for_cost(mid) == "ACTIVE"
def test_burst_tier_high_complexity_returns_cloud(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.10)
monitor._cache_seconds = 9999
result = monitor.select_model("high")
assert result == "claude-sonnet-4-6"
def test_at_active_threshold_is_resting(self):
assert _mode_for_cost(ACTIVE_THRESHOLD) == "RESTING"
def test_burst_tier_medium_complexity_returns_14b(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.10)
monitor._cache_seconds = 9999
result = monitor.select_model("medium")
assert result == "qwen3:14b"
def test_active_tier_returns_14b(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.65, seven_day=0.10)
monitor._cache_seconds = 9999
result = monitor.select_model("high")
assert result == "qwen3:14b"
def test_resting_tier_returns_8b(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.85)
monitor._cache_seconds = 9999
result = monitor.select_model("high")
assert result == "qwen3:8b"
def test_over_active_threshold(self):
assert _mode_for_cost(ACTIVE_THRESHOLD + 10) == "RESTING"
class TestQuotaMonitorShouldUseCloud:
"""Test should_use_cloud gate."""
def test_no_credentials_always_false(self):
monitor = QuotaMonitor()
monitor._get_token = lambda: None
assert monitor.should_use_cloud("critical") is False
def test_critical_task_allowed_when_under_95pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.94)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("critical") is True
def test_critical_task_blocked_when_over_95pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.10, seven_day=0.96)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("critical") is False
def test_high_task_allowed_under_60pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.59, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("high") is True
def test_high_task_blocked_at_60pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.60, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("high") is False
def test_normal_task_allowed_under_30pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.29, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("normal") is True
def test_normal_task_blocked_at_30pct(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.30, seven_day=0.10)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("normal") is False
def test_routine_task_always_false(self):
monitor = QuotaMonitor()
monitor._last_status = _make_status(five_hour=0.0, seven_day=0.0)
monitor._cache_seconds = 9999
assert monitor.should_use_cloud("routine") is False
# ── Store: record and query ───────────────────────────────────────────────────
class TestQuotaMonitorCaching:
"""Test 30-second TTL cache."""
class TestClaudeQuotaStore:
def test_record_call(self, store):
call = ClaudeCall(model="haiku", input_tokens=1000, output_tokens=500)
store.record_call(call)
summary = store.today_summary()
assert summary.calls == 1
assert summary.input_tokens == 1000
assert summary.output_tokens == 500
assert summary.cost_usd > 0
def test_cached_result_returned_within_ttl(self):
monitor = QuotaMonitor()
fresh_status = _make_status(five_hour=0.10)
monitor._last_status = fresh_status
monitor._cache_seconds = 30
def test_today_summary_empty_db(self, store):
summary = store.today_summary()
assert summary.calls == 0
assert summary.cost_usd == 0.0
assert summary.mode == "BURST"
# Should NOT re-fetch — returns cached
with patch.object(monitor, "_get_token", return_value="tok") as mock_tok:
result = monitor.check()
mock_tok.assert_not_called()
def test_month_summary_aggregates_multiple_calls(self, store):
for _ in range(5):
store.record_call(ClaudeCall(model="haiku", input_tokens=100, output_tokens=50))
month = store.month_summary()
assert month.calls == 5
assert month.input_tokens == 500
assert month.output_tokens == 250
assert result is fresh_status
def test_current_mode_burst_when_empty(self, store):
assert store.current_mode() == "BURST"
def test_stale_cache_triggers_fetch(self):
monitor = QuotaMonitor()
old_time = datetime.now(UTC) - timedelta(seconds=60)
stale_status = QuotaStatus(
five_hour_utilization=0.10,
five_hour_resets_at=None,
seven_day_utilization=0.10,
seven_day_resets_at=None,
raw_response={},
fetched_at=old_time,
def test_current_mode_resting_when_expensive(self, store):
# Record enough usage to push past ACTIVE_THRESHOLD
# ACTIVE_THRESHOLD = 5.00, opus input = 15/1M
# Need >5.00: 5.00/15 * 1M ≈ 333_334 input tokens
store.record_call(
ClaudeCall(model="opus", input_tokens=400_000, output_tokens=0)
)
monitor._last_status = stale_status
mode = store.current_mode()
assert mode == "RESTING"
# Token unavailable → returns None (triggers re-fetch path)
with patch.object(monitor, "_get_token", return_value=None):
result = monitor.check()
assert result is None # No credentials after cache miss
def test_summary_as_dict(self, store):
summary = store.today_summary()
d = summary.as_dict()
assert "period" in d
assert "calls" in d
assert "cost_usd" in d
assert "mode" in d
class TestGetQuotaMonitorSingleton:
"""Test module-level singleton."""
# ── Convenience functions ─────────────────────────────────────────────────────
def test_returns_same_instance(self):
m1 = get_quota_monitor()
m2 = get_quota_monitor()
assert m1 is m2
def test_returns_quota_monitor_instance(self):
monitor = get_quota_monitor()
assert isinstance(monitor, QuotaMonitor)
class TestConvenienceFunctions:
def test_record_usage_does_not_raise(self):
# Uses module-level store; should not raise even if DB path issues
record_usage(model="haiku", input_tokens=10, output_tokens=5, task_label="test")
def test_current_mode_returns_valid_mode(self):
mode = current_mode()
assert mode in ("BURST", "ACTIVE", "RESTING")
def test_quota_report_returns_string(self):
report = quota_report()
assert isinstance(report, str)
assert "BURST" in report or "ACTIVE" in report or "RESTING" in report

View File

@@ -489,197 +489,6 @@ class TestProviderAvailabilityCheck:
assert router._check_provider_available(provider) is False
def test_check_vllm_mlx_without_requests(self):
"""Test vllm-mlx returns True when requests not available (fallback)."""
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="vllm-mlx-local",
type="vllm_mlx",
enabled=True,
priority=2,
base_url="http://localhost:8000/v1",
)
import infrastructure.router.cascade as cascade_module
old_requests = cascade_module.requests
cascade_module.requests = None
try:
assert router._check_provider_available(provider) is True
finally:
cascade_module.requests = old_requests
def test_check_vllm_mlx_server_healthy(self):
"""Test vllm-mlx when health check succeeds."""
from unittest.mock import MagicMock, patch
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="vllm-mlx-local",
type="vllm_mlx",
enabled=True,
priority=2,
base_url="http://localhost:8000/v1",
)
mock_response = MagicMock()
mock_response.status_code = 200
with patch("infrastructure.router.cascade.requests") as mock_requests:
mock_requests.get.return_value = mock_response
result = router._check_provider_available(provider)
assert result is True
mock_requests.get.assert_called_once_with("http://localhost:8000/health", timeout=5)
def test_check_vllm_mlx_server_down(self):
"""Test vllm-mlx when server is not running."""
from unittest.mock import patch
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="vllm-mlx-local",
type="vllm_mlx",
enabled=True,
priority=2,
base_url="http://localhost:8000/v1",
)
with patch("infrastructure.router.cascade.requests") as mock_requests:
mock_requests.get.side_effect = ConnectionRefusedError("Connection refused")
result = router._check_provider_available(provider)
assert result is False
def test_check_vllm_mlx_default_url(self):
"""Test vllm-mlx uses default localhost:8000 when no URL configured."""
from unittest.mock import MagicMock, patch
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="vllm-mlx-local",
type="vllm_mlx",
enabled=True,
priority=2,
)
mock_response = MagicMock()
mock_response.status_code = 200
with patch("infrastructure.router.cascade.requests") as mock_requests:
mock_requests.get.return_value = mock_response
router._check_provider_available(provider)
mock_requests.get.assert_called_once_with("http://localhost:8000/health", timeout=5)
@pytest.mark.asyncio
class TestVllmMlxProvider:
"""Test vllm-mlx provider integration."""
async def test_complete_with_vllm_mlx(self):
"""Test successful completion via vllm-mlx."""
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="vllm-mlx-local",
type="vllm_mlx",
enabled=True,
priority=2,
base_url="http://localhost:8000/v1",
models=[{"name": "Qwen/Qwen2.5-14B-Instruct-MLX", "default": True}],
)
router.providers = [provider]
with patch.object(router, "_call_vllm_mlx") as mock_call:
mock_call.return_value = {
"content": "MLX response",
"model": "Qwen/Qwen2.5-14B-Instruct-MLX",
}
result = await router.complete(
messages=[{"role": "user", "content": "Hi"}],
)
assert result["content"] == "MLX response"
assert result["provider"] == "vllm-mlx-local"
assert result["model"] == "Qwen/Qwen2.5-14B-Instruct-MLX"
async def test_vllm_mlx_base_url_normalization(self):
"""Test _call_vllm_mlx appends /v1 when missing."""
from unittest.mock import AsyncMock, MagicMock, patch
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="vllm-mlx-local",
type="vllm_mlx",
enabled=True,
priority=2,
base_url="http://localhost:8000", # No /v1
models=[{"name": "qwen-mlx", "default": True}],
)
mock_choice = MagicMock()
mock_choice.message.content = "hello"
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.model = "qwen-mlx"
async def fake_create(**kwargs):
return mock_response
with patch("openai.AsyncOpenAI") as mock_openai_cls:
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(side_effect=fake_create)
mock_openai_cls.return_value = mock_client
await router._call_vllm_mlx(
provider=provider,
messages=[{"role": "user", "content": "hi"}],
model="qwen-mlx",
temperature=0.7,
max_tokens=None,
)
call_kwargs = mock_openai_cls.call_args
base_url_used = call_kwargs.kwargs.get("base_url") or call_kwargs[1].get("base_url")
assert base_url_used.endswith("/v1")
async def test_vllm_mlx_is_local_not_cloud(self):
"""Confirm vllm_mlx is not subject to metabolic protocol cloud skip."""
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="vllm-mlx-local",
type="vllm_mlx",
enabled=True,
priority=2,
base_url="http://localhost:8000/v1",
models=[{"name": "qwen-mlx", "default": True}],
)
router.providers = [provider]
# Quota monitor returns False (block cloud) — vllm_mlx should still be tried
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
mock_qm.check.return_value = object()
mock_qm.should_use_cloud.return_value = False
with patch.object(router, "_call_vllm_mlx") as mock_call:
mock_call.return_value = {
"content": "Local MLX response",
"model": "qwen-mlx",
}
result = await router.complete(
messages=[{"role": "user", "content": "hi"}],
)
assert result["content"] == "Local MLX response"
class TestCascadeRouterReload:
"""Test hot-reload of providers.yaml."""

View File

@@ -175,7 +175,9 @@ async def test_bridge_run_simple_response():
bridge = MCPBridge(include_gitea=False, include_shell=False)
mock_resp = MagicMock()
mock_resp.json.return_value = {"message": {"role": "assistant", "content": "Hello!"}}
mock_resp.json.return_value = {
"message": {"role": "assistant", "content": "Hello!"}
}
mock_resp.raise_for_status = MagicMock()
mock_client = AsyncMock()
@@ -236,7 +238,9 @@ async def test_bridge_run_with_tool_call():
# Round 2: model returns final text
final_resp = MagicMock()
final_resp.json.return_value = {"message": {"role": "assistant", "content": "Done with tools!"}}
final_resp.json.return_value = {
"message": {"role": "assistant", "content": "Done with tools!"}
}
final_resp.raise_for_status = MagicMock()
mock_client = AsyncMock()
@@ -272,13 +276,17 @@ async def test_bridge_run_unknown_tool():
"message": {
"role": "assistant",
"content": "",
"tool_calls": [{"function": {"name": "nonexistent", "arguments": {}}}],
"tool_calls": [
{"function": {"name": "nonexistent", "arguments": {}}}
],
}
}
tool_call_resp.raise_for_status = MagicMock()
final_resp = MagicMock()
final_resp.json.return_value = {"message": {"role": "assistant", "content": "OK"}}
final_resp.json.return_value = {
"message": {"role": "assistant", "content": "OK"}
}
final_resp.raise_for_status = MagicMock()
mock_client = AsyncMock()
@@ -324,7 +332,9 @@ async def test_bridge_run_max_rounds():
"message": {
"role": "assistant",
"content": "",
"tool_calls": [{"function": {"name": "loop_tool", "arguments": {}}}],
"tool_calls": [
{"function": {"name": "loop_tool", "arguments": {}}}
],
}
}
tool_call_resp.raise_for_status = MagicMock()
@@ -355,7 +365,9 @@ async def test_bridge_run_connection_error():
bridge = MCPBridge(include_gitea=False, include_shell=False)
mock_client = AsyncMock()
mock_client.post = AsyncMock(side_effect=httpx.ConnectError("Connection refused"))
mock_client.post = AsyncMock(
side_effect=httpx.ConnectError("Connection refused")
)
mock_client.aclose = AsyncMock()
bridge._client = mock_client

View File

@@ -9,6 +9,7 @@ import pytest
from timmy.research_triage import (
ActionItem,
_parse_llm_response,
_resolve_label_ids,
_validate_action_item,
create_gitea_issue,
extract_action_items,
@@ -249,9 +250,7 @@ class TestCreateGiteaIssue:
with (
patch("timmy.research_triage.settings") as mock_settings,
patch(
"timmy.research_triage._resolve_label_ids", new_callable=AsyncMock, return_value=[1]
),
patch("timmy.research_triage._resolve_label_ids", new_callable=AsyncMock, return_value=[1]),
patch("timmy.research_triage.httpx.AsyncClient") as mock_cls,
):
mock_settings.gitea_enabled = True
@@ -285,9 +284,7 @@ class TestCreateGiteaIssue:
with (
patch("timmy.research_triage.settings") as mock_settings,
patch(
"timmy.research_triage._resolve_label_ids", new_callable=AsyncMock, return_value=[]
),
patch("timmy.research_triage._resolve_label_ids", new_callable=AsyncMock, return_value=[]),
patch("timmy.research_triage.httpx.AsyncClient") as mock_cls,
):
mock_settings.gitea_enabled = True
@@ -334,9 +331,7 @@ class TestTriageResearchReport:
with (
patch("timmy.research_triage.settings") as mock_settings,
patch(
"timmy.research_triage._resolve_label_ids", new_callable=AsyncMock, return_value=[]
),
patch("timmy.research_triage._resolve_label_ids", new_callable=AsyncMock, return_value=[]),
patch("timmy.research_triage.httpx.AsyncClient") as mock_cls,
):
mock_settings.gitea_enabled = True

View File

@@ -16,7 +16,7 @@ from timmy.memory_system import (
memory_forget,
memory_read,
memory_search,
memory_store,
memory_write,
)
@@ -490,7 +490,7 @@ class TestMemorySearch:
assert isinstance(result, str)
def test_none_top_k_handled(self):
result = memory_search("test", limit=None)
result = memory_search("test", top_k=None)
assert isinstance(result, str)
def test_basic_search_returns_string(self):
@@ -521,12 +521,12 @@ class TestMemoryRead:
assert isinstance(result, str)
class TestMemoryStore:
"""Test module-level memory_store function."""
class TestMemoryWrite:
"""Test module-level memory_write function."""
@pytest.fixture(autouse=True)
def mock_vector_store(self):
"""Mock vector_store functions for memory_store tests."""
"""Mock vector_store functions for memory_write tests."""
# Patch where it's imported from, not where it's used
with (
patch("timmy.memory_system.search_memories") as mock_search,
@@ -542,83 +542,75 @@ class TestMemoryStore:
yield {"search": mock_search, "store": mock_store}
def test_memory_store_empty_report(self):
"""Test that empty report returns error message."""
result = memory_store(topic="test", report="")
def test_memory_write_empty_content(self):
"""Test that empty content returns error message."""
result = memory_write("")
assert "empty" in result.lower()
def test_memory_store_whitespace_only(self):
"""Test that whitespace-only report returns error."""
result = memory_store(topic="test", report=" \n\t ")
def test_memory_write_whitespace_only(self):
"""Test that whitespace-only content returns error."""
result = memory_write(" \n\t ")
assert "empty" in result.lower()
def test_memory_store_valid_content(self, mock_vector_store):
def test_memory_write_valid_content(self, mock_vector_store):
"""Test writing valid content."""
result = memory_store(topic="fact about Timmy", report="Remember this important fact.")
result = memory_write("Remember this important fact.")
assert "stored" in result.lower() or "memory" in result.lower()
mock_vector_store["store"].assert_called_once()
def test_memory_store_dedup_for_facts_or_research(self, mock_vector_store):
"""Test that duplicate facts or research are skipped."""
def test_memory_write_dedup_for_facts(self, mock_vector_store):
"""Test that duplicate facts are skipped."""
# Simulate existing similar fact
mock_entry = MagicMock()
mock_entry.id = "existing-id"
mock_vector_store["search"].return_value = [mock_entry]
# Test with 'fact'
result = memory_store(topic="Similar fact", report="Similar fact text", type="fact")
result = memory_write("Similar fact text", context_type="fact")
assert "similar" in result.lower() or "duplicate" in result.lower()
mock_vector_store["store"].assert_not_called()
mock_vector_store["store"].reset_mock()
# Test with 'research'
result = memory_store(topic="Similar research", report="Similar research content", type="research")
assert "similar" in result.lower() or "duplicate" in result.lower()
mock_vector_store["store"].assert_not_called()
def test_memory_store_no_dedup_for_conversation(self, mock_vector_store):
def test_memory_write_no_dedup_for_conversation(self, mock_vector_store):
"""Test that conversation entries are not deduplicated."""
# Even with existing entries, conversations should be stored
mock_entry = MagicMock()
mock_entry.id = "existing-id"
mock_vector_store["search"].return_value = [mock_entry]
memory_store(topic="Conversation", report="Conversation text", type="conversation")
memory_write("Conversation text", context_type="conversation")
# Should still store (no duplicate check for non-fact)
mock_vector_store["store"].assert_called_once()
def test_memory_store_invalid_type_defaults_to_research(self, mock_vector_store):
"""Test that invalid type defaults to 'research'."""
memory_store(topic="Invalid type test", report="Some content", type="invalid_type")
# Should still succeed, using "research" as default
def test_memory_write_invalid_context_type(self, mock_vector_store):
"""Test that invalid context_type defaults to 'fact'."""
memory_write("Some content", context_type="invalid_type")
# Should still succeed, using "fact" as default
mock_vector_store["store"].assert_called_once()
call_kwargs = mock_vector_store["store"].call_args.kwargs
assert call_kwargs.get("context_type") == "research"
assert call_kwargs.get("context_type") == "fact"
def test_memory_store_valid_types(self, mock_vector_store):
def test_memory_write_valid_context_types(self, mock_vector_store):
"""Test all valid context types."""
valid_types = ["fact", "conversation", "document", "research"]
valid_types = ["fact", "conversation", "document"]
for ctx_type in valid_types:
mock_vector_store["store"].reset_mock()
memory_store(topic=f"Topic for {ctx_type}", report=f"Content for {ctx_type}", type=ctx_type)
memory_write(f"Content for {ctx_type}", context_type=ctx_type)
mock_vector_store["store"].assert_called_once()
def test_memory_store_strips_report_and_adds_topic(self, mock_vector_store):
"""Test that report is stripped of leading/trailing whitespace and combined with topic."""
memory_store(topic=" My Topic ", report=" padded content ")
def test_memory_write_strips_content(self, mock_vector_store):
"""Test that content is stripped of leading/trailing whitespace."""
memory_write(" padded content ")
call_kwargs = mock_vector_store["store"].call_args.kwargs
assert call_kwargs.get("content") == "Topic: My Topic\n\nReport: padded content"
assert call_kwargs.get("metadata") == {"topic": " My Topic "}
assert call_kwargs.get("content") == "padded content"
def test_memory_store_unicode_report(self, mock_vector_store):
def test_memory_write_unicode_content(self, mock_vector_store):
"""Test writing unicode content."""
result = memory_store(topic="Unicode", report="Unicode content: 你好世界 🎉")
result = memory_write("Unicode content: 你好世界 🎉")
assert "stored" in result.lower() or "memory" in result.lower()
def test_memory_store_handles_exception(self, mock_vector_store):
def test_memory_write_handles_exception(self, mock_vector_store):
"""Test handling of store_memory exceptions."""
mock_vector_store["store"].side_effect = Exception("DB error")
result = memory_store(topic="Failing", report="This will fail")
result = memory_write("This will fail")
assert "failed" in result.lower() or "error" in result.lower()

View File

@@ -1,460 +0,0 @@
"""Unit tests for timmy.kimi_delegation — Kimi research delegation via Gitea labels."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.kimi_delegation import (
KIMI_LABEL_COLOR,
KIMI_READY_LABEL,
_build_research_template,
_extract_action_items,
_slugify,
delegate_research_to_kimi,
exceeds_local_capacity,
)
# ── Constants ─────────────────────────────────────────────────────────────────
def test_kimi_ready_label():
assert KIMI_READY_LABEL == "kimi-ready"
def test_kimi_label_color_is_hex():
assert KIMI_LABEL_COLOR.startswith("#")
assert len(KIMI_LABEL_COLOR) == 7
# ── exceeds_local_capacity ────────────────────────────────────────────────────
class TestExceedsLocalCapacity:
def test_keyword_comprehensive(self):
assert exceeds_local_capacity("Do a comprehensive review of X") is True
def test_keyword_deep_research(self):
assert exceeds_local_capacity("deep research into neural networks") is True
def test_keyword_benchmark(self):
assert exceeds_local_capacity("benchmark these five models") is True
def test_keyword_exhaustive(self):
assert exceeds_local_capacity("exhaustive list of options") is True
def test_keyword_case_insensitive(self):
assert exceeds_local_capacity("COMPREHENSIVE analysis") is True
def test_keyword_survey(self):
assert exceeds_local_capacity("survey all available tools") is True
def test_keyword_extensive(self):
assert exceeds_local_capacity("extensive documentation needed") is True
def test_short_simple_task(self):
assert exceeds_local_capacity("fix the login bug") is False
def test_long_task_exceeds_word_threshold(self):
long_task = " ".join(["word"] * 55)
assert exceeds_local_capacity(long_task) is True
def test_exactly_at_threshold(self):
at_threshold = " ".join(["word"] * 50)
assert exceeds_local_capacity(at_threshold) is True
def test_just_below_threshold(self):
short = " ".join(["word"] * 49)
assert exceeds_local_capacity(short) is False
def test_empty_string(self):
assert exceeds_local_capacity("") is False
# ── _slugify ──────────────────────────────────────────────────────────────────
class TestSlugify:
def test_simple_text(self):
assert _slugify("Hello World") == "hello-world"
def test_special_characters_removed(self):
assert _slugify("Hello, World!") == "hello-world"
def test_underscores_become_dashes(self):
assert _slugify("hello_world") == "hello-world"
def test_multiple_spaces(self):
assert _slugify("hello world") == "hello-world"
def test_truncates_to_60(self):
long = "a" * 80
result = _slugify(long)
assert len(result) <= 60
def test_no_leading_trailing_dashes(self):
result = _slugify(" hello ")
assert not result.startswith("-")
assert not result.endswith("-")
def test_empty_string(self):
assert _slugify("") == ""
# ── _build_research_template ──────────────────────────────────────────────────
class TestBuildResearchTemplate:
def test_contains_task(self):
body = _build_research_template("My Task", "some context", "What is X?")
assert "My Task" in body
def test_contains_question(self):
body = _build_research_template("Task", "ctx", "What is the answer?")
assert "What is the answer?" in body
def test_contains_context(self):
body = _build_research_template("Task", "project background", "Q?")
assert "project background" in body
def test_contains_kimi_ready_label(self):
body = _build_research_template("Task", "ctx", "Q?")
assert KIMI_READY_LABEL in body
def test_default_priority_normal(self):
body = _build_research_template("Task", "ctx", "Q?")
assert "normal" in body
def test_custom_priority_high(self):
body = _build_research_template("Task", "ctx", "Q?", priority="high")
assert "high" in body
def test_contains_deliverables_section(self):
body = _build_research_template("Task", "ctx", "Q?")
assert "Deliverables" in body
def test_slug_in_artifact_path(self):
body = _build_research_template("My Research Task", "ctx", "Q?")
assert "my-research-task" in body
def test_contains_research_request_header(self):
body = _build_research_template("Task", "ctx", "Q?")
assert "## Research Request" in body
# ── _extract_action_items ─────────────────────────────────────────────────────
class TestExtractActionItems:
def test_checkbox_items(self):
text = "- [ ] Do thing A\n- [ ] Do thing B"
items = _extract_action_items(text)
assert "Do thing A" in items
assert "Do thing B" in items
def test_numbered_list(self):
text = "1. First step\n2. Second step\n3. Third step"
items = _extract_action_items(text)
assert "First step" in items
assert "Second step" in items
assert "Third step" in items
def test_action_prefix(self):
text = "Action: Implement caching layer"
items = _extract_action_items(text)
assert "Implement caching layer" in items
def test_todo_prefix(self):
text = "TODO: Write tests"
items = _extract_action_items(text)
assert "Write tests" in items
def test_next_step_prefix(self):
text = "Next step: Deploy to staging"
items = _extract_action_items(text)
assert "Deploy to staging" in items
def test_case_insensitive_prefixes(self):
text = "TODO: Upper\ntodo: lower\nTodo: Mixed"
items = _extract_action_items(text)
assert len(items) == 3
def test_deduplication(self):
text = "1. Do the thing\n2. Do the thing"
items = _extract_action_items(text)
assert items.count("Do the thing") == 1
def test_empty_text(self):
assert _extract_action_items("") == []
def test_no_action_items(self):
text = "This is just a paragraph with no action items."
assert _extract_action_items(text) == []
def test_returns_list(self):
assert isinstance(_extract_action_items("1. Item"), list)
# ── delegate_research_to_kimi ─────────────────────────────────────────────────
class TestDelegateResearchToKimi:
@pytest.mark.asyncio
async def test_empty_task_returns_error(self):
result = await delegate_research_to_kimi("", "context", "question?")
assert result["success"] is False
assert "task" in result["error"].lower()
@pytest.mark.asyncio
async def test_whitespace_task_returns_error(self):
result = await delegate_research_to_kimi(" ", "context", "question?")
assert result["success"] is False
@pytest.mark.asyncio
async def test_empty_question_returns_error(self):
result = await delegate_research_to_kimi("Task title", "context", "")
assert result["success"] is False
assert "question" in result["error"].lower()
@pytest.mark.asyncio
async def test_whitespace_question_returns_error(self):
result = await delegate_research_to_kimi("Task", "ctx", " ")
assert result["success"] is False
@pytest.mark.asyncio
async def test_delegates_to_create_issue(self):
with patch(
"timmy.kimi_delegation.create_kimi_research_issue",
new_callable=AsyncMock,
return_value={
"success": True,
"issue_number": 42,
"issue_url": "http://x/42",
"error": None,
},
) as mock_create:
result = await delegate_research_to_kimi("Task", "ctx", "What is X?", "high")
mock_create.assert_awaited_once_with("Task", "ctx", "What is X?", "high")
assert result["success"] is True
assert result["issue_number"] == 42
@pytest.mark.asyncio
async def test_passes_default_priority(self):
with patch(
"timmy.kimi_delegation.create_kimi_research_issue",
new_callable=AsyncMock,
return_value={"success": True, "issue_number": 1, "issue_url": "", "error": None},
) as mock_create:
await delegate_research_to_kimi("Task", "ctx", "Q?")
_, _, _, priority = mock_create.call_args.args
assert priority == "normal"
# ── create_kimi_research_issue ────────────────────────────────────────────────
class TestCreateKimiResearchIssue:
@pytest.mark.asyncio
async def test_no_gitea_token_returns_error(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
with patch("config.settings", mock_settings):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
assert "not configured" in result["error"]
@pytest.mark.asyncio
async def test_gitea_disabled_returns_error(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = "tok"
with patch("config.settings", mock_settings):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
@pytest.mark.asyncio
async def test_successful_issue_creation(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {
"number": 101,
"html_url": "http://gitea.local/issues/101",
}
mock_client = AsyncMock()
mock_client.get.return_value = label_resp
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is True
assert result["issue_number"] == 101
assert result["error"] is None
@pytest.mark.asyncio
async def test_api_error_returns_failure(self):
from timmy.kimi_delegation import create_kimi_research_issue
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "kimi-ready", "id": 7}]
issue_resp = MagicMock()
issue_resp.status_code = 500
issue_resp.text = "Internal Server Error"
mock_client = AsyncMock()
mock_client.get.return_value = label_resp
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await create_kimi_research_issue("Task", "ctx", "Q?")
assert result["success"] is False
assert "500" in result["error"]
# ── index_kimi_artifact ───────────────────────────────────────────────────────
class TestIndexKimiArtifact:
@pytest.mark.asyncio
async def test_empty_artifact_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
result = await index_kimi_artifact(42, "Title", "")
assert result["success"] is False
assert "Empty" in result["error"]
@pytest.mark.asyncio
async def test_whitespace_only_artifact_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
result = await index_kimi_artifact(42, "Title", " \n ")
assert result["success"] is False
@pytest.mark.asyncio
async def test_successful_indexing(self):
from timmy.kimi_delegation import index_kimi_artifact
mock_entry = MagicMock()
mock_entry.id = "mem-abc-123"
with patch("timmy.memory_system.store_memory", return_value=mock_entry) as mock_store:
result = await index_kimi_artifact(55, "Research Title", "Artifact content here.")
assert result["success"] is True
assert result["memory_id"] == "mem-abc-123"
mock_store.assert_called_once()
call_kwargs = mock_store.call_args.kwargs
assert call_kwargs["source"] == "kimi"
assert call_kwargs["context_type"] == "document"
assert call_kwargs["task_id"] == "55"
@pytest.mark.asyncio
async def test_store_memory_exception_returns_error(self):
from timmy.kimi_delegation import index_kimi_artifact
with patch(
"timmy.memory_system.store_memory",
side_effect=RuntimeError("DB error"),
):
result = await index_kimi_artifact(1, "T", "Some content")
assert result["success"] is False
assert "DB error" in result["error"]
# ── extract_and_create_followups ──────────────────────────────────────────────
class TestExtractAndCreateFollowups:
@pytest.mark.asyncio
async def test_no_action_items_returns_empty_list(self):
from timmy.kimi_delegation import extract_and_create_followups
result = await extract_and_create_followups("No action items here.", 10)
assert result["success"] is True
assert result["created"] == []
assert result["error"] is None
@pytest.mark.asyncio
async def test_gitea_not_configured(self):
from timmy.kimi_delegation import extract_and_create_followups
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
with patch("config.settings", mock_settings):
result = await extract_and_create_followups("1. Do the thing", 10)
assert result["success"] is False
assert result["created"] == []
@pytest.mark.asyncio
async def test_creates_followup_issues(self):
from timmy.kimi_delegation import extract_and_create_followups
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.gitea_repo = "owner/repo"
issue_resp = MagicMock()
issue_resp.status_code = 201
issue_resp.json.return_value = {"number": 200}
mock_client = AsyncMock()
mock_client.post.return_value = issue_resp
async_ctx = AsyncMock()
async_ctx.__aenter__.return_value = mock_client
async_ctx.__aexit__.return_value = False
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=async_ctx),
):
result = await extract_and_create_followups("1. Do the thing\n2. Do another thing", 10)
assert result["success"] is True
assert 200 in result["created"]