[perplexity] feat: Sovereignty Loop core framework — auto-crystallizer, graduation test, orchestration (#953) (#1331)
Co-authored-by: Perplexity Computer <perplexity@tower.local>
Co-committed-by: Perplexity Computer <perplexity@tower.local>
This commit was merged in pull request #1331.
docs/SOVEREIGNTY_INTEGRATION.md (new file, 201 lines)

@@ -0,0 +1,201 @@
# Sovereignty Loop — Integration Guide

How to use the sovereignty subsystem in new code and existing modules.

> "The measure of progress is not features added. It is model calls eliminated."

Refs: #953 (The Sovereignty Loop)

---

## Quick Start

Every model call must follow the sovereignty protocol:

**check cache → miss → infer → crystallize → return**

### Perception Layer (VLM calls)

```python
from timmy.sovereignty.sovereignty_loop import sovereign_perceive
from timmy.sovereignty.perception_cache import PerceptionCache

cache = PerceptionCache("data/templates.json")

state = await sovereign_perceive(
    screenshot=frame,
    cache=cache,
    vlm=my_vlm_client,
    session_id="session_001",
)
```

### Decision Layer (LLM calls)

```python
from timmy.sovereignty.sovereignty_loop import sovereign_decide

result = await sovereign_decide(
    context={"health": 25, "enemy_count": 3},
    llm=my_llm_client,
    session_id="session_001",
)
# result["action"] could be "heal" from a cached rule or fresh LLM reasoning
```

### Narration Layer

```python
from timmy.sovereignty.sovereignty_loop import sovereign_narrate

text = await sovereign_narrate(
    event={"type": "combat_start", "enemy": "Cliff Racer"},
    llm=my_llm_client,  # optional — None for template-only
    session_id="session_001",
)
```

### General Purpose (Decorator)

```python
from timmy.sovereignty.sovereignty_loop import sovereignty_enforced

@sovereignty_enforced(
    layer="decision",
    cache_check=lambda a, kw: rule_store.find_matching(kw.get("ctx")),
    crystallize=lambda result, a, kw: rule_store.add(extract_rules(result)),
)
async def my_expensive_function(ctx):
    return await llm.reason(ctx)
```

---

## Auto-Crystallizer

Automatically extracts rules from LLM reasoning chains:

```python
from timmy.sovereignty.auto_crystallizer import crystallize_reasoning, get_rule_store

# After any LLM call with reasoning output:
rules = crystallize_reasoning(
    llm_response="I chose heal because health was below 30%.",
    context={"game": "morrowind"},
)

store = get_rule_store()
added = store.add_many(rules)
```

### Rule Lifecycle

1. **Extracted** — confidence 0.5, not yet reliable
2. **Applied** — confidence increases (+0.05 per success, −0.10 per failure)
3. **Reliable** — confidence ≥ 0.8, ≥3 applications, and ≥60% success rate
4. **Autonomous** — reliable rules bypass LLM calls entirely
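The arithmetic behind steps 2–3 can be checked with a short standalone sketch. The constants mirror `CONFIDENCE_BOOST` / `CONFIDENCE_PENALTY` and the clamping used by the auto-crystallizer; the `apply` helper itself is hypothetical, for illustration only:

```python
# Standalone sketch of the documented confidence arithmetic.
# `apply` is a hypothetical helper for illustration, not a library API.
BOOST, PENALTY = 0.05, 0.10

def apply(confidence: float, succeeded: bool) -> float:
    """Confidence after one rule application, clamped to [0, 1]."""
    delta = BOOST if succeeded else -PENALTY
    return min(1.0, max(0.0, confidence + delta))

conf = 0.5  # freshly extracted rule
for _ in range(6):  # six consecutive successes
    conf = apply(conf, succeeded=True)
print(round(conf, 2))  # 0.8 — exactly the reliability threshold
conf = apply(conf, succeeded=False)
print(round(conf, 2))  # 0.7 — a single failure costs two successes
```

Note the asymmetry: a failure (−0.10) undoes two successes (+0.05 each), so a rule only stays autonomous if it keeps winning.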
---

## Three-Strike Detector

Enforces automation for repetitive manual work:

```python
from timmy.sovereignty.three_strike import get_detector, ThreeStrikeError

detector = get_detector()

try:
    detector.record("vlm_prompt_edit", "health_bar_template")
except ThreeStrikeError:
    # Must register an automation before continuing
    detector.register_automation(
        "vlm_prompt_edit",
        "health_bar_template",
        "scripts/auto_health_bar.py",
    )
```

---

## Falsework Checklist

Before any cloud API call, complete the checklist:

```python
from timmy.sovereignty.three_strike import FalseworkChecklist, falsework_check

checklist = FalseworkChecklist(
    durable_artifact="embedding vectors for UI element foo",
    artifact_storage_path="data/vlm/foo_embeddings.json",
    local_rule_or_cache="vlm_cache",
    will_repeat=False,
    sovereignty_delta="eliminates repeated VLM call",
)
falsework_check(checklist)  # raises ValueError if incomplete
```
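The contract above — `falsework_check` raises `ValueError` when the checklist is incomplete — can be mimicked in a self-contained sketch. The names `ChecklistSketch` and `check_complete` are hypothetical stand-ins, not the library API; the field names follow the example above:

```python
from dataclasses import dataclass, fields

# Illustration of the documented contract: every checklist field must be
# filled in, otherwise ValueError is raised. Hypothetical names; this is
# not the library's actual implementation.
@dataclass
class ChecklistSketch:
    durable_artifact: str
    artifact_storage_path: str
    local_rule_or_cache: str
    will_repeat: bool
    sovereignty_delta: str

def check_complete(checklist: ChecklistSketch) -> None:
    """Raise ValueError if any string field is blank."""
    for f in fields(checklist):
        value = getattr(checklist, f.name)
        if isinstance(value, str) and not value.strip():
            raise ValueError(f"falsework checklist incomplete: {f.name} is empty")

check_complete(ChecklistSketch(
    "embeddings for UI element",
    "data/vlm/foo_embeddings.json",
    "vlm_cache",
    False,
    "eliminates repeated VLM call",
))  # passes silently
```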
---

## Graduation Test

Run the five-condition test to evaluate sovereignty readiness:

```python
from timmy.sovereignty.graduation import run_graduation_test

report = run_graduation_test(
    sats_earned=100.0,
    sats_spent=50.0,
    uptime_hours=24.0,
    human_interventions=0,
)
print(report.to_markdown())
```

API endpoint: `GET /sovereignty/graduation/test`

---

## Metrics

Record sovereignty events throughout the codebase:

```python
from timmy.sovereignty.metrics import emit_sovereignty_event

# Perception hits
await emit_sovereignty_event("perception_cache_hit", session_id="s1")
await emit_sovereignty_event("perception_vlm_call", session_id="s1")

# Decision hits
await emit_sovereignty_event("decision_rule_hit", session_id="s1")
await emit_sovereignty_event("decision_llm_call", session_id="s1")

# Narration hits
await emit_sovereignty_event("narration_template", session_id="s1")
await emit_sovereignty_event("narration_llm", session_id="s1")

# Crystallization
await emit_sovereignty_event("skill_crystallized", metadata={"layer": "perception"})
```
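A per-layer sovereignty percentage can be read off these counters as local events over total events. A minimal sketch under that assumption — the real formula lives in `timmy.sovereignty.metrics` and may differ:

```python
# Hypothetical sketch: sovereignty % = local events / (local + cloud events),
# using the event names documented above. Illustration only; the metrics
# module's actual computation may differ.
LOCAL = {"perception_cache_hit", "decision_rule_hit", "narration_template"}
CLOUD = {"perception_vlm_call", "decision_llm_call", "narration_llm"}

def sovereignty_pct(events: list[str]) -> float:
    """Percentage of tracked calls served locally; 0.0 when no events."""
    local = sum(e in LOCAL for e in events)
    cloud = sum(e in CLOUD for e in events)
    total = local + cloud
    return 100.0 * local / total if total else 0.0

events = ["perception_cache_hit"] * 9 + ["perception_vlm_call"]
print(sovereignty_pct(events))  # 90.0
```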
Dashboard WebSocket: `ws://localhost:8000/ws/sovereignty`

---

## Module Map

| Module | Purpose | Issue |
|--------|---------|-------|
| `timmy.sovereignty.metrics` | SQLite event store + sovereignty % | #954 |
| `timmy.sovereignty.perception_cache` | OpenCV template matching | #955 |
| `timmy.sovereignty.auto_crystallizer` | LLM reasoning → local rules | #961 |
| `timmy.sovereignty.sovereignty_loop` | Core orchestration wrappers | #953 |
| `timmy.sovereignty.graduation` | Five-condition graduation test | #953 |
| `timmy.sovereignty.session_report` | Markdown scorecard + Gitea commit | #957 |
| `timmy.sovereignty.three_strike` | Automation enforcement | #962 |
| `infrastructure.sovereignty_metrics` | Research sovereignty tracking | #981 |
| `dashboard.routes.sovereignty_metrics` | HTMX + API endpoints | #960 |
| `dashboard.routes.sovereignty_ws` | WebSocket real-time stream | #960 |
| `dashboard.routes.graduation` | Graduation test API | #953 |
src/dashboard/routes/graduation.py (new file, 58 lines)

@@ -0,0 +1,58 @@
"""Graduation test dashboard routes.

Provides API endpoints for running and viewing the five-condition
graduation test from the Sovereignty Loop (#953).

Refs: #953 (Graduation Test)
"""

import logging
from typing import Any

from fastapi import APIRouter

router = APIRouter(prefix="/sovereignty/graduation", tags=["sovereignty"])

logger = logging.getLogger(__name__)


@router.get("/test")
async def run_graduation_test_api(
    sats_earned: float = 0.0,
    sats_spent: float = 0.0,
    uptime_hours: float = 0.0,
    human_interventions: int = 0,
) -> dict[str, Any]:
    """Run the full graduation test and return results.

    Query parameters supply the external metrics (Lightning, heartbeat)
    that aren't tracked in the sovereignty metrics DB.
    """
    from timmy.sovereignty.graduation import run_graduation_test

    report = run_graduation_test(
        sats_earned=sats_earned,
        sats_spent=sats_spent,
        uptime_hours=uptime_hours,
        human_interventions=human_interventions,
    )
    return report.to_dict()


@router.get("/report")
async def graduation_report_markdown(
    sats_earned: float = 0.0,
    sats_spent: float = 0.0,
    uptime_hours: float = 0.0,
    human_interventions: int = 0,
) -> dict[str, str]:
    """Run graduation test and return a markdown report."""
    from timmy.sovereignty.graduation import run_graduation_test

    report = run_graduation_test(
        sats_earned=sats_earned,
        sats_spent=sats_spent,
        uptime_hours=uptime_hours,
        human_interventions=human_interventions,
    )
    return {"markdown": report.to_markdown(), "passed": str(report.all_passed)}
@@ -1,18 +1,18 @@
-"""Sovereignty metrics for the Bannerlord loop.
+"""Sovereignty subsystem for the Timmy agent.

-Tracks how much of each AI layer (perception, decision, narration)
-runs locally vs. calls out to an LLM. Feeds the sovereignty dashboard.
+Implements the Sovereignty Loop governing architecture (#953):
+    Discover → Crystallize → Replace → Measure → Repeat

-Refs: #954, #953
+Modules:
+- metrics: SQLite-backed event store for sovereignty %
+- perception_cache: OpenCV template matching for VLM replacement
+- auto_crystallizer: Rule extraction from LLM reasoning chains
+- sovereignty_loop: Core orchestration (sovereign_perceive/decide/narrate)
+- graduation: Five-condition graduation test runner
+- session_report: Markdown scorecard generator + Gitea commit
+- three_strike: Automation enforcement (3-strike detector)

-Three-strike detector and automation enforcement.
-
-Refs: #962
-
-Session reporting: auto-generates markdown scorecards at session end
-and commits them to the Gitea repo for institutional memory.
-
-Refs: #957 (Session Sovereignty Report Generator)
+Refs: #953, #954, #955, #956, #957, #961, #962
"""

from timmy.sovereignty.session_report import (
@@ -23,6 +23,7 @@ from timmy.sovereignty.session_report import (
)

__all__ = [
    # Session reporting
    "generate_report",
    "commit_report",
    "generate_and_commit_report",
src/timmy/sovereignty/auto_crystallizer.py (new file, 409 lines)

@@ -0,0 +1,409 @@
"""Auto-Crystallizer for Groq/cloud reasoning chains.

Automatically analyses LLM reasoning output and extracts durable local
rules that can preempt future cloud API calls. Each extracted rule is
persisted to ``data/strategy.json`` with confidence tracking.

Workflow:
1. LLM returns a reasoning chain (e.g. "I chose heal because HP < 30%")
2. ``crystallize_reasoning()`` extracts condition → action rules
3. Rules are stored locally with initial confidence 0.5
4. Successful rule applications increase confidence; failures decrease it
5. Rules with confidence > 0.8 bypass the LLM entirely

Rule format (JSON)::

    {
        "id": "rule_abc123",
        "condition": "health_pct < 30",
        "action": "heal",
        "source": "groq_reasoning",
        "confidence": 0.5,
        "times_applied": 0,
        "times_succeeded": 0,
        "created_at": "2026-03-23T...",
        "updated_at": "2026-03-23T...",
        "reasoning_excerpt": "I chose to heal because health was below 30%"
    }

Refs: #961, #953 (The Sovereignty Loop — Section III.5)
"""

from __future__ import annotations

import hashlib
import json
import logging
import re
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from config import settings

logger = logging.getLogger(__name__)

# ── Constants ─────────────────────────────────────────────────────────────────

STRATEGY_PATH = Path(settings.repo_root) / "data" / "strategy.json"

#: Minimum confidence for a rule to bypass the LLM.
CONFIDENCE_THRESHOLD = 0.8

#: Minimum successful applications before a rule is considered reliable.
MIN_APPLICATIONS = 3

#: Confidence adjustment on successful application.
CONFIDENCE_BOOST = 0.05

#: Confidence penalty on failed application.
CONFIDENCE_PENALTY = 0.10

# ── Regex patterns for extracting conditions from reasoning ───────────────────

_CONDITION_PATTERNS: list[tuple[str, re.Pattern[str]]] = [
    # "because X was below/above/less than/greater than Y"
    (
        "threshold",
        re.compile(
            r"because\s+(\w[\w\s]*?)\s+(?:was|is|were)\s+"
            r"(?:below|above|less than|greater than|under|over)\s+"
            r"(\d+(?:\.\d+)?)\s*%?",
            re.IGNORECASE,
        ),
    ),
    # "when X is/was Y" or "if X is/was Y"
    (
        "state_check",
        re.compile(
            r"(?:when|if|since)\s+(\w[\w\s]*?)\s+(?:is|was|were)\s+"
            r"(\w[\w\s]*?)(?:\.|,|$)",
            re.IGNORECASE,
        ),
    ),
    # "X < Y" or "X > Y" or "X <= Y" or "X >= Y"
    (
        "comparison",
        re.compile(
            r"(\w[\w_.]*)\s*(<=?|>=?|==|!=)\s*(\d+(?:\.\d+)?)",
        ),
    ),
    # "chose X because Y"
    (
        "choice_reason",
        re.compile(
            r"(?:chose|selected|picked|decided on)\s+(\w+)\s+because\s+(.+?)(?:\.|$)",
            re.IGNORECASE,
        ),
    ),
    # "always X when Y" or "never X when Y"
    (
        "always_never",
        re.compile(
            r"(always|never)\s+(\w+)\s+when\s+(.+?)(?:\.|,|$)",
            re.IGNORECASE,
        ),
    ),
]


# ── Data classes ──────────────────────────────────────────────────────────────


@dataclass
class Rule:
    """A crystallised decision rule extracted from LLM reasoning."""

    id: str
    condition: str
    action: str
    source: str = "groq_reasoning"
    confidence: float = 0.5
    times_applied: int = 0
    times_succeeded: int = 0
    created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
    updated_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
    reasoning_excerpt: str = ""
    pattern_type: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def success_rate(self) -> float:
        """Fraction of successful applications."""
        if self.times_applied == 0:
            return 0.0
        return self.times_succeeded / self.times_applied

    @property
    def is_reliable(self) -> bool:
        """True when the rule is reliable enough to bypass the LLM."""
        return (
            self.confidence >= CONFIDENCE_THRESHOLD
            and self.times_applied >= MIN_APPLICATIONS
            and self.success_rate >= 0.6
        )


# ── Rule store ────────────────────────────────────────────────────────────────


class RuleStore:
    """Manages the persistent collection of crystallised rules.

    Rules are stored as a JSON list in ``data/strategy.json``.
    Thread-safe for read-only use; writes should be serialised by the caller.
    """

    def __init__(self, path: Path | None = None) -> None:
        self._path = path or STRATEGY_PATH
        self._rules: dict[str, Rule] = {}
        self._load()

    # ── persistence ───────────────────────────────────────────────────────

    def _load(self) -> None:
        """Load rules from disk."""
        if not self._path.exists():
            self._rules = {}
            return
        try:
            with self._path.open() as f:
                data = json.load(f)
            self._rules = {}
            for entry in data:
                rule = Rule(**{k: v for k, v in entry.items() if k in Rule.__dataclass_fields__})
                self._rules[rule.id] = rule
            logger.debug("Loaded %d crystallised rules from %s", len(self._rules), self._path)
        except Exception as exc:
            logger.warning("Failed to load strategy rules: %s", exc)
            self._rules = {}

    def persist(self) -> None:
        """Write current rules to disk."""
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            with self._path.open("w") as f:
                json.dump(
                    [asdict(r) for r in self._rules.values()],
                    f,
                    indent=2,
                    default=str,
                )
            logger.debug("Persisted %d rules to %s", len(self._rules), self._path)
        except Exception as exc:
            logger.warning("Failed to persist strategy rules: %s", exc)

    # ── CRUD ──────────────────────────────────────────────────────────────

    def add(self, rule: Rule) -> None:
        """Add or update a rule and persist."""
        self._rules[rule.id] = rule
        self.persist()

    def add_many(self, rules: list[Rule]) -> int:
        """Add multiple rules. Returns count of new rules added."""
        added = 0
        for rule in rules:
            if rule.id not in self._rules:
                self._rules[rule.id] = rule
                added += 1
            else:
                # Update confidence if existing rule seen again
                existing = self._rules[rule.id]
                existing.confidence = min(1.0, existing.confidence + CONFIDENCE_BOOST)
                existing.updated_at = datetime.now(UTC).isoformat()
        if rules:
            self.persist()
        return added

    def get(self, rule_id: str) -> Rule | None:
        """Retrieve a rule by ID."""
        return self._rules.get(rule_id)

    def find_matching(self, context: dict[str, Any]) -> list[Rule]:
        """Find rules whose conditions match the given context.

        A simple keyword match: if the condition string contains keys
        from the context, and the rule is reliable, it is included.

        This is intentionally simple — a production implementation would
        use embeddings or structured condition evaluation.
        """
        matching = []
        context_str = json.dumps(context).lower()
        for rule in self._rules.values():
            if not rule.is_reliable:
                continue
            # Simple keyword overlap check
            condition_words = set(rule.condition.lower().split())
            if any(word in context_str for word in condition_words if len(word) > 2):
                matching.append(rule)
        return sorted(matching, key=lambda r: r.confidence, reverse=True)

    def record_application(self, rule_id: str, succeeded: bool) -> None:
        """Record a rule application outcome (success or failure)."""
        rule = self._rules.get(rule_id)
        if rule is None:
            return
        rule.times_applied += 1
        if succeeded:
            rule.times_succeeded += 1
            rule.confidence = min(1.0, rule.confidence + CONFIDENCE_BOOST)
        else:
            rule.confidence = max(0.0, rule.confidence - CONFIDENCE_PENALTY)
        rule.updated_at = datetime.now(UTC).isoformat()
        self.persist()

    @property
    def all_rules(self) -> list[Rule]:
        """Return all stored rules."""
        return list(self._rules.values())

    @property
    def reliable_rules(self) -> list[Rule]:
        """Return only reliable rules (above confidence threshold)."""
        return [r for r in self._rules.values() if r.is_reliable]

    def __len__(self) -> int:
        return len(self._rules)


# ── Extraction logic ──────────────────────────────────────────────────────────


def _make_rule_id(condition: str, action: str) -> str:
    """Deterministic rule ID from condition + action."""
    key = f"{condition.strip().lower()}:{action.strip().lower()}"
    return f"rule_{hashlib.sha256(key.encode()).hexdigest()[:12]}"


def crystallize_reasoning(
    llm_response: str,
    context: dict[str, Any] | None = None,
    source: str = "groq_reasoning",
) -> list[Rule]:
    """Extract actionable rules from an LLM reasoning chain.

    Scans the response text for recognisable patterns (threshold checks,
    state comparisons, explicit choices) and converts them into ``Rule``
    objects that can replace future LLM calls.

    Parameters
    ----------
    llm_response:
        The full text of the LLM's reasoning output.
    context:
        Optional context dict for metadata enrichment.
    source:
        Identifier for the originating model/service.

    Returns
    -------
    list[Rule]
        Extracted rules (may be empty if no patterns found).
    """
    rules: list[Rule] = []
    seen_ids: set[str] = set()

    for pattern_type, pattern in _CONDITION_PATTERNS:
        for match in pattern.finditer(llm_response):
            groups = match.groups()

            if pattern_type == "threshold" and len(groups) >= 2:
                variable = groups[0].strip().replace(" ", "_").lower()
                threshold = groups[1]
                # Determine direction from surrounding text
                action = _extract_nearby_action(llm_response, match.end())
                if "below" in match.group().lower() or "less" in match.group().lower():
                    condition = f"{variable} < {threshold}"
                else:
                    condition = f"{variable} > {threshold}"

            elif pattern_type == "comparison" and len(groups) >= 3:
                variable = groups[0].strip()
                operator = groups[1]
                value = groups[2]
                condition = f"{variable} {operator} {value}"
                action = _extract_nearby_action(llm_response, match.end())

            elif pattern_type == "choice_reason" and len(groups) >= 2:
                action = groups[0].strip()
                condition = groups[1].strip()

            elif pattern_type == "always_never" and len(groups) >= 3:
                modifier = groups[0].strip().lower()
                action = groups[1].strip()
                condition = f"{modifier}: {groups[2].strip()}"

            elif pattern_type == "state_check" and len(groups) >= 2:
                variable = groups[0].strip().replace(" ", "_").lower()
                state = groups[1].strip().lower()
                condition = f"{variable} == {state}"
                action = _extract_nearby_action(llm_response, match.end())

            else:
                continue

            if not action:
                action = "unknown"

            rule_id = _make_rule_id(condition, action)
            if rule_id in seen_ids:
                continue
            seen_ids.add(rule_id)

            # Extract a short excerpt around the match for provenance
            start = max(0, match.start() - 20)
            end = min(len(llm_response), match.end() + 50)
            excerpt = llm_response[start:end].strip()

            rules.append(
                Rule(
                    id=rule_id,
                    condition=condition,
                    action=action,
                    source=source,
                    pattern_type=pattern_type,
                    reasoning_excerpt=excerpt,
                    metadata=context or {},
                )
            )

    if rules:
        logger.info(
            "Auto-crystallizer extracted %d rule(s) from %s response",
            len(rules),
            source,
        )

    return rules


def _extract_nearby_action(text: str, position: int) -> str:
    """Try to extract an action verb/noun near a match position."""
    # Look at the next 100 chars for action-like words
    snippet = text[position : position + 100].strip()
    action_patterns = [
        re.compile(r"(?:so|then|thus)\s+(?:I\s+)?(\w+)", re.IGNORECASE),
        re.compile(r"→\s*(\w+)", re.IGNORECASE),
        re.compile(r"action:\s*(\w+)", re.IGNORECASE),
    ]
    for pat in action_patterns:
        m = pat.search(snippet)
        if m:
            return m.group(1).strip()
    return ""


# ── Module-level singleton ────────────────────────────────────────────────────

_store: RuleStore | None = None


def get_rule_store() -> RuleStore:
    """Return (or lazily create) the module-level rule store."""
    global _store
    if _store is None:
        _store = RuleStore()
    return _store
src/timmy/sovereignty/graduation.py (new file, 341 lines)

@@ -0,0 +1,341 @@
"""Graduation Test — Falsework Removal Criteria.
|
||||
|
||||
Evaluates whether the agent meets all five graduation conditions
|
||||
simultaneously. All conditions must be met within a single 24-hour
|
||||
period for the system to be considered sovereign.
|
||||
|
||||
Conditions:
|
||||
1. Perception Independence — 1 hour with no VLM calls after minute 15
|
||||
2. Decision Independence — Full session with <5 cloud API calls
|
||||
3. Narration Independence — All narration from local templates + local LLM
|
||||
4. Economic Independence — sats_earned > sats_spent
|
||||
5. Operational Independence — 24 hours unattended, no human intervention
|
||||
|
||||
Each condition returns a :class:`GraduationResult` with pass/fail,
|
||||
the actual measured value, and the target.
|
||||
|
||||
"The arch must hold after the falsework is removed."
|
||||
|
||||
Refs: #953 (The Sovereignty Loop — Graduation Test)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── Data classes ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConditionResult:
|
||||
"""Result of a single graduation condition evaluation."""
|
||||
|
||||
name: str
|
||||
passed: bool
|
||||
actual: float | int
|
||||
target: float | int
|
||||
unit: str = ""
|
||||
detail: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class GraduationReport:
|
||||
"""Full graduation test report."""
|
||||
|
||||
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
||||
all_passed: bool = False
|
||||
conditions: list[ConditionResult] = field(default_factory=list)
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Serialize to a JSON-safe dict."""
|
||||
return {
|
||||
"timestamp": self.timestamp,
|
||||
"all_passed": self.all_passed,
|
||||
"conditions": [asdict(c) for c in self.conditions],
|
||||
"metadata": self.metadata,
|
||||
}
|
||||
|
||||
def to_markdown(self) -> str:
|
||||
"""Render the report as a markdown string."""
|
||||
status = "PASSED ✓" if self.all_passed else "NOT YET"
|
||||
lines = [
|
||||
"# Graduation Test Report",
|
||||
"",
|
||||
f"**Status:** {status}",
|
||||
f"**Evaluated:** {self.timestamp}",
|
||||
"",
|
||||
"| # | Condition | Target | Actual | Result |",
|
||||
"|---|-----------|--------|--------|--------|",
|
||||
]
|
||||
for i, c in enumerate(self.conditions, 1):
|
||||
result_str = "PASS" if c.passed else "FAIL"
|
||||
actual_str = f"{c.actual}{c.unit}" if c.unit else str(c.actual)
|
||||
target_str = f"{c.target}{c.unit}" if c.unit else str(c.target)
|
||||
lines.append(f"| {i} | {c.name} | {target_str} | {actual_str} | {result_str} |")
|
||||
|
||||
lines.append("")
|
||||
for c in self.conditions:
|
||||
if c.detail:
|
||||
lines.append(f"- **{c.name}**: {c.detail}")
|
||||
|
||||
lines.append("")
|
||||
lines.append('> "The arch must hold after the falsework is removed."')
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ── Evaluation functions ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def evaluate_perception_independence(
|
||||
time_window_seconds: float = 3600.0,
|
||||
warmup_seconds: float = 900.0,
|
||||
) -> ConditionResult:
|
||||
"""Test 1: No VLM calls after the first 15 minutes of a 1-hour window.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
time_window_seconds:
|
||||
Total window to evaluate (default: 1 hour).
|
||||
warmup_seconds:
|
||||
Initial warmup period where VLM calls are expected (default: 15 min).
|
||||
"""
|
||||
from timmy.sovereignty.metrics import get_metrics_store
|
||||
|
||||
store = get_metrics_store()
|
||||
|
||||
# Count VLM calls in the post-warmup period
|
||||
# We query all events in the window, then filter by timestamp
|
||||
try:
|
||||
from contextlib import closing
|
||||
|
||||
from timmy.sovereignty.metrics import _seconds_ago_iso
|
||||
|
||||
cutoff_total = _seconds_ago_iso(time_window_seconds)
|
||||
cutoff_warmup = _seconds_ago_iso(time_window_seconds - warmup_seconds)
|
||||
|
||||
with closing(store._connect()) as conn:
|
||||
vlm_calls_after_warmup = conn.execute(
|
||||
"SELECT COUNT(*) FROM events WHERE event_type = 'perception_vlm_call' "
|
||||
"AND timestamp >= ? AND timestamp < ?",
|
||||
(cutoff_total, cutoff_warmup),
|
||||
).fetchone()[0]
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to evaluate perception independence: %s", exc)
|
||||
vlm_calls_after_warmup = -1
|
||||
|
||||
passed = vlm_calls_after_warmup == 0
|
||||
return ConditionResult(
|
||||
name="Perception Independence",
|
||||
passed=passed,
|
||||
actual=vlm_calls_after_warmup,
|
||||
target=0,
|
||||
unit=" VLM calls",
|
||||
detail=f"VLM calls in last {int((time_window_seconds - warmup_seconds) / 60)} min: {vlm_calls_after_warmup}",
|
||||
)
|
||||
|
||||
|
||||
def evaluate_decision_independence(
|
||||
max_api_calls: int = 5,
|
||||
) -> ConditionResult:
|
||||
"""Test 2: Full session with <5 cloud API calls total.
|
||||
|
||||
Counts ``decision_llm_call`` events in the current session.
|
||||
"""
|
||||
from timmy.sovereignty.metrics import get_metrics_store
|
||||
|
||||
store = get_metrics_store()
|
||||
|
||||
try:
|
||||
from contextlib import closing
|
||||
|
||||
with closing(store._connect()) as conn:
|
||||
# Count LLM calls in the last 24 hours
|
||||
from timmy.sovereignty.metrics import _seconds_ago_iso
|
||||
|
||||
cutoff = _seconds_ago_iso(86400.0)
|
||||
api_calls = conn.execute(
|
||||
"SELECT COUNT(*) FROM events WHERE event_type IN "
|
||||
"('decision_llm_call', 'api_call') AND timestamp >= ?",
|
||||
(cutoff,),
|
||||
).fetchone()[0]
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to evaluate decision independence: %s", exc)
|
||||
api_calls = -1
|
||||
|
||||
passed = 0 <= api_calls < max_api_calls
|
||||
return ConditionResult(
|
||||
name="Decision Independence",
|
||||
passed=passed,
|
||||
actual=api_calls,
|
||||
target=max_api_calls,
|
||||
unit=" calls",
|
||||
detail=f"Cloud API calls in last 24h: {api_calls} (target: <{max_api_calls})",
|
||||
)
|
||||
|
||||
|
||||
def evaluate_narration_independence() -> ConditionResult:
    """Test 3: All narration from local templates + local LLM (zero cloud calls).

    Checks that ``narration_llm`` events are zero in the last 24 hours
    while ``narration_template`` events are non-zero.
    """
    from timmy.sovereignty.metrics import get_metrics_store

    store = get_metrics_store()

    try:
        from contextlib import closing

        from timmy.sovereignty.metrics import _seconds_ago_iso

        cutoff = _seconds_ago_iso(86400.0)

        with closing(store._connect()) as conn:
            cloud_narrations = conn.execute(
                "SELECT COUNT(*) FROM events WHERE event_type = 'narration_llm' AND timestamp >= ?",
                (cutoff,),
            ).fetchone()[0]
            local_narrations = conn.execute(
                "SELECT COUNT(*) FROM events WHERE event_type = 'narration_template' "
                "AND timestamp >= ?",
                (cutoff,),
            ).fetchone()[0]
    except Exception as exc:
        logger.warning("Failed to evaluate narration independence: %s", exc)
        cloud_narrations = -1
        local_narrations = 0

    passed = cloud_narrations == 0 and local_narrations > 0
    return ConditionResult(
        name="Narration Independence",
        passed=passed,
        actual=cloud_narrations,
        target=0,
        unit=" cloud calls",
        detail=f"Cloud narration calls: {cloud_narrations}, local: {local_narrations}",
    )


def evaluate_economic_independence(
    sats_earned: float = 0.0,
    sats_spent: float = 0.0,
) -> ConditionResult:
    """Test 4: sats_earned > sats_spent.

    Parameters are passed in because sat tracking may live in a separate
    ledger (Lightning, #851).
    """
    passed = sats_earned > sats_spent and sats_earned > 0
    net = sats_earned - sats_spent
    return ConditionResult(
        name="Economic Independence",
        passed=passed,
        actual=net,
        target=0,
        unit=" sats net",
        detail=f"Earned: {sats_earned} sats, spent: {sats_spent} sats, net: {net}",
    )


def evaluate_operational_independence(
    uptime_hours: float = 0.0,
    target_hours: float = 23.5,
    human_interventions: int = 0,
) -> ConditionResult:
    """Test 5: 24 hours unattended, no human intervention.

    Uptime and intervention count are passed in from the heartbeat
    system (#872).
    """
    passed = uptime_hours >= target_hours and human_interventions == 0
    return ConditionResult(
        name="Operational Independence",
        passed=passed,
        actual=uptime_hours,
        target=target_hours,
        unit=" hours",
        detail=f"Uptime: {uptime_hours}h (target: {target_hours}h), interventions: {human_interventions}",
    )


# ── Full graduation test ─────────────────────────────────────────────────────


def run_graduation_test(
    sats_earned: float = 0.0,
    sats_spent: float = 0.0,
    uptime_hours: float = 0.0,
    human_interventions: int = 0,
) -> GraduationReport:
    """Run the full 5-condition graduation test.

    Parameters for economic and operational independence must be supplied
    by the caller since they depend on external systems (Lightning ledger,
    heartbeat monitor).

    Returns
    -------
    GraduationReport
        Full report with per-condition results and overall pass/fail.
    """
    conditions = [
        evaluate_perception_independence(),
        evaluate_decision_independence(),
        evaluate_narration_independence(),
        evaluate_economic_independence(sats_earned, sats_spent),
        evaluate_operational_independence(uptime_hours, human_interventions=human_interventions),
    ]

    all_passed = all(c.passed for c in conditions)

    report = GraduationReport(
        all_passed=all_passed,
        conditions=conditions,
        metadata={
            "sats_earned": sats_earned,
            "sats_spent": sats_spent,
            "uptime_hours": uptime_hours,
            "human_interventions": human_interventions,
        },
    )

    if all_passed:
        logger.info("GRADUATION TEST PASSED — all 5 conditions met simultaneously")
    else:
        failed = [c.name for c in conditions if not c.passed]
        logger.info(
            "Graduation test: %d/5 passed. Failed: %s",
            len(conditions) - len(failed),
            ", ".join(failed),
        )

    return report


def persist_graduation_report(report: GraduationReport) -> Path:
    """Save a graduation report to ``data/graduation_reports/``."""
    reports_dir = Path(settings.repo_root) / "data" / "graduation_reports"
    reports_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
    path = reports_dir / f"graduation_{timestamp}.json"

    try:
        with path.open("w") as f:
            json.dump(report.to_dict(), f, indent=2, default=str)
        logger.info("Graduation report saved to %s", path)
    except Exception as exc:
        logger.warning("Failed to persist graduation report: %s", exc)

    return path
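The graduation test reduces to a simple aggregation over per-condition results: every condition must pass simultaneously, and failures are reported by name. A minimal, self-contained sketch of that aggregation, using a stripped-down stand-in for the project's `ConditionResult` dataclass (not the real class):

```python
from dataclasses import dataclass


@dataclass
class ConditionResult:
    """Stand-in for the project's ConditionResult (name + pass flag only)."""
    name: str
    passed: bool


# Hypothetical results for the five independence conditions
conditions = [
    ConditionResult("Perception Independence", True),
    ConditionResult("Decision Independence", True),
    ConditionResult("Narration Independence", True),
    ConditionResult("Economic Independence", False),
    ConditionResult("Operational Independence", True),
]

# Graduation requires every condition to pass at the same time
all_passed = all(c.passed for c in conditions)
failed = [c.name for c in conditions if not c.passed]

print(f"{len(conditions) - len(failed)}/5 passed; failed: {failed}")
```

With one condition failing, `all_passed` is `False` and `failed` names exactly that condition, mirroring the logging branch in `run_graduation_test`.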
@@ -1,7 +1,21 @@
"""OpenCV template-matching cache for sovereignty perception.

Implements "See Once, Template Forever" from the Sovereignty Loop (#953).

First encounter: VLM analyses screenshot (3-6 sec) → structured JSON.
Crystallized as: OpenCV template + bounding box → templates.json (3 ms).

The ``crystallize_perception()`` function converts VLM output into
reusable OpenCV templates, and ``PerceptionCache.match()`` retrieves
them without calling the VLM again.

Refs: #955, #953 (Section III.1 — Perception)
"""

from __future__ import annotations

import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@@ -9,85 +23,266 @@ from typing import Any
import cv2
import numpy as np

logger = logging.getLogger(__name__)


@dataclass
class Template:
    """A reusable visual template extracted from VLM analysis."""

    name: str
    image: np.ndarray
    threshold: float = 0.85
    bbox: tuple[int, int, int, int] | None = None  # (x1, y1, x2, y2)
    metadata: dict[str, Any] | None = None


@dataclass
class CacheResult:
    """Result of a template match against a screenshot."""

    confidence: float
    state: Any | None


class PerceptionCache:
    """OpenCV-based visual template cache.

    Stores templates extracted from VLM responses and matches them
    against future screenshots using template matching, eliminating
    the need for repeated VLM calls on known visual patterns.
    """

    def __init__(self, templates_path: Path | str = "data/templates.json") -> None:
        self.templates_path = Path(templates_path)
        self.templates: list[Template] = []
        self.load()

    def match(self, screenshot: np.ndarray) -> CacheResult:
        """Match stored templates against a screenshot.

        Returns the highest-confidence match. If confidence exceeds
        the template's threshold, the cached state is returned.

        Parameters
        ----------
        screenshot:
            The current frame as a numpy array (BGR or grayscale).

        Returns
        -------
        CacheResult
            Confidence score and cached state (or None if no match).
        """
        best_match_confidence = 0.0
        best_match_name = None
        best_match_metadata = None

        for template in self.templates:
            if template.image.size == 0:
                continue

            try:
                # Convert to grayscale if needed for matching
                if len(screenshot.shape) == 3 and len(template.image.shape) == 2:
                    frame = cv2.cvtColor(screenshot, cv2.COLOR_BGR2GRAY)
                elif len(screenshot.shape) == 2 and len(template.image.shape) == 3:
                    # skip mismatched template
                    continue
                else:
                    frame = screenshot

                # Ensure template is smaller than frame
                if (
                    template.image.shape[0] > frame.shape[0]
                    or template.image.shape[1] > frame.shape[1]
                ):
                    continue

                res = cv2.matchTemplate(frame, template.image, cv2.TM_CCOEFF_NORMED)
                _, max_val, _, _ = cv2.minMaxLoc(res)

                if max_val > best_match_confidence:
                    best_match_confidence = max_val
                    best_match_name = template.name
                    best_match_metadata = template.metadata
            except cv2.error:
                logger.debug("Template match failed for '%s'", template.name)
                continue

        if best_match_confidence >= 0.85 and best_match_name is not None:
            return CacheResult(
                confidence=best_match_confidence,
                state={"template_name": best_match_name, **(best_match_metadata or {})},
            )
        else:
            return CacheResult(confidence=best_match_confidence, state=None)

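`match()` is a best-of-N scan with a late threshold check: track the single highest score while iterating, then gate on the threshold only once at the end. That selection logic, sketched without OpenCV (the hypothetical `scores` pairs stand in for per-template `cv2.minMaxLoc` maxima):

```python
# (name, score) pairs standing in for per-template matchTemplate maxima
scores = [("health_bar", 0.91), ("inventory_icon", 0.42), ("dialog_box", 0.78)]

THRESHOLD = 0.85  # mirrors the 0.85 gate in PerceptionCache.match

best_name, best_score = None, 0.0
for name, score in scores:
    if score > best_score:
        best_name, best_score = name, score

# Only a confident best match yields a cached state; otherwise report
# the confidence but return no state, forcing a VLM call upstream.
state = {"template_name": best_name} if best_score >= THRESHOLD and best_name else None
```

Here `health_bar` wins at 0.91, clears the 0.85 gate, and a state is returned; had the best score been 0.78, `state` would be `None` and the caller would fall through to the VLM.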
    def add(self, templates: list[Template]) -> None:
        """Add new templates to the cache."""
        self.templates.extend(templates)

    def persist(self) -> None:
        """Write template metadata to disk.

        Note: actual template images are stored alongside as .npy files
        for fast loading. The JSON file stores metadata only.
        """
        self.templates_path.parent.mkdir(parents=True, exist_ok=True)

        entries = []
        for t in self.templates:
            entry: dict[str, Any] = {"name": t.name, "threshold": t.threshold}
            if t.bbox is not None:
                entry["bbox"] = list(t.bbox)
            if t.metadata:
                entry["metadata"] = t.metadata

            # Save non-empty template images as .npy
            if t.image.size > 0:
                img_path = self.templates_path.parent / f"template_{t.name}.npy"
                try:
                    np.save(str(img_path), t.image)
                    entry["image_path"] = str(img_path.name)
                except Exception as exc:
                    logger.warning("Failed to save template image for '%s': %s", t.name, exc)

            entries.append(entry)

        with self.templates_path.open("w") as f:
            json.dump(entries, f, indent=2)
        logger.debug("Persisted %d templates to %s", len(entries), self.templates_path)

    def load(self) -> None:
        """Load templates from disk."""
        if not self.templates_path.exists():
            return

        try:
            with self.templates_path.open("r") as f:
                templates_data = json.load(f)
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning("Failed to load templates: %s", exc)
            return

        self.templates = []
        for t in templates_data:
            # Try to load the image from .npy if available
            image = np.array([])
            image_path = t.get("image_path")
            if image_path:
                full_path = self.templates_path.parent / image_path
                if full_path.exists():
                    try:
                        image = np.load(str(full_path))
                    except Exception:
                        pass

            bbox = tuple(t["bbox"]) if "bbox" in t else None

            self.templates.append(
                Template(
                    name=t["name"],
                    image=image,
                    threshold=t.get("threshold", 0.85),
                    bbox=bbox,
                    metadata=t.get("metadata"),
                )
            )

    def clear(self) -> None:
        """Remove all templates."""
        self.templates.clear()

    def __len__(self) -> int:
        return len(self.templates)


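`persist()`/`load()` amount to a JSON round-trip of template metadata, with image arrays stored out-of-band as `.npy` files. A stdlib-only sketch of that round-trip under stated simplifications (no numpy, no image files, hypothetical entry values), showing the same missing-key tolerance that `load()` uses:

```python
import json
import tempfile
from pathlib import Path

entries = [
    {"name": "health_bar", "threshold": 0.85, "bbox": [10, 20, 110, 40]},
    {"name": "dialog_box", "threshold": 0.9},  # no bbox recorded
]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "templates.json"

    # persist: metadata only, images would be written separately as .npy
    with path.open("w") as f:
        json.dump(entries, f, indent=2)

    # load: tolerate missing optional keys, as PerceptionCache.load does
    loaded = json.loads(path.read_text())
    templates = [
        {
            "name": t["name"],
            "threshold": t.get("threshold", 0.85),
            "bbox": tuple(t["bbox"]) if "bbox" in t else None,
        }
        for t in loaded
    ]
```

JSON has no tuple type, so `bbox` goes to disk as a list and is rebuilt as a tuple on load, which is exactly why `load()` above does `tuple(t["bbox"])`.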
def crystallize_perception(
    screenshot: np.ndarray,
    vlm_response: Any,
) -> list[Template]:
    """Extract reusable OpenCV templates from a VLM response.

    Converts VLM-identified UI elements into cropped template images
    that can be matched in future frames without calling the VLM.

    Parameters
    ----------
    screenshot:
        The full screenshot that was analysed by the VLM.
    vlm_response:
        Structured VLM output. Expected formats:
        - dict with ``"items"`` list, each having ``"name"`` and ``"bounding_box"``
        - dict with ``"elements"`` list (same structure)
        - list of dicts with ``"name"`` and ``"bbox"`` or ``"bounding_box"``

    Returns
    -------
    list[Template]
        Extracted templates ready to be added to a PerceptionCache.
    """
    templates: list[Template] = []

    # Normalize the response format
    items: list[dict[str, Any]] = []
    if isinstance(vlm_response, dict):
        items = vlm_response.get("items", vlm_response.get("elements", []))
    elif isinstance(vlm_response, list):
        items = vlm_response

    for item in items:
        name = item.get("name") or item.get("label") or item.get("type")
        bbox = item.get("bounding_box") or item.get("bbox")

        if not name or not bbox:
            continue

        try:
            if len(bbox) == 4:
                x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
            else:
                continue

            # Validate bounds
            h, w = screenshot.shape[:2]
            x1 = max(0, min(x1, w - 1))
            y1 = max(0, min(y1, h - 1))
            x2 = max(x1 + 1, min(x2, w))
            y2 = max(y1 + 1, min(y2, h))

            template_image = screenshot[y1:y2, x1:x2].copy()

            if template_image.size == 0:
                continue

            metadata = {
                k: v for k, v in item.items() if k not in ("name", "label", "bounding_box", "bbox")
            }

            templates.append(
                Template(
                    name=name,
                    image=template_image,
                    bbox=(x1, y1, x2, y2),
                    metadata=metadata if metadata else None,
                )
            )
            logger.debug(
                "Crystallized perception template '%s' (%dx%d)",
                name,
                x2 - x1,
                y2 - y1,
            )

        except (ValueError, IndexError, TypeError) as exc:
            logger.debug("Failed to crystallize item '%s': %s", name, exc)
            continue

    if templates:
        logger.info(
            "Crystallized %d perception template(s) from VLM response",
            len(templates),
        )

    return templates

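The bounds validation inside `crystallize_perception` guarantees a non-empty crop even when a VLM-reported box hangs off the frame edge or is inverted. That clamping rule in isolation, as a pure-Python sketch with the frame size passed explicitly (a hypothetical helper, not part of the module):

```python
def clamp_bbox(bbox, w, h):
    """Clamp (x1, y1, x2, y2) into a w×h frame, keeping at least a 1px extent."""
    x1, y1, x2, y2 = (int(v) for v in bbox)
    x1 = max(0, min(x1, w - 1))
    y1 = max(0, min(y1, h - 1))
    x2 = max(x1 + 1, min(x2, w))  # x2 stays strictly greater than x1
    y2 = max(y1 + 1, min(y2, h))
    return x1, y1, x2, y2


# Box hanging off both edges of a 1920x1080 frame
a = clamp_bbox((-5, 10, 2000, 30), w=1920, h=1080)   # → (0, 10, 1920, 30)

# Inverted box still yields a minimal 1px crop rather than an empty slice
b = clamp_bbox((50, 50, 40, 40), w=100, h=100)       # → (50, 50, 51, 51)
```

Because `x2 > x1` and `y2 > y1` always hold after clamping, the subsequent `screenshot[y1:y2, x1:x2]` slice can never be empty for an in-range frame.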
379
src/timmy/sovereignty/sovereignty_loop.py
Normal file
@@ -0,0 +1,379 @@
"""The Sovereignty Loop — core orchestration.

Implements the governing pattern from issue #953:

    check cache → miss → infer → crystallize → return

This module provides wrapper functions that enforce the crystallization
protocol for each AI layer (perception, decision, narration) and a
decorator for general-purpose sovereignty enforcement.

Every function follows the same contract:
1. Check local cache / rule store for a cached answer.
2. On hit → record sovereign event, return cached answer.
3. On miss → call the expensive model.
4. Crystallize the model output into a durable local artifact.
5. Record the model-call event + any new crystallizations.
6. Return the result.

Refs: #953 (The Sovereignty Loop), #955, #956, #961
"""

from __future__ import annotations

import functools
import logging
from collections.abc import Callable
from typing import Any, TypeVar

from timmy.sovereignty.metrics import emit_sovereignty_event, get_metrics_store

logger = logging.getLogger(__name__)

T = TypeVar("T")


# ── Perception Layer ──────────────────────────────────────────────────────────


async def sovereign_perceive(
    screenshot: Any,
    cache: Any,  # PerceptionCache
    vlm: Any,
    *,
    session_id: str = "",
    parse_fn: Callable[..., Any] | None = None,
    crystallize_fn: Callable[..., Any] | None = None,
) -> Any:
    """Sovereignty-wrapped perception: cache check → VLM → crystallize.

    Parameters
    ----------
    screenshot:
        The current frame / screenshot (numpy array or similar).
    cache:
        A :class:`~timmy.sovereignty.perception_cache.PerceptionCache`.
    vlm:
        An object with an async ``analyze(screenshot)`` method.
    session_id:
        Current session identifier for metrics.
    parse_fn:
        Optional function to parse the VLM response into game state.
        Signature: ``parse_fn(vlm_response) -> state``.
    crystallize_fn:
        Optional function to extract templates from VLM output.
        Signature: ``crystallize_fn(screenshot, state) -> list[Template]``.
        Defaults to ``perception_cache.crystallize_perception``.

    Returns
    -------
    Any
        The parsed game state (from cache or fresh VLM analysis).
    """
    # Step 1: check cache
    cached = cache.match(screenshot)
    if cached.confidence > 0.85 and cached.state is not None:
        await emit_sovereignty_event("perception_cache_hit", session_id=session_id)
        return cached.state

    # Step 2: cache miss — call VLM
    await emit_sovereignty_event("perception_vlm_call", session_id=session_id)
    raw = await vlm.analyze(screenshot)

    # Step 3: parse
    if parse_fn is not None:
        state = parse_fn(raw)
    else:
        state = raw

    # Step 4: crystallize
    if crystallize_fn is not None:
        new_templates = crystallize_fn(screenshot, state)
    else:
        from timmy.sovereignty.perception_cache import crystallize_perception

        new_templates = crystallize_perception(screenshot, state)

    if new_templates:
        cache.add(new_templates)
        cache.persist()
        for _ in new_templates:
            await emit_sovereignty_event(
                "skill_crystallized",
                metadata={"layer": "perception"},
                session_id=session_id,
            )

    return state


# ── Decision Layer ────────────────────────────────────────────────────────────


async def sovereign_decide(
    context: dict[str, Any],
    llm: Any,
    *,
    session_id: str = "",
    rule_store: Any | None = None,
    confidence_threshold: float = 0.8,
) -> dict[str, Any]:
    """Sovereignty-wrapped decision: rule check → LLM → crystallize.

    Parameters
    ----------
    context:
        Current game state / decision context.
    llm:
        An object with an async ``reason(context)`` method that returns
        a dict with at least ``"action"`` and ``"reasoning"`` keys.
    session_id:
        Current session identifier for metrics.
    rule_store:
        Optional :class:`~timmy.sovereignty.auto_crystallizer.RuleStore`.
        If ``None``, the module-level singleton is used.
    confidence_threshold:
        Minimum confidence for a rule to be used without LLM.

    Returns
    -------
    dict[str, Any]
        The decision result, with at least an ``"action"`` key.
    """
    from timmy.sovereignty.auto_crystallizer import (
        crystallize_reasoning,
        get_rule_store,
    )

    store = rule_store if rule_store is not None else get_rule_store()

    # Step 1: check rules
    matching_rules = store.find_matching(context)
    if matching_rules:
        best = matching_rules[0]
        if best.confidence >= confidence_threshold:
            await emit_sovereignty_event(
                "decision_rule_hit",
                metadata={"rule_id": best.id, "confidence": best.confidence},
                session_id=session_id,
            )
            return {
                "action": best.action,
                "source": "crystallized_rule",
                "rule_id": best.id,
                "confidence": best.confidence,
            }

    # Step 2: rule miss — call LLM
    await emit_sovereignty_event("decision_llm_call", session_id=session_id)
    result = await llm.reason(context)

    # Step 3: crystallize the reasoning
    reasoning_text = result.get("reasoning", "")
    if reasoning_text:
        new_rules = crystallize_reasoning(reasoning_text, context=context)
        added = store.add_many(new_rules)
        for _ in range(added):
            await emit_sovereignty_event(
                "skill_crystallized",
                metadata={"layer": "decision"},
                session_id=session_id,
            )

    return result


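The decision layer's cache is a confidence-gated rule lookup: the best matching rule wins only if its confidence clears the threshold, otherwise the call falls through to the LLM. A self-contained sketch of that gate with a toy in-memory rule list and a deliberately naive condition evaluator (the real store is `RuleStore` in `auto_crystallizer`; all names and values here are illustrative):

```python
from dataclasses import dataclass


@dataclass
class Rule:
    condition: str   # e.g. "health < 30"
    action: str
    confidence: float


def matches(rule: Rule, context: dict) -> bool:
    # Toy evaluator: only handles "key < value" against numeric context fields
    key, op, value = rule.condition.split()
    return op == "<" and context.get(key, float("inf")) < float(value)


rules = [Rule("health < 30", "heal", 0.9), Rule("stamina < 20", "rest", 0.6)]
THRESHOLD = 0.8  # mirrors confidence_threshold in sovereign_decide


def decide(context: dict) -> dict:
    hits = sorted((r for r in rules if matches(r, context)), key=lambda r: -r.confidence)
    if hits and hits[0].confidence >= THRESHOLD:
        return {"action": hits[0].action, "source": "crystallized_rule"}
    # Below threshold or no matching rule: this is where llm.reason() would run
    return {"action": "llm_fallback", "source": "llm"}


low_health = decide({"health": 25, "stamina": 80})  # confident rule hit
tired = decide({"health": 90, "stamina": 10})       # rule matches at 0.6 < 0.8 → LLM
```

Note the asymmetry: a matching but low-confidence rule does not short-circuit the LLM, which is what lets freshly crystallized rules earn trust before they replace model calls.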
# ── Narration Layer ───────────────────────────────────────────────────────────


async def sovereign_narrate(
    event: dict[str, Any],
    llm: Any | None = None,
    *,
    session_id: str = "",
    template_store: Any | None = None,
) -> str:
    """Sovereignty-wrapped narration: template check → LLM → crystallize.

    Parameters
    ----------
    event:
        The game event to narrate (must have at least a ``"type"`` key).
    llm:
        An optional LLM for novel narration. If ``None`` and no template
        matches, returns a default string.
    session_id:
        Current session identifier for metrics.
    template_store:
        Optional narration template store (dict-like, mapping event types
        to template strings with ``{variable}`` slots). If ``None``,
        tries to load from ``data/narration.json``.

    Returns
    -------
    str
        The narration text.
    """
    import json
    from pathlib import Path

    from config import settings

    # Load template store
    if template_store is None:
        narration_path = Path(settings.repo_root) / "data" / "narration.json"
        if narration_path.exists():
            try:
                with narration_path.open() as f:
                    template_store = json.load(f)
            except Exception:
                template_store = {}
        else:
            template_store = {}

    event_type = event.get("type", "unknown")

    # Step 1: check templates
    if event_type in template_store:
        template = template_store[event_type]
        try:
            text = template.format(**event)
            await emit_sovereignty_event("narration_template", session_id=session_id)
            return text
        except (KeyError, IndexError):
            # Template doesn't match event variables — fall through to LLM
            pass

    # Step 2: no template — call LLM if available
    if llm is not None:
        await emit_sovereignty_event("narration_llm", session_id=session_id)
        narration = await llm.narrate(event)

        # Step 3: crystallize — add template for this event type
        _crystallize_narration_template(event_type, narration, event, template_store)

        return narration

    # No LLM available — return minimal default
    await emit_sovereignty_event("narration_template", session_id=session_id)
    return f"[{event_type}]"


def _crystallize_narration_template(
    event_type: str,
    narration: str,
    event: dict[str, Any],
    template_store: dict[str, str],
) -> None:
    """Attempt to crystallize a narration into a reusable template.

    Replaces concrete values in the narration with format placeholders
    based on event keys, then saves to ``data/narration.json``.
    """
    import json
    from pathlib import Path

    from config import settings

    template = narration
    for key, value in event.items():
        if key == "type":
            continue
        if isinstance(value, str) and value and value in template:
            template = template.replace(value, f"{{{key}}}")

    template_store[event_type] = template

    narration_path = Path(settings.repo_root) / "data" / "narration.json"
    try:
        narration_path.parent.mkdir(parents=True, exist_ok=True)
        with narration_path.open("w") as f:
            json.dump(template_store, f, indent=2)
        logger.info("Crystallized narration template for event type '%s'", event_type)
    except Exception as exc:
        logger.warning("Failed to persist narration template: %s", exc)


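The placeholder substitution above is a plain string rewrite: each string event value found verbatim in the narration is replaced with a `{key}` slot, so `str.format(**event)` later reproduces (or re-targets) the narration. A standalone sketch of just that rewrite, with hypothetical event data:

```python
def crystallize_template(narration: str, event: dict) -> str:
    """Replace concrete event values in a narration with {key} placeholders."""
    template = narration
    for key, value in event.items():
        if key == "type":
            continue  # the event type selects the template; it never becomes a slot
        if isinstance(value, str) and value and value in template:
            template = template.replace(value, f"{{{key}}}")
    return template


event = {"type": "combat_start", "enemy": "Cliff Racer"}
template = crystallize_template("A Cliff Racer swoops in to attack!", event)
# template == "A {enemy} swoops in to attack!"

# The crystallized template now narrates any enemy of the same event type
rendered = template.format(type="combat_start", enemy="Nix-Hound")
```

One LLM narration thus becomes a zero-cost template for every future event of that type; the trade-off is that only exact substring matches generalize, so paraphrased values stay literal.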
# ── Sovereignty decorator ────────────────────────────────────────────────────


def sovereignty_enforced(
    layer: str,
    cache_check: Callable[..., Any] | None = None,
    crystallize: Callable[..., Any] | None = None,
) -> Callable:
    """Decorator that enforces the sovereignty protocol on any async function.

    Wraps an async function with the check-cache → miss → infer →
    crystallize → return pattern. If ``cache_check`` returns a non-None
    result, the wrapped function is skipped entirely.

    Parameters
    ----------
    layer:
        The sovereignty layer name (``"perception"``, ``"decision"``,
        ``"narration"``). Used for metric event names.
    cache_check:
        A callable ``(args, kwargs) -> cached_result | None``.
        If it returns non-None, the decorated function is not called.
    crystallize:
        A callable ``(result, args, kwargs) -> None`` called after the
        decorated function returns, to persist the result as a local artifact.

    Example
    -------
    ::

        @sovereignty_enforced(
            layer="decision",
            cache_check=lambda a, kw: rule_store.find_matching(kw.get("ctx")),
            crystallize=lambda result, a, kw: rule_store.add(extract_rules(result)),
        )
        async def decide(ctx):
            return await llm.reason(ctx)
    """

    sovereign_event = (
        f"{layer}_cache_hit"
        if layer in ("perception", "decision", "narration")
        else f"{layer}_sovereign"
    )
    miss_event = {
        "perception": "perception_vlm_call",
        "decision": "decision_llm_call",
        "narration": "narration_llm",
    }.get(layer, f"{layer}_model_call")

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Check cache
            if cache_check is not None:
                cached = cache_check(args, kwargs)
                if cached is not None:
                    store = get_metrics_store()
                    store.record(sovereign_event, session_id=kwargs.get("session_id", ""))
                    return cached

            # Cache miss — run the model
            store = get_metrics_store()
            store.record(miss_event, session_id=kwargs.get("session_id", ""))
            result = await fn(*args, **kwargs)

            # Crystallize
            if crystallize is not None:
                try:
                    crystallize(result, args, kwargs)
                    store.record(
                        "skill_crystallized",
                        metadata={"layer": layer},
                        session_id=kwargs.get("session_id", ""),
                    )
                except Exception as exc:
                    logger.warning("Crystallization failed for %s: %s", layer, exc)

            return result

        return wrapper

    return decorator
238
tests/sovereignty/test_auto_crystallizer.py
Normal file
@@ -0,0 +1,238 @@
"""Tests for the auto-crystallizer module.

Refs: #961, #953
"""

import pytest


@pytest.mark.unit
class TestCrystallizeReasoning:
    """Tests for rule extraction from LLM reasoning chains."""

    def test_extracts_threshold_rule(self):
        """Extracts threshold-based rules from reasoning text."""
        from timmy.sovereignty.auto_crystallizer import crystallize_reasoning

        reasoning = "I chose to heal because health was below 30%. So I used a healing potion."
        rules = crystallize_reasoning(reasoning)
        assert len(rules) >= 1
        # Should detect the threshold pattern
        found = any("health" in r.condition.lower() and "30" in r.condition for r in rules)
        assert found, f"Expected threshold rule, got: {[r.condition for r in rules]}"

    def test_extracts_comparison_rule(self):
        """Extracts comparison operators from reasoning."""
        from timmy.sovereignty.auto_crystallizer import crystallize_reasoning

        reasoning = "The stamina_pct < 20 so I decided to rest."
        rules = crystallize_reasoning(reasoning)
        assert len(rules) >= 1
        found = any("stamina_pct" in r.condition and "<" in r.condition for r in rules)
        assert found, f"Expected comparison rule, got: {[r.condition for r in rules]}"

    def test_extracts_choice_reason_rule(self):
        """Extracts 'chose X because Y' patterns."""
        from timmy.sovereignty.auto_crystallizer import crystallize_reasoning

        reasoning = "I chose retreat because the enemy outnumbered us."
        rules = crystallize_reasoning(reasoning)
        assert len(rules) >= 1
        found = any(r.action == "retreat" for r in rules)
        assert found, f"Expected 'retreat' action, got: {[r.action for r in rules]}"

    def test_deduplicates_rules(self):
        """Same pattern extracted once, not twice."""
        from timmy.sovereignty.auto_crystallizer import crystallize_reasoning

        reasoning = (
            "I chose heal because health was below 30%. Again, health was below 30% so I healed."
        )
        rules = crystallize_reasoning(reasoning)
        ids = [r.id for r in rules]
        # Duplicate condition+action should produce same ID
        assert len(ids) == len(set(ids)), "Duplicate rules detected"

    def test_empty_reasoning_returns_no_rules(self):
        """Empty or unstructured text produces no rules."""
        from timmy.sovereignty.auto_crystallizer import crystallize_reasoning

        rules = crystallize_reasoning("")
        assert rules == []

        rules = crystallize_reasoning("The weather is nice today.")
        assert rules == []

    def test_rule_has_excerpt(self):
        """Extracted rules include a reasoning excerpt for provenance."""
        from timmy.sovereignty.auto_crystallizer import crystallize_reasoning

        reasoning = "I chose attack because the enemy health was below 50%."
        rules = crystallize_reasoning(reasoning)
        assert len(rules) >= 1
        assert rules[0].reasoning_excerpt != ""

    def test_context_stored_in_metadata(self):
        """Context dict is stored in rule metadata."""
        from timmy.sovereignty.auto_crystallizer import crystallize_reasoning

        context = {"game": "morrowind", "location": "balmora"}
        reasoning = "I chose to trade because gold_amount > 100."
        rules = crystallize_reasoning(reasoning, context=context)
        assert len(rules) >= 1
        assert rules[0].metadata.get("game") == "morrowind"


@pytest.mark.unit
class TestRule:
    """Tests for the Rule dataclass."""

    def test_initial_state(self):
        """New rules start with default confidence and no applications."""
        from timmy.sovereignty.auto_crystallizer import Rule

        rule = Rule(id="test", condition="hp < 30", action="heal")
        assert rule.confidence == 0.5
        assert rule.times_applied == 0
        assert rule.times_succeeded == 0
        assert not rule.is_reliable

    def test_success_rate(self):
        """Success rate is calculated correctly."""
        from timmy.sovereignty.auto_crystallizer import Rule

        rule = Rule(id="test", condition="hp < 30", action="heal")
        rule.times_applied = 10
        rule.times_succeeded = 8
        assert rule.success_rate == 0.8

    def test_is_reliable(self):
        """Rule becomes reliable with high confidence + enough applications."""
        from timmy.sovereignty.auto_crystallizer import Rule

        rule = Rule(
            id="test",
            condition="hp < 30",
            action="heal",
            confidence=0.85,
            times_applied=5,
            times_succeeded=4,
        )
        assert rule.is_reliable

    def test_not_reliable_low_confidence(self):
        """Rule is not reliable with low confidence."""
        from timmy.sovereignty.auto_crystallizer import Rule

        rule = Rule(
            id="test",
            condition="hp < 30",
            action="heal",
            confidence=0.5,
            times_applied=10,
            times_succeeded=8,
        )
        assert not rule.is_reliable


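The fixtures above pin down the `Rule` contract fairly tightly: default confidence 0.5, zero applications, a `success_rate` ratio, and an `is_reliable` flag gated on both confidence and usage. A minimal sketch consistent with these tests might look like the following — the exact `is_reliable` thresholds (0.8 confidence, 3 applications) are assumptions inferred from the fixtures, not values taken from the actual module:

```python
from dataclasses import dataclass, field


@dataclass
class Rule:
    """Sketch of the Rule dataclass exercised by the tests above."""

    id: str
    condition: str
    action: str
    confidence: float = 0.5
    times_applied: int = 0
    times_succeeded: int = 0
    reasoning_excerpt: str = ""
    metadata: dict = field(default_factory=dict)

    @property
    def success_rate(self) -> float:
        # Avoid division by zero before the rule has ever been applied.
        if self.times_applied == 0:
            return 0.0
        return self.times_succeeded / self.times_applied

    @property
    def is_reliable(self) -> bool:
        # Thresholds are assumptions consistent with the fixtures:
        # 0.85/5 applications is reliable, 0.5/10 and 0.3/1 are not.
        return self.confidence >= 0.8 and self.times_applied >= 3
```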
@pytest.mark.unit
class TestRuleStore:
    """Tests for the RuleStore persistence layer."""

    def test_add_and_retrieve(self, tmp_path):
        """Rules can be added and retrieved."""
        from timmy.sovereignty.auto_crystallizer import Rule, RuleStore

        store = RuleStore(path=tmp_path / "strategy.json")
        rule = Rule(id="r1", condition="hp < 30", action="heal")
        store.add(rule)

        retrieved = store.get("r1")
        assert retrieved is not None
        assert retrieved.condition == "hp < 30"

    def test_persist_and_reload(self, tmp_path):
        """Rules survive persist → reload cycle."""
        from timmy.sovereignty.auto_crystallizer import Rule, RuleStore

        path = tmp_path / "strategy.json"
        store = RuleStore(path=path)
        store.add(Rule(id="r1", condition="hp < 30", action="heal"))
        store.add(Rule(id="r2", condition="mana > 50", action="cast"))

        # Create a new store from the same file
        store2 = RuleStore(path=path)
        assert len(store2) == 2
        assert store2.get("r1") is not None
        assert store2.get("r2") is not None

    def test_record_application_success(self, tmp_path):
        """Recording a successful application boosts confidence."""
        from timmy.sovereignty.auto_crystallizer import Rule, RuleStore

        store = RuleStore(path=tmp_path / "strategy.json")
        store.add(Rule(id="r1", condition="hp < 30", action="heal", confidence=0.5))

        store.record_application("r1", succeeded=True)
        rule = store.get("r1")
        assert rule.times_applied == 1
        assert rule.times_succeeded == 1
        assert rule.confidence > 0.5

    def test_record_application_failure(self, tmp_path):
        """Recording a failed application penalizes confidence."""
        from timmy.sovereignty.auto_crystallizer import Rule, RuleStore

        store = RuleStore(path=tmp_path / "strategy.json")
        store.add(Rule(id="r1", condition="hp < 30", action="heal", confidence=0.8))

        store.record_application("r1", succeeded=False)
        rule = store.get("r1")
        assert rule.times_applied == 1
        assert rule.times_succeeded == 0
        assert rule.confidence < 0.8

    def test_add_many_counts_new(self, tmp_path):
        """add_many returns count of genuinely new rules."""
        from timmy.sovereignty.auto_crystallizer import Rule, RuleStore

        store = RuleStore(path=tmp_path / "strategy.json")
        store.add(Rule(id="r1", condition="hp < 30", action="heal"))

        new_rules = [
            Rule(id="r1", condition="hp < 30", action="heal"),  # existing
            Rule(id="r2", condition="mana > 50", action="cast"),  # new
        ]
        added = store.add_many(new_rules)
        assert added == 1
        assert len(store) == 2

    def test_find_matching_returns_reliable_only(self, tmp_path):
        """find_matching only returns rules above confidence threshold."""
        from timmy.sovereignty.auto_crystallizer import Rule, RuleStore

        store = RuleStore(path=tmp_path / "strategy.json")
        store.add(
            Rule(
                id="r1",
                condition="health low",
                action="heal",
                confidence=0.9,
                times_applied=5,
                times_succeeded=4,
            )
        )
        store.add(
            Rule(
                id="r2",
                condition="health low",
                action="flee",
                confidence=0.3,
                times_applied=1,
                times_succeeded=0,
            )
        )

        matches = store.find_matching({"health": "low"})
        assert len(matches) == 1
        assert matches[0].id == "r1"
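The two `record_application` tests only require that confidence moves in the right direction — up on success, down on failure. One update scheme that satisfies them is an exponential moving average nudged toward 1.0 or 0.0; both the EMA approach and the `alpha` step size are assumptions for illustration, not the module's actual formula:

```python
def update_confidence(confidence: float, succeeded: bool, alpha: float = 0.1) -> float:
    """Nudge confidence toward 1.0 on success, 0.0 on failure (EMA-style)."""
    target = 1.0 if succeeded else 0.0
    return confidence + alpha * (target - confidence)
```

With `alpha=0.1`, a success moves 0.5 to 0.55 and a failure moves 0.8 to 0.72, matching the directional assertions in the tests above.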
165	tests/sovereignty/test_graduation.py	Normal file
@@ -0,0 +1,165 @@
"""Tests for the graduation test runner.

Refs: #953 (Graduation Test)
"""

from unittest.mock import patch

import pytest


@pytest.mark.unit
class TestConditionResults:
    """Tests for individual graduation condition evaluations."""

    def test_economic_independence_pass(self):
        """Passes when sats earned exceeds sats spent."""
        from timmy.sovereignty.graduation import evaluate_economic_independence

        result = evaluate_economic_independence(sats_earned=100.0, sats_spent=50.0)
        assert result.passed is True
        assert result.actual == 50.0  # net
        assert "Earned: 100.0" in result.detail

    def test_economic_independence_fail_net_negative(self):
        """Fails when spending exceeds earnings."""
        from timmy.sovereignty.graduation import evaluate_economic_independence

        result = evaluate_economic_independence(sats_earned=10.0, sats_spent=50.0)
        assert result.passed is False

    def test_economic_independence_fail_zero_earnings(self):
        """Fails when earnings are zero even if spending is zero."""
        from timmy.sovereignty.graduation import evaluate_economic_independence

        result = evaluate_economic_independence(sats_earned=0.0, sats_spent=0.0)
        assert result.passed is False

    def test_operational_independence_pass(self):
        """Passes when uptime meets threshold and no interventions."""
        from timmy.sovereignty.graduation import evaluate_operational_independence

        result = evaluate_operational_independence(uptime_hours=24.0, human_interventions=0)
        assert result.passed is True

    def test_operational_independence_fail_low_uptime(self):
        """Fails when uptime is below threshold."""
        from timmy.sovereignty.graduation import evaluate_operational_independence

        result = evaluate_operational_independence(uptime_hours=20.0, human_interventions=0)
        assert result.passed is False

    def test_operational_independence_fail_interventions(self):
        """Fails when there are human interventions."""
        from timmy.sovereignty.graduation import evaluate_operational_independence

        result = evaluate_operational_independence(uptime_hours=24.0, human_interventions=2)
        assert result.passed is False


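These condition tests constrain the evaluators closely: economic independence requires strictly positive net earnings (zero earnings never pass), and operational independence requires an uptime threshold with zero human interventions. A sketch consistent with the fixtures follows — the 24-hour uptime threshold and the `ConditionResult` field set are assumptions inferred from the tests, not the module's definitive implementation:

```python
from dataclasses import dataclass


@dataclass
class ConditionResult:
    """Assumed shape of a single graduation condition outcome."""

    name: str
    passed: bool
    actual: float
    target: float
    unit: str = ""
    detail: str = ""


def evaluate_economic_independence(sats_earned: float, sats_spent: float) -> ConditionResult:
    # Passes only when net earnings are positive; zero earnings never pass.
    net = sats_earned - sats_spent
    return ConditionResult(
        name="Economic",
        passed=sats_earned > 0 and net > 0,
        actual=net,
        target=0.0,
        unit=" sats",
        detail=f"Earned: {sats_earned}, Spent: {sats_spent}",
    )


def evaluate_operational_independence(uptime_hours: float, human_interventions: int) -> ConditionResult:
    # The 24h threshold is an assumption: 24.0 passes, 20.0 fails in the fixtures.
    passed = uptime_hours >= 24.0 and human_interventions == 0
    return ConditionResult(
        name="Operational", passed=passed, actual=uptime_hours, target=24.0, unit=" h"
    )
```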
@pytest.mark.unit
class TestGraduationReport:
    """Tests for the GraduationReport rendering."""

    def test_to_dict(self):
        """Report serializes to dict correctly."""
        from timmy.sovereignty.graduation import ConditionResult, GraduationReport

        report = GraduationReport(
            all_passed=False,
            conditions=[
                ConditionResult(name="Test", passed=True, actual=0, target=0, unit=" calls")
            ],
        )
        d = report.to_dict()
        assert d["all_passed"] is False
        assert len(d["conditions"]) == 1
        assert d["conditions"][0]["name"] == "Test"

    def test_to_markdown(self):
        """Report renders to readable markdown."""
        from timmy.sovereignty.graduation import ConditionResult, GraduationReport

        report = GraduationReport(
            all_passed=True,
            conditions=[
                ConditionResult(name="Perception", passed=True, actual=0, target=0),
                ConditionResult(name="Decision", passed=True, actual=3, target=5),
            ],
        )
        md = report.to_markdown()
        assert "PASSED" in md
        assert "Perception" in md
        assert "Decision" in md
        assert "falsework" in md.lower()


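The rendering tests above suggest a small report object with `to_dict` and `to_markdown` views. A self-contained sketch satisfying them might look like this; the markdown layout and the closing "falsework" line are assumptions prompted only by the assertions, and the `ConditionResult` shape is repeated here so the sketch runs on its own:

```python
from dataclasses import asdict, dataclass, field


@dataclass
class ConditionResult:
    name: str
    passed: bool
    actual: float
    target: float
    unit: str = ""
    detail: str = ""


@dataclass
class GraduationReport:
    all_passed: bool
    conditions: list = field(default_factory=list)

    def to_dict(self) -> dict:
        return {
            "all_passed": self.all_passed,
            "conditions": [asdict(c) for c in self.conditions],
        }

    def to_markdown(self) -> str:
        status = "PASSED" if self.all_passed else "FAILED"
        lines = [f"# Graduation Test: {status}", ""]
        for c in self.conditions:
            mark = "x" if c.passed else " "
            lines.append(f"- [{mark}] {c.name}: {c.actual}{c.unit} (target: {c.target}{c.unit})")
        # Closing line assumed from the 'falsework' assertion in the tests.
        lines += ["", "When the falsework comes down, the arch must stand on its own."]
        return "\n".join(lines)
```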
@pytest.mark.unit
class TestRunGraduationTest:
    """Tests for the full graduation test runner."""

    @patch("timmy.sovereignty.graduation.evaluate_perception_independence")
    @patch("timmy.sovereignty.graduation.evaluate_decision_independence")
    @patch("timmy.sovereignty.graduation.evaluate_narration_independence")
    def test_all_pass(self, mock_narr, mock_dec, mock_perc):
        """Full graduation passes when all 5 conditions pass."""
        from timmy.sovereignty.graduation import ConditionResult, run_graduation_test

        mock_perc.return_value = ConditionResult(name="Perception", passed=True, actual=0, target=0)
        mock_dec.return_value = ConditionResult(name="Decision", passed=True, actual=3, target=5)
        mock_narr.return_value = ConditionResult(name="Narration", passed=True, actual=0, target=0)

        report = run_graduation_test(
            sats_earned=100.0,
            sats_spent=50.0,
            uptime_hours=24.0,
            human_interventions=0,
        )

        assert report.all_passed is True
        assert len(report.conditions) == 5
        assert all(c.passed for c in report.conditions)

    @patch("timmy.sovereignty.graduation.evaluate_perception_independence")
    @patch("timmy.sovereignty.graduation.evaluate_decision_independence")
    @patch("timmy.sovereignty.graduation.evaluate_narration_independence")
    def test_partial_fail(self, mock_narr, mock_dec, mock_perc):
        """Graduation fails when any single condition fails."""
        from timmy.sovereignty.graduation import ConditionResult, run_graduation_test

        mock_perc.return_value = ConditionResult(name="Perception", passed=True, actual=0, target=0)
        mock_dec.return_value = ConditionResult(name="Decision", passed=False, actual=10, target=5)
        mock_narr.return_value = ConditionResult(name="Narration", passed=True, actual=0, target=0)

        report = run_graduation_test(
            sats_earned=100.0,
            sats_spent=50.0,
            uptime_hours=24.0,
            human_interventions=0,
        )

        assert report.all_passed is False

    def test_persist_report(self, tmp_path):
        """Graduation report persists to JSON file."""
        from timmy.sovereignty.graduation import (
            ConditionResult,
            GraduationReport,
            persist_graduation_report,
        )

        report = GraduationReport(
            all_passed=False,
            conditions=[ConditionResult(name="Test", passed=False, actual=5, target=0)],
        )

        with patch("timmy.sovereignty.graduation.settings") as mock_settings:
            mock_settings.repo_root = str(tmp_path)
            path = persist_graduation_report(report)

        assert path.exists()
        import json

        with open(path) as f:
            data = json.load(f)
        assert data["all_passed"] is False
@@ -196,9 +196,10 @@ class TestPerceptionCacheMatch:
         screenshot = np.array([[5, 6], [7, 8]])
         result = cache.match(screenshot)
 
-        # Note: current implementation uses > 0.85, so exactly 0.85 returns None state
+        # Implementation uses >= 0.85 (inclusive threshold)
         assert result.confidence == 0.85
-        assert result.state is None
+        assert result.state is not None
+        assert result.state["template_name"] == "threshold_match"
 
     @patch("timmy.sovereignty.perception_cache.cv2")
     def test_match_just_above_threshold(self, mock_cv2, tmp_path):
@@ -283,10 +284,12 @@ class TestPerceptionCachePersist:
 
         templates_path = tmp_path / "templates.json"
         cache = PerceptionCache(templates_path=templates_path)
-        cache.add([
+        cache.add(
+            [
             Template(name="template1", image=np.array([[1]]), threshold=0.85),
             Template(name="template2", image=np.array([[2]]), threshold=0.90),
-        ])
+            ]
+        )
 
         cache.persist()
 
@@ -312,8 +315,10 @@ class TestPerceptionCachePersist:
         with open(templates_path) as f:
             data = json.load(f)
 
-        assert "image" not in data[0]
-        assert set(data[0].keys()) == {"name", "threshold"}
+        assert "image" not in data[0]  # raw image array is NOT in JSON
+        # image_path is stored for .npy file reference
+        assert "name" in data[0]
+        assert "threshold" in data[0]
 
 
 class TestPerceptionCacheLoad:
@@ -338,8 +343,8 @@ class TestPerceptionCacheLoad:
         assert len(cache2.templates) == 1
         assert cache2.templates[0].name == "loaded"
         assert cache2.templates[0].threshold == 0.88
-        # Note: images are loaded as empty arrays per current implementation
-        assert cache2.templates[0].image.size == 0
+        # Images are now persisted as .npy files and loaded back
+        assert cache2.templates[0].image.size > 0
 
     def test_load_empty_file(self, tmp_path):
         """Load handles empty template list in file."""
239	tests/sovereignty/test_sovereignty_loop.py	Normal file
@@ -0,0 +1,239 @@
"""Tests for the sovereignty loop orchestrator.
|
||||
|
||||
Refs: #953
|
||||
"""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
class TestSovereignPerceive:
|
||||
"""Tests for sovereign_perceive (perception layer)."""
|
||||
|
||||
async def test_cache_hit_skips_vlm(self):
|
||||
"""When cache has high-confidence match, VLM is not called."""
|
||||
from timmy.sovereignty.perception_cache import CacheResult
|
||||
from timmy.sovereignty.sovereignty_loop import sovereign_perceive
|
||||
|
||||
cache = MagicMock()
|
||||
cache.match.return_value = CacheResult(
|
||||
confidence=0.95, state={"template_name": "health_bar"}
|
||||
)
|
||||
|
||||
vlm = AsyncMock()
|
||||
screenshot = MagicMock()
|
||||
|
||||
with patch(
|
||||
"timmy.sovereignty.sovereignty_loop.emit_sovereignty_event",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_emit:
|
||||
result = await sovereign_perceive(screenshot, cache, vlm)
|
||||
|
||||
assert result == {"template_name": "health_bar"}
|
||||
vlm.analyze.assert_not_called()
|
||||
mock_emit.assert_called_once_with("perception_cache_hit", session_id="")
|
||||
|
||||
    async def test_cache_miss_calls_vlm_and_crystallizes(self):
        """On cache miss, VLM is called and output is crystallized."""
        from timmy.sovereignty.perception_cache import CacheResult
        from timmy.sovereignty.sovereignty_loop import sovereign_perceive

        cache = MagicMock()
        cache.match.return_value = CacheResult(confidence=0.3, state=None)

        vlm = AsyncMock()
        vlm.analyze.return_value = {"items": []}

        screenshot = MagicMock()
        crystallize_fn = MagicMock(return_value=[])

        with patch(
            "timmy.sovereignty.sovereignty_loop.emit_sovereignty_event",
            new_callable=AsyncMock,
        ):
            await sovereign_perceive(screenshot, cache, vlm, crystallize_fn=crystallize_fn)

        vlm.analyze.assert_called_once_with(screenshot)
        crystallize_fn.assert_called_once()


@pytest.mark.unit
@pytest.mark.asyncio
class TestSovereignDecide:
    """Tests for sovereign_decide (decision layer)."""

    async def test_rule_hit_skips_llm(self, tmp_path):
        """Reliable rule match bypasses the LLM."""
        from timmy.sovereignty.auto_crystallizer import Rule, RuleStore
        from timmy.sovereignty.sovereignty_loop import sovereign_decide

        store = RuleStore(path=tmp_path / "strategy.json")
        store.add(
            Rule(
                id="r1",
                condition="health low",
                action="heal",
                confidence=0.9,
                times_applied=5,
                times_succeeded=4,
            )
        )

        llm = AsyncMock()
        context = {"health": "low", "mana": 50}

        with patch(
            "timmy.sovereignty.sovereignty_loop.emit_sovereignty_event",
            new_callable=AsyncMock,
        ):
            result = await sovereign_decide(context, llm, rule_store=store)

        assert result["action"] == "heal"
        assert result["source"] == "crystallized_rule"
        llm.reason.assert_not_called()

    async def test_no_rule_calls_llm_and_crystallizes(self, tmp_path):
        """Without matching rules, LLM is called and reasoning is crystallized."""
        from timmy.sovereignty.auto_crystallizer import RuleStore
        from timmy.sovereignty.sovereignty_loop import sovereign_decide

        store = RuleStore(path=tmp_path / "strategy.json")

        llm = AsyncMock()
        llm.reason.return_value = {
            "action": "attack",
            "reasoning": "I chose attack because enemy_health was below 50%.",
        }

        context = {"enemy_health": 45}

        with patch(
            "timmy.sovereignty.sovereignty_loop.emit_sovereignty_event",
            new_callable=AsyncMock,
        ):
            result = await sovereign_decide(context, llm, rule_store=store)

        assert result["action"] == "attack"
        llm.reason.assert_called_once_with(context)
        # The reasoning should have been crystallized (threshold pattern detected)
        assert len(store) > 0


@pytest.mark.unit
@pytest.mark.asyncio
class TestSovereignNarrate:
    """Tests for sovereign_narrate (narration layer)."""

    async def test_template_hit_skips_llm(self):
        """Known event type uses template without LLM."""
        from timmy.sovereignty.sovereignty_loop import sovereign_narrate

        template_store = {
            "combat_start": "Battle begins against {enemy}!",
        }

        llm = AsyncMock()

        with patch(
            "timmy.sovereignty.sovereignty_loop.emit_sovereignty_event",
            new_callable=AsyncMock,
        ) as mock_emit:
            result = await sovereign_narrate(
                {"type": "combat_start", "enemy": "Cliff Racer"},
                llm=llm,
                template_store=template_store,
            )

        assert result == "Battle begins against Cliff Racer!"
        llm.narrate.assert_not_called()
        mock_emit.assert_called_once_with("narration_template", session_id="")

    async def test_unknown_event_calls_llm(self):
        """Unknown event type falls through to LLM and crystallizes template."""
        from timmy.sovereignty.sovereignty_loop import sovereign_narrate

        template_store = {}

        llm = AsyncMock()
        llm.narrate.return_value = "You discovered a hidden cave in the mountains."

        with patch(
            "timmy.sovereignty.sovereignty_loop.emit_sovereignty_event",
            new_callable=AsyncMock,
        ):
            with patch(
                "timmy.sovereignty.sovereignty_loop._crystallize_narration_template"
            ) as mock_cryst:
                result = await sovereign_narrate(
                    {"type": "discovery", "location": "mountains"},
                    llm=llm,
                    template_store=template_store,
                )

        assert result == "You discovered a hidden cave in the mountains."
        llm.narrate.assert_called_once()
        mock_cryst.assert_called_once()

    async def test_no_llm_returns_default(self):
        """Without LLM and no template, returns a default narration."""
        from timmy.sovereignty.sovereignty_loop import sovereign_narrate

        with patch(
            "timmy.sovereignty.sovereignty_loop.emit_sovereignty_event",
            new_callable=AsyncMock,
        ):
            result = await sovereign_narrate(
                {"type": "unknown_event"},
                llm=None,
                template_store={},
            )

        assert "[unknown_event]" in result


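The narration tests imply a simple template-first lookup: a known event type fills a stored format string, an unknown type signals a miss so the caller can fall back to the LLM. A sketch of that lookup step, with `narrate_from_template` as a hypothetical helper name (the tests exercise `sovereign_narrate` directly), might be:

```python
from typing import Optional


def narrate_from_template(event: dict, template_store: dict) -> Optional[str]:
    """Fill a stored template with event fields; None signals a template miss."""
    template = template_store.get(event.get("type", ""))
    if template is None:
        # Miss: caller falls back to the LLM (or a default "[event_type]" line).
        return None
    # Every non-"type" field is available as a format placeholder.
    return template.format(**{k: v for k, v in event.items() if k != "type"})
```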
@pytest.mark.unit
@pytest.mark.asyncio
class TestSovereigntyEnforcedDecorator:
    """Tests for the @sovereignty_enforced decorator."""

    async def test_cache_hit_skips_function(self):
        """Decorator returns cached value without calling the wrapped function."""
        from timmy.sovereignty.sovereignty_loop import sovereignty_enforced

        call_count = 0

        @sovereignty_enforced(
            layer="decision",
            cache_check=lambda a, kw: "cached_result",
        )
        async def expensive_fn():
            nonlocal call_count
            call_count += 1
            return "expensive_result"

        with patch("timmy.sovereignty.sovereignty_loop.get_metrics_store") as mock_store:
            mock_store.return_value = MagicMock()
            result = await expensive_fn()

        assert result == "cached_result"
        assert call_count == 0

    async def test_cache_miss_runs_function(self):
        """Decorator calls function when cache returns None."""
        from timmy.sovereignty.sovereignty_loop import sovereignty_enforced

        @sovereignty_enforced(
            layer="decision",
            cache_check=lambda a, kw: None,
        )
        async def expensive_fn():
            return "computed_result"

        with patch("timmy.sovereignty.sovereignty_loop.get_metrics_store") as mock_store:
            mock_store.return_value = MagicMock()
            result = await expensive_fn()

        assert result == "computed_result"