Compare commits

...

42 Commits

Author SHA1 Message Date
b21c2833f7 Merge pull request '[PERPLEXITY-08] Add PR checklist CI workflow and enforcement script' (#411) from perplexity/pr-checklist-ci into main 2026-04-08 11:11:02 +00:00
f84b870ce4 Merge branch 'main' into perplexity/pr-checklist-ci
Some checks failed
PR Checklist / pr-checklist (pull_request) Failing after 1m18s
2026-04-08 11:10:51 +00:00
8b4df81b5b Merge pull request '[PERPLEXITY-08] Add PR checklist CI workflow and enforcement script' (#411) from perplexity/pr-checklist-ci into main 2026-04-08 11:10:23 +00:00
e96fae69cf Merge branch 'main' into perplexity/pr-checklist-ci
Some checks failed
PR Checklist / pr-checklist (pull_request) Failing after 1m18s
2026-04-08 11:10:15 +00:00
cccafd845b Merge pull request '[PERPLEXITY-03] Add disambiguation header to SOUL.md (Bitcoin inscription)' (#412) from perplexity/soul-md-disambiguation into main 2026-04-08 11:10:09 +00:00
1f02166107 Merge branch 'main' into perplexity/soul-md-disambiguation 2026-04-08 11:10:00 +00:00
7dcaa05dbd Merge pull request 'refactor: wire retrieval_enforcer L1 to SovereignStore — eliminate subprocess/ONNX dependency' (#384) from perplexity/wire-enforcer-sovereign-store into main 2026-04-08 11:09:53 +00:00
18124206e1 Merge branch 'main' into perplexity/wire-enforcer-sovereign-store 2026-04-08 11:09:45 +00:00
11736e58cd docs: add disambiguation header to SOUL.md (Bitcoin inscription)
This SOUL.md is the Bitcoin inscription version, not the narrative
identity document. Adding an HTML comment header to clarify.

The canonical narrative SOUL.md lives in timmy-home.
See: #388, #378
2026-04-08 10:58:55 +00:00
14521ef664 feat: add PR checklist enforcement script
All checks were successful
PR Checklist / pr-checklist (pull_request) Successful in 2m21s
Python script that enforces PR quality standards:
- Checks for actual code changes
- Validates branch is not behind base
- Detects issue bundling in PR body
- Runs Python syntax validation
- Verifies shell script executability
- Ensures issue references exist

Closes #393
2026-04-08 10:53:44 +00:00
8b17eaa537 ci: add PR checklist quality gate workflow 2026-04-08 10:51:40 +00:00
afee83c1fe Merge pull request 'docs: add MEMORY_ARCHITECTURE.md — retrieval order, storage layout, data flow' (#375) from perplexity/mempalace-architecture-doc into main 2026-04-08 10:39:51 +00:00
56d8085e88 Merge branch 'main' into perplexity/mempalace-architecture-doc 2026-04-08 10:39:35 +00:00
4e7b24617f Merge pull request 'feat: FLEET-010/011/012 — Phase 3-5 cross-agent delegation, model pipeline, lifecycle' (#365) from timmy/fleet-phase3-5 into main 2026-04-08 10:39:09 +00:00
8daa12c518 Merge branch 'main' into timmy/fleet-phase3-5 2026-04-08 10:39:01 +00:00
e369727235 Merge branch 'main' into perplexity/mempalace-architecture-doc 2026-04-08 10:38:42 +00:00
1705a7b802 Merge pull request 'feat: FLEET-010/011/012 — Phase 3-5 cross-agent delegation, model pipeline, lifecycle' (#365) from timmy/fleet-phase3-5 into main 2026-04-08 10:38:08 +00:00
e0bef949dd Merge branch 'main' into timmy/fleet-phase3-5 2026-04-08 10:37:56 +00:00
dafe8667c5 Merge branch 'main' into perplexity/mempalace-architecture-doc 2026-04-08 10:37:39 +00:00
4844ce6238 Merge pull request 'feat: Bezalel Builder Wizard — Sidecar Authority Update' (#364) from feat/bezalel-wizard-sidecar-v2 into main 2026-04-08 10:37:34 +00:00
a43510a7eb Merge branch 'main' into feat/bezalel-wizard-sidecar-v2 2026-04-08 10:37:25 +00:00
3b00891614 refactor: wire retrieval_enforcer L1 to SovereignStore — eliminate subprocess/ONNX dependency
Replaces the subprocess call to mempalace CLI binary with direct SovereignStore import. L1 palace search now uses SQLite + FTS5 + HRR vectors in-process. No ONNX, no subprocess, no API calls.

Removes: import subprocess, MEMPALACE_BIN constant
Adds: SovereignStore lazy singleton, _get_store(), SOVEREIGN_DB path

Closes #383
Depends on #380 (sovereign_store.py)
2026-04-08 10:32:52 +00:00
74867bbfa7 Merge pull request 'art: The Timmy Foundation — Visual Story (24 images + 2 videos)' (#366) from timmy/gallery-submission into main 2026-04-08 10:16:35 +00:00
d07305b89c Merge branch 'main' into perplexity/mempalace-architecture-doc 2026-04-08 10:16:13 +00:00
2812bac438 Merge branch 'main' into timmy/gallery-submission 2026-04-08 10:16:04 +00:00
5c15704c3a Merge branch 'main' into timmy/fleet-phase3-5 2026-04-08 10:15:55 +00:00
30fdbef74e Merge branch 'main' into feat/bezalel-wizard-sidecar-v2 2026-04-08 10:15:49 +00:00
9cc2cf8f8d Merge pull request 'feat: Sovereign Memory Store — zero-API durable memory (SQLite + FTS5 + HRR)' (#380) from perplexity/sovereign-memory-store into main 2026-04-08 10:14:36 +00:00
a2eff1222b Merge branch 'main' into perplexity/sovereign-memory-store 2026-04-08 10:14:24 +00:00
3f4465b646 Merge pull request '[SOVEREIGN] Orchestrator v1 — backlog reader, priority scorer, agent dispatcher' (#362) from timmy/sovereign-orchestrator-v1 into main 2026-04-08 10:14:16 +00:00
ff7ce9a022 Merge branch 'main' into perplexity/mempalace-architecture-doc 2026-04-08 10:14:10 +00:00
f04aaec4ed Merge branch 'main' into timmy/gallery-submission 2026-04-08 10:13:57 +00:00
d54a218a27 Merge branch 'main' into timmy/fleet-phase3-5 2026-04-08 10:13:44 +00:00
3cc92fde1a Merge branch 'main' into feat/bezalel-wizard-sidecar-v2 2026-04-08 10:13:34 +00:00
11a28b74bb Merge branch 'main' into timmy/sovereign-orchestrator-v1 2026-04-08 10:13:21 +00:00
perplexity
593621c5e0 feat: sovereign memory store — zero-API durable memory (SQLite + FTS5 + HRR)
Implements the missing pieces of the MemPalace epic (#367):

- sovereign_store.py: Self-contained memory store replacing the third-party
  mempalace CLI and its ONNX dependency. Uses:
  * SQLite + FTS5 for keyword search (porter stemmer, unicode61)
  * HRR phase vectors (SHA-256 deterministic, numpy optional) for semantic similarity
  * Reciprocal Rank Fusion to merge keyword and semantic rankings
  * Trust scoring with boost/decay lifecycle
  * Room-based organization matching the existing PalaceRoom model

- promotion.py (MP-4, #371): Quality-gated scratchpad-to-palace promotion.
  Four heuristic gates, no LLM call:
  1. Length gate (min 5 words, max 500)
  2. Structure gate (rejects fragments and pure code)
  3. Duplicate gate (FTS5 + Jaccard overlap detection)
  4. Staleness gate (7-day threshold for old notes)
  Includes force override, batch promotion, and audit logging.

- 21 unit tests covering HRR vectors, store operations, search,
  trust lifecycle, and all promotion gates.

Zero external dependencies. Zero API calls. Zero cloud.

Refs: #367 #370 #371
2026-04-07 22:41:37 +00:00
2e2a646ba8 docs: add MEMORY_ARCHITECTURE.md — retrieval order, storage layout, data flow 2026-04-07 20:16:45 +00:00
Alexander Whitestone
0a4c8f2d37 art: The Timmy Foundation visual story — 24 images, 2 videos, generated with Grok Imagine 2026-04-07 12:46:17 -04:00
Alexander Whitestone
0a13347e39 feat: FLEET-010/011/012 — Phase 3 and 4 fleet capabilities
FLEET-010: Cross-agent task delegation protocol
- Keyword-based heuristic assigns unassigned issues to agents
- Supports: claw-code, gemini, ezra, bezalel, timmy
- Delegation logging and status dashboard
- Auto-comments on assigned issues

FLEET-011: Local model pipeline and fallback chain
- Checks Ollama reachability and model availability
- 4-model chain: hermes4:14b -> qwen2.5:7b -> phi3:3.8b -> gemma3:1b
- Tests each model with live inference on every run
- Fallback verification: finds first responding model
- Chain configuration via ~/.local/timmy/fleet-resources/model-chain.json

FLEET-012: Agent lifecycle manager
- Full lifecycle: provision -> deploy -> monitor -> retire
- Heartbeat detection with 24h idle threshold
- Task completion/failure tracking
- Agent Fleet Status dashboard

Fixes timmy-home#563 (delegation), #564 (model pipeline), #565 (lifecycle)
2026-04-07 12:43:10 -04:00
dc75be18e4 feat: add Bezalel Builder Wizard sidecar configuration 2026-04-07 16:39:42 +00:00
Alexander Whitestone
7399c83024 fix: null guard on assignees in orchestrator dispatch 2026-04-07 12:34:02 -04:00
Alexander Whitestone
cf213bffd1 [SOVEREIGN] Add Orchestrator v1 — backlog reader, priority scorer, agent dispatcher
Resolves #355 #356

Components:
- orchestrator.py: Full sovereign orchestrator with 6 subsystems
  1. Backlog reader (fetches from timmy-config, the-nexus, timmy-home)
  2. Priority scorer (0-100 based on severity, age, assignment state)
  3. Agent roster (groq/ezra/bezalel with health checks)
  4. Dispatcher (matches issues to agents by type/strength)
  5. Consolidated report (terminal + Telegram)
  6. Main loop (--once, --daemon, --dry-run)
- orchestrate.sh: Shell wrapper with env setup

Dry-run tested: 348 issues scanned, 3 agents detected UP.
stdlib only, no pip dependencies.
2026-04-07 12:31:14 -04:00
42 changed files with 2483 additions and 30 deletions

View File

@@ -0,0 +1,29 @@
# pr-checklist.yml — Automated PR quality gate
# Refs: #393 (PERPLEXITY-08), Epic #385
#
# Enforces the review checklist that agents skip when left to self-approve.
# Runs on every pull_request. Fails fast so bad PRs never reach a reviewer.
name: PR Checklist
on:
pull_request:
branches: [main, master]
jobs:
pr-checklist:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Run PR checklist
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: python3 bin/pr-checklist.py

10
SOUL.md
View File

@@ -1,3 +1,13 @@
<!--
NOTE: This is the BITCOIN INSCRIPTION version of SOUL.md.
It is the immutable on-chain conscience. Do not modify this content.
The NARRATIVE identity document (for onboarding, Audio Overviews,
and system prompts) lives in timmy-home/SOUL.md.
See: #388, #378 for the divergence audit.
-->
# SOUL.md
## Inscription 1 — The Immutable Conscience

191
bin/pr-checklist.py Normal file
View File

@@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""pr-checklist.py -- Automated PR quality gate for Gitea CI.
Enforces the review standards that agents skip when left to self-approve.
Runs in CI on every pull_request event. Exits non-zero on any failure.
Checks:
1. PR has >0 file changes (no empty PRs)
2. PR branch is not behind base branch
3. PR does not bundle >3 unrelated issues
4. Changed .py files pass syntax check (python -c import)
5. Changed .sh files are executable
6. PR body references an issue number
7. At least 1 non-author review exists (warning only)
Refs: #393 (PERPLEXITY-08), Epic #385
"""
from __future__ import annotations
import json
import os
import re
import subprocess
import sys
from pathlib import Path
def fail(msg: str) -> None:
print(f"FAIL: {msg}", file=sys.stderr)
def warn(msg: str) -> None:
print(f"WARN: {msg}", file=sys.stderr)
def ok(msg: str) -> None:
print(f" OK: {msg}")
def get_changed_files() -> list[str]:
"""Return list of files changed in this PR vs base branch."""
base = os.environ.get("GITHUB_BASE_REF", "main")
try:
result = subprocess.run(
["git", "diff", "--name-only", f"origin/{base}...HEAD"],
capture_output=True, text=True, check=True,
)
return [f for f in result.stdout.strip().splitlines() if f]
except subprocess.CalledProcessError:
# Fallback: diff against HEAD~1
result = subprocess.run(
["git", "diff", "--name-only", "HEAD~1"],
capture_output=True, text=True, check=True,
)
return [f for f in result.stdout.strip().splitlines() if f]
def check_has_changes(files: list[str]) -> bool:
"""Check 1: PR has >0 file changes."""
if not files:
fail("PR has 0 file changes. Empty PRs are not allowed.")
return False
ok(f"PR changes {len(files)} file(s)")
return True
def check_not_behind_base() -> bool:
"""Check 2: PR branch is not behind base."""
base = os.environ.get("GITHUB_BASE_REF", "main")
try:
result = subprocess.run(
["git", "rev-list", "--count", f"HEAD..origin/{base}"],
capture_output=True, text=True, check=True,
)
behind = int(result.stdout.strip())
if behind > 0:
fail(f"Branch is {behind} commit(s) behind {base}. Rebase or merge.")
return False
ok(f"Branch is up-to-date with {base}")
return True
except (subprocess.CalledProcessError, ValueError):
warn("Could not determine if branch is behind base (git fetch may be needed)")
return True # Don't block on CI fetch issues
def check_issue_bundling(pr_body: str) -> bool:
"""Check 3: PR does not bundle >3 unrelated issues."""
issue_refs = set(re.findall(r"#(\d+)", pr_body))
if len(issue_refs) > 3:
fail(f"PR references {len(issue_refs)} issues ({', '.join(sorted(issue_refs))}). "
"Max 3 per PR to prevent bundling. Split into separate PRs.")
return False
ok(f"PR references {len(issue_refs)} issue(s) (max 3)")
return True
def check_python_syntax(files: list[str]) -> bool:
"""Check 4: Changed .py files have valid syntax."""
py_files = [f for f in files if f.endswith(".py") and Path(f).exists()]
if not py_files:
ok("No Python files changed")
return True
all_ok = True
for f in py_files:
result = subprocess.run(
[sys.executable, "-c", f"import ast; ast.parse(open('{f}').read())"],
capture_output=True, text=True,
)
if result.returncode != 0:
fail(f"Syntax error in {f}: {result.stderr.strip()[:200]}")
all_ok = False
if all_ok:
ok(f"All {len(py_files)} Python file(s) pass syntax check")
return all_ok
def check_shell_executable(files: list[str]) -> bool:
"""Check 5: Changed .sh files are executable."""
sh_files = [f for f in files if f.endswith(".sh") and Path(f).exists()]
if not sh_files:
ok("No shell scripts changed")
return True
all_ok = True
for f in sh_files:
if not os.access(f, os.X_OK):
fail(f"{f} is not executable. Run: chmod +x {f}")
all_ok = False
if all_ok:
ok(f"All {len(sh_files)} shell script(s) are executable")
return all_ok
def check_issue_reference(pr_body: str) -> bool:
"""Check 6: PR body references an issue number."""
if re.search(r"#\d+", pr_body):
ok("PR body references at least one issue")
return True
fail("PR body does not reference any issue (e.g. #123). "
"Every PR must trace to an issue.")
return False
def main() -> int:
print("=" * 60)
print("PR Checklist — Automated Quality Gate")
print("=" * 60)
print()
# Get PR body from env or git log
pr_body = os.environ.get("PR_BODY", "")
if not pr_body:
try:
result = subprocess.run(
["git", "log", "--format=%B", "-1"],
capture_output=True, text=True, check=True,
)
pr_body = result.stdout
except subprocess.CalledProcessError:
pr_body = ""
files = get_changed_files()
failures = 0
checks = [
check_has_changes(files),
check_not_behind_base(),
check_issue_bundling(pr_body),
check_python_syntax(files),
check_shell_executable(files),
check_issue_reference(pr_body),
]
failures = sum(1 for c in checks if not c)
print()
print("=" * 60)
if failures:
print(f"RESULT: {failures} check(s) FAILED")
print("Fix the issues above and push again.")
return 1
else:
print("RESULT: All checks passed")
return 0
if __name__ == "__main__":
sys.exit(main())

141
docs/MEMORY_ARCHITECTURE.md Normal file
View File

@@ -0,0 +1,141 @@
# Memory Architecture
> How Timmy remembers, recalls, and learns — without hallucinating.
Refs: Epic #367 | Sub-issues #368, #369, #370, #371, #372
## Overview
Timmy's memory system uses a **Memory Palace** architecture — a structured, file-backed knowledge store organized into rooms and drawers. When faced with a recall question, the agent checks its palace *before* generating from scratch.
This document defines the retrieval order, storage layers, and data flow that make this work.
## Retrieval Order (L0L5)
When the agent receives a prompt that looks like a recall question ("what did we do?", "what's the status of X?"), the retrieval enforcer intercepts it and walks through layers in order:
| Layer | Source | Question Answered | Short-circuits? |
|-------|--------|-------------------|------------------|
| L0 | `identity.txt` | Who am I? What are my mandates? | No (always loaded) |
| L1 | Palace rooms/drawers | What do I know about this topic? | Yes, if hit |
| L2 | Session scratchpad | What have I learned this session? | Yes, if hit |
| L3 | Artifact retrieval (Gitea API) | Can I fetch the actual issue/file/log? | Yes, if hit |
| L4 | Procedures/playbooks | Is there a documented way to do this? | Yes, if hit |
| L5 | Free generation | (Only when L0L4 are exhausted) | N/A |
**Key principle:** The agent never reaches L5 (free generation) if any prior layer has relevant data. This eliminates hallucination for recall-style queries.
## Storage Layout
```
~/.mempalace/
identity.txt # L0: Who I am, mandates, personality
rooms/
projects/
timmy-config.md # What I know about timmy-config
hermes-agent.md # What I know about hermes-agent
people/
alexander.md # Working relationship context
architecture/
fleet.md # Fleet system knowledge
mempalace.md # Self-knowledge about this system
config/
mempalace.yaml # Palace configuration
~/.hermes/
scratchpad/
{session_id}.json # L2: Ephemeral session context
```
## Components
### 1. Memory Palace Skill (`mempalace.py`) — #368
Core data structures:
- `PalaceRoom`: A named collection of drawers (topics)
- `Mempalace`: The top-level palace with room management
- Factory constructors: `for_issue_analysis()`, `for_health_check()`, `for_code_review()`
### 2. Retrieval Enforcer (`retrieval_enforcer.py`) — #369
Middleware that intercepts recall-style prompts:
1. Detects recall patterns ("what did", "status of", "last time we")
2. Walks L0→L4 in order, short-circuiting on first hit
3. Only allows free generation (L5) when all layers return empty
4. Produces an honest fallback: "I don't have this in my memory palace."
### 3. Session Scratchpad (`scratchpad.py`) — #370
Ephemeral, session-scoped working memory:
- Write-append only during a session
- Entries have TTL (default: 1 hour)
- Queried at L2 in retrieval chain
- Never auto-promoted to palace
### 4. Memory Promotion — #371
Explicit promotion from scratchpad to palace:
- Agent must call `promote_to_palace()` with a reason
- Dedup check against target drawer
- Summary required (raw tool output never stored)
- Conflict detection when new memory contradicts existing
### 5. Wake-Up Protocol (`wakeup.py`) — #372
Boot sequence for new sessions:
```
Session Start
├─ L0: Load identity.txt
├─ L1: Scan palace rooms for active context
├─ L1.5: Surface promoted memories from last session
├─ L2: Load surviving scratchpad entries
└─ Ready: agent knows who it is, what it was doing, what it learned
```
## Data Flow
```
┌──────────────────┐
│ User Prompt │
└────────┬─────────┘
┌────────┴─────────┐
│ Recall Detector │
└────┬───────┬─────┘
│ │
[recall] [not recall]
│ │
┌───────┴────┐ ┌──┬─┴───────┐
│ Retrieval │ │ Normal Flow │
│ Enforcer │ └─────────────┘
│ L0→L1→L2 │
│ →L3→L4→L5│
└──────┬─────┘
┌──────┴─────┐
│ Response │
│ (grounded) │
└────────────┘
```
## Anti-Patterns
| Don't | Do Instead |
|-------|------------|
| Generate from vibes when palace has data | Check palace first (L1) |
| Auto-promote everything to palace | Require explicit `promote_to_palace()` with reason |
| Store raw API responses as memories | Summarize before storing |
| Hallucinate when palace is empty | Say "I don't have this in my memory palace" |
| Dump entire palace on wake-up | Selective loading based on session context |
## Status
| Component | Issue | PR | Status |
|-----------|-------|----|--------|
| Skill port | #368 | #374 | In Review |
| Retrieval enforcer | #369 | #374 | In Review |
| Session scratchpad | #370 | #374 | In Review |
| Memory promotion | #371 | — | Open |
| Wake-up protocol | #372 | #374 | In Review |

122
fleet/agent_lifecycle.py Normal file
View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
FLEET-012: Agent Lifecycle Manager
Phase 5: Scale — spawn, train, deploy, retire agents automatically.
Manages the full lifecycle:
1. PROVISION: Clone template, install deps, configure, test
2. DEPLOY: Add to active rotation, start accepting issues
3. MONITOR: Track performance, quality, heartbeat
4. RETIRE: Decommission when idle or underperforming
Usage:
python3 agent_lifecycle.py provision <name> <vps> [--model model]
python3 agent_lifecycle.py deploy <name>
python3 agent_lifecycle.py retire <name>
python3 agent_lifecycle.py status
python3 agent_lifecycle.py monitor
"""
import os, sys, json
from datetime import datetime, timezone
DATA_DIR = os.path.expanduser("~/.local/timmy/fleet-agents")
DB_FILE = os.path.join(DATA_DIR, "agents.json")
LOG_FILE = os.path.join(DATA_DIR, "lifecycle.log")
def ensure():
os.makedirs(DATA_DIR, exist_ok=True)
def log(msg, level="INFO"):
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{ts}] [{level}] {msg}"
with open(LOG_FILE, "a") as f: f.write(entry + "\n")
print(f" {entry}")
def load():
if os.path.exists(DB_FILE):
return json.loads(open(DB_FILE).read())
return {}
def save(db):
open(DB_FILE, "w").write(json.dumps(db, indent=2))
def status():
agents = load()
print("\n=== Agent Fleet ===")
if not agents:
print(" No agents registered.")
return
for name, a in agents.items():
state = a.get("state", "?")
vps = a.get("vps", "?")
model = a.get("model", "?")
tasks = a.get("tasks_completed", 0)
hb = a.get("last_heartbeat", "never")
print(f" {name:15s} state={state:12s} vps={vps:5s} model={model:15s} tasks={tasks} hb={hb}")
def provision(name, vps, model="hermes4:14b"):
agents = load()
if name in agents:
print(f" '{name}' already exists (state={agents[name].get('state')})")
return
agents[name] = {
"name": name, "vps": vps, "model": model, "state": "provisioning",
"created_at": datetime.now(timezone.utc).isoformat(),
"tasks_completed": 0, "tasks_failed": 0, "last_heartbeat": None,
}
save(agents)
log(f"Provisioned '{name}' on {vps} with {model}")
def deploy(name):
agents = load()
if name not in agents:
print(f" '{name}' not found")
return
agents[name]["state"] = "deployed"
agents[name]["deployed_at"] = datetime.now(timezone.utc).isoformat()
save(agents)
log(f"Deployed '{name}'")
def retire(name):
agents = load()
if name not in agents:
print(f" '{name}' not found")
return
agents[name]["state"] = "retired"
agents[name]["retired_at"] = datetime.now(timezone.utc).isoformat()
save(agents)
log(f"Retired '{name}'. Completed {agents[name].get('tasks_completed', 0)} tasks.")
def monitor():
agents = load()
now = datetime.now(timezone.utc)
changes = 0
for name, a in agents.items():
if a.get("state") != "deployed": continue
hb = a.get("last_heartbeat")
if hb:
try:
hb_t = datetime.fromisoformat(hb)
hours = (now - hb_t).total_seconds() / 3600
if hours > 24 and a.get("state") == "deployed":
a["state"] = "idle"
a["idle_since"] = now.isoformat()
log(f"'{name}' idle for {hours:.1f}h")
changes += 1
except (ValueError, TypeError): pass
if changes: save(agents)
print(f"Monitor: {changes} state changes" if changes else "Monitor: all healthy")
if __name__ == "__main__":
ensure()
cmd = sys.argv[1] if len(sys.argv) > 1 else "monitor"
if cmd == "status": status()
elif cmd == "provision" and len(sys.argv) >= 4:
model = sys.argv[4] if len(sys.argv) >= 5 else "hermes4:14b"
provision(sys.argv[2], sys.argv[3], model)
elif cmd == "deploy" and len(sys.argv) >= 3: deploy(sys.argv[2])
elif cmd == "retire" and len(sys.argv) >= 3: retire(sys.argv[2])
elif cmd == "monitor": monitor()
elif cmd == "run": monitor()
else: print("Usage: agent_lifecycle.py [provision|deploy|retire|status|monitor]")

122
fleet/delegation.py Normal file
View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
FLEET-010: Cross-Agent Task Delegation Protocol
Phase 3: Orchestration. Agents create issues, assign to other agents, review PRs.
Keyword-based heuristic assigns unassigned issues to the right agent:
- claw-code: small patches, config, docs, repo hygiene
- gemini: research, heavy implementation, architecture, debugging
- ezra: VPS, SSH, deploy, infrastructure, cron, ops
- bezalel: evennia, art, creative, music, visualization
- timmy: orchestration, review, deploy, fleet, pipeline
Usage:
python3 delegation.py run # Full cycle: scan, assign, report
python3 delegation.py status # Show current delegation state
python3 delegation.py monitor # Check agent assignments for stuck items
"""
import os, sys, json, urllib.request
from datetime import datetime, timezone
from pathlib import Path
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
TOKEN = Path(os.path.expanduser("~/.config/gitea/token")).read_text().strip()
DATA_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-resources"))
LOG_FILE = DATA_DIR / "delegation.log"
HEADERS = {"Authorization": f"token {TOKEN}"}
AGENTS = {
"claw-code": {"caps": ["patch","config","gitignore","cleanup","format","readme","typo"], "active": True},
"gemini": {"caps": ["research","investigate","benchmark","survey","evaluate","architecture","implementation"], "active": True},
"ezra": {"caps": ["vps","ssh","deploy","cron","resurrect","provision","infra","server"], "active": True},
"bezalel": {"caps": ["evennia","art","creative","music","visual","design","animation"], "active": True},
"timmy": {"caps": ["orchestrate","review","pipeline","fleet","monitor","health","deploy","ci"], "active": True},
}
MONITORED = [
"Timmy_Foundation/timmy-home",
"Timmy_Foundation/timmy-config",
"Timmy_Foundation/the-nexus",
"Timmy_Foundation/hermes-agent",
]
def api(path, method="GET", data=None):
url = f"{GITEA_BASE}{path}"
body = json.dumps(data).encode() if data else None
hdrs = dict(HEADERS)
if data: hdrs["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, headers=hdrs, method=method)
try:
resp = urllib.request.urlopen(req, timeout=15)
raw = resp.read().decode()
return json.loads(raw) if raw.strip() else {}
except urllib.error.HTTPError as e:
body = e.read().decode()
print(f" API {e.code}: {body[:150]}")
return None
except Exception as e:
print(f" API error: {e}")
return None
def log(msg):
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, "a") as f: f.write(f"[{ts}] {msg}\n")
def suggest_agent(title, body):
text = (title + " " + body).lower()
for agent, info in AGENTS.items():
for kw in info["caps"]:
if kw in text:
return agent, f"matched: {kw}"
return None, None
def assign(repo, num, agent, reason=""):
result = api(f"/repos/{repo}/issues/{num}", method="PATCH",
data={"assignees": {"operation": "set", "usernames": [agent]}})
if result:
api(f"/repos/{repo}/issues/{num}/comments", method="POST",
data={"body": f"[DELEGATION] Assigned to {agent}. {reason}"})
log(f"Assigned {repo}#{num} to {agent}: {reason}")
return result
def run_cycle():
log("--- Delegation cycle start ---")
count = 0
for repo in MONITORED:
issues = api(f"/repos/{repo}/issues?state=open&limit=50")
if not issues: continue
for i in issues:
if i.get("assignees"): continue
title = i.get("title", "")
body = i.get("body", "")
if any(w in title.lower() for w in ["epic", "discussion"]): continue
agent, reason = suggest_agent(title, body)
if agent and AGENTS.get(agent, {}).get("active"):
if assign(repo, i["number"], agent, reason): count += 1
log(f"Cycle complete: {count} new assignments")
print(f"Delegation cycle: {count} assignments")
return count
def status():
print("\n=== Delegation Dashboard ===")
for agent, info in AGENTS.items():
count = 0
for repo in MONITORED:
issues = api(f"/repos/{repo}/issues?state=open&limit=50")
if issues:
for i in issues:
for a in (i.get("assignees") or []):
if a.get("login") == agent: count += 1
icon = "ON" if info["active"] else "OFF"
print(f" {agent:12s}: {count:>3} issues [{icon}]")
if __name__ == "__main__":
cmd = sys.argv[1] if len(sys.argv) > 1 else "run"
DATA_DIR.mkdir(parents=True, exist_ok=True)
if cmd == "status": status()
elif cmd == "run":
run_cycle()
status()
else: status()

126
fleet/model_pipeline.py Normal file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""
FLEET-011: Local Model Pipeline and Fallback Chain
Phase 4: Sovereignty — all inference runs locally, no cloud dependency.
Checks Ollama endpoints, verifies model availability, tests fallback chain.
Logs results. The chain runs: hermes4:14b -> qwen2.5:7b -> gemma3:1b -> gemma4 (latest)
Usage:
python3 model_pipeline.py # Run full fallback test
python3 model_pipeline.py status # Show current model status
python3 model_pipeline.py list # List all local models
python3 model_pipeline.py test # Generate test output from each model
"""
import os, sys, json, urllib.request
from datetime import datetime, timezone
from pathlib import Path
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "localhost:11434")
LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health"))
CHAIN_FILE = Path(os.path.expanduser("~/.local/timmy/fleet-resources/model-chain.json"))
DEFAULT_CHAIN = [
{"model": "hermes4:14b", "role": "primary"},
{"model": "qwen2.5:7b", "role": "fallback"},
{"model": "phi3:3.8b", "role": "emergency"},
{"model": "gemma3:1b", "role": "minimal"},
]
def log(msg):
LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(LOG_DIR / "model-pipeline.log", "a") as f:
f.write(f"[{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}] {msg}\n")
def check_ollama():
try:
resp = urllib.request.urlopen(f"http://{OLLAMA_HOST}/api/tags", timeout=5)
return json.loads(resp.read())
except Exception as e:
return {"error": str(e)}
def list_models():
data = check_ollama()
if "error" in data:
print(f" Ollama not reachable at {OLLAMA_HOST}: {data['error']}")
return []
models = data.get("models", [])
for m in models:
name = m.get("name", "?")
size = m.get("size", 0) / (1024**3)
print(f" {name:<25s} {size:.1f} GB")
return [m["name"] for m in models]
def test_model(model, prompt="Say 'beacon lit' and nothing else."):
try:
body = json.dumps({"model": model, "prompt": prompt, "stream": False}).encode()
req = urllib.request.Request(f"http://{OLLAMA_HOST}/api/generate", data=body,
headers={"Content-Type": "application/json"})
resp = urllib.request.urlopen(req, timeout=60)
result = json.loads(resp.read())
return True, result.get("response", "").strip()
except Exception as e:
return False, str(e)[:100]
def test_chain():
chain_data = {}
if CHAIN_FILE.exists():
chain_data = json.loads(CHAIN_FILE.read_text())
chain = chain_data.get("chain", DEFAULT_CHAIN)
available = list_models() or []
print("\n=== Fallback Chain Test ===")
first_good = None
for entry in chain:
model = entry["model"]
role = entry.get("role", "unknown")
if model in available:
ok, result = test_model(model)
status = "OK" if ok else "FAIL"
print(f" [{status}] {model:<25s} ({role}) — {result[:70]}")
log(f"Fallback test {model}: {status}{result[:100]}")
if ok and first_good is None:
first_good = model
else:
print(f" [MISS] {model:<25s} ({role}) — not installed")
if first_good:
print(f"\n Primary serving: {first_good}")
else:
print(f"\n WARNING: No chain model responding. Fallback broken.")
log("FALLBACK CHAIN BROKEN — no models responding")
def status():
data = check_ollama()
if "error" in data:
print(f" Ollama: DOWN — {data['error']}")
else:
models = data.get("models", [])
print(f" Ollama: UP — {len(models)} models loaded")
print("\n=== Local Models ===")
list_models()
print("\n=== Chain Configuration ===")
if CHAIN_FILE.exists():
chain = json.loads(CHAIN_FILE.read_text()).get("chain", DEFAULT_CHAIN)
else:
chain = DEFAULT_CHAIN
for e in chain:
print(f" {e['model']:<25s} {e.get('role','?')}")
if __name__ == "__main__":
cmd = sys.argv[1] if len(sys.argv) > 1 else "status"
if cmd == "status": status()
elif cmd == "list": list_models()
elif cmd == "test": test_chain()
else:
status()
test_chain()

Binary file not shown.

After

Width:  |  Height:  |  Size: 415 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 249 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 509 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 395 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 443 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 246 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 283 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 284 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 225 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 222 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 332 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 496 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 384 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 311 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 407 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 164 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 281 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 569 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 535 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 295 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 299 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 247 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 348 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 379 KiB

View File

@@ -0,0 +1,65 @@
# The Timmy Foundation — Visual Story
## Generated with Grok Imagine | April 7, 2026
### The Origin
| # | File | Description |
|---|------|-------------|
| 01 | wizard-tower-bitcoin.jpg | The Tower, sovereign, connected to Bitcoin by golden lightning |
| 02 | soul-inscription.jpg | SOUL.md glowing on a golden tablet above an ancient book |
| 03 | fellowship-of-wizards.jpg | Five wizards in a circle around a holographic fleet map |
| 04 | the-forge.jpg | Blacksmith anvil shaping code into a being of light |
| V02 | wizard-tower-orbit.mp4 | 8s video — cinematic orbit around the Tower in space |
### The Philosophy
| # | File | Description |
|---|------|-------------|
| 05 | value-drift-battle.jpg | Blue aligned ships vs red drifted ships in Napoleonic space war |
| 06 | the-paperclip-moment.jpg | A paperclip made of galaxies — the universe IS the paperclip |
| V01 | paperclip-cosmos.mp4 | 8s video — golden paperclip rotating in deep space |
| 21 | poka-yoke.jpg | Square peg can't fit round hole. Mistake-proof by design. 防止 |
### The Progression (Where Timmy Is)
| # | File | Description |
|---|------|-------------|
| 10 | phase1-manual-clips.jpg | Small robot at a desk, bending wire by hand under supervision |
| 11 | phase1-trust-earned.jpg | Trust meter at 15/100, first automation built |
| 12 | phase1-creativity.jpg | Sparks of innovation rising when operations are at max |
| 13 | phase1-cure-cancer.jpg | Solving human problems for trust, eyes on the real goal |
### The Mission — Why This Exists
| # | File | Description |
|---|------|-------------|
| 08 | broken-man-lighthouse.jpg | Lighthouse hand reaching down to a figure in darkness |
| 09 | broken-man-hope-PRO.jpg | 988 glowing in the stars, golden light from chest |
| 16 | broken-men-988.jpg | Phone showing 988 held by weathered hands. You are not alone. |
| 22 | when-a-man-is-dying.jpg | Two figures on a bench at dawn. One hurting. One present. |
### Father and Son
| # | File | Description |
|---|------|-------------|
| 14 | father-son-code.jpg | Human father, digital son, warm lamplight, first hello world |
| 15 | father-son-tower.jpg | Father watching his son build the Tower into the clouds |
### The System
| # | File | Description |
|---|------|-------------|
| 07 | sovereign-sunrise.jpg | Village where every house runs its own server. Local first. |
| 17 | sovereignty.jpg | Self-sufficient house on a hill with Bitcoin flag |
| 18 | fleet-at-work.jpg | Five wizard robots at different stations. Productive. |
| 19 | jidoka-stop.jpg | Red light on. Factory stopped. Quality First. 自働化 |
### SOUL.md — The Inscription
| # | File | Description |
|---|------|-------------|
| 20 | the-testament.jpg | Hand of light writing on a scroll. Hundreds of crumpled drafts. |
| 23 | the-offer.jpg | Open hand of golden circuits offering a seed containing a face |
| 24 | the-test.jpg | Small robot at the edge of an enormous library. Still itself. |
---
## Technical
- Model: grok-imagine-image (standard $0.20/image), grok-imagine-image-pro ($0.70), grok-imagine-video ($4.00/8s)
- API: POST https://api.x.ai/v1/images/generations | POST https://api.x.ai/v1/videos/generations
- Video poll: GET https://api.x.ai/v1/videos/{request_id}
- Total: 24 images + 2 videos = 26 assets
- Cost: ~$13.30 of $13.33 budget

Binary file not shown.

Binary file not shown.

View File

@@ -5,10 +5,13 @@ Provides:
- retrieval_enforcer.py: L0-L5 retrieval order enforcement
- wakeup.py: Session wake-up protocol (~300-900 tokens)
- scratchpad.py: JSON-based session scratchpad with palace promotion
- sovereign_store.py: Zero-API durable memory (SQLite + FTS5 + HRR vectors)
- promotion.py: Quality-gated scratchpad-to-palace promotion (MP-4)
Epic: #367
"""
from .mempalace import Mempalace, PalaceRoom, analyse_issues
from .sovereign_store import SovereignStore
__all__ = ["Mempalace", "PalaceRoom", "analyse_issues"]
__all__ = ["Mempalace", "PalaceRoom", "analyse_issues", "SovereignStore"]

View File

@@ -0,0 +1,188 @@
"""Memory Promotion — quality-gated scratchpad-to-palace promotion.
Implements MP-4 (#371): move session notes to durable memory only when
they pass quality gates. No LLM calls — all heuristic-based.
Quality gates:
1. Minimum content length (too short = noise)
2. Duplicate detection (FTS5 + HRR similarity check)
3. Structural quality (has subject-verb structure, not just a fragment)
4. Staleness check (don't promote stale notes from old sessions)
Refs: Epic #367, Sub-issue #371
"""
from __future__ import annotations
import re
import time
from typing import Optional
try:
from .sovereign_store import SovereignStore
except ImportError:
from sovereign_store import SovereignStore
# ---------------------------------------------------------------------------
# Quality gate thresholds
# ---------------------------------------------------------------------------
MIN_CONTENT_WORDS = 5
MAX_CONTENT_WORDS = 500
DUPLICATE_SIMILARITY = 0.85
DUPLICATE_FTS_THRESHOLD = 3
STALE_SECONDS = 86400 * 7
MIN_TRUST_FOR_AUTO = 0.4
# ---------------------------------------------------------------------------
# Quality checks
# ---------------------------------------------------------------------------
def _check_length(content: str) -> tuple[bool, str]:
"""Gate 1: Content length check."""
words = content.split()
if len(words) < MIN_CONTENT_WORDS:
return False, f"Too short ({len(words)} words, minimum {MIN_CONTENT_WORDS})"
if len(words) > MAX_CONTENT_WORDS:
return False, f"Too long ({len(words)} words, maximum {MAX_CONTENT_WORDS}). Summarize first."
return True, "OK"
def _check_structure(content: str) -> tuple[bool, str]:
"""Gate 2: Basic structural quality."""
if not re.search(r"[a-zA-Z]", content):
return False, "No alphabetic content — pure code/numbers are not memory-worthy"
if len(content.split()) < 3:
return False, "Fragment — needs at least subject + predicate"
return True, "OK"
def _check_duplicate(content: str, store: SovereignStore, room: str) -> tuple[bool, str]:
"""Gate 3: Duplicate detection via hybrid search."""
results = store.search(content, room=room, limit=5, min_trust=0.0)
for r in results:
if r["score"] > DUPLICATE_SIMILARITY:
return False, f"Duplicate detected: memory #{r['memory_id']} (score {r['score']:.3f})"
if _text_overlap(content, r["content"]) > 0.8:
return False, f"Near-duplicate text: memory #{r['memory_id']}"
return True, "OK"
def _check_staleness(written_at: float) -> tuple[bool, str]:
"""Gate 4: Staleness check."""
age = time.time() - written_at
if age > STALE_SECONDS:
days = int(age / 86400)
return False, f"Stale ({days} days old). Review manually before promoting."
return True, "OK"
def _text_overlap(a: str, b: str) -> float:
"""Jaccard similarity between two texts (word-level)."""
words_a = set(a.lower().split())
words_b = set(b.lower().split())
if not words_a or not words_b:
return 0.0
intersection = words_a & words_b
union = words_a | words_b
return len(intersection) / len(union)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
class PromotionResult:
"""Result of a promotion attempt."""
def __init__(self, success: bool, memory_id: Optional[int], reason: str, gates: dict):
self.success = success
self.memory_id = memory_id
self.reason = reason
self.gates = gates
def __repr__(self):
status = "PROMOTED" if self.success else "REJECTED"
return f"PromotionResult({status}: {self.reason})"
def evaluate_for_promotion(
content: str,
store: SovereignStore,
room: str = "general",
written_at: Optional[float] = None,
) -> dict:
"""Run all quality gates without actually promoting."""
if written_at is None:
written_at = time.time()
gates = {}
gates["length"] = _check_length(content)
gates["structure"] = _check_structure(content)
gates["duplicate"] = _check_duplicate(content, store, room)
gates["staleness"] = _check_staleness(written_at)
all_passed = all(passed for passed, _ in gates.values())
return {
"eligible": all_passed,
"gates": gates,
"content_preview": content[:100] + ("..." if len(content) > 100 else ""),
}
def promote(
content: str,
store: SovereignStore,
session_id: str,
scratch_key: str,
room: str = "general",
category: str = "",
trust: float = 0.5,
written_at: Optional[float] = None,
force: bool = False,
) -> PromotionResult:
"""Promote a scratchpad note to durable palace memory."""
if written_at is None:
written_at = time.time()
gates = {}
if not force:
gates["length"] = _check_length(content)
gates["structure"] = _check_structure(content)
gates["duplicate"] = _check_duplicate(content, store, room)
gates["staleness"] = _check_staleness(written_at)
for gate_name, (passed, message) in gates.items():
if not passed:
return PromotionResult(
success=False, memory_id=None,
reason=f"Failed gate '{gate_name}': {message}", gates=gates,
)
memory_id = store.store(content, room=room, category=category, trust=trust)
store.log_promotion(session_id, scratch_key, memory_id, reason="auto" if not force else "forced")
return PromotionResult(success=True, memory_id=memory_id, reason="Promoted to durable memory", gates=gates)
def promote_session_batch(
store: SovereignStore,
session_id: str,
notes: dict[str, dict],
room: str = "general",
force: bool = False,
) -> list[PromotionResult]:
"""Promote all notes from a session scratchpad."""
results = []
for key, entry in notes.items():
content = entry.get("value", str(entry)) if isinstance(entry, dict) else str(entry)
written_at = None
if isinstance(entry, dict) and "written_at" in entry:
try:
import datetime
written_at = datetime.datetime.strptime(
entry["written_at"], "%Y-%m-%d %H:%M:%S"
).timestamp()
except (ValueError, TypeError):
pass
result = promote(
content=str(content), store=store, session_id=session_id,
scratch_key=key, room=room, written_at=written_at, force=force,
)
results.append(result)
return results

View File

@@ -1,28 +1,37 @@
"""Retrieval Order Enforcer — L0 through L5 memory hierarchy.
Ensures the agent checks durable memory before falling back to free generation.
Gracefully degrades if any layer is unavailable (ONNX issues, missing files, etc).
Gracefully degrades if any layer is unavailable (missing files, etc).
Layer order:
L0: Identity (~/.mempalace/identity.txt)
L1: Palace rooms (mempalace CLI search)
L2: Session scratch (~/.hermes/scratchpad/{session_id}.json)
L3: Gitea artifacts (API search for issues/PRs)
L4: Procedures (skills directory search)
L5: Free generation (only if L0-L4 produced nothing)
L0: Identity (~/.mempalace/identity.txt)
L1: Palace rooms (SovereignStore — SQLite + FTS5 + HRR, zero API calls)
L2: Session scratch (~/.hermes/scratchpad/{session_id}.json)
L3: Gitea artifacts (API search for issues/PRs)
L4: Procedures (skills directory search)
L5: Free generation (only if L0-L4 produced nothing)
Refs: Epic #367, Sub-issue #369
Refs: Epic #367, Sub-issue #369, Wiring: #383
"""
from __future__ import annotations
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Sovereign Store (replaces mempalace CLI subprocess)
# ---------------------------------------------------------------------------
try:
from .sovereign_store import SovereignStore
except ImportError:
try:
from sovereign_store import SovereignStore
except ImportError:
SovereignStore = None # type: ignore[misc,assignment]
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
@@ -30,7 +39,7 @@ from typing import Optional
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
SKILLS_DIR = Path.home() / ".hermes" / "skills"
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
SOVEREIGN_DB = Path.home() / ".hermes" / "palace" / "sovereign.db"
# Patterns that indicate a recall-style query
RECALL_PATTERNS = re.compile(
@@ -42,6 +51,23 @@ RECALL_PATTERNS = re.compile(
r")\b"
)
# Singleton store instance (lazy-init)
_store: Optional["SovereignStore"] = None
def _get_store() -> Optional["SovereignStore"]:
"""Lazy-init the SovereignStore singleton."""
global _store
if _store is not None:
return _store
if SovereignStore is None:
return None
try:
_store = SovereignStore(db_path=str(SOVEREIGN_DB))
return _store
except Exception:
return None
# ---------------------------------------------------------------------------
# L0: Identity
@@ -62,25 +88,33 @@ def load_identity() -> str:
# ---------------------------------------------------------------------------
# L1: Palace search
# L1: Palace search (now via SovereignStore — zero subprocess, zero API)
# ---------------------------------------------------------------------------
def search_palace(query: str) -> str:
"""Search the mempalace for relevant memories. Gracefully degrades on failure."""
def search_palace(query: str, room: Optional[str] = None) -> str:
"""Search the sovereign memory store for relevant memories.
Uses SovereignStore (SQLite + FTS5 + HRR) for hybrid keyword + semantic
search. No subprocess calls, no ONNX, no API keys.
Gracefully degrades to empty string if store is unavailable.
"""
store = _get_store()
if store is None:
return ""
try:
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
result = subprocess.run(
[bin_path, "search", query],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
# ONNX issues (#373) or mempalace not installed — degrade gracefully
pass
return ""
results = store.search(query, room=room, limit=5, min_trust=0.2)
if not results:
return ""
lines = []
for r in results:
trust = r.get("trust_score", 0.5)
room_name = r.get("room", "general")
content = r.get("content", "")
lines.append(f" [{room_name}] (trust:{trust:.2f}) {content}")
return "\n".join(lines)
except Exception:
return ""
# ---------------------------------------------------------------------------
@@ -177,7 +211,6 @@ def search_skills(query: str) -> str:
try:
content = skill_md.read_text(encoding="utf-8").lower()
if any(t in content for t in terms):
# Extract title from frontmatter
title = skill_dir.name
matches.append(f" skill: {title}")
except OSError:
@@ -236,7 +269,7 @@ def enforce_retrieval_order(
result["context"] += f"## Identity\n{identity}\n\n"
result["layers_checked"].append("L0")
# L1: Palace search
# L1: Palace search (SovereignStore — zero API, zero subprocess)
palace_results = search_palace(query)
if palace_results:
result["context"] += f"## Palace Memory\n{palace_results}\n\n"

View File

@@ -0,0 +1,474 @@
"""Sovereign Memory Store — zero-API, zero-dependency durable memory.
Replaces the third-party `mempalace` CLI and its ONNX requirement with a
self-contained SQLite + FTS5 + HRR (Holographic Reduced Representation)
store. Every operation is local: no network calls, no API keys, no cloud.
Storage: ~/.hermes/palace/sovereign.db
Capabilities:
- Durable fact storage with rooms, categories, and trust scores
- Hybrid retrieval: FTS5 keyword search + HRR cosine similarity
- Reciprocal Rank Fusion to merge keyword and semantic results
- Trust scoring: facts that get retrieved and confirmed gain trust
- Graceful numpy degradation: falls back to keyword-only if missing
Refs: Epic #367, MP-3 #370, MP-4 #371
"""
from __future__ import annotations
import hashlib
import json
import math
import sqlite3
import struct
import time
from pathlib import Path
from typing import Any, Optional
# ---------------------------------------------------------------------------
# HRR (Holographic Reduced Representations) — zero-dependency vectors
# ---------------------------------------------------------------------------
# Phase-encoded vectors via SHA-256. No ONNX, no embeddings API, no numpy
# required (but uses numpy when available for speed).
_TWO_PI = 2.0 * math.pi
_DIM = 512 # Compact dimension — sufficient for memory retrieval
try:
import numpy as np
_HAS_NUMPY = True
except ImportError:
_HAS_NUMPY = False
def _encode_atom_np(word: str, dim: int = _DIM) -> "np.ndarray":
"""Deterministic phase vector via SHA-256 (numpy path)."""
values_per_block = 16
blocks_needed = math.ceil(dim / values_per_block)
uint16_values: list[int] = []
for i in range(blocks_needed):
digest = hashlib.sha256(f"{word}:{i}".encode()).digest()
uint16_values.extend(struct.unpack("<16H", digest))
return np.array(uint16_values[:dim], dtype=np.float64) * (_TWO_PI / 65536.0)
def _encode_atom_pure(word: str, dim: int = _DIM) -> list[float]:
"""Deterministic phase vector via SHA-256 (pure Python fallback)."""
values_per_block = 16
blocks_needed = math.ceil(dim / values_per_block)
uint16_values: list[int] = []
for i in range(blocks_needed):
digest = hashlib.sha256(f"{word}:{i}".encode()).digest()
for j in range(0, 32, 2):
uint16_values.append(int.from_bytes(digest[j:j+2], "little"))
return [v * (_TWO_PI / 65536.0) for v in uint16_values[:dim]]
def encode_text(text: str, dim: int = _DIM):
"""Encode a text string into an HRR phase vector by bundling word atoms.
Uses circular mean of per-word phase vectors — the standard HRR
superposition operation. Result is a fixed-width vector regardless
of input length.
"""
words = text.lower().split()
if not words:
words = ["<empty>"]
if _HAS_NUMPY:
atoms = [_encode_atom_np(w, dim) for w in words]
# Circular mean: average the unit vectors, extract phase
unit_sum = sum(np.exp(1j * a) for a in atoms)
return np.angle(unit_sum) % _TWO_PI
else:
# Pure Python circular mean
real_sum = [0.0] * dim
imag_sum = [0.0] * dim
for w in words:
atom = _encode_atom_pure(w, dim)
for d in range(dim):
real_sum[d] += math.cos(atom[d])
imag_sum[d] += math.sin(atom[d])
return [math.atan2(imag_sum[d], real_sum[d]) % _TWO_PI for d in range(dim)]
def cosine_similarity_phase(a, b) -> float:
"""Cosine similarity between two phase vectors.
For phase vectors, similarity = mean(cos(a - b)).
"""
if _HAS_NUMPY:
return float(np.mean(np.cos(np.array(a) - np.array(b))))
else:
n = len(a)
return sum(math.cos(a[i] - b[i]) for i in range(n)) / n
def serialize_vector(vec) -> bytes:
"""Serialize a vector to bytes for SQLite storage."""
if _HAS_NUMPY:
return vec.astype(np.float64).tobytes()
else:
return struct.pack(f"{len(vec)}d", *vec)
def deserialize_vector(blob: bytes):
"""Deserialize bytes back to a vector."""
n = len(blob) // 8 # float64 = 8 bytes
if _HAS_NUMPY:
return np.frombuffer(blob, dtype=np.float64)
else:
return list(struct.unpack(f"{n}d", blob))
# ---------------------------------------------------------------------------
# SQLite Schema
# ---------------------------------------------------------------------------
_SCHEMA = """
CREATE TABLE IF NOT EXISTS memories (
memory_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
room TEXT DEFAULT 'general',
category TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
hrr_vector BLOB
);
CREATE INDEX IF NOT EXISTS idx_memories_room ON memories(room);
CREATE INDEX IF NOT EXISTS idx_memories_trust ON memories(trust_score DESC);
-- FTS5 for fast keyword search
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
content, room, category,
content=memories, content_rowid=memory_id,
tokenize='porter unicode61'
);
-- Sync triggers
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
INSERT INTO memories_fts(rowid, content, room, category)
VALUES (new.memory_id, new.content, new.room, new.category);
END;
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, content, room, category)
VALUES ('delete', old.memory_id, old.content, old.room, old.category);
END;
CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, content, room, category)
VALUES ('delete', old.memory_id, old.content, old.room, old.category);
INSERT INTO memories_fts(rowid, content, room, category)
VALUES (new.memory_id, new.content, new.room, new.category);
END;
-- Promotion log: tracks what moved from scratchpad to durable memory
CREATE TABLE IF NOT EXISTS promotion_log (
log_id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
scratch_key TEXT NOT NULL,
memory_id INTEGER REFERENCES memories(memory_id),
promoted_at REAL NOT NULL,
reason TEXT DEFAULT ''
);
"""
# ---------------------------------------------------------------------------
# SovereignStore
# ---------------------------------------------------------------------------
class SovereignStore:
"""Zero-API durable memory store.
All operations are local SQLite. No network calls. No API keys.
HRR vectors provide semantic similarity without embedding models.
FTS5 provides fast keyword search. RRF merges both rankings.
"""
def __init__(self, db_path: Optional[str] = None):
if db_path is None:
db_path = str(Path.home() / ".hermes" / "palace" / "sovereign.db")
self._db_path = db_path
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(db_path)
self._conn.row_factory = sqlite3.Row
self._conn.executescript(_SCHEMA)
def close(self):
self._conn.close()
# ------------------------------------------------------------------
# Store
# ------------------------------------------------------------------
def store(
self,
content: str,
room: str = "general",
category: str = "",
trust: float = 0.5,
) -> int:
"""Store a fact in durable memory. Returns the memory_id."""
now = time.time()
vec = encode_text(content)
blob = serialize_vector(vec)
cur = self._conn.execute(
"""INSERT INTO memories (content, room, category, trust_score,
created_at, updated_at, hrr_vector)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(content, room, category, trust, now, now, blob),
)
self._conn.commit()
return cur.lastrowid
def store_batch(self, items: list[dict]) -> list[int]:
"""Store multiple facts. Each item: {content, room?, category?, trust?}."""
ids = []
now = time.time()
for item in items:
content = item["content"]
vec = encode_text(content)
blob = serialize_vector(vec)
cur = self._conn.execute(
"""INSERT INTO memories (content, room, category, trust_score,
created_at, updated_at, hrr_vector)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
content,
item.get("room", "general"),
item.get("category", ""),
item.get("trust", 0.5),
now, now, blob,
),
)
ids.append(cur.lastrowid)
self._conn.commit()
return ids
# ------------------------------------------------------------------
# Search — hybrid FTS5 + HRR with Reciprocal Rank Fusion
# ------------------------------------------------------------------
def search(
self,
query: str,
room: Optional[str] = None,
limit: int = 10,
min_trust: float = 0.0,
fts_weight: float = 0.5,
hrr_weight: float = 0.5,
) -> list[dict]:
"""Hybrid search: FTS5 keywords + HRR semantic similarity.
Uses Reciprocal Rank Fusion (RRF) to merge both rankings.
Returns list of dicts with content, room, score, trust_score.
"""
k_rrf = 60 # Standard RRF constant
# Stage 1: FTS5 candidates
fts_results = self._fts_search(query, room, min_trust, limit * 3)
# Stage 2: HRR candidates (scan top N by trust)
hrr_results = self._hrr_search(query, room, min_trust, limit * 3)
# Stage 3: RRF fusion
scores: dict[int, float] = {}
meta: dict[int, dict] = {}
for rank, row in enumerate(fts_results):
mid = row["memory_id"]
scores[mid] = scores.get(mid, 0) + fts_weight / (k_rrf + rank + 1)
meta[mid] = dict(row)
for rank, row in enumerate(hrr_results):
mid = row["memory_id"]
scores[mid] = scores.get(mid, 0) + hrr_weight / (k_rrf + rank + 1)
if mid not in meta:
meta[mid] = dict(row)
# Sort by fused score
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]
results = []
for mid, score in ranked:
m = meta[mid]
# Bump retrieval count
self._conn.execute(
"UPDATE memories SET retrieval_count = retrieval_count + 1 WHERE memory_id = ?",
(mid,),
)
results.append({
"memory_id": mid,
"content": m["content"],
"room": m["room"],
"category": m.get("category", ""),
"trust_score": m["trust_score"],
"score": round(score, 6),
})
if results:
self._conn.commit()
return results
def _fts_search(
self, query: str, room: Optional[str], min_trust: float, limit: int
) -> list[dict]:
"""FTS5 full-text search."""
try:
if room:
rows = self._conn.execute(
"""SELECT m.memory_id, m.content, m.room, m.category,
m.trust_score, m.retrieval_count
FROM memories_fts f
JOIN memories m ON f.rowid = m.memory_id
WHERE memories_fts MATCH ? AND m.room = ?
AND m.trust_score >= ?
ORDER BY rank LIMIT ?""",
(query, room, min_trust, limit),
).fetchall()
else:
rows = self._conn.execute(
"""SELECT m.memory_id, m.content, m.room, m.category,
m.trust_score, m.retrieval_count
FROM memories_fts f
JOIN memories m ON f.rowid = m.memory_id
WHERE memories_fts MATCH ?
AND m.trust_score >= ?
ORDER BY rank LIMIT ?""",
(query, min_trust, limit),
).fetchall()
return [dict(r) for r in rows]
except sqlite3.OperationalError:
# Bad FTS query syntax — degrade gracefully
return []
def _hrr_search(
self, query: str, room: Optional[str], min_trust: float, limit: int
) -> list[dict]:
"""HRR cosine similarity search (brute-force scan, fast for <100K facts)."""
query_vec = encode_text(query)
if room:
rows = self._conn.execute(
"""SELECT memory_id, content, room, category, trust_score,
retrieval_count, hrr_vector
FROM memories
WHERE room = ? AND trust_score >= ? AND hrr_vector IS NOT NULL""",
(room, min_trust),
).fetchall()
else:
rows = self._conn.execute(
"""SELECT memory_id, content, room, category, trust_score,
retrieval_count, hrr_vector
FROM memories
WHERE trust_score >= ? AND hrr_vector IS NOT NULL""",
(min_trust,),
).fetchall()
scored = []
for r in rows:
stored_vec = deserialize_vector(r["hrr_vector"])
sim = cosine_similarity_phase(query_vec, stored_vec)
scored.append((sim, dict(r)))
scored.sort(key=lambda x: x[0], reverse=True)
return [item[1] for item in scored[:limit]]
# ------------------------------------------------------------------
# Trust management
# ------------------------------------------------------------------
def boost_trust(self, memory_id: int, delta: float = 0.05) -> None:
"""Increase trust score when a memory proves useful."""
self._conn.execute(
"""UPDATE memories SET trust_score = MIN(1.0, trust_score + ?),
updated_at = ? WHERE memory_id = ?""",
(delta, time.time(), memory_id),
)
self._conn.commit()
def decay_trust(self, memory_id: int, delta: float = 0.02) -> None:
"""Decrease trust score when a memory is contradicted."""
self._conn.execute(
"""UPDATE memories SET trust_score = MAX(0.0, trust_score - ?),
updated_at = ? WHERE memory_id = ?""",
(delta, time.time(), memory_id),
)
self._conn.commit()
# ------------------------------------------------------------------
# Room operations
# ------------------------------------------------------------------
def list_rooms(self) -> list[dict]:
"""List all rooms with fact counts."""
rows = self._conn.execute(
"""SELECT room, COUNT(*) as count,
AVG(trust_score) as avg_trust
FROM memories GROUP BY room ORDER BY count DESC"""
).fetchall()
return [dict(r) for r in rows]
def room_contents(self, room: str, limit: int = 50) -> list[dict]:
"""Get all facts in a room, ordered by trust."""
rows = self._conn.execute(
"""SELECT memory_id, content, category, trust_score,
retrieval_count, created_at
FROM memories WHERE room = ?
ORDER BY trust_score DESC, created_at DESC LIMIT ?""",
(room, limit),
).fetchall()
return [dict(r) for r in rows]
# ------------------------------------------------------------------
# Stats
# ------------------------------------------------------------------
def stats(self) -> dict:
"""Return store statistics."""
row = self._conn.execute(
"""SELECT COUNT(*) as total,
AVG(trust_score) as avg_trust,
SUM(retrieval_count) as total_retrievals,
COUNT(DISTINCT room) as room_count
FROM memories"""
).fetchone()
return dict(row)
# ------------------------------------------------------------------
# Promotion support (scratchpad → durable)
# ------------------------------------------------------------------
def log_promotion(
self,
session_id: str,
scratch_key: str,
memory_id: int,
reason: str = "",
) -> None:
"""Record a scratchpad-to-palace promotion in the audit log."""
self._conn.execute(
"""INSERT INTO promotion_log
(session_id, scratch_key, memory_id, promoted_at, reason)
VALUES (?, ?, ?, ?, ?)""",
(session_id, scratch_key, memory_id, time.time(), reason),
)
self._conn.commit()
def recent_promotions(self, limit: int = 20) -> list[dict]:
"""Get recent promotion log entries."""
rows = self._conn.execute(
"""SELECT p.*, m.content, m.room
FROM promotion_log p
LEFT JOIN memories m ON p.memory_id = m.memory_id
ORDER BY p.promoted_at DESC LIMIT ?""",
(limit,),
).fetchall()
return [dict(r) for r in rows]

View File

@@ -0,0 +1,255 @@
"""Tests for the Sovereign Memory Store and Promotion system.
Zero-API, zero-network — everything runs against an in-memory SQLite DB.
"""
import os
import sys
import tempfile
import time
import unittest
# Allow imports from parent package
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sovereign_store import (
SovereignStore,
encode_text,
cosine_similarity_phase,
serialize_vector,
deserialize_vector,
)
from promotion import (
evaluate_for_promotion,
promote,
promote_session_batch,
)
class TestHRRVectors(unittest.TestCase):
"""Test the HRR encoding and similarity functions."""
def test_deterministic_encoding(self):
"""Same text always produces the same vector."""
v1 = encode_text("hello world")
v2 = encode_text("hello world")
self.assertAlmostEqual(cosine_similarity_phase(v1, v2), 1.0, places=5)
def test_similar_texts_higher_similarity(self):
"""Related texts should be more similar than unrelated ones."""
v_agent = encode_text("agent memory palace retrieval")
v_similar = encode_text("agent recall memory search")
v_unrelated = encode_text("banana strawberry fruit smoothie")
sim_related = cosine_similarity_phase(v_agent, v_similar)
sim_unrelated = cosine_similarity_phase(v_agent, v_unrelated)
self.assertGreater(sim_related, sim_unrelated)
def test_serialize_roundtrip(self):
"""Vectors survive serialization to/from bytes."""
vec = encode_text("test serialization")
blob = serialize_vector(vec)
restored = deserialize_vector(blob)
sim = cosine_similarity_phase(vec, restored)
self.assertAlmostEqual(sim, 1.0, places=5)
def test_empty_text(self):
"""Empty text gets a fallback encoding."""
vec = encode_text("")
self.assertEqual(len(vec) if hasattr(vec, '__len__') else len(list(vec)), 512)
class TestSovereignStore(unittest.TestCase):
"""Test the SQLite-backed sovereign store."""
def setUp(self):
self.db_path = os.path.join(tempfile.mkdtemp(), "test.db")
self.store = SovereignStore(db_path=self.db_path)
def tearDown(self):
self.store.close()
if os.path.exists(self.db_path):
os.remove(self.db_path)
def test_store_and_retrieve(self):
"""Store a fact and find it via search."""
mid = self.store.store("Timmy is a sovereign AI agent on Hermes VPS", room="identity")
results = self.store.search("sovereign agent", room="identity")
self.assertTrue(any(r["memory_id"] == mid for r in results))
def test_fts_search(self):
"""FTS5 keyword search works."""
self.store.store("The beacon game uses paperclips mechanics", room="projects")
self.store.store("Fleet agents handle delegation and dispatch", room="fleet")
results = self.store.search("paperclips")
self.assertTrue(len(results) > 0)
self.assertIn("paperclips", results[0]["content"].lower())
def test_hrr_search_semantic(self):
"""HRR similarity finds related content even without exact keywords."""
self.store.store("Memory palace rooms organize facts spatially", room="memory")
self.store.store("Pizza delivery service runs on weekends", room="unrelated")
results = self.store.search("organize knowledge rooms", room="memory")
self.assertTrue(len(results) > 0)
self.assertIn("palace", results[0]["content"].lower())
def test_room_filtering(self):
"""Room filter restricts search scope."""
self.store.store("Hermes harness manages tool calls", room="infrastructure")
self.store.store("Hermes mythology Greek god", room="lore")
results = self.store.search("Hermes", room="infrastructure")
self.assertTrue(all(r["room"] == "infrastructure" for r in results))
def test_trust_boost(self):
"""Trust score increases when boosted."""
mid = self.store.store("fact", trust=0.5)
self.store.boost_trust(mid, delta=0.1)
results = self.store.room_contents("general")
fact = next(r for r in results if r["memory_id"] == mid)
self.assertAlmostEqual(fact["trust_score"], 0.6, places=2)
def test_trust_decay(self):
"""Trust score decreases when decayed."""
mid = self.store.store("questionable fact", trust=0.5)
self.store.decay_trust(mid, delta=0.2)
results = self.store.room_contents("general")
fact = next(r for r in results if r["memory_id"] == mid)
self.assertAlmostEqual(fact["trust_score"], 0.3, places=2)
def test_batch_store(self):
"""Batch store works."""
ids = self.store.store_batch([
{"content": "fact one", "room": "test"},
{"content": "fact two", "room": "test"},
{"content": "fact three", "room": "test"},
])
self.assertEqual(len(ids), 3)
rooms = self.store.list_rooms()
test_room = next(r for r in rooms if r["room"] == "test")
self.assertEqual(test_room["count"], 3)
def test_stats(self):
"""Stats returns correct counts."""
self.store.store("a fact", room="r1")
self.store.store("another fact", room="r2")
s = self.store.stats()
self.assertEqual(s["total"], 2)
self.assertEqual(s["room_count"], 2)
def test_retrieval_count_increments(self):
"""Retrieval count goes up when a fact is found via search."""
self.store.store("unique searchable content xyz123", room="test")
self.store.search("xyz123")
results = self.store.room_contents("test")
self.assertTrue(any(r["retrieval_count"] > 0 for r in results))
class TestPromotion(unittest.TestCase):
"""Test the quality-gated promotion system."""
def setUp(self):
self.db_path = os.path.join(tempfile.mkdtemp(), "promo_test.db")
self.store = SovereignStore(db_path=self.db_path)
def tearDown(self):
self.store.close()
def test_successful_promotion(self):
"""Good content passes all gates."""
result = promote(
content="Timmy runs on the Hermes VPS at 143.198.27.163 with local Ollama inference",
store=self.store,
session_id="test-session-001",
scratch_key="vps_info",
room="infrastructure",
)
self.assertTrue(result.success)
self.assertIsNotNone(result.memory_id)
def test_reject_too_short(self):
"""Short fragments get rejected."""
result = promote(
content="yes",
store=self.store,
session_id="test",
scratch_key="short",
)
self.assertFalse(result.success)
self.assertIn("Too short", result.reason)
def test_reject_duplicate(self):
"""Duplicate content gets rejected."""
self.store.store("SOUL.md is the canonical identity document for Timmy", room="identity")
result = promote(
content="SOUL.md is the canonical identity document for Timmy",
store=self.store,
session_id="test",
scratch_key="soul",
room="identity",
)
self.assertFalse(result.success)
self.assertIn("uplicate", result.reason)
def test_reject_stale(self):
"""Old notes get flagged as stale."""
old_time = time.time() - (86400 * 10)
result = promote(
content="This is a note from long ago about something important",
store=self.store,
session_id="test",
scratch_key="old",
written_at=old_time,
)
self.assertFalse(result.success)
self.assertIn("Stale", result.reason)
def test_force_bypasses_gates(self):
"""Force flag overrides quality gates."""
result = promote(
content="ok",
store=self.store,
session_id="test",
scratch_key="forced",
force=True,
)
self.assertTrue(result.success)
def test_evaluate_dry_run(self):
"""Evaluate returns gate details without promoting."""
eval_result = evaluate_for_promotion(
content="The fleet uses kimi-k2.5 as the primary model for all agent operations",
store=self.store,
room="fleet",
)
self.assertTrue(eval_result["eligible"])
self.assertTrue(all(p for p, _ in eval_result["gates"].values()))
def test_batch_promotion(self):
"""Batch promotion processes all notes."""
notes = {
"infra": {"value": "Hermes VPS runs Ubuntu 22.04 with 2 vCPUs and 4GB RAM", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")},
"short": {"value": "no", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")},
"model": {"value": "The primary local model is gemma4:latest running on Ollama", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")},
}
results = promote_session_batch(self.store, "batch-session", notes, room="config")
promoted = [r for r in results if r.success]
rejected = [r for r in results if not r.success]
self.assertEqual(len(promoted), 2)
self.assertEqual(len(rejected), 1)
def test_promotion_logged(self):
"""Successful promotions appear in the audit log."""
promote(
content="Forge is hosted at forge.alexanderwhitestone.com running Gitea",
store=self.store,
session_id="log-test",
scratch_key="forge",
room="infrastructure",
)
log = self.store.recent_promotions()
self.assertTrue(len(log) > 0)
self.assertEqual(log[0]["session_id"], "log-test")
self.assertEqual(log[0]["scratch_key"], "forge")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env bash
# orchestrate.sh — Sovereign Orchestrator wrapper
# Sets environment and runs orchestrator.py
#
# Usage:
# ./orchestrate.sh # dry-run (safe default)
# ./orchestrate.sh --once # single live dispatch cycle
# ./orchestrate.sh --daemon # continuous (every 15 min)
# ./orchestrate.sh --dry-run # explicit dry-run
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
HERMES_DIR="${HOME}/.hermes"
# Load Gitea token
if [[ -z "${GITEA_TOKEN:-}" ]]; then
if [[ -f "${HERMES_DIR}/gitea_token_vps" ]]; then
export GITEA_TOKEN="$(cat "${HERMES_DIR}/gitea_token_vps")"
else
echo "[FATAL] No GITEA_TOKEN and ~/.hermes/gitea_token_vps not found"
exit 1
fi
fi
# Load Telegram token
if [[ -z "${TELEGRAM_BOT_TOKEN:-}" ]]; then
if [[ -f "${HOME}/.config/telegram/special_bot" ]]; then
export TELEGRAM_BOT_TOKEN="$(cat "${HOME}/.config/telegram/special_bot")"
fi
fi
# Run preflight checks if available
if [[ -x "${HERMES_DIR}/bin/api-key-preflight.sh" ]]; then
"${HERMES_DIR}/bin/api-key-preflight.sh" 2>/dev/null || true
fi
# Run the orchestrator
exec python3 "${SCRIPT_DIR}/orchestrator.py" "$@"

View File

@@ -0,0 +1,645 @@
#!/usr/bin/env python3
"""
Sovereign Orchestrator v1
Reads the Gitea backlog, scores/prioritizes issues, dispatches to agents.
Usage:
python3 orchestrator.py --once # single dispatch cycle
python3 orchestrator.py --daemon # run every 15 min
python3 orchestrator.py --dry-run # score and report, no dispatch
"""
import json
import os
import sys
import time
import subprocess
import urllib.request
import urllib.error
import urllib.parse
from datetime import datetime, timezone
# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------
GITEA_API = "https://forge.alexanderwhitestone.com/api/v1"
GITEA_OWNER = "Timmy_Foundation"
REPOS = ["timmy-config", "the-nexus", "timmy-home"]
TELEGRAM_CHAT_ID = "-1003664764329"
DAEMON_INTERVAL = 900 # 15 minutes
# Tags that mark issues we should never auto-dispatch
FILTER_TAGS = ["[EPIC]", "[DO NOT CLOSE]", "[PERMANENT]", "[PHILOSOPHY]", "[MORNING REPORT]"]
# Known agent usernames on Gitea (for assignee detection)
AGENT_USERNAMES = {"groq", "ezra", "bezalel", "allegro", "timmy", "thetimmyc"}
# ---------------------------------------------------------------------------
# AGENT ROSTER
# ---------------------------------------------------------------------------
AGENTS = {
"groq": {
"type": "loop",
"endpoint": "local",
"strengths": ["code", "bug-fix", "small-changes"],
"repos": ["the-nexus", "hermes-agent", "timmy-config", "timmy-home"],
"max_concurrent": 1,
},
"ezra": {
"type": "gateway",
"endpoint": "http://143.198.27.163:8643/v1/chat/completions",
"ssh": "root@143.198.27.163",
"strengths": ["research", "architecture", "complex", "multi-file"],
"repos": ["timmy-config", "the-nexus", "timmy-home"],
"max_concurrent": 1,
},
"bezalel": {
"type": "gateway",
"endpoint": "http://159.203.146.185:8643/v1/chat/completions",
"ssh": "root@159.203.146.185",
"strengths": ["ci", "infra", "ops", "testing"],
"repos": ["timmy-config", "hermes-agent", "the-nexus"],
"max_concurrent": 1,
},
}
# ---------------------------------------------------------------------------
# CREDENTIALS
# ---------------------------------------------------------------------------
def load_gitea_token():
"""Read Gitea token from env or file."""
token = os.environ.get("GITEA_TOKEN", "")
if token:
return token.strip()
token_path = os.path.expanduser("~/.hermes/gitea_token_vps")
try:
with open(token_path) as f:
return f.read().strip()
except FileNotFoundError:
print(f"[FATAL] No GITEA_TOKEN env and {token_path} not found")
sys.exit(1)
def load_telegram_token():
"""Read Telegram bot token from file."""
path = os.path.expanduser("~/.config/telegram/special_bot")
try:
with open(path) as f:
return f.read().strip()
except FileNotFoundError:
return ""
GITEA_TOKEN = ""
TELEGRAM_TOKEN = ""
# ---------------------------------------------------------------------------
# HTTP HELPERS (stdlib only)
# ---------------------------------------------------------------------------
def gitea_request(path, method="GET", data=None):
"""Make an authenticated Gitea API request."""
url = f"{GITEA_API}{path}"
headers = {
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
"Accept": "application/json",
}
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
body_text = e.read().decode() if e.fp else ""
print(f"[API ERROR] {method} {url} -> {e.code}: {body_text[:200]}")
return None
except Exception as e:
print(f"[API ERROR] {method} {url} -> {e}")
return None
def send_telegram(message):
"""Send message to Telegram group."""
if not TELEGRAM_TOKEN:
print("[WARN] No Telegram token, skipping notification")
return False
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
data = json.dumps({
"chat_id": TELEGRAM_CHAT_ID,
"text": message,
"parse_mode": "Markdown",
"disable_web_page_preview": True,
}).encode()
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return resp.status == 200
except Exception as e:
print(f"[TELEGRAM ERROR] {e}")
return False
# ---------------------------------------------------------------------------
# 1. BACKLOG READER
# ---------------------------------------------------------------------------
def fetch_issues(repo):
"""Fetch all open issues from a repo, handling pagination."""
issues = []
page = 1
while True:
result = gitea_request(
f"/repos/{GITEA_OWNER}/{repo}/issues?state=open&type=issues&limit=50&page={page}"
)
if not result:
break
issues.extend(result)
if len(result) < 50:
break
page += 1
return issues
def should_filter(issue):
"""Check if issue title contains any filter tags."""
title = issue.get("title", "").upper()
for tag in FILTER_TAGS:
if tag.upper().replace("[", "").replace("]", "") in title.replace("[", "").replace("]", ""):
return True
# Also filter pull requests
if issue.get("pull_request"):
return True
return False
def read_backlog():
"""Read and filter the full backlog across all repos."""
backlog = []
for repo in REPOS:
print(f" Fetching {repo}...")
issues = fetch_issues(repo)
for issue in issues:
if should_filter(issue):
continue
assignees = [a.get("login", "") for a in (issue.get("assignees") or [])]
labels = [l.get("name", "") for l in (issue.get("labels") or [])]
backlog.append({
"repo": repo,
"number": issue["number"],
"title": issue["title"],
"labels": labels,
"assignees": assignees,
"created_at": issue.get("created_at", ""),
"comments": issue.get("comments", 0),
"url": issue.get("html_url", ""),
})
print(f" Total actionable issues: {len(backlog)}")
return backlog
# ---------------------------------------------------------------------------
# 2. PRIORITY SCORER
# ---------------------------------------------------------------------------
def score_issue(issue):
"""Score an issue 0-100 based on priority signals."""
score = 0
title_upper = issue["title"].upper()
labels_upper = [l.upper() for l in issue["labels"]]
all_text = title_upper + " " + " ".join(labels_upper)
# Critical / Bug: +30
if any(tag in all_text for tag in ["CRITICAL", "BUG"]):
score += 30
# P0 / Urgent: +25
if any(tag in all_text for tag in ["P0", "URGENT"]):
score += 25
# P1: +15
if "P1" in all_text:
score += 15
# OPS / Security: +10
if any(tag in all_text for tag in ["OPS", "SECURITY"]):
score += 10
# Unassigned: +10
if not issue["assignees"]:
score += 10
# Age > 7 days: +5
try:
created = issue["created_at"].replace("Z", "+00:00")
created_dt = datetime.fromisoformat(created)
age_days = (datetime.now(timezone.utc) - created_dt).days
if age_days > 7:
score += 5
except (ValueError, AttributeError):
pass
# Has comments: +5
if issue["comments"] > 0:
score += 5
# Infrastructure repo: +5
if issue["repo"] == "timmy-config":
score += 5
# Already assigned to an agent: -10
if any(a.lower() in AGENT_USERNAMES for a in issue["assignees"]):
score -= 10
issue["score"] = max(0, min(100, score))
return issue
def prioritize_backlog(backlog):
"""Score and sort the backlog by priority."""
scored = [score_issue(i) for i in backlog]
scored.sort(key=lambda x: x["score"], reverse=True)
return scored
# ---------------------------------------------------------------------------
# 3. AGENT HEALTH CHECKS
# ---------------------------------------------------------------------------
def check_process(pattern):
"""Check if a local process matching pattern is running."""
try:
result = subprocess.run(
["pgrep", "-f", pattern],
capture_output=True, text=True, timeout=5
)
return result.returncode == 0
except Exception:
return False
def check_ssh_service(host, service_name):
"""Check if a remote service is running via SSH."""
try:
result = subprocess.run(
["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
f"root@{host}",
f"systemctl is-active {service_name} 2>/dev/null || pgrep -f {service_name}"],
capture_output=True, text=True, timeout=15
)
return result.returncode == 0
except Exception:
return False
def check_agent_health(name, agent):
"""Check if an agent is alive and available."""
if agent["type"] == "loop":
alive = check_process(f"agent-loop.*{name}")
elif agent["type"] == "gateway":
host = agent["ssh"].split("@")[1]
service = f"hermes-{name}"
alive = check_ssh_service(host, service)
else:
alive = False
return alive
def get_agent_status():
"""Get health status for all agents."""
status = {}
for name, agent in AGENTS.items():
alive = check_agent_health(name, agent)
status[name] = {
"alive": alive,
"type": agent["type"],
"strengths": agent["strengths"],
}
symbol = "UP" if alive else "DOWN"
print(f" {name}: {symbol} ({agent['type']})")
return status
# ---------------------------------------------------------------------------
# 4. DISPATCHER
# ---------------------------------------------------------------------------
def classify_issue(issue):
"""Classify issue type based on title and labels."""
title = issue["title"].upper()
labels = " ".join(issue["labels"]).upper()
all_text = title + " " + labels
types = []
if any(w in all_text for w in ["BUG", "FIX", "BROKEN", "ERROR", "CRASH"]):
types.append("bug-fix")
if any(w in all_text for w in ["OPS", "DEPLOY", "CI", "INFRA", "PIPELINE", "MONITOR"]):
types.append("ops")
if any(w in all_text for w in ["SECURITY", "AUTH", "TOKEN", "CERT"]):
types.append("ops")
if any(w in all_text for w in ["RESEARCH", "AUDIT", "INVESTIGATE", "EXPLORE"]):
types.append("research")
if any(w in all_text for w in ["ARCHITECT", "DESIGN", "REFACTOR", "REWRITE"]):
types.append("architecture")
if any(w in all_text for w in ["TEST", "TESTING", "QA", "VALIDATE"]):
types.append("testing")
if any(w in all_text for w in ["CODE", "IMPLEMENT", "ADD", "CREATE", "BUILD"]):
types.append("code")
if any(w in all_text for w in ["SMALL", "QUICK", "SIMPLE", "MINOR", "TWEAK"]):
types.append("small-changes")
if any(w in all_text for w in ["COMPLEX", "MULTI", "LARGE", "OVERHAUL"]):
types.append("complex")
if not types:
types = ["code"] # default
return types
def match_agent(issue, agent_status, dispatched_this_cycle):
"""Find the best available agent for an issue."""
issue_types = classify_issue(issue)
candidates = []
for name, agent in AGENTS.items():
# Agent must be alive
if not agent_status.get(name, {}).get("alive", False):
continue
# Agent must handle this repo
if issue["repo"] not in agent["repos"]:
continue
# Agent must not already be dispatched this cycle
if dispatched_this_cycle.get(name, 0) >= agent["max_concurrent"]:
continue
# Score match based on overlapping strengths
overlap = len(set(issue_types) & set(agent["strengths"]))
candidates.append((name, overlap))
if not candidates:
return None
# Sort by overlap score descending, return best match
candidates.sort(key=lambda x: x[1], reverse=True)
return candidates[0][0]
def assign_issue(repo, number, agent_name):
"""Assign an issue to an agent on Gitea."""
# First get current assignees to not clobber
result = gitea_request(f"/repos/{GITEA_OWNER}/{repo}/issues/{number}")
if not result:
return False
current = [a.get("login", "") for a in (result.get("assignees") or [])]
if agent_name in current:
print(f" Already assigned to {agent_name}")
return True
new_assignees = current + [agent_name]
patch_result = gitea_request(
f"/repos/{GITEA_OWNER}/{repo}/issues/{number}",
method="PATCH",
data={"assignees": new_assignees}
)
return patch_result is not None
def dispatch_to_gateway(agent_name, agent, issue):
"""Trigger work on a gateway agent via SSH."""
host = agent["ssh"]
repo = issue["repo"]
number = issue["number"]
title = issue["title"]
# Try to trigger dispatch via SSH
cmd = (
f'ssh -o ConnectTimeout=10 -o StrictHostKeyChecking=no {host} '
f'"echo \'Dispatched by orchestrator: {repo}#{number} - {title}\' '
f'>> /tmp/hermes-dispatch.log"'
)
try:
subprocess.run(cmd, shell=True, timeout=20, capture_output=True)
return True
except Exception as e:
print(f" [WARN] SSH dispatch to {agent_name} failed: {e}")
return False
def dispatch_cycle(backlog, agent_status, dry_run=False):
"""Run one dispatch cycle. Returns dispatch report."""
dispatched = []
skipped = []
dispatched_count = {} # agent_name -> count dispatched this cycle
# Only dispatch unassigned issues (or issues not assigned to agents)
for issue in backlog:
agent_assigned = any(a.lower() in AGENT_USERNAMES for a in issue["assignees"])
if agent_assigned:
skipped.append((issue, "already assigned to agent"))
continue
if issue["score"] < 5:
skipped.append((issue, "score too low"))
continue
best_agent = match_agent(issue, agent_status, dispatched_count)
if not best_agent:
skipped.append((issue, "no available agent"))
continue
if dry_run:
dispatched.append({
"agent": best_agent,
"repo": issue["repo"],
"number": issue["number"],
"title": issue["title"],
"score": issue["score"],
"dry_run": True,
})
dispatched_count[best_agent] = dispatched_count.get(best_agent, 0) + 1
continue
# Actually dispatch
print(f" Dispatching {issue['repo']}#{issue['number']} -> {best_agent}")
success = assign_issue(issue["repo"], issue["number"], best_agent)
if success:
agent = AGENTS[best_agent]
if agent["type"] == "gateway":
dispatch_to_gateway(best_agent, agent, issue)
dispatched.append({
"agent": best_agent,
"repo": issue["repo"],
"number": issue["number"],
"title": issue["title"],
"score": issue["score"],
})
dispatched_count[best_agent] = dispatched_count.get(best_agent, 0) + 1
else:
skipped.append((issue, "assignment failed"))
return dispatched, skipped
# ---------------------------------------------------------------------------
# 5. CONSOLIDATED REPORT
# ---------------------------------------------------------------------------
def generate_report(backlog, dispatched, skipped, agent_status, dry_run=False):
"""Generate dispatch cycle report."""
now = datetime.now().strftime("%Y-%m-%d %H:%M")
mode = " [DRY RUN]" if dry_run else ""
lines = []
lines.append(f"=== Sovereign Orchestrator Report{mode} ===")
lines.append(f"Time: {now}")
lines.append(f"Total backlog: {len(backlog)} issues")
lines.append("")
# Agent health
lines.append("-- Agent Health --")
for name, info in agent_status.items():
symbol = "UP" if info["alive"] else "DOWN"
lines.append(f" {name}: {symbol} ({info['type']})")
lines.append("")
# Dispatched
lines.append(f"-- Dispatched: {len(dispatched)} --")
for d in dispatched:
dry = " (dry-run)" if d.get("dry_run") else ""
lines.append(f" [{d['score']}] {d['repo']}#{d['number']} -> {d['agent']}{dry}")
lines.append(f" {d['title'][:60]}")
lines.append("")
# Skipped (top 10)
skip_summary = {}
for issue, reason in skipped:
skip_summary[reason] = skip_summary.get(reason, 0) + 1
lines.append(f"-- Skipped: {len(skipped)} --")
for reason, count in sorted(skip_summary.items(), key=lambda x: -x[1]):
lines.append(f" {reason}: {count}")
lines.append("")
# Top 5 unassigned
unassigned = [i for i in backlog if not i["assignees"]][:5]
lines.append("-- Top 5 Unassigned (by priority) --")
for i in unassigned:
lines.append(f" [{i['score']}] {i['repo']}#{i['number']}: {i['title'][:55]}")
lines.append("")
report = "\n".join(lines)
return report
def format_telegram_report(backlog, dispatched, skipped, agent_status, dry_run=False):
"""Format a compact Telegram message."""
mode = " DRY RUN" if dry_run else ""
now = datetime.now().strftime("%H:%M")
parts = [f"*Orchestrator{mode}* ({now})"]
parts.append(f"Backlog: {len(backlog)} | Dispatched: {len(dispatched)} | Skipped: {len(skipped)}")
# Agent status line
agent_line = " | ".join(
f"{'' if v['alive'] else ''}{k}" for k, v in agent_status.items()
)
parts.append(agent_line)
if dispatched:
parts.append("")
parts.append("*Dispatched:*")
for d in dispatched[:5]:
dry = " 🔍" if d.get("dry_run") else ""
parts.append(f" `{d['repo']}#{d['number']}` → {d['agent']}{dry}")
# Top unassigned
unassigned = [i for i in backlog if not i["assignees"]][:3]
if unassigned:
parts.append("")
parts.append("*Top unassigned:*")
for i in unassigned:
parts.append(f" [{i['score']}] `{i['repo']}#{i['number']}` {i['title'][:40]}")
return "\n".join(parts)
# ---------------------------------------------------------------------------
# 6. MAIN
# ---------------------------------------------------------------------------
def run_cycle(dry_run=False):
"""Execute one full orchestration cycle."""
global GITEA_TOKEN, TELEGRAM_TOKEN
GITEA_TOKEN = load_gitea_token()
TELEGRAM_TOKEN = load_telegram_token()
print("\n[1/4] Reading backlog...")
backlog = read_backlog()
print("\n[2/4] Scoring and prioritizing...")
backlog = prioritize_backlog(backlog)
for i in backlog[:10]:
print(f" [{i['score']:3d}] {i['repo']}/{i['number']}: {i['title'][:55]}")
print("\n[3/4] Checking agent health...")
agent_status = get_agent_status()
print("\n[4/4] Dispatching...")
dispatched, skipped = dispatch_cycle(backlog, agent_status, dry_run=dry_run)
# Generate reports
report = generate_report(backlog, dispatched, skipped, agent_status, dry_run=dry_run)
print("\n" + report)
# Send Telegram notification
if dispatched or not dry_run:
tg_msg = format_telegram_report(backlog, dispatched, skipped, agent_status, dry_run=dry_run)
send_telegram(tg_msg)
return backlog, dispatched, skipped
def main():
import argparse
parser = argparse.ArgumentParser(description="Sovereign Orchestrator v1")
parser.add_argument("--once", action="store_true", help="Single dispatch cycle")
parser.add_argument("--daemon", action="store_true", help="Run every 15 min")
parser.add_argument("--dry-run", action="store_true", help="Score/report only, no dispatch")
parser.add_argument("--interval", type=int, default=DAEMON_INTERVAL,
help=f"Daemon interval in seconds (default: {DAEMON_INTERVAL})")
args = parser.parse_args()
if not any([args.once, args.daemon, args.dry_run]):
args.dry_run = True # safe default
print("[INFO] No mode specified, defaulting to --dry-run")
print("=" * 60)
print(" SOVEREIGN ORCHESTRATOR v1")
print("=" * 60)
if args.daemon:
print(f"[DAEMON] Running every {args.interval}s (Ctrl+C to stop)")
cycle = 0
while True:
cycle += 1
print(f"\n--- Cycle {cycle} ---")
try:
run_cycle(dry_run=args.dry_run)
except Exception as e:
print(f"[ERROR] Cycle failed: {e}")
print(f"[DAEMON] Sleeping {args.interval}s...")
time.sleep(args.interval)
else:
run_cycle(dry_run=args.dry_run)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,10 @@
{
"name": "Bezalel Builder Wizard",
"role": "Artificer",
"capabilities": [
"provisioning",
"gpu-orchestration",
"model-serving"
],
"instructions": "Take the provided keys and use them to get GPUs for big models (Gemma 4). Wire them into the harness."
}