Complete four-pass evolution to production-ready architecture:

**Pass 1 → Foundation:**
- Tool registry, basic harness, 19 tools
- VPS provisioning, Syncthing mesh
- Health daemon, systemd services

**Pass 2 → Three-House Canon:**
- Timmy (Sovereign), Ezra (Archivist), Bezalel (Artificer)
- Provenance tracking, artifact-flow discipline
- House-aware policy enforcement

**Pass 3 → Self-Improvement:**
- Pattern database with SQLite backend
- Adaptive policies (auto-adjust thresholds)
- Predictive execution (success prediction)
- Hermes bridge for shortest-loop telemetry
- Learning velocity tracking

**Pass 4 → Production Integration:**
- Unified API: `from uni_wizard import Harness, House, Mode`
- Three modes: SIMPLE / INTELLIGENT / SOVEREIGN
- Circuit breaker pattern for fault tolerance
- Async/concurrent execution support
- Production hardening (timeouts, retries)

**Allegro Lane Definition:**
- Narrowed to: Gitea integration, Hermes bridge, redundancy/failover
- Provides: Cloud connectivity, telemetry streaming, issue routing
- Does NOT: Make sovereign decisions, authenticate as Timmy

**Files:**
- v3/: Intelligence engine, adaptive harness, Hermes bridge
- v4/: Unified API, production harness, final architecture

