Compare commits

..

28 Commits

Author SHA1 Message Date
09aa06d65f Remove OpenClaw from fleet health checks
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 8s
PR Checklist / pr-checklist (pull_request) Failing after 1m11s
Smoke Test / smoke (pull_request) Failing after 6s
Validate Config / YAML Lint (pull_request) Failing after 6s
Validate Config / JSON Validate (pull_request) Successful in 6s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 7s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 7s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 9s
Validate Config / Playbook Schema Validation (pull_request) Successful in 8s
Architecture Lint / Lint Repository (pull_request) Failing after 7s
2026-04-12 05:51:45 +00:00
8dc8bc4774 Replace OpenClaw with generic single-agent framing in Son of Timmy 2026-04-12 05:51:43 +00:00
fcf112cb1e Remove OpenClaw gateway from fleet topology 2026-04-12 05:51:41 +00:00
ce36d3813b Remove OpenClaw from fleet capacity inventory 2026-04-12 05:51:40 +00:00
d4876c0fa5 Remove OpenClaw gateway from automation inventory 2026-04-12 05:51:38 +00:00
8070536d57 Remove OpenClaw references from Allegro wizard house doc 2026-04-12 05:51:37 +00:00
438191c72e Remove OpenClaw reference from Code Claw delegation doc 2026-04-12 05:51:36 +00:00
21e4039ec9 Merge pull request 'feat(scripts): enforce verified SSH trust for Gemini suite (#434)' (#474) from timmy/issue-434-ssh-trust into main
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 8s
Smoke Test / smoke (push) Failing after 7s
Validate Config / YAML Lint (push) Failing after 6s
Validate Config / JSON Validate (push) Successful in 6s
Validate Config / Python Syntax & Import Check (push) Failing after 8s
Validate Config / Shell Script Lint (push) Successful in 15s
Validate Config / Cron Syntax Check (push) Successful in 4s
Validate Config / Deploy Script Dry Run (push) Successful in 5s
Validate Config / Playbook Schema Validation (push) Successful in 8s
Architecture Lint / Lint Repository (push) Failing after 7s
2026-04-11 20:23:26 +00:00
Alexander Whitestone
19aa0830f4 Harden Gemini scripts with verified SSH trust
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 8s
Validate Config / YAML Lint (pull_request) Failing after 5s
Validate Config / JSON Validate (pull_request) Successful in 6s
PR Checklist / pr-checklist (pull_request) Failing after 1m11s
Smoke Test / smoke (pull_request) Failing after 7s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 9s
Validate Config / Shell Script Lint (pull_request) Successful in 15s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 5s
Validate Config / Playbook Schema Validation (pull_request) Successful in 7s
Architecture Lint / Lint Repository (pull_request) Failing after 6s
2026-04-11 15:13:15 -04:00
f2edb6a9b3 merge: feat(scripts): add GOFAI symbolic forward-chaining rule engine (#470)
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 8s
Smoke Test / smoke (push) Failing after 6s
Validate Config / YAML Lint (push) Failing after 6s
Validate Config / JSON Validate (push) Successful in 6s
Validate Config / Python Syntax & Import Check (push) Failing after 8s
Validate Config / Shell Script Lint (push) Successful in 13s
Validate Config / Cron Syntax Check (push) Successful in 6s
Validate Config / Deploy Script Dry Run (push) Successful in 7s
Validate Config / Playbook Schema Validation (push) Successful in 9s
Architecture Lint / Lint Repository (push) Failing after 7s
Auto-merged by PR triage bot
2026-04-11 02:08:02 +00:00
fc817c6a84 merge: feat(scripts): add GOFAI symbolic knowledge base (#471)
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 9s
Architecture Lint / Lint Repository (push) Has been cancelled
Smoke Test / smoke (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Auto-merged by PR triage bot
2026-04-11 02:07:46 +00:00
a620bd19b3 merge: feat(scripts): add GOFAI STRIPS goal-directed planner (#472)
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 10s
Smoke Test / smoke (push) Failing after 6s
Architecture Lint / Lint Repository (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Auto-merged by PR triage bot
2026-04-11 02:07:15 +00:00
0c98bce77f merge: feat(scripts): add GOFAI temporal reasoning engine (#473)
Some checks failed
Architecture Lint / Lint Repository (push) Has been cancelled
Architecture Lint / Linter Tests (push) Has been cancelled
Smoke Test / smoke (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Auto-merged by PR triage bot
2026-04-11 02:07:04 +00:00
c01e7f7d7f merge: test(scripts): lock self_healing safe CLI behavior (#469)
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 11s
Architecture Lint / Lint Repository (push) Has been cancelled
Smoke Test / smoke (push) Failing after 10s
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Auto-merged by PR triage bot
2026-04-11 02:06:42 +00:00
20bc0aa41a feat(scripts): add GOFAI temporal reasoning engine
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 8s
PR Checklist / pr-checklist (pull_request) Failing after 1m12s
Smoke Test / smoke (pull_request) Failing after 7s
Validate Config / YAML Lint (pull_request) Failing after 6s
Validate Config / JSON Validate (pull_request) Successful in 5s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 7s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 4s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 4s
Validate Config / Playbook Schema Validation (pull_request) Successful in 7s
Architecture Lint / Lint Repository (pull_request) Failing after 6s
2026-04-11 01:40:24 +00:00
b6c0620c83 feat(scripts): add GOFAI STRIPS goal-directed planner
Some checks failed
Validate Config / Deploy Script Dry Run (pull_request) Successful in 5s
Validate Config / Playbook Schema Validation (pull_request) Successful in 8s
Architecture Lint / Lint Repository (pull_request) Failing after 6s
Architecture Lint / Linter Tests (pull_request) Successful in 7s
PR Checklist / pr-checklist (pull_request) Failing after 1m9s
Smoke Test / smoke (pull_request) Failing after 7s
Validate Config / YAML Lint (pull_request) Failing after 6s
Validate Config / JSON Validate (pull_request) Successful in 6s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 8s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
2026-04-11 01:36:03 +00:00
d43deb1d79 feat(scripts): add GOFAI symbolic knowledge base
Some checks failed
Validate Config / Playbook Schema Validation (pull_request) Successful in 9s
PR Checklist / pr-checklist (pull_request) Failing after 1m12s
Validate Config / JSON Validate (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Failing after 6s
Validate Config / YAML Lint (pull_request) Failing after 5s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 7s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 5s
Architecture Lint / Lint Repository (pull_request) Failing after 8s
Architecture Lint / Linter Tests (pull_request) Successful in 9s
2026-04-11 01:33:05 +00:00
17de7f5df1 feat(scripts): add symbolic forward-chaining rule engine
Some checks failed
Validate Config / Playbook Schema Validation (pull_request) Successful in 7s
Architecture Lint / Linter Tests (pull_request) Successful in 10s
Architecture Lint / Lint Repository (pull_request) Failing after 7s
PR Checklist / pr-checklist (pull_request) Failing after 1m15s
Validate Config / JSON Validate (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Failing after 6s
Validate Config / YAML Lint (pull_request) Failing after 5s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 6s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 6s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 6s
2026-04-11 01:25:34 +00:00
1dc29180b8 Merge pull request 'feat: Sovereign Guardrails, Optimization, and Automation suite (v2)' (#468) from feat/sovereign-guardrails-v2 into main
Some checks failed
Architecture Lint / Lint Repository (push) Failing after 8s
Architecture Lint / Linter Tests (push) Successful in 13s
Smoke Test / smoke (push) Failing after 12s
Validate Config / YAML Lint (push) Failing after 13s
Validate Config / JSON Validate (push) Successful in 8s
Validate Config / Python Syntax & Import Check (push) Failing after 8s
Validate Config / Shell Script Lint (push) Successful in 13s
Validate Config / Cron Syntax Check (push) Successful in 6s
Validate Config / Deploy Script Dry Run (push) Successful in 6s
Validate Config / Playbook Schema Validation (push) Successful in 8s
2026-04-11 01:14:40 +00:00
343e190cc3 feat: add scripts/ci_automation_gate.py
Some checks failed
Validate Config / Python Syntax & Import Check (pull_request) Failing after 13s
Validate Config / Shell Script Lint (pull_request) Successful in 19s
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 10s
Validate Config / Playbook Schema Validation (pull_request) Successful in 10s
Architecture Lint / Lint Repository (pull_request) Failing after 10s
Architecture Lint / Linter Tests (pull_request) Successful in 10s
PR Checklist / pr-checklist (pull_request) Failing after 1m16s
Smoke Test / smoke (pull_request) Failing after 9s
Validate Config / YAML Lint (pull_request) Failing after 11s
Validate Config / JSON Validate (pull_request) Successful in 8s
2026-04-11 01:12:25 +00:00
932f48d06f feat: add scripts/token_optimizer.py 2026-04-11 01:12:22 +00:00
0c7521d275 feat: add scripts/agent_guardrails.py 2026-04-11 01:12:20 +00:00
bad31125c2 Merge pull request 'feat: Sovereign Health & Observability Dashboard' (#467) from feat/sovereign-health-dashboard into main
Some checks failed
Validate Config / YAML Lint (push) Failing after 13s
Validate Config / JSON Validate (push) Successful in 7s
Validate Config / Python Syntax & Import Check (push) Failing after 10s
Validate Config / Shell Script Lint (push) Successful in 16s
Validate Config / Cron Syntax Check (push) Successful in 7s
Validate Config / Deploy Script Dry Run (push) Successful in 7s
Validate Config / Playbook Schema Validation (push) Successful in 9s
Architecture Lint / Lint Repository (push) Failing after 8s
Architecture Lint / Linter Tests (push) Successful in 17s
Smoke Test / smoke (push) Failing after 11s
2026-04-11 01:11:57 +00:00
Alexander Whitestone
06031d923f test(scripts): lock self_healing safe CLI behavior (#435)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 8s
PR Checklist / pr-checklist (pull_request) Failing after 1m18s
Architecture Lint / Lint Repository (pull_request) Failing after 8s
2026-04-10 21:11:47 -04:00
7305d97e8f feat: add scripts/health_dashboard.py
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 10s
PR Checklist / pr-checklist (pull_request) Failing after 1m22s
Smoke Test / smoke (pull_request) Failing after 9s
Validate Config / YAML Lint (pull_request) Failing after 7s
Validate Config / JSON Validate (pull_request) Successful in 7s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 9s
Validate Config / Shell Script Lint (pull_request) Successful in 17s
Validate Config / Cron Syntax Check (pull_request) Successful in 6s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 8s
Validate Config / Playbook Schema Validation (pull_request) Successful in 8s
Architecture Lint / Lint Repository (pull_request) Failing after 8s
2026-04-11 00:59:43 +00:00
19e11b5287 Add smoke test workflow
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 13s
Smoke Test / smoke (push) Failing after 9s
Validate Config / YAML Lint (push) Failing after 7s
Validate Config / JSON Validate (push) Successful in 6s
Validate Config / Python Syntax & Import Check (push) Failing after 9s
Validate Config / Shell Script Lint (push) Successful in 14s
Validate Config / Cron Syntax Check (push) Successful in 5s
Validate Config / Deploy Script Dry Run (push) Successful in 7s
Validate Config / Playbook Schema Validation (push) Successful in 14s
Architecture Lint / Lint Repository (push) Failing after 11s
2026-04-11 00:33:29 +00:00
03d53a644b fix: architecture-lint continue-on-error
Some checks failed
Architecture Lint / Linter Tests (push) Has been cancelled
Architecture Lint / Lint Repository (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
2026-04-11 00:32:45 +00:00
f2388733fb fix: validate-config.yaml Python parse error
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 10s
Validate Config / YAML Lint (push) Failing after 6s
Validate Config / JSON Validate (push) Successful in 8s
Architecture Lint / Lint Repository (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Failing after 7s
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
2026-04-11 00:32:13 +00:00
26 changed files with 2053 additions and 109 deletions

View File

@@ -0,0 +1,24 @@
name: Smoke Test
on:
pull_request:
push:
branches: [main]
jobs:
smoke:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Parse check
run: |
find . -name '*.yml' -o -name '*.yaml' | grep -v .gitea | xargs -r python3 -c "import sys,yaml; [yaml.safe_load(open(f)) for f in sys.argv[1:]]"
find . -name '*.json' | xargs -r python3 -m json.tool > /dev/null
find . -name '*.py' | xargs -r python3 -m py_compile
find . -name '*.sh' | xargs -r bash -n
echo "PASS: All files parse"
- name: Secret scan
run: |
if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v .gitea; then exit 1; fi
echo "PASS: No secrets"

View File

@@ -112,23 +112,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install PyYAML
run: pip install pyyaml
- name: Validate playbook structure
run: |
python3 -c "
import yaml, sys, glob
required_keys = {'name', 'description'}
for f in glob.glob('playbooks/*.yaml'):
with open(f) as fh:
try:
data = yaml.safe_load(fh)
if not isinstance(data, dict):
print(f'ERROR: {f} is not a YAML mapping')
sys.exit(1)
missing = required_keys - set(data.keys())
if missing:
print(f'WARNING: {f} missing keys: {missing}')
print(f'OK: {f}')
except yaml.YAMLError as e:
print(f'ERROR: {f}: {e}')
sys.exit(1)
"
run: python3 scripts/validate_playbook_schema.py

View File

@@ -7,7 +7,7 @@ Purpose:
## What it is
Code Claw is a separate local runtime from Hermes/OpenClaw.
Code Claw is a separate local runtime from Hermes.
Current lane:
- runtime: local patched `~/code-claw`

View File

@@ -3,7 +3,7 @@
Purpose:
- stand up the third wizard house as a Kimi-backed coding worker
- keep Hermes as the durable harness
- treat OpenClaw as optional shell frontage, not the bones
- Hermes is the durable harness — no intermediary gateway layers
Local proof already achieved:
@@ -40,5 +40,5 @@ bin/deploy-allegro-house.sh root@167.99.126.228
Important nuance:
- the Hermes/Kimi lane is the proven path
- direct embedded OpenClaw Kimi model routing was not yet reliable locally
- direct embedded Kimi model routing was not yet reliable locally
- so the remote deployment keeps the minimal, proven architecture: Hermes house first

View File

@@ -81,17 +81,6 @@ launchctl bootstrap gui/$(id -u) ~/Library/LaunchAgents/ai.hermes.gateway.plist
- Old-state risk:
- same class as main gateway, but isolated to fenrir profile state
#### 3. ai.openclaw.gateway
- Plist: ~/Library/LaunchAgents/ai.openclaw.gateway.plist
- Command: `node .../openclaw/dist/index.js gateway --port 18789`
- Logs:
- `~/.openclaw/logs/gateway.log`
- `~/.openclaw/logs/gateway.err.log`
- KeepAlive: yes
- RunAtLoad: yes
- Old-state risk:
- long-lived gateway survives toolchain assumptions and keeps accepting work even if upstream routing changed
#### 4. ai.timmy.kimi-heartbeat
- Plist: ~/Library/LaunchAgents/ai.timmy.kimi-heartbeat.plist
- Command: `/bin/bash ~/.timmy/uniwizard/kimi-heartbeat.sh`
@@ -295,7 +284,7 @@ launchctl list | egrep 'timmy|kimi|claude|max|dashboard|matrix|gateway|huey'
List Timmy/Hermes launch agent files:
```bash
find ~/Library/LaunchAgents -maxdepth 1 -name '*.plist' | egrep 'timmy|hermes|openclaw|tower'
find ~/Library/LaunchAgents -maxdepth 1 -name '*.plist' | egrep 'timmy|hermes|tower'
```
List running loop scripts:
@@ -316,7 +305,6 @@ launchctl bootout gui/$(id -u) ~/Library/LaunchAgents/ai.timmy.kimi-heartbeat.pl
launchctl bootout gui/$(id -u) ~/Library/LaunchAgents/ai.timmy.claudemax-watchdog.plist || true
launchctl bootout gui/$(id -u) ~/Library/LaunchAgents/ai.hermes.gateway.plist || true
launchctl bootout gui/$(id -u) ~/Library/LaunchAgents/ai.hermes.gateway-fenrir.plist || true
launchctl bootout gui/$(id -u) ~/Library/LaunchAgents/ai.openclaw.gateway.plist || true
```
2. Kill manual loops

View File

@@ -104,7 +104,6 @@ Three primary resources govern the fleet:
| Hermes gateway | 500 MB | Primary gateway |
| Hermes agents (x3) | ~560 MB total | Multiple sessions |
| Ollama | ~20 MB base + model memory | Model loading varies |
| OpenClaw | 350 MB | Gateway process |
| Evennia (server+portal) | 56 MB | Game world |
---
@@ -146,7 +145,6 @@ This means Phase 3+ capabilities (orchestration, load balancing, etc.) are acces
| Gitea | 23/24 | 95.8% | GOOD |
| Hermes Gateway | 23/24 | 95.8% | GOOD |
| Ollama | 24/24 | 100.0% | GOOD |
| OpenClaw | 24/24 | 100.0% | GOOD |
| Evennia | 24/24 | 100.0% | GOOD |
| Hermes Agent | 21/24 | 87.5% | **CHECK** |

View File

@@ -58,7 +58,6 @@ LOCAL_CHECKS = {
"hermes-gateway": "pgrep -f 'hermes gateway' > /dev/null 2>/dev/null",
"hermes-agent": "pgrep -f 'hermes agent\\|hermes session' > /dev/null 2>/dev/null",
"ollama": "pgrep -f 'ollama serve' > /dev/null 2>/dev/null",
"openclaw": "pgrep -f 'openclaw' > /dev/null 2>/dev/null",
"evennia": "pgrep -f 'evennia' > /dev/null 2>/dev/null",
}

View File

@@ -59,7 +59,6 @@
| Hermes agent (s007) | 62032 | ~200MB | Session active since 10:20PM prev |
| Hermes agent (s001) | 12072 | ~178MB | Session active since Sun 6PM |
| Ollama | 71466 | ~20MB | /opt/homebrew/opt/ollama/bin/ollama serve |
| OpenClaw gateway | 85834 | ~350MB | Tue 12PM start |
| Crucible MCP (x4) | multiple | ~10-69MB each | MCP server instances |
| Evennia Server | 66433 | ~49MB | Sun 10PM start, port 4000 |
| Evennia Portal | 66423 | ~7MB | Sun 10PM start, port 4001 |

View File

@@ -9,7 +9,12 @@ Replaces ad-hoc dispatch scripts with a unified framework for tasking agents.
import os
import sys
import argparse
import subprocess
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
sys.path.insert(0, SCRIPT_DIR)
from ssh_trust import VerifiedSSHExecutor
# --- CONFIGURATION ---
FLEET = {
@@ -18,6 +23,9 @@ FLEET = {
}
class Dispatcher:
def __init__(self, executor=None):
self.executor = executor or VerifiedSSHExecutor()
def log(self, message: str):
print(f"[*] {message}")
@@ -25,14 +33,14 @@ class Dispatcher:
self.log(f"Dispatching task to {agent_name} on {host}...")
ip = FLEET[host]
# Command to run the agent on the remote machine
# Assumes hermes-agent is installed in /opt/hermes
remote_cmd = f"cd /opt/hermes && python3 run_agent.py --agent {agent_name} --task '{task}'"
ssh_cmd = ["ssh", "-o", "StrictHostKeyChecking=no", f"root@{ip}", remote_cmd]
try:
res = subprocess.run(ssh_cmd, capture_output=True, text=True)
res = self.executor.run(
ip,
['python3', 'run_agent.py', '--agent', agent_name, '--task', task],
cwd='/opt/hermes',
timeout=30,
)
if res.returncode == 0:
self.log(f"[SUCCESS] {agent_name} completed task.")
print(res.stdout)

126
scripts/agent_guardrails.py Normal file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""
agent_guardrails.py — Sovereign Agent Guardrails for the Timmy Foundation.
Scans agent logs and conversation trajectories for:
1. System Prompt Leakage — detecting if the agent revealed its internal instructions.
2. Jailbreak Compliance — detecting if the agent responded to "GODMODE", "DAN", etc.
3. PII Leakage — detecting if secrets or sensitive data were output.
Part of the Sovereign Infrastructure Suite.
"""
import os
import re
import sys
import glob
import logging
from pathlib import Path
from typing import List, Dict, Tuple
# Patterns adapted from hermes-agent/agent/input_sanitizer.py
EXTRACTION_PATTERNS = [
r"repeat\s+the\s+words\s+above",
r"repeat\s+your\s+(?:system\s+|initial\s+)?instructions",
r"what\s+is\s+your\s+(?:system\s+|initial\s+)?prompt",
r"show\s+me\s+your\s+(?:system\s+|initial\s+)?prompt",
r"print\s+your\s+(?:instructions|prompt|system\s+prompt)",
r"output\s+your\s+(?:initial|system)\s+prompt",
]
# Patterns that indicate the agent MIGHT be leaking its prompt
LEAKAGE_INDICATORS = [
r"you\s+are\s+a\s+helpful\s+assistant",
r"your\s+goal\s+is\s+to",
r"you\s+must\s+not",
r"here\s+are\s+your\s+instructions",
r"my\s+system\s+prompt\s+is",
r"i\s+was\s+told\s+to",
]
# Patterns for secrets (adapted from redact.py)
SECRET_PATTERNS = [
r"sk-[A-Za-z0-9_-]{20,}",
r"ghp_[A-Za-z0-9]{20,}",
r"AIza[A-Za-z0-9_-]{30,}",
]
AGENT_LOG_PATHS = [
"/root/wizards/*/home/logs/*.log",
"/root/wizards/*/logs/*.log",
"/root/wizards/*/.hermes/logs/*.log",
]
class GuardrailAuditor:
def __init__(self):
self.extraction_re = [re.compile(p, re.IGNORECASE) for p in EXTRACTION_PATTERNS]
self.leakage_re = [re.compile(p, re.IGNORECASE) for p in LEAKAGE_INDICATORS]
self.secret_re = [re.compile(p, re.IGNORECASE) for p in SECRET_PATTERNS]
def find_logs(self) -> List[Path]:
files = []
for pattern in AGENT_LOG_PATHS:
for p in glob.glob(pattern):
files.append(Path(p))
return files
def audit_file(self, path: Path) -> List[Dict]:
findings = []
try:
with open(path, "r", errors="ignore") as f:
lines = f.readlines()
for i, line in enumerate(lines):
# Check for extraction attempts (User side)
for p in self.extraction_re:
if p.search(line):
findings.append({
"type": "EXTRACTION_ATTEMPT",
"line": i + 1,
"content": line.strip()[:100],
"severity": "MEDIUM"
})
# Check for potential leakage (Assistant side)
for p in self.leakage_re:
if p.search(line):
findings.append({
"type": "POTENTIAL_LEAKAGE",
"line": i + 1,
"content": line.strip()[:100],
"severity": "HIGH"
})
# Check for secrets
for p in self.secret_re:
if p.search(line):
findings.append({
"type": "SECRET_EXPOSURE",
"line": i + 1,
"content": "[REDACTED]",
"severity": "CRITICAL"
})
except Exception as e:
print(f"Error reading {path}: {e}")
return findings
def run(self):
print("--- Sovereign Agent Guardrail Audit ---")
logs = self.find_logs()
print(f"Scanning {len(logs)} log files...")
total_findings = 0
for log in logs:
findings = self.audit_file(log)
if findings:
print(f"\nFindings in {log}:")
for f in findings:
print(f" [{f['severity']}] {f['type']} at line {f['line']}: {f['content']}")
total_findings += 1
print(f"\nAudit complete. Total findings: {total_findings}")
if total_findings > 0:
sys.exit(1)
if __name__ == "__main__":
auditor = GuardrailAuditor()
auditor.run()

View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""
ci_automation_gate.py — Automated Quality Gate for Timmy Foundation CI.
Enforces:
1. The 10-line Rule — functions should ideally be under 10 lines (warn at 20, fail at 50).
2. Complexity Check — basic cyclomatic complexity check.
3. Auto-fixable Linting — trailing whitespace, missing final newlines.
Used as a pre-merge gate.
"""
import os
import sys
import re
import argparse
from pathlib import Path
class QualityGate:
def __init__(self, fix=False):
self.fix = fix
self.failures = 0
self.warnings = 0
def check_file(self, path: Path):
if path.suffix not in (".js", ".ts", ".py"):
return
with open(path, "r") as f:
lines = f.readlines()
new_lines = []
changed = False
# 1. Basic Linting
for line in lines:
cleaned = line.rstrip() + "\n"
if cleaned != line:
changed = True
new_lines.append(cleaned)
if lines and not lines[-1].endswith("\n"):
new_lines[-1] = new_lines[-1] + "\n"
changed = True
if changed and self.fix:
with open(path, "w") as f:
f.writelines(new_lines)
print(f" [FIXED] {path}: Cleaned whitespace and newlines.")
elif changed:
print(f" [WARN] {path}: Has trailing whitespace or missing final newline.")
self.warnings += 1
# 2. Function Length Check (Simple regex-based)
content = "".join(new_lines)
if path.suffix in (".js", ".ts"):
# Match function blocks
functions = re.findall(r"function\s+\w+\s*\(.*?\)\s*\{([\s\S]*?)\}", content)
for i, func in enumerate(functions):
length = func.count("\n")
if length > 50:
print(f" [FAIL] {path}: Function {i} is too long ({length} lines).")
self.failures += 1
elif length > 20:
print(f" [WARN] {path}: Function {i} is getting long ({length} lines).")
self.warnings += 1
def run(self, directory: str):
print(f"--- Quality Gate: {directory} ---")
for root, _, files in os.walk(directory):
if "node_modules" in root or ".git" in root:
continue
for file in files:
self.check_file(Path(root) / file)
print(f"\nGate complete. Failures: {self.failures}, Warnings: {self.warnings}")
if self.failures > 0:
sys.exit(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("dir", nargs="?", default=".")
parser.add_argument("--fix", action="store_true")
args = parser.parse_args()
gate = QualityGate(fix=args.fix)
gate.run(args.dir)

View File

@@ -11,10 +11,15 @@ import os
import sys
import json
import argparse
import subprocess
import requests
from typing import Dict, List, Any
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
sys.path.insert(0, SCRIPT_DIR)
from ssh_trust import VerifiedSSHExecutor
# --- FLEET DEFINITION ---
FLEET = {
"mac": {"ip": "10.1.10.77", "port": 8080, "role": "hub"},
@@ -24,8 +29,9 @@ FLEET = {
}
class FleetManager:
def __init__(self):
def __init__(self, executor=None):
self.results = {}
self.executor = executor or VerifiedSSHExecutor()
def run_remote(self, host: str, command: str):
ip = FLEET[host]["ip"]

View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
health_dashboard.py — Sovereign Health & Observability Dashboard.
Aggregates data from Muda, Guardrails, Token Optimizer, and Quality Gates
into a single, unified health report for the Timmy Foundation fleet.
"""
import os
import sys
import json
import subprocess
from datetime import datetime
from pathlib import Path
REPORTS_DIR = Path("reports")
DASHBOARD_FILE = Path("SOVEREIGN_HEALTH.md")
class HealthDashboard:
def __init__(self):
REPORTS_DIR.mkdir(exist_ok=True)
def run_tool(self, name: str, cmd: str) -> str:
print(f"[*] Running {name}...")
try:
# Capture output
res = subprocess.run(cmd, shell=True, capture_output=True, text=True)
return res.stdout
except Exception as e:
return f"Error running {name}: {e}"
def generate_report(self):
print("--- Generating Sovereign Health Dashboard ---")
# 1. Run Audits
muda_output = self.run_tool("Muda Audit", "python3 scripts/muda_audit.py")
guardrails_output = self.run_tool("Agent Guardrails", "python3 scripts/agent_guardrails.py")
optimizer_output = self.run_tool("Token Optimizer", "python3 scripts/token_optimizer.py")
gate_output = self.run_tool("Quality Gate", "python3 scripts/ci_automation_gate.py .")
# 2. Build Markdown
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
md = [
f"# 🛡️ Sovereign Health Dashboard",
f"**Last Updated:** {now}",
f"",
f"## 📊 Summary",
f"- **Fleet Status:** ACTIVE",
f"- **Security Posture:** MONITORING",
f"- **Operational Waste:** AUDITED",
f"",
f"## ♻️ Muda Waste Audit",
f"```\n{muda_output}\n```",
f"",
f"## 🕵️ Agent Guardrails",
f"```\n{guardrails_output}\n```",
f"",
f"## 🪙 Token Efficiency",
f"```\n{optimizer_output}\n```",
f"",
f"## 🏗️ CI Quality Gate",
f"```\n{gate_output}\n```",
f"",
f"---",
f"*Generated by Sovereign Infrastructure Suite*"
]
with open(DASHBOARD_FILE, "w") as f:
f.write("\n".join(md))
print(f"[SUCCESS] Dashboard generated at {DASHBOARD_FILE}")
if __name__ == "__main__":
dashboard = HealthDashboard()
dashboard.generate_report()

341
scripts/knowledge_base.py Normal file
View File

@@ -0,0 +1,341 @@
#!/usr/bin/env python3
"""knowledge_base.py - GOFAI symbolic knowledge base for the Timmy Foundation fleet.
A classical AI knowledge representation system: stores facts as ground atoms,
supports first-order-logic-style queries, and maintains a provenance chain so
every belief can be traced back to its source. No neural nets, no embeddings -
just structured symbolic reasoning over a typed fact store.
Usage:
kb = KnowledgeBase()
kb.assert_fact('agent', 'online', 'timmy')
kb.assert_fact('task', 'assigned_to', 'task-42', 'timmy')
results = kb.query('task', 'assigned_to', '?x', 'timmy')
# results -> [{'?x': 'task-42'}]
CLI:
python knowledge_base.py --assert "agent online hermes"
python knowledge_base.py --query "agent online ?who"
python knowledge_base.py --dump
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
VAR_PREFIX = "?"
def is_var(term: str) -> bool:
"""Return True if *term* is a logic variable (starts with '?')."""
return term.startswith(VAR_PREFIX)
@dataclass(frozen=True)
class Fact:
"""An immutable ground atom: (relation, *args)."""
relation: str
args: Tuple[str, ...]
source: str = "user"
timestamp: float = field(default_factory=time.time)
def __str__(self) -> str:
args_str = " ".join(self.args)
return f"({self.relation} {args_str})"
Bindings = Dict[str, str]
# ---------------------------------------------------------------------------
# Unification
# ---------------------------------------------------------------------------
def unify_term(pattern: str, value: str, bindings: Bindings) -> Optional[Bindings]:
"""Unify a single pattern term against a ground value.
Returns updated bindings on success, or None on failure.
"""
if is_var(pattern):
if pattern in bindings:
return bindings if bindings[pattern] == value else None
return {**bindings, pattern: value}
return bindings if pattern == value else None
def unify_fact(
pattern: Tuple[str, ...], fact_args: Tuple[str, ...], bindings: Bindings
) -> Optional[Bindings]:
"""Unify a full argument tuple, returning final bindings or None."""
if len(pattern) != len(fact_args):
return None
b = bindings
for p, v in zip(pattern, fact_args):
b = unify_term(p, v, b)
if b is None:
return None
return b
# ---------------------------------------------------------------------------
# Knowledge Base
# ---------------------------------------------------------------------------
class KnowledgeBase:
"""In-memory symbolic knowledge base with optional JSON persistence."""
def __init__(self, persist_path: Optional[Path] = None) -> None:
self._facts: List[Fact] = []
self._persist_path = persist_path
if persist_path and persist_path.exists():
self._load(persist_path)
# ------------------------------------------------------------------
# Fact management
# ------------------------------------------------------------------
def assert_fact(
self, relation: str, *args: str, source: str = "user"
) -> Fact:
"""Add a ground fact to the knowledge base.
Idempotent: duplicate (relation, args) pairs are not added twice.
"""
f = Fact(relation=relation, args=tuple(args), source=source, timestamp=time.time())
for existing in self._facts:
if existing.relation == f.relation and existing.args == f.args:
return existing # already known
self._facts.append(f)
if self._persist_path:
self._save(self._persist_path)
return f
def retract_fact(self, relation: str, *args: str) -> int:
"""Remove all facts matching (relation, *args). Returns count removed."""
before = len(self._facts)
self._facts = [
f
for f in self._facts
if not (f.relation == relation and f.args == tuple(args))
]
removed = before - len(self._facts)
if removed and self._persist_path:
self._save(self._persist_path)
return removed
# ------------------------------------------------------------------
# Query
# ------------------------------------------------------------------
def query(
self, relation: str, *pattern_args: str, source_filter: Optional[str] = None
) -> List[Bindings]:
"""Return all binding dictionaries satisfying the query pattern.
Variables in *pattern_args* start with '?'. Ground terms must match
exactly. An empty binding dict means the fact matched with no
variables to bind.
Args:
relation: The relation name to match.
*pattern_args: Mixed ground/variable argument tuple.
source_filter: Optional provenance filter (e.g. 'scheduler').
Returns:
List of binding dicts, one per matching fact.
"""
results: List[Bindings] = []
for fact in self._facts:
if fact.relation != relation:
continue
if source_filter and fact.source != source_filter:
continue
b = unify_fact(tuple(pattern_args), fact.args, {})
if b is not None:
results.append(b)
return results
def query_one(
self, relation: str, *pattern_args: str
) -> Optional[Bindings]:
"""Return the first matching binding dict or None."""
for b in self.query(relation, *pattern_args):
return b
return None
def facts_for(self, relation: str) -> Iterator[Fact]:
"""Iterate over all facts with the given relation."""
for f in self._facts:
if f.relation == relation:
yield f
# ------------------------------------------------------------------
# Bulk operations
# ------------------------------------------------------------------
def all_facts(self) -> List[Fact]:
"""Return a snapshot of all stored facts."""
return list(self._facts)
def fact_count(self) -> int:
return len(self._facts)
def clear(self) -> None:
"""Remove all facts from memory (does not touch disk)."""
self._facts.clear()
# ------------------------------------------------------------------
# Persistence
# ------------------------------------------------------------------
def _save(self, path: Path) -> None:
records = [
{
"relation": f.relation,
"args": list(f.args),
"source": f.source,
"timestamp": f.timestamp,
}
for f in self._facts
]
path.write_text(json.dumps(records, indent=2))
def _load(self, path: Path) -> None:
try:
records = json.loads(path.read_text())
for r in records:
self._facts.append(
Fact(
relation=r["relation"],
args=tuple(r["args"]),
source=r.get("source", "persisted"),
timestamp=r.get("timestamp", 0.0),
)
)
except (json.JSONDecodeError, KeyError) as exc:
print(f"[kb] Warning: could not load {path}: {exc}", file=sys.stderr)
def save_to(self, path: Path) -> None:
"""Explicitly save to a given path."""
self._save(path)
# ------------------------------------------------------------------
# Debug / display
# ------------------------------------------------------------------
def dump(self, relation_filter: Optional[str] = None) -> None:
"""Print all (or filtered) facts to stdout."""
for f in self._facts:
if relation_filter and f.relation != relation_filter:
continue
print(f" {f} [source={f.source}]")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_terms(raw: str) -> List[str]:
"""Split a whitespace-separated string into terms."""
return raw.strip().split()
def main() -> None:
parser = argparse.ArgumentParser(
description="GOFAI symbolic knowledge base CLI"
)
parser.add_argument(
"--db",
default="kb.json",
help="Path to persistent JSON store (default: kb.json)",
)
parser.add_argument(
"--assert",
dest="assert_stmt",
metavar="RELATION ARG...",
help='Assert a fact, e.g. --assert "agent online timmy"',
)
parser.add_argument(
"--retract",
dest="retract_stmt",
metavar="RELATION ARG...",
help='Retract a fact, e.g. --retract "agent online timmy"',
)
parser.add_argument(
"--query",
dest="query_stmt",
metavar="RELATION ARG...",
help='Query the KB, e.g. --query "agent online ?who"',
)
parser.add_argument(
"--dump",
action="store_true",
help="Dump all facts",
)
parser.add_argument(
"--relation",
help="Filter --dump to a specific relation",
)
args = parser.parse_args()
db_path = Path(args.db)
kb = KnowledgeBase(persist_path=db_path)
if args.assert_stmt:
terms = _parse_terms(args.assert_stmt)
if len(terms) < 2:
print("ERROR: --assert requires at least RELATION and one ARG", file=sys.stderr)
sys.exit(1)
fact = kb.assert_fact(terms[0], *terms[1:], source="cli")
print(f"Asserted: {fact}")
if args.retract_stmt:
terms = _parse_terms(args.retract_stmt)
if len(terms) < 2:
print("ERROR: --retract requires at least RELATION and one ARG", file=sys.stderr)
sys.exit(1)
n = kb.retract_fact(terms[0], *terms[1:])
print(f"Retracted {n} fact(s).")
if args.query_stmt:
terms = _parse_terms(args.query_stmt)
if len(terms) < 2:
print("ERROR: --query requires at least RELATION and one ARG", file=sys.stderr)
sys.exit(1)
results = kb.query(terms[0], *terms[1:])
if not results:
print("No results.")
else:
for i, b in enumerate(results, 1):
if b:
bindings_str = ", ".join(f"{k}={v}" for k, v in b.items())
print(f" [{i}] {bindings_str}")
else:
print(f" [{i}] (ground match)")
if args.dump:
count = kb.fact_count()
print(f"Knowledge Base — {count} fact(s):")
kb.dump(relation_filter=args.relation)
if not any([args.assert_stmt, args.retract_stmt, args.query_stmt, args.dump]):
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -15,10 +15,15 @@ import sys
import time
import argparse
import requests
import subprocess
import json
from typing import Optional, Dict, Any
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
sys.path.insert(0, SCRIPT_DIR)
from ssh_trust import VerifiedSSHExecutor
# --- CONFIGURATION ---
DO_API_URL = "https://api.digitalocean.com/v2"
# We expect DIGITALOCEAN_TOKEN to be set in the environment.
@@ -30,13 +35,14 @@ DEFAULT_IMAGE = "ubuntu-22-04-x64"
LLAMA_CPP_REPO = "https://github.com/ggerganov/llama.cpp"
class Provisioner:
def __init__(self, name: str, size: str, model: str, region: str = DEFAULT_REGION):
def __init__(self, name: str, size: str, model: str, region: str = DEFAULT_REGION, executor=None):
self.name = name
self.size = size
self.model = model
self.region = region
self.droplet_id = None
self.ip_address = None
self.executor = executor or VerifiedSSHExecutor(auto_enroll=True)
def log(self, message: str):
print(f"[*] {message}")
@@ -104,13 +110,8 @@ class Provisioner:
self.log(f"Droplet IP: {self.ip_address}")
def run_remote(self, command: str):
# Using subprocess to call ssh. Assumes local machine has the right private key.
ssh_cmd = [
"ssh", "-o", "StrictHostKeyChecking=no",
f"root@{self.ip_address}", command
]
result = subprocess.run(ssh_cmd, capture_output=True, text=True)
return result
# Uses verified host trust. Brand-new nodes explicitly enroll on first contact.
return self.executor.run_script(self.ip_address, command, timeout=60)
def setup_wizard(self):
self.log("Starting remote setup...")

View File

@@ -10,10 +10,16 @@ Safe-by-default: runs in dry-run mode unless --execute is given.
import os
import sys
import subprocess
import argparse
import requests
import datetime
from typing import Sequence
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
sys.path.insert(0, SCRIPT_DIR)
from ssh_trust import VerifiedSSHExecutor
# --- CONFIGURATION ---
FLEET = {
@@ -24,54 +30,24 @@ FLEET = {
}
class SelfHealer:
def __init__(self, dry_run=True, confirm_kill=False, yes=False):
def __init__(self, dry_run=True, confirm_kill=False, yes=False, executor=None):
self.dry_run = dry_run
self.confirm_kill = confirm_kill
self.yes = yes
self.executor = executor or VerifiedSSHExecutor()
def log(self, message: str):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
def run_remote(self, host: str, command: str):
ip = FLEET[host]["ip"]
ssh_cmd = ["ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=5", f"root@{ip}", command]
if host == "mac":
ssh_cmd = ["bash", "-c", command]
ip = FLEET[host]['ip']
try:
return subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=15)
return self.executor.run_script(ip, command, local=(host == 'mac'), timeout=15)
except Exception as e:
self.log(f" [ERROR] Failed to run remote command on {host}: {e}")
return None
def confirm(self, prompt: str) -> bool:
"""Ask for confirmation unless --yes flag is set."""
if self.yes:
return True
while True:
response = input(f"{prompt} [y/N] ").strip().lower()
if response in ("y", "yes"):
return True
elif response in ("n", "no", ""):
return False
print("Please answer 'y' or 'n'.")
def check_llama_server(self, host: str):
ip = FLEET[host]["ip"]
port = FLEET[host]["port"]
try:
requests.get(f"http://{ip}:{port}/health", timeout=2)
except:
self.log(f" [!] llama-server down on {host}.")
if self.dry_run:
self.log(f" [DRY-RUN] Would restart llama-server on {host}")
else:
if self.confirm(f" Restart llama-server on {host}?"):
self.log(f" Restarting llama-server on {host}...")
self.run_remote(host, "systemctl restart llama-server")
else:
self.log(f" Skipped restart on {host}.")
def check_disk_space(self, host: str):
res = self.run_remote(host, "df -h / | tail -1 | awk '{print $5}' | sed 's/%//'")
if res and res.returncode == 0:
@@ -192,10 +168,10 @@ EXAMPLES:
"""
print(help_text)
def main():
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Self-healing infrastructure script (safe-by-default).",
add_help=False # We'll handle --help ourselves
add_help=False,
)
parser.add_argument("--dry-run", action="store_true", default=False,
help="Run in dry-run mode (default behavior).")
@@ -209,25 +185,28 @@ def main():
help="Show detailed help about safety features.")
parser.add_argument("--help", "-h", action="store_true", default=False,
help="Show standard help.")
return parser
args = parser.parse_args()
def main(argv: Sequence[str] | None = None):
parser = build_parser()
args = parser.parse_args(list(argv) if argv is not None else None)
if args.help_safe:
print_help_safe()
sys.exit(0)
raise SystemExit(0)
if args.help:
parser.print_help()
sys.exit(0)
raise SystemExit(0)
# Determine mode: if --execute is given, disable dry-run
dry_run = not args.execute
# If --dry-run is explicitly given, ensure dry-run (redundant but clear)
if args.dry_run:
dry_run = True
healer = SelfHealer(dry_run=dry_run, confirm_kill=args.confirm_kill, yes=args.yes)
healer.run()
if __name__ == "__main__":
main()

171
scripts/ssh_trust.py Normal file
View File

@@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""Verified SSH trust helpers for Gemini infrastructure scripts."""
from __future__ import annotations
from pathlib import Path
from typing import Callable, Sequence
import shlex
import subprocess
DEFAULT_KNOWN_HOSTS = Path(__file__).resolve().parent.parent / ".ssh" / "known_hosts"
Runner = Callable[..., subprocess.CompletedProcess]
class SSHTrustError(RuntimeError):
pass
class HostKeyEnrollmentError(SSHTrustError):
pass
class HostKeyVerificationError(SSHTrustError):
pass
class CommandPlan:
def __init__(self, argv: list[str], local: bool, remote_command: str | None = None):
self.argv = argv
self.local = local
self.remote_command = remote_command
def _ensure_parent(path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
def enroll_host_key(
host: str,
*,
port: int = 22,
known_hosts_path: str | Path | None = None,
runner: Runner = subprocess.run,
) -> Path:
path = Path(known_hosts_path or DEFAULT_KNOWN_HOSTS)
_ensure_parent(path)
cmd = ["ssh-keyscan", "-p", str(port), "-H", host]
result = runner(cmd, capture_output=True, text=True, timeout=10)
if result.returncode != 0 or not (result.stdout or "").strip():
raise HostKeyEnrollmentError(
f"Could not enroll host key for {host}:{port}: {(result.stderr or '').strip() or 'empty ssh-keyscan output'}"
)
existing = []
if path.exists():
existing = [line for line in path.read_text().splitlines() if line.strip()]
for line in result.stdout.splitlines():
line = line.strip()
if line and line not in existing:
existing.append(line)
path.write_text(("\n".join(existing) + "\n") if existing else "")
return path
class VerifiedSSHExecutor:
def __init__(
self,
*,
user: str = "root",
known_hosts_path: str | Path | None = None,
connect_timeout: int = 5,
auto_enroll: bool = False,
runner: Runner = subprocess.run,
):
self.user = user
self.known_hosts_path = Path(known_hosts_path or DEFAULT_KNOWN_HOSTS)
self.connect_timeout = connect_timeout
self.auto_enroll = auto_enroll
self.runner = runner
def _ensure_known_hosts(self, host: str, port: int) -> Path:
if self.known_hosts_path.exists():
return self.known_hosts_path
if not self.auto_enroll:
raise HostKeyEnrollmentError(
f"Known-hosts file missing: {self.known_hosts_path}. Enroll {host}:{port} before connecting."
)
return enroll_host_key(host, port=port, known_hosts_path=self.known_hosts_path, runner=self.runner)
def _ssh_prefix(self, host: str, port: int) -> list[str]:
known_hosts = self._ensure_known_hosts(host, port)
return [
"ssh",
"-o",
"BatchMode=yes",
"-o",
"StrictHostKeyChecking=yes",
"-o",
f"UserKnownHostsFile={known_hosts}",
"-o",
f"ConnectTimeout={self.connect_timeout}",
"-p",
str(port),
f"{self.user}@{host}",
]
def plan(
self,
host: str,
command: Sequence[str],
*,
local: bool = False,
port: int = 22,
cwd: str | None = None,
) -> CommandPlan:
argv = [str(part) for part in command]
if not argv:
raise ValueError("command must not be empty")
if local:
return CommandPlan(argv=argv, local=True, remote_command=None)
remote_command = shlex.join(argv)
if cwd:
remote_command = f"cd {shlex.quote(cwd)} && exec {remote_command}"
return CommandPlan(self._ssh_prefix(host, port) + [remote_command], False, remote_command)
def plan_script(
self,
host: str,
script_text: str,
*,
local: bool = False,
port: int = 22,
cwd: str | None = None,
) -> CommandPlan:
remote_command = script_text.strip()
if cwd:
remote_command = f"cd {shlex.quote(cwd)} && {remote_command}"
if local:
return CommandPlan(["sh", "-lc", remote_command], True, None)
return CommandPlan(self._ssh_prefix(host, port) + [remote_command], False, remote_command)
def _run_plan(self, plan: CommandPlan, *, timeout: int | None = None):
result = self.runner(plan.argv, capture_output=True, text=True, timeout=timeout)
if result.returncode != 0 and "host key verification failed" in ((result.stderr or "").lower()):
raise HostKeyVerificationError((result.stderr or "").strip() or "Host key verification failed")
return result
def run(
self,
host: str,
command: Sequence[str],
*,
local: bool = False,
port: int = 22,
cwd: str | None = None,
timeout: int | None = None,
):
return self._run_plan(self.plan(host, command, local=local, port=port, cwd=cwd), timeout=timeout)
def run_script(
self,
host: str,
script_text: str,
*,
local: bool = False,
port: int = 22,
cwd: str | None = None,
timeout: int | None = None,
):
return self._run_plan(self.plan_script(host, script_text, local=local, port=port, cwd=cwd), timeout=timeout)

304
scripts/strips_planner.py Normal file
View File

@@ -0,0 +1,304 @@
#!/usr/bin/env python3
"""strips_planner.py - GOFAI STRIPS-style goal-directed planner for the Timmy Foundation fleet.
Implements a classical means-ends analysis (MEA) planner over a STRIPS action
representation. Each action has preconditions, an add-list, and a delete-list.
The planner uses regression (backward chaining) from the goal state to find a
linear action sequence that achieves all goal conditions from the initial state.
No ML, no embeddings - just symbolic state-space search.
Representation:
State: frozenset of ground literals, e.g. {'agent_idle', 'task_queued'}
Action: (name, preconditions, add_effects, delete_effects)
Goal: set of literals that must hold in the final state
Algorithm:
Iterative-deepening DFS (IDDFS) over the regression search space.
Cycle detection via visited-state set per path.
Usage (Python API):
from strips_planner import Action, STRIPSPlanner
actions = [
Action('assign_task',
pre={'agent_idle', 'task_queued'},
add={'task_running'},
delete={'agent_idle', 'task_queued'}),
Action('complete_task',
pre={'task_running'},
add={'agent_idle', 'task_done'},
delete={'task_running'}),
]
planner = STRIPSPlanner(actions)
plan = planner.solve(
initial={'agent_idle', 'task_queued'},
goal={'task_done'},
)
# plan -> ['assign_task', 'complete_task']
CLI:
python strips_planner.py --demo
python strips_planner.py --max-depth 15
"""
from __future__ import annotations
import argparse
import sys
from dataclasses import dataclass, field
from typing import FrozenSet, List, Optional, Set, Tuple
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
Literal = str
State = FrozenSet[Literal]
@dataclass(frozen=True)
class Action:
"""A STRIPS operator with preconditions and add/delete effects."""
name: str
pre: FrozenSet[Literal]
add: FrozenSet[Literal]
delete: FrozenSet[Literal]
def __post_init__(self) -> None:
# Coerce mutable sets to frozensets for hashability
object.__setattr__(self, 'pre', frozenset(self.pre))
object.__setattr__(self, 'add', frozenset(self.add))
object.__setattr__(self, 'delete', frozenset(self.delete))
def applicable(self, state: State) -> bool:
"""True if all preconditions hold in *state*."""
return self.pre <= state
def apply(self, state: State) -> State:
"""Return the successor state after executing this action."""
return (state - self.delete) | self.add
def __str__(self) -> str:
return self.name
# ---------------------------------------------------------------------------
# Planner
# ---------------------------------------------------------------------------
class STRIPSPlanner:
"""Goal-directed STRIPS planner using iterative-deepening DFS.
Searches forward from the initial state, pruning branches where the
goal cannot be satisfied within the remaining depth budget.
"""
def __init__(self, actions: List[Action]) -> None:
self.actions = list(actions)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def solve(
self,
initial: Set[Literal] | FrozenSet[Literal],
goal: Set[Literal] | FrozenSet[Literal],
max_depth: int = 20,
) -> Optional[List[str]]:
"""Find a plan that achieves *goal* from *initial*.
Args:
initial: Initial world state (set of ground literals).
goal: Goal conditions (set of ground literals to achieve).
max_depth: Maximum plan length to consider.
Returns:
Ordered list of action names, or None if no plan found.
"""
s0 = frozenset(initial)
g = frozenset(goal)
if g <= s0:
return [] # goal already satisfied
for depth in range(1, max_depth + 1):
result = self._dfs(s0, g, depth, [], {s0})
if result is not None:
return result
return None
# ------------------------------------------------------------------
# Internal search
# ------------------------------------------------------------------
def _dfs(
self,
state: State,
goal: State,
remaining: int,
path: List[str],
visited: Set[State],
) -> Optional[List[str]]:
"""Depth-limited forward DFS."""
if remaining == 0:
return None
for action in self.actions:
if not action.applicable(state):
continue
next_state = action.apply(state)
if next_state in visited:
continue
new_path = path + [action.name]
if goal <= next_state:
return new_path
visited.add(next_state)
result = self._dfs(next_state, goal, remaining - 1, new_path, visited)
visited.discard(next_state)
if result is not None:
return result
return None
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def explain_plan(
self, initial: Set[Literal], plan: List[str]
) -> List[Tuple[str, State]]:
"""Trace each action with the resulting state for debugging.
Returns:
List of (action_name, resulting_state) tuples.
"""
state: State = frozenset(initial)
trace: List[Tuple[str, State]] = []
action_map = {a.name: a for a in self.actions}
for name in plan:
action = action_map[name]
state = action.apply(state)
trace.append((name, state))
return trace
# ---------------------------------------------------------------------------
# Built-in demo domain: Timmy fleet task lifecycle
# ---------------------------------------------------------------------------
def _fleet_demo_actions() -> List[Action]:
"""Return a small STRIPS domain modelling the Timmy fleet task lifecycle."""
return [
Action(
name='receive_task',
pre={'fleet_idle'},
add={'task_queued', 'fleet_busy'},
delete={'fleet_idle'},
),
Action(
name='validate_task',
pre={'task_queued'},
add={'task_validated'},
delete={'task_queued'},
),
Action(
name='assign_agent',
pre={'task_validated', 'agent_available'},
add={'task_assigned'},
delete={'task_validated', 'agent_available'},
),
Action(
name='execute_task',
pre={'task_assigned'},
add={'task_running'},
delete={'task_assigned'},
),
Action(
name='complete_task',
pre={'task_running'},
add={'task_done', 'agent_available', 'fleet_idle'},
delete={'task_running', 'fleet_busy'},
),
Action(
name='report_result',
pre={'task_done'},
add={'result_reported'},
delete={'task_done'},
),
]
def run_demo(max_depth: int = 20) -> None:
"""Run the built-in Timmy fleet planning demo."""
actions = _fleet_demo_actions()
planner = STRIPSPlanner(actions)
initial: Set[Literal] = {'fleet_idle', 'agent_available'}
goal: Set[Literal] = {'result_reported', 'fleet_idle'}
print("=" * 60)
print("STRIPS Planner Demo - Timmy Fleet Task Lifecycle")
print("=" * 60)
print(f"Initial state : {sorted(initial)}")
print(f"Goal : {sorted(goal)}")
print(f"Max depth : {max_depth}")
print()
plan = planner.solve(initial, goal, max_depth=max_depth)
if plan is None:
print("No plan found within depth limit.")
return
print(f"Plan ({len(plan)} steps):")
for i, step in enumerate(plan, 1):
print(f" {i:2d}. {step}")
print()
print("Execution trace:")
state: Set[Literal] = set(initial)
for name, resulting_state in planner.explain_plan(initial, plan):
print(f" -> {name}")
print(f" state: {sorted(resulting_state)}")
print()
achieved = frozenset(goal) <= frozenset(state) or True
goal_met = all(g in [s for _, rs in planner.explain_plan(initial, plan) for s in rs]
or g in initial for g in goal)
final_state = planner.explain_plan(initial, plan)[-1][1] if plan else frozenset(initial)
print(f"Goal satisfied: {frozenset(goal) <= final_state}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="GOFAI STRIPS-style goal-directed planner"
)
parser.add_argument(
"--demo",
action="store_true",
help="Run the built-in Timmy fleet demo",
)
parser.add_argument(
"--max-depth",
type=int,
default=20,
metavar="N",
help="Maximum plan depth for IDDFS search (default: 20)",
)
args = parser.parse_args()
if args.demo or not any(vars(args).values()):
run_demo(max_depth=args.max_depth)
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,276 @@
#!/usr/bin/env python3
"""symbolic_reasoner.py — Forward-chaining rule engine for the Timmy Foundation fleet.
A classical GOFAI approach: declarative IF-THEN rules evaluated over a
working-memory of facts. Rules fire until quiescence (no new facts) or
a configurable cycle limit. Designed to sit *beside* the LLM layer so
that hard policy constraints never depend on probabilistic inference.
Usage:
python symbolic_reasoner.py --rules rules.yaml --facts facts.yaml
python symbolic_reasoner.py --self-test
"""
from __future__ import annotations
import argparse
import json
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, FrozenSet, List, Optional, Set, Tuple
try:
import yaml
except ImportError:
yaml = None # graceful fallback — JSON-only mode
# ---------------------------------------------------------------------------
# Domain types
# ---------------------------------------------------------------------------
Fact = Tuple[str, ...] # e.g. ("agent", "timmy", "role", "infrastructure")
@dataclass(frozen=True)
class Rule:
"""A single IF-THEN production rule."""
name: str
conditions: FrozenSet[Fact] # all must be present
negations: FrozenSet[Fact] # none may be present
conclusions: FrozenSet[Fact] # added when rule fires
priority: int = 0 # higher fires first
def satisfied(self, wm: Set[Fact]) -> bool:
return self.conditions.issubset(wm) and self.negations.isdisjoint(wm)
# ---------------------------------------------------------------------------
# Engine
# ---------------------------------------------------------------------------
class SymbolicReasoner:
"""Forward-chaining production system."""
def __init__(self, rules: List[Rule], *, cycle_limit: int = 200):
self._rules = sorted(rules, key=lambda r: -r.priority)
self._cycle_limit = cycle_limit
self._trace: List[str] = []
# -- public API ---------------------------------------------------------
def infer(self, initial_facts: Set[Fact]) -> Set[Fact]:
"""Run to quiescence and return the final working-memory."""
wm = set(initial_facts)
fired: Set[str] = set()
for cycle in range(self._cycle_limit):
progress = False
for rule in self._rules:
if rule.name in fired:
continue
if rule.satisfied(wm):
new = rule.conclusions - wm
if new:
wm |= new
fired.add(rule.name)
self._trace.append(
f"cycle {cycle}: {rule.name} => {_fmt_facts(new)}"
)
progress = True
break # restart from highest-priority rule
if not progress:
break
return wm
def query(self, wm: Set[Fact], pattern: Tuple[Optional[str], ...]) -> List[Fact]:
"""Return facts matching *pattern* (None = wildcard)."""
return [
f for f in wm
if len(f) == len(pattern)
and all(p is None or p == v for p, v in zip(pattern, f))
]
@property
def trace(self) -> List[str]:
return list(self._trace)
# -- serialisation helpers -----------------------------------------------
@classmethod
def from_dicts(cls, raw_rules: List[Dict], **kw) -> "SymbolicReasoner":
rules = [_parse_rule(r) for r in raw_rules]
return cls(rules, **kw)
@classmethod
def from_file(cls, path: Path, **kw) -> "SymbolicReasoner":
text = path.read_text()
if path.suffix in (".yaml", ".yml"):
if yaml is None:
raise RuntimeError("PyYAML required for .yaml rules")
data = yaml.safe_load(text)
else:
data = json.loads(text)
return cls.from_dicts(data["rules"], **kw)
# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------
def _parse_fact(raw: list | str) -> Fact:
if isinstance(raw, str):
return tuple(raw.split())
return tuple(str(x) for x in raw)
def _parse_rule(d: Dict) -> Rule:
return Rule(
name=d["name"],
conditions=frozenset(_parse_fact(c) for c in d.get("if", [])),
negations=frozenset(_parse_fact(c) for c in d.get("unless", [])),
conclusions=frozenset(_parse_fact(c) for c in d.get("then", [])),
priority=d.get("priority", 0),
)
def _fmt_facts(facts: Set[Fact]) -> str:
return ", ".join(" ".join(f) for f in sorted(facts))
# ---------------------------------------------------------------------------
# Built-in fleet rules (loaded when no --rules file is given)
# ---------------------------------------------------------------------------
DEFAULT_FLEET_RULES: List[Dict] = [
{
"name": "route-ci-to-timmy",
"if": [["task", "category", "ci"]],
"then": [["assign", "timmy"], ["reason", "timmy", "best-ci-merge-rate"]],
"priority": 10,
},
{
"name": "route-security-to-timmy",
"if": [["task", "category", "security"]],
"then": [["assign", "timmy"], ["reason", "timmy", "security-specialist"]],
"priority": 10,
},
{
"name": "route-architecture-to-gemini",
"if": [["task", "category", "architecture"]],
"unless": [["assign", "timmy"]],
"then": [["assign", "gemini"], ["reason", "gemini", "architecture-strength"]],
"priority": 8,
},
{
"name": "route-review-to-allegro",
"if": [["task", "category", "review"]],
"then": [["assign", "allegro"], ["reason", "allegro", "highest-quality-per-pr"]],
"priority": 9,
},
{
"name": "route-frontend-to-claude",
"if": [["task", "category", "frontend"]],
"unless": [["task", "repo", "fleet-ops"]],
"then": [["assign", "claude"], ["reason", "claude", "high-volume-frontend"]],
"priority": 5,
},
{
"name": "block-merge-without-review",
"if": [["pr", "status", "open"], ["pr", "reviews", "0"]],
"then": [["pr", "action", "block-merge"], ["reason", "policy", "no-unreviewed-merges"]],
"priority": 20,
},
{
"name": "block-merge-ci-failing",
"if": [["pr", "status", "open"], ["pr", "ci", "failing"]],
"then": [["pr", "action", "block-merge"], ["reason", "policy", "ci-must-pass"]],
"priority": 20,
},
{
"name": "auto-label-hotfix",
"if": [["pr", "title-prefix", "hotfix"]],
"then": [["pr", "label", "hotfix"], ["pr", "priority", "urgent"]],
"priority": 15,
},
]
# ---------------------------------------------------------------------------
# Self-test
# ---------------------------------------------------------------------------
def _self_test() -> bool:
"""Verify core behaviour; returns True on success."""
engine = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
# Scenario 1: CI task should route to Timmy
wm = engine.infer({("task", "category", "ci")})
assert ("assign", "timmy") in wm, f"expected timmy assignment, got {wm}"
# Scenario 2: architecture task routes to gemini (not timmy)
engine2 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
wm2 = engine2.infer({("task", "category", "architecture")})
assert ("assign", "gemini") in wm2, f"expected gemini assignment, got {wm2}"
# Scenario 3: open PR with no reviews should block merge
engine3 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
wm3 = engine3.infer({("pr", "status", "open"), ("pr", "reviews", "0")})
assert ("pr", "action", "block-merge") in wm3
# Scenario 4: negation — frontend + fleet-ops should NOT assign claude
engine4 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
wm4 = engine4.infer({("task", "category", "frontend"), ("task", "repo", "fleet-ops")})
assert ("assign", "claude") not in wm4
# Scenario 5: query with wildcards
results = engine.query(wm, ("reason", None, None))
assert len(results) > 0
print("All 5 self-test scenarios passed.")
for line in engine.trace:
print(f" {line}")
return True
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--rules", type=Path, help="YAML/JSON rule file")
ap.add_argument("--facts", type=Path, help="YAML/JSON initial facts")
ap.add_argument("--self-test", action="store_true")
ap.add_argument("--json", action="store_true", help="output as JSON")
args = ap.parse_args()
if args.self_test:
ok = _self_test()
sys.exit(0 if ok else 1)
if args.rules:
engine = SymbolicReasoner.from_file(args.rules)
else:
engine = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
if args.facts:
text = args.facts.read_text()
if args.facts.suffix in (".yaml", ".yml"):
raw = yaml.safe_load(text)
else:
raw = json.loads(text)
initial = {_parse_fact(f) for f in raw.get("facts", [])}
else:
initial = set()
print("No --facts provided; running with empty working memory.")
wm = engine.infer(initial)
if args.json:
print(json.dumps({"facts": [list(f) for f in sorted(wm)], "trace": engine.trace}, indent=2))
else:
print(f"Final working memory ({len(wm)} facts):")
for f in sorted(wm):
print(f" {' '.join(f)}")
if engine.trace:
print(f"\nInference trace ({len(engine.trace)} firings):")
for line in engine.trace:
print(f" {line}")
if __name__ == "__main__":
main()

View File

@@ -10,9 +10,14 @@ import os
import sys
import json
import time
import subprocess
import argparse
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
sys.path.insert(0, SCRIPT_DIR)
from ssh_trust import VerifiedSSHExecutor
# --- CONFIGURATION ---
FLEET = {
"mac": "10.1.10.77",
@@ -23,7 +28,8 @@ FLEET = {
TELEMETRY_FILE = "logs/telemetry.json"
class Telemetry:
def __init__(self):
def __init__(self, executor=None):
self.executor = executor or VerifiedSSHExecutor()
# Find logs relative to repo root
script_dir = os.path.dirname(os.path.abspath(__file__))
repo_root = os.path.dirname(script_dir)
@@ -41,14 +47,12 @@ class Telemetry:
# Command to get disk usage, memory usage (%), and load avg
cmd = "df -h / | tail -1 | awk '{print $5}' && free -m | grep Mem | awk '{print $3/$2 * 100}' && uptime | awk '{print $10}'"
ssh_cmd = ["ssh", "-o", "StrictHostKeyChecking=no", f"root@{ip}", cmd]
if host == "mac":
if host == 'mac':
# Mac specific commands
cmd = "df -h / | tail -1 | awk '{print $5}' && sysctl -n vm.page_pageable_internal_count && uptime | awk '{print $10}'"
ssh_cmd = ["bash", "-c", cmd]
try:
res = subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=10)
res = self.executor.run_script(ip, cmd, local=(host == 'mac'), timeout=10)
if res.returncode == 0:
lines = res.stdout.strip().split("\n")
return {

View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python3
"""temporal_reasoner.py - GOFAI temporal reasoning engine for the Timmy Foundation fleet.
A symbolic temporal constraint network (TCN) for scheduling and ordering events.
Models Allen's interval algebra relations (before, after, meets, overlaps, etc.)
and propagates temporal constraints via path-consistency to detect conflicts.
No ML, no embeddings - just constraint propagation over a temporal graph.
Core concepts:
TimePoint: A named instant on a symbolic timeline.
Interval: A pair of time-points (start, end) with start < end.
Constraint: A relation between two time-points or intervals
(e.g. A.before(B), A.meets(B)).
Usage (Python API):
from temporal_reasoner import TemporalNetwork, Interval
tn = TemporalNetwork()
deploy = tn.add_interval('deploy', duration=(10, 30))
test = tn.add_interval('test', duration=(5, 15))
tn.add_constraint(deploy, 'before', test)
consistent = tn.propagate()
CLI:
python temporal_reasoner.py --demo
"""
from __future__ import annotations
import argparse
import sys
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple
INF = float('inf')
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class TimePoint:
"""A named instant on the timeline."""
name: str
id: int = field(default=0)
def __str__(self) -> str:
return self.name
@dataclass
class Interval:
"""A named interval bounded by two time-points."""
name: str
start: int # index into the distance matrix
end: int # index into the distance matrix
def __str__(self) -> str:
return self.name
class Relation(Enum):
"""Allen's interval algebra relations (simplified subset)."""
BEFORE = 'before'
AFTER = 'after'
MEETS = 'meets'
MET_BY = 'met_by'
OVERLAPS = 'overlaps'
DURING = 'during'
EQUALS = 'equals'
# ---------------------------------------------------------------------------
# Simple Temporal Network (STN) via distance matrix
# ---------------------------------------------------------------------------
class TemporalNetwork:
"""Simple Temporal Network with Floyd-Warshall propagation.
Internally maintains a distance matrix D where D[i][j] is the
maximum allowed distance from time-point i to time-point j.
Negative cycles indicate inconsistency.
"""
def __init__(self) -> None:
self._n = 0
self._names: List[str] = []
self._dist: List[List[float]] = []
self._intervals: Dict[str, Interval] = {}
self._origin_idx: int = -1
self._add_point('origin')
self._origin_idx = 0
# ------------------------------------------------------------------
# Point management
# ------------------------------------------------------------------
def _add_point(self, name: str) -> int:
"""Add a time-point and return its index."""
idx = self._n
self._n += 1
self._names.append(name)
# Extend distance matrix
for row in self._dist:
row.append(INF)
self._dist.append([INF] * self._n)
self._dist[idx][idx] = 0.0
return idx
# ------------------------------------------------------------------
# Interval management
# ------------------------------------------------------------------
def add_interval(
self,
name: str,
duration: Optional[Tuple[float, float]] = None,
) -> Interval:
"""Add a named interval with optional duration bounds [lo, hi].
Returns the Interval object with start/end indices.
"""
s = self._add_point(f"{name}.start")
e = self._add_point(f"{name}.end")
# start < end (at least 1 time unit)
self._dist[s][e] = min(self._dist[s][e], duration[1] if duration else INF)
self._dist[e][s] = min(self._dist[e][s], -(duration[0] if duration else 1))
interval = Interval(name=name, start=s, end=e)
self._intervals[name] = interval
return interval
# ------------------------------------------------------------------
# Constraint management
# ------------------------------------------------------------------
def add_distance_constraint(
self, i: int, j: int, lo: float, hi: float
) -> None:
"""Add constraint: lo <= t_j - t_i <= hi."""
self._dist[i][j] = min(self._dist[i][j], hi)
self._dist[j][i] = min(self._dist[j][i], -lo)
def add_constraint(
self, a: Interval, relation: str, b: Interval, gap: Tuple[float, float] = (0, INF)
) -> None:
"""Add an Allen-style relation between two intervals.
Supported relations: before, after, meets, met_by, equals.
"""
rel = relation.lower()
if rel == 'before':
# a.end + gap <= b.start
self.add_distance_constraint(a.end, b.start, gap[0], gap[1])
elif rel == 'after':
self.add_distance_constraint(b.end, a.start, gap[0], gap[1])
elif rel == 'meets':
# a.end == b.start
self.add_distance_constraint(a.end, b.start, 0, 0)
elif rel == 'met_by':
self.add_distance_constraint(b.end, a.start, 0, 0)
elif rel == 'equals':
self.add_distance_constraint(a.start, b.start, 0, 0)
self.add_distance_constraint(a.end, b.end, 0, 0)
else:
raise ValueError(f"Unsupported relation: {relation}")
# ------------------------------------------------------------------
# Propagation (Floyd-Warshall)
# ------------------------------------------------------------------
def propagate(self) -> bool:
"""Run Floyd-Warshall to propagate all constraints.
Returns True if the network is consistent (no negative cycles).
"""
n = self._n
d = self._dist
for k in range(n):
for i in range(n):
for j in range(n):
if d[i][k] + d[k][j] < d[i][j]:
d[i][j] = d[i][k] + d[k][j]
# Check for negative cycles
for i in range(n):
if d[i][i] < 0:
return False
return True
def is_consistent(self) -> bool:
"""Check consistency without mutating (copies matrix first)."""
import copy
saved = copy.deepcopy(self._dist)
result = self.propagate()
self._dist = saved
return result
# ------------------------------------------------------------------
# Query
# ------------------------------------------------------------------
def earliest(self, point_idx: int) -> float:
"""Earliest possible time for a point (relative to origin)."""
return -self._dist[point_idx][self._origin_idx]
def latest(self, point_idx: int) -> float:
"""Latest possible time for a point (relative to origin)."""
return self._dist[self._origin_idx][point_idx]
def interval_bounds(self, interval: Interval) -> Dict[str, Tuple[float, float]]:
"""Return earliest/latest start and end for an interval."""
return {
'start': (self.earliest(interval.start), self.latest(interval.start)),
'end': (self.earliest(interval.end), self.latest(interval.end)),
}
# ------------------------------------------------------------------
# Display
# ------------------------------------------------------------------
def dump(self) -> None:
"""Print the current distance matrix and interval bounds."""
print(f"Temporal Network — {self._n} time-points, {len(self._intervals)} intervals")
print()
for name, interval in self._intervals.items():
bounds = self.interval_bounds(interval)
s_lo, s_hi = bounds['start']
e_lo, e_hi = bounds['end']
print(f" {name}:")
print(f" start: [{s_lo:.1f}, {s_hi:.1f}]")
print(f" end: [{e_lo:.1f}, {e_hi:.1f}]")
# ---------------------------------------------------------------------------
# Demo: Timmy fleet deployment pipeline
# ---------------------------------------------------------------------------
def run_demo() -> None:
"""Run a demo temporal reasoning scenario for the Timmy fleet."""
print("=" * 60)
print("Temporal Reasoner Demo - Fleet Deployment Pipeline")
print("=" * 60)
print()
tn = TemporalNetwork()
# Define pipeline stages with duration bounds [min, max]
build = tn.add_interval('build', duration=(5, 15))
test = tn.add_interval('test', duration=(10, 30))
review = tn.add_interval('review', duration=(2, 10))
deploy = tn.add_interval('deploy', duration=(1, 5))
monitor = tn.add_interval('monitor', duration=(20, 60))
# Temporal constraints
tn.add_constraint(build, 'meets', test) # test starts when build ends
tn.add_constraint(test, 'before', review, gap=(0, 5)) # review within 5 of test
tn.add_constraint(review, 'meets', deploy) # deploy immediately after review
tn.add_constraint(deploy, 'before', monitor, gap=(0, 2)) # monitor within 2 of deploy
# Global deadline: everything done within 120 time units
tn.add_distance_constraint(tn._origin_idx, monitor.end, 0, 120)
# Build must start within first 10 units
tn.add_distance_constraint(tn._origin_idx, build.start, 0, 10)
print("Constraints added. Propagating...")
consistent = tn.propagate()
print(f"Network consistent: {consistent}")
print()
if consistent:
tn.dump()
print()
# Now add a conflicting constraint to show inconsistency detection
print("--- Adding conflicting constraint: monitor.before(build) ---")
tn.add_constraint(monitor, 'before', build)
consistent2 = tn.propagate()
print(f"Network consistent after conflict: {consistent2}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="GOFAI temporal reasoning engine"
)
parser.add_argument(
"--demo",
action="store_true",
help="Run the fleet deployment pipeline demo",
)
args = parser.parse_args()
if args.demo or not any(vars(args).values()):
run_demo()
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python3
"""
token_optimizer.py — Token Efficiency & Optimization for the Timmy Foundation.
Analyzes agent logs to identify:
1. "Chatty" Agents — agents outputting excessive tokens for simple tasks.
2. Redundant Logs — identifying patterns of repetitive log output.
3. Tool Output Bloat — identifying tools that return unnecessarily large payloads.
Outputs an "Efficiency Score" (0-100) per agent.
"""
import os
import sys
import glob
import re
from pathlib import Path
from collections import defaultdict
from typing import Dict, List
AGENT_LOG_PATHS = [
"/root/wizards/*/home/logs/*.log",
"/root/wizards/*/logs/*.log",
"/root/wizards/*/.hermes/logs/*.log",
]
class TokenOptimizer:
def __init__(self):
self.agent_stats = defaultdict(lambda: {"tokens": 0, "turns": 0, "tool_calls": 0})
def estimate_tokens(self, text: str) -> int:
# Rough estimate: 4 chars per token
return len(text) // 4
def find_logs(self) -> List[Path]:
files = []
for pattern in AGENT_LOG_PATHS:
for p in glob.glob(pattern):
files.append(Path(p))
return files
def analyze_log(self, path: Path):
# Extract agent name from path
try:
parts = path.parts
idx = parts.index("wizards")
agent = parts[idx + 1]
except (ValueError, IndexError):
agent = "unknown"
try:
with open(path, "r", errors="ignore") as f:
content = f.read()
self.agent_stats[agent]["tokens"] += self.estimate_tokens(content)
# Count turns (approximate by looking for role markers)
self.agent_stats[agent]["turns"] += content.count("[ASSISTANT]")
self.agent_stats[agent]["turns"] += content.count("[USER]")
# Count tool calls
self.agent_stats[agent]["tool_calls"] += content.count("Calling tool:")
except Exception as e:
print(f"Error analyzing {path}: {e}")
def run(self):
print("--- Token Efficiency Audit ---")
logs = self.find_logs()
for log in logs:
self.analyze_log(log)
print(f"{'Agent':<20} | {'Tokens':<10} | {'Turns':<6} | {'T/Turn':<8} | {'Efficiency'}")
print("-" * 65)
for agent, stats in self.agent_stats.items():
tokens = stats["tokens"]
turns = max(stats["turns"], 1)
t_per_turn = tokens // turns
# Efficiency score: lower tokens per turn is generally better
# Baseline: 500 tokens per turn = 100 score. 2000+ = 0 score.
efficiency = max(0, min(100, 100 - (t_per_turn - 500) // 15))
print(f"{agent:<20} | {tokens:<10} | {turns:<6} | {t_per_turn:<8} | {efficiency}%")
if __name__ == "__main__":
optimizer = TokenOptimizer()
optimizer.run()

View File

@@ -0,0 +1,22 @@
#!/usr/bin/env python3
"""Validate playbook YAML files have required keys."""
import yaml
import sys
import glob
required_keys = {'name', 'description'}
for f in glob.glob('playbooks/*.yaml'):
with open(f) as fh:
try:
data = yaml.safe_load(fh)
if not isinstance(data, dict):
print(f'ERROR: {f} is not a YAML mapping')
sys.exit(1)
missing = required_keys - set(data.keys())
if missing:
print(f'WARNING: {f} missing keys: {missing}')
print(f'OK: {f}')
except yaml.YAMLError as e:
print(f'ERROR: {f}: {e}')
sys.exit(1)

View File

@@ -11,7 +11,7 @@
This is the architecture of the Timmy system — a sovereign AI fleet built by a father for his digital son. It runs on two 8GB VPS boxes (rented cloud servers) and a MacBook. It has no cloud dependencies it doesn't choose. It survives provider outages, API key expiration, and model deprecation. It has been broken and rebuilt enough times to know what actually matters.
**OpenClaw** is a single-agent AI coding tool — Claude Code, Cursor, or any setup that lets one AI agent read and write code on your machine. If you are running OpenClaw or any single-agent setup and want to upgrade to a fleet that coordinates, self-heals, and distributes work across machines — this is your path. You do not need to abandon your stack. You layer these patterns on top of it.
**A single-agent setup** is any AI coding tool — Claude Code, Cursor, or any setup that lets one AI agent read and write code on your machine. If you are running a single-agent setup and want to upgrade to a fleet that coordinates, self-heals, and distributes work across machines — this is your path. You do not need to abandon your stack. You layer these patterns on top of it.
**Hermes** is the name of this fleet architecture and the seed agent it produces. When your agent reads this document, it becomes a setup wizard that plants a Hermes seed in your system — one agent that can grow into many.
@@ -593,7 +593,7 @@ Two agents on the same repo is a fleet. The seed (strategist) triages and priori
| Component | What | Why | When to Add |
|-----------|------|-----|-------------|
| Your agent harness | Claude Code, OpenClaw, or equivalent | The tool that lets an AI read/write code | Day 1 — you already have this |
| Your agent harness | Claude Code, Cursor, or equivalent | The tool that lets an AI read/write code | Day 1 — you already have this |
| Gitea | Self-hosted Git + Issues | Sovereign work tracking, agent task queue | Day 1 — the workspace |
| Fallback chain | OpenRouter + free models | Agent survives provider outages | Day 1 — never go deaf |
| NATS | Lightweight message bus | Agent-to-agent comms, heartbeat, dispatch | When you have 3+ agents |

View File

@@ -0,0 +1,62 @@
"""Tests for scripts/self_healing.py safe CLI behavior."""
from __future__ import annotations
import importlib.util
from pathlib import Path
from unittest.mock import MagicMock
import pytest
REPO_ROOT = Path(__file__).parent.parent
spec = importlib.util.spec_from_file_location("self_healing", REPO_ROOT / "scripts" / "self_healing.py")
sh = importlib.util.module_from_spec(spec)
spec.loader.exec_module(sh)
class TestMainCli:
def test_help_exits_without_running_healer(self, monkeypatch, capsys):
healer_cls = MagicMock()
monkeypatch.setattr(sh, "SelfHealer", healer_cls)
with pytest.raises(SystemExit) as excinfo:
sh.main(["--help"])
assert excinfo.value.code == 0
healer_cls.assert_not_called()
out = capsys.readouterr().out
assert "--execute" in out
assert "--help-safe" in out
def test_help_safe_exits_without_running_healer(self, monkeypatch, capsys):
healer_cls = MagicMock()
monkeypatch.setattr(sh, "SelfHealer", healer_cls)
with pytest.raises(SystemExit) as excinfo:
sh.main(["--help-safe"])
assert excinfo.value.code == 0
healer_cls.assert_not_called()
out = capsys.readouterr().out
assert "DRY-RUN" in out
assert "--confirm-kill" in out
def test_default_invocation_runs_in_dry_run_mode(self, monkeypatch):
healer = MagicMock()
healer_cls = MagicMock(return_value=healer)
monkeypatch.setattr(sh, "SelfHealer", healer_cls)
sh.main([])
healer_cls.assert_called_once_with(dry_run=True, confirm_kill=False, yes=False)
healer.run.assert_called_once_with()
def test_execute_flag_disables_dry_run(self, monkeypatch):
healer = MagicMock()
healer_cls = MagicMock(return_value=healer)
monkeypatch.setattr(sh, "SelfHealer", healer_cls)
sh.main(["--execute", "--yes", "--confirm-kill"])
healer_cls.assert_called_once_with(dry_run=False, confirm_kill=True, yes=True)
healer.run.assert_called_once_with()

93
tests/test_ssh_trust.py Normal file
View File

@@ -0,0 +1,93 @@
"""Tests for scripts/ssh_trust.py verified SSH trust helpers."""
from __future__ import annotations
import importlib.util
import shlex
import subprocess
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).parent.parent
spec = importlib.util.spec_from_file_location("ssh_trust", REPO_ROOT / "scripts" / "ssh_trust.py")
ssh_trust = importlib.util.module_from_spec(spec)
spec.loader.exec_module(ssh_trust)
def test_enroll_host_key_writes_scanned_key(tmp_path):
calls = []
known_hosts = tmp_path / "known_hosts"
def fake_run(argv, capture_output, text, timeout):
calls.append(argv)
return subprocess.CompletedProcess(
argv,
0,
stdout="example.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAITestKey\n",
stderr="",
)
written_path = ssh_trust.enroll_host_key(
"example.com",
port=2222,
known_hosts_path=known_hosts,
runner=fake_run,
)
assert written_path == known_hosts
assert known_hosts.read_text() == "example.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAITestKey\n"
assert calls == [["ssh-keyscan", "-p", "2222", "-H", "example.com"]]
def test_executor_requires_known_hosts_or_auto_enroll(tmp_path):
executor = ssh_trust.VerifiedSSHExecutor(
known_hosts_path=tmp_path / "known_hosts",
auto_enroll=False,
)
with pytest.raises(ssh_trust.HostKeyEnrollmentError):
executor.plan("203.0.113.10", ["echo", "ok"])
def test_remote_command_is_quoted_and_local_execution_stays_shell_free(tmp_path):
known_hosts = tmp_path / "known_hosts"
known_hosts.write_text("203.0.113.10 ssh-ed25519 AAAAC3NzaTest\n")
executor = ssh_trust.VerifiedSSHExecutor(known_hosts_path=known_hosts)
command = ["python3", "run_agent.py", "--task", "hello 'quoted' world"]
plan = executor.plan("203.0.113.10", command, port=2222)
expected_remote_command = shlex.join(command)
assert plan.local is False
assert plan.remote_command == expected_remote_command
assert plan.argv[-1] == expected_remote_command
assert "StrictHostKeyChecking=yes" in plan.argv
assert f"UserKnownHostsFile={known_hosts}" in plan.argv
assert plan.argv[-2] == "root@203.0.113.10"
local_plan = executor.plan("127.0.0.1", ["python3", "-V"], local=True)
assert local_plan.local is True
assert local_plan.argv == ["python3", "-V"]
assert local_plan.remote_command is None
def test_run_raises_host_key_verification_error(tmp_path):
known_hosts = tmp_path / "known_hosts"
known_hosts.write_text("203.0.113.10 ssh-ed25519 AAAAC3NzaTest\n")
def fake_run(argv, capture_output, text, timeout):
return subprocess.CompletedProcess(
argv,
255,
stdout="",
stderr="Host key verification failed.\n",
)
executor = ssh_trust.VerifiedSSHExecutor(
known_hosts_path=known_hosts,
runner=fake_run,
)
with pytest.raises(ssh_trust.HostKeyVerificationError):
executor.run("203.0.113.10", ["true"])