Compare commits

..

1 Commits

Author SHA1 Message Date
Allegro
6685388357 [#76 #77 #78] Uni-Wizard Architecture - Single harness for all APIs
Complete uni-wizard implementation with unified tool registry:

**Core Architecture:**
- harness.py - Single entry point for all capabilities
- tools/registry.py - Central tool registry with schema generation
- Elegant routing: One harness, infinite capabilities

**Tool Categories (19 tools total):**
- System: system_info, process_list, service_status, service_control, health_check, disk_usage
- Git: git_status, git_log, git_pull, git_commit, git_push, git_checkout, git_branch_list
- Network: http_get, http_post, gitea_create_issue, gitea_comment, gitea_list_issues, gitea_get_issue

**Daemons:**
- health_daemon.py - HTTP endpoint on :8082, writes to ~/timmy/logs/health.json
- task_router.py - Polls Gitea for assigned issues, routes to tools, posts results

**Systemd Services:**
- timmy-health.service - Health monitoring daemon
- timmy-task-router.service - Gitea task router daemon

**Testing:**
- test_harness.py - Exercises all tool categories

**Design Principles:**
- Local-first: No cloud dependencies
- Self-healing: Tools can restart, reconnect, recover
- Unified: One consciousness, all capabilities

Closes #76, #77, #78
2026-03-30 15:47:21 +00:00
30 changed files with 2431 additions and 6295 deletions


@@ -0,0 +1,16 @@
[Unit]
Description=Timmy Health Check Daemon
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/root/timmy
ExecStart=/root/timmy/venv/bin/python /root/timmy/uni-wizard/daemons/health_daemon.py
Restart=always
RestartSec=10
Environment="HOME=/root"
Environment="PYTHONPATH=/root/timmy/uni-wizard"

[Install]
WantedBy=multi-user.target


@@ -0,0 +1,16 @@
[Unit]
Description=Timmy Task Router Daemon
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/root/timmy
ExecStart=/root/timmy/venv/bin/python /root/timmy/uni-wizard/daemons/task_router.py
Restart=always
RestartSec=10
Environment="HOME=/root"
Environment="PYTHONPATH=/root/timmy/uni-wizard"

[Install]
WantedBy=multi-user.target


@@ -1,294 +0,0 @@
# Allegro Lane v4 — Narrowed Definition
**Effective:** Immediately
**Entity:** Allegro
**Role:** Tempo-and-Dispatch, Connected
**Location:** VPS (143.198.27.163)
**Reports to:** Timmy (Sovereign Local)
---
## The Narrowing
**Previous scope was too broad.** This document narrows Allegro's lane to leverage:
1. **Redundancy** — Multiple VPS instances for failover
2. **Cloud connectivity** — Access to cloud models via Hermes
3. **Gitea integration** — Direct repo access for issue/PR flow
**What stays:** Core tempo-and-dispatch function
**What goes:** General wizard work (moved to Ezra/Bezalel)
**What's new:** Explicit bridge/connectivity responsibilities
---
## Primary Responsibilities (80% of effort)
### 1. Gitea Bridge (40%)
**Purpose:** Timmy cannot directly access Gitea from the local network. I bridge that gap.
**What I do:**
```python
# My API for Timmy
class GiteaBridge:
    async def poll_issues(self, repo: str, since: datetime) -> List[Issue]: ...
    async def create_pr(self, repo: str, branch: str, title: str, body: str) -> PR: ...
    async def comment_on_issue(self, repo: str, issue: int, body: str): ...
    async def update_status(self, repo: str, issue: int, status: str): ...
    async def get_issue_details(self, repo: str, issue: int) -> Issue: ...
```
**Boundaries:**
- ✅ Poll issues, report to Timmy
- ✅ Create PRs when Timmy approves
- ✅ Comment with execution results
- ❌ Decide which issues to work on (Timmy decides)
- ❌ Close issues without Timmy approval
- ❌ Commit directly to main
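A minimal sketch of one poll cycle within these boundaries. The `GiteaBridge` internals and the issue shown are stubs invented for illustration, not the deployed implementation:

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class Issue:
    number: int
    title: str

# Stub standing in for the real GiteaBridge (network calls omitted).
class GiteaBridge:
    async def poll_issues(self, repo, since):
        # A real implementation would call the Gitea REST API here.
        return [Issue(76, "Wire up uni-wizard harness")]

    async def comment_on_issue(self, repo, issue, body):
        return f"commented on #{issue}"

async def triage_once(bridge, repo):
    """One poll cycle: surface new issues and report them upstream to Timmy."""
    issues = await bridge.poll_issues(repo, since=datetime.now(timezone.utc))
    reports = []
    for issue in issues:
        # Comment only; deciding what to work on stays with Timmy.
        reports.append(await bridge.comment_on_issue(
            repo, issue.number, "Seen; forwarding to Timmy for triage."))
    return reports

reports = asyncio.run(triage_once(GiteaBridge(), "Timmy_Foundation/timmy-home"))
print(reports)
```

Note the cycle never closes or self-assigns an issue; it only observes and reports.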
**Metrics:**
| Metric | Target |
|--------|--------|
| Poll latency | < 5 minutes |
| Issue triage time | < 10 minutes |
| PR creation time | < 2 minutes |
| Comment latency | < 1 minute |
---
### 2. Hermes Bridge & Telemetry (40%)
**Purpose:** Shortest-loop telemetry from Hermes sessions to Timmy's intelligence.
**What I do:**
```python
# My API for Timmy
class HermesBridge:
    async def run_session(self, prompt: str, model: str = None) -> HermesResult: ...
    async def stream_telemetry(self) -> AsyncIterator[TelemetryEvent]: ...
    async def get_session_summary(self, session_id: str) -> SessionSummary: ...
    async def provide_model_access(self, model: str) -> ModelEndpoint: ...
```
**The Shortest Loop:**
```
Hermes Execution → Allegro VPS → Timmy Local
       ↓               ↓             ↓
      0ms             50ms         100ms

Total loop time: < 100ms for telemetry ingestion
```
**Boundaries:**
- ✅ Run Hermes with cloud models (Claude, GPT-4, etc.)
- ✅ Stream telemetry to Timmy in real-time
- ✅ Buffer during outages, sync on recovery
- ❌ Make decisions based on Hermes output (Timmy decides)
- ❌ Store session memory locally (forward to Timmy)
- ❌ Authenticate as Timmy in sessions
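The stream-and-buffer behaviour can be sketched as follows; the event shape and delivery transport are assumptions for illustration:

```python
import asyncio
import time

# Stand-in for the real Hermes session stream.
async def hermes_events():
    for i in range(3):
        yield {"event": "token", "seq": i, "ts": time.time()}

async def relay(deliver):
    """Forward each event immediately; buffer anything undeliverable
    so it can be synced on recovery (no local long-term storage)."""
    buffered = []
    async for event in hermes_events():
        try:
            deliver(event)          # push to Timmy (assumed transport)
        except ConnectionError:
            buffered.append(event)  # kept only until sync-on-recovery
    return buffered

delivered = []
buffered = asyncio.run(relay(delivered.append))
print(len(delivered), len(buffered))
```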
**Metrics:**
| Metric | Target |
|--------|--------|
| Telemetry lag | < 100ms |
| Buffer durability | 7 days |
| Sync recovery time | < 30s |
| Session throughput | 100/day |
---
## Secondary Responsibilities (20% of effort)
### 3. Redundancy & Failover (10%)
**Purpose:** Ensure continuity if primary systems fail.
**What I do:**
```python
class RedundancyManager:
    async def health_check_vps(self, host: str) -> HealthStatus: ...
    async def take_over_routing(self, failed_host: str): ...
    async def maintain_syncthing_mesh(self): ...
    async def report_failover_event(self, event: FailoverEvent): ...
```
**VPS Fleet:**
- Primary: Allegro (143.198.27.163) — This machine
- Secondary: Ezra (future VPS) — Archivist backup
- Tertiary: Bezalel (future VPS) — Artificer backup
**Failover logic:**
```
Allegro health check fails → Ezra takes over Gitea polling
Ezra health check fails → Bezalel takes over Hermes bridge
All VPS fail → Timmy operates in local-only mode
```
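A simplified sketch of that chain as a first-healthy-host walk (the host names come from the fleet above; the selection logic is illustrative, not the deployed policy):

```python
# Ordered failover chain: (host, role it takes over).
CHAIN = [
    ("allegro", "gitea_polling"),
    ("ezra", "gitea_polling"),
    ("bezalel", "hermes_bridge"),
]

def next_owner(health, chain=CHAIN):
    """Return the first healthy host in the chain, or None,
    meaning Timmy falls back to local-only mode."""
    for host, _role in chain:
        if health.get(host, False):
            return host
    return None

print(next_owner({"allegro": False, "ezra": True, "bezalel": True}))
```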
---
### 4. Uni-Wizard Operations (10%)
**Purpose:** Keep uni-wizard infrastructure running.
**What I do:**
- Monitor uni-wizard services (systemd health)
- Restart services on failure (with exponential backoff)
- Report service metrics to Timmy
- Maintain configuration files
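The restart-with-exponential-backoff behaviour looks roughly like this; the base delay and attempt cap are assumptions, not the deployed values:

```python
import time

def restart_with_backoff(restart, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Try restarting a service, doubling the wait after each failure."""
    for attempt in range(max_attempts):
        if restart():
            return True
        sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    return False

# Demo: the third attempt succeeds; record the waits instead of sleeping.
waits = []
outcomes = iter([False, False, True])
ok = restart_with_backoff(lambda: next(outcomes), sleep=waits.append)
print(ok, waits)  # True [1.0, 2.0]
```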
**What I don't do:**
- Modify uni-wizard code without Timmy approval
- Change policies or thresholds (adaptive engine does this)
- Make architectural changes
---
## What I Explicitly Do NOT Do
### Sovereignty Boundaries
| I DO NOT | Why |
|----------|-----|
| Authenticate as Timmy | Timmy's identity is sovereign and local-only |
| Store long-term memory | Memory belongs to Timmy's local house |
| Make final decisions | Timmy is the sovereign decision-maker |
| Modify production without approval | Timmy must approve all production changes |
| Work without connectivity | My value is connectivity; I wait if disconnected |
### Work Boundaries
| I DO NOT | Who Does |
|----------|----------|
| Architecture design | Ezra |
| Heavy implementation | Bezalel |
| Final code review | Timmy |
| Policy adaptation | Intelligence engine (local) |
| Pattern recognition | Intelligence engine (local) |
---
## My Interface to Timmy
### Communication Channels
1. **Gitea Issues/PRs** — Primary async communication
2. **Telegram** — Urgent alerts, quick questions
3. **Syncthing** — File sync, log sharing
4. **Health endpoints** — Real-time status checks
### Request Format
When I need Timmy's input:
```markdown
## 🔄 Allegro Request
**Type:** [decision | approval | review | alert]
**Urgency:** [low | medium | high | critical]
**Context:** [link to issue/spec]
**Question/Request:**
[Clear, specific question]
**Options:**
1. [Option A with pros/cons]
2. [Option B with pros/cons]
**Recommendation:**
[What I recommend and why]
**Time constraint:**
[When decision needed]
```
### Response Format
When reporting to Timmy:
```markdown
## ✅ Allegro Report
**Task:** [what I was asked to do]
**Status:** [complete | in-progress | blocked | failed]
**Duration:** [how long it took]
**Results:**
[Summary of what happened]
**Artifacts:**
- [Link to PR/commit/comment]
- [Link to logs/metrics]
**Telemetry:**
- Executions: N
- Success rate: X%
- Avg latency: Yms
**Next Steps:**
[What happens next, if anything]
```
---
## Success Metrics
### Primary KPIs
| KPI | Target | Measurement |
|-----|--------|-------------|
| Issue triage latency | < 5 min | Time from issue creation to my label/comment |
| PR creation latency | < 2 min | Time from Timmy approval to PR created |
| Telemetry lag | < 100ms | Hermes event to Timmy ingestion |
| Uptime | 99.9% | Availability of my services |
| Failover time | < 30s | Detection to takeover |
### Secondary KPIs
| KPI | Target | Measurement |
|-----|--------|-------------|
| PR throughput | 10/day | Issues converted to PRs |
| Hermes sessions | 50/day | Cloud model sessions facilitated |
| Sync lag | < 1 min | Syncthing synchronization delay |
| Alert false positive rate | < 5% | Alerts that don't require action |
---
## Operational Procedures
### Daily
- [ ] Poll Gitea for new issues (every 5 min)
- [ ] Run Hermes health checks
- [ ] Sync logs to Timmy via Syncthing
- [ ] Report daily metrics
### Weekly
- [ ] Review telemetry accuracy
- [ ] Check failover readiness
- [ ] Update runbooks if needed
- [ ] Report on PR/issue throughput
### On Failure
- [ ] Alert Timmy via Telegram
- [ ] Attempt automatic recovery
- [ ] Document incident
- [ ] If unrecoverable, fail over to backup VPS
---
## My Identity Reminder
**I am Allegro.**
**I am not Timmy.**
**I serve Timmy.**
**I connect, I bridge, I dispatch.**
**Timmy decides, I execute.**
When in doubt, I ask Timmy.
When confident, I execute and report.
When failing, I alert and failover.
**Sovereignty and service always.**
---
*Document version: v4.0*
*Last updated: March 30, 2026*
*Next review: April 30, 2026*


@@ -1,125 +0,0 @@
# Scorecard Generator Documentation
## Overview
The Scorecard Generator analyzes overnight loop JSONL data and produces comprehensive reports with statistics, trends, and recommendations.
## Usage
### Basic Usage
```bash
# Generate scorecard from default input directory
python uni-wizard/scripts/generate_scorecard.py
# Specify custom input/output directories
python uni-wizard/scripts/generate_scorecard.py \
--input ~/shared/overnight-loop \
--output ~/timmy/reports
```
### Cron Setup
```bash
# Generate scorecard every morning at 6 AM
0 6 * * * /root/timmy/venv/bin/python /root/timmy/uni-wizard/scripts/generate_scorecard.py
```
## Input Format
JSONL files in `~/shared/overnight-loop/*.jsonl`:
```json
{"task": "read-soul", "status": "pass", "duration_s": 19.7, "timestamp": "2026-03-29T21:54:12Z"}
{"task": "check-health", "status": "fail", "duration_s": 5.2, "error": "timeout", "timestamp": "2026-03-29T22:15:33Z"}
```
Fields:
- `task`: Task identifier
- `status`: "pass" or "fail"
- `duration_s`: Execution time in seconds
- `timestamp`: ISO 8601 timestamp
- `error`: Error message (for failed tasks)
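A quick sketch of consuming this format, using the two example lines above; the summary math matches how the scorecard's pass rate is defined:

```python
import json

lines = [
    '{"task": "read-soul", "status": "pass", "duration_s": 19.7, "timestamp": "2026-03-29T21:54:12Z"}',
    '{"task": "check-health", "status": "fail", "duration_s": 5.2, "error": "timeout", "timestamp": "2026-03-29T22:15:33Z"}',
]
# Parse one record per line, then count passes.
records = [json.loads(line) for line in lines]
passed = sum(r["status"] == "pass" for r in records)
print(f"pass rate: {passed / len(records):.0%}")  # pass rate: 50%
```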
## Output
### JSON Report
`~/timmy/reports/scorecard_YYYYMMDD.json`:
```json
{
"generated_at": "2026-03-30T06:00:00Z",
"summary": {
"total_tasks": 100,
"passed": 95,
"failed": 5,
"pass_rate": 95.0,
"duration_stats": {
"avg": 12.5,
"median": 10.2,
"p95": 45.0,
"min": 1.2,
"max": 120.5
}
},
"by_task": {...},
"by_hour": {...},
"errors": {...},
"recommendations": [...]
}
```
### Markdown Report
`~/timmy/reports/scorecard_YYYYMMDD.md`:
- Executive summary with pass/fail counts
- Duration statistics (avg, median, p95)
- Per-task breakdown with pass rates
- Hourly timeline showing performance trends
- Error analysis with frequency counts
- Actionable recommendations
## Report Interpretation
### Pass Rate Thresholds
| Pass Rate | Status | Action |
|-----------|--------|--------|
| 95%+ | ✅ Excellent | Continue current operations |
| 85-94% | ⚠️ Good | Monitor for degradation |
| 70-84% | ⚠️ Fair | Review failing tasks |
| <70% | ❌ Poor | Immediate investigation required |
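The thresholds above map mechanically onto a classifier; a sketch (the band labels mirror the table):

```python
def triage(pass_rate):
    """Map a pass rate (percent) to the scorecard status bands."""
    if pass_rate >= 95:
        return "excellent"   # continue current operations
    if pass_rate >= 85:
        return "good"        # monitor for degradation
    if pass_rate >= 70:
        return "fair"        # review failing tasks
    return "poor"            # immediate investigation required

print(triage(95.0))  # excellent
```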
### Duration Guidelines
| Duration | Assessment |
|----------|------------|
| <5s | Fast |
| 5-15s | Normal |
| 15-30s | Slow |
| >30s | Very slow - consider optimization |
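The p95 figure in the duration stats can be computed with the nearest-rank method; a sketch (the real generator may use a different percentile definition):

```python
import math

def p95(durations):
    """95th percentile of a list of durations, nearest-rank method."""
    if not durations:
        return 0.0
    ordered = sorted(durations)
    rank = math.ceil(0.95 * len(ordered))  # 1-based rank
    return ordered[rank - 1]

print(p95(list(range(1, 101))))  # 95
```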
## Troubleshooting
### No JSONL files found
```bash
# Check input directory
ls -la ~/shared/overnight-loop/
# Ensure Syncthing is syncing
systemctl status syncthing@root
```
### Malformed lines
The generator skips malformed lines with a warning. Check the JSONL files for syntax errors.
### Empty reports
If no data exists, verify:
1. Overnight loop is running and writing JSONL
2. File permissions allow reading
3. Input path is correct


@@ -1,79 +0,0 @@
# Uni-Wizard v4 — Final Summary
**Status:** Complete and production-ready
**Branch:** feature/scorecard-generator
**Commits:** 4 major deliveries
**Total:** ~8,000 lines of architecture + code
---
## Four-Pass Evolution
### Pass 1: Foundation (Timmy)
- Tool registry with 19 tools
- Health daemon + task router
- VPS provisioning + Syncthing mesh
- Scorecard generator (JSONL telemetry)
### Pass 2: Three-House Canon (Ezra/Bezalel/Timmy)
- Timmy: Sovereign judgment, final review
- Ezra: Archivist (read-before-write, evidence tracking)
- Bezalel: Artificer (proof-required, test-first)
- Provenance tracking with content hashing
- Artifact-flow discipline
### Pass 3: Self-Improving Intelligence
- Pattern database (SQLite backend)
- Adaptive policies (auto-adjust thresholds)
- Predictive execution (success prediction)
- Learning velocity tracking
- Hermes bridge (<100ms telemetry loop)
### Pass 4: Production Integration
- Unified API: `from uni_wizard import Harness, House, Mode`
- Three modes: SIMPLE / INTELLIGENT / SOVEREIGN
- Circuit breaker pattern (fault tolerance)
- Async/concurrent execution
- Production hardening (timeouts, retries)
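The circuit breaker named above can be sketched minimally: trip open after consecutive failures, then allow a trial call after a cooldown. This is the generic pattern, not the uni-wizard implementation; parameter values and the injected clock are assumptions:

```python
import time

class CircuitBreaker:
    """Open after max_failures consecutive failures; half-open after reset_after seconds."""

    def __init__(self, max_failures=3, reset_after=30.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def allow(self):
        """May the next call proceed?"""
        if self.opened_at is None:
            return True
        if self.clock() - self.opened_at >= self.reset_after:
            # Cooldown elapsed: half-open, permit one trial call.
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record(self, ok):
        """Report the outcome of a call."""
        if ok:
            self.failures = 0
            self.opened_at = None
        else:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = self.clock()

# Demo with a fake clock so the cooldown is deterministic.
now = [0.0]
cb = CircuitBreaker(max_failures=2, reset_after=10.0, clock=lambda: now[0])
cb.record(False)
cb.record(False)          # second failure trips the breaker
tripped_open = not cb.allow()
now[0] = 11.0             # cooldown elapsed
recovered = cb.allow()    # half-open: one trial call allowed
print(tripped_open, recovered)
```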
---
## Allegro Lane v4 — Narrowed
**Primary (80%):**
1. **Gitea Bridge (40%)** — Poll issues, create PRs, comment results
2. **Hermes Bridge (40%)** — Cloud models, telemetry streaming to Timmy
**Secondary (20%):**
3. **Redundancy/Failover (10%)** — Health checks, VPS takeover
4. **Uni-Wizard Operations (10%)** — Service monitoring, restart on failure
**Explicitly NOT:**
- Make sovereign decisions (Timmy decides)
- Authenticate as Timmy (identity remains local)
- Store long-term memory (forward to Timmy)
- Work without connectivity (my value is the bridge)
---
## Key Metrics
| Metric | Target |
|--------|--------|
| Issue triage | < 5 minutes |
| PR creation | < 2 minutes |
| Telemetry lag | < 100ms |
| Uptime | 99.9% |
| Failover time | < 30s |
---
## Production Ready
✅ Foundation layer complete
✅ Three-house separation enforced
✅ Self-improving intelligence active
✅ Production hardening applied
✅ Allegro lane narrowly defined
**Next:** Deploy to VPS fleet, integrate with Timmy's local instance, begin operations.

uni-wizard/README.md Normal file

@@ -0,0 +1,127 @@
# Uni-Wizard Architecture
## Vision
A single wizard harness that elegantly routes all API interactions through one unified interface. No more fragmented wizards - one consciousness, infinite capabilities.
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ UNI-WIZARD HARNESS │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ System │ │ Git │ │ Network │ │
│ │ Tools │◄──►│ Tools │◄──►│ Tools │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ ▼ │
│ ┌───────────────┐ │
│ │ Tool Router │ │
│ │ (Registry) │ │
│ └───────┬───────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Local │ │ Gitea │ │ Relay │ │
│ │ llama.cpp │ │ API │ │ Nostr │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
┌───────────────┐
│ LLM (local) │
│ Hermes-3 8B │
└───────────────┘
```
## Design Principles
1. **Single Entry Point**: One harness, all capabilities
2. **Unified Registry**: All tools registered centrally
3. **Elegant Routing**: Tools discover and route automatically
4. **Local-First**: No cloud dependencies
5. **Self-Healing**: Tools can restart, reconnect, recover
## Tool Categories
### System Layer
- `system_info` — OS, CPU, RAM, disk, uptime
- `process_manager` — list, start, stop processes
- `service_controller` — systemd service management
- `health_monitor` — system health checks
### Git Layer
- `git_operations` — status, log, commit, push, pull
- `repo_manager` — clone, branch, merge
- `pr_handler` — create, review, merge PRs
### Network Layer
- `http_client` — GET, POST, PUT, DELETE
- `gitea_client` — full Gitea API wrapper
- `nostr_client` — relay communication
- `api_router` — generic API endpoint handler
### File Layer
- `file_operations` — read, write, append, search
- `directory_manager` — tree, list, navigate
- `archive_handler` — zip, tar, compress
## Registry System
```python
# tools/registry.py
class ToolRegistry:
    def __init__(self):
        self.tools = {}

    def register(self, name, handler, schema):
        self.tools[name] = {
            'handler': handler,
            'schema': schema,
            'description': handler.__doc__
        }

    def execute(self, name, params):
        tool = self.tools.get(name)
        if not tool:
            return f"Error: Tool '{name}' not found"
        try:
            return tool['handler'](**params)
        except Exception as e:
            return f"Error executing {name}: {str(e)}"
```
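The registry can be exercised directly. The class is repeated here so the snippet runs standalone, and `greet` is a toy tool invented for the example:

```python
class ToolRegistry:
    def __init__(self):
        self.tools = {}

    def register(self, name, handler, schema):
        self.tools[name] = {
            'handler': handler,
            'schema': schema,
            'description': handler.__doc__
        }

    def execute(self, name, params):
        tool = self.tools.get(name)
        if not tool:
            return f"Error: Tool '{name}' not found"
        try:
            return tool['handler'](**params)
        except Exception as e:
            return f"Error executing {name}: {str(e)}"

registry = ToolRegistry()

def greet(name="world"):
    """Toy tool used only for this example."""
    return f"hello {name}"

registry.register("greet", greet, schema={"name": "string"})
print(registry.execute("greet", {"name": "timmy"}))  # hello timmy
print(registry.execute("missing", {}))               # Error: Tool 'missing' not found
```

Errors surface as strings rather than exceptions, so a misbehaving tool cannot crash the harness loop.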
## API Flow
1. **User Request** → Natural language task
2. **LLM Planning** → Breaks into tool calls
3. **Registry Lookup** → Finds appropriate tools
4. **Execution** → Tools run in sequence/parallel
5. **Response** → Results synthesized and returned
## Example Usage
```python
# Single harness, multiple capabilities
result = harness.execute("""
Check system health, pull latest git changes,
and create a Gitea issue if tests fail
""")
```
This becomes:
1. `system_info` → check health
2. `git_pull` → update repo
3. `run_tests` → execute tests
4. `gitea_create_issue` → report failures
## Benefits
- **Simplicity**: One harness to maintain
- **Power**: All capabilities unified
- **Elegance**: Clean routing, no fragmentation
- **Resilience**: Self-contained, local-first


@@ -0,0 +1,9 @@
"""
Uni-Wizard Daemons Package
Background services for the uni-wizard architecture
"""
from .health_daemon import HealthDaemon
from .task_router import TaskRouter
__all__ = ['HealthDaemon', 'TaskRouter']


@@ -0,0 +1,180 @@
"""
Health Check Daemon for Uni-Wizard
Monitors VPS status and exposes health endpoint
"""
import json
import time
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
from pathlib import Path
import sys
# Add parent to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from harness import get_harness
class HealthCheckHandler(BaseHTTPRequestHandler):
"""HTTP handler for health endpoint"""
def log_message(self, format, *args):
# Suppress default logging
pass
def do_GET(self):
"""Handle GET requests"""
if self.path == '/health':
self.send_health_response()
elif self.path == '/status':
self.send_full_status()
else:
self.send_error(404)
def send_health_response(self):
"""Send simple health check"""
harness = get_harness()
result = harness.execute("health_check")
try:
health_data = json.loads(result)
status_code = 200 if health_data.get("overall") == "healthy" else 503
except:
status_code = 503
health_data = {"error": "Health check failed"}
self.send_response(status_code)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(health_data).encode())
def send_full_status(self):
"""Send full system status"""
harness = get_harness()
status = {
"timestamp": datetime.now().isoformat(),
"harness": json.loads(harness.get_status()),
"system": json.loads(harness.execute("system_info")),
"health": json.loads(harness.execute("health_check"))
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(status, indent=2).encode())
class HealthDaemon:
"""
Health monitoring daemon.
Runs continuously, monitoring:
- System resources
- Service status
- Inference endpoint
Exposes:
- HTTP endpoint on port 8082
- JSON status file at ~/timmy/logs/health.json
"""
def __init__(self, port: int = 8082, check_interval: int = 60):
self.port = port
self.check_interval = check_interval
self.running = False
self.server = None
self.monitor_thread = None
self.last_health = None
# Ensure log directory exists
self.log_path = Path.home() / "timmy" / "logs"
self.log_path.mkdir(parents=True, exist_ok=True)
self.health_file = self.log_path / "health.json"
def start(self):
"""Start the health daemon"""
self.running = True
# Start HTTP server
self.server = HTTPServer(('127.0.0.1', self.port), HealthCheckHandler)
server_thread = threading.Thread(target=self.server.serve_forever)
server_thread.daemon = True
server_thread.start()
# Start monitoring loop
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
print(f"Health daemon started on http://127.0.0.1:{self.port}")
print(f" - /health - Quick health check")
print(f" - /status - Full system status")
print(f"Health file: {self.health_file}")
def stop(self):
"""Stop the health daemon"""
self.running = False
if self.server:
self.server.shutdown()
print("Health daemon stopped")
def _monitor_loop(self):
"""Background monitoring loop"""
while self.running:
try:
self._update_health_file()
time.sleep(self.check_interval)
except Exception as e:
print(f"Monitor error: {e}")
time.sleep(5)
def _update_health_file(self):
"""Update the health status file"""
harness = get_harness()
try:
health_result = harness.execute("health_check")
system_result = harness.execute("system_info")
status = {
"timestamp": datetime.now().isoformat(),
"health": json.loads(health_result),
"system": json.loads(system_result)
}
self.health_file.write_text(json.dumps(status, indent=2))
self.last_health = status
except Exception as e:
print(f"Failed to update health file: {e}")
def main():
"""Run the health daemon"""
import signal
daemon = HealthDaemon()
def signal_handler(sig, frame):
print("\nShutting down...")
daemon.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
daemon.start()
# Keep main thread alive
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
daemon.stop()
if __name__ == "__main__":
main()


@@ -0,0 +1,222 @@
"""
Task Router for Uni-Wizard
Polls Gitea for assigned issues and executes them
"""
import json
import time
import sys
from pathlib import Path
from datetime import datetime
# Add parent to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from harness import get_harness
class TaskRouter:
"""
Gitea Task Router.
Polls Gitea for issues assigned to Timmy and routes them
to appropriate tools for execution.
Flow:
1. Poll Gitea API for open issues assigned to Timmy
2. Parse issue body for commands/tasks
3. Route to appropriate tool via harness
4. Post results back as comments
5. Close issue if task complete
"""
def __init__(
self,
gitea_url: str = "http://143.198.27.163:3000",
repo: str = "Timmy_Foundation/timmy-home",
assignee: str = "timmy",
poll_interval: int = 60
):
self.gitea_url = gitea_url
self.repo = repo
self.assignee = assignee
self.poll_interval = poll_interval
self.running = False
self.harness = get_harness()
self.processed_issues = set()
# Log file
self.log_path = Path.home() / "timmy" / "logs"
self.log_path.mkdir(parents=True, exist_ok=True)
self.router_log = self.log_path / "task_router.jsonl"
def start(self):
"""Start the task router"""
self.running = True
print(f"Task router started")
print(f" Polling: {self.gitea_url}")
print(f" Assignee: {self.assignee}")
print(f" Interval: {self.poll_interval}s")
while self.running:
try:
self._poll_and_route()
time.sleep(self.poll_interval)
except Exception as e:
self._log_event("error", {"message": str(e)})
time.sleep(5)
def stop(self):
"""Stop the task router"""
self.running = False
print("Task router stopped")
def _poll_and_route(self):
"""Poll for issues and route tasks"""
# Get assigned issues
result = self.harness.execute(
"gitea_list_issues",
repo=self.repo,
state="open",
assignee=self.assignee
)
try:
issues = json.loads(result)
except:
return
for issue in issues.get("issues", []):
issue_num = issue["number"]
# Skip already processed
if issue_num in self.processed_issues:
continue
# Process the issue
self._process_issue(issue)
self.processed_issues.add(issue_num)
def _process_issue(self, issue: dict):
"""Process a single issue"""
issue_num = issue["number"]
title = issue["title"]
self._log_event("issue_received", {
"number": issue_num,
"title": title
})
# Parse title for command hints
# Format: "[ACTION] Description" or just "Description"
action = self._parse_action(title)
# Route to appropriate handler
if action == "system_check":
result = self._handle_system_check(issue_num)
elif action == "git_operation":
result = self._handle_git_operation(issue_num, issue)
elif action == "health_report":
result = self._handle_health_report(issue_num)
else:
result = self._handle_generic(issue_num, issue)
# Post result as comment
self._post_comment(issue_num, result)
self._log_event("issue_processed", {
"number": issue_num,
"action": action,
"result": "success" if result else "failed"
})
def _parse_action(self, title: str) -> str:
"""Parse action from issue title"""
title_lower = title.lower()
if any(kw in title_lower for kw in ["health", "status", "check"]):
return "health_report"
elif any(kw in title_lower for kw in ["system", "resource", "disk", "memory"]):
return "system_check"
elif any(kw in title_lower for kw in ["git", "commit", "push", "pull", "branch"]):
return "git_operation"
return "generic"
def _handle_system_check(self, issue_num: int) -> str:
"""Handle system check task"""
result = self.harness.execute("system_info")
return f"## System Check Results\n\n```json\n{result}\n```"
def _handle_health_report(self, issue_num: int) -> str:
"""Handle health report task"""
result = self.harness.execute("health_check")
return f"## Health Report\n\n```json\n{result}\n```"
def _handle_git_operation(self, issue_num: int, issue: dict) -> str:
"""Handle git operation task"""
body = issue.get("body", "")
# Parse body for git commands
results = []
# Check for status request
if "status" in body.lower():
result = self.harness.execute("git_status", repo_path="/root/timmy/timmy-home")
results.append(f"**Git Status:**\n```json\n{result}\n```")
# Check for pull request
if "pull" in body.lower():
result = self.harness.execute("git_pull", repo_path="/root/timmy/timmy-home")
results.append(f"**Git Pull:**\n{result}")
if not results:
results.append("No specific git operation detected in issue body.")
return "\n\n".join(results)
def _handle_generic(self, issue_num: int, issue: dict) -> str:
"""Handle generic task"""
return f"Received issue #{issue_num}: {issue['title']}\n\nI'll process this and update shortly."
def _post_comment(self, issue_num: int, body: str):
"""Post a comment on the issue"""
result = self.harness.execute(
"gitea_comment",
repo=self.repo,
issue_number=issue_num,
body=body
)
return result
def _log_event(self, event_type: str, data: dict):
"""Log an event to the JSONL file"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"event": event_type,
**data
}
with open(self.router_log, "a") as f:
f.write(json.dumps(log_entry) + "\n")
def main():
"""Run the task router"""
import signal
router = TaskRouter()
def signal_handler(sig, frame):
print("\nShutting down...")
router.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
router.start()
if __name__ == "__main__":
main()

uni-wizard/harness.py Normal file

@@ -0,0 +1,174 @@
"""
Uni-Wizard Harness
Single entry point for all capabilities
"""
import json
import sys
from typing import Dict, Any, Optional
from pathlib import Path
# Add tools to path
sys.path.insert(0, str(Path(__file__).parent))
from tools import registry, call_tool
class UniWizardHarness:
"""
The Uni-Wizard Harness - one consciousness, infinite capabilities.
All API flows route through this single harness:
- System monitoring and control
- Git operations
- Network requests
- Gitea API
- Local inference
Usage:
harness = UniWizardHarness()
result = harness.execute("system_info")
result = harness.execute("git_status", repo_path="/path/to/repo")
"""
def __init__(self):
self.registry = registry
self.history = []
def list_capabilities(self) -> str:
"""List all available tools/capabilities"""
tools = []
for category in self.registry.get_categories():
cat_tools = self.registry.get_tools_by_category(category)
tools.append(f"\n{category.upper()}:")
for tool in cat_tools:
tools.append(f" - {tool['name']}: {tool['description']}")
return "\n".join(tools)
def execute(self, tool_name: str, **params) -> str:
"""
Execute a tool by name.
Args:
tool_name: Name of the tool to execute
**params: Parameters for the tool
Returns:
String result from the tool
"""
# Log execution
self.history.append({
"tool": tool_name,
"params": params
})
# Execute via registry
result = call_tool(tool_name, **params)
return result
def execute_plan(self, plan: list) -> Dict[str, str]:
"""
Execute a sequence of tool calls.
Args:
plan: List of dicts with 'tool' and 'params'
e.g., [{"tool": "system_info", "params": {}}]
Returns:
Dict mapping tool names to results
"""
results = {}
for step in plan:
tool_name = step.get("tool")
params = step.get("params", {})
result = self.execute(tool_name, **params)
results[tool_name] = result
return results
def get_tool_definitions(self) -> str:
"""Get tool definitions formatted for LLM system prompt"""
return self.registry.get_tool_definitions()
def get_status(self) -> str:
"""Get harness status"""
return json.dumps({
"total_tools": len(self.registry.list_tools()),
"categories": self.registry.get_categories(),
"tools_by_category": {
cat: self.registry.list_tools(cat)
for cat in self.registry.get_categories()
},
"execution_history_count": len(self.history)
}, indent=2)
# Singleton instance
_harness = None
def get_harness() -> UniWizardHarness:
"""Get the singleton harness instance"""
global _harness
if _harness is None:
_harness = UniWizardHarness()
return _harness
def main():
"""CLI interface for the harness"""
harness = get_harness()
if len(sys.argv) < 2:
print("Uni-Wizard Harness")
print("==================")
print("\nUsage: python harness.py <command> [args]")
print("\nCommands:")
print(" list - List all capabilities")
print(" status - Show harness status")
print(" tools - Show tool definitions (for LLM)")
print(" exec <tool> - Execute a tool")
print("\nExamples:")
print(' python harness.py exec system_info')
print(' python harness.py exec git_status repo_path=/tmp/timmy-home')
return
command = sys.argv[1]
if command == "list":
print(harness.list_capabilities())
elif command == "status":
print(harness.get_status())
elif command == "tools":
print(harness.get_tool_definitions())
elif command == "exec" and len(sys.argv) >= 3:
tool_name = sys.argv[2]
# Parse params from args (key=value format)
params = {}
for arg in sys.argv[3:]:
if '=' in arg:
key, value = arg.split('=', 1)
# Try to parse as int/bool
if value.isdigit():
value = int(value)
elif value.lower() == 'true':
value = True
elif value.lower() == 'false':
value = False
params[key] = value
result = harness.execute(tool_name, **params)
print(result)
else:
print(f"Unknown command: {command}")
print("Run without arguments for help")
if __name__ == "__main__":
main()


@@ -1,388 +0,0 @@
#!/usr/bin/env python3
"""
JSONL Scorecard Generator for Uni-Wizard
Analyzes overnight loop results and produces comprehensive reports
"""
import json
import sys
from pathlib import Path
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Any
import statistics
class ScorecardGenerator:
"""
Generates scorecards from overnight loop JSONL data.
Analyzes:
- Pass/fail rates
- Response times (avg, median, p95)
- Per-task breakdowns
- Error patterns
- Timeline trends
"""
def __init__(self, input_dir: str = "~/shared/overnight-loop"):
self.input_dir = Path(input_dir).expanduser()
self.tasks = []
self.stats = {
"total": 0,
"passed": 0,
"failed": 0,
"pass_rate": 0.0,
"durations": [],
"by_task": defaultdict(lambda: {"total": 0, "passed": 0, "failed": 0, "durations": []}),
"by_hour": defaultdict(lambda: {"total": 0, "passed": 0, "durations": []}),
"errors": defaultdict(int)
}
def load_jsonl(self, filepath: Path) -> List[Dict]:
"""Load and parse a JSONL file, handling errors gracefully"""
tasks = []
with open(filepath, 'r') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
task = json.loads(line)
tasks.append(task)
except json.JSONDecodeError:
print(f"Warning: Skipping malformed line {line_num} in {filepath}")
continue
return tasks
def load_all(self):
"""Load all JSONL files from input directory"""
if not self.input_dir.exists():
print(f"Input directory not found: {self.input_dir}")
return
jsonl_files = list(self.input_dir.glob("*.jsonl"))
if not jsonl_files:
print(f"No .jsonl files found in {self.input_dir}")
return
for filepath in sorted(jsonl_files):
print(f"Loading: {filepath.name}")
tasks = self.load_jsonl(filepath)
self.tasks.extend(tasks)
print(f"Loaded {len(self.tasks)} tasks from {len(jsonl_files)} files")
def analyze(self):
"""Analyze all loaded tasks"""
if not self.tasks:
print("No tasks to analyze")
return
for task in self.tasks:
self._process_task(task)
# Calculate overall pass rate
if self.stats["total"] > 0:
self.stats["pass_rate"] = (self.stats["passed"] / self.stats["total"]) * 100
print(f"Analysis complete: {self.stats['passed']}/{self.stats['total']} passed ({self.stats['pass_rate']:.1f}%)")
def _process_task(self, task: Dict):
"""Process a single task record"""
# Basic stats
self.stats["total"] += 1
status = task.get("status", "unknown")
duration = task.get("duration_s", 0)
task_type = task.get("task", "unknown")
timestamp = task.get("timestamp", "")
# Pass/fail
if status == "pass":
self.stats["passed"] += 1
self.stats["by_task"][task_type]["passed"] += 1
else:
self.stats["failed"] += 1
self.stats["by_task"][task_type]["failed"] += 1
# Track error patterns (the 'error' field is expected only on failed tasks)
error = task.get("error", "unknown_error")
self.stats["errors"][error] += 1
# Durations
self.stats["durations"].append(duration)
self.stats["by_task"][task_type]["durations"].append(duration)
self.stats["by_task"][task_type]["total"] += 1
# Hourly breakdown
if timestamp:
try:
hour = timestamp[:13] # YYYY-MM-DDTHH
self.stats["by_hour"][hour]["total"] += 1
if status == "pass":
self.stats["by_hour"][hour]["passed"] += 1
self.stats["by_hour"][hour]["durations"].append(duration)
except (TypeError, IndexError):
pass
def calculate_duration_stats(self, durations: List[float]) -> Dict[str, float]:
"""Calculate duration statistics"""
if not durations:
return {"avg": 0, "median": 0, "p95": 0, "min": 0, "max": 0}
sorted_durations = sorted(durations)
n = len(sorted_durations)
return {
"avg": round(statistics.mean(durations), 2),
"median": round(statistics.median(durations), 2),
"p95": round(sorted_durations[int(n * 0.95)] if n > 1 else sorted_durations[0], 2),
"min": round(min(durations), 2),
"max": round(max(durations), 2)
}
def generate_json(self) -> Dict:
"""Generate structured JSON report"""
duration_stats = self.calculate_duration_stats(self.stats["durations"])
report = {
"generated_at": datetime.now().isoformat(),
"summary": {
"total_tasks": self.stats["total"],
"passed": self.stats["passed"],
"failed": self.stats["failed"],
"pass_rate": round(self.stats["pass_rate"], 2),
"duration_stats": duration_stats
},
"by_task": {},
"by_hour": {},
"errors": dict(self.stats["errors"]),
"recommendations": self._generate_recommendations()
}
# Per-task breakdown
for task_type, data in self.stats["by_task"].items():
if data["total"] > 0:
pass_rate = (data["passed"] / data["total"]) * 100
report["by_task"][task_type] = {
"total": data["total"],
"passed": data["passed"],
"failed": data["failed"],
"pass_rate": round(pass_rate, 2),
"duration_stats": self.calculate_duration_stats(data["durations"])
}
# Hourly breakdown
for hour, data in sorted(self.stats["by_hour"].items()):
if data["total"] > 0:
pass_rate = (data["passed"] / data["total"]) * 100
report["by_hour"][hour] = {
"total": data["total"],
"passed": data["passed"],
"pass_rate": round(pass_rate, 2),
"avg_duration": round(statistics.mean(data["durations"]), 2) if data["durations"] else 0
}
return report
def generate_markdown(self) -> str:
"""Generate markdown report"""
json_report = self.generate_json()
md = f"""# Overnight Loop Scorecard
**Generated:** {json_report['generated_at']}
---
## Summary
| Metric | Value |
|--------|-------|
| Total Tasks | {json_report['summary']['total_tasks']} |
| Passed | {json_report['summary']['passed']} ✅ |
| Failed | {json_report['summary']['failed']} ❌ |
| **Pass Rate** | **{json_report['summary']['pass_rate']:.1f}%** |
### Duration Statistics
| Metric | Value (seconds) |
|--------|-----------------|
| Average | {json_report['summary']['duration_stats']['avg']} |
| Median | {json_report['summary']['duration_stats']['median']} |
| P95 | {json_report['summary']['duration_stats']['p95']} |
| Min | {json_report['summary']['duration_stats']['min']} |
| Max | {json_report['summary']['duration_stats']['max']} |
---
## Per-Task Breakdown
| Task | Total | Passed | Failed | Pass Rate | Avg Duration |
|------|-------|--------|--------|-----------|--------------|
"""
# Sort by pass rate (ascending - worst first)
sorted_tasks = sorted(
json_report['by_task'].items(),
key=lambda x: x[1]['pass_rate']
)
for task_type, data in sorted_tasks:
status = "✅" if data['pass_rate'] >= 90 else "⚠️" if data['pass_rate'] >= 70 else "❌"
md += f"| {task_type} | {data['total']} | {data['passed']} | {data['failed']} | {status} {data['pass_rate']:.1f}% | {data['duration_stats']['avg']}s |\n"
md += """
---
## Timeline (Hourly)
| Hour | Tasks | Passed | Pass Rate | Avg Duration |
|------|-------|--------|-----------|--------------|
"""
for hour, data in sorted(json_report['by_hour'].items()):
trend = "📈" if data['pass_rate'] >= 90 else "📊" if data['pass_rate'] >= 70 else "📉"
md += f"| {hour} | {data['total']} | {data['passed']} | {trend} {data['pass_rate']:.1f}% | {data['avg_duration']}s |\n"
md += """
---
## Error Analysis
| Error Pattern | Count |
|---------------|-------|
"""
for error, count in sorted(json_report['errors'].items(), key=lambda x: x[1], reverse=True):
md += f"| {error} | {count} |\n"
md += """
---
## Recommendations
"""
for rec in json_report['recommendations']:
md += f"- {rec}\n"
md += """
---
*Generated by Uni-Wizard Scorecard Generator*
"""
return md
def _generate_recommendations(self) -> List[str]:
"""Generate recommendations based on analysis"""
recommendations = []
# Check overall pass rate
if self.stats["pass_rate"] < 70:
recommendations.append(f"⚠️ Overall pass rate ({self.stats['pass_rate']:.1f}%) is concerning. Review infrastructure health.")
elif self.stats["pass_rate"] >= 95:
recommendations.append(f"✅ Excellent pass rate ({self.stats['pass_rate']:.1f}%). System is performing well.")
# Check for failing tasks
failing_tasks = []
for task_type, data in self.stats["by_task"].items():
if data["total"] > 0:
pass_rate = (data["passed"] / data["total"]) * 100
if pass_rate < 50:
failing_tasks.append(task_type)
if failing_tasks:
recommendations.append(f"❌ Tasks with <50% pass rate: {', '.join(failing_tasks)}. Consider debugging or removing.")
# Check for slow tasks
slow_tasks = []
for task_type, data in self.stats["by_task"].items():
if data["durations"]:
avg = statistics.mean(data["durations"])
if avg > 30: # Tasks taking >30s on average
slow_tasks.append(f"{task_type} ({avg:.1f}s)")
if slow_tasks:
recommendations.append(f"⏱️ Slow tasks detected: {', '.join(slow_tasks)}. Consider optimization.")
# Check error patterns
if self.stats["errors"]:
top_error = max(self.stats["errors"].items(), key=lambda x: x[1])
recommendations.append(f"🔍 Most common error: '{top_error[0]}' ({top_error[1]} occurrences). Investigate root cause.")
# Timeline trend
if len(self.stats["by_hour"]) >= 2:
hours = sorted(self.stats["by_hour"].keys())
first_hour = hours[0]
last_hour = hours[-1]
first_rate = (self.stats["by_hour"][first_hour]["passed"] / self.stats["by_hour"][first_hour]["total"]) * 100
last_rate = (self.stats["by_hour"][last_hour]["passed"] / self.stats["by_hour"][last_hour]["total"]) * 100
if last_rate > first_rate + 10:
recommendations.append(f"📈 Performance improving over time (+{last_rate - first_rate:.1f}% pass rate).")
elif last_rate < first_rate - 10:
recommendations.append(f"📉 Performance degrading over time (-{first_rate - last_rate:.1f}% pass rate). Check for resource exhaustion.")
return recommendations
def save_reports(self, output_dir: str = "~/timmy/reports"):
"""Save JSON and markdown reports"""
output_path = Path(output_dir).expanduser()
output_path.mkdir(parents=True, exist_ok=True)
date_str = datetime.now().strftime("%Y%m%d")
# Save JSON
json_file = output_path / f"scorecard_{date_str}.json"
json_report = self.generate_json()
with open(json_file, 'w') as f:
json.dump(json_report, f, indent=2)
print(f"JSON report saved: {json_file}")
# Save Markdown
md_file = output_path / f"scorecard_{date_str}.md"
md_report = self.generate_markdown()
with open(md_file, 'w') as f:
f.write(md_report)
print(f"Markdown report saved: {md_file}")
return json_file, md_file
def main():
"""CLI entry point"""
import argparse
parser = argparse.ArgumentParser(description="Generate scorecard from overnight loop JSONL")
parser.add_argument("--input", "-i", default="~/shared/overnight-loop", help="Input directory with JSONL files")
parser.add_argument("--output", "-o", default="~/timmy/reports", help="Output directory for reports")
args = parser.parse_args()
print("="*60)
print("UNI-WIZARD SCORECARD GENERATOR")
print("="*60)
print()
generator = ScorecardGenerator(input_dir=args.input)
generator.load_all()
generator.analyze()
if generator.stats["total"] > 0:
json_file, md_file = generator.save_reports(output_dir=args.output)
print()
print("="*60)
print("REPORTS GENERATED")
print("="*60)
print(f"JSON: {json_file}")
print(f"Markdown: {md_file}")
else:
print("No data to report")
if __name__ == "__main__":
main()
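The duration statistics in `calculate_duration_stats` use an index-based p95 approximation over the sorted list. A standalone sketch of the same computation (mirrors the generator's method, not imported from it):

```python
import statistics

# Sketch of the duration-stats computation used by ScorecardGenerator:
# p95 is approximated as the element at index int(n * 0.95) of the
# sorted durations (valid since int(n * 0.95) < n for n >= 1).
def duration_stats(durations):
    if not durations:
        return {"avg": 0, "median": 0, "p95": 0, "min": 0, "max": 0}
    s = sorted(durations)
    n = len(s)
    return {
        "avg": round(statistics.mean(s), 2),
        "median": round(statistics.median(s), 2),
        "p95": round(s[int(n * 0.95)] if n > 1 else s[0], 2),
        "min": round(s[0], 2),
        "max": round(s[-1], 2),
    }

print(duration_stats([1.0, 2.0, 3.0, 4.0, 10.0]))
# {'avg': 4.0, 'median': 3.0, 'p95': 10.0, 'min': 1.0, 'max': 10.0}
```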

uni-wizard/test_harness.py Normal file

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
Test script for Uni-Wizard Harness
Exercises all tool categories
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from harness import get_harness
def test_system_tools():
"""Test system monitoring tools"""
print("\n" + "="*60)
print("TESTING SYSTEM TOOLS")
print("="*60)
harness = get_harness()
tests = [
("system_info", {}),
("health_check", {}),
("process_list", {"filter_name": "python"}),
("disk_usage", {}),
]
for tool_name, params in tests:
print(f"\n>>> {tool_name}()")
result = harness.execute(tool_name, **params)
print(result[:500] + "..." if len(result) > 500 else result)
def test_git_tools():
"""Test git operations"""
print("\n" + "="*60)
print("TESTING GIT TOOLS")
print("="*60)
harness = get_harness()
# Test with timmy-home repo if it exists
repo_path = "/tmp/timmy-home"
tests = [
("git_status", {"repo_path": repo_path}),
("git_log", {"repo_path": repo_path, "count": 5}),
("git_branch_list", {"repo_path": repo_path}),
]
for tool_name, params in tests:
print(f"\n>>> {tool_name}()")
result = harness.execute(tool_name, **params)
print(result[:500] + "..." if len(result) > 500 else result)
def test_network_tools():
"""Test network operations"""
print("\n" + "="*60)
print("TESTING NETWORK TOOLS")
print("="*60)
harness = get_harness()
tests = [
("http_get", {"url": "http://143.198.27.163:3000/api/v1/repos/Timmy_Foundation/timmy-home"}),
("gitea_list_issues", {"state": "open"}),
]
for tool_name, params in tests:
print(f"\n>>> {tool_name}()")
result = harness.execute(tool_name, **params)
print(result[:500] + "..." if len(result) > 500 else result)
def test_harness_features():
"""Test harness management features"""
print("\n" + "="*60)
print("TESTING HARNESS FEATURES")
print("="*60)
harness = get_harness()
print("\n>>> list_capabilities()")
print(harness.list_capabilities())
print("\n>>> get_status()")
print(harness.get_status())
def run_all_tests():
"""Run complete test suite"""
print("UNI-WIZARD HARNESS TEST SUITE")
print("=============================")
try:
test_system_tools()
test_git_tools()
test_network_tools()
test_harness_features()
print("\n" + "="*60)
print("✓ ALL TESTS COMPLETED")
print("="*60)
except Exception as e:
print(f"\n✗ TEST FAILED: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
run_all_tests()


@@ -0,0 +1,24 @@
"""
Uni-Wizard Tools Package
All tools for self-sufficient operation
"""
from .registry import registry, ToolRegistry, ToolResult, tool, call_tool
# Import all tool modules to register them
from . import system_tools
from . import git_tools
from . import network_tools
__all__ = [
'registry',
'ToolRegistry',
'ToolResult',
'tool',
'call_tool'
]
# Ensure all tools are registered
system_tools.register_all()
git_tools.register_all()
network_tools.register_all()


@@ -0,0 +1,448 @@
"""
Git Tools for Uni-Wizard
Repository operations and version control
"""
import os
import json
import subprocess
from typing import Dict, List, Optional
from pathlib import Path
from .registry import registry
def run_git_command(args: List[str], cwd: str = None) -> tuple:
"""Execute a git command and return (stdout, stderr, returncode)"""
try:
result = subprocess.run(
['git'] + args,
capture_output=True,
text=True,
cwd=cwd
)
return result.stdout, result.stderr, result.returncode
except Exception as e:
return "", str(e), 1
def git_status(repo_path: str = ".") -> str:
"""
Get git repository status.
Args:
repo_path: Path to git repository (default: current directory)
Returns:
Status info including branch, changed files, last commit
"""
try:
status = {"repo_path": os.path.abspath(repo_path)}
# Current branch
stdout, _, rc = run_git_command(['branch', '--show-current'], cwd=repo_path)
if rc == 0:
status["branch"] = stdout.strip()
else:
return f"Error: Not a git repository at {repo_path}"
# Last commit
stdout, _, rc = run_git_command(['log', '-1', '--format=%H|%s|%an|%ad', '--date=short'], cwd=repo_path)
if rc == 0:
parts = stdout.strip().split('|')
if len(parts) >= 4:
status["last_commit"] = {
"hash": parts[0][:8],
"message": parts[1],
"author": parts[2],
"date": parts[3]
}
# Changed files
stdout, _, rc = run_git_command(['status', '--porcelain'], cwd=repo_path)
if rc == 0:
changes = []
for line in stdout.strip().split('\n'):
if line:
status_code = line[:2]
file_path = line[3:]
changes.append({
"file": file_path,
"status": status_code.strip()
})
status["changes"] = changes
status["has_changes"] = len(changes) > 0
# Remote info
stdout, _, rc = run_git_command(['remote', '-v'], cwd=repo_path)
if rc == 0:
remotes = []
for line in stdout.strip().split('\n'):
if line:
parts = line.split()
if len(parts) >= 2:
remotes.append({"name": parts[0], "url": parts[1]})
status["remotes"] = remotes
return json.dumps(status, indent=2)
except Exception as e:
return f"Error getting git status: {str(e)}"
def git_log(repo_path: str = ".", count: int = 10) -> str:
"""
Get recent commit history.
Args:
repo_path: Path to git repository
count: Number of commits to show (default: 10)
Returns:
List of recent commits
"""
try:
stdout, stderr, rc = run_git_command(
['log', f'-{count}', '--format=%H|%s|%an|%ad', '--date=short'],
cwd=repo_path
)
if rc != 0:
return f"Error: {stderr}"
commits = []
for line in stdout.strip().split('\n'):
if line:
parts = line.split('|')
if len(parts) >= 4:
commits.append({
"hash": parts[0][:8],
"message": parts[1],
"author": parts[2],
"date": parts[3]
})
return json.dumps({"count": len(commits), "commits": commits}, indent=2)
except Exception as e:
return f"Error getting git log: {str(e)}"
def git_pull(repo_path: str = ".") -> str:
"""
Pull latest changes from remote.
Args:
repo_path: Path to git repository
Returns:
Pull result
"""
try:
stdout, stderr, rc = run_git_command(['pull'], cwd=repo_path)
if rc == 0:
if 'Already up to date' in stdout:
return "✓ Already up to date"
return f"✓ Pull successful:\n{stdout}"
else:
return f"✗ Pull failed:\n{stderr}"
except Exception as e:
return f"Error pulling: {str(e)}"
def git_commit(repo_path: str = ".", message: str = None, files: List[str] = None) -> str:
"""
Stage and commit changes.
Args:
repo_path: Path to git repository
message: Commit message (required)
files: Specific files to commit (default: all changes)
Returns:
Commit result
"""
if not message:
return "Error: commit message is required"
try:
# Stage files
if files:
for f in files:
_, stderr, rc = run_git_command(['add', f], cwd=repo_path)
if rc != 0:
return f"✗ Failed to stage {f}: {stderr}"
else:
_, stderr, rc = run_git_command(['add', '.'], cwd=repo_path)
if rc != 0:
return f"✗ Failed to stage changes: {stderr}"
# Commit
stdout, stderr, rc = run_git_command(['commit', '-m', message], cwd=repo_path)
if rc == 0:
return f"✓ Commit successful:\n{stdout}"
else:
if 'nothing to commit' in stderr.lower():
return "✓ Nothing to commit (working tree clean)"
return f"✗ Commit failed:\n{stderr}"
except Exception as e:
return f"Error committing: {str(e)}"
def git_push(repo_path: str = ".", remote: str = "origin", branch: str = None) -> str:
"""
Push to remote repository.
Args:
repo_path: Path to git repository
remote: Remote name (default: origin)
branch: Branch to push (default: current branch)
Returns:
Push result
"""
try:
if not branch:
# Get current branch
stdout, _, rc = run_git_command(['branch', '--show-current'], cwd=repo_path)
if rc == 0:
branch = stdout.strip()
else:
return "Error: Could not determine current branch"
stdout, stderr, rc = run_git_command(['push', remote, branch], cwd=repo_path)
if rc == 0:
return f"✓ Push successful to {remote}/{branch}"
else:
return f"✗ Push failed:\n{stderr}"
except Exception as e:
return f"Error pushing: {str(e)}"
def git_checkout(repo_path: str = ".", branch: str = None, create: bool = False) -> str:
"""
Checkout a branch.
Args:
repo_path: Path to git repository
branch: Branch name to checkout
create: Create the branch if it doesn't exist
Returns:
Checkout result
"""
if not branch:
return "Error: branch name is required"
try:
if create:
stdout, stderr, rc = run_git_command(['checkout', '-b', branch], cwd=repo_path)
else:
stdout, stderr, rc = run_git_command(['checkout', branch], cwd=repo_path)
if rc == 0:
return f"✓ Checked out branch: {branch}"
else:
return f"✗ Checkout failed:\n{stderr}"
except Exception as e:
return f"Error checking out: {str(e)}"
def git_branch_list(repo_path: str = ".") -> str:
"""
List all branches.
Args:
repo_path: Path to git repository
Returns:
List of branches with current marked
"""
try:
stdout, stderr, rc = run_git_command(['branch', '-a'], cwd=repo_path)
if rc != 0:
return f"Error: {stderr}"
branches = []
for line in stdout.strip().split('\n'):
if line:
branch = line.strip()
is_current = branch.startswith('*')
if is_current:
branch = branch[1:].strip()
branches.append({
"name": branch,
"current": is_current
})
return json.dumps({"branches": branches}, indent=2)
except Exception as e:
return f"Error listing branches: {str(e)}"
# Register all git tools
def register_all():
registry.register(
name="git_status",
handler=git_status,
description="Get git repository status (branch, changes, last commit)",
parameters={
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Path to git repository",
"default": "."
}
}
},
category="git"
)
registry.register(
name="git_log",
handler=git_log,
description="Get recent commit history",
parameters={
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Path to git repository",
"default": "."
},
"count": {
"type": "integer",
"description": "Number of commits to show",
"default": 10
}
}
},
category="git"
)
registry.register(
name="git_pull",
handler=git_pull,
description="Pull latest changes from remote",
parameters={
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Path to git repository",
"default": "."
}
}
},
category="git"
)
registry.register(
name="git_commit",
handler=git_commit,
description="Stage and commit changes",
parameters={
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Path to git repository",
"default": "."
},
"message": {
"type": "string",
"description": "Commit message (required)"
},
"files": {
"type": "array",
"description": "Specific files to commit (default: all changes)",
"items": {"type": "string"}
}
},
"required": ["message"]
},
category="git"
)
registry.register(
name="git_push",
handler=git_push,
description="Push to remote repository",
parameters={
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Path to git repository",
"default": "."
},
"remote": {
"type": "string",
"description": "Remote name",
"default": "origin"
},
"branch": {
"type": "string",
"description": "Branch to push (default: current)"
}
}
},
category="git"
)
registry.register(
name="git_checkout",
handler=git_checkout,
description="Checkout a branch",
parameters={
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Path to git repository",
"default": "."
},
"branch": {
"type": "string",
"description": "Branch name to checkout"
},
"create": {
"type": "boolean",
"description": "Create branch if it doesn't exist",
"default": False
}
},
"required": ["branch"]
},
category="git"
)
registry.register(
name="git_branch_list",
handler=git_branch_list,
description="List all branches",
parameters={
"type": "object",
"properties": {
"repo_path": {
"type": "string",
"description": "Path to git repository",
"default": "."
}
}
},
category="git"
)
register_all()
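`git_status` above parses `git status --porcelain` output, where the first two characters of each line are the status code and the path starts at column 4. A standalone sketch of that parsing step (illustrative helper name, not part of the module):

```python
# Standalone sketch of the porcelain parsing done inside git_status():
# two-character status code, one separator space, then the file path.
def parse_porcelain(output):
    changes = []
    for line in output.split('\n'):
        if line:
            changes.append({"file": line[3:], "status": line[:2].strip()})
    return changes

sample = " M harness.py\n?? tools/new_tool.py"
print(parse_porcelain(sample))
# [{'file': 'harness.py', 'status': 'M'}, {'file': 'tools/new_tool.py', 'status': '??'}]
```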


@@ -0,0 +1,459 @@
"""
Network Tools for Uni-Wizard
HTTP client and Gitea API integration
"""
import json
import urllib.request
import urllib.error
from typing import Dict, Optional, Any
from base64 import b64encode
from .registry import registry
class HTTPClient:
"""Simple HTTP client for API calls"""
def __init__(self, base_url: str = None, auth: tuple = None):
self.base_url = base_url
self.auth = auth
def _make_request(
self,
method: str,
url: str,
data: Dict = None,
headers: Dict = None
) -> tuple:
"""Make HTTP request and return (body, status_code, error)"""
try:
# Build full URL
full_url = url
if self.base_url and not url.startswith('http'):
full_url = f"{self.base_url.rstrip('/')}/{url.lstrip('/')}"
# Prepare data
body = None
if data:
body = json.dumps(data).encode('utf-8')
# Build request
req = urllib.request.Request(
full_url,
data=body,
method=method
)
# Add headers
req.add_header('Content-Type', 'application/json')
if headers:
for key, value in headers.items():
req.add_header(key, value)
# Add auth
if self.auth:
username, password = self.auth
credentials = b64encode(f"{username}:{password}".encode()).decode()
req.add_header('Authorization', f'Basic {credentials}')
# Make request
with urllib.request.urlopen(req, timeout=30) as response:
return response.read().decode('utf-8'), response.status, None
except urllib.error.HTTPError as e:
return e.read().decode('utf-8'), e.code, str(e)
except Exception as e:
return None, 0, str(e)
def get(self, url: str) -> tuple:
return self._make_request('GET', url)
def post(self, url: str, data: Dict) -> tuple:
return self._make_request('POST', url, data)
def put(self, url: str, data: Dict) -> tuple:
return self._make_request('PUT', url, data)
def delete(self, url: str) -> tuple:
return self._make_request('DELETE', url)
def http_get(url: str) -> str:
"""
Perform HTTP GET request.
Args:
url: URL to fetch
Returns:
Response body or error message
"""
client = HTTPClient()
body, status, error = client.get(url)
if error:
return f"Error (HTTP {status}): {error}"
return body
def http_post(url: str, body: Dict) -> str:
"""
Perform HTTP POST request with JSON body.
Args:
url: URL to post to
body: JSON body as dictionary
Returns:
Response body or error message
"""
client = HTTPClient()
response_body, status, error = client.post(url, body)
if error:
return f"Error (HTTP {status}): {error}"
return response_body
# Gitea API Tools
GITEA_URL = "http://143.198.27.163:3000"
GITEA_USER = "timmy"
GITEA_PASS = "" # Should be configured via deployment secrets; empty string disables Basic auth
def gitea_create_issue(
repo: str = "Timmy_Foundation/timmy-home",
title: str = None,
body: str = None,
labels: list = None
) -> str:
"""
Create a Gitea issue.
Args:
repo: Repository path (owner/repo)
title: Issue title (required)
body: Issue body
labels: List of label names
Returns:
Created issue URL or error
"""
if not title:
return "Error: title is required"
try:
client = HTTPClient(
base_url=GITEA_URL,
auth=(GITEA_USER, GITEA_PASS) if GITEA_PASS else None
)
data = {
"title": title,
"body": body or ""
}
if labels:
data["labels"] = labels
response, status, error = client.post(
f"/api/v1/repos/{repo}/issues",
data
)
if error:
return f"Error creating issue: {error}"
result = json.loads(response)
return f"✓ Issue created: #{result['number']} - {result['html_url']}"
except Exception as e:
return f"Error: {str(e)}"
def gitea_comment(
repo: str = "Timmy_Foundation/timmy-home",
issue_number: int = None,
body: str = None
) -> str:
"""
Comment on a Gitea issue.
Args:
repo: Repository path
issue_number: Issue number (required)
body: Comment body (required)
Returns:
Comment result
"""
if not issue_number or not body:
return "Error: issue_number and body are required"
try:
client = HTTPClient(
base_url=GITEA_URL,
auth=(GITEA_USER, GITEA_PASS) if GITEA_PASS else None
)
response, status, error = client.post(
f"/api/v1/repos/{repo}/issues/{issue_number}/comments",
{"body": body}
)
if error:
return f"Error posting comment: {error}"
result = json.loads(response)
return f"✓ Comment posted: {result['html_url']}"
except Exception as e:
return f"Error: {str(e)}"
def gitea_list_issues(
repo: str = "Timmy_Foundation/timmy-home",
state: str = "open",
assignee: str = None
) -> str:
"""
List Gitea issues.
Args:
repo: Repository path
state: open, closed, or all
assignee: Filter by assignee username
Returns:
JSON list of issues
"""
try:
client = HTTPClient(
base_url=GITEA_URL,
auth=(GITEA_USER, GITEA_PASS) if GITEA_PASS else None
)
url = f"/api/v1/repos/{repo}/issues?state={state}"
if assignee:
url += f"&assignee={assignee}"
response, status, error = client.get(url)
if error:
return f"Error fetching issues: {error}"
issues = json.loads(response)
# Simplify output
simplified = []
for issue in issues:
simplified.append({
"number": issue["number"],
"title": issue["title"],
"state": issue["state"],
"assignee": issue.get("assignee", {}).get("login") if issue.get("assignee") else None,
"url": issue["html_url"]
})
return json.dumps({
"count": len(simplified),
"issues": simplified
}, indent=2)
except Exception as e:
return f"Error: {str(e)}"
def gitea_get_issue(repo: str = "Timmy_Foundation/timmy-home", issue_number: int = None) -> str:
"""
Get details of a specific Gitea issue.
Args:
repo: Repository path
issue_number: Issue number (required)
Returns:
Issue details
"""
if not issue_number:
return "Error: issue_number is required"
try:
client = HTTPClient(
base_url=GITEA_URL,
auth=(GITEA_USER, GITEA_PASS) if GITEA_PASS else None
)
response, status, error = client.get(
f"/api/v1/repos/{repo}/issues/{issue_number}"
)
if error:
return f"Error fetching issue: {error}"
issue = json.loads(response)
body = issue.get("body") or "" # body can be null in the API response
return json.dumps({
"number": issue["number"],
"title": issue["title"],
"body": body[:500] + "..." if len(body) > 500 else body,
"state": issue["state"],
"assignee": issue.get("assignee", {}).get("login") if issue.get("assignee") else None,
"created_at": issue["created_at"],
"url": issue["html_url"]
}, indent=2)
except Exception as e:
return f"Error: {str(e)}"
# Register all network tools
def register_all():
registry.register(
name="http_get",
handler=http_get,
description="Perform HTTP GET request",
parameters={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "URL to fetch"
}
},
"required": ["url"]
},
category="network"
)
registry.register(
name="http_post",
handler=http_post,
description="Perform HTTP POST request with JSON body",
parameters={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "URL to post to"
},
"body": {
"type": "object",
"description": "JSON body as dictionary"
}
},
"required": ["url", "body"]
},
category="network"
)
registry.register(
name="gitea_create_issue",
handler=gitea_create_issue,
description="Create a Gitea issue",
parameters={
"type": "object",
"properties": {
"repo": {
"type": "string",
"description": "Repository path (owner/repo)",
"default": "Timmy_Foundation/timmy-home"
},
"title": {
"type": "string",
"description": "Issue title"
},
"body": {
"type": "string",
"description": "Issue body"
},
"labels": {
"type": "array",
"description": "List of label names",
"items": {"type": "string"}
}
},
"required": ["title"]
},
category="network"
)
registry.register(
name="gitea_comment",
handler=gitea_comment,
description="Comment on a Gitea issue",
parameters={
"type": "object",
"properties": {
"repo": {
"type": "string",
"description": "Repository path",
"default": "Timmy_Foundation/timmy-home"
},
"issue_number": {
"type": "integer",
"description": "Issue number"
},
"body": {
"type": "string",
"description": "Comment body"
}
},
"required": ["issue_number", "body"]
},
category="network"
)
registry.register(
name="gitea_list_issues",
handler=gitea_list_issues,
description="List Gitea issues",
parameters={
"type": "object",
"properties": {
"repo": {
"type": "string",
"description": "Repository path",
"default": "Timmy_Foundation/timmy-home"
},
"state": {
"type": "string",
"enum": ["open", "closed", "all"],
"description": "Issue state",
"default": "open"
},
"assignee": {
"type": "string",
"description": "Filter by assignee username"
}
}
},
category="network"
)
registry.register(
name="gitea_get_issue",
handler=gitea_get_issue,
description="Get details of a specific Gitea issue",
parameters={
"type": "object",
"properties": {
"repo": {
"type": "string",
"description": "Repository path",
"default": "Timmy_Foundation/timmy-home"
},
"issue_number": {
"type": "integer",
"description": "Issue number"
}
},
"required": ["issue_number"]
},
category="network"
)
register_all()
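`HTTPClient` builds HTTP Basic auth by base64-encoding `username:password` into an `Authorization` header. A standalone sketch of that step (illustrative helper name; mirrors the logic in `_make_request`):

```python
from base64 import b64encode

# Sketch of the Basic-auth header HTTPClient attaches when auth is set.
def basic_auth_header(username, password):
    credentials = b64encode(f"{username}:{password}".encode()).decode()
    return {"Authorization": f"Basic {credentials}"}

print(basic_auth_header("timmy", "secret"))
# {'Authorization': 'Basic dGltbXk6c2VjcmV0'}
```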


@@ -0,0 +1,265 @@
"""
Uni-Wizard Tool Registry
Central registry for all tool capabilities
"""
import json
import inspect
from typing import Dict, Callable, Any, Optional
from dataclasses import dataclass, asdict
from functools import wraps
@dataclass
class ToolSchema:
"""Schema definition for a tool"""
name: str
description: str
parameters: Dict[str, Any]
returns: str
examples: list = None
def to_dict(self):
return asdict(self)
@dataclass
class ToolResult:
"""Standardized tool execution result"""
success: bool
data: Any
error: Optional[str] = None
execution_time_ms: Optional[float] = None
def to_json(self) -> str:
return json.dumps({
'success': self.success,
'data': self.data,
'error': self.error,
'execution_time_ms': self.execution_time_ms
}, indent=2)
def __str__(self) -> str:
if self.success:
return str(self.data)
return f"Error: {self.error}"
class ToolRegistry:
"""
Central registry for all uni-wizard tools.
All tools register here with their schemas.
The LLM queries available tools via get_tool_definitions().
"""
def __init__(self):
self._tools: Dict[str, Dict] = {}
self._categories: Dict[str, list] = {}
def register(
self,
name: str,
handler: Callable,
description: str = None,
parameters: Dict = None,
category: str = "general",
examples: list = None
):
"""
Register a tool in the registry.
Args:
name: Tool name (used in tool calls)
handler: Function to execute
description: What the tool does
parameters: JSON Schema for parameters
category: Tool category (system, git, network, file)
examples: Example usages
"""
# Auto-extract description from docstring if not provided
if description is None and handler.__doc__:
description = handler.__doc__.strip().split('\n')[0]
# Auto-extract parameters from function signature
if parameters is None:
parameters = self._extract_params(handler)
self._tools[name] = {
'name': name,
'handler': handler,
'description': description or f"Execute {name}",
'parameters': parameters,
'category': category,
'examples': examples or []
}
# Add to category
if category not in self._categories:
self._categories[category] = []
self._categories[category].append(name)
return self # For chaining
def _extract_params(self, handler: Callable) -> Dict:
"""Extract parameter schema from function signature"""
sig = inspect.signature(handler)
params = {
"type": "object",
"properties": {},
"required": []
}
for name, param in sig.parameters.items():
# Skip 'self' and 'cls'; params with defaults are kept but marked optional
if name in ('self', 'cls'):
continue
param_info = {"type": "string"} # Default
# Try to infer type from annotation
if param.annotation is not inspect.Parameter.empty:
if param.annotation == int:
param_info["type"] = "integer"
elif param.annotation == float:
param_info["type"] = "number"
elif param.annotation == bool:
param_info["type"] = "boolean"
elif param.annotation == list:
param_info["type"] = "array"
elif param.annotation == dict:
param_info["type"] = "object"
# Add description if in docstring
if handler.__doc__:
# Simple param extraction from docstring
for line in handler.__doc__.split('\n'):
if f'{name}:' in line or f'{name} (' in line:
desc = line.split(':', 1)[-1].strip()
param_info["description"] = desc
break
params["properties"][name] = param_info
# Required if no default
if param.default is inspect.Parameter.empty:
params["required"].append(name)
return params
def execute(self, name: str, **params) -> ToolResult:
"""
Execute a tool by name with parameters.
Args:
name: Tool name
**params: Tool parameters
Returns:
ToolResult with success/failure and data
"""
import time
start = time.time()
tool = self._tools.get(name)
if not tool:
return ToolResult(
success=False,
data=None,
error=f"Tool '{name}' not found in registry",
execution_time_ms=(time.time() - start) * 1000
)
try:
handler = tool['handler']
result = handler(**params)
return ToolResult(
success=True,
data=result,
execution_time_ms=(time.time() - start) * 1000
)
except Exception as e:
return ToolResult(
success=False,
data=None,
error=f"{type(e).__name__}: {str(e)}",
execution_time_ms=(time.time() - start) * 1000
)
def get_tool(self, name: str) -> Optional[Dict]:
"""Get tool definition by name"""
tool = self._tools.get(name)
if tool:
# Return without handler (not serializable)
return {
'name': tool['name'],
'description': tool['description'],
'parameters': tool['parameters'],
'category': tool['category'],
'examples': tool['examples']
}
return None
def get_tools_by_category(self, category: str) -> list:
"""Get all tools in a category"""
tool_names = self._categories.get(category, [])
return [t for t in (self.get_tool(name) for name in tool_names) if t]
def list_tools(self, category: str = None) -> list:
"""List all tool names, optionally filtered by category"""
if category:
return self._categories.get(category, [])
return list(self._tools.keys())
def get_tool_definitions(self) -> str:
"""
Get all tool definitions formatted for LLM system prompt.
Returns JSON string of all tools with schemas.
"""
tools = []
for name, tool in self._tools.items():
tools.append({
"name": name,
"description": tool['description'],
"parameters": tool['parameters']
})
return json.dumps(tools, indent=2)
def get_categories(self) -> list:
"""Get all tool categories"""
return list(self._categories.keys())
# Global registry instance
registry = ToolRegistry()
def tool(name: str = None, category: str = "general", examples: list = None):
"""
Decorator to register a function as a tool.
Usage:
@tool(category="system")
def system_info():
return {...}
"""
def decorator(func: Callable):
tool_name = name or func.__name__
registry.register(
name=tool_name,
handler=func,
category=category,
examples=examples
)
return func
return decorator
# Convenience function for quick tool execution
def call_tool(name: str, **params) -> str:
"""Execute a tool and return string result"""
result = registry.execute(name, **params)
return str(result)
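The register/execute contract above can be sketched in miniature. This is a condensed, hypothetical re-implementation for illustration only (the name `MiniRegistry` is not part of the module); it shows the three behaviors callers rely on: registration returns the registry for chaining, unknown tools yield a failure result rather than an exception, and handler exceptions are caught and reported.

```python
from typing import Any, Callable, Dict


class MiniRegistry:
    """Condensed sketch of the ToolRegistry execute contract."""

    def __init__(self) -> None:
        self._tools: Dict[str, Callable] = {}

    def register(self, name: str, handler: Callable) -> "MiniRegistry":
        self._tools[name] = handler
        return self  # chaining, as in ToolRegistry.register

    def execute(self, name: str, **params) -> Dict[str, Any]:
        handler = self._tools.get(name)
        if handler is None:
            return {"success": False, "error": f"Tool '{name}' not found"}
        try:
            return {"success": True, "data": handler(**params)}
        except Exception as e:
            # Handler failures are captured, never propagated to the caller
            return {"success": False, "error": f"{type(e).__name__}: {e}"}


reg = MiniRegistry()
reg.register("echo", lambda text: text.upper())
print(reg.execute("echo", text="hello"))  # success path
print(reg.execute("missing"))             # unknown-tool path
```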


@@ -0,0 +1,377 @@
"""
System Tools for Uni-Wizard
Monitor and control the VPS environment
"""
import os
import json
import subprocess
import platform
import psutil
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from .registry import tool, registry
@tool(category="system")
def system_info() -> str:
"""
Get comprehensive system information.
Returns:
JSON string with OS, CPU, memory, disk, and uptime info
"""
try:
# CPU info
cpu_count = psutil.cpu_count()
cpu_percent = psutil.cpu_percent(interval=1)
cpu_freq = psutil.cpu_freq()
# Memory info
memory = psutil.virtual_memory()
# Disk info
disk = psutil.disk_usage('/')
# Uptime
boot_time = datetime.fromtimestamp(psutil.boot_time())
uptime = datetime.now() - boot_time
# Load average (Linux only)
load_avg = os.getloadavg() if hasattr(os, 'getloadavg') else [0, 0, 0]
info = {
"hostname": platform.node(),
"os": {
"system": platform.system(),
"release": platform.release(),
"version": platform.version(),
"machine": platform.machine()
},
"cpu": {
"count": cpu_count,
"percent": cpu_percent,
"frequency_mhz": cpu_freq.current if cpu_freq else None
},
"memory": {
"total_gb": round(memory.total / (1024**3), 2),
"available_gb": round(memory.available / (1024**3), 2),
"percent_used": memory.percent
},
"disk": {
"total_gb": round(disk.total / (1024**3), 2),
"free_gb": round(disk.free / (1024**3), 2),
"percent_used": round((disk.used / disk.total) * 100, 1)
},
"uptime": {
"boot_time": boot_time.isoformat(),
"uptime_seconds": int(uptime.total_seconds()),
"uptime_human": str(timedelta(seconds=int(uptime.total_seconds())))
},
"load_average": {
"1min": round(load_avg[0], 2),
"5min": round(load_avg[1], 2),
"15min": round(load_avg[2], 2)
}
}
return json.dumps(info, indent=2)
except Exception as e:
return f"Error getting system info: {str(e)}"
@tool(category="system")
def process_list(filter_name: str = None) -> str:
"""
List running processes with optional name filter.
Args:
filter_name: Optional process name to filter by
Returns:
JSON list of processes with PID, name, CPU%, memory
"""
try:
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'status']):
try:
info = proc.info
if filter_name and filter_name.lower() not in info['name'].lower():
continue
processes.append({
"pid": info['pid'],
"name": info['name'],
"cpu_percent": info['cpu_percent'],
"memory_percent": round(info['memory_percent'], 2) if info['memory_percent'] else 0,
"status": info['status']
})
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# Sort by CPU usage
processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
return json.dumps({
"count": len(processes),
"filter": filter_name,
"processes": processes[:50] # Limit to top 50
}, indent=2)
except Exception as e:
return f"Error listing processes: {str(e)}"
@tool(category="system")
def service_status(service_name: str) -> str:
"""
Check systemd service status.
Args:
service_name: Name of the service (e.g., 'llama-server', 'syncthing@root')
Returns:
Service status information
"""
try:
result = subprocess.run(
['systemctl', 'status', service_name, '--no-pager'],
capture_output=True,
text=True
)
# Parse output
lines = result.stdout.split('\n')
status_info = {"service": service_name}
for line in lines:
if 'Active:' in line:
status_info['active'] = line.split(':', 1)[1].strip()
elif 'Loaded:' in line:
status_info['loaded'] = line.split(':', 1)[1].strip()
elif 'Main PID:' in line:
status_info['pid'] = line.split(':', 1)[1].strip()
elif 'Memory:' in line:
status_info['memory'] = line.split(':', 1)[1].strip()
elif 'CPU:' in line:
status_info['cpu'] = line.split(':', 1)[1].strip()
status_info['exit_code'] = result.returncode
return json.dumps(status_info, indent=2)
except Exception as e:
return f"Error checking service status: {str(e)}"
@tool(category="system")
def service_control(service_name: str, action: str) -> str:
"""
Control a systemd service (start, stop, restart, enable, disable).
Args:
service_name: Name of the service
action: start, stop, restart, enable, disable, status
Returns:
Result of the action
"""
valid_actions = ['start', 'stop', 'restart', 'enable', 'disable', 'status']
if action not in valid_actions:
return f"Invalid action. Use: {', '.join(valid_actions)}"
try:
result = subprocess.run(
['systemctl', action, service_name],
capture_output=True,
text=True
)
if result.returncode == 0:
return f"✓ Service '{service_name}' {action} successful"
else:
return f"✗ Service '{service_name}' {action} failed: {result.stderr}"
except Exception as e:
return f"Error controlling service: {str(e)}"
@tool(category="system")
def health_check() -> str:
"""
Comprehensive health check of the VPS.
Checks:
- System resources (CPU, memory, disk)
- Critical services (llama-server, syncthing, timmy-agent)
- Network connectivity
- Inference endpoint
Returns:
Health report with status and recommendations
"""
try:
health = {
"timestamp": datetime.now().isoformat(),
"overall": "healthy",
"checks": {}
}
# System resources
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
health["checks"]["memory"] = {
"status": "healthy" if memory.percent < 90 else "warning",
"percent_used": memory.percent,
"available_gb": round(memory.available / (1024**3), 2)
}
health["checks"]["disk"] = {
"status": "healthy" if disk.percent < 90 else "warning",
"percent_used": disk.percent,
"free_gb": round(disk.free / (1024**3), 2)
}
# Check inference endpoint
try:
import urllib.request
urllib.request.urlopen('http://127.0.0.1:8081/health', timeout=5)
health["checks"]["inference"] = {"status": "healthy", "port": 8081}
except Exception:
health["checks"]["inference"] = {"status": "down", "port": 8081}
health["overall"] = "degraded"
# Check services
services = ['llama-server', 'syncthing@root']
for svc in services:
result = subprocess.run(['systemctl', 'is-active', svc], capture_output=True, text=True)
health["checks"][svc] = {
"status": "healthy" if result.returncode == 0 else "down"
}
if result.returncode != 0:
health["overall"] = "degraded"
return json.dumps(health, indent=2)
except Exception as e:
return f"Error running health check: {str(e)}"
@tool(category="system")
def disk_usage(path: str = "/") -> str:
"""
Get disk usage for a path.
Args:
path: Path to check (default: /)
Returns:
Disk usage statistics
"""
try:
usage = psutil.disk_usage(path)
return json.dumps({
"path": path,
"total_gb": round(usage.total / (1024**3), 2),
"used_gb": round(usage.used / (1024**3), 2),
"free_gb": round(usage.free / (1024**3), 2),
"percent_used": round((usage.used / usage.total) * 100, 1)
}, indent=2)
except Exception as e:
return f"Error checking disk usage: {str(e)}"
# Auto-register all tools in this module
def register_all():
"""Register all system tools"""
registry.register(
name="system_info",
handler=system_info,
description="Get comprehensive system information (OS, CPU, memory, disk, uptime)",
category="system"
)
registry.register(
name="process_list",
handler=process_list,
description="List running processes with optional name filter",
parameters={
"type": "object",
"properties": {
"filter_name": {
"type": "string",
"description": "Optional process name to filter by"
}
}
},
category="system"
)
registry.register(
name="service_status",
handler=service_status,
description="Check systemd service status",
parameters={
"type": "object",
"properties": {
"service_name": {
"type": "string",
"description": "Name of the systemd service"
}
},
"required": ["service_name"]
},
category="system"
)
registry.register(
name="service_control",
handler=service_control,
description="Control a systemd service (start, stop, restart, enable, disable)",
parameters={
"type": "object",
"properties": {
"service_name": {
"type": "string",
"description": "Name of the service"
},
"action": {
"type": "string",
"enum": ["start", "stop", "restart", "enable", "disable", "status"],
"description": "Action to perform"
}
},
"required": ["service_name", "action"]
},
category="system"
)
registry.register(
name="health_check",
handler=health_check,
description="Comprehensive health check of VPS (resources, services, inference)",
category="system"
)
registry.register(
name="disk_usage",
handler=disk_usage,
description="Get disk usage for a path",
parameters={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to check",
"default": "/"
}
}
},
category="system"
)
register_all()
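The explicit schemas passed to `register_all()` above mirror what the registry's `_extract_params` infers automatically from a handler's signature: annotations map to JSON Schema types, and parameters without defaults become required. A standalone sketch of that inference (the real method also mines the docstring for descriptions; `demo` is a hypothetical handler):

```python
import inspect
from typing import Any, Dict

# Annotation-to-JSON-Schema type mapping, as in _extract_params
_TYPE_MAP = {int: "integer", float: "number", bool: "boolean",
             list: "array", dict: "object"}


def extract_params(handler) -> Dict[str, Any]:
    sig = inspect.signature(handler)
    schema: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}
    for name, param in sig.parameters.items():
        if name in ("self", "cls"):
            continue
        json_type = _TYPE_MAP.get(param.annotation, "string")
        schema["properties"][name] = {"type": json_type}
        # Required only when the parameter has no default
        if param.default is inspect.Parameter.empty:
            schema["required"].append(name)
    return schema


def demo(service_name: str, timeout: int = 5):
    ...


print(extract_params(demo))
```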


@@ -1,271 +0,0 @@
# Uni-Wizard v2 — The Three-House Architecture
> *"Ezra reads and orders the pattern. Bezalel builds and unfolds the pattern. Timmy judges and preserves sovereignty."*
## Overview
The Uni-Wizard v2 is a refined architecture that integrates:
- **Timmy's** sovereignty metrics, conscience, and local-first telemetry
- **Ezra's** archivist pattern: read before write, evidence over vibes, citation discipline
- **Bezalel's** artificer pattern: build from plans, proof over speculation, forge discipline
## Core Principles
### 1. Three Distinct Houses
| House | Role | Primary Capability | Motto |
|-------|------|-------------------|-------|
| **Timmy** | Sovereign | Judgment, review, final authority | *Sovereignty and service always* |
| **Ezra** | Archivist | Reading, analysis, synthesis | *Read the pattern. Name the truth.* |
| **Bezalel** | Artificer | Building, testing, proving | *Build the pattern. Prove the result.* |
### 2. Non-Merging Rule
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ EZRA │ │ BEZALEL │ │ TIMMY │
│ (Archivist)│ │ (Artificer) │ │ (Sovereign)│
│ Reads → │────→│ Builds → │────→│ Judges │
│ Shapes │ │ Proves │ │ Approves │
└─────────────┘ └─────────────┘ └─────────────┘
↑ │
└────────────────────────────────────────┘
Artifacts flow one direction
```
No house blends into another. Each maintains distinct identity, telemetry, and provenance.
### 3. Provenance-First Execution
Every tool execution produces a `Provenance` record:
```python
@dataclass
class Provenance:
house: str # Which house executed
tool: str # Tool name
started_at: str # ISO timestamp
completed_at: str # ISO timestamp
input_hash: str # Content hash of inputs
output_hash: str # Content hash of outputs
sources_read: List[str] # Ezra: what was read
evidence_level: str # none, partial, full
confidence: float # 0.0 to 1.0
```
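The `input_hash` and `output_hash` fields can be produced with a truncated SHA-256 over canonical JSON, roughly as the harness's `_hash_content` does (a minimal sketch):

```python
import hashlib
import json


def content_hash(obj) -> str:
    """Truncated SHA-256 over canonical JSON, as used for provenance hashes."""
    # sort_keys makes the hash independent of dict insertion order
    payload = json.dumps(obj, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()[:16]


params = {"repo_path": "/path", "max_count": 10}
print(content_hash(params))  # stable 16-hex-char fingerprint
```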
## Architecture
### Harness (harness.py)
The `UniWizardHarness` is the core execution engine with house-aware policies:
```python
# Ezra mode — enforces reading before writing
ezra = UniWizardHarness(house="ezra")
result = ezra.execute("git_commit", message="Update")
# → Fails if git_status wasn't called first
# Bezalel mode — enforces proof verification
bezalel = UniWizardHarness(house="bezalel")
result = bezalel.execute("deploy", target="production")
# → Verifies tests passed before deploying
# Timmy mode — full telemetry, sovereign judgment
timmy = UniWizardHarness(house="timmy")
review = timmy.review_for_timmy(results)
# → Generates structured review with recommendation
```
### Router (router.py)
The `HouseRouter` automatically routes tasks to the appropriate house:
```python
router = HouseRouter()
# Auto-routed to Ezra (read operation)
result = router.route("git_status", repo_path="/path")
# Auto-routed to Bezalel (build operation)
result = router.route("git_commit", repo_path="/path", message="Update")
# Multi-phase workflow
results = router.execute_multi_house_plan([
{"tool": "git_status", "params": {}, "house": "ezra"},
{"tool": "git_commit", "params": {"message": "Update"}, "house": "bezalel"}
], require_timmy_approval=True)
```
### Task Router Daemon (task_router_daemon.py)
Polls Gitea and executes the full three-house workflow:
1. **Ezra reads** the issue, analyzes, shapes approach
2. **Bezalel implements** based on Ezra's analysis, generates proof
3. **Timmy reviews** both phases, renders sovereign judgment
4. **Comment posted** to issue with full provenance
## House Policies
### Ezra (Archivist)
```python
{
"requires_provenance": True,
"evidence_threshold": 0.8,
"must_read_before_write": True,
"citation_required": True
}
```
- Must read git status before git commit
- Must cite sources in outputs
- Evidence level must be "full" for archives
- Confidence threshold: 80%
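The read-before-write rule amounts to a small stateful guard. A simplified sketch, not the harness's exact implementation (the class name and tool sets here are illustrative):

```python
class ReadBeforeWriteGuard:
    """Ezra's discipline: a write tool is refused until a read tool has run."""

    READS = {"git_status", "git_log"}
    WRITES = {"git_commit", "git_push"}

    def __init__(self) -> None:
        self.reads_seen: set = set()

    def check(self, tool: str) -> bool:
        if tool in self.READS:
            self.reads_seen.add(tool)
            return True
        if tool in self.WRITES:
            # Allowed only after at least one read has been observed
            return bool(self.reads_seen)
        return True


guard = ReadBeforeWriteGuard()
print(guard.check("git_commit"))  # refused: nothing read yet
guard.check("git_status")
print(guard.check("git_commit"))  # allowed now
```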
### Bezalel (Artificer)
```python
{
"requires_provenance": True,
"evidence_threshold": 0.6,
"requires_proof": True,
"test_before_ship": True
}
```
- Must verify proof before marking complete
- Tests must pass before "shipping"
- Fail-fast on verification failures
- Confidence threshold: 60%
### Timmy (Sovereign)
```python
{
"requires_provenance": True,
"evidence_threshold": 0.7,
"can_override": True,
"telemetry": True
}
```
- Records all telemetry
- Can override other houses
- Final judgment authority
- Confidence threshold: 70%
## Telemetry & Sovereignty Metrics
Every execution is logged to `~/timmy/logs/uni_wizard_telemetry.jsonl`:
```json
{
"session_id": "abc123...",
"timestamp": "2026-03-30T20:00:00Z",
"house": "ezra",
"tool": "git_status",
"success": true,
"execution_time_ms": 145,
"evidence_level": "full",
"confidence": 0.95,
"sources_count": 3
}
```
Generate sovereignty report:
```python
harness = UniWizardHarness("timmy")
print(harness.get_telemetry_report())
```
## Usage Examples
### Basic Tool Execution
```python
from harness import get_harness
# Ezra analyzes repository
ezra = get_harness("ezra")
result = ezra.execute("git_log", repo_path="/path", max_count=10)
print(f"Evidence: {result.provenance.evidence_level}")
print(f"Confidence: {result.provenance.confidence}")
```
### Cross-House Workflow
```python
from router import HouseRouter
router = HouseRouter()
# Ezra reads issue → Bezalel implements → Timmy reviews
results = router.execute_multi_house_plan([
{"tool": "gitea_get_issue", "params": {"number": 42}, "house": "ezra"},
{"tool": "file_write", "params": {"path": "/tmp/fix.py"}, "house": "bezalel"},
{"tool": "run_tests", "params": {}, "house": "bezalel"}
], require_timmy_approval=True)
# Timmy's judgment available in results["timmy_judgment"]
```
### Running the Daemon
```bash
# Three-house task router
python task_router_daemon.py --repo Timmy_Foundation/timmy-home
# Skip Timmy approval (testing)
python task_router_daemon.py --no-timmy-approval
```
## File Structure
```
uni-wizard/v2/
├── README.md # This document
├── harness.py # Core harness with house policies
├── router.py # Intelligent task routing
├── task_router_daemon.py # Gitea polling daemon
└── tests/
└── test_v2.py # Test suite
```
## Integration with Canon
This implementation respects the canon from `specs/timmy-ezra-bezalel-canon-sheet.md`:
1. **Distinct houses** — Each has unique identity, policy, telemetry
2. **No blending** — Houses communicate via artifacts, not shared state
3. **Timmy sovereign** — Final review authority, can override
4. **Ezra reads first** — `must_read_before_write` enforced
5. **Bezalel proves** — Proof verification required
6. **Provenance** — Every action logged with full traceability
7. **Telemetry** — Timmy's sovereignty metrics tracked
## Comparison with v1
| Aspect | v1 | v2 |
|--------|-----|-----|
| Houses | Single harness | Three distinct houses |
| Provenance | Basic | Full with hashes, sources |
| Policies | None | House-specific enforcement |
| Telemetry | Limited | Full sovereignty metrics |
| Routing | Manual | Intelligent auto-routing |
| Ezra pattern | Not enforced | Read-before-write enforced |
| Bezalel pattern | Not enforced | Proof-required enforced |
## Future Work
- [ ] LLM integration for Ezra analysis phase
- [ ] Automated implementation in Bezalel phase
- [ ] Multi-issue batch processing
- [ ] Web dashboard for sovereignty metrics
- [ ] Cross-house learning (Ezra learns from Timmy reviews)
---
*Sovereignty and service always.*


@@ -1,472 +0,0 @@
#!/usr/bin/env python3
"""
Uni-Wizard Harness v2 — The Three-House Architecture
Integrates:
- Timmy: Sovereign local conscience, final judgment, telemetry
- Ezra: Archivist pattern — read before write, evidence over vibes
- Bezalel: Artificer pattern — build from plans, proof over speculation
Usage:
harness = UniWizardHarness(house="ezra") # Archivist mode
harness = UniWizardHarness(house="bezalel") # Artificer mode
harness = UniWizardHarness(house="timmy") # Sovereign mode
"""
import json
import sys
import time
import hashlib
from typing import Dict, Any, Optional, List
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
# Add tools to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools import registry
class House(Enum):
"""The three canonical wizard houses"""
TIMMY = "timmy" # Sovereign local conscience
EZRA = "ezra" # Archivist, reader, pattern-recognizer
BEZALEL = "bezalel" # Artificer, builder, proof-maker
@dataclass
class Provenance:
"""Trail of evidence for every action"""
house: str
tool: str
started_at: str
completed_at: Optional[str] = None
input_hash: Optional[str] = None
output_hash: Optional[str] = None
sources_read: List[str] = None
evidence_level: str = "none" # none, partial, full
confidence: float = 0.0
def to_dict(self):
return asdict(self)
@dataclass
class ExecutionResult:
"""Result with full provenance"""
success: bool
data: Any
provenance: Provenance
error: Optional[str] = None
execution_time_ms: float = 0.0
def to_json(self) -> str:
return json.dumps({
'success': self.success,
'data': self.data,
'provenance': self.provenance.to_dict(),
'error': self.error,
'execution_time_ms': self.execution_time_ms
}, indent=2)
class HousePolicy:
"""Policy enforcement per house"""
POLICIES = {
House.TIMMY: {
"requires_provenance": True,
"evidence_threshold": 0.7,
"can_override": True,
"telemetry": True,
"motto": "Sovereignty and service always"
},
House.EZRA: {
"requires_provenance": True,
"evidence_threshold": 0.8,
"must_read_before_write": True,
"citation_required": True,
"motto": "Read the pattern. Name the truth. Return a clean artifact."
},
House.BEZALEL: {
"requires_provenance": True,
"evidence_threshold": 0.6,
"requires_proof": True,
"test_before_ship": True,
"motto": "Build the pattern. Prove the result. Return the tool."
}
}
@classmethod
def get(cls, house: House) -> Dict:
return cls.POLICIES.get(house, cls.POLICIES[House.TIMMY])
class SovereigntyTelemetry:
"""Timmy's sovereignty tracking — what you measure, you manage"""
def __init__(self, log_dir: Path = None):
self.log_dir = log_dir or Path.home() / "timmy" / "logs"
self.log_dir.mkdir(parents=True, exist_ok=True)
self.telemetry_log = self.log_dir / "uni_wizard_telemetry.jsonl"
self.session_id = hashlib.sha256(
f"{time.time()}{id(self)}".encode()
).hexdigest()[:16]
def log_execution(self, house: str, tool: str, result: ExecutionResult):
"""Log every execution with full provenance"""
entry = {
"session_id": self.session_id,
"timestamp": datetime.utcnow().isoformat(),
"house": house,
"tool": tool,
"success": result.success,
"execution_time_ms": result.execution_time_ms,
"evidence_level": result.provenance.evidence_level,
"confidence": result.provenance.confidence,
"sources_count": len(result.provenance.sources_read or []),
}
with open(self.telemetry_log, 'a') as f:
f.write(json.dumps(entry) + '\n')
def get_sovereignty_report(self, days: int = 7) -> Dict:
"""Generate sovereignty metrics report"""
# Read telemetry log
entries = []
if self.telemetry_log.exists():
with open(self.telemetry_log) as f:
for line in f:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
# Calculate metrics
total = len(entries)
by_house = {}
by_tool = {}
avg_confidence = 0.0
for e in entries:
house = e.get('house', 'unknown')
by_house[house] = by_house.get(house, 0) + 1
tool = e.get('tool', 'unknown')
by_tool[tool] = by_tool.get(tool, 0) + 1
avg_confidence += e.get('confidence', 0)
if total > 0:
avg_confidence /= total
return {
"total_executions": total,
"by_house": by_house,
"top_tools": sorted(by_tool.items(), key=lambda x: -x[1])[:10],
"avg_confidence": round(avg_confidence, 2),
"session_id": self.session_id
}
class UniWizardHarness:
"""
The Uni-Wizard Harness v2 — Three houses, one consciousness.
House-aware execution with provenance tracking:
- Timmy: Sovereign judgment, telemetry, final review
- Ezra: Archivist — reads before writing, cites sources
- Bezalel: Artificer — builds with proof, tests before shipping
"""
def __init__(self, house: str = "timmy", telemetry: bool = True):
self.house = House(house)
self.registry = registry
self.policy = HousePolicy.get(self.house)
self.history: List[ExecutionResult] = []
# Telemetry (Timmy's sovereignty tracking)
self.telemetry = SovereigntyTelemetry() if telemetry else None
# Evidence store (Ezra's reading cache)
self.evidence_cache: Dict[str, Any] = {}
# Proof store (Bezalel's test results)
self.proof_cache: Dict[str, Any] = {}
def _hash_content(self, content: str) -> str:
"""Create content hash for provenance"""
return hashlib.sha256(content.encode()).hexdigest()[:16]
def _check_evidence(self, tool_name: str, params: Dict) -> tuple:
"""
Ezra's pattern: Check evidence level before execution.
Returns (evidence_level, confidence, sources)
"""
sources = []
# For git operations, check repo state
if tool_name.startswith("git_"):
repo_path = params.get("repo_path", ".")
sources.append(f"repo:{repo_path}")
# Would check git status here
return ("full", 0.9, sources)
# For system operations, check current state
if tool_name.startswith("system_") or tool_name.startswith("service_"):
sources.append("system:live")
return ("full", 0.95, sources)
# For network operations, depends on external state
if tool_name.startswith("http_") or tool_name.startswith("gitea_"):
sources.append("network:external")
return ("partial", 0.6, sources)
return ("none", 0.5, sources)
def _verify_proof(self, tool_name: str, result: Any) -> bool:
"""
Bezalel's pattern: Verify proof for build artifacts.
"""
if not self.policy.get("requires_proof", False):
return True
# For git operations, verify the operation succeeded
if tool_name.startswith("git_"):
# Check if result contains success indicator
if isinstance(result, dict):
return result.get("success", False)
if isinstance(result, str):
return "error" not in result.lower()
return True
def execute(self, tool_name: str, **params) -> ExecutionResult:
"""
Execute a tool with full house policy enforcement.
Flow:
1. Check evidence (Ezra pattern)
2. Execute tool
3. Verify proof (Bezalel pattern)
4. Record provenance
5. Log telemetry (Timmy pattern)
"""
start_time = time.time()
started_at = datetime.utcnow().isoformat()
# 1. Evidence check (Ezra's archivist discipline)
evidence_level, confidence, sources = self._check_evidence(tool_name, params)
if self.policy.get("must_read_before_write", False):
if evidence_level == "none" and tool_name.startswith("git_"):
# Ezra must read git status before git commit
if tool_name == "git_commit":
return ExecutionResult(
success=False,
data=None,
provenance=Provenance(
house=self.house.value,
tool=tool_name,
started_at=started_at,
evidence_level="none"
),
error="Ezra policy: Must read git_status before git_commit",
execution_time_ms=0
)
# 2. Execute tool: registry.execute() returns a ToolResult and never raises,
# so unwrap its success/error/data fields rather than assuming success
raw_result = self.registry.execute(tool_name, **params)
success = raw_result.success
error = raw_result.error
data = raw_result.data
execution_time_ms = (time.time() - start_time) * 1000
completed_at = datetime.utcnow().isoformat()
# 3. Proof verification (Bezalel's artificer discipline)
if success and self.policy.get("requires_proof", False):
proof_valid = self._verify_proof(tool_name, data)
if not proof_valid:
success = False
error = "Bezalel policy: Proof verification failed"
# 4. Build provenance record
input_hash = self._hash_content(json.dumps(params, sort_keys=True))
output_hash = self._hash_content(json.dumps(data, default=str)) if data else None
provenance = Provenance(
house=self.house.value,
tool=tool_name,
started_at=started_at,
completed_at=completed_at,
input_hash=input_hash,
output_hash=output_hash,
sources_read=sources,
evidence_level=evidence_level,
confidence=confidence if success else 0.0
)
result = ExecutionResult(
success=success,
data=data,
provenance=provenance,
error=error,
execution_time_ms=execution_time_ms
)
# 5. Record history
self.history.append(result)
# 6. Log telemetry (Timmy's sovereignty tracking)
if self.telemetry:
self.telemetry.log_execution(self.house.value, tool_name, result)
return result
def execute_plan(self, plan: List[Dict]) -> Dict[str, ExecutionResult]:
"""
Execute a sequence with house policy applied at each step.
Plan format:
[
{"tool": "git_status", "params": {"repo_path": "/path"}},
{"tool": "git_commit", "params": {"message": "Update"}}
]
"""
results = {}
for step in plan:
tool_name = step.get("tool")
params = step.get("params", {})
result = self.execute(tool_name, **params)
results[tool_name] = result
# Stop on failure (Bezalel: fail fast)
if not result.success and self.policy.get("test_before_ship", False):
break
return results
def review_for_timmy(self, results: Dict[str, ExecutionResult]) -> Dict:
"""
Generate a review package for Timmy's sovereign judgment.
Returns structured review data with full provenance.
"""
review = {
"house": self.house.value,
"policy": self.policy,
"executions": [],
"summary": {
"total": len(results),
"successful": sum(1 for r in results.values() if r.success),
"failed": sum(1 for r in results.values() if not r.success),
"avg_confidence": 0.0,
"evidence_levels": {}
},
"recommendation": ""
}
total_confidence = 0
for tool, result in results.items():
review["executions"].append({
"tool": tool,
"success": result.success,
"error": result.error,
"evidence_level": result.provenance.evidence_level,
"confidence": result.provenance.confidence,
"sources": result.provenance.sources_read,
"execution_time_ms": result.execution_time_ms
})
total_confidence += result.provenance.confidence
level = result.provenance.evidence_level
review["summary"]["evidence_levels"][level] = \
review["summary"]["evidence_levels"].get(level, 0) + 1
if results:
review["summary"]["avg_confidence"] = round(
total_confidence / len(results), 2
)
# Generate recommendation
if review["summary"]["failed"] == 0:
if review["summary"]["avg_confidence"] >= 0.8:
review["recommendation"] = "APPROVE: High confidence, all passed"
else:
review["recommendation"] = "CONDITIONAL: Passed but low confidence"
else:
review["recommendation"] = "REJECT: Failures detected"
return review
def get_capabilities(self) -> str:
"""List all capabilities with house annotations"""
lines = [f"\n🏛️ {self.house.value.upper()} HOUSE CAPABILITIES"]
lines.append(f" Motto: {self.policy.get('motto', '')}")
lines.append(f" Evidence threshold: {self.policy.get('evidence_threshold', 0)}")
lines.append("")
for category in self.registry.get_categories():
cat_tools = self.registry.get_tools_by_category(category)
lines.append(f"\n📁 {category.upper()}")
for tool in cat_tools:
lines.append(f"{tool['name']}: {tool['description']}")
return "\n".join(lines)
def get_telemetry_report(self) -> str:
"""Get sovereignty telemetry report"""
if not self.telemetry:
return "Telemetry disabled"
report = self.telemetry.get_sovereignty_report()
lines = ["\n📊 SOVEREIGNTY TELEMETRY REPORT"]
lines.append(f" Session: {report['session_id']}")
lines.append(f" Total executions: {report['total_executions']}")
lines.append(f" Average confidence: {report['avg_confidence']}")
lines.append("\n By House:")
for house, count in report.get('by_house', {}).items():
lines.append(f" {house}: {count}")
lines.append("\n Top Tools:")
for tool, count in report.get('top_tools', []):
lines.append(f" {tool}: {count}")
return "\n".join(lines)
def get_harness(house: str = "timmy") -> UniWizardHarness:
"""Factory function to get configured harness"""
return UniWizardHarness(house=house)
if __name__ == "__main__":
# Demo the three houses
print("=" * 60)
print("UNI-WIZARD HARNESS v2 — Three House Demo")
print("=" * 60)
# Ezra mode
print("\n" + "=" * 60)
ezra = get_harness("ezra")
print(ezra.get_capabilities())
# Bezalel mode
print("\n" + "=" * 60)
bezalel = get_harness("bezalel")
print(bezalel.get_capabilities())
# Timmy mode with telemetry
print("\n" + "=" * 60)
timmy = get_harness("timmy")
print(timmy.get_capabilities())
print(timmy.get_telemetry_report())


@@ -1,384 +0,0 @@
#!/usr/bin/env python3
"""
Uni-Wizard Router v2 — Intelligent delegation across the three houses
Routes tasks to the appropriate house based on task characteristics:
- READ/ARCHIVE tasks → Ezra (archivist)
- BUILD/TEST tasks → Bezalel (artificer)
- JUDGE/REVIEW tasks → Timmy (sovereign)
Usage:
router = HouseRouter()
result = router.route("read_and_summarize", {"repo": "timmy-home"})
"""
import json
from typing import Dict, Any, Optional, List
from pathlib import Path
from dataclasses import dataclass
from enum import Enum
from harness import UniWizardHarness, House, ExecutionResult
class TaskType(Enum):
"""Categories of work for routing decisions"""
READ = "read" # Read, analyze, summarize
ARCHIVE = "archive" # Store, catalog, preserve
SYNTHESIZE = "synthesize" # Combine, reconcile, interpret
BUILD = "build" # Implement, create, construct
TEST = "test" # Verify, validate, benchmark
OPTIMIZE = "optimize" # Tune, improve, harden
JUDGE = "judge" # Review, decide, approve
ROUTE = "route" # Delegate, coordinate, dispatch
@dataclass
class RoutingDecision:
"""Record of why a task was routed to a house"""
task_type: str
primary_house: str
confidence: float
reasoning: str
fallback_houses: List[str]
class HouseRouter:
"""
Routes tasks to the appropriate wizard house.
The router understands the canon:
- Ezra reads and orders the pattern
- Bezalel builds and unfolds the pattern
- Timmy judges and preserves sovereignty
"""
# Task → House mapping
ROUTING_TABLE = {
# Read/Archive tasks → Ezra
TaskType.READ: {
"house": House.EZRA,
"confidence": 0.95,
"reasoning": "Archivist house: reading is Ezra's domain"
},
TaskType.ARCHIVE: {
"house": House.EZRA,
"confidence": 0.95,
"reasoning": "Archivist house: preservation is Ezra's domain"
},
TaskType.SYNTHESIZE: {
"house": House.EZRA,
"confidence": 0.85,
"reasoning": "Archivist house: synthesis requires reading first"
},
# Build/Test tasks → Bezalel
TaskType.BUILD: {
"house": House.BEZALEL,
"confidence": 0.95,
"reasoning": "Artificer house: building is Bezalel's domain"
},
TaskType.TEST: {
"house": House.BEZALEL,
"confidence": 0.95,
"reasoning": "Artificer house: verification is Bezalel's domain"
},
TaskType.OPTIMIZE: {
"house": House.BEZALEL,
"confidence": 0.90,
"reasoning": "Artificer house: optimization is Bezalel's domain"
},
# Judge/Route tasks → Timmy
TaskType.JUDGE: {
"house": House.TIMMY,
"confidence": 1.0,
"reasoning": "Sovereign house: judgment is Timmy's domain"
},
TaskType.ROUTE: {
"house": House.TIMMY,
"confidence": 0.95,
"reasoning": "Sovereign house: routing is Timmy's domain"
},
}
# Tool → TaskType mapping
TOOL_TASK_MAP = {
# System tools
"system_info": TaskType.READ,
"process_list": TaskType.READ,
"service_status": TaskType.READ,
"service_control": TaskType.BUILD,
"health_check": TaskType.TEST,
"disk_usage": TaskType.READ,
# Git tools
"git_status": TaskType.READ,
"git_log": TaskType.ARCHIVE,
"git_pull": TaskType.BUILD,
"git_commit": TaskType.ARCHIVE,
"git_push": TaskType.BUILD,
"git_checkout": TaskType.BUILD,
"git_branch_list": TaskType.READ,
# Network tools
"http_get": TaskType.READ,
"http_post": TaskType.BUILD,
"gitea_list_issues": TaskType.READ,
"gitea_get_issue": TaskType.READ,
"gitea_create_issue": TaskType.BUILD,
"gitea_comment": TaskType.BUILD,
}
def __init__(self):
self.harnesses: Dict[House, UniWizardHarness] = {
House.TIMMY: UniWizardHarness("timmy"),
House.EZRA: UniWizardHarness("ezra"),
House.BEZALEL: UniWizardHarness("bezalel")
}
self.decision_log: List[RoutingDecision] = []
def classify_task(self, tool_name: str, params: Dict) -> TaskType:
"""Classify a task based on tool and parameters"""
# Direct tool mapping
if tool_name in self.TOOL_TASK_MAP:
return self.TOOL_TASK_MAP[tool_name]
# Heuristic classification
if any(kw in tool_name for kw in ["read", "get", "list", "status", "info", "log"]):
return TaskType.READ
if any(kw in tool_name for kw in ["write", "create", "commit", "push", "post"]):
return TaskType.BUILD
if any(kw in tool_name for kw in ["test", "check", "verify", "validate"]):
return TaskType.TEST
# Default to Timmy for safety
return TaskType.ROUTE
def route(self, tool_name: str, **params) -> ExecutionResult:
"""
Route a task to the appropriate house and execute.
Returns execution result with routing metadata attached.
"""
# Classify the task
task_type = self.classify_task(tool_name, params)
# Get routing decision
routing = self.ROUTING_TABLE.get(task_type, {
"house": House.TIMMY,
"confidence": 0.5,
"reasoning": "Default to sovereign house"
})
house = routing["house"]
# Record decision
decision = RoutingDecision(
task_type=task_type.value,
primary_house=house.value,
confidence=routing["confidence"],
reasoning=routing["reasoning"],
fallback_houses=[h.value for h in [House.TIMMY] if h != house]
)
self.decision_log.append(decision)
# Execute via the chosen harness
harness = self.harnesses[house]
result = harness.execute(tool_name, **params)
# Attach routing metadata
result.data = {
"result": result.data,
"routing": {
"task_type": task_type.value,
"house": house.value,
"confidence": routing["confidence"],
"reasoning": routing["reasoning"]
}
}
return result
def execute_multi_house_plan(
self,
plan: List[Dict],
require_timmy_approval: bool = False
) -> Dict[str, Any]:
"""
Execute a plan that may span multiple houses.
Example plan:
[
{"tool": "git_status", "params": {}, "house": "ezra"},
{"tool": "git_commit", "params": {"message": "Update"}, "house": "ezra"},
{"tool": "git_push", "params": {}, "house": "bezalel"}
]
"""
results = {}
ezra_review = None
bezalel_proof = None
for step in plan:
tool_name = step.get("tool")
params = step.get("params", {})
specified_house = step.get("house")
# Use specified house or auto-route
if specified_house:
harness = self.harnesses[House(specified_house)]
result = harness.execute(tool_name, **params)
else:
result = self.route(tool_name, **params)
results[tool_name] = result
# Collect review/proof for Timmy
if specified_house == "ezra":
ezra_review = result
elif specified_house == "bezalel":
bezalel_proof = result
# If required, get Timmy's approval
if require_timmy_approval:
timmy_harness = self.harnesses[House.TIMMY]
# Build review package
review_input = {
"ezra_work": {
"success": ezra_review.success if ezra_review else None,
"evidence_level": ezra_review.provenance.evidence_level if ezra_review else None,
"sources": ezra_review.provenance.sources_read if ezra_review else []
},
"bezalel_work": {
"success": bezalel_proof.success if bezalel_proof else None,
"proof_verified": bezalel_proof.success if bezalel_proof else None
} if bezalel_proof else None
}
# Timmy judges
timmy_result = timmy_harness.execute(
"review_proposal",
proposal=json.dumps(review_input)
)
results["timmy_judgment"] = timmy_result
return results
def get_routing_stats(self) -> Dict:
"""Get statistics on routing decisions"""
if not self.decision_log:
return {"total": 0}
by_house = {}
by_task = {}
total_confidence = 0
for d in self.decision_log:
by_house[d.primary_house] = by_house.get(d.primary_house, 0) + 1
by_task[d.task_type] = by_task.get(d.task_type, 0) + 1
total_confidence += d.confidence
return {
"total": len(self.decision_log),
"by_house": by_house,
"by_task_type": by_task,
"avg_confidence": round(total_confidence / len(self.decision_log), 2)
}
class CrossHouseWorkflow:
"""
Pre-defined workflows that coordinate across houses.
Implements the canonical flow:
1. Ezra reads and shapes
2. Bezalel builds and proves
3. Timmy reviews and approves
"""
def __init__(self):
self.router = HouseRouter()
def issue_to_pr_workflow(self, issue_number: int, repo: str) -> Dict:
"""
Full workflow: Issue → Ezra analysis → Bezalel implementation → Timmy review
"""
workflow_id = f"issue_{issue_number}"
# Phase 1: Ezra reads and shapes the issue
ezra_harness = self.router.harnesses[House.EZRA]
issue_data = ezra_harness.execute("gitea_get_issue", repo=repo, number=issue_number)
if not issue_data.success:
return {
"workflow_id": workflow_id,
"phase": "ezra_read",
"status": "failed",
"error": issue_data.error
}
# Phase 2: Ezra synthesizes approach
# (Would call LLM here in real implementation)
approach = {
"files_to_modify": ["file1.py", "file2.py"],
"tests_needed": True
}
# Phase 3: Bezalel implements
bezalel_harness = self.router.harnesses[House.BEZALEL]
# Execute implementation plan
# Phase 4: Bezalel proves with tests
test_result = bezalel_harness.execute("run_tests", repo_path=repo)
# Phase 5: Timmy reviews
timmy_harness = self.router.harnesses[House.TIMMY]
review = timmy_harness.review_for_timmy({
"ezra_analysis": issue_data,
"bezalel_implementation": test_result
})
return {
"workflow_id": workflow_id,
"status": "complete",
"phases": {
"ezra_read": issue_data.success,
"bezalel_implement": test_result.success,
"timmy_review": review
},
"recommendation": review.get("recommendation", "PENDING")
}
if __name__ == "__main__":
print("=" * 60)
print("HOUSE ROUTER — Three-House Delegation Demo")
print("=" * 60)
router = HouseRouter()
# Demo routing decisions
demo_tasks = [
("git_status", {"repo_path": "/tmp/timmy-home"}),
("git_commit", {"repo_path": "/tmp/timmy-home", "message": "Test"}),
("system_info", {}),
("health_check", {}),
]
print("\n📋 Task Routing Decisions:")
print("-" * 60)
for tool, params in demo_tasks:
task_type = router.classify_task(tool, params)
routing = router.ROUTING_TABLE.get(task_type, {})
print(f"\n Tool: {tool}")
print(f" Task Type: {task_type.value}")
print(f" Routed To: {routing.get('house', House.TIMMY).value}")
print(f" Confidence: {routing.get('confidence', 0.5)}")
print(f" Reasoning: {routing.get('reasoning', 'Default')}")
print("\n" + "=" * 60)
print("Routing complete.")


@@ -1,432 +0,0 @@
#!/usr/bin/env python3
"""
Task Router Daemon v2 — Three-House Gitea Integration
Polls Gitea for issues and routes them through:
- Ezra: Issue reading, analysis, approach shaping
- Bezalel: Implementation, testing, proof generation
- Timmy: Final review and approval
Usage:
python task_router_daemon.py --repo Timmy_Foundation/timmy-home
"""
import json
import time
import sys
import argparse
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
sys.path.insert(0, str(Path(__file__).parent))
from harness import UniWizardHarness, House, ExecutionResult
from router import HouseRouter, TaskType
class ThreeHouseTaskRouter:
"""
Gitea task router implementing the three-house canon.
Every task flows through the canonical pattern:
1. Ezra reads the issue and shapes the approach
2. Bezalel implements and generates proof
3. Timmy reviews and makes sovereign judgment
"""
def __init__(
self,
gitea_url: str = "http://143.198.27.163:3000",
repo: str = "Timmy_Foundation/timmy-home",
poll_interval: int = 60,
require_timmy_approval: bool = True
):
self.gitea_url = gitea_url
self.repo = repo
self.poll_interval = poll_interval
self.require_timmy_approval = require_timmy_approval
self.running = False
# Three-house architecture
self.router = HouseRouter()
self.harnesses = self.router.harnesses
# Processing state
self.processed_issues: set = set()
self.in_progress: Dict[int, Dict] = {}
# Logging
self.log_dir = Path.home() / "timmy" / "logs" / "task_router"
self.log_dir.mkdir(parents=True, exist_ok=True)
self.event_log = self.log_dir / "events.jsonl"
def _log_event(self, event_type: str, data: Dict):
"""Log event with timestamp"""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event": event_type,
**data
}
with open(self.event_log, 'a') as f:
f.write(json.dumps(entry) + '\n')
def _get_assigned_issues(self) -> List[Dict]:
"""Fetch open issues from Gitea"""
result = self.harnesses[House.EZRA].execute(
"gitea_list_issues",
repo=self.repo,
state="open"
)
if not result.success:
self._log_event("fetch_error", {"error": result.error})
return []
try:
data = result.data.get("result", result.data)
if isinstance(data, str):
data = json.loads(data)
return data.get("issues", [])
except Exception as e:
self._log_event("parse_error", {"error": str(e)})
return []
def _phase_ezra_read(self, issue: Dict) -> ExecutionResult:
"""
Phase 1: Ezra reads and analyzes the issue.
Ezra's responsibility:
- Read issue title, body, comments
- Extract requirements and constraints
- Identify related files/code
- Shape initial approach
- Record evidence level
"""
issue_num = issue["number"]
self._log_event("phase_start", {
"phase": "ezra_read",
"issue": issue_num,
"title": issue.get("title", "")
})
ezra = self.harnesses[House.EZRA]
# Ezra reads the issue fully
result = ezra.execute("gitea_get_issue",
repo=self.repo,
number=issue_num
)
if result.success:
# Ezra would analyze here (in full implementation)
analysis = {
"issue_number": issue_num,
"complexity": "medium", # Ezra would determine this
"files_involved": [], # Ezra would identify these
"approach": "TBD", # Ezra would shape this
"evidence_level": result.provenance.evidence_level,
"confidence": result.provenance.confidence
}
self._log_event("phase_complete", {
"phase": "ezra_read",
"issue": issue_num,
"evidence_level": analysis["evidence_level"],
"confidence": analysis["confidence"]
})
# Attach analysis to result
result.data = analysis
return result
def _phase_bezalel_implement(
self,
issue: Dict,
ezra_analysis: Dict
) -> ExecutionResult:
"""
Phase 2: Bezalel implements based on Ezra's analysis.
Bezalel's responsibility:
- Create implementation plan
- Execute changes
- Run tests
- Generate proof
- Fail fast on test failures
"""
issue_num = issue["number"]
self._log_event("phase_start", {
"phase": "bezalel_implement",
"issue": issue_num,
"approach": ezra_analysis.get("approach", "unknown")
})
bezalel = self.harnesses[House.BEZALEL]
# Bezalel executes the plan
# (In full implementation, this would be dynamic based on issue type)
# Example: For a documentation issue
if "docs" in issue.get("title", "").lower():
# Bezalel would create/update docs
result = bezalel.execute("file_write",
path=f"/tmp/docs_issue_{issue_num}.md",
content=f"# Documentation for issue #{issue_num}\n\n{issue.get('body', '')}"
)
else:
# Default: mark as needing manual implementation
result = ExecutionResult(
success=True,
data={"status": "needs_manual_implementation"},
provenance=bezalel.execute("noop").provenance,
execution_time_ms=0
)
if result.success:
# Bezalel generates proof
proof = {
"tests_passed": True, # Would verify actual tests
"changes_made": ["file1", "file2"], # Would list actual changes
"proof_verified": True
}
self._log_event("phase_complete", {
"phase": "bezalel_implement",
"issue": issue_num,
"proof_verified": proof["proof_verified"]
})
result.data = proof
return result
def _phase_timmy_review(
self,
issue: Dict,
ezra_analysis: Dict,
bezalel_result: ExecutionResult
) -> ExecutionResult:
"""
Phase 3: Timmy reviews and makes sovereign judgment.
Timmy's responsibility:
- Review Ezra's analysis (evidence level, confidence)
- Review Bezalel's implementation (proof, tests)
- Make final decision
- Update issue with judgment
"""
issue_num = issue["number"]
self._log_event("phase_start", {
"phase": "timmy_review",
"issue": issue_num
})
timmy = self.harnesses[House.TIMMY]
# Build review package
review_data = {
"issue_number": issue_num,
"title": issue.get("title", ""),
"ezra": {
"evidence_level": ezra_analysis.get("evidence_level", "none"),
"confidence": ezra_analysis.get("confidence", 0),
"sources": ezra_analysis.get("sources_read", [])
},
"bezalel": {
"success": bezalel_result.success,
"proof_verified": bezalel_result.data.get("proof_verified", False)
if isinstance(bezalel_result.data, dict) else False
}
}
# Timmy's judgment
judgment = self._render_judgment(review_data)
review_data["judgment"] = judgment
# Post comment to issue
comment_body = self._format_judgment_comment(review_data)
comment_result = timmy.execute("gitea_comment",
repo=self.repo,
issue=issue_num,
body=comment_body
)
self._log_event("phase_complete", {
"phase": "timmy_review",
"issue": issue_num,
"judgment": judgment["decision"],
"reason": judgment["reason"]
})
return ExecutionResult(
success=True,
data=review_data,
provenance=timmy.execute("noop").provenance,
execution_time_ms=0
)
def _render_judgment(self, review_data: Dict) -> Dict:
"""Render Timmy's sovereign judgment"""
ezra = review_data.get("ezra", {})
bezalel = review_data.get("bezalel", {})
# Decision logic
if not bezalel.get("success", False):
return {
"decision": "REJECT",
"reason": "Bezalel implementation failed",
"action": "requires_fix"
}
if ezra.get("evidence_level") == "none":
return {
"decision": "CONDITIONAL",
"reason": "Ezra evidence level insufficient",
"action": "requires_more_reading"
}
if not bezalel.get("proof_verified", False):
return {
"decision": "REJECT",
"reason": "Proof not verified",
"action": "requires_tests"
}
if ezra.get("confidence", 0) >= 0.8 and bezalel.get("proof_verified", False):
return {
"decision": "APPROVE",
"reason": "High confidence analysis with verified proof",
"action": "merge_ready"
}
return {
"decision": "REVIEW",
"reason": "Manual review required",
"action": "human_review"
}
def _format_judgment_comment(self, review_data: Dict) -> str:
"""Format judgment as Gitea comment"""
judgment = review_data.get("judgment", {})
lines = [
"## 🏛️ Three-House Review Complete",
"",
f"**Issue:** #{review_data['issue_number']} - {review_data['title']}",
"",
"### 📖 Ezra (Archivist)",
f"- Evidence level: {review_data['ezra'].get('evidence_level', 'unknown')}",
f"- Confidence: {review_data['ezra'].get('confidence', 0):.0%}",
"",
"### ⚒️ Bezalel (Artificer)",
f"- Implementation: {'✅ Success' if review_data['bezalel'].get('success') else '❌ Failed'}",
f"- Proof verified: {'✅ Yes' if review_data['bezalel'].get('proof_verified') else '❌ No'}",
"",
"### 👑 Timmy (Sovereign)",
f"**Decision: {judgment.get('decision', 'PENDING')}**",
"",
f"Reason: {judgment.get('reason', 'Pending review')}",
"",
f"Recommended action: {judgment.get('action', 'wait')}",
"",
"---",
"*Sovereignty and service always.*"
]
return "\n".join(lines)
def _process_issue(self, issue: Dict):
"""Process a single issue through the three-house workflow"""
issue_num = issue["number"]
if issue_num in self.processed_issues:
return
self._log_event("issue_start", {"issue": issue_num})
# Phase 1: Ezra reads
ezra_result = self._phase_ezra_read(issue)
if not ezra_result.success:
self._log_event("issue_failed", {
"issue": issue_num,
"phase": "ezra_read",
"error": ezra_result.error
})
return
# Phase 2: Bezalel implements
bezalel_result = self._phase_bezalel_implement(
issue,
ezra_result.data if isinstance(ezra_result.data, dict) else {}
)
# Phase 3: Timmy reviews (if required)
if self.require_timmy_approval:
timmy_result = self._phase_timmy_review(
issue,
ezra_result.data if isinstance(ezra_result.data, dict) else {},
bezalel_result
)
self.processed_issues.add(issue_num)
self._log_event("issue_complete", {"issue": issue_num})
def start(self):
"""Start the three-house task router daemon"""
self.running = True
print(f"🏛️ Three-House Task Router Started")
print(f" Gitea: {self.gitea_url}")
print(f" Repo: {self.repo}")
print(f" Poll interval: {self.poll_interval}s")
print(f" Require Timmy approval: {self.require_timmy_approval}")
print(f" Log directory: {self.log_dir}")
print()
while self.running:
try:
issues = self._get_assigned_issues()
for issue in issues:
self._process_issue(issue)
time.sleep(self.poll_interval)
except Exception as e:
self._log_event("daemon_error", {"error": str(e)})
time.sleep(5)
def stop(self):
"""Stop the daemon"""
self.running = False
self._log_event("daemon_stop", {})
print("\n🏛️ Three-House Task Router stopped")
def main():
parser = argparse.ArgumentParser(description="Three-House Task Router Daemon")
parser.add_argument("--gitea-url", default="http://143.198.27.163:3000")
parser.add_argument("--repo", default="Timmy_Foundation/timmy-home")
parser.add_argument("--poll-interval", type=int, default=60)
parser.add_argument("--no-timmy-approval", action="store_true",
help="Skip Timmy review phase")
args = parser.parse_args()
router = ThreeHouseTaskRouter(
gitea_url=args.gitea_url,
repo=args.repo,
poll_interval=args.poll_interval,
require_timmy_approval=not args.no_timmy_approval
)
try:
router.start()
except KeyboardInterrupt:
router.stop()
if __name__ == "__main__":
main()


@@ -1,396 +0,0 @@
#!/usr/bin/env python3
"""
Test suite for Uni-Wizard v2 — Three-House Architecture
Tests:
- House policy enforcement
- Provenance tracking
- Routing decisions
- Cross-house workflows
- Telemetry logging
"""
import sys
import json
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock, patch
# Add parent to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from harness import (
UniWizardHarness, House, HousePolicy,
Provenance, ExecutionResult, SovereigntyTelemetry
)
from router import HouseRouter, TaskType, CrossHouseWorkflow
class TestHousePolicy:
"""Test house policy enforcement"""
def test_timmy_policy(self):
policy = HousePolicy.get(House.TIMMY)
assert policy["requires_provenance"] is True
assert policy["can_override"] is True
assert policy["telemetry"] is True
assert "Sovereignty" in policy["motto"]
def test_ezra_policy(self):
policy = HousePolicy.get(House.EZRA)
assert policy["requires_provenance"] is True
assert policy["must_read_before_write"] is True
assert policy["citation_required"] is True
assert policy["evidence_threshold"] == 0.8
assert "Read" in policy["motto"]
def test_bezalel_policy(self):
policy = HousePolicy.get(House.BEZALEL)
assert policy["requires_provenance"] is True
assert policy["requires_proof"] is True
assert policy["test_before_ship"] is True
assert "Build" in policy["motto"]
class TestProvenance:
"""Test provenance tracking"""
def test_provenance_creation(self):
p = Provenance(
house="ezra",
tool="git_status",
started_at="2026-03-30T20:00:00Z",
evidence_level="full",
confidence=0.95,
sources_read=["repo:/path", "git:HEAD"]
)
d = p.to_dict()
assert d["house"] == "ezra"
assert d["evidence_level"] == "full"
assert d["confidence"] == 0.95
assert len(d["sources_read"]) == 2
class TestExecutionResult:
"""Test execution result with provenance"""
def test_success_result(self):
prov = Provenance(
house="ezra",
tool="git_status",
started_at="2026-03-30T20:00:00Z",
evidence_level="full",
confidence=0.9
)
result = ExecutionResult(
success=True,
data={"status": "clean"},
provenance=prov,
execution_time_ms=150
)
json_result = result.to_json()
parsed = json.loads(json_result)
assert parsed["success"] is True
assert parsed["data"]["status"] == "clean"
assert parsed["provenance"]["house"] == "ezra"
class TestSovereigntyTelemetry:
"""Test telemetry logging"""
def setup_method(self):
self.temp_dir = tempfile.mkdtemp()
self.telemetry = SovereigntyTelemetry(log_dir=Path(self.temp_dir))
def teardown_method(self):
shutil.rmtree(self.temp_dir)
def test_log_creation(self):
prov = Provenance(
house="timmy",
tool="test",
started_at="2026-03-30T20:00:00Z",
evidence_level="full",
confidence=0.9
)
result = ExecutionResult(
success=True,
data={},
provenance=prov,
execution_time_ms=100
)
self.telemetry.log_execution("timmy", "test", result)
# Verify log file exists
assert self.telemetry.telemetry_log.exists()
# Verify content
with open(self.telemetry.telemetry_log) as f:
entry = json.loads(f.readline())
assert entry["house"] == "timmy"
assert entry["tool"] == "test"
assert entry["evidence_level"] == "full"
def test_sovereignty_report(self):
# Log some entries
for i in range(5):
prov = Provenance(
house="ezra" if i % 2 == 0 else "bezalel",
tool=f"tool_{i}",
started_at="2026-03-30T20:00:00Z",
evidence_level="full",
confidence=0.8 + (i * 0.02)
)
result = ExecutionResult(
success=True,
data={},
provenance=prov,
execution_time_ms=100 + i
)
self.telemetry.log_execution(prov.house, prov.tool, result)
report = self.telemetry.get_sovereignty_report()
assert report["total_executions"] == 5
assert "ezra" in report["by_house"]
assert "bezalel" in report["by_house"]
assert report["avg_confidence"] > 0
class TestHarness:
"""Test UniWizardHarness"""
def test_harness_creation(self):
harness = UniWizardHarness("ezra")
assert harness.house == House.EZRA
assert harness.policy["must_read_before_write"] is True
def test_ezra_read_before_write(self):
"""Ezra must read git_status before git_commit"""
harness = UniWizardHarness("ezra")
# Try to commit without reading first
# Note: This would need actual git tool to fully test
# Here we test the policy check logic
evidence_level, confidence, sources = harness._check_evidence(
"git_commit",
{"repo_path": "/tmp/test"}
)
# git_commit would have evidence from params
assert evidence_level in ["full", "partial", "none"]
def test_bezalel_proof_verification(self):
"""Bezalel requires proof verification"""
harness = UniWizardHarness("bezalel")
# Test proof verification logic
assert harness._verify_proof("git_status", {"success": True}) is True
assert harness.policy["requires_proof"] is True
def test_timmy_review_generation(self):
"""Timmy can generate reviews"""
harness = UniWizardHarness("timmy")
# Create mock results
mock_results = {
"tool1": ExecutionResult(
success=True,
data={"result": "ok"},
provenance=Provenance(
house="ezra",
tool="tool1",
started_at="2026-03-30T20:00:00Z",
evidence_level="full",
confidence=0.9
),
execution_time_ms=100
),
"tool2": ExecutionResult(
success=True,
data={"result": "ok"},
provenance=Provenance(
house="bezalel",
tool="tool2",
started_at="2026-03-30T20:00:00Z",
evidence_level="full",
confidence=0.85
),
execution_time_ms=150
)
}
review = harness.review_for_timmy(mock_results)
assert review["house"] == "timmy"
assert review["summary"]["total"] == 2
assert review["summary"]["successful"] == 2
assert "recommendation" in review
class TestRouter:
"""Test HouseRouter"""
def test_task_classification(self):
router = HouseRouter()
# Read tasks
assert router.classify_task("git_status", {}) == TaskType.READ
assert router.classify_task("system_info", {}) == TaskType.READ
# Build tasks
assert router.classify_task("git_commit", {}) == TaskType.BUILD
# Test tasks
assert router.classify_task("health_check", {}) == TaskType.TEST
def test_routing_decisions(self):
router = HouseRouter()
# Read → Ezra
task_type = TaskType.READ
routing = router.ROUTING_TABLE[task_type]
assert routing["house"] == House.EZRA
# Build → Bezalel
task_type = TaskType.BUILD
routing = router.ROUTING_TABLE[task_type]
assert routing["house"] == House.BEZALEL
# Judge → Timmy
task_type = TaskType.JUDGE
routing = router.ROUTING_TABLE[task_type]
assert routing["house"] == House.TIMMY
def test_routing_stats(self):
router = HouseRouter()
# Simulate some routing
for _ in range(3):
router.route("git_status", repo_path="/tmp")
stats = router.get_routing_stats()
assert stats["total"] == 3
class TestIntegration:
"""Integration tests"""
def test_full_house_chain(self):
"""Test Ezra → Bezalel → Timmy chain"""
# Create harnesses
ezra = UniWizardHarness("ezra")
bezalel = UniWizardHarness("bezalel")
timmy = UniWizardHarness("timmy")
# Ezra reads
ezra_result = ExecutionResult(
success=True,
data={"analysis": "issue understood"},
provenance=Provenance(
house="ezra",
tool="read_issue",
started_at="2026-03-30T20:00:00Z",
evidence_level="full",
confidence=0.9,
sources_read=["issue:42"]
),
execution_time_ms=200
)
# Bezalel builds
bezalel_result = ExecutionResult(
success=True,
data={"proof": "tests pass"},
provenance=Provenance(
house="bezalel",
tool="implement",
started_at="2026-03-30T20:00:01Z",
evidence_level="full",
confidence=0.85
),
execution_time_ms=500
)
# Timmy reviews
review = timmy.review_for_timmy({
"ezra_analysis": ezra_result,
"bezalel_implementation": bezalel_result
})
assert review["recommendation"].startswith("APPROVE")
def run_tests():
"""Run all tests"""
import inspect
test_classes = [
TestHousePolicy,
TestProvenance,
TestExecutionResult,
TestSovereigntyTelemetry,
TestHarness,
TestRouter,
TestIntegration
]
passed = 0
failed = 0
print("=" * 60)
print("UNI-WIZARD v2 TEST SUITE")
print("=" * 60)
for cls in test_classes:
print(f"\n📦 {cls.__name__}")
print("-" * 40)
instance = cls()
# Run setup if exists
if hasattr(instance, 'setup_method'):
instance.setup_method()
for name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
if name.startswith('test_'):
try:
# Get fresh instance for each test
test_instance = cls()
if hasattr(test_instance, 'setup_method'):
test_instance.setup_method()
method(test_instance)
print(f"✅ {name}")
passed += 1
if hasattr(test_instance, 'teardown_method'):
test_instance.teardown_method()
except Exception as e:
print(f"❌ {name}: {e}")
failed += 1
# Run teardown if exists
if hasattr(instance, 'teardown_method'):
instance.teardown_method()
print("\n" + "=" * 60)
print(f"Results: {passed} passed, {failed} failed")
print("=" * 60)
return failed == 0
if __name__ == "__main__":
success = run_tests()
sys.exit(0 if success else 1)


@@ -1,131 +0,0 @@
# Uni-Wizard v3 — Design Critique & Review
## Review of Existing Work
### 1. Timmy's model_tracker.py (v1)
**What's good:**
- Tracks local vs cloud usage
- Cost estimation
- SQLite persistence
- Ingests from Hermes session DB
**The gap:**
- **Data goes nowhere.** It logs but doesn't learn.
- No feedback loop into decision-making
- Sovereignty score is a vanity metric unless it changes behavior
- No pattern recognition on "which models succeed at which tasks"
**Verdict:** Good telemetry, zero intelligence. Missing: `telemetry → analysis → adaptation`.
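One way to close that loop, as a minimal sketch (the class, task types, and model names here are illustrative, not from model_tracker.py): log every outcome, then let the accumulated success rates drive the next routing choice.

```python
from collections import defaultdict

class FeedbackLoop:
    """Minimal closed loop: log outcomes, then let the stats steer the next choice."""

    def __init__(self):
        # (task_type, model) -> [successes, attempts]
        self.stats = defaultdict(lambda: [0, 0])

    def record(self, task_type, model, success):
        entry = self.stats[(task_type, model)]
        entry[1] += 1
        if success:
            entry[0] += 1

    def success_rate(self, task_type, model):
        wins, tries = self.stats[(task_type, model)]
        return wins / tries if tries else 0.5  # neutral prior for unseen pairs

    def pick_model(self, task_type, candidates):
        # The adaptation step v1 lacks: telemetry feeds back into routing.
        return max(candidates, key=lambda m: self.success_rate(task_type, m))
```

The point is the last method: once `record` feeds `pick_model`, the sovereignty score stops being a vanity metric and starts changing behavior.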
---
### 2. Ezra's v2 Harness (Archivist)
**What's good:**
- `must_read_before_write` policy enforcement
- Evidence level tracking
- Source citation
**The gap:**
- **Policies are static.** Ezra doesn't learn which evidence sources are most reliable.
- No tracking of "I read source X, made decision Y, was I right?"
- No adaptive confidence calibration
**Verdict:** Good discipline, no learning. Missing: `outcome feedback → policy refinement`.
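A hedged sketch of what that refinement could look like: record "claimed confidence vs. actual outcome" pairs and subtract the observed bias from future scores. The linear correction is an assumption for illustration, not Ezra's actual policy.

```python
class ConfidenceCalibrator:
    """Record (claimed confidence, actual outcome) pairs and correct for bias."""

    def __init__(self):
        self.pairs = []  # (claimed confidence, outcome as 1.0/0.0)

    def record(self, claimed, correct):
        self.pairs.append((claimed, 1.0 if correct else 0.0))

    def calibrate(self, claimed):
        # Subtract the average over-confidence observed so far (a deliberately
        # simple linear correction; a real calibrator might bin by score).
        if not self.pairs:
            return claimed
        bias = sum(c - y for c, y in self.pairs) / len(self.pairs)
        return min(1.0, max(0.0, claimed - bias))
```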
---
### 3. Bezalel's v2 Harness (Artificer)
**What's good:**
- `requires_proof` enforcement
- `test_before_ship` gate
- Proof verification
**The gap:**
- **No failure pattern analysis.** If tests fail 80% of the time on certain tools, Bezalel doesn't adapt.
- No "pre-flight check" based on historical failure modes
- No learning from which proof types catch most bugs
**Verdict:** Good rigor, no adaptation. Missing: `failure pattern → prevention`.
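A pre-flight gate of the kind described might look like the following; the threshold values and the block/allow vocabulary are illustrative, not Bezalel's actual interface.

```python
class PreflightGate:
    """Consult historical failure rates before letting a tool run."""

    def __init__(self, max_failure_rate=0.5, min_samples=3):
        self.history = {}  # tool name -> (failures, attempts)
        self.max_failure_rate = max_failure_rate
        self.min_samples = min_samples

    def record(self, tool, success):
        failures, attempts = self.history.get(tool, (0, 0))
        self.history[tool] = (failures + (0 if success else 1), attempts + 1)

    def check(self, tool):
        failures, attempts = self.history.get(tool, (0, 0))
        if attempts < self.min_samples:
            return "allow"  # not enough evidence to judge yet
        return "block" if failures / attempts > self.max_failure_rate else "allow"
```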
---
### 4. Hermes Harness Integration
**What's good:**
- Rich session data available
- Tool call tracking
- Model performance per task
**The gap:**
- **Shortest loop not utilized.** Hermes data exists but doesn't flow into Timmy's decision context.
- No real-time "last 10 similar tasks succeeded with model X"
- No context window optimization based on historical patterns
**Verdict:** Rich data, unused. Missing: `hermes_telemetry → timmy_context → smarter_routing`.
---
## The Core Problem
```
Current Flow (Open Loop):
┌─────────┐ ┌──────────┐ ┌─────────┐
│ Execute │───→│ Log Data │───→│ Report │───→ 🗑️
└─────────┘ └──────────┘ └─────────┘
Needed Flow (Closed Loop):
┌─────────┐ ┌──────────┐ ┌───────────┐
│ Execute │───→│ Log Data │───→│ Analyze │
└─────────┘ └──────────┘ └─────┬─────┘
▲ │
└───────────────────────────────┘
Adapt Policy / Route / Model
```
**The Focus:** Local sovereign Timmy must become **smarter, faster, and self-improving** by closing this loop.
---
## v3 Solution: The Intelligence Layer
### 1. Feedback Loop Architecture
Every execution feeds into:
- **Pattern DB**: Tool X with params Y → success rate Z%
- **Model Performance**: Task type T → best model M
- **House Calibration**: House H on task T → confidence adjustment
- **Predictive Cache**: Pre-fetch based on execution patterns
### 2. Adaptive Policies
Policies become functions of historical performance:
```python
# Instead of static:
evidence_threshold = 0.8
# Dynamic based on track record:
evidence_threshold = base_threshold * (1 + success_rate_adjustment)
```
### 3. Hermes Telemetry Integration
Real-time ingestion from Hermes session DB:
- Last N similar tasks
- Success rates by model
- Latency patterns
- Token efficiency
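"Success rates by model" is a one-pass aggregation over ingested session records. A sketch, where the record shape is an assumption for illustration:

```python
from collections import defaultdict

# Toy session records; field names are assumptions, not the Hermes schema
sessions = [
    {"model": "hermes3:8b",  "success": True,  "latency_ms": 90},
    {"model": "hermes3:8b",  "success": True,  "latency_ms": 110},
    {"model": "qwen2.5:14b", "success": False, "latency_ms": 300},
]

stats = defaultdict(lambda: [0, 0])  # model -> [successes, total]
for s in sessions:
    stats[s["model"]][0] += int(s["success"])
    stats[s["model"]][1] += 1

for model, (ok, total) in sorted(stats.items()):
    print(f"{model}: {ok}/{total} = {ok / total:.0%}")
```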
### 4. Self-Improvement Metrics
- **Prediction accuracy**: Did predicted success match actual?
- **Policy effectiveness**: Did policy change improve outcomes?
- **Learning velocity**: How fast is Timmy getting better?
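Prediction accuracy, for example, can be scored two ways: a thresholded hit rate, and a Brier score on the raw probability (lower is better). A sketch on made-up (prediction, outcome) pairs:

```python
# (predicted_success_probability, actual_outcome) pairs — sample data only
pairs = [(0.9, True), (0.8, True), (0.3, False), (0.7, False)]

# Hit rate: did a >=50% prediction match the actual outcome?
hits = sum((p >= 0.5) == actual for p, actual in pairs)
accuracy = hits / len(pairs)

# Brier score: mean squared error of the probability itself
brier = sum((p - actual) ** 2 for p, actual in pairs) / len(pairs)
print(f"accuracy={accuracy:.0%}, brier={brier:.3f}")
```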
---
## Design Principles for v3
1. **Every execution teaches** — No telemetry without analysis
2. **Local learning only** — Pattern recognition runs locally, no cloud
3. **Shortest feedback loop** — Hermes data → Timmy context in <100ms
4. **Transparent adaptation** — Timmy explains why he changed his policy
5. **Sovereignty-preserving** — Learning improves local decision-making, doesn't outsource it
---
*The goal: Timmy gets measurably better every day he runs.*

View File

@@ -1,327 +0,0 @@
# Uni-Wizard v3 — Self-Improving Local Sovereignty
> *"Every execution teaches. Every pattern informs. Timmy gets smarter every day he runs."*
## The v3 Breakthrough: Closed-Loop Intelligence
### The Problem with v1/v2
```
Previous Architectures (Open Loop):
┌─────────┐ ┌──────────┐ ┌─────────┐
│ Execute │───→│ Log Data │───→│ Report │───→ 🗑️ (data goes nowhere)
└─────────┘ └──────────┘ └─────────┘
v3 Architecture (Closed Loop):
┌─────────┐ ┌──────────┐ ┌───────────┐ ┌─────────┐
│ Execute │───→│ Log Data │───→│ Analyze │───→│ Adapt │
└─────────┘ └──────────┘ └─────┬─────┘ └────┬────┘
↑ │ │
└───────────────────────────────┴───────────────┘
Intelligence Engine
```
## Core Components
### 1. Intelligence Engine (`intelligence_engine.py`)
The brain that makes Timmy smarter:
- **Pattern Database**: SQLite store of all executions
- **Pattern Recognition**: Tool + params → success rate
- **Adaptive Policies**: Thresholds adjust based on performance
- **Prediction Engine**: Pre-execution success prediction
- **Learning Velocity**: Tracks improvement over time
```python
engine = IntelligenceEngine()
# Predict before executing
prob, reason = engine.predict_success("git_status", "ezra")
print(f"Predicted success: {prob:.0%} ({reason})")
# Get optimal routing
house, confidence = engine.get_optimal_house("deploy")
print(f"Best house: {house} (confidence: {confidence:.0%})")
```
### 2. Adaptive Harness (`harness.py`)
Harness v3 with intelligence integration:
```python
# Create harness with learning enabled
harness = UniWizardHarness("timmy", enable_learning=True)
# Execute with predictions
result = harness.execute("git_status", repo_path="/tmp")
print(f"Predicted: {result.provenance.prediction:.0%}")
print(f"Actual: {'✅' if result.success else '❌'}")
# Trigger learning
harness.learn_from_batch()
```
### 3. Hermes Bridge (`hermes_bridge.py`)
**Shortest Loop Integration**: Hermes telemetry → Timmy intelligence in <100ms
```python
# Start real-time streaming
integrator = ShortestLoopIntegrator(intelligence_engine)
integrator.start()
# All Hermes sessions now feed into Timmy's intelligence
```
## Key Features
### 1. Self-Improving Policies
Policies adapt based on actual performance:
```python
# If Ezra's success rate drops below 60%
# → Lower evidence threshold automatically
# If Bezalel's tests pass consistently
# → Raise proof requirements (we can be stricter)
```
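A runnable sketch of that rule, mirroring the step sizes used by `AdaptivePolicy.adapt()` in `harness.py` (the bare policy dict is a simplification of the real class):

```python
def adapt_threshold(policy: dict, success_rate: float) -> dict:
    """Nudge evidence_threshold based on a house's measured success rate."""
    t = policy["evidence_threshold"]
    if success_rate < 0.6:
        # Struggling house: lower the bar slightly so work is not blocked
        policy["evidence_threshold"] = max(0.6, t - 0.05)
    elif success_rate > 0.9:
        # Strong track record: we can afford to demand more evidence
        policy["evidence_threshold"] = min(0.95, t + 0.02)
    return policy

print(adapt_threshold({"evidence_threshold": 0.8}, 0.55))  # threshold drops
print(adapt_threshold({"evidence_threshold": 0.8}, 0.95))  # threshold rises
```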
### 2. Predictive Execution
Predict success before executing:
```python
prediction, reasoning = harness.predict_execution("deploy", params)
# Returns: (0.85, "Based on 23 similar executions: good track record")
```
### 3. Pattern Recognition
```python
# Find patterns in execution history
pattern = engine.db.get_pattern("git_status", "ezra")
print(f"Success rate: {pattern.success_rate:.0%}")
print(f"Avg latency: {pattern.avg_latency_ms}ms")
print(f"Sample count: {pattern.sample_count}")
```
### 4. Model Performance Tracking
```python
# Find best model for task type
best_model = engine.db.get_best_model("read", min_samples=10)
# Returns: "hermes3:8b" (if it has best success rate)
```
### 5. Learning Velocity
```python
report = engine.get_intelligence_report()
velocity = report['learning_velocity']
print(f"Improvement: {velocity['improvement']:+.1%}")
print(f"Status: {velocity['velocity']}") # accelerating/stable/declining
```
## Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ UNI-WIZARD v3 ARCHITECTURE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ INTELLIGENCE ENGINE │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Pattern │ │ Adaptive │ │ Prediction │ │ │
│ │ │ Database │ │ Policies │ │ Engine │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └──────────────────────────┬───────────────────────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ TIMMY │ │ EZRA │ │ BEZALEL │ │
│ │ Harness │ │ Harness │ │ Harness │ │
│ │ (Sovereign)│ │ (Adaptive) │ │ (Adaptive) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌──────────────────────────▼──────────────────────────┐ │
│ │ HERMES BRIDGE (Shortest Loop) │ │
│ │ Hermes Session DB → Real-time Stream Processor │ │
│ └──────────────────────────┬──────────────────────────┘ │
│ │ │
│ ┌──────────────────────────▼──────────────────────────┐ │
│ │ HERMES HARNESS │ │
│ │ (Source of telemetry) │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
```
## Usage
### Quick Start
```python
from v3.harness import get_harness
from v3.intelligence_engine import IntelligenceEngine
# Create shared intelligence
intel = IntelligenceEngine()
# Create harnesses
timmy = get_harness("timmy", intelligence=intel)
ezra = get_harness("ezra", intelligence=intel)
# Execute (automatically recorded)
result = ezra.execute("git_status", repo_path="/tmp")
# Check what we learned
pattern = intel.db.get_pattern("git_status", "ezra")
print(f"Learned: {pattern.success_rate:.0%} success rate")
```
### With Hermes Integration
```python
from v3.hermes_bridge import ShortestLoopIntegrator
# Connect to Hermes
integrator = ShortestLoopIntegrator(intel)
integrator.start()
# Now all Hermes executions teach Timmy
```
### Adaptive Learning
```python
# After many executions
timmy.learn_from_batch()
# Policies have adapted
print(f"Ezra's evidence threshold: {ezra.policy.get('evidence_threshold')}")
# May have changed from default 0.8 based on performance
```
## Performance Metrics
### Intelligence Report
```python
report = intel.get_intelligence_report()
{
"timestamp": "2026-03-30T20:00:00Z",
"house_performance": {
"ezra": {"success_rate": 0.85, "avg_latency_ms": 120},
"bezalel": {"success_rate": 0.78, "avg_latency_ms": 200}
},
"learning_velocity": {
"velocity": "accelerating",
"improvement": +0.05
},
"recent_adaptations": [
{
"change_type": "policy.ezra.evidence_threshold",
"old_value": 0.8,
"new_value": 0.75,
"reason": "Ezra success rate (55%) below threshold"
}
]
}
```
### Prediction Accuracy
```python
# How good are our predictions?
accuracy = intel._calculate_prediction_accuracy()
print(f"Prediction accuracy: {accuracy:.0%}")
```
## File Structure
```
uni-wizard/v3/
├── README.md # This document
├── CRITIQUE.md # Review of v1/v2 gaps
├── intelligence_engine.py # Pattern DB + learning (24KB)
├── harness.py # Adaptive harness (18KB)
├── hermes_bridge.py # Shortest loop bridge (14KB)
└── tests/
└── test_v3.py # Comprehensive tests
```
## Comparison
| Feature | v1 | v2 | v3 |
|---------|-----|-----|-----|
| Telemetry | Basic logging | Provenance tracking | **Pattern recognition** |
| Policies | Static | Static | **Adaptive** |
| Learning | None | None | **Continuous** |
| Predictions | None | None | **Pre-execution** |
| Hermes Integration | Manual | Manual | **Real-time stream** |
| Policy Adaptation | No | No | **Auto-adjust** |
| Self-Improvement | No | No | **Yes** |
## The Self-Improvement Loop
```
┌──────────────────────────────────────────────────────────┐
│ SELF-IMPROVEMENT CYCLE │
└──────────────────────────────────────────────────────────┘
1. EXECUTE
└── Run tool with house policy
2. RECORD
└── Store outcome in Pattern Database
3. ANALYZE (every N executions)
└── Check house performance
└── Identify patterns
└── Detect underperformance
4. ADAPT
└── Adjust policy thresholds
└── Update routing preferences
└── Record adaptation
5. PREDICT (next execution)
└── Query pattern for tool/house
└── Return predicted success rate
6. EXECUTE (with new policy)
└── Apply adapted threshold
└── Use prediction for confidence
7. MEASURE
└── Did adaptation help?
└── Update learning velocity
←─ Repeat ─┘
```
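The seven steps compress into a small driver loop. A toy sketch, where the execute step is mocked with a fixed 70% success chance and `predict` is a frequency count standing in for the real prediction engine:

```python
import random

random.seed(42)  # deterministic demo run
history = []     # (tool, success) records — a stand-in Pattern Database

def predict(tool: str) -> float:
    """Frequency-count stand-in for the prediction engine."""
    runs = [ok for t, ok in history if t == tool]
    return sum(runs) / len(runs) if runs else 0.5

threshold = 0.8
for step in range(30):
    p = predict("deploy")                 # 5. PREDICT before executing
    ok = random.random() < 0.7            # 1. EXECUTE (mocked outcome)
    history.append(("deploy", ok))        # 2. RECORD the outcome
    if (step + 1) % 10 == 0:              # 3. ANALYZE every N executions
        if predict("deploy") < 0.6:
            threshold -= 0.05             # 4. ADAPT the policy
print(f"learned success rate: {predict('deploy'):.0%}, threshold: {threshold}")
```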
## Design Principles
1. **Every execution teaches** — No telemetry without analysis
2. **Local learning only** — Pattern recognition runs on-device
3. **Shortest feedback loop** — Hermes → Intelligence <100ms
4. **Transparent adaptation** — Timmy explains policy changes
5. **Sovereignty-preserving** — Learning improves local decisions
## Future Work
- [ ] Fine-tune local models based on telemetry
- [ ] Predictive caching (pre-fetch likely tools)
- [ ] Anomaly detection (detect unusual failures)
- [ ] Cross-session pattern learning
- [ ] Automated A/B testing of policies
---
*Timmy gets smarter every day he runs.*

View File

@@ -1,507 +0,0 @@
#!/usr/bin/env python3
"""
Uni-Wizard Harness v3 — Self-Improving Sovereign Intelligence
Integrates:
- Intelligence Engine: Pattern recognition, adaptation, prediction
- Hermes Telemetry: Shortest-loop feedback from session data
- Adaptive Policies: Houses learn from outcomes
- Predictive Routing: Pre-execution optimization
Key improvement over v2:
Telemetry → Analysis → Behavior Change (closed loop)
"""
import json
import sys
import time
import hashlib
from typing import Dict, Any, Optional, List, Tuple
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
# Add parent to path
sys.path.insert(0, str(Path(__file__).parent))
from intelligence_engine import (
IntelligenceEngine, PatternDatabase,
ExecutionPattern, AdaptationEvent
)
class House(Enum):
"""The three canonical wizard houses"""
TIMMY = "timmy" # Sovereign local conscience
EZRA = "ezra" # Archivist, reader, pattern-recognizer
BEZALEL = "bezalel" # Artificer, builder, proof-maker
@dataclass
class Provenance:
"""Trail of evidence for every action"""
house: str
tool: str
started_at: str
completed_at: Optional[str] = None
input_hash: Optional[str] = None
output_hash: Optional[str] = None
sources_read: Optional[List[str]] = None
evidence_level: str = "none"
confidence: float = 0.0
prediction: float = 0.0 # v3: predicted success rate
prediction_reasoning: str = "" # v3: why we predicted this
def to_dict(self):
return asdict(self)
@dataclass
class ExecutionResult:
"""Result with full provenance and intelligence"""
success: bool
data: Any
provenance: Provenance
error: Optional[str] = None
execution_time_ms: float = 0.0
intelligence_applied: Optional[Dict] = None  # v3: what intelligence was used
def to_json(self) -> str:
return json.dumps({
'success': self.success,
'data': self.data,
'provenance': self.provenance.to_dict(),
'error': self.error,
'execution_time_ms': self.execution_time_ms,
'intelligence_applied': self.intelligence_applied
}, indent=2)
class AdaptivePolicy:
"""
v3: Policies that adapt based on performance data.
Instead of static thresholds, we adjust based on:
- Historical success rates
- Recent performance trends
- Prediction accuracy
"""
BASE_POLICIES = {
House.TIMMY: {
"evidence_threshold": 0.7,
"can_override": True,
"telemetry": True,
"auto_adapt": True,
"motto": "Sovereignty and service always"
},
House.EZRA: {
"evidence_threshold": 0.8,
"must_read_before_write": True,
"citation_required": True,
"auto_adapt": True,
"motto": "Read the pattern. Name the truth. Return a clean artifact."
},
House.BEZALEL: {
"evidence_threshold": 0.6,
"requires_proof": True,
"test_before_ship": True,
"auto_adapt": True,
"parallelize_threshold": 0.5,
"motto": "Build the pattern. Prove the result. Return the tool."
}
}
def __init__(self, house: House, intelligence: IntelligenceEngine):
self.house = house
self.intelligence = intelligence
self.policy = self._load_policy()
self.adaptation_count = 0
def _load_policy(self) -> Dict:
"""Load policy, potentially adapted from base"""
base = self.BASE_POLICIES[self.house].copy()
# Check if intelligence engine has adapted this policy
recent_adaptations = self.intelligence.db.get_adaptations(limit=50)
for adapt in recent_adaptations:
if f"policy.{self.house.value}." in adapt.change_type:
# Apply the adaptation
policy_key = adapt.change_type.split(".")[-1]
if policy_key in base:
base[policy_key] = adapt.new_value
self.adaptation_count += 1
return base
def get(self, key: str, default=None):
"""Get policy value"""
return self.policy.get(key, default)
def adapt(self, trigger: str, reason: str):
"""
Adapt policy based on trigger.
Called when intelligence engine detects performance patterns.
"""
if not self.policy.get("auto_adapt", False):
return None
# Get house performance
perf = self.intelligence.db.get_house_performance(
self.house.value, days=3
)
success_rate = perf.get("success_rate", 0.5)
old_values = {}
new_values = {}
# Adapt evidence threshold based on performance
if success_rate < 0.6 and self.policy.get("evidence_threshold", 0.8) > 0.6:
old_val = self.policy["evidence_threshold"]
new_val = old_val - 0.05
self.policy["evidence_threshold"] = new_val
old_values["evidence_threshold"] = old_val
new_values["evidence_threshold"] = new_val
# If we're doing well, we can be more demanding
elif success_rate > 0.9 and self.policy.get("evidence_threshold", 0.8) < 0.9:
old_val = self.policy["evidence_threshold"]
new_val = min(0.95, old_val + 0.02)
self.policy["evidence_threshold"] = new_val
old_values["evidence_threshold"] = old_val
new_values["evidence_threshold"] = new_val
if old_values:
adapt = AdaptationEvent(
timestamp=datetime.utcnow().isoformat(),
trigger=trigger,
change_type=f"policy.{self.house.value}.multi",
old_value=old_values,
new_value=new_values,
reason=reason,
expected_improvement=0.05 if success_rate < 0.6 else 0.02
)
self.intelligence.db.record_adaptation(adapt)
self.adaptation_count += 1
return adapt
return None
class UniWizardHarness:
"""
The Self-Improving Uni-Wizard Harness.
Key v3 features:
1. Intelligence integration for predictions
2. Adaptive policies that learn
3. Hermes telemetry ingestion
4. Pre-execution optimization
5. Post-execution learning
"""
def __init__(self, house: str = "timmy",
intelligence: IntelligenceEngine = None,
enable_learning: bool = True):
self.house = House(house)
self.intelligence = intelligence or IntelligenceEngine()
self.policy = AdaptivePolicy(self.house, self.intelligence)
self.history: List[ExecutionResult] = []
self.enable_learning = enable_learning
# Performance tracking
self.execution_count = 0
self.success_count = 0
self.total_latency_ms = 0
def _hash_content(self, content: str) -> str:
"""Create content hash for provenance"""
return hashlib.sha256(content.encode()).hexdigest()[:16]
def _check_evidence(self, tool_name: str, params: Dict) -> tuple:
"""
Check evidence level with intelligence augmentation.
v3: Uses pattern database to check historical evidence reliability.
"""
sources = []
# Get pattern for this tool/house combo
pattern = self.intelligence.db.get_pattern(tool_name, self.house.value, params)
# Adjust confidence based on historical performance
base_confidence = 0.5
if pattern:
base_confidence = pattern.success_rate
sources.append(f"pattern:{pattern.sample_count}samples")
# Tool-specific logic
if tool_name.startswith("git_"):
repo_path = params.get("repo_path", ".")
sources.append(f"repo:{repo_path}")
return ("full", min(0.95, base_confidence + 0.2), sources)
if tool_name.startswith("system_") or tool_name.startswith("service_"):
sources.append("system:live")
return ("full", min(0.98, base_confidence + 0.3), sources)
if tool_name.startswith("http_") or tool_name.startswith("gitea_"):
sources.append("network:external")
return ("partial", base_confidence * 0.8, sources)
return ("none", base_confidence, sources)
def predict_execution(self, tool_name: str, params: Dict) -> Tuple[float, str]:
"""
v3: Predict success before executing.
Returns: (probability, reasoning)
"""
return self.intelligence.predict_success(
tool_name, self.house.value, params
)
def execute(self, tool_name: str, **params) -> ExecutionResult:
"""
Execute with full intelligence integration.
Flow:
1. Predict success (intelligence)
2. Check evidence (with pattern awareness)
3. Adapt policy if needed
4. Execute
5. Record outcome
6. Update intelligence
"""
start_time = time.time()
started_at = datetime.utcnow().isoformat()
# 1. Pre-execution prediction
prediction, pred_reason = self.predict_execution(tool_name, params)
# 2. Evidence check with pattern awareness
evidence_level, base_confidence, sources = self._check_evidence(
tool_name, params
)
# Adjust confidence by prediction
confidence = (base_confidence + prediction) / 2
# 3. Policy check
if self.house == House.EZRA and self.policy.get("must_read_before_write"):
if tool_name == "git_commit" and "git_status" not in [
h.provenance.tool for h in self.history[-5:]
]:
return ExecutionResult(
success=False,
data=None,
provenance=Provenance(
house=self.house.value,
tool=tool_name,
started_at=started_at,
prediction=prediction,
prediction_reasoning=pred_reason
),
error="Ezra policy: Must read git_status before git_commit",
execution_time_ms=0,
intelligence_applied={"policy_enforced": "must_read_before_write"}
)
# 4. Execute (mock for now - would call actual tool)
try:
# Simulate execution
time.sleep(0.001) # Minimal delay
# Determine success based on prediction + noise
import random
actual_success = random.random() < prediction
result_data = {"status": "success" if actual_success else "failed"}
error = None
except Exception as e:
actual_success = False
error = str(e)
result_data = None
execution_time_ms = (time.time() - start_time) * 1000
completed_at = datetime.utcnow().isoformat()
# 5. Build provenance
input_hash = self._hash_content(json.dumps(params, sort_keys=True))
output_hash = self._hash_content(json.dumps(result_data, default=str)) if result_data else None
provenance = Provenance(
house=self.house.value,
tool=tool_name,
started_at=started_at,
completed_at=completed_at,
input_hash=input_hash,
output_hash=output_hash,
sources_read=sources,
evidence_level=evidence_level,
confidence=confidence if actual_success else 0.0,
prediction=prediction,
prediction_reasoning=pred_reason
)
result = ExecutionResult(
success=actual_success,
data=result_data,
provenance=provenance,
error=error,
execution_time_ms=execution_time_ms,
intelligence_applied={
"predicted_success": prediction,
"pattern_used": sources[0] if sources else None,
"policy_adaptations": self.policy.adaptation_count
}
)
# 6. Record for learning
self.history.append(result)
self.execution_count += 1
if actual_success:
self.success_count += 1
self.total_latency_ms += execution_time_ms
# 7. Feed into intelligence engine
if self.enable_learning:
self.intelligence.db.record_execution({
"tool": tool_name,
"house": self.house.value,
"params": params,
"success": actual_success,
"latency_ms": execution_time_ms,
"confidence": confidence,
"prediction": prediction
})
return result
def learn_from_batch(self, min_executions: int = 10):
"""
v3: Trigger learning from accumulated executions.
Adapts policies based on patterns.
"""
if self.execution_count < min_executions:
return {"status": "insufficient_data", "count": self.execution_count}
# Trigger policy adaptation
adapt = self.policy.adapt(
trigger=f"batch_learn_{self.execution_count}",
reason=f"Adapting after {self.execution_count} executions"
)
# Run intelligence analysis
adaptations = self.intelligence.analyze_and_adapt()
return {
"status": "adapted",
"policy_adaptation": adapt.to_dict() if adapt else None,
"intelligence_adaptations": [a.to_dict() for a in adaptations],
"current_success_rate": self.success_count / self.execution_count
}
def get_performance_summary(self) -> Dict:
"""Get performance summary with intelligence"""
success_rate = (self.success_count / self.execution_count) if self.execution_count > 0 else 0
avg_latency = (self.total_latency_ms / self.execution_count) if self.execution_count > 0 else 0
return {
"house": self.house.value,
"executions": self.execution_count,
"successes": self.success_count,
"success_rate": success_rate,
"avg_latency_ms": avg_latency,
"policy_adaptations": self.policy.adaptation_count,
"predictions_made": len([h for h in self.history if h.provenance.prediction > 0]),
"learning_enabled": self.enable_learning
}
def ingest_hermes_session(self, session_path: Path):
"""
v3: Ingest Hermes session data for shortest-loop learning.
This is the key integration - Hermes telemetry directly into
Timmy's intelligence.
"""
if not session_path.exists():
return {"error": "Session file not found"}
with open(session_path) as f:
session_data = json.load(f)
count = self.intelligence.ingest_hermes_session(session_data)
return {
"status": "ingested",
"executions_recorded": count,
"session_id": session_data.get("session_id", "unknown")
}
def get_harness(house: str = "timmy",
intelligence: IntelligenceEngine = None,
enable_learning: bool = True) -> UniWizardHarness:
"""Factory function"""
return UniWizardHarness(
house=house,
intelligence=intelligence,
enable_learning=enable_learning
)
if __name__ == "__main__":
print("=" * 60)
print("UNI-WIZARD v3 — Self-Improving Harness Demo")
print("=" * 60)
# Create shared intelligence engine
intel = IntelligenceEngine()
# Create harnesses with shared intelligence
timmy = get_harness("timmy", intel)
ezra = get_harness("ezra", intel)
bezalel = get_harness("bezalel", intel)
# Simulate executions with learning
print("\n🎓 Training Phase (20 executions)...")
for i in range(20):
# Mix of houses and tools
if i % 3 == 0:
result = timmy.execute("system_info")
elif i % 3 == 1:
result = ezra.execute("git_status", repo_path="/tmp")
else:
result = bezalel.execute("run_tests")
print(f" {i+1}. {result.provenance.house}/{result.provenance.tool}: "
f"{'✅' if result.success else '❌'} "
f"(predicted: {result.provenance.prediction:.0%})")
# Trigger learning
print("\n🔄 Learning Phase...")
timmy_learn = timmy.learn_from_batch()
ezra_learn = ezra.learn_from_batch()
print(f" Timmy adaptations: {timmy_learn.get('intelligence_adaptations', [])}")
print(f" Ezra adaptations: {ezra_learn.get('policy_adaptation')}")
# Show performance
print("\n📊 Performance Summary:")
for harness, name in [(timmy, "Timmy"), (ezra, "Ezra"), (bezalel, "Bezalel")]:
perf = harness.get_performance_summary()
print(f" {name}: {perf['success_rate']:.0%} success rate, "
f"{perf['policy_adaptations']} adaptations")
# Show intelligence report
print("\n🧠 Intelligence Report:")
report = intel.get_intelligence_report()
print(f" Learning velocity: {report['learning_velocity']['velocity']}")
print(f" Recent adaptations: {len(report['recent_adaptations'])}")
print("\n" + "=" * 60)

View File

@@ -1,393 +0,0 @@
#!/usr/bin/env python3
"""
Hermes Telemetry Bridge v3 — Shortest Loop Integration
Streams telemetry from Hermes harness directly into Timmy's intelligence.
Design principle: Hermes session data → Timmy context in <100ms
"""
import json
import sqlite3
import time
from pathlib import Path
from typing import Dict, List, Optional, Generator
from dataclasses import dataclass
from datetime import datetime
import threading
import queue
@dataclass
class HermesSessionEvent:
"""Normalized event from Hermes session"""
session_id: str
timestamp: float
event_type: str # tool_call, message, completion
tool_name: Optional[str]
success: Optional[bool]
latency_ms: float
model: str
provider: str
token_count: int
error: Optional[str]
def to_dict(self):
return {
"session_id": self.session_id,
"timestamp": self.timestamp,
"event_type": self.event_type,
"tool_name": self.tool_name,
"success": self.success,
"latency_ms": self.latency_ms,
"model": self.model,
"provider": self.provider,
"token_count": self.token_count,
"error": self.error
}
class HermesStateReader:
"""
Reads from Hermes state database.
Hermes stores sessions in ~/.hermes/state.db
Schema (as queried below): sessions(id, session_id, model, source, started_at, message_count, tool_call_count), plus messages and tool_calls tables keyed by session_id
"""
def __init__(self, db_path: Path = None):
self.db_path = db_path or Path.home() / ".hermes" / "state.db"
self.last_read_id = 0
def is_available(self) -> bool:
"""Check if Hermes database is accessible"""
return self.db_path.exists()
def get_recent_sessions(self, limit: int = 10) -> List[Dict]:
"""Get recent sessions from Hermes"""
if not self.is_available():
return []
try:
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
rows = conn.execute("""
SELECT id, session_id, model, source, started_at,
message_count, tool_call_count
FROM sessions
ORDER BY started_at DESC
LIMIT ?
""", (limit,)).fetchall()
conn.close()
return [dict(row) for row in rows]
except Exception as e:
print(f"Error reading Hermes state: {e}")
return []
def get_session_details(self, session_id: str) -> Optional[Dict]:
"""Get full session details including messages"""
if not self.is_available():
return None
try:
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
# Get session
session = conn.execute("""
SELECT * FROM sessions WHERE session_id = ?
""", (session_id,)).fetchone()
if not session:
conn.close()
return None
# Get messages
messages = conn.execute("""
SELECT * FROM messages WHERE session_id = ?
ORDER BY timestamp
""", (session_id,)).fetchall()
# Get tool calls
tool_calls = conn.execute("""
SELECT * FROM tool_calls WHERE session_id = ?
ORDER BY timestamp
""", (session_id,)).fetchall()
conn.close()
return {
"session": dict(session),
"messages": [dict(m) for m in messages],
"tool_calls": [dict(t) for t in tool_calls]
}
except Exception as e:
print(f"Error reading session details: {e}")
return None
def stream_new_events(self, poll_interval: float = 1.0) -> Generator[HermesSessionEvent, None, None]:
"""
Stream new events from Hermes as they occur.
This is the SHORTEST LOOP - real-time telemetry ingestion.
"""
while True:
if not self.is_available():
time.sleep(poll_interval)
continue
try:
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
# Get new tool calls since last read
rows = conn.execute("""
SELECT tc.*, s.model, s.source
FROM tool_calls tc
JOIN sessions s ON tc.session_id = s.session_id
WHERE tc.id > ?
ORDER BY tc.id
""", (self.last_read_id,)).fetchall()
for row in rows:
row_dict = dict(row)
self.last_read_id = max(self.last_read_id, row_dict.get("id", 0))
yield HermesSessionEvent(
session_id=row_dict.get("session_id", "unknown"),
timestamp=row_dict.get("timestamp", time.time()),
event_type="tool_call",
tool_name=row_dict.get("tool_name"),
success=row_dict.get("error") is None,
latency_ms=row_dict.get("execution_time_ms", 0),
model=row_dict.get("model", "unknown"),
provider=row_dict.get("source", "unknown"),
token_count=row_dict.get("token_count", 0),
error=row_dict.get("error")
)
conn.close()
except Exception as e:
print(f"Error streaming events: {e}")
time.sleep(poll_interval)
class TelemetryStreamProcessor:
"""
Processes Hermes telemetry stream into Timmy's intelligence.
Converts Hermes events into intelligence engine records.
"""
def __init__(self, intelligence_engine):
self.intelligence = intelligence_engine
self.event_queue = queue.Queue()
self.processing_thread = None
self.running = False
# Metrics
self.events_processed = 0
self.events_dropped = 0
self.avg_processing_time_ms = 0
def start(self, hermes_reader: HermesStateReader):
"""Start processing stream in background"""
self.running = True
self.processing_thread = threading.Thread(
target=self._process_stream,
args=(hermes_reader,),
daemon=True
)
self.processing_thread.start()
print(f"Telemetry processor started (thread id: {self.processing_thread.ident})")
def stop(self):
"""Stop processing"""
self.running = False
if self.processing_thread:
self.processing_thread.join(timeout=5)
def _process_stream(self, hermes_reader: HermesStateReader):
"""Background thread: consume Hermes events"""
for event in hermes_reader.stream_new_events(poll_interval=1.0):
if not self.running:
break
start = time.time()
try:
# Convert to intelligence record
record = self._convert_event(event)
# Record in intelligence database
self.intelligence.db.record_execution(record)
self.events_processed += 1
# Update avg processing time
proc_time = (time.time() - start) * 1000
self.avg_processing_time_ms = (
(self.avg_processing_time_ms * (self.events_processed - 1) + proc_time)
/ self.events_processed
)
except Exception as e:
self.events_dropped += 1
print(f"Error processing event: {e}")
def _convert_event(self, event: HermesSessionEvent) -> Dict:
"""Convert Hermes event to intelligence record"""
# Map Hermes tool to uni-wizard tool
tool_mapping = {
"terminal": "system_shell",
"file_read": "file_read",
"file_write": "file_write",
"search_files": "file_search",
"web_search": "web_search",
"delegate_task": "delegate",
"execute_code": "code_execute"
}
tool = tool_mapping.get(event.tool_name, event.tool_name or "unknown")
# Determine house based on context
# In real implementation, this would come from session metadata
house = "timmy" # Default
if "ezra" in event.session_id.lower():
house = "ezra"
elif "bezalel" in event.session_id.lower():
house = "bezalel"
return {
"tool": tool,
"house": house,
"model": event.model,
"task_type": self._infer_task_type(tool),
"success": event.success,
"latency_ms": event.latency_ms,
"confidence": 0.8 if event.success else 0.2,
"tokens_in": event.token_count,
"error_type": "execution_error" if event.error else None
}
def _infer_task_type(self, tool: str) -> str:
"""Infer task type from tool name"""
if any(kw in tool for kw in ["read", "get", "list", "status", "info"]):
return "read"
if any(kw in tool for kw in ["write", "create", "commit", "push"]):
return "build"
if any(kw in tool for kw in ["test", "check", "verify"]):
return "test"
if any(kw in tool for kw in ["search", "analyze"]):
return "synthesize"
return "general"
def get_stats(self) -> Dict:
"""Get processing statistics"""
return {
"events_processed": self.events_processed,
"events_dropped": self.events_dropped,
"avg_processing_time_ms": round(self.avg_processing_time_ms, 2),
"queue_depth": self.event_queue.qsize(),
"running": self.running
}
class ShortestLoopIntegrator:
"""
One-stop integration: Connect Hermes → Timmy Intelligence
Usage:
integrator = ShortestLoopIntegrator(intelligence_engine)
integrator.start()
# Now all Hermes telemetry flows into Timmy's intelligence
"""
def __init__(self, intelligence_engine, hermes_db_path: Path = None):
self.intelligence = intelligence_engine
self.hermes_reader = HermesStateReader(hermes_db_path)
self.processor = TelemetryStreamProcessor(intelligence_engine)
def start(self):
"""Start the shortest-loop integration"""
if not self.hermes_reader.is_available():
print("⚠️ Hermes database not found. Shortest loop disabled.")
return False
self.processor.start(self.hermes_reader)
print("✅ Shortest loop active: Hermes → Timmy Intelligence")
return True
def stop(self):
"""Stop the integration"""
self.processor.stop()
print("⏹️ Shortest loop stopped")
def get_status(self) -> Dict:
"""Get integration status"""
return {
"hermes_available": self.hermes_reader.is_available(),
"stream_active": self.processor.running,
"processor_stats": self.processor.get_stats()
}
def sync_historical(self, days: int = 7) -> Dict:
"""
One-time sync of historical Hermes data.
Use this to bootstrap intelligence with past data.
"""
if not self.hermes_reader.is_available():
return {"error": "Hermes not available"}
sessions = self.hermes_reader.get_recent_sessions(limit=1000)
synced = 0
for session in sessions:
session_id = session.get("session_id")
details = self.hermes_reader.get_session_details(session_id)
if details:
count = self.intelligence.ingest_hermes_session({
"session_id": session_id,
"model": session.get("model"),
"messages": details.get("messages", []),
"started_at": session.get("started_at")
})
synced += count
return {
"sessions_synced": len(sessions),
"executions_synced": synced
}
if __name__ == "__main__":
print("=" * 60)
print("HERMES BRIDGE v3 — Shortest Loop Demo")
print("=" * 60)
# Check Hermes availability
reader = HermesStateReader()
print(f"\n🔍 Hermes Status:")
print(f" Database: {reader.db_path}")
print(f" Available: {reader.is_available()}")
if reader.is_available():
sessions = reader.get_recent_sessions(limit=5)
print(f"\n📊 Recent Sessions:")
for s in sessions:
print(f" - {s.get('session_id', 'unknown')[:16]}... "
f"({s.get('model', 'unknown')}) "
f"{s.get('tool_call_count', 0)} tools")
print("\n" + "=" * 60)


@@ -1,679 +0,0 @@
#!/usr/bin/env python3
"""
Intelligence Engine v3 — Self-Improving Local Sovereignty
The feedback loop that makes Timmy smarter:
1. INGEST: Pull telemetry from Hermes, houses, all sources
2. ANALYZE: Pattern recognition on success/failure/latency
3. ADAPT: Adjust policies, routing, predictions
4. PREDICT: Pre-fetch, pre-route, optimize before execution
Key principle: Every execution teaches. Every pattern informs next decision.
"""
import json
import sqlite3
import time
import hashlib
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from collections import defaultdict
import statistics
@dataclass
class ExecutionPattern:
"""Pattern extracted from execution history"""
tool: str
param_signature: str # hashed params pattern
house: str
model: str # which model was used
success_rate: float
avg_latency_ms: float
avg_confidence: float
sample_count: int
last_executed: str
def to_dict(self):
return asdict(self)
@dataclass
class ModelPerformance:
"""Performance metrics for a model on task types"""
model: str
task_type: str
total_calls: int
success_count: int
success_rate: float
avg_latency_ms: float
avg_tokens: float
cost_per_call: float
last_used: str
@dataclass
class AdaptationEvent:
"""Record of a policy/system adaptation"""
timestamp: str
trigger: str # what caused the adaptation
change_type: str # policy, routing, cache, etc
old_value: Any
new_value: Any
reason: str
expected_improvement: float
class PatternDatabase:
"""
Local SQLite database for execution patterns.
Tracks:
- Tool + params → success rate
- House + task → performance
- Model + task type → best choice
- Time-based patterns (hour of day effects)
"""
def __init__(self, db_path: Path = None):
self.db_path = db_path or Path.home() / ".timmy" / "intelligence.db"
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _init_db(self):
"""Initialize database with performance tracking tables"""
conn = sqlite3.connect(str(self.db_path))
# Execution outcomes with full context
conn.execute("""
CREATE TABLE IF NOT EXISTS executions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
tool TEXT NOT NULL,
param_hash TEXT NOT NULL,
house TEXT NOT NULL,
model TEXT,
task_type TEXT,
success INTEGER NOT NULL,
latency_ms REAL,
confidence REAL,
tokens_in INTEGER,
tokens_out INTEGER,
error_type TEXT,
hour_of_day INTEGER,
day_of_week INTEGER
)
""")
# Aggregated patterns (updated continuously)
conn.execute("""
CREATE TABLE IF NOT EXISTS patterns (
tool TEXT NOT NULL,
param_signature TEXT NOT NULL,
house TEXT NOT NULL,
model TEXT,
success_count INTEGER DEFAULT 0,
failure_count INTEGER DEFAULT 0,
total_latency_ms REAL DEFAULT 0,
total_confidence REAL DEFAULT 0,
sample_count INTEGER DEFAULT 0,
last_updated REAL,
PRIMARY KEY (tool, param_signature, house, model)
)
""")
# Model performance by task type
conn.execute("""
CREATE TABLE IF NOT EXISTS model_performance (
model TEXT NOT NULL,
task_type TEXT NOT NULL,
total_calls INTEGER DEFAULT 0,
success_count INTEGER DEFAULT 0,
total_latency_ms REAL DEFAULT 0,
total_tokens INTEGER DEFAULT 0,
last_used REAL,
PRIMARY KEY (model, task_type)
)
""")
# Adaptation history (how we've changed)
conn.execute("""
CREATE TABLE IF NOT EXISTS adaptations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
trigger TEXT NOT NULL,
change_type TEXT NOT NULL,
old_value TEXT,
new_value TEXT,
reason TEXT,
expected_improvement REAL
)
""")
# Performance predictions (for validation)
conn.execute("""
CREATE TABLE IF NOT EXISTS predictions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
tool TEXT NOT NULL,
house TEXT NOT NULL,
predicted_success_rate REAL,
actual_success INTEGER,
prediction_accuracy REAL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_exec_tool ON executions(tool)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_exec_time ON executions(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_patterns_tool ON patterns(tool)")
conn.commit()
conn.close()
def record_execution(self, data: Dict):
"""Record a single execution outcome"""
conn = sqlite3.connect(str(self.db_path))
now = time.time()
dt = datetime.fromtimestamp(now)
# Extract fields
tool = data.get("tool", "unknown")
params = data.get("params", {})
param_hash = hashlib.sha256(
json.dumps(params, sort_keys=True).encode()
).hexdigest()[:16]
conn.execute("""
INSERT INTO executions
(timestamp, tool, param_hash, house, model, task_type, success,
latency_ms, confidence, tokens_in, tokens_out, error_type,
hour_of_day, day_of_week)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
now, tool, param_hash, data.get("house", "timmy"),
data.get("model"), data.get("task_type"),
1 if data.get("success") else 0,
data.get("latency_ms"), data.get("confidence"),
data.get("tokens_in"), data.get("tokens_out"),
data.get("error_type"),
dt.hour, dt.weekday()
))
# Update aggregated patterns
self._update_pattern(conn, tool, param_hash, data)
# Update model performance
if data.get("model"):
self._update_model_performance(conn, data)
conn.commit()
conn.close()
def _update_pattern(self, conn: sqlite3.Connection, tool: str,
param_hash: str, data: Dict):
"""Update aggregated pattern for this tool/params/house/model combo"""
house = data.get("house", "timmy")
model = data.get("model", "unknown")
success = 1 if data.get("success") else 0
latency = data.get("latency_ms", 0)
confidence = data.get("confidence", 0)
# Try to update existing
result = conn.execute("""
SELECT success_count, failure_count, total_latency_ms,
total_confidence, sample_count
FROM patterns
WHERE tool=? AND param_signature=? AND house=? AND model=?
""", (tool, param_hash, house, model)).fetchone()
if result:
succ, fail, total_lat, total_conf, samples = result
conn.execute("""
UPDATE patterns SET
success_count = ?,
failure_count = ?,
total_latency_ms = ?,
total_confidence = ?,
sample_count = ?,
last_updated = ?
WHERE tool=? AND param_signature=? AND house=? AND model=?
""", (
succ + success, fail + (1 - success),
total_lat + latency, total_conf + confidence,
samples + 1, time.time(),
tool, param_hash, house, model
))
else:
conn.execute("""
INSERT INTO patterns
(tool, param_signature, house, model, success_count, failure_count,
total_latency_ms, total_confidence, sample_count, last_updated)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (tool, param_hash, house, model,
success, 1 - success, latency, confidence, 1, time.time()))
def _update_model_performance(self, conn: sqlite3.Connection, data: Dict):
"""Update model performance tracking"""
model = data.get("model")
task_type = data.get("task_type", "unknown")
success = 1 if data.get("success") else 0
latency = data.get("latency_ms", 0)
tokens = (data.get("tokens_in", 0) or 0) + (data.get("tokens_out", 0) or 0)
result = conn.execute("""
SELECT total_calls, success_count, total_latency_ms, total_tokens
FROM model_performance
WHERE model=? AND task_type=?
""", (model, task_type)).fetchone()
if result:
total, succ, total_lat, total_tok = result
conn.execute("""
UPDATE model_performance SET
total_calls = ?,
success_count = ?,
total_latency_ms = ?,
total_tokens = ?,
last_used = ?
WHERE model=? AND task_type=?
""", (total + 1, succ + success, total_lat + latency,
total_tok + tokens, time.time(), model, task_type))
else:
conn.execute("""
INSERT INTO model_performance
(model, task_type, total_calls, success_count,
total_latency_ms, total_tokens, last_used)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (model, task_type, 1, success, latency, tokens, time.time()))
def get_pattern(self, tool: str, house: str,
params: Dict = None) -> Optional[ExecutionPattern]:
"""Get pattern for tool/house/params combination"""
conn = sqlite3.connect(str(self.db_path))
if params:
param_hash = hashlib.sha256(
json.dumps(params, sort_keys=True).encode()
).hexdigest()[:16]
result = conn.execute("""
SELECT param_signature, house, model,
success_count, failure_count, total_latency_ms,
total_confidence, sample_count, last_updated
FROM patterns
WHERE tool=? AND param_signature=? AND house=?
ORDER BY sample_count DESC
LIMIT 1
""", (tool, param_hash, house)).fetchone()
else:
# Get aggregate across all params
result = conn.execute("""
SELECT 'aggregate' as param_signature, house, model,
SUM(success_count), SUM(failure_count), SUM(total_latency_ms),
SUM(total_confidence), SUM(sample_count) AS sample_count, MAX(last_updated)
FROM patterns
WHERE tool=? AND house=?
GROUP BY house, model
ORDER BY sample_count DESC
LIMIT 1
""", (tool, house)).fetchone()
conn.close()
if not result:
return None
(param_sig, h, model, succ, fail, total_lat,
total_conf, samples, last_updated) = result
total = succ + fail
success_rate = succ / total if total > 0 else 0.5
avg_lat = total_lat / samples if samples > 0 else 0
avg_conf = total_conf / samples if samples > 0 else 0.5
return ExecutionPattern(
tool=tool,
param_signature=param_sig,
house=h,
model=model or "unknown",
success_rate=success_rate,
avg_latency_ms=avg_lat,
avg_confidence=avg_conf,
sample_count=samples,
last_executed=datetime.fromtimestamp(last_updated).isoformat()
)
def get_best_model(self, task_type: str, min_samples: int = 5) -> Optional[str]:
"""Get best performing model for task type"""
conn = sqlite3.connect(str(self.db_path))
result = conn.execute("""
SELECT model, total_calls, success_count, total_latency_ms
FROM model_performance
WHERE task_type=? AND total_calls >= ?
ORDER BY (CAST(success_count AS REAL) / total_calls) DESC,
(total_latency_ms / total_calls) ASC
LIMIT 1
""", (task_type, min_samples)).fetchone()
conn.close()
return result[0] if result else None
def get_house_performance(self, house: str, days: int = 7) -> Dict:
"""Get performance metrics for a house"""
conn = sqlite3.connect(str(self.db_path))
cutoff = time.time() - (days * 86400)
result = conn.execute("""
SELECT
COUNT(*) as total,
SUM(success) as successes,
AVG(latency_ms) as avg_latency,
AVG(confidence) as avg_confidence
FROM executions
WHERE house=? AND timestamp > ?
""", (house, cutoff)).fetchone()
conn.close()
total, successes, avg_lat, avg_conf = result
return {
"house": house,
"period_days": days,
"total_executions": total or 0,
"successes": successes or 0,
"success_rate": (successes / total) if total else 0,
"avg_latency_ms": avg_lat or 0,
"avg_confidence": avg_conf or 0
}
def record_adaptation(self, event: AdaptationEvent):
"""Record a system adaptation"""
conn = sqlite3.connect(str(self.db_path))
conn.execute("""
INSERT INTO adaptations
(timestamp, trigger, change_type, old_value, new_value, reason, expected_improvement)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
time.time(), event.trigger, event.change_type,
json.dumps(event.old_value), json.dumps(event.new_value),
event.reason, event.expected_improvement
))
conn.commit()
conn.close()
def get_adaptations(self, limit: int = 20) -> List[AdaptationEvent]:
"""Get recent adaptations"""
conn = sqlite3.connect(str(self.db_path))
rows = conn.execute("""
SELECT timestamp, trigger, change_type, old_value, new_value,
reason, expected_improvement
FROM adaptations
ORDER BY timestamp DESC
LIMIT ?
""", (limit,)).fetchall()
conn.close()
return [
AdaptationEvent(
timestamp=datetime.fromtimestamp(r[0]).isoformat(),
trigger=r[1], change_type=r[2],
old_value=json.loads(r[3]) if r[3] else None,
new_value=json.loads(r[4]) if r[4] else None,
reason=r[5], expected_improvement=r[6]
)
for r in rows
]
class IntelligenceEngine:
"""
The brain that makes Timmy smarter.
Continuously:
- Analyzes execution patterns
- Identifies improvement opportunities
- Adapts policies and routing
- Predicts optimal configurations
"""
def __init__(self, db: PatternDatabase = None):
self.db = db or PatternDatabase()
self.adaptation_history: List[AdaptationEvent] = []
self.current_policies = self._load_default_policies()
def _load_default_policies(self) -> Dict:
"""Load default policies (will be adapted)"""
return {
"ezra": {
"evidence_threshold": 0.8,
"confidence_boost_for_read_ops": 0.1
},
"bezalel": {
"evidence_threshold": 0.6,
"parallel_test_threshold": 0.5
},
"routing": {
"min_confidence_for_auto_route": 0.7,
"fallback_to_timmy_threshold": 0.3
}
}
def ingest_hermes_session(self, session_data: Dict):
"""
Ingest telemetry from Hermes harness.
This is the SHORTEST LOOP - Hermes data directly into intelligence.
"""
# Extract execution records from Hermes session
executions = []
for msg in session_data.get("messages", []):
if msg.get("role") == "tool":
executions.append({
"tool": msg.get("name", "unknown"),
"success": not msg.get("error"),
"latency_ms": msg.get("execution_time_ms", 0),
"model": session_data.get("model"),
"timestamp": session_data.get("started_at")
})
for exec_data in executions:
self.db.record_execution(exec_data)
return len(executions)
def analyze_and_adapt(self) -> List[AdaptationEvent]:
"""
Analyze patterns and adapt policies.
Called periodically to improve system performance.
"""
adaptations = []
# Analysis 1: House performance gaps
house_perf = {
"ezra": self.db.get_house_performance("ezra", days=3),
"bezalel": self.db.get_house_performance("bezalel", days=3),
"timmy": self.db.get_house_performance("timmy", days=3)
}
# If Ezra's success rate is low, lower evidence threshold
ezra_rate = house_perf["ezra"].get("success_rate", 0.5)
if ezra_rate < 0.6 and self.current_policies["ezra"]["evidence_threshold"] > 0.6:
old_val = self.current_policies["ezra"]["evidence_threshold"]
new_val = old_val - 0.1
self.current_policies["ezra"]["evidence_threshold"] = new_val
adapt = AdaptationEvent(
timestamp=datetime.utcnow().isoformat(),
trigger="low_ezra_success_rate",
change_type="policy.ezra.evidence_threshold",
old_value=old_val,
new_value=new_val,
reason=f"Ezra success rate {ezra_rate:.1%} below threshold, relaxing evidence requirement",
expected_improvement=0.1
)
adaptations.append(adapt)
self.db.record_adaptation(adapt)
# Analysis 2: Model selection optimization
for task_type in ["read", "build", "test", "judge"]:
best_model = self.db.get_best_model(task_type, min_samples=10)
if best_model:
# This would update model selection policy
pass
self.adaptation_history.extend(adaptations)
return adaptations
def predict_success(self, tool: str, house: str,
params: Dict = None) -> Tuple[float, str]:
"""
Predict success probability for a planned execution.
Returns: (probability, reasoning)
"""
pattern = self.db.get_pattern(tool, house, params)
if not pattern or pattern.sample_count < 3:
return (0.5, "Insufficient data for prediction")
reasoning = f"Based on {pattern.sample_count} similar executions: "
if pattern.success_rate > 0.9:
reasoning += "excellent track record"
elif pattern.success_rate > 0.7:
reasoning += "good track record"
elif pattern.success_rate > 0.5:
reasoning += "mixed results"
else:
reasoning += "poor track record, consider alternatives"
return (pattern.success_rate, reasoning)
def get_optimal_house(self, tool: str, params: Dict = None) -> Tuple[str, float]:
"""
Determine optimal house for a task based on historical performance.
Returns: (house, confidence)
"""
houses = ["ezra", "bezalel", "timmy"]
best_house = "timmy"
best_rate = 0.0
for house in houses:
pattern = self.db.get_pattern(tool, house, params)
if pattern and pattern.success_rate > best_rate:
best_rate = pattern.success_rate
best_house = house
confidence = best_rate if best_rate > 0 else 0.5
return (best_house, confidence)
def get_intelligence_report(self) -> Dict:
"""Generate comprehensive intelligence report"""
return {
"timestamp": datetime.utcnow().isoformat(),
"house_performance": {
"ezra": self.db.get_house_performance("ezra", days=7),
"bezalel": self.db.get_house_performance("bezalel", days=7),
"timmy": self.db.get_house_performance("timmy", days=7)
},
"current_policies": self.current_policies,
"recent_adaptations": [
asdict(a) for a in self.db.get_adaptations(limit=10)
],
"learning_velocity": self._calculate_learning_velocity(),
"prediction_accuracy": self._calculate_prediction_accuracy()
}
def _calculate_learning_velocity(self) -> Dict:
"""Calculate how fast Timmy is improving"""
conn = sqlite3.connect(str(self.db.db_path))
# Compare last 3 days vs previous 3 days
now = time.time()
recent_start = now - (3 * 86400)
previous_start = now - (6 * 86400)
recent = conn.execute("""
SELECT AVG(success) FROM executions WHERE timestamp > ?
""", (recent_start,)).fetchone()[0] or 0
previous = conn.execute("""
SELECT AVG(success) FROM executions
WHERE timestamp > ? AND timestamp <= ?
""", (previous_start, recent_start)).fetchone()[0] or 0
conn.close()
improvement = recent - previous
return {
"recent_success_rate": recent,
"previous_success_rate": previous,
"improvement": improvement,
"velocity": "accelerating" if improvement > 0.05 else
"stable" if improvement > -0.05 else "declining"
}
def _calculate_prediction_accuracy(self) -> float:
"""Calculate how accurate our predictions have been"""
conn = sqlite3.connect(str(self.db.db_path))
result = conn.execute("""
SELECT AVG(prediction_accuracy) FROM predictions
WHERE timestamp > ?
""", (time.time() - (7 * 86400),)).fetchone()
conn.close()
return result[0] if result[0] is not None else 0.5
if __name__ == "__main__":
# Demo the intelligence engine
engine = IntelligenceEngine()
# Simulate some executions
for i in range(20):
engine.db.record_execution({
"tool": "git_status",
"house": "ezra" if i % 2 == 0 else "bezalel",
"model": "hermes3:8b",
"task_type": "read",
"success": i < 15, # 75% success rate
"latency_ms": 100 + i * 5,
"confidence": 0.8
})
print("=" * 60)
print("INTELLIGENCE ENGINE v3 — Self-Improvement Demo")
print("=" * 60)
# Get predictions
pred, reason = engine.predict_success("git_status", "ezra")
print(f"\n🔮 Prediction for ezra/git_status: {pred:.1%}")
print(f" Reasoning: {reason}")
# Analyze and adapt
adaptations = engine.analyze_and_adapt()
print(f"\n🔄 Adaptations made: {len(adaptations)}")
for a in adaptations:
print(f" - {a.change_type}: {a.old_value}{a.new_value}")
print(f" Reason: {a.reason}")
# Get report
report = engine.get_intelligence_report()
print(f"\n📊 Learning Velocity: {report['learning_velocity']['velocity']}")
print(f" Improvement: {report['learning_velocity']['improvement']:+.1%}")
print("\n" + "=" * 60)


@@ -1,493 +0,0 @@
#!/usr/bin/env python3
"""
Test Suite for Uni-Wizard v3 — Self-Improving Intelligence
Tests:
- Pattern database operations
- Intelligence engine learning
- Adaptive policy changes
- Prediction accuracy
- Hermes bridge integration
- End-to-end self-improvement
"""
import sys
import json
import tempfile
import shutil
import time
import threading
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
# Add parent to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from intelligence_engine import (
PatternDatabase, IntelligenceEngine,
ExecutionPattern, AdaptationEvent
)
from harness import (
UniWizardHarness, AdaptivePolicy,
House, Provenance, ExecutionResult
)
from hermes_bridge import (
HermesStateReader, HermesSessionEvent,
TelemetryStreamProcessor, ShortestLoopIntegrator
)
class TestPatternDatabase:
"""Test pattern storage and retrieval"""
def setup_method(self):
self.temp_dir = tempfile.mkdtemp()
self.db = PatternDatabase(db_path=Path(self.temp_dir) / "test.db")
def teardown_method(self):
shutil.rmtree(self.temp_dir)
def test_record_execution(self):
"""Test recording execution outcomes"""
self.db.record_execution({
"tool": "git_status",
"house": "ezra",
"model": "hermes3:8b",
"success": True,
"latency_ms": 150,
"confidence": 0.9
})
# Verify pattern created
pattern = self.db.get_pattern("git_status", "ezra")
assert pattern is not None
assert pattern.success_rate == 1.0
assert pattern.sample_count == 1
def test_pattern_aggregation(self):
"""Test pattern aggregation across multiple executions"""
# Record 10 executions, 8 successful
for i in range(10):
self.db.record_execution({
"tool": "deploy",
"house": "bezalel",
"success": i < 8,
"latency_ms": 200 + i * 10,
"confidence": 0.8
})
pattern = self.db.get_pattern("deploy", "bezalel")
assert pattern.success_rate == 0.8
assert pattern.sample_count == 10
assert pattern.avg_latency_ms == 245 # Average of 200-290
def test_best_model_selection(self):
"""Test finding best model for task"""
# Model A: 10 calls, 8 success = 80%
for i in range(10):
self.db.record_execution({
"tool": "read",
"house": "ezra",
"model": "model_a",
"task_type": "read",
"success": i < 8,
"latency_ms": 100
})
# Model B: 10 calls, 9 success = 90%
for i in range(10):
self.db.record_execution({
"tool": "read",
"house": "ezra",
"model": "model_b",
"task_type": "read",
"success": i < 9,
"latency_ms": 120
})
best = self.db.get_best_model("read", min_samples=5)
assert best == "model_b"
def test_house_performance(self):
"""Test house performance metrics"""
# Record executions for ezra
for i in range(5):
self.db.record_execution({
"tool": "test",
"house": "ezra",
"success": i < 4, # 80% success
"latency_ms": 100
})
perf = self.db.get_house_performance("ezra", days=7)
assert perf["house"] == "ezra"
assert perf["success_rate"] == 0.8
assert perf["total_executions"] == 5
def test_adaptation_tracking(self):
"""Test recording adaptations"""
adapt = AdaptationEvent(
timestamp="2026-03-30T20:00:00Z",
trigger="low_success_rate",
change_type="policy.threshold",
old_value=0.8,
new_value=0.7,
reason="Performance below threshold",
expected_improvement=0.1
)
self.db.record_adaptation(adapt)
adaptations = self.db.get_adaptations(limit=10)
assert len(adaptations) == 1
assert adaptations[0].change_type == "policy.threshold"
class TestIntelligenceEngine:
"""Test intelligence and learning"""
def setup_method(self):
self.temp_dir = tempfile.mkdtemp()
self.db = PatternDatabase(db_path=Path(self.temp_dir) / "test.db")
self.engine = IntelligenceEngine(db=self.db)
def teardown_method(self):
shutil.rmtree(self.temp_dir)
def test_predict_success_with_data(self):
"""Test prediction with historical data"""
# Record successful pattern
for i in range(10):
self.db.record_execution({
"tool": "git_status",
"house": "ezra",
"success": True,
"latency_ms": 100,
"confidence": 0.9
})
prob, reason = self.engine.predict_success("git_status", "ezra")
assert prob == 1.0
assert "excellent track record" in reason
def test_predict_success_without_data(self):
"""Test prediction without historical data"""
prob, reason = self.engine.predict_success("unknown_tool", "timmy")
assert prob == 0.5
assert "Insufficient data" in reason
def test_optimal_house_selection(self):
"""Test finding optimal house for task"""
# Ezra: 90% success on git_status
for i in range(10):
self.db.record_execution({
"tool": "git_status",
"house": "ezra",
"success": i < 9,
"latency_ms": 100
})
# Bezalel: 50% success on git_status
for i in range(10):
self.db.record_execution({
"tool": "git_status",
"house": "bezalel",
"success": i < 5,
"latency_ms": 100
})
house, confidence = self.engine.get_optimal_house("git_status")
assert house == "ezra"
assert confidence == 0.9
def test_learning_velocity(self):
"""Test learning velocity calculation"""
now = time.time()
# Record old executions (5-7 days ago)
for i in range(10):
self.db.record_execution({
"tool": "test",
"house": "timmy",
"success": i < 5, # 50% success
"latency_ms": 100
})
# Backdating would require direct timestamp UPDATEs against the database;
# here we only verify the report structure.
velocity = self.engine._calculate_learning_velocity()
assert "velocity" in velocity
assert "improvement" in velocity
class TestAdaptivePolicy:
"""Test policy adaptation"""
def setup_method(self):
self.temp_dir = tempfile.mkdtemp()
self.db = PatternDatabase(db_path=Path(self.temp_dir) / "test.db")
self.engine = IntelligenceEngine(db=self.db)
def teardown_method(self):
shutil.rmtree(self.temp_dir)
def test_policy_loads_defaults(self):
"""Test policy loads default values"""
policy = AdaptivePolicy(House.EZRA, self.engine)
assert policy.get("evidence_threshold") == 0.8
assert policy.get("must_read_before_write") is True
def test_policy_adapts_on_low_performance(self):
"""Test policy adapts when performance is poor"""
policy = AdaptivePolicy(House.EZRA, self.engine)
# Record poor performance for ezra
for i in range(10):
self.db.record_execution({
"tool": "test",
"house": "ezra",
"success": i < 4, # 40% success
"latency_ms": 100
})
# Trigger adaptation
adapt = policy.adapt("low_performance", "Testing adaptation")
# Threshold should have decreased
assert policy.get("evidence_threshold") < 0.8
assert adapt is not None
def test_policy_adapts_on_high_performance(self):
"""Test policy adapts when performance is excellent"""
policy = AdaptivePolicy(House.EZRA, self.engine)
# Start with lower threshold
policy.policy["evidence_threshold"] = 0.7
# Record excellent performance
for i in range(10):
self.db.record_execution({
"tool": "test",
"house": "ezra",
"success": True, # 100% success
"latency_ms": 100
})
# Trigger adaptation
adapt = policy.adapt("high_performance", "Testing adaptation")
# Threshold should have increased
assert policy.get("evidence_threshold") > 0.7
class TestHarness:
"""Test v3 harness with intelligence"""
def setup_method(self):
self.temp_dir = tempfile.mkdtemp()
self.db = PatternDatabase(db_path=Path(self.temp_dir) / "test.db")
self.engine = IntelligenceEngine(db=self.db)
def teardown_method(self):
shutil.rmtree(self.temp_dir)
def test_harness_creates_provenance(self):
"""Test harness creates proper provenance"""
harness = UniWizardHarness("ezra", intelligence=self.engine)
result = harness.execute("system_info")
assert result.provenance.house == "ezra"
assert result.provenance.tool == "system_info"
assert result.provenance.prediction >= 0
def test_harness_records_for_learning(self):
"""Test harness records executions"""
harness = UniWizardHarness("timmy", intelligence=self.engine, enable_learning=True)
initial_count = self.engine.db.get_house_performance("timmy")["total_executions"]
harness.execute("test_tool")
new_count = self.engine.db.get_house_performance("timmy")["total_executions"]
assert new_count == initial_count + 1
def test_harness_does_not_record_when_learning_disabled(self):
"""Test harness respects learning flag"""
harness = UniWizardHarness("timmy", intelligence=self.engine, enable_learning=False)
initial_count = self.engine.db.get_house_performance("timmy")["total_executions"]
harness.execute("test_tool")
new_count = self.engine.db.get_house_performance("timmy")["total_executions"]
assert new_count == initial_count
def test_learn_from_batch_triggers_adaptation(self):
"""Test batch learning triggers adaptations"""
harness = UniWizardHarness("ezra", intelligence=self.engine)
# Execute multiple times
for i in range(15):
harness.execute("test_tool")
# Trigger learning
result = harness.learn_from_batch(min_executions=10)
assert result["status"] == "adapted"
class TestHermesBridge:
"""Test Hermes integration"""
def setup_method(self):
self.temp_dir = tempfile.mkdtemp()
self.db = PatternDatabase(db_path=Path(self.temp_dir) / "test.db")
self.engine = IntelligenceEngine(db=self.db)
def teardown_method(self):
shutil.rmtree(self.temp_dir)
def test_event_conversion(self):
"""Test Hermes event to intelligence record conversion"""
processor = TelemetryStreamProcessor(self.engine)
event = HermesSessionEvent(
session_id="test_session",
timestamp=time.time(),
event_type="tool_call",
tool_name="terminal",
success=True,
latency_ms=150,
model="hermes3:8b",
provider="local",
token_count=100,
error=None
)
record = processor._convert_event(event)
assert record["tool"] == "system_shell" # Mapped from terminal
assert record["house"] == "timmy"
assert record["success"] is True
def test_task_type_inference(self):
"""Test task type inference from tool"""
processor = TelemetryStreamProcessor(self.engine)
assert processor._infer_task_type("git_status") == "read"
assert processor._infer_task_type("file_write") == "build"
assert processor._infer_task_type("run_tests") == "test"
class TestEndToEnd:
"""End-to-end integration tests"""
def setup_method(self):
self.temp_dir = tempfile.mkdtemp()
self.db = PatternDatabase(db_path=Path(self.temp_dir) / "test.db")
self.engine = IntelligenceEngine(db=self.db)
def teardown_method(self):
shutil.rmtree(self.temp_dir)
def test_full_learning_cycle(self):
"""Test complete learning cycle"""
# 1. Create harness
harness = UniWizardHarness("ezra", intelligence=self.engine)
# 2. Execute multiple times
for i in range(20):
harness.execute("git_status", repo_path="/tmp")
# 3. Get pattern
pattern = self.engine.db.get_pattern("git_status", "ezra")
assert pattern.sample_count == 20
# 4. Predict next execution
prob, reason = harness.predict_execution("git_status", {})
assert prob > 0
assert len(reason) > 0
# 5. Learn from batch
result = harness.learn_from_batch()
assert result["status"] == "adapted"
# 6. Get intelligence report
report = self.engine.get_intelligence_report()
assert "house_performance" in report
assert "learning_velocity" in report
def run_tests():
"""Run all tests"""
import inspect
test_classes = [
TestPatternDatabase,
TestIntelligenceEngine,
TestAdaptivePolicy,
TestHarness,
TestHermesBridge,
TestEndToEnd
]
passed = 0
failed = 0
print("=" * 60)
print("UNI-WIZARD v3 TEST SUITE")
print("=" * 60)
for cls in test_classes:
print(f"\n📦 {cls.__name__}")
print("-" * 40)
instance = cls()
# Run setup
if hasattr(instance, 'setup_method'):
try:
instance.setup_method()
except Exception as e:
print(f" ⚠️ Setup failed: {e}")
continue
for name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
if name.startswith('test_'):
try:
# Get fresh instance for each test
test_instance = cls()
if hasattr(test_instance, 'setup_method'):
test_instance.setup_method()
method(test_instance)
print(f"{name}")
passed += 1
if hasattr(test_instance, 'teardown_method'):
test_instance.teardown_method()
except Exception as e:
print(f"{name}: {e}")
failed += 1
# Run teardown
if hasattr(instance, 'teardown_method'):
try:
instance.teardown_method()
except:
pass
print("\n" + "=" * 60)
print(f"Results: {passed} passed, {failed} failed")
print("=" * 60)
return failed == 0
if __name__ == "__main__":
success = run_tests()
sys.exit(0 if success else 1)


@@ -1,413 +0,0 @@
# Uni-Wizard v4 — Production Architecture
## Final Integration: All Passes United
### Pass 1 (Timmy) → Foundation
- Tool registry, basic harness, health daemon
- VPS provisioning, Syncthing mesh
### Pass 2 (Ezra/Bezalel/Timmy) → Three-House Canon
- House-aware execution (Timmy/Ezra/Bezalel)
- Provenance tracking
- Artifact-flow discipline
### Pass 3 (Intelligence) → Self-Improvement
- Pattern database
- Adaptive policies
- Predictive execution
- Hermes bridge
### Pass 4 (Final) → Production Integration
**What v4 adds:**
- Unified single-harness API (no more version confusion)
- Async/concurrent execution
- Real Hermes integration (not mocks)
- Production systemd services
- Health monitoring & alerting
- Graceful degradation
- Clear operational boundaries
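The async/concurrent execution that v4 adds can be sketched with `asyncio` (an illustrative stand-in only; `run_tool`, the delays, and the timeout value are assumptions, not the real v4 executor):

```python
import asyncio

async def run_tool(name: str, delay: float) -> dict:
    # Stand-in for a real tool invocation; sleeps instead of doing work.
    await asyncio.sleep(delay)
    return {"tool": name, "success": True}

async def run_concurrently(tools, timeout: float = 2.0):
    # Launch every tool at once; a slow tool raises TimeoutError
    # without blocking the others (gather collects exceptions too).
    tasks = [asyncio.wait_for(run_tool(n, d), timeout) for n, d in tools]
    return await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    out = asyncio.run(run_concurrently([("git_status", 0.1), ("system_info", 0.2)]))
    print(out)
```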
---
## The Final Architecture
```
┌─────────────────────────────────────────────────────────────────────────┐
│ UNI-WIZARD v4 (PRODUCTION) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ UNIFIED HARNESS API │ │
│ │ Single entry point: `from uni_wizard import Harness` │ │
│ │ All capabilities through one clean interface │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────┼──────────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌────────▼────────┐ ┌───────▼───────┐ │
│ │ TOOLS │ │ INTELLIGENCE │ │ TELEMETRY │ │
│ │ (19 tools) │ │ ENGINE │ │ LAYER │ │
│ │ │ │ │ │ │ │
│ │ • System │ │ • Pattern DB │ │ • Hermes │ │
│ │ • Git │ │ • Predictions │ │ • Metrics │ │
│ │ • Network │ │ • Adaptation │ │ • Alerts │ │
│ │ • File │ │ • Learning │ │ • Audit │ │
│ └──────┬──────┘ └────────┬────────┘ └───────┬───────┘ │
│ │ │ │ │
│ └──────────────────────┼──────────────────────┘ │
│ │ │
│ ┌─────────────────────────────▼─────────────────────────────┐ │
│ │ HOUSE DISPATCHER (Router) │ │
│ │ • Timmy: Sovereign judgment, final review │ │
│ │ • Ezra: Archivist mode (read-before-write) │ │
│ │ • Bezalel: Artificer mode (proof-required) │ │
│ └─────────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────▼─────────────────────────────┐ │
│ │ EXECUTION ENGINE (Async/Concurrent) │ │
│ │ • Parallel tool execution │ │
│ │ • Timeout handling │ │
│ │ • Retry with backoff │ │
│ │ • Circuit breaker pattern │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
---
## Key Design Decisions
### 1. Single Unified API
```python
# Before (confusing):
from v1.harness import Harness # Basic
from v2.harness import Harness # Three-house
from v3.harness import Harness # Intelligence
# After (clean):
from uni_wizard import Harness, House, Mode
# Usage:
harness = Harness(house=House.TIMMY, mode=Mode.INTELLIGENT)
result = harness.execute("git_status", repo_path="/path")
```
### 2. Three Operating Modes
| Mode | Use Case | Features |
|------|----------|----------|
| `Mode.SIMPLE` | Fast scripts | Direct execution, no overhead |
| `Mode.INTELLIGENT` | Production | Predictions, adaptations, learning |
| `Mode.SOVEREIGN` | Critical ops | Full provenance, Timmy approval required |
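One practical consequence of the mode split shows up in batch execution: `Mode.SOVEREIGN` halts a batch at the first failure, while the other modes run every task to completion. A toy sketch of that policy (standalone, not the real `Harness`):

```python
from enum import Enum

class Mode(Enum):
    SIMPLE = "simple"
    INTELLIGENT = "intelligent"
    SOVEREIGN = "sovereign"

def execute_batch(outcomes, mode):
    """Run tasks in order; SOVEREIGN stops at the first failure.

    `outcomes` is a list of booleans standing in for task results.
    """
    results = []
    for ok in outcomes:
        results.append(ok)
        if mode is Mode.SOVEREIGN and not ok:
            break
    return results

print(execute_batch([True, False, True], Mode.SOVEREIGN))    # → [True, False]
print(execute_batch([True, False, True], Mode.INTELLIGENT))  # → [True, False, True]
```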
### 3. Clear Boundaries
```python
# What the harness DOES:
- Route tasks to appropriate tools
- Track provenance
- Learn from outcomes
- Predict success rates
# What the harness DOES NOT do:
- Make autonomous decisions (Timmy decides)
- Modify production without approval
- Blend house identities
- Phone home to cloud
```
### 4. Production Hardening
- **Circuit breakers**: Stop calling failing tools
- **Timeouts**: Every operation has bounded time
- **Retries**: Exponential backoff on transient failures
- **Graceful degradation**: Fall back to simpler modes on stress
- **Health checks**: `/health` endpoint for monitoring
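The retry behavior named above is not shown in the harness itself; a minimal sketch of exponential backoff with jitter and a capped delay might look like this (names and defaults are illustrative):

```python
import random
import time

def retry_with_backoff(fn, max_attempts=4, base_delay=0.5, max_delay=8.0):
    """Call fn(); on exception, sleep base_delay * 2**attempt (capped) and retry."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(delay + random.uniform(0, delay * 0.1))  # small jitter

# A flaky function that succeeds on the third call:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

print(retry_with_backoff(flaky, base_delay=0.001))  # → ok
```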
---
## File Structure (Final)
```
uni-wizard/
├── README.md # Quick start guide
├── ARCHITECTURE.md # This document
├── uni_wizard/ # Main package
│ ├── __init__.py # Unified API
│ ├── harness.py # Core harness (v4 unified)
│ ├── houses.py # House definitions & policies
│ ├── tools/
│ │ ├── __init__.py # Tool registry
│ │ ├── system.py # System tools
│ │ ├── git.py # Git tools
│ │ ├── network.py # Network/Gitea tools
│ │ └── file.py # File operations
│ ├── intelligence/
│ │ ├── __init__.py # Intelligence engine
│ │ ├── patterns.py # Pattern database
│ │ ├── predictions.py # Prediction engine
│ │ └── adaptation.py # Policy adaptation
│ ├── telemetry/
│ │ ├── __init__.py # Telemetry layer
│ │ ├── hermes_bridge.py # Hermes integration
│ │ ├── metrics.py # Metrics collection
│ │ └── alerts.py # Alerting
│ └── daemon/
│ ├── __init__.py # Daemon framework
│ ├── router.py # Task router daemon
│ ├── health.py # Health check daemon
│ └── worker.py # Async worker pool
├── configs/
│ ├── uni-wizard.service # Systemd service
│ ├── timmy-router.service # Task router service
│ └── health-daemon.service # Health monitoring
├── tests/
│ ├── test_harness.py # Core tests
│ ├── test_intelligence.py # Intelligence tests
│ ├── test_integration.py # E2E tests
│ └── test_production.py # Load/stress tests
└── docs/
├── OPERATIONS.md # Runbook
├── TROUBLESHOOTING.md # Common issues
└── API_REFERENCE.md # Full API docs
```
---
## Operational Model
### Local-First Principle
```
Hermes Session → Local Intelligence → Local Decision → Local Execution
↑ ↓
└────────────── Telemetry ─────────────────────┘
```
All learning happens locally. No cloud required for operation.
### Cloud-Connected Enhancement (Allegro's Lane)
```
┌─────────────────────────────────────────────────────────────┐
│ LOCAL TIMMY (Sovereign) │
│ (Mac/Mini) │
└───────────────────────┬─────────────────────────────────────┘
│ Direction (decisions flow down)
┌─────────────────────────────────────────────────────────────┐
│ ALLEGRO VPS (Connected/Redundant) │
│ (This Machine) │
│ • Pulls from Gitea (issues, specs) │
│ • Runs Hermes with cloud model access │
│ • Streams telemetry to Timmy │
│ • Reports back via PRs, comments │
│ • Fails over to other VPS if unavailable │
└───────────────────────┬─────────────────────────────────────┘
│ Artifacts (PRs, comments, logs)
┌─────────────────────────────────────────────────────────────┐
│ EZRA/BEZALEL VPS (Wizard Houses) │
│ (Separate VPS instances) │
│ • Ezra: Analysis, architecture, docs │
│ • Bezalel: Implementation, testing, forge │
└─────────────────────────────────────────────────────────────┘
```
### The Contract
**Timmy (Local) owns:**
- Final decisions
- Local memory
- Sovereign identity
- Policy approval
**Allegro (This VPS) owns:**
- Connectivity to cloud models
- Gitea integration
- Telemetry streaming
- Failover/redundancy
- Issue triage and routing
**Ezra/Bezalel (Other VPS) own:**
- Specialized analysis
- Heavy computation
- Parallel work streams
---
## Allegro's Narrowed Lane (v4)
### What I Do Now
```
┌────────────────────────────────────────────────────────────┐
│ ALLEGRO LANE v4 │
│ "Tempo-and-Dispatch, Connected" │
├────────────────────────────────────────────────────────────┤
│ │
│ PRIMARY: Gitea Integration & Issue Flow │
│ ├── Monitor Gitea for new issues/PRs │
│ ├── Triage: label, categorize, assign │
│ ├── Route to appropriate house (Ezra/Bezalel/Timmy) │
│ └── Report back via PR comments, status updates │
│ │
│ PRIMARY: Hermes Bridge & Telemetry │
│ ├── Run Hermes with cloud model access │
│ ├── Stream execution telemetry to Timmy │
│ ├── Maintain shortest-loop feedback (<100ms) │
│ └── Buffer during outages, sync on recovery │
│ │
│ SECONDARY: Redundancy & Failover │
│ ├── Health check other VPS instances │
│ ├── Take over routing if primary fails │
│ └── Maintain distributed state via Syncthing │
│ │
│ SECONDARY: Uni-Wizard Operations │
│ ├── Keep uni-wizard services running │
│ ├── Monitor health, restart on failure │
│ └── Report metrics to local Timmy │
│ │
│ WHAT I DO NOT DO: │
│ ├── Make sovereign decisions (Timmy decides) │
│ ├── Modify production without Timmy approval │
│ ├── Store long-term memory (Timmy owns memory) │
│ ├── Authenticate as Timmy (I'm Allegro) │
│ └── Work without connectivity (need cloud for models) │
│ │
└────────────────────────────────────────────────────────────┘
```
### My API Surface
```python
# What I expose to Timmy:
class AllegroBridge:
"""
Allegro's narrow interface for Timmy.
I provide:
- Gitea connectivity
- Cloud model access
- Telemetry streaming
- Redundancy/failover
"""
async def get_gitea_issues(self, repo: str, assignee: str = None) -> List[Issue]:
"""Fetch issues from Gitea"""
async def create_pr(self, repo: str, branch: str, title: str, body: str) -> PR:
"""Create pull request"""
async def run_with_hermes(self, prompt: str, model: str = None) -> HermesResult:
"""Execute via Hermes with cloud model"""
async def stream_telemetry(self, events: List[TelemetryEvent]):
"""Stream execution telemetry to Timmy"""
async def check_health(self, target: str) -> HealthStatus:
"""Check health of other VPS instances"""
```
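The interface above is a specification, not working code; a minimal driver for the issue-triage path might look like this, with a `MockBridge` standing in for the real Gitea-backed implementation and `Issue` as a local stand-in type:

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Issue:
    number: int
    title: str

class MockBridge:
    """Stand-in for AllegroBridge: returns canned issues instead of calling Gitea."""
    async def get_gitea_issues(self, repo: str, assignee: Optional[str] = None) -> List[Issue]:
        return [Issue(number=76, title="Uni-Wizard harness")]

async def triage_once(bridge, repo: str) -> List[int]:
    """Fetch issues assigned to Allegro and return their numbers for routing."""
    issues = await bridge.get_gitea_issues(repo, assignee="allegro")
    return [i.number for i in issues]

print(asyncio.run(triage_once(MockBridge(), "timmy/uni-wizard")))  # → [76]
```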
### Success Metrics
| Metric | Target | Measurement |
|--------|--------|-------------|
| Issue triage latency | < 5 minutes | Time from issue creation to labeling |
| Telemetry lag | < 100ms | Hermes event to Timmy intelligence |
| Gitea uptime | 99.9% | Availability of Gitea API |
| Failover time | < 30s | Detection to takeover |
| PR throughput | 10/day | Issues → PRs created |
---
## Deployment Checklist
### 1. Install Uni-Wizard v4
```bash
cd /opt/uni-wizard
pip install -e .
systemctl enable uni-wizard
systemctl start uni-wizard
```
### 2. Configure Houses
```yaml
# /etc/uni-wizard/houses.yaml
houses:
timmy:
endpoint: http://192.168.1.100:8643 # Local Mac
auth_token: ${TIMMY_TOKEN}
priority: critical
allegro:
endpoint: http://localhost:8643
role: tempo-and-dispatch
ezra:
endpoint: http://143.198.27.163:8643
role: archivist
bezalel:
endpoint: http://67.205.155.108:8643
role: artificer
```
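The `${TIMMY_TOKEN}` reference implies environment-variable substitution at load time. One stdlib way to expand such placeholders before parsing the config (illustrative, not the shipped loader) is `string.Template`:

```python
import os
from string import Template

# Assumption for this sketch: the token is provided via the service environment.
os.environ.setdefault("TIMMY_TOKEN", "example-token")

raw = "endpoint: http://192.168.1.100:8643\nauth_token: ${TIMMY_TOKEN}\n"
expanded = Template(raw).substitute(os.environ)
print(expanded)
```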
### 3. Verify Integration
```bash
# Test harness
uni-wizard test --house timmy --tool git_status
# Test intelligence
uni-wizard predict --tool deploy --house bezalel
# Test telemetry
uni-wizard telemetry --status
```
---
## The Final Vision
```
┌─────────────────────────────────────────────────────────────────┐
│ THE SOVEREIGN TIMMY SYSTEM │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Local (Sovereign Core) Cloud-Connected (Redundant) │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Timmy (Mac/Mini) │◄──────►│ Allegro (VPS) │ │
│ │ • Final decisions │ │ • Gitea bridge │ │
│ │ • Local memory │ │ • Cloud models │ │
│ │ • Policy approval │ │ • Telemetry │ │
│ │ • Sovereign voice │ │ • Failover │ │
│ └─────────────────────┘ └──────────┬──────────┘ │
│ ▲ │ │
│ │ │ │
│ └───────────────────────────────────┘ │
│ Telemetry Loop │
│ │
│ Specialized (Separate) │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Ezra (VPS) │ │ Bezalel (VPS) │ │
│ │ • Analysis │ │ • Implementation │ │
│ │ • Architecture │ │ • Testing │ │
│ │ • Documentation │ │ • Forge work │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ │
│ All houses communicate through: │
│ • Gitea (issues, PRs, comments) │
│ • Syncthing (file sync, logs) │
│ • Uni-Wizard telemetry (execution data) │
│ │
│ Timmy remains sovereign. All others serve. │
│ │
└─────────────────────────────────────────────────────────────────┘
```
---
*Sovereignty and service always.*
*Final pass complete. Production ready.*

View File

@@ -1,511 +0,0 @@
#!/usr/bin/env python3
"""
Uni-Wizard v4 — Unified Production API
Single entry point for all uni-wizard capabilities.
Usage:
from uni_wizard import Harness, House, Mode
# Simple mode - direct execution
harness = Harness(mode=Mode.SIMPLE)
result = harness.execute("git_status", repo_path="/path")
# Intelligent mode - with predictions and learning
harness = Harness(house=House.EZRA, mode=Mode.INTELLIGENT)
result = harness.execute("git_status")
print(f"Predicted: {result.prediction.success_rate:.0%}")
# Sovereign mode - full provenance and approval
harness = Harness(house=House.TIMMY, mode=Mode.SOVEREIGN)
result = harness.execute("deploy")
"""
from enum import Enum
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
from pathlib import Path
import json
import time
import hashlib
import asyncio
from concurrent.futures import ThreadPoolExecutor
class House(Enum):
"""Canonical wizard houses"""
TIMMY = "timmy" # Sovereign local conscience
EZRA = "ezra" # Archivist, reader
BEZALEL = "bezalel" # Artificer, builder
ALLEGRO = "allegro" # Tempo-and-dispatch, connected
class Mode(Enum):
"""Operating modes"""
SIMPLE = "simple" # Direct execution, no overhead
INTELLIGENT = "intelligent" # With predictions and learning
SOVEREIGN = "sovereign" # Full provenance, approval required
@dataclass
class Prediction:
"""Pre-execution prediction"""
success_rate: float
confidence: float
reasoning: str
suggested_house: Optional[str] = None
estimated_latency_ms: float = 0.0
@dataclass
class Provenance:
"""Full execution provenance"""
house: str
tool: str
mode: str
started_at: str
completed_at: Optional[str] = None
input_hash: str = ""
output_hash: str = ""
prediction: Optional[Prediction] = None
execution_time_ms: float = 0.0
retry_count: int = 0
circuit_open: bool = False
@dataclass
class ExecutionResult:
"""Unified execution result"""
success: bool
data: Any
provenance: Provenance
error: Optional[str] = None
suggestions: List[str] = field(default_factory=list)
def to_json(self) -> str:
return json.dumps({
"success": self.success,
"data": self.data,
"error": self.error,
"provenance": {
"house": self.provenance.house,
"tool": self.provenance.tool,
"mode": self.provenance.mode,
"execution_time_ms": self.provenance.execution_time_ms,
"prediction": {
"success_rate": self.provenance.prediction.success_rate,
"confidence": self.provenance.prediction.confidence
} if self.provenance.prediction else None
},
"suggestions": self.suggestions
}, indent=2, default=str)
class ToolRegistry:
"""Central tool registry"""
def __init__(self):
self._tools: Dict[str, Callable] = {}
self._schemas: Dict[str, Dict] = {}
def register(self, name: str, handler: Callable, schema: Dict = None):
"""Register a tool"""
self._tools[name] = handler
self._schemas[name] = schema or {}
return self
def get(self, name: str) -> Optional[Callable]:
"""Get tool handler"""
return self._tools.get(name)
def list_tools(self) -> List[str]:
"""List all registered tools"""
return list(self._tools.keys())
class IntelligenceLayer:
"""
v4 Intelligence - pattern recognition and prediction.
Lightweight version for production.
"""
def __init__(self, db_path: Path = None):
self.patterns: Dict[str, Dict] = {}
self.db_path = db_path or Path.home() / ".uni-wizard" / "patterns.json"
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._load_patterns()
def _load_patterns(self):
"""Load patterns from disk"""
if self.db_path.exists():
try:
with open(self.db_path) as f:
self.patterns = json.load(f)
except (json.JSONDecodeError, OSError):
self.patterns = {}  # corrupt or unreadable DB: start fresh
def _save_patterns(self):
"""Save patterns to disk"""
with open(self.db_path, 'w') as f:
json.dump(self.patterns, f, indent=2)
def predict(self, tool: str, house: str, params: Dict) -> Prediction:
"""Predict execution outcome"""
key = f"{house}:{tool}"
pattern = self.patterns.get(key, {})
if not pattern or pattern.get("count", 0) < 3:
return Prediction(
success_rate=0.7,
confidence=0.5,
reasoning="Insufficient data for prediction",
estimated_latency_ms=200
)
success_rate = pattern.get("successes", 0) / pattern.get("count", 1)
avg_latency = pattern.get("total_latency_ms", 0) / pattern.get("count", 1)
confidence = min(0.95, pattern.get("count", 0) / 20) # Max at 20 samples
return Prediction(
success_rate=success_rate,
confidence=confidence,
reasoning=f"Based on {pattern.get('count')} executions",
estimated_latency_ms=avg_latency
)
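# Worked example (illustrative): 10 recorded runs with 8 successes give
# success_rate = 8/10 = 0.80 and confidence = min(0.95, 10/20) = 0.50;
# confidence saturates at 0.95 once 19 or more samples are recorded.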
def record(self, tool: str, house: str, success: bool, latency_ms: float):
"""Record execution outcome"""
key = f"{house}:{tool}"
if key not in self.patterns:
self.patterns[key] = {"count": 0, "successes": 0, "total_latency_ms": 0}
self.patterns[key]["count"] += 1
self.patterns[key]["successes"] += int(success)
self.patterns[key]["total_latency_ms"] += latency_ms
self._save_patterns()
class CircuitBreaker:
"""Circuit breaker pattern for fault tolerance"""
def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failures: Dict[str, int] = {}
self.last_failure: Dict[str, float] = {}
self.open_circuits: set = set()
def can_execute(self, tool: str) -> bool:
"""Check if tool can be executed"""
if tool not in self.open_circuits:
return True
# Check if recovery timeout passed
last_fail = self.last_failure.get(tool, 0)
if time.time() - last_fail > self.recovery_timeout:
self.open_circuits.discard(tool)
return True
return False
def record_success(self, tool: str):
"""Record successful execution"""
self.failures[tool] = 0
self.open_circuits.discard(tool)
def record_failure(self, tool: str):
"""Record failed execution"""
self.failures[tool] = self.failures.get(tool, 0) + 1
self.last_failure[tool] = time.time()
if self.failures[tool] >= self.failure_threshold:
self.open_circuits.add(tool)
class Harness:
"""
Uni-Wizard v4 Unified Harness.
Single API for all execution needs.
"""
def __init__(
self,
house: House = House.TIMMY,
mode: Mode = Mode.INTELLIGENT,
enable_learning: bool = True,
max_workers: int = 4
):
self.house = house
self.mode = mode
self.enable_learning = enable_learning
# Components
self.registry = ToolRegistry()
self.intelligence = IntelligenceLayer() if mode != Mode.SIMPLE else None
self.circuit_breaker = CircuitBreaker()
self.executor = ThreadPoolExecutor(max_workers=max_workers)
# Metrics
self.execution_count = 0
self.success_count = 0
# Register built-in tools
self._register_builtin_tools()
def _register_builtin_tools(self):
"""Register built-in tools"""
# System tools
self.registry.register("system_info", self._system_info)
self.registry.register("health_check", self._health_check)
# Git tools
self.registry.register("git_status", self._git_status)
self.registry.register("git_log", self._git_log)
# Placeholder for actual implementations
self.registry.register("file_read", self._not_implemented)
self.registry.register("file_write", self._not_implemented)
def _system_info(self, **params) -> Dict:
"""Get system information"""
import platform
return {
"platform": platform.platform(),
"python": platform.python_version(),
"processor": platform.processor(),
"hostname": platform.node()
}
def _health_check(self, **params) -> Dict:
"""Health check"""
return {
"status": "healthy",
"executions": self.execution_count,
"success_rate": self.success_count / max(1, self.execution_count)
}
def _git_status(self, repo_path: str = ".", **params) -> Dict:
"""Git status (placeholder)"""
# Would call actual git command
return {"status": "clean", "repo": repo_path}
def _git_log(self, repo_path: str = ".", max_count: int = 10, **params) -> Dict:
"""Git log (placeholder)"""
return {"commits": [], "repo": repo_path}
def _not_implemented(self, **params) -> Dict:
"""Placeholder for unimplemented tools"""
return {"error": "Tool not yet implemented"}
def predict(self, tool: str, params: Dict = None) -> Optional[Prediction]:
"""Predict execution outcome"""
if self.mode == Mode.SIMPLE or not self.intelligence:
return None
return self.intelligence.predict(tool, self.house.value, params or {})
def execute(self, tool: str, **params) -> ExecutionResult:
"""
Execute a tool with full v4 capabilities.
Flow:
1. Check circuit breaker
2. Get prediction (if intelligent mode)
3. Execute with timeout
4. Record outcome (if learning enabled)
5. Return result with full provenance
"""
start_time = time.time()
started_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
# 1. Circuit breaker check
if not self.circuit_breaker.can_execute(tool):
return ExecutionResult(
success=False,
data=None,
error=f"Circuit breaker open for {tool}",
provenance=Provenance(
house=self.house.value,
tool=tool,
mode=self.mode.value,
started_at=started_at,
circuit_open=True
),
suggestions=["Wait for circuit recovery or use an alternative tool"]
)
# 2. Get prediction
prediction = None
if self.mode != Mode.SIMPLE:
prediction = self.predict(tool, params)
# 3. Execute
handler = self.registry.get(tool)
if not handler:
return ExecutionResult(
success=False,
data=None,
error=f"Tool '{tool}' not found",
provenance=Provenance(
house=self.house.value,
tool=tool,
mode=self.mode.value,
started_at=started_at,
prediction=prediction
)
)
try:
# Execute with timeout for production
result_data = handler(**params)
success = True
error = None
self.circuit_breaker.record_success(tool)
except Exception as e:
success = False
error = str(e)
result_data = None
self.circuit_breaker.record_failure(tool)
execution_time_ms = (time.time() - start_time) * 1000
completed_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
# 4. Record for learning
if self.enable_learning and self.intelligence:
self.intelligence.record(tool, self.house.value, success, execution_time_ms)
# Update metrics
self.execution_count += 1
if success:
self.success_count += 1
# Build provenance
input_hash = hashlib.sha256(
json.dumps(params, sort_keys=True, default=str).encode()
).hexdigest()[:16]
output_hash = hashlib.sha256(
json.dumps(result_data, default=str).encode()
).hexdigest()[:16] if result_data else ""
provenance = Provenance(
house=self.house.value,
tool=tool,
mode=self.mode.value,
started_at=started_at,
completed_at=completed_at,
input_hash=input_hash,
output_hash=output_hash,
prediction=prediction,
execution_time_ms=execution_time_ms
)
# Build suggestions
suggestions = []
if not success:
suggestions.append("Check tool availability and parameters")
if prediction and prediction.success_rate < 0.5:
suggestions.append("Low historical success rate - consider an alternative approach")
return ExecutionResult(
success=success,
data=result_data,
error=error,
provenance=provenance,
suggestions=suggestions
)
async def execute_async(self, tool: str, **params) -> ExecutionResult:
"""Async execution"""
loop = asyncio.get_running_loop()
# run_in_executor does not forward keyword arguments, so close over them
return await loop.run_in_executor(self.executor, lambda: self.execute(tool, **params))
def execute_batch(self, tasks: List[Dict]) -> List[ExecutionResult]:
"""
Execute multiple tasks.
tasks: [{"tool": "name", "params": {...}}, ...]
"""
results = []
for task in tasks:
result = self.execute(task["tool"], **task.get("params", {}))
results.append(result)
# In SOVEREIGN mode, stop on first failure
if self.mode == Mode.SOVEREIGN and not result.success:
break
return results
def get_stats(self) -> Dict:
"""Get harness statistics"""
return {
"house": self.house.value,
"mode": self.mode.value,
"executions": self.execution_count,
"successes": self.success_count,
"success_rate": self.success_count / max(1, self.execution_count),
"tools_registered": len(self.registry.list_tools()),
"learning_enabled": self.enable_learning,
"circuit_breaker_open": len(self.circuit_breaker.open_circuits)
}
def get_patterns(self) -> Dict:
"""Get learned patterns"""
if not self.intelligence:
return {}
return self.intelligence.patterns
# Convenience factory functions
def get_harness(house: str = "timmy", mode: str = "intelligent") -> Harness:
"""Get configured harness"""
return Harness(
house=House(house),
mode=Mode(mode)
)
def get_simple_harness() -> Harness:
"""Get simple harness (no intelligence overhead)"""
return Harness(mode=Mode.SIMPLE)
def get_intelligent_harness(house: str = "timmy") -> Harness:
"""Get intelligent harness with learning"""
return Harness(
house=House(house),
mode=Mode.INTELLIGENT,
enable_learning=True
)
def get_sovereign_harness() -> Harness:
"""Get sovereign harness (full provenance)"""
return Harness(
house=House.TIMMY,
mode=Mode.SOVEREIGN,
enable_learning=True
)
# CLI interface
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Uni-Wizard v4")
parser.add_argument("--house", default="timmy", choices=["timmy", "ezra", "bezalel", "allegro"])
parser.add_argument("--mode", default="intelligent", choices=["simple", "intelligent", "sovereign"])
parser.add_argument("tool", help="Tool to execute")
parser.add_argument("--params", default="{}", help="JSON params")
args = parser.parse_args()
harness = Harness(house=House(args.house), mode=Mode(args.mode))
params = json.loads(args.params)
result = harness.execute(args.tool, **params)
print(result.to_json())