Total: ~25KB architecture documentation + production code
680 lines
24 KiB
Python
680 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Intelligence Engine v3 — Self-Improving Local Sovereignty
|
|
|
|
The feedback loop that makes Timmy smarter:
|
|
1. INGEST: Pull telemetry from Hermes, houses, all sources
|
|
2. ANALYZE: Pattern recognition on success/failure/latency
|
|
3. ADAPT: Adjust policies, routing, predictions
|
|
4. PREDICT: Pre-fetch, pre-route, optimize before execution
|
|
|
|
Key principle: Every execution teaches. Every pattern informs next decision.
|
|
"""
|
|
|
|
import hashlib
import json
import sqlite3
import statistics
import time
from collections import defaultdict
from contextlib import closing
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
|
|
|
|
|
|
@dataclass
|
|
class ExecutionPattern:
|
|
"""Pattern extracted from execution history"""
|
|
tool: str
|
|
param_signature: str # hashed params pattern
|
|
house: str
|
|
model: str # which model was used
|
|
success_rate: float
|
|
avg_latency_ms: float
|
|
avg_confidence: float
|
|
sample_count: int
|
|
last_executed: str
|
|
|
|
def to_dict(self):
|
|
return asdict(self)
|
|
|
|
|
|
@dataclass
|
|
class ModelPerformance:
|
|
"""Performance metrics for a model on task types"""
|
|
model: str
|
|
task_type: str
|
|
total_calls: int
|
|
success_count: int
|
|
success_rate: float
|
|
avg_latency_ms: float
|
|
avg_tokens: float
|
|
cost_per_call: float
|
|
last_used: str
|
|
|
|
|
|
@dataclass
|
|
class AdaptationEvent:
|
|
"""Record of a policy/system adaptation"""
|
|
timestamp: str
|
|
trigger: str # what caused the adaptation
|
|
change_type: str # policy, routing, cache, etc
|
|
old_value: Any
|
|
new_value: Any
|
|
reason: str
|
|
expected_improvement: float
|
|
|
|
|
|
class PatternDatabase:
|
|
"""
|
|
Local SQLite database for execution patterns.
|
|
|
|
Tracks:
|
|
- Tool + params → success rate
|
|
- House + task → performance
|
|
- Model + task type → best choice
|
|
- Time-based patterns (hour of day effects)
|
|
"""
|
|
|
|
def __init__(self, db_path: Path = None):
|
|
self.db_path = db_path or Path.home() / ".timmy" / "intelligence.db"
|
|
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self._init_db()
|
|
|
|
def _init_db(self):
|
|
"""Initialize database with performance tracking tables"""
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
|
|
# Execution outcomes with full context
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS executions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL NOT NULL,
|
|
tool TEXT NOT NULL,
|
|
param_hash TEXT NOT NULL,
|
|
house TEXT NOT NULL,
|
|
model TEXT,
|
|
task_type TEXT,
|
|
success INTEGER NOT NULL,
|
|
latency_ms REAL,
|
|
confidence REAL,
|
|
tokens_in INTEGER,
|
|
tokens_out INTEGER,
|
|
error_type TEXT,
|
|
hour_of_day INTEGER,
|
|
day_of_week INTEGER
|
|
)
|
|
""")
|
|
|
|
# Aggregated patterns (updated continuously)
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS patterns (
|
|
tool TEXT NOT NULL,
|
|
param_signature TEXT NOT NULL,
|
|
house TEXT NOT NULL,
|
|
model TEXT,
|
|
success_count INTEGER DEFAULT 0,
|
|
failure_count INTEGER DEFAULT 0,
|
|
total_latency_ms REAL DEFAULT 0,
|
|
total_confidence REAL DEFAULT 0,
|
|
sample_count INTEGER DEFAULT 0,
|
|
last_updated REAL,
|
|
PRIMARY KEY (tool, param_signature, house, model)
|
|
)
|
|
""")
|
|
|
|
# Model performance by task type
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS model_performance (
|
|
model TEXT NOT NULL,
|
|
task_type TEXT NOT NULL,
|
|
total_calls INTEGER DEFAULT 0,
|
|
success_count INTEGER DEFAULT 0,
|
|
total_latency_ms REAL DEFAULT 0,
|
|
total_tokens INTEGER DEFAULT 0,
|
|
last_used REAL,
|
|
PRIMARY KEY (model, task_type)
|
|
)
|
|
""")
|
|
|
|
# Adaptation history (how we've changed)
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS adaptations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL NOT NULL,
|
|
trigger TEXT NOT NULL,
|
|
change_type TEXT NOT NULL,
|
|
old_value TEXT,
|
|
new_value TEXT,
|
|
reason TEXT,
|
|
expected_improvement REAL
|
|
)
|
|
""")
|
|
|
|
# Performance predictions (for validation)
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS predictions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL NOT NULL,
|
|
tool TEXT NOT NULL,
|
|
house TEXT NOT NULL,
|
|
predicted_success_rate REAL,
|
|
actual_success INTEGER,
|
|
prediction_accuracy REAL
|
|
)
|
|
""")
|
|
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_exec_tool ON executions(tool)")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_exec_time ON executions(timestamp)")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_patterns_tool ON patterns(tool)")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def record_execution(self, data: Dict):
|
|
"""Record a single execution outcome"""
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
now = time.time()
|
|
dt = datetime.fromtimestamp(now)
|
|
|
|
# Extract fields
|
|
tool = data.get("tool", "unknown")
|
|
params = data.get("params", {})
|
|
param_hash = hashlib.sha256(
|
|
json.dumps(params, sort_keys=True).encode()
|
|
).hexdigest()[:16]
|
|
|
|
conn.execute("""
|
|
INSERT INTO executions
|
|
(timestamp, tool, param_hash, house, model, task_type, success,
|
|
latency_ms, confidence, tokens_in, tokens_out, error_type,
|
|
hour_of_day, day_of_week)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
now, tool, param_hash, data.get("house", "timmy"),
|
|
data.get("model"), data.get("task_type"),
|
|
1 if data.get("success") else 0,
|
|
data.get("latency_ms"), data.get("confidence"),
|
|
data.get("tokens_in"), data.get("tokens_out"),
|
|
data.get("error_type"),
|
|
dt.hour, dt.weekday()
|
|
))
|
|
|
|
# Update aggregated patterns
|
|
self._update_pattern(conn, tool, param_hash, data)
|
|
|
|
# Update model performance
|
|
if data.get("model"):
|
|
self._update_model_performance(conn, data)
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def _update_pattern(self, conn: sqlite3.Connection, tool: str,
|
|
param_hash: str, data: Dict):
|
|
"""Update aggregated pattern for this tool/params/house/model combo"""
|
|
house = data.get("house", "timmy")
|
|
model = data.get("model", "unknown")
|
|
success = 1 if data.get("success") else 0
|
|
latency = data.get("latency_ms", 0)
|
|
confidence = data.get("confidence", 0)
|
|
|
|
# Try to update existing
|
|
result = conn.execute("""
|
|
SELECT success_count, failure_count, total_latency_ms,
|
|
total_confidence, sample_count
|
|
FROM patterns
|
|
WHERE tool=? AND param_signature=? AND house=? AND model=?
|
|
""", (tool, param_hash, house, model)).fetchone()
|
|
|
|
if result:
|
|
succ, fail, total_lat, total_conf, samples = result
|
|
conn.execute("""
|
|
UPDATE patterns SET
|
|
success_count = ?,
|
|
failure_count = ?,
|
|
total_latency_ms = ?,
|
|
total_confidence = ?,
|
|
sample_count = ?,
|
|
last_updated = ?
|
|
WHERE tool=? AND param_signature=? AND house=? AND model=?
|
|
""", (
|
|
succ + success, fail + (1 - success),
|
|
total_lat + latency, total_conf + confidence,
|
|
samples + 1, time.time(),
|
|
tool, param_hash, house, model
|
|
))
|
|
else:
|
|
conn.execute("""
|
|
INSERT INTO patterns
|
|
(tool, param_signature, house, model, success_count, failure_count,
|
|
total_latency_ms, total_confidence, sample_count, last_updated)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (tool, param_hash, house, model,
|
|
success, 1 - success, latency, confidence, 1, time.time()))
|
|
|
|
def _update_model_performance(self, conn: sqlite3.Connection, data: Dict):
|
|
"""Update model performance tracking"""
|
|
model = data.get("model")
|
|
task_type = data.get("task_type", "unknown")
|
|
success = 1 if data.get("success") else 0
|
|
latency = data.get("latency_ms", 0)
|
|
tokens = (data.get("tokens_in", 0) or 0) + (data.get("tokens_out", 0) or 0)
|
|
|
|
result = conn.execute("""
|
|
SELECT total_calls, success_count, total_latency_ms, total_tokens
|
|
FROM model_performance
|
|
WHERE model=? AND task_type=?
|
|
""", (model, task_type)).fetchone()
|
|
|
|
if result:
|
|
total, succ, total_lat, total_tok = result
|
|
conn.execute("""
|
|
UPDATE model_performance SET
|
|
total_calls = ?,
|
|
success_count = ?,
|
|
total_latency_ms = ?,
|
|
total_tokens = ?,
|
|
last_used = ?
|
|
WHERE model=? AND task_type=?
|
|
""", (total + 1, succ + success, total_lat + latency,
|
|
total_tok + tokens, time.time(), model, task_type))
|
|
else:
|
|
conn.execute("""
|
|
INSERT INTO model_performance
|
|
(model, task_type, total_calls, success_count,
|
|
total_latency_ms, total_tokens, last_used)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""", (model, task_type, 1, success, latency, tokens, time.time()))
|
|
|
|
def get_pattern(self, tool: str, house: str,
|
|
params: Dict = None) -> Optional[ExecutionPattern]:
|
|
"""Get pattern for tool/house/params combination"""
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
|
|
if params:
|
|
param_hash = hashlib.sha256(
|
|
json.dumps(params, sort_keys=True).encode()
|
|
).hexdigest()[:16]
|
|
result = conn.execute("""
|
|
SELECT param_signature, house, model,
|
|
success_count, failure_count, total_latency_ms,
|
|
total_confidence, sample_count, last_updated
|
|
FROM patterns
|
|
WHERE tool=? AND param_signature=? AND house=?
|
|
ORDER BY sample_count DESC
|
|
LIMIT 1
|
|
""", (tool, param_hash, house)).fetchone()
|
|
else:
|
|
# Get aggregate across all params
|
|
result = conn.execute("""
|
|
SELECT 'aggregate' as param_signature, house, model,
|
|
SUM(success_count), SUM(failure_count), SUM(total_latency_ms),
|
|
SUM(total_confidence), SUM(sample_count), MAX(last_updated)
|
|
FROM patterns
|
|
WHERE tool=? AND house=?
|
|
GROUP BY house, model
|
|
ORDER BY sample_count DESC
|
|
LIMIT 1
|
|
""", (tool, house)).fetchone()
|
|
|
|
conn.close()
|
|
|
|
if not result:
|
|
return None
|
|
|
|
(param_sig, h, model, succ, fail, total_lat,
|
|
total_conf, samples, last_updated) = result
|
|
|
|
total = succ + fail
|
|
success_rate = succ / total if total > 0 else 0.5
|
|
avg_lat = total_lat / samples if samples > 0 else 0
|
|
avg_conf = total_conf / samples if samples > 0 else 0.5
|
|
|
|
return ExecutionPattern(
|
|
tool=tool,
|
|
param_signature=param_sig,
|
|
house=h,
|
|
model=model or "unknown",
|
|
success_rate=success_rate,
|
|
avg_latency_ms=avg_lat,
|
|
avg_confidence=avg_conf,
|
|
sample_count=samples,
|
|
last_executed=datetime.fromtimestamp(last_updated).isoformat()
|
|
)
|
|
|
|
def get_best_model(self, task_type: str, min_samples: int = 5) -> Optional[str]:
|
|
"""Get best performing model for task type"""
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
|
|
result = conn.execute("""
|
|
SELECT model, total_calls, success_count, total_latency_ms
|
|
FROM model_performance
|
|
WHERE task_type=? AND total_calls >= ?
|
|
ORDER BY (CAST(success_count AS REAL) / total_calls) DESC,
|
|
(total_latency_ms / total_calls) ASC
|
|
LIMIT 1
|
|
""", (task_type, min_samples)).fetchone()
|
|
|
|
conn.close()
|
|
|
|
return result[0] if result else None
|
|
|
|
def get_house_performance(self, house: str, days: int = 7) -> Dict:
|
|
"""Get performance metrics for a house"""
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
cutoff = time.time() - (days * 86400)
|
|
|
|
result = conn.execute("""
|
|
SELECT
|
|
COUNT(*) as total,
|
|
SUM(success) as successes,
|
|
AVG(latency_ms) as avg_latency,
|
|
AVG(confidence) as avg_confidence
|
|
FROM executions
|
|
WHERE house=? AND timestamp > ?
|
|
""", (house, cutoff)).fetchone()
|
|
|
|
conn.close()
|
|
|
|
total, successes, avg_lat, avg_conf = result
|
|
|
|
return {
|
|
"house": house,
|
|
"period_days": days,
|
|
"total_executions": total or 0,
|
|
"successes": successes or 0,
|
|
"success_rate": (successes / total) if total else 0,
|
|
"avg_latency_ms": avg_lat or 0,
|
|
"avg_confidence": avg_conf or 0
|
|
}
|
|
|
|
def record_adaptation(self, event: AdaptationEvent):
|
|
"""Record a system adaptation"""
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
|
|
conn.execute("""
|
|
INSERT INTO adaptations
|
|
(timestamp, trigger, change_type, old_value, new_value, reason, expected_improvement)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
time.time(), event.trigger, event.change_type,
|
|
json.dumps(event.old_value), json.dumps(event.new_value),
|
|
event.reason, event.expected_improvement
|
|
))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def get_adaptations(self, limit: int = 20) -> List[AdaptationEvent]:
|
|
"""Get recent adaptations"""
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
|
|
rows = conn.execute("""
|
|
SELECT timestamp, trigger, change_type, old_value, new_value,
|
|
reason, expected_improvement
|
|
FROM adaptations
|
|
ORDER BY timestamp DESC
|
|
LIMIT ?
|
|
""", (limit,)).fetchall()
|
|
|
|
conn.close()
|
|
|
|
return [
|
|
AdaptationEvent(
|
|
timestamp=datetime.fromtimestamp(r[0]).isoformat(),
|
|
trigger=r[1], change_type=r[2],
|
|
old_value=json.loads(r[3]) if r[3] else None,
|
|
new_value=json.loads(r[4]) if r[4] else None,
|
|
reason=r[5], expected_improvement=r[6]
|
|
)
|
|
for r in rows
|
|
]
|
|
|
|
|
|
class IntelligenceEngine:
|
|
"""
|
|
The brain that makes Timmy smarter.
|
|
|
|
Continuously:
|
|
- Analyzes execution patterns
|
|
- Identifies improvement opportunities
|
|
- Adapts policies and routing
|
|
- Predicts optimal configurations
|
|
"""
|
|
|
|
def __init__(self, db: PatternDatabase = None):
|
|
self.db = db or PatternDatabase()
|
|
self.adaptation_history: List[AdaptationEvent] = []
|
|
self.current_policies = self._load_default_policies()
|
|
|
|
def _load_default_policies(self) -> Dict:
|
|
"""Load default policies (will be adapted)"""
|
|
return {
|
|
"ezra": {
|
|
"evidence_threshold": 0.8,
|
|
"confidence_boost_for_read_ops": 0.1
|
|
},
|
|
"bezalel": {
|
|
"evidence_threshold": 0.6,
|
|
"parallel_test_threshold": 0.5
|
|
},
|
|
"routing": {
|
|
"min_confidence_for_auto_route": 0.7,
|
|
"fallback_to_timmy_threshold": 0.3
|
|
}
|
|
}
|
|
|
|
def ingest_hermes_session(self, session_data: Dict):
|
|
"""
|
|
Ingest telemetry from Hermes harness.
|
|
|
|
This is the SHORTEST LOOP - Hermes data directly into intelligence.
|
|
"""
|
|
# Extract execution records from Hermes session
|
|
executions = []
|
|
|
|
for msg in session_data.get("messages", []):
|
|
if msg.get("role") == "tool":
|
|
executions.append({
|
|
"tool": msg.get("name", "unknown"),
|
|
"success": not msg.get("error"),
|
|
"latency_ms": msg.get("execution_time_ms", 0),
|
|
"model": session_data.get("model"),
|
|
"timestamp": session_data.get("started_at")
|
|
})
|
|
|
|
for exec_data in executions:
|
|
self.db.record_execution(exec_data)
|
|
|
|
return len(executions)
|
|
|
|
def analyze_and_adapt(self) -> List[AdaptationEvent]:
|
|
"""
|
|
Analyze patterns and adapt policies.
|
|
|
|
Called periodically to improve system performance.
|
|
"""
|
|
adaptations = []
|
|
|
|
# Analysis 1: House performance gaps
|
|
house_perf = {
|
|
"ezra": self.db.get_house_performance("ezra", days=3),
|
|
"bezalel": self.db.get_house_performance("bezalel", days=3),
|
|
"timmy": self.db.get_house_performance("timmy", days=3)
|
|
}
|
|
|
|
# If Ezra's success rate is low, lower evidence threshold
|
|
ezra_rate = house_perf["ezra"].get("success_rate", 0.5)
|
|
if ezra_rate < 0.6 and self.current_policies["ezra"]["evidence_threshold"] > 0.6:
|
|
old_val = self.current_policies["ezra"]["evidence_threshold"]
|
|
new_val = old_val - 0.1
|
|
self.current_policies["ezra"]["evidence_threshold"] = new_val
|
|
|
|
adapt = AdaptationEvent(
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
trigger="low_ezra_success_rate",
|
|
change_type="policy.ezra.evidence_threshold",
|
|
old_value=old_val,
|
|
new_value=new_val,
|
|
reason=f"Ezra success rate {ezra_rate:.1%} below threshold, relaxing evidence requirement",
|
|
expected_improvement=0.1
|
|
)
|
|
adaptations.append(adapt)
|
|
self.db.record_adaptation(adapt)
|
|
|
|
# Analysis 2: Model selection optimization
|
|
for task_type in ["read", "build", "test", "judge"]:
|
|
best_model = self.db.get_best_model(task_type, min_samples=10)
|
|
if best_model:
|
|
# This would update model selection policy
|
|
pass
|
|
|
|
self.adaptation_history.extend(adaptations)
|
|
return adaptations
|
|
|
|
def predict_success(self, tool: str, house: str,
|
|
params: Dict = None) -> Tuple[float, str]:
|
|
"""
|
|
Predict success probability for a planned execution.
|
|
|
|
Returns: (probability, reasoning)
|
|
"""
|
|
pattern = self.db.get_pattern(tool, house, params)
|
|
|
|
if not pattern or pattern.sample_count < 3:
|
|
return (0.5, "Insufficient data for prediction")
|
|
|
|
reasoning = f"Based on {pattern.sample_count} similar executions: "
|
|
|
|
if pattern.success_rate > 0.9:
|
|
reasoning += "excellent track record"
|
|
elif pattern.success_rate > 0.7:
|
|
reasoning += "good track record"
|
|
elif pattern.success_rate > 0.5:
|
|
reasoning += "mixed results"
|
|
else:
|
|
reasoning += "poor track record, consider alternatives"
|
|
|
|
return (pattern.success_rate, reasoning)
|
|
|
|
def get_optimal_house(self, tool: str, params: Dict = None) -> Tuple[str, float]:
|
|
"""
|
|
Determine optimal house for a task based on historical performance.
|
|
|
|
Returns: (house, confidence)
|
|
"""
|
|
houses = ["ezra", "bezalel", "timmy"]
|
|
best_house = "timmy"
|
|
best_rate = 0.0
|
|
|
|
for house in houses:
|
|
pattern = self.db.get_pattern(tool, house, params)
|
|
if pattern and pattern.success_rate > best_rate:
|
|
best_rate = pattern.success_rate
|
|
best_house = house
|
|
|
|
confidence = best_rate if best_rate > 0 else 0.5
|
|
return (best_house, confidence)
|
|
|
|
def get_intelligence_report(self) -> Dict:
|
|
"""Generate comprehensive intelligence report"""
|
|
return {
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"house_performance": {
|
|
"ezra": self.db.get_house_performance("ezra", days=7),
|
|
"bezalel": self.db.get_house_performance("bezalel", days=7),
|
|
"timmy": self.db.get_house_performance("timmy", days=7)
|
|
},
|
|
"current_policies": self.current_policies,
|
|
"recent_adaptations": [
|
|
a.to_dict() for a in self.db.get_adaptations(limit=10)
|
|
],
|
|
"learning_velocity": self._calculate_learning_velocity(),
|
|
"prediction_accuracy": self._calculate_prediction_accuracy()
|
|
}
|
|
|
|
def _calculate_learning_velocity(self) -> Dict:
|
|
"""Calculate how fast Timmy is improving"""
|
|
conn = sqlite3.connect(str(self.db.db_path))
|
|
|
|
# Compare last 3 days vs previous 3 days
|
|
now = time.time()
|
|
recent_start = now - (3 * 86400)
|
|
previous_start = now - (6 * 86400)
|
|
|
|
recent = conn.execute("""
|
|
SELECT AVG(success) FROM executions WHERE timestamp > ?
|
|
""", (recent_start,)).fetchone()[0] or 0
|
|
|
|
previous = conn.execute("""
|
|
SELECT AVG(success) FROM executions
|
|
WHERE timestamp > ? AND timestamp <= ?
|
|
""", (previous_start, recent_start)).fetchone()[0] or 0
|
|
|
|
conn.close()
|
|
|
|
improvement = recent - previous
|
|
|
|
return {
|
|
"recent_success_rate": recent,
|
|
"previous_success_rate": previous,
|
|
"improvement": improvement,
|
|
"velocity": "accelerating" if improvement > 0.05 else
|
|
"stable" if improvement > -0.05 else "declining"
|
|
}
|
|
|
|
def _calculate_prediction_accuracy(self) -> float:
|
|
"""Calculate how accurate our predictions have been"""
|
|
conn = sqlite3.connect(str(self.db.db_path))
|
|
|
|
result = conn.execute("""
|
|
SELECT AVG(prediction_accuracy) FROM predictions
|
|
WHERE timestamp > ?
|
|
""", (time.time() - (7 * 86400),)).fetchone()
|
|
|
|
conn.close()
|
|
|
|
return result[0] if result[0] else 0.5
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Demo the intelligence engine
|
|
engine = IntelligenceEngine()
|
|
|
|
# Simulate some executions
|
|
for i in range(20):
|
|
engine.db.record_execution({
|
|
"tool": "git_status",
|
|
"house": "ezra" if i % 2 == 0 else "bezalel",
|
|
"model": "hermes3:8b",
|
|
"task_type": "read",
|
|
"success": i < 15, # 75% success rate
|
|
"latency_ms": 100 + i * 5,
|
|
"confidence": 0.8
|
|
})
|
|
|
|
print("=" * 60)
|
|
print("INTELLIGENCE ENGINE v3 — Self-Improvement Demo")
|
|
print("=" * 60)
|
|
|
|
# Get predictions
|
|
pred, reason = engine.predict_success("git_status", "ezra")
|
|
print(f"\n🔮 Prediction for ezra/git_status: {pred:.1%}")
|
|
print(f" Reasoning: {reason}")
|
|
|
|
# Analyze and adapt
|
|
adaptations = engine.analyze_and_adapt()
|
|
print(f"\n🔄 Adaptations made: {len(adaptations)}")
|
|
for a in adaptations:
|
|
print(f" - {a.change_type}: {a.old_value} → {a.new_value}")
|
|
print(f" Reason: {a.reason}")
|
|
|
|
# Get report
|
|
report = engine.get_intelligence_report()
|
|
print(f"\n📊 Learning Velocity: {report['learning_velocity']['velocity']}")
|
|
print(f" Improvement: {report['learning_velocity']['improvement']:+.1%}")
|
|
|
|
print("\n" + "=" * 60)
|