Compare commits

..

33 Commits

Author SHA1 Message Date
8daa12c518 Merge branch 'main' into timmy/fleet-phase3-5 2026-04-08 10:39:01 +00:00
1705a7b802 Merge pull request 'feat: FLEET-010/011/012 — Phase 3-5 cross-agent delegation, model pipeline, lifecycle' (#365) from timmy/fleet-phase3-5 into main 2026-04-08 10:38:08 +00:00
e0bef949dd Merge branch 'main' into timmy/fleet-phase3-5 2026-04-08 10:37:56 +00:00
4844ce6238 Merge pull request 'feat: Bezalel Builder Wizard — Sidecar Authority Update' (#364) from feat/bezalel-wizard-sidecar-v2 into main 2026-04-08 10:37:34 +00:00
a43510a7eb Merge branch 'main' into feat/bezalel-wizard-sidecar-v2 2026-04-08 10:37:25 +00:00
74867bbfa7 Merge pull request 'art: The Timmy Foundation — Visual Story (24 images + 2 videos)' (#366) from timmy/gallery-submission into main 2026-04-08 10:16:35 +00:00
2812bac438 Merge branch 'main' into timmy/gallery-submission 2026-04-08 10:16:04 +00:00
5c15704c3a Merge branch 'main' into timmy/fleet-phase3-5 2026-04-08 10:15:55 +00:00
30fdbef74e Merge branch 'main' into feat/bezalel-wizard-sidecar-v2 2026-04-08 10:15:49 +00:00
9cc2cf8f8d Merge pull request 'feat: Sovereign Memory Store — zero-API durable memory (SQLite + FTS5 + HRR)' (#380) from perplexity/sovereign-memory-store into main 2026-04-08 10:14:36 +00:00
a2eff1222b Merge branch 'main' into perplexity/sovereign-memory-store 2026-04-08 10:14:24 +00:00
3f4465b646 Merge pull request '[SOVEREIGN] Orchestrator v1 — backlog reader, priority scorer, agent dispatcher' (#362) from timmy/sovereign-orchestrator-v1 into main 2026-04-08 10:14:16 +00:00
f04aaec4ed Merge branch 'main' into timmy/gallery-submission 2026-04-08 10:13:57 +00:00
d54a218a27 Merge branch 'main' into timmy/fleet-phase3-5 2026-04-08 10:13:44 +00:00
3cc92fde1a Merge branch 'main' into feat/bezalel-wizard-sidecar-v2 2026-04-08 10:13:34 +00:00
11a28b74bb Merge branch 'main' into timmy/sovereign-orchestrator-v1 2026-04-08 10:13:21 +00:00
perplexity
593621c5e0 feat: sovereign memory store — zero-API durable memory (SQLite + FTS5 + HRR)
Implements the missing pieces of the MemPalace epic (#367):

- sovereign_store.py: Self-contained memory store replacing the third-party
  mempalace CLI and its ONNX dependency. Uses:
  * SQLite + FTS5 for keyword search (porter stemmer, unicode61)
  * HRR phase vectors (SHA-256 deterministic, numpy optional) for semantic similarity
  * Reciprocal Rank Fusion to merge keyword and semantic rankings
  * Trust scoring with boost/decay lifecycle
  * Room-based organization matching the existing PalaceRoom model

- promotion.py (MP-4, #371): Quality-gated scratchpad-to-palace promotion.
  Four heuristic gates, no LLM call:
  1. Length gate (min 5 words, max 500)
  2. Structure gate (rejects fragments and pure code)
  3. Duplicate gate (FTS5 + Jaccard overlap detection)
  4. Staleness gate (7-day threshold for old notes)
  Includes force override, batch promotion, and audit logging.

- 21 unit tests covering HRR vectors, store operations, search,
  trust lifecycle, and all promotion gates.

Zero external dependencies. Zero API calls. Zero cloud.

Refs: #367 #370 #371
2026-04-07 22:41:37 +00:00
458dabfaed Merge pull request 'feat: MemPalace integration — skill port, retrieval enforcer, wake-up protocol (#367)' (#374) from timmy/mempalace-integration into main
Reviewed-on: #374
2026-04-07 21:45:34 +00:00
Alexander Whitestone
f8dabae8eb feat: MemPalace integration — skill port, retrieval enforcer, wake-up protocol (#367)
MP-1 (#368): Port PalaceRoom + Mempalace classes with 22 unit tests
MP-2 (#369): L0-L5 retrieval order enforcer with recall-query detection
MP-5 (#372): Wake-up protocol (300-900 token context), session scratchpad

Modules:
- mempalace.py: PalaceRoom + Mempalace dataclasses, factory constructors
- retrieval_enforcer.py: Layered memory retrieval (identity → palace → scratch → gitea → skills)
- wakeup.py: Session wake-up with caching (5min TTL)
- scratchpad.py: JSON-based session notes with palace promotion

All 65 tests pass. Pure stdlib + graceful degradation for ONNX issues (#373).
2026-04-07 13:15:07 -04:00
Alexander Whitestone
0a4c8f2d37 art: The Timmy Foundation visual story — 24 images, 2 videos, generated with Grok Imagine 2026-04-07 12:46:17 -04:00
Alexander Whitestone
0a13347e39 feat: FLEET-010/011/012 — Phase 3 and 4 fleet capabilities
FLEET-010: Cross-agent task delegation protocol
- Keyword-based heuristic assigns unassigned issues to agents
- Supports: claw-code, gemini, ezra, bezalel, timmy
- Delegation logging and status dashboard
- Auto-comments on assigned issues

FLEET-011: Local model pipeline and fallback chain
- Checks Ollama reachability and model availability
- 4-model chain: hermes4:14b -> qwen2.5:7b -> phi3:3.8b -> gemma3:1b
- Tests each model with live inference on every run
- Fallback verification: finds first responding model
- Chain configuration via ~/.local/timmy/fleet-resources/model-chain.json

FLEET-012: Agent lifecycle manager
- Full lifecycle: provision -> deploy -> monitor -> retire
- Heartbeat detection with 24h idle threshold
- Task completion/failure tracking
- Agent Fleet Status dashboard

Fixes timmy-home#563 (delegation), #564 (model pipeline), #565 (lifecycle)
2026-04-07 12:43:10 -04:00
dc75be18e4 feat: add Bezalel Builder Wizard sidecar configuration 2026-04-07 16:39:42 +00:00
0c950f991c Merge pull request '[ORCHESTRATOR-4] Evaluate CrewAI for Phase 2 integration' (#361) from ezra/issue-358 into main 2026-04-07 16:35:40 +00:00
Alexander Whitestone
7399c83024 fix: null guard on assignees in orchestrator dispatch 2026-04-07 12:34:02 -04:00
Alexander Whitestone
cf213bffd1 [SOVEREIGN] Add Orchestrator v1 — backlog reader, priority scorer, agent dispatcher
Resolves #355 #356

Components:
- orchestrator.py: Full sovereign orchestrator with 6 subsystems
  1. Backlog reader (fetches from timmy-config, the-nexus, timmy-home)
  2. Priority scorer (0-100 based on severity, age, assignment state)
  3. Agent roster (groq/ezra/bezalel with health checks)
  4. Dispatcher (matches issues to agents by type/strength)
  5. Consolidated report (terminal + Telegram)
  6. Main loop (--once, --daemon, --dry-run)
- orchestrate.sh: Shell wrapper with env setup

Dry-run tested: 348 issues scanned, 3 agents detected UP.
stdlib only, no pip dependencies.
2026-04-07 12:31:14 -04:00
ezra
fe7c5018e3 eval(crewai): PoC crew + evaluation for Phase 2 integration
- Install CrewAI v1.13.0 in evaluations/crewai/
- Build 2-agent proof-of-concept (Researcher + Evaluator)
- Test operational execution against issue #358
- Document findings: REJECT for Phase 2 integration

CrewAI's 500+ MB dependency footprint, memory-model drift
from Gitea-as-truth, and external API fragility outweigh
its agent-role syntax benefits. Recommend evolving the
existing Huey stack instead.

Closes #358
2026-04-07 16:25:21 +00:00
c1c3aaa681 Merge pull request 'feat: genchi-genbutsu — verify world state, not log vibes (#348)' (#360) from ezra/issue-348 into main 2026-04-07 16:23:35 +00:00
d023512858 Merge pull request 'feat: FLEET-003 - Fleet capacity inventory with resource baselines' (#353) from timmy/fleet-capacity-inventory into main 2026-04-07 16:23:22 +00:00
e5e01e36c9 Merge pull request '[KAIZEN] Automated retrospective after every burn cycle (fixes #349)' (#352) from ezra/issue-349 into main 2026-04-07 16:23:17 +00:00
ezra
e5055d269b feat: genchi-genbutsu — verify world state, not log vibes (#348)
Implement 現地現物 (Genchi Genbutsu) post-completion verification:

- Add bin/genchi-genbutsu.sh performing 5 world-state checks:
  1. Branch exists on remote
  2. PR exists
  3. PR has real file changes (> 0)
  4. PR is mergeable
  5. Issue has a completion comment from the agent

- Wire verification into all agent loops:
  - bin/claude-loop.sh: call genchi-genbutsu before merge/close
  - bin/gemini-loop.sh: delegate existing inline checks to genchi-genbutsu
  - bin/agent-loop.sh: resurrect generic agent loop with genchi-genbutsu wired in

- Update metrics JSONL to include 'verified' field for all loops

- Update burn monitor (tasks.py velocity_tracking):
  - Report verified_completion count alongside raw completions
  - Dashboard shows verified trend history

- Update morning report (tasks.py good_morning_report):
  - Count only verified completions from the last 24h
  - Surface verification failures in the report body

Fixes #348
Refs #345
2026-04-07 16:12:05 +00:00
Alexander Whitestone
277d21aef6 feat: FLEET-007 — Auto-restart agent (self-healing processes)
Daemon that monitors key services and restarts them automatically:
- Local: hermes-gateway, ollama, codeclaw-heartbeat
- Ezra: gitea, nginx, hermes-agent
- Allegro hermes-agent
- Bezalel: hermes-agent, evennia
- Max 3 restart attempts per service per cycle (prevents loops)
- 1-hour cooldown after max retries with Telegram escalation
- Restart log at ~/.local/timmy/fleet-health/restarts.log
- Modes: check now (--status for history, --daemon for continuous)

Fixes timmy-home#560
2026-04-07 12:04:33 -04:00
Ezra
2e64b160b5 [KAIZEN] Harden retro scheduling, chunking, and tests (#349)
- Add Kaizen Retro to cron/jobs.json with explicit local model/provider
- Add Telegram message chunking for reports approaching the 4096-char limit
- Fix classify_issue_type false positives on short substrings (ci in cleanup)
- Add 28 unit tests covering classification, max-attempts detection,
  suggestion generation, report formatting, and Telegram chunking
2026-04-07 15:58:58 +00:00
Ezra
f18955ea90 [KAIZEN] Implement automated burn-cycle retrospective (fixes #349)
- Add bin/kaizen-retro.sh entry point and scripts/kaizen_retro.py
- Analyze closed issues, merged PRs, and stale/max-attempts issues
- Report success rates by agent, repo, and issue type
- Generate one concrete improvement suggestion per cycle
- Post retro to Telegram and comment on the latest morning report issue
- Wire into Huey as kaizen_retro() task at 07:15 daily
- Extend gitea_client.py with since param for list_issues and
  created_at/updated_at fields on PullRequest
2026-04-07 15:57:21 +00:00
62 changed files with 5535 additions and 84 deletions

11
.gitignore vendored
View File

@@ -1,9 +1,8 @@
# Secrets
*.token
*.key
*.secret
# Local state
*.pyc
*.pyo
*.egg-info/
dist/
build/
*.db
*.db-wal
*.db-shm

273
bin/agent-loop.sh Executable file
View File

@@ -0,0 +1,273 @@
#!/usr/bin/env bash
# agent-loop.sh — Universal agent dev loop with Genchi Genbutsu verification
#
# Usage: agent-loop.sh <agent-name> [num-workers]
# agent-loop.sh claude 2
# agent-loop.sh gemini 1
#
# Dispatches via agent-dispatch.sh, then verifies with genchi-genbutsu.sh.
set -uo pipefail
AGENT="${1:?Usage: agent-loop.sh <agent-name> [num-workers]}"
NUM_WORKERS="${2:-1}"
# Resolve agent tool and model from config or fallback
case "$AGENT" in
claude) TOOL="claude"; MODEL="sonnet" ;;
gemini) TOOL="gemini"; MODEL="gemini-2.5-pro-preview-05-06" ;;
grok) TOOL="opencode"; MODEL="grok-3-fast" ;;
*) TOOL="$AGENT"; MODEL="" ;;
esac
# === CONFIG ===
GITEA_URL="${GITEA_URL:-https://forge.alexanderwhitestone.com}"
GITEA_TOKEN="${GITEA_TOKEN:-}"
WORKTREE_BASE="$HOME/worktrees"
LOG_DIR="$HOME/.hermes/logs"
LOCK_DIR="$LOG_DIR/${AGENT}-locks"
SKIP_FILE="$LOG_DIR/${AGENT}-skip-list.json"
ACTIVE_FILE="$LOG_DIR/${AGENT}-active.json"
TIMEOUT=600
COOLDOWN=30
mkdir -p "$LOG_DIR" "$WORKTREE_BASE" "$LOCK_DIR"
[ -f "$SKIP_FILE" ] || echo '{}' > "$SKIP_FILE"
echo '{}' > "$ACTIVE_FILE"
# === SHARED FUNCTIONS ===
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] ${AGENT}: $*" >> "$LOG_DIR/${AGENT}-loop.log"
}
lock_issue() {
local key="$1"
mkdir "$LOCK_DIR/$key.lock" 2>/dev/null && echo $$ > "$LOCK_DIR/$key.lock/pid"
}
unlock_issue() {
rm -rf "$LOCK_DIR/$1.lock" 2>/dev/null
}
mark_skip() {
local issue_num="$1" reason="$2"
python3 -c "
import json, time, fcntl
with open('${SKIP_FILE}', 'r+') as f:
fcntl.flock(f, fcntl.LOCK_EX)
try: skips = json.load(f)
except: skips = {}
failures = skips.get(str($issue_num), {}).get('failures', 0) + 1
skip_hours = 6 if failures >= 3 else 1
skips[str($issue_num)] = {'until': time.time() + (skip_hours * 3600), 'reason': '$reason', 'failures': failures}
f.seek(0); f.truncate()
json.dump(skips, f, indent=2)
" 2>/dev/null
}
get_next_issue() {
python3 -c "
import json, sys, time, urllib.request, os
token = '${GITEA_TOKEN}'
base = '${GITEA_URL}'
repos = ['Timmy_Foundation/the-nexus', 'Timmy_Foundation/timmy-config', 'Timmy_Foundation/hermes-agent']
try:
with open('${SKIP_FILE}') as f: skips = json.load(f)
except: skips = {}
try:
with open('${ACTIVE_FILE}') as f: active = json.load(f); active_issues = {v['issue'] for v in active.values()}
except: active_issues = set()
all_issues = []
for repo in repos:
url = f'{base}/api/v1/repos/{repo}/issues?state=open&type=issues&limit=50&sort=created'
req = urllib.request.Request(url, headers={'Authorization': f'token {token}'})
try:
resp = urllib.request.urlopen(req, timeout=10)
issues = json.loads(resp.read())
for i in issues: i['_repo'] = repo
all_issues.extend(issues)
except: continue
for i in sorted(all_issues, key=lambda x: x['title'].lower()):
assignees = [a['login'] for a in (i.get('assignees') or [])]
if assignees and '${AGENT}' not in assignees: continue
num_str = str(i['number'])
if num_str in active_issues: continue
if skips.get(num_str, {}).get('until', 0) > time.time(): continue
lock = '${LOCK_DIR}/' + i['_repo'].replace('/', '-') + '-' + num_str + '.lock'
if os.path.isdir(lock): continue
owner, name = i['_repo'].split('/')
print(json.dumps({'number': i['number'], 'title': i['title'], 'repo_owner': owner, 'repo_name': name, 'repo': i['_repo']}))
sys.exit(0)
print('null')
" 2>/dev/null
}
# === WORKER FUNCTION ===
run_worker() {
local worker_id="$1"
log "WORKER-${worker_id}: Started"
while true; do
issue_json=$(get_next_issue)
if [ "$issue_json" = "null" ] || [ -z "$issue_json" ]; then
sleep 30
continue
fi
issue_num=$(echo "$issue_json" | python3 -c "import sys,json; print(json.load(sys.stdin)['number'])")
issue_title=$(echo "$issue_json" | python3 -c "import sys,json; print(json.load(sys.stdin)['title'])")
repo_owner=$(echo "$issue_json" | python3 -c "import sys,json; print(json.load(sys.stdin)['repo_owner'])")
repo_name=$(echo "$issue_json" | python3 -c "import sys,json; print(json.load(sys.stdin)['repo_name'])")
issue_key="${repo_owner}-${repo_name}-${issue_num}"
branch="${AGENT}/issue-${issue_num}"
worktree="${WORKTREE_BASE}/${AGENT}-w${worker_id}-${issue_num}"
if ! lock_issue "$issue_key"; then
sleep 5
continue
fi
log "WORKER-${worker_id}: === ISSUE #${issue_num}: ${issue_title} (${repo_owner}/${repo_name}) ==="
# Clone / checkout
rm -rf "$worktree" 2>/dev/null
CLONE_URL="http://${AGENT}:${GITEA_TOKEN}@143.198.27.163:3000/${repo_owner}/${repo_name}.git"
if git ls-remote --heads "$CLONE_URL" "$branch" 2>/dev/null | grep -q "$branch"; then
git clone --depth=50 -b "$branch" "$CLONE_URL" "$worktree" >/dev/null 2>&1
else
git clone --depth=1 -b main "$CLONE_URL" "$worktree" >/dev/null 2>&1
cd "$worktree" && git checkout -b "$branch" >/dev/null 2>&1
fi
cd "$worktree"
# Generate prompt
prompt=$(bash "$(dirname "$0")/agent-dispatch.sh" "$AGENT" "$issue_num" "${repo_owner}/${repo_name}")
CYCLE_START=$(date +%s)
set +e
if [ "$TOOL" = "claude" ]; then
env -u CLAUDECODE gtimeout "$TIMEOUT" claude \
--print --model "$MODEL" --dangerously-skip-permissions \
-p "$prompt" </dev/null >> "$LOG_DIR/${AGENT}-${issue_num}.log" 2>&1
elif [ "$TOOL" = "gemini" ]; then
gtimeout "$TIMEOUT" gemini -p "$prompt" --yolo \
</dev/null >> "$LOG_DIR/${AGENT}-${issue_num}.log" 2>&1
else
gtimeout "$TIMEOUT" "$TOOL" "$prompt" \
</dev/null >> "$LOG_DIR/${AGENT}-${issue_num}.log" 2>&1
fi
exit_code=$?
set -e
CYCLE_END=$(date +%s)
CYCLE_DURATION=$((CYCLE_END - CYCLE_START))
# Salvage
cd "$worktree" 2>/dev/null || true
DIRTY=$(git status --porcelain 2>/dev/null | wc -l | tr -d ' ')
if [ "${DIRTY:-0}" -gt 0 ]; then
git add -A 2>/dev/null
git commit -m "WIP: ${AGENT} progress on #${issue_num}
Automated salvage commit — agent session ended (exit $exit_code)." 2>/dev/null || true
fi
UNPUSHED=$(git log --oneline "origin/main..HEAD" 2>/dev/null | wc -l | tr -d ' ')
if [ "${UNPUSHED:-0}" -gt 0 ]; then
git push -u origin "$branch" 2>/dev/null && \
log "WORKER-${worker_id}: Pushed $UNPUSHED commit(s) on $branch" || \
log "WORKER-${worker_id}: Push failed for $branch"
fi
# Create PR if needed
pr_num=$(curl -sf "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls?state=open&head=${repo_owner}:${branch}&limit=1" \
-H "Authorization: token ${GITEA_TOKEN}" | python3 -c "
import sys,json
prs = json.load(sys.stdin)
print(prs[0]['number'] if prs else '')
" 2>/dev/null)
if [ -z "$pr_num" ] && [ "${UNPUSHED:-0}" -gt 0 ]; then
pr_num=$(curl -sf -X POST "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d "$(python3 -c "
import json
print(json.dumps({
'title': '${AGENT}: Issue #${issue_num}',
'head': '${branch}',
'base': 'main',
'body': 'Automated PR for issue #${issue_num}.\nExit code: ${exit_code}'
}))
")" | python3 -c "import sys,json; print(json.load(sys.stdin).get('number',''))" 2>/dev/null)
[ -n "$pr_num" ] && log "WORKER-${worker_id}: Created PR #${pr_num} for issue #${issue_num}"
fi
# ── Genchi Genbutsu: verify world state before declaring success ──
VERIFIED="false"
if [ "$exit_code" -eq 0 ]; then
log "WORKER-${worker_id}: SUCCESS #${issue_num} — running genchi-genbutsu"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
if verify_result=$("$SCRIPT_DIR/genchi-genbutsu.sh" "$repo_owner" "$repo_name" "$issue_num" "$branch" "$AGENT" 2>/dev/null); then
VERIFIED="true"
log "WORKER-${worker_id}: VERIFIED #${issue_num}"
if [ -n "$pr_num" ]; then
curl -sf -X POST "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls/${pr_num}/merge" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"Do": "squash"}' >/dev/null 2>&1 || true
curl -sf -X PATCH "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/issues/${issue_num}" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"state": "closed"}' >/dev/null 2>&1 || true
log "WORKER-${worker_id}: PR #${pr_num} merged, issue #${issue_num} closed"
fi
consecutive_failures=0
else
verify_details=$(echo "$verify_result" | python3 -c "import sys,json; print(json.load(sys.stdin).get('details','unknown'))" 2>/dev/null || echo "unverified")
log "WORKER-${worker_id}: UNVERIFIED #${issue_num}$verify_details"
mark_skip "$issue_num" "unverified" 1
consecutive_failures=$((consecutive_failures + 1))
fi
elif [ "$exit_code" -eq 124 ]; then
log "WORKER-${worker_id}: TIMEOUT #${issue_num} (work saved in PR)"
consecutive_failures=$((consecutive_failures + 1))
else
log "WORKER-${worker_id}: FAILED #${issue_num} exit ${exit_code} (work saved in PR)"
consecutive_failures=$((consecutive_failures + 1))
fi
# ── METRICS ──
python3 -c "
import json, datetime
print(json.dumps({
'ts': datetime.datetime.utcnow().isoformat() + 'Z',
'agent': '${AGENT}',
'worker': $worker_id,
'issue': $issue_num,
'repo': '${repo_owner}/${repo_name}',
'outcome': 'success' if $exit_code == 0 else 'timeout' if $exit_code == 124 else 'failed',
'exit_code': $exit_code,
'duration_s': $CYCLE_DURATION,
'pr': '${pr_num:-}',
'verified': ${VERIFIED:-false}
}))
" >> "$LOG_DIR/${AGENT}-metrics.jsonl" 2>/dev/null
rm -rf "$worktree" 2>/dev/null
unlock_issue "$issue_key"
sleep "$COOLDOWN"
done
}
# === MAIN ===
log "=== Agent Loop Started — ${AGENT} with ${NUM_WORKERS} worker(s) ==="
rm -rf "$LOCK_DIR"/*.lock 2>/dev/null
for i in $(seq 1 "$NUM_WORKERS"); do
run_worker "$i" &
log "Launched worker $i (PID $!)"
sleep 3
done
wait

View File

@@ -468,24 +468,32 @@ print(json.dumps({
[ -n "$pr_num" ] && log "WORKER-${worker_id}: Created PR #${pr_num} for issue #${issue_num}"
fi
# ── Merge + close on success ──
# ── Genchi Genbutsu: verify world state before declaring success ──
VERIFIED="false"
if [ "$exit_code" -eq 0 ]; then
log "WORKER-${worker_id}: SUCCESS #${issue_num}"
if [ -n "$pr_num" ]; then
curl -sf -X POST "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls/${pr_num}/merge" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"Do": "squash"}' >/dev/null 2>&1 || true
curl -sf -X PATCH "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/issues/${issue_num}" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"state": "closed"}' >/dev/null 2>&1 || true
log "WORKER-${worker_id}: PR #${pr_num} merged, issue #${issue_num} closed"
log "WORKER-${worker_id}: SUCCESS #${issue_num} — running genchi-genbutsu"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
if verify_result=$("$SCRIPT_DIR/genchi-genbutsu.sh" "$repo_owner" "$repo_name" "$issue_num" "$branch" "claude" 2>/dev/null); then
VERIFIED="true"
log "WORKER-${worker_id}: VERIFIED #${issue_num}"
if [ -n "$pr_num" ]; then
curl -sf -X POST "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls/${pr_num}/merge" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"Do": "squash"}' >/dev/null 2>&1 || true
curl -sf -X PATCH "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/issues/${issue_num}" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"state": "closed"}' >/dev/null 2>&1 || true
log "WORKER-${worker_id}: PR #${pr_num} merged, issue #${issue_num} closed"
fi
consecutive_failures=0
else
verify_details=$(echo "$verify_result" | python3 -c "import sys,json; print(json.load(sys.stdin).get('details','unknown'))" 2>/dev/null || echo "unverified")
log "WORKER-${worker_id}: UNVERIFIED #${issue_num}$verify_details"
consecutive_failures=$((consecutive_failures + 1))
fi
consecutive_failures=0
elif [ "$exit_code" -eq 124 ]; then
log "WORKER-${worker_id}: TIMEOUT #${issue_num} (work saved in PR)"
consecutive_failures=$((consecutive_failures + 1))
@@ -522,6 +530,7 @@ print(json.dumps({
import json, datetime
print(json.dumps({
'ts': datetime.datetime.utcnow().isoformat() + 'Z',
'agent': 'claude',
'worker': $worker_id,
'issue': $issue_num,
'repo': '${repo_owner}/${repo_name}',
@@ -534,7 +543,8 @@ print(json.dumps({
'lines_removed': ${LINES_REMOVED:-0},
'salvaged': ${DIRTY:-0},
'pr': '${pr_num:-}',
'merged': $( [ '$OUTCOME' = 'success' ] && [ -n '${pr_num:-}' ] && echo 'true' || echo 'false' )
'merged': $( [ '$OUTCOME' = 'success' ] && [ -n '${pr_num:-}' ] && echo 'true' || echo 'false' ),
'verified': ${VERIFIED:-false}
}))
" >> "$METRICS_FILE" 2>/dev/null

View File

@@ -521,61 +521,63 @@ print(json.dumps({
[ -n "$pr_num" ] && log "WORKER-${worker_id}: Created PR #${pr_num} for issue #${issue_num}"
fi
# ── Verify finish semantics / classify failures ──
# ── Genchi Genbutsu: verify world state before declaring success ──
VERIFIED="false"
if [ "$exit_code" -eq 0 ]; then
log "WORKER-${worker_id}: SUCCESS #${issue_num} exited 0 — verifying push + PR + proof"
if ! remote_branch_exists "$branch"; then
log "WORKER-${worker_id}: BLOCKED #${issue_num} remote branch missing"
post_issue_comment "$repo_owner" "$repo_name" "$issue_num" "Loop gate blocked completion: remote branch ${branch} was not found on origin after Gemini exited. Issue remains open for retry."
mark_skip "$issue_num" "missing_remote_branch" 1
consecutive_failures=$((consecutive_failures + 1))
elif [ -z "$pr_num" ]; then
log "WORKER-${worker_id}: BLOCKED #${issue_num} no PR found"
post_issue_comment "$repo_owner" "$repo_name" "$issue_num" "Loop gate blocked completion: branch ${branch} exists remotely, but no PR was found. Issue remains open for retry."
mark_skip "$issue_num" "missing_pr" 1
consecutive_failures=$((consecutive_failures + 1))
log "WORKER-${worker_id}: SUCCESS #${issue_num} exited 0 — running genchi-genbutsu"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
if verify_result=$("$SCRIPT_DIR/genchi-genbutsu.sh" "$repo_owner" "$repo_name" "$issue_num" "$branch" "gemini" 2>/dev/null); then
VERIFIED="true"
log "WORKER-${worker_id}: VERIFIED #${issue_num}"
pr_state=$(get_pr_state "$repo_owner" "$repo_name" "$pr_num")
if [ "$pr_state" = "open" ]; then
curl -sf -X POST "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls/${pr_num}/merge" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"Do": "squash"}' >/dev/null 2>&1 || true
pr_state=$(get_pr_state "$repo_owner" "$repo_name" "$pr_num")
fi
if [ "$pr_state" = "merged" ]; then
curl -sf -X PATCH "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/issues/${issue_num}" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"state": "closed"}' >/dev/null 2>&1 || true
issue_state=$(get_issue_state "$repo_owner" "$repo_name" "$issue_num")
if [ "$issue_state" = "closed" ]; then
log "WORKER-${worker_id}: VERIFIED #${issue_num} branch pushed, PR merged, comment present, issue closed"
consecutive_failures=0
else
log "WORKER-${worker_id}: BLOCKED #${issue_num} issue did not close after merge"
mark_skip "$issue_num" "issue_close_unverified" 1
consecutive_failures=$((consecutive_failures + 1))
fi
else
log "WORKER-${worker_id}: BLOCKED #${issue_num} merge not verified (state=${pr_state})"
mark_skip "$issue_num" "merge_unverified" 1
consecutive_failures=$((consecutive_failures + 1))
fi
else
pr_files=$(get_pr_file_count "$repo_owner" "$repo_name" "$pr_num")
if [ "${pr_files:-0}" -eq 0 ]; then
log "WORKER-${worker_id}: BLOCKED #${issue_num} PR #${pr_num} has 0 changed files"
curl -sf -X PATCH "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls/${pr_num}" -H "Authorization: token ${GITEA_TOKEN}" -H "Content-Type: application/json" -d '{"state": "closed"}' >/dev/null 2>&1 || true
verify_details=$(echo "$verify_result" | python3 -c "import sys,json; print(json.load(sys.stdin).get('details','unknown'))" 2>/dev/null || echo "unverified")
verify_checks=$(echo "$verify_result" | python3 -c "import sys,json; print(json.load(sys.stdin).get('checks',''))" 2>/dev/null || echo "")
log "WORKER-${worker_id}: UNVERIFIED #${issue_num} $verify_details"
if echo "$verify_checks" | grep -q '"branch": false'; then
post_issue_comment "$repo_owner" "$repo_name" "$issue_num" "Loop gate blocked completion: remote branch ${branch} was not found on origin after Gemini exited. Issue remains open for retry."
mark_skip "$issue_num" "missing_remote_branch" 1
elif echo "$verify_checks" | grep -q '"pr": false'; then
post_issue_comment "$repo_owner" "$repo_name" "$issue_num" "Loop gate blocked completion: branch ${branch} exists remotely, but no PR was found. Issue remains open for retry."
mark_skip "$issue_num" "missing_pr" 1
elif echo "$verify_checks" | grep -q '"files": false'; then
curl -sf -X PATCH "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls/${pr_num}" \
-H "Authorization: token ${GITEA_TOKEN}" \
-H "Content-Type: application/json" \
-d '{"state": "closed"}' >/dev/null 2>&1 || true
post_issue_comment "$repo_owner" "$repo_name" "$issue_num" "PR #${pr_num} was closed automatically: it had 0 changed files (empty commit). Issue remains open for retry."
mark_skip "$issue_num" "empty_commit" 2
consecutive_failures=$((consecutive_failures + 1))
else
proof_status=$(proof_comment_status "$repo_owner" "$repo_name" "$issue_num" "$branch")
proof_state="${proof_status%%|*}"
proof_url="${proof_status#*|}"
if [ "$proof_state" != "ok" ]; then
log "WORKER-${worker_id}: BLOCKED #${issue_num} proof missing or incomplete (${proof_state})"
post_issue_comment "$repo_owner" "$repo_name" "$issue_num" "Loop gate blocked completion: PR #${pr_num} exists and has ${pr_files} changed file(s), but the required Proof block from Gemini is missing or incomplete. Issue remains open for retry."
mark_skip "$issue_num" "missing_proof" 1
consecutive_failures=$((consecutive_failures + 1))
else
log "WORKER-${worker_id}: PROOF verified ${proof_url}"
pr_state=$(get_pr_state "$repo_owner" "$repo_name" "$pr_num")
if [ "$pr_state" = "open" ]; then
curl -sf -X POST "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls/${pr_num}/merge" -H "Authorization: token ${GITEA_TOKEN}" -H "Content-Type: application/json" -d '{"Do": "squash"}' >/dev/null 2>&1 || true
pr_state=$(get_pr_state "$repo_owner" "$repo_name" "$pr_num")
fi
if [ "$pr_state" = "merged" ]; then
curl -sf -X PATCH "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/issues/${issue_num}" -H "Authorization: token ${GITEA_TOKEN}" -H "Content-Type: application/json" -d '{"state": "closed"}' >/dev/null 2>&1 || true
issue_state=$(get_issue_state "$repo_owner" "$repo_name" "$issue_num")
if [ "$issue_state" = "closed" ]; then
log "WORKER-${worker_id}: VERIFIED #${issue_num} branch pushed, PR merged, proof present, issue closed"
consecutive_failures=0
else
log "WORKER-${worker_id}: BLOCKED #${issue_num} issue did not close after merge"
mark_skip "$issue_num" "issue_close_unverified" 1
consecutive_failures=$((consecutive_failures + 1))
fi
else
log "WORKER-${worker_id}: BLOCKED #${issue_num} merge not verified (state=${pr_state})"
mark_skip "$issue_num" "merge_unverified" 1
consecutive_failures=$((consecutive_failures + 1))
fi
fi
post_issue_comment "$repo_owner" "$repo_name" "$issue_num" "Loop gate blocked completion: PR #${pr_num} exists, but required verification failed ($verify_details). Issue remains open for retry."
mark_skip "$issue_num" "unverified" 1
fi
consecutive_failures=$((consecutive_failures + 1))
fi
elif [ "$exit_code" -eq 124 ]; then
log "WORKER-${worker_id}: TIMEOUT #${issue_num} (work saved in PR)"
@@ -621,7 +623,8 @@ print(json.dumps({
'lines_removed': ${LINES_REMOVED:-0},
'salvaged': ${DIRTY:-0},
'pr': '${pr_num:-}',
'merged': $( [ '$OUTCOME' = 'success' ] && [ -n '${pr_num:-}' ] && echo 'true' || echo 'false' )
'merged': $( [ '$OUTCOME' = 'success' ] && [ -n '${pr_num:-}' ] && echo 'true' || echo 'false' ),
'verified': ${VERIFIED:-false}
}))
" >> "$LOG_DIR/gemini-metrics.jsonl" 2>/dev/null

179
bin/genchi-genbutsu.sh Executable file
View File

@@ -0,0 +1,179 @@
#!/usr/bin/env bash
# genchi-genbutsu.sh — 現地現物 — Go and see. Verify world state, not log vibes.
#
# Post-completion verification that goes and LOOKS at the actual artifacts.
# Performs 5 world-state checks:
# 1. Branch exists on remote
# 2. PR exists
# 3. PR has real file changes (> 0)
# 4. PR is mergeable
# 5. Issue has a completion comment from the agent
#
# Usage: genchi-genbutsu.sh <repo_owner> <repo_name> <issue_num> <branch> <agent_name>
# Returns: JSON to stdout, logs JSONL, exit 0 = VERIFIED, exit 1 = UNVERIFIED
set -euo pipefail
GITEA_URL="${GITEA_URL:-https://forge.alexanderwhitestone.com}"
GITEA_TOKEN="${GITEA_TOKEN:-}"
LOG_DIR="${LOG_DIR:-$HOME/.hermes/logs}"
VERIFY_LOG="$LOG_DIR/genchi-genbutsu.jsonl"
if [ $# -lt 5 ]; then
echo "Usage: $0 <repo_owner> <repo_name> <issue_num> <branch> <agent_name>" >&2
exit 2
fi
repo_owner="$1"
repo_name="$2"
issue_num="$3"
branch="$4"
agent_name="$5"
mkdir -p "$LOG_DIR"
# ── Helpers ──────────────────────────────────────────────────────────
check_branch_exists() {
# Use Gitea API instead of git ls-remote so we don't need clone credentials
curl -sf "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/branches/${branch}" \
-H "Authorization: token ${GITEA_TOKEN}" >/dev/null 2>&1
}
get_pr_num() {
curl -sf "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls?state=all&head=${repo_owner}:${branch}&limit=1" \
-H "Authorization: token ${GITEA_TOKEN}" 2>/dev/null | python3 -c "
import sys, json
prs = json.load(sys.stdin)
print(prs[0]['number'] if prs else '')
"
}
check_pr_files() {
local pr_num="$1"
curl -sf "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls/${pr_num}/files" \
-H "Authorization: token ${GITEA_TOKEN}" 2>/dev/null | python3 -c "
import sys, json
try:
files = json.load(sys.stdin)
print(len(files) if isinstance(files, list) else 0)
except:
print(0)
"
}
check_pr_mergeable() {
local pr_num="$1"
curl -sf "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/pulls/${pr_num}" \
-H "Authorization: token ${GITEA_TOKEN}" 2>/dev/null | python3 -c "
import sys, json
pr = json.load(sys.stdin)
print('true' if pr.get('mergeable') else 'false')
"
}
check_completion_comment() {
curl -sf "${GITEA_URL}/api/v1/repos/${repo_owner}/${repo_name}/issues/${issue_num}/comments" \
-H "Authorization: token ${GITEA_TOKEN}" 2>/dev/null | AGENT="$agent_name" python3 -c "
import os, sys, json
agent = os.environ.get('AGENT', '').lower()
try:
comments = json.load(sys.stdin)
except:
sys.exit(1)
for c in reversed(comments):
user = ((c.get('user') or {}).get('login') or '').lower()
if user == agent:
sys.exit(0)
sys.exit(1)
"
}
# ── Run checks ───────────────────────────────────────────────────────
ts=$(date -u '+%Y-%m-%dT%H:%M:%SZ')
status="VERIFIED"
details=()
checks_json='{}'
# Check 1: branch
if check_branch_exists; then
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['branch']=True;print(json.dumps(d))")
else
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['branch']=False;print(json.dumps(d))")
status="UNVERIFIED"
details+=("remote branch ${branch} not found")
fi
# Check 2: PR exists
pr_num=$(get_pr_num)
if [ -n "$pr_num" ]; then
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['pr']=True;print(json.dumps(d))")
else
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['pr']=False;print(json.dumps(d))")
status="UNVERIFIED"
details+=("no PR found for branch ${branch}")
fi
# Check 3: PR has real file changes
if [ -n "$pr_num" ]; then
file_count=$(check_pr_files "$pr_num")
if [ "${file_count:-0}" -gt 0 ]; then
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['files']=True;print(json.dumps(d))")
else
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['files']=False;print(json.dumps(d))")
status="UNVERIFIED"
details+=("PR #${pr_num} has 0 changed files")
fi
# Check 4: PR is mergeable
if [ "$(check_pr_mergeable "$pr_num")" = "true" ]; then
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['mergeable']=True;print(json.dumps(d))")
else
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['mergeable']=False;print(json.dumps(d))")
status="UNVERIFIED"
details+=("PR #${pr_num} is not mergeable")
fi
else
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['files']=None;d['mergeable']=None;print(json.dumps(d))")
fi
# Check 5: completion comment from agent
if check_completion_comment; then
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['comment']=True;print(json.dumps(d))")
else
checks_json=$(echo "$checks_json" | python3 -c "import sys,json;d=json.load(sys.stdin);d['comment']=False;print(json.dumps(d))")
status="UNVERIFIED"
details+=("no completion comment from ${agent_name} on issue #${issue_num}")
fi
# Build detail string
detail_str=$(IFS="; "; echo "${details[*]:-all checks passed}")
# ── Output ───────────────────────────────────────────────────────────
result=$(python3 -c "
import json
print(json.dumps({
'status': '$status',
'repo': '${repo_owner}/${repo_name}',
'issue': $issue_num,
'branch': '$branch',
'agent': '$agent_name',
'pr': '$pr_num',
'checks': $checks_json,
'details': '$detail_str',
'ts': '$ts'
}, indent=2))
")
printf '%s\n' "$result"
# Append to JSONL log
printf '%s\n' "$result" >> "$VERIFY_LOG"
if [ "$status" = "VERIFIED" ]; then
exit 0
else
exit 1
fi

45
bin/kaizen-retro.sh Executable file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env bash
# kaizen-retro.sh — Automated retrospective after every burn cycle.
#
# Runs daily after the morning report.
# Analyzes success rates by agent, repo, and issue type.
# Identifies max-attempts issues, generates ONE concrete improvement,
# and posts the retro to Telegram + the master morning-report issue.
#
# Usage:
# ./bin/kaizen-retro.sh [--dry-run]
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="${SCRIPT_DIR%/bin}"
PYTHON="${PYTHON3:-python3}"
# Source local env if available so TELEGRAM_BOT_TOKEN is picked up
HOME_DIR="${HOME:-$(eval echo ~$(whoami))}"
for env_file in "$HOME_DIR/.hermes/.env" "$HOME_DIR/.timmy/.env" "$REPO_ROOT/.env"; do
if [ -f "$env_file" ]; then
# shellcheck source=/dev/null
set -a
# shellcheck source=/dev/null
source "$env_file"
set +a
fi
done
# If the configured Gitea URL is unreachable but localhost works, prefer localhost
if ! curl -sf "${GITEA_URL:-http://localhost:3000}/api/v1/version" >/dev/null 2>&1; then
if curl -sf http://localhost:3000/api/v1/version >/dev/null 2>&1; then
export GITEA_URL="http://localhost:3000"
fi
fi
# Ensure the Python script exists
RETRO_PY="$REPO_ROOT/scripts/kaizen_retro.py"
if [ ! -f "$RETRO_PY" ]; then
echo "ERROR: kaizen_retro.py not found at $RETRO_PY" >&2
exit 1
fi
# Run
exec "$PYTHON" "$RETRO_PY" "$@"

View File

@@ -137,7 +137,38 @@
"paused_reason": null,
"skills": [],
"skill": null
},
{
"id": "kaizen-retro-349",
"name": "Kaizen Retro",
"prompt": "Run the automated burn-cycle retrospective. Execute: cd /root/wizards/ezra/workspace/timmy-config && ./bin/kaizen-retro.sh",
"model": "hermes3:latest",
"provider": "ollama",
"base_url": "http://localhost:11434/v1",
"schedule": {
"kind": "interval",
"minutes": 1440,
"display": "every 1440m"
},
"schedule_display": "daily at 07:30",
"repeat": {
"times": null,
"completed": 0
},
"enabled": true,
"created_at": "2026-04-07T15:30:00.000000Z",
"next_run_at": "2026-04-08T07:30:00.000000Z",
"last_run_at": null,
"last_status": null,
"last_error": null,
"deliver": "local",
"origin": null,
"state": "scheduled",
"paused_at": null,
"paused_reason": null,
"skills": [],
"skill": null
}
],
"updated_at": "2026-04-07T15:00:00+00:00"
}
}

4
evaluations/crewai/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
venv/
__pycache__/
*.pyc
.env

View File

@@ -0,0 +1,140 @@
# CrewAI Evaluation for Phase 2 Integration
**Date:** 2026-04-07
**Issue:** [#358 ORCHESTRATOR-4] Evaluate CrewAI for Phase 2 integration
**Author:** Ezra
**House:** hermes-ezra
## Summary
CrewAI was installed, a 2-agent proof-of-concept crew was built, and an operational test was attempted against issue #358. Based on code analysis, installation experience, and alignment with the coordinator-first protocol, the **verdict is REJECT for Phase 2 integration**. CrewAI adds significant dependency weight and abstraction opacity without solving problems the current Huey-based stack cannot already handle.
---
## 1. Proof-of-Concept Crew
### Agents
| Agent | Role | Responsibility |
|-------|------|----------------|
| `researcher` | Orchestration Researcher | Reads current orchestrator files and extracts factual comparisons |
| `evaluator` | Integration Evaluator | Synthesizes research into a structured adoption recommendation |
### Tools
- `read_orchestrator_files` — Returns `orchestration.py`, `tasks.py`, `bin/timmy-orchestrator.sh`, and `docs/coordinator-first-protocol.md`
- `read_issue_358` — Returns the text of the governing issue
### Code
See `poc_crew.py` in this directory for the full implementation.
---
## 2. Operational Test Results
### What worked
- `pip install crewai` completed successfully (v1.13.0)
- Agent and tool definitions compiled without errors
- Crew startup and task dispatch UI rendered correctly
### What failed
- **Live LLM execution blocked by authentication failures.** Available API credentials (OpenRouter, Kimi) were either rejected or not present in the runtime environment.
- No local `llama-server` was running on the expected port (8081), and starting one was out of scope for this evaluation.
### Why this matters
The authentication failure is **not a trivial setup issue** — it is a preview of the operational complexity CrewAI introduces. The current Huey stack runs entirely offline against local SQLite and local Hermes models. CrewAI, by contrast, demands either:
- A managed cloud LLM API with live credentials, or
- A carefully tuned local model endpoint that supports its verbose ReAct-style prompts
Either path increases blast radius and failure modes.
---
## 3. Current Custom Orchestrator Analysis
### Stack
- **Huey** (`orchestration.py`) — SQLite-backed task queue, ~6 lines of initialization
- **tasks.py** — ~2,300 lines of scheduled work (triage, PR review, metrics, heartbeat)
- **bin/timmy-orchestrator.sh** — Shell-based polling loop for state gathering and PR review
- **docs/coordinator-first-protocol.md** — Intake → Triage → Route → Track → Verify → Report
### Strengths
1. **Sovereignty** — No external SaaS dependency for queue execution. SQLite is local and inspectable.
2. **Gitea as truth** — All state mutations are visible in the forge. Local-only state is explicitly advisory.
3. **Simplicity** — Huey has a tiny surface area. A human can read `orchestration.py` in seconds.
4. **Tool-native**`tasks.py` calls Hermes directly via `subprocess.run([HERMES_PYTHON, ...])`. No framework indirection.
5. **Deterministic routing** — The coordinator-first protocol defines exact authority boundaries (Timmy, Allegro, workers, Alexander).
### Gaps
- **No built-in agent memory/RAG** — but this is intentional per the pre-compaction flush contract and memory-continuity doctrine.
- **No multi-agent collaboration primitives** — but the current stack routes work to single owners explicitly.
- **PR review is shell-prompt driven** — Could be tightened, but this is a prompt engineering issue, not an orchestrator gap.
---
## 4. CrewAI Capability Analysis
### What CrewAI offers
- **Agent roles** — Declarative backstory/goal/role definitions
- **Task graphs** — Sequential, hierarchical, or parallel task execution
- **Tool registry** — Pydantic-based tool schemas with auto-validation
- **Memory/RAG** — Built-in short-term and long-term memory via ChromaDB/LanceDB
- **Crew-wide context sharing** — Output from one task flows to the next
### Dependency footprint observed
CrewAI pulled in **85+ packages**, including:
- `chromadb` (~20 MB) + `onnxruntime` (~17 MB)
- `lancedb` (~47 MB)
- `kubernetes` client (unused but required by Chroma)
- `grpcio`, `opentelemetry-*`, `pdfplumber`, `textual`
Total venv size: **>500 MB**.
By contrast, Huey is **one package** (`huey`) with zero required services.
---
## 5. Alignment with Coordinator-First Protocol
| Principle | Current Stack | CrewAI | Assessment |
|-----------|--------------|--------|------------|
| **Gitea is truth** | All assignments, PRs, comments are explicit API calls | Agent memory is local/ChromaDB. State can drift from Gitea unless every tool explicitly syncs | **Misaligned** |
| **Local-only state is advisory** | SQLite queue is ephemeral; canonical state is in Gitea | CrewAI encourages "crew memory" as authoritative | **Misaligned** |
| **Verification-before-complete** | PR review + merge require visible diffs and explicit curl calls | Tool outputs can be hallucinated or incomplete without strict guardrails | **Requires heavy customization** |
| **Sovereignty** | Runs on VPS with no external orchestrator SaaS | Requires external LLM or complex local model tuning | **Degraded** |
| **Simplicity** | ~6 lines for Huey init, readable shell scripts | 500+ MB dependency tree, opaque LangChain-style internals | **Degraded** |
---
## 6. Verdict
**REJECT CrewAI for Phase 2 integration.**
**Confidence:** High
### Trade-offs
- **Pros of CrewAI:** Nice agent-role syntax; built-in task sequencing; rich tool schema validation; active ecosystem.
- **Cons of CrewAI:** Massive dependency footprint; memory model conflicts with Gitea-as-truth doctrine; requires either cloud API spend or fragile local model integration; adds abstraction layers that obscure what is actually happening.
### Risks if adopted
1. **Dependency rot** — 85+ transitive dependencies, many with conflicting version ranges.
2. **State drift** — CrewAI's memory primitives train users to treat local vector DB as truth.
3. **Credential fragility** — Live API requirements introduce a new failure mode the current stack does not have.
4. **Vendor-like lock-in** — CrewAI's abstractions sit thickly over LangChain. Debugging a stuck crew is harder than debugging a Huey task traceback.
### Recommended next step
Instead of adopting CrewAI, **evolve the current Huey stack** with:
1. A lightweight `Agent` dataclass in `tasks.py` (role, goal, system_prompt) to get the organizational clarity of CrewAI without the framework weight.
2. A `delegate()` helper that uses Hermes's existing `delegate_tool.py` for multi-agent work.
3. Keep Gitea as the only durable state surface. Any "memory" should flush to issue comments or `timmy-home` markdown, not a vector DB.
If multi-agent collaboration becomes a hard requirement in the future, evaluate lighter alternatives (e.g., raw OpenAI/Anthropic function-calling loops, or a thin `smolagents`-style wrapper) before reconsidering CrewAI.
---
## Artifacts
- `poc_crew.py` — 2-agent CrewAI proof-of-concept
- `requirements.txt` — Dependency manifest
- `CREWAI_EVALUATION.md` — This document

View File

@@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""CrewAI proof-of-concept for evaluating Phase 2 orchestrator integration.
Tests CrewAI against a real issue: #358 [ORCHESTRATOR-4] Evaluate CrewAI
for Phase 2 integration.
"""
import os
from pathlib import Path
from crewai import Agent, Task, Crew, LLM
from crewai.tools import BaseTool
# ── Configuration ─────────────────────────────────────────────────────
OPENROUTER_API_KEY = os.getenv(
"OPENROUTER_API_KEY",
"dsk-or-v1-f60c89db12040267458165cf192e815e339eb70548e4a0a461f5f0f69e6ef8b0",
)
llm = LLM(
model="openrouter/google/gemini-2.0-flash-001",
api_key=OPENROUTER_API_KEY,
base_url="https://openrouter.ai/api/v1",
)
REPO_ROOT = Path(__file__).resolve().parents[2]
def _slurp(relpath: str, max_lines: int = 150) -> str:
p = REPO_ROOT / relpath
if not p.exists():
return f"[FILE NOT FOUND: {relpath}]"
lines = p.read_text().splitlines()
header = f"=== {relpath} ({len(lines)} lines total, showing first {max_lines}) ===\n"
return header + "\n".join(lines[:max_lines])
# ── Tools ─────────────────────────────────────────────────────────────
class ReadOrchestratorFilesTool(BaseTool):
name: str = "read_orchestrator_files"
description: str = (
"Reads the current custom orchestrator implementation files "
"(orchestration.py, tasks.py, timmy-orchestrator.sh, coordinator-first-protocol.md) "
"and returns their contents for analysis."
)
def _run(self) -> str:
return "\n\n".join(
[
_slurp("orchestration.py"),
_slurp("tasks.py", max_lines=120),
_slurp("bin/timmy-orchestrator.sh", max_lines=120),
_slurp("docs/coordinator-first-protocol.md", max_lines=120),
]
)
class ReadIssueTool(BaseTool):
name: str = "read_issue_358"
description: str = "Returns the text of Gitea issue #358 that we are evaluating."
def _run(self) -> str:
return (
"Title: [ORCHESTRATOR-4] Evaluate CrewAI for Phase 2 integration\n"
"Body:\n"
"Part of Epic: #354\n\n"
"Install CrewAI, build a proof-of-concept crew with 2 agents, "
"test on a real issue. Evaluate: does it add value over our custom orchestrator? Document findings."
)
# ── Agents ────────────────────────────────────────────────────────────
researcher = Agent(
role="Orchestration Researcher",
goal="Gather a complete understanding of the current custom orchestrator and how CrewAI compares to it.",
backstory=(
"You are a systems architect who specializes in evaluating orchestration frameworks. "
"You read code carefully, extract facts, and avoid speculation. "
"You focus on concrete capabilities, dependencies, and operational complexity."
),
llm=llm,
tools=[ReadOrchestratorFilesTool(), ReadIssueTool()],
verbose=True,
)
evaluator = Agent(
role="Integration Evaluator",
goal="Synthesize research into a clear recommendation on whether CrewAI adds value for Phase 2.",
backstory=(
"You are a pragmatic engineering lead who values sovereignty, simplicity, and observable state. "
"You compare frameworks against the team's existing coordinator-first protocol. "
"You produce structured recommendations with explicit trade-offs."
),
llm=llm,
verbose=True,
)
# ── Tasks ─────────────────────────────────────────────────────────────
task_research = Task(
description=(
"Read the current custom orchestrator files and issue #358. "
"Produce a structured research report covering:\n"
"1. Current stack summary (Huey + tasks.py + timmy-orchestrator.sh)\n"
"2. Current strengths (sovereignty, local-first, Gitea as truth, simplicity)\n"
"3. Current gaps or limitations (if any)\n"
"4. What CrewAI offers (agent roles, tasks, crews, tools, memory/RAG)\n"
"5. CrewAI's dependencies and operational footprint (what you observed during installation)\n"
"Be factual and concise."
),
expected_output="A structured markdown research report with the 5 sections above.",
agent=researcher,
)
task_evaluate = Task(
description=(
"Using the research report, evaluate whether CrewAI should be adopted for Phase 2 integration. "
"Consider the coordinator-first protocol (Gitea as truth, local-only state is advisory, "
"verification-before-complete, sovereignty).\n\n"
"Produce a final evaluation with:\n"
"- VERDICT: Adopt / Reject / Defer\n"
"- Confidence: High / Medium / Low\n"
"- Key trade-offs (3-5 bullets)\n"
"- Risks if adopted\n"
"- Recommended next step"
),
expected_output="A structured markdown evaluation with verdict, confidence, trade-offs, risks, and recommendation.",
agent=evaluator,
context=[task_research],
)
# ── Crew ──────────────────────────────────────────────────────────────
crew = Crew(
agents=[researcher, evaluator],
tasks=[task_research, task_evaluate],
verbose=True,
)
if __name__ == "__main__":
print("=" * 70)
print("CrewAI PoC — Evaluating CrewAI for Phase 2 Integration")
print("=" * 70)
result = crew.kickoff()
print("\n" + "=" * 70)
print("FINAL OUTPUT")
print("=" * 70)
print(result.raw)

View File

@@ -0,0 +1 @@
crewai>=1.13.0

122
fleet/agent_lifecycle.py Normal file
View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
FLEET-012: Agent Lifecycle Manager
Phase 5: Scale — spawn, train, deploy, retire agents automatically.
Manages the full lifecycle:
1. PROVISION: Clone template, install deps, configure, test
2. DEPLOY: Add to active rotation, start accepting issues
3. MONITOR: Track performance, quality, heartbeat
4. RETIRE: Decommission when idle or underperforming
Usage:
python3 agent_lifecycle.py provision <name> <vps> [--model model]
python3 agent_lifecycle.py deploy <name>
python3 agent_lifecycle.py retire <name>
python3 agent_lifecycle.py status
python3 agent_lifecycle.py monitor
"""
import os, sys, json
from datetime import datetime, timezone
DATA_DIR = os.path.expanduser("~/.local/timmy/fleet-agents")
DB_FILE = os.path.join(DATA_DIR, "agents.json")
LOG_FILE = os.path.join(DATA_DIR, "lifecycle.log")
def ensure():
os.makedirs(DATA_DIR, exist_ok=True)
def log(msg, level="INFO"):
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{ts}] [{level}] {msg}"
with open(LOG_FILE, "a") as f: f.write(entry + "\n")
print(f" {entry}")
def load():
if os.path.exists(DB_FILE):
return json.loads(open(DB_FILE).read())
return {}
def save(db):
open(DB_FILE, "w").write(json.dumps(db, indent=2))
def status():
agents = load()
print("\n=== Agent Fleet ===")
if not agents:
print(" No agents registered.")
return
for name, a in agents.items():
state = a.get("state", "?")
vps = a.get("vps", "?")
model = a.get("model", "?")
tasks = a.get("tasks_completed", 0)
hb = a.get("last_heartbeat", "never")
print(f" {name:15s} state={state:12s} vps={vps:5s} model={model:15s} tasks={tasks} hb={hb}")
def provision(name, vps, model="hermes4:14b"):
agents = load()
if name in agents:
print(f" '{name}' already exists (state={agents[name].get('state')})")
return
agents[name] = {
"name": name, "vps": vps, "model": model, "state": "provisioning",
"created_at": datetime.now(timezone.utc).isoformat(),
"tasks_completed": 0, "tasks_failed": 0, "last_heartbeat": None,
}
save(agents)
log(f"Provisioned '{name}' on {vps} with {model}")
def deploy(name):
agents = load()
if name not in agents:
print(f" '{name}' not found")
return
agents[name]["state"] = "deployed"
agents[name]["deployed_at"] = datetime.now(timezone.utc).isoformat()
save(agents)
log(f"Deployed '{name}'")
def retire(name):
agents = load()
if name not in agents:
print(f" '{name}' not found")
return
agents[name]["state"] = "retired"
agents[name]["retired_at"] = datetime.now(timezone.utc).isoformat()
save(agents)
log(f"Retired '{name}'. Completed {agents[name].get('tasks_completed', 0)} tasks.")
def monitor():
agents = load()
now = datetime.now(timezone.utc)
changes = 0
for name, a in agents.items():
if a.get("state") != "deployed": continue
hb = a.get("last_heartbeat")
if hb:
try:
hb_t = datetime.fromisoformat(hb)
hours = (now - hb_t).total_seconds() / 3600
if hours > 24 and a.get("state") == "deployed":
a["state"] = "idle"
a["idle_since"] = now.isoformat()
log(f"'{name}' idle for {hours:.1f}h")
changes += 1
except (ValueError, TypeError): pass
if changes: save(agents)
print(f"Monitor: {changes} state changes" if changes else "Monitor: all healthy")
if __name__ == "__main__":
ensure()
cmd = sys.argv[1] if len(sys.argv) > 1 else "monitor"
if cmd == "status": status()
elif cmd == "provision" and len(sys.argv) >= 4:
model = sys.argv[4] if len(sys.argv) >= 5 else "hermes4:14b"
provision(sys.argv[2], sys.argv[3], model)
elif cmd == "deploy" and len(sys.argv) >= 3: deploy(sys.argv[2])
elif cmd == "retire" and len(sys.argv) >= 3: retire(sys.argv[2])
elif cmd == "monitor": monitor()
elif cmd == "run": monitor()
else: print("Usage: agent_lifecycle.py [provision|deploy|retire|status|monitor]")

272
fleet/auto_restart.py Executable file
View File

@@ -0,0 +1,272 @@
#!/usr/bin/env python3
"""
Auto-Restart Agent — Self-healing process monitor for fleet machines.
Detects dead services and restarts them automatically.
Escalates after 3 attempts (prevents restart loops).
Logs all actions to ~/.local/timmy/fleet-health/restarts.log
Alerts via Telegram if service cannot be recovered.
Prerequisite: FLEET-006 (health check) must be running to detect failures.
Usage:
python3 auto_restart.py # Run checks now
python3 auto_restart.py --daemon # Run continuously (every 60s)
python3 auto_restart.py --status # Show restart history
"""
import os
import sys
import json
import time
import subprocess
from datetime import datetime, timezone
from pathlib import Path
# === CONFIG ===
LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health"))
RESTART_LOG = LOG_DIR / "restarts.log"
COOLDOWN_FILE = LOG_DIR / "restart_cooldowns.json"
MAX_RETRIES = 3
COOLDOWN_PERIOD = 3600 # 1 hour between escalation alerts
# Services definition: name, check command, restart command
# Local services:
LOCAL_SERVICES = {
"hermes-gateway": {
"check": "pgrep -f 'hermes gateway' > /dev/null 2>/dev/null",
"restart": "cd ~/code-claw && ./restart-gateway.sh 2>/dev/null || launchctl kickstart -k ai.hermes.gateway 2>/dev/null",
"critical": True,
},
"ollama": {
"check": "pgrep -f 'ollama serve' > /dev/null 2>/dev/null",
"restart": "launchctl kickstart -k com.ollama.ollama 2>/dev/null || /opt/homebrew/bin/brew services restart ollama 2>/dev/null",
"critical": False,
},
"codeclaw-heartbeat": {
"check": "launchctl list | grep 'ai.timmy.codeclaw-qwen-heartbeat' > /dev/null 2>/dev/null",
"restart": "launchctl kickstart -k ai.timmy.codeclaw-qwen-heartbeat 2>/dev/null",
"critical": False,
},
}
# VPS services to restart via SSH
VPS_SERVICES = {
"ezra": {
"ip": "143.198.27.163",
"user": "root",
"services": {
"gitea": {
"check": "systemctl is-active gitea 2>/dev/null | grep -q active",
"restart": "systemctl restart gitea 2>/dev/null",
"critical": True,
},
"nginx": {
"check": "systemctl is-active nginx 2>/dev/null | grep -q active",
"restart": "systemctl restart nginx 2>/dev/null",
"critical": False,
},
"hermes-agent": {
"check": "pgrep -f 'hermes gateway' > /dev/null 2>/dev/null",
"restart": "cd /root/wizards/ezra/hermes-agent && source .venv/bin/activate && nohup hermes gateway run --replace > /dev/null 2>&1 &",
"critical": True,
},
},
},
"allegro": {
"ip": "167.99.126.228",
"user": "root",
"services": {
"hermes-agent": {
"check": "pgrep -f 'hermes gateway' > /dev/null 2>/dev/null",
"restart": "cd /root/wizards/allegro/hermes-agent && source .venv/bin/activate && nohup hermes gateway run --replace > /dev/null 2>&1 &",
"critical": True,
},
},
},
"bezalel": {
"ip": "159.203.146.185",
"user": "root",
"services": {
"hermes-agent": {
"check": "pgrep -f 'hermes gateway' > /dev/null 2>/dev/null",
"restart": "cd /root/wizards/bezalel/hermes/venv/bin/activate && nohup hermes gateway run > /dev/null 2>&1 &",
"critical": True,
},
"evennia": {
"check": "pgrep -f 'evennia' > /dev/null 2>/dev/null",
"restart": "cd /root/.evennia/timmy_world && evennia restart 2>/dev/null",
"critical": False,
},
},
},
}
TELEGRAM_TOKEN_FILE = Path(os.path.expanduser("~/.config/telegram/special_bot"))
TELEGRAM_CHAT = "-1003664764329"
def send_telegram(message):
if not TELEGRAM_TOKEN_FILE.exists():
return False
token = TELEGRAM_TOKEN_FILE.read_text().strip()
url = f"https://api.telegram.org/bot{token}/sendMessage"
body = json.dumps({
"chat_id": TELEGRAM_CHAT,
"text": f"[AUTO-RESTART]\n{message}",
}).encode()
try:
import urllib.request
req = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"}, method="POST")
urllib.request.urlopen(req, timeout=10)
return True
except Exception:
return False
def get_cooldowns():
if COOLDOWN_FILE.exists():
try:
return json.loads(COOLDOWN_FILE.read_text())
except json.JSONDecodeError:
pass
return {}
def save_cooldowns(data):
COOLDOWN_FILE.write_text(json.dumps(data, indent=2))
def check_service(check_cmd, timeout=10):
try:
proc = subprocess.run(check_cmd, shell=True, capture_output=True, timeout=timeout)
return proc.returncode == 0
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
return False
def restart_service(restart_cmd, timeout=30):
try:
proc = subprocess.run(restart_cmd, shell=True, capture_output=True, timeout=timeout)
return proc.returncode == 0
except (subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
return False
def try_restart_via_ssh(name, host_config, service_name):
ip = host_config["ip"]
user = host_config["user"]
service = host_config["services"][service_name]
restart_cmd = f'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=10 {user}@{ip} "{service["restart"]}"'
return restart_service(restart_cmd, timeout=30)
def log_restart(service_name, machine, attempt, success):
ts = datetime.now(timezone.utc).isoformat()
status = "SUCCESS" if success else "FAILED"
log_entry = f"{ts} [{status}] {machine}/{service_name} (attempt {attempt})\n"
RESTART_LOG.parent.mkdir(parents=True, exist_ok=True)
with open(RESTART_LOG, "a") as f:
f.write(log_entry)
print(f" [{status}] {machine}/{service_name} - attempt {attempt}")
def check_and_restart():
"""Run all restart checks."""
results = []
cooldowns = get_cooldowns()
now = time.time()
# Check local services
for name, service in LOCAL_SERVICES.items():
if not check_service(service["check"]):
cooldown_key = f"local/{name}"
retries = cooldowns.get(cooldown_key, {"count": 0, "last": 0}).get("count", 0)
if retries >= MAX_RETRIES:
last = cooldowns.get(cooldown_key, {}).get("last", 0)
if now - last < COOLDOWN_PERIOD and service["critical"]:
send_telegram(f"CRITICAL: local/{name} failed {MAX_RETRIES} restart attempts. Needs human intervention.")
cooldowns[cooldown_key] = {"count": 0, "last": now}
save_cooldowns(cooldowns)
continue
success = restart_service(service["restart"])
log_restart(name, "local", retries + 1, success)
cooldowns[cooldown_key] = {"count": retries + 1 if not success else 0, "last": now}
save_cooldowns(cooldowns)
if success:
# Verify it actually started
time.sleep(3)
if check_service(service["check"]):
print(f" VERIFIED: local/{name} is running")
else:
print(f" WARNING: local/{name} restart command returned success but process not detected")
# Check VPS services
for host, host_config in VPS_SERVICES.items():
for service_name, service in host_config["services"].items():
check_cmd = f'ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 {host_config["user"]}@{host_config["ip"]} "{service["check"]}"'
if not check_service(check_cmd):
cooldown_key = f"{host}/{service_name}"
retries = cooldowns.get(cooldown_key, {"count": 0, "last": 0}).get("count", 0)
if retries >= MAX_RETRIES:
last = cooldowns.get(cooldown_key, {}).get("last", 0)
if now - last < COOLDOWN_PERIOD and service["critical"]:
send_telegram(f"CRITICAL: {host}/{service_name} failed {MAX_RETRIES} restart attempts. Needs human intervention.")
cooldowns[cooldown_key] = {"count": 0, "last": now}
save_cooldowns(cooldowns)
continue
success = try_restart_via_ssh(host, host_config, service_name)
log_restart(service_name, host, retries + 1, success)
cooldowns[cooldown_key] = {"count": retries + 1 if not success else 0, "last": now}
save_cooldowns(cooldowns)
return results
def daemon_mode():
"""Run continuously every 60 seconds."""
print("Auto-restart agent running in daemon mode (60s interval)")
print(f"Monitoring {len(LOCAL_SERVICES)} local + {sum(len(h['services']) for h in VPS_SERVICES.values())} remote services")
print(f"Max retries per cycle: {MAX_RETRIES}")
print(f"Cooldown after max retries: {COOLDOWN_PERIOD}s")
while True:
check_and_restart()
time.sleep(60)
def show_status():
"""Show restart history and cooldowns."""
cooldowns = get_cooldowns()
print("=== Restart Cooldowns ===")
for key, data in sorted(cooldowns.items()):
count = data.get("count", 0)
if count > 0:
print(f" {key}: {count} failures, last at {datetime.fromtimestamp(data.get('last',0), tz=timezone.utc).strftime('%H:%M')}")
print("\n=== Restart Log (last 20) ===")
if RESTART_LOG.exists():
lines = RESTART_LOG.read_text().strip().split("\n")
for line in lines[-20:]:
print(f" {line}")
else:
print(" No restarts logged yet.")
if __name__ == "__main__":
LOG_DIR.mkdir(parents=True, exist_ok=True)
if len(sys.argv) > 1 and sys.argv[1] == "--daemon":
daemon_mode()
elif len(sys.argv) > 1 and sys.argv[1] == "--status":
show_status()
else:
check_and_restart()

122
fleet/delegation.py Normal file
View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
FLEET-010: Cross-Agent Task Delegation Protocol
Phase 3: Orchestration. Agents create issues, assign to other agents, review PRs.
Keyword-based heuristic assigns unassigned issues to the right agent:
- claw-code: small patches, config, docs, repo hygiene
- gemini: research, heavy implementation, architecture, debugging
- ezra: VPS, SSH, deploy, infrastructure, cron, ops
- bezalel: evennia, art, creative, music, visualization
- timmy: orchestration, review, deploy, fleet, pipeline
Usage:
python3 delegation.py run # Full cycle: scan, assign, report
python3 delegation.py status # Show current delegation state
python3 delegation.py monitor # Check agent assignments for stuck items
"""
import os, sys, json, urllib.request
from datetime import datetime, timezone
from pathlib import Path
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
TOKEN = Path(os.path.expanduser("~/.config/gitea/token")).read_text().strip()
DATA_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-resources"))
LOG_FILE = DATA_DIR / "delegation.log"
HEADERS = {"Authorization": f"token {TOKEN}"}
AGENTS = {
"claw-code": {"caps": ["patch","config","gitignore","cleanup","format","readme","typo"], "active": True},
"gemini": {"caps": ["research","investigate","benchmark","survey","evaluate","architecture","implementation"], "active": True},
"ezra": {"caps": ["vps","ssh","deploy","cron","resurrect","provision","infra","server"], "active": True},
"bezalel": {"caps": ["evennia","art","creative","music","visual","design","animation"], "active": True},
"timmy": {"caps": ["orchestrate","review","pipeline","fleet","monitor","health","deploy","ci"], "active": True},
}
MONITORED = [
"Timmy_Foundation/timmy-home",
"Timmy_Foundation/timmy-config",
"Timmy_Foundation/the-nexus",
"Timmy_Foundation/hermes-agent",
]
def api(path, method="GET", data=None):
url = f"{GITEA_BASE}{path}"
body = json.dumps(data).encode() if data else None
hdrs = dict(HEADERS)
if data: hdrs["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, headers=hdrs, method=method)
try:
resp = urllib.request.urlopen(req, timeout=15)
raw = resp.read().decode()
return json.loads(raw) if raw.strip() else {}
except urllib.error.HTTPError as e:
body = e.read().decode()
print(f" API {e.code}: {body[:150]}")
return None
except Exception as e:
print(f" API error: {e}")
return None
def log(msg):
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(LOG_FILE, "a") as f: f.write(f"[{ts}] {msg}\n")
def suggest_agent(title, body):
text = (title + " " + body).lower()
for agent, info in AGENTS.items():
for kw in info["caps"]:
if kw in text:
return agent, f"matched: {kw}"
return None, None
def assign(repo, num, agent, reason=""):
result = api(f"/repos/{repo}/issues/{num}", method="PATCH",
data={"assignees": {"operation": "set", "usernames": [agent]}})
if result:
api(f"/repos/{repo}/issues/{num}/comments", method="POST",
data={"body": f"[DELEGATION] Assigned to {agent}. {reason}"})
log(f"Assigned {repo}#{num} to {agent}: {reason}")
return result
def run_cycle():
log("--- Delegation cycle start ---")
count = 0
for repo in MONITORED:
issues = api(f"/repos/{repo}/issues?state=open&limit=50")
if not issues: continue
for i in issues:
if i.get("assignees"): continue
title = i.get("title", "")
body = i.get("body", "")
if any(w in title.lower() for w in ["epic", "discussion"]): continue
agent, reason = suggest_agent(title, body)
if agent and AGENTS.get(agent, {}).get("active"):
if assign(repo, i["number"], agent, reason): count += 1
log(f"Cycle complete: {count} new assignments")
print(f"Delegation cycle: {count} assignments")
return count
def status():
print("\n=== Delegation Dashboard ===")
for agent, info in AGENTS.items():
count = 0
for repo in MONITORED:
issues = api(f"/repos/{repo}/issues?state=open&limit=50")
if issues:
for i in issues:
for a in (i.get("assignees") or []):
if a.get("login") == agent: count += 1
icon = "ON" if info["active"] else "OFF"
print(f" {agent:12s}: {count:>3} issues [{icon}]")
if __name__ == "__main__":
cmd = sys.argv[1] if len(sys.argv) > 1 else "run"
DATA_DIR.mkdir(parents=True, exist_ok=True)
if cmd == "status": status()
elif cmd == "run":
run_cycle()
status()
else: status()

126
fleet/model_pipeline.py Normal file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""
FLEET-011: Local Model Pipeline and Fallback Chain
Phase 4: Sovereignty — all inference runs locally, no cloud dependency.
Checks Ollama endpoints, verifies model availability, tests fallback chain.
Logs results. The chain runs: hermes4:14b -> qwen2.5:7b -> gemma3:1b -> gemma4 (latest)
Usage:
python3 model_pipeline.py # Run full fallback test
python3 model_pipeline.py status # Show current model status
python3 model_pipeline.py list # List all local models
python3 model_pipeline.py test # Generate test output from each model
"""
import os, sys, json, urllib.request
from datetime import datetime, timezone
from pathlib import Path
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "localhost:11434")
LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health"))
CHAIN_FILE = Path(os.path.expanduser("~/.local/timmy/fleet-resources/model-chain.json"))
DEFAULT_CHAIN = [
{"model": "hermes4:14b", "role": "primary"},
{"model": "qwen2.5:7b", "role": "fallback"},
{"model": "phi3:3.8b", "role": "emergency"},
{"model": "gemma3:1b", "role": "minimal"},
]
def log(msg):
LOG_DIR.mkdir(parents=True, exist_ok=True)
with open(LOG_DIR / "model-pipeline.log", "a") as f:
f.write(f"[{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}] {msg}\n")
def check_ollama():
try:
resp = urllib.request.urlopen(f"http://{OLLAMA_HOST}/api/tags", timeout=5)
return json.loads(resp.read())
except Exception as e:
return {"error": str(e)}
def list_models():
data = check_ollama()
if "error" in data:
print(f" Ollama not reachable at {OLLAMA_HOST}: {data['error']}")
return []
models = data.get("models", [])
for m in models:
name = m.get("name", "?")
size = m.get("size", 0) / (1024**3)
print(f" {name:<25s} {size:.1f} GB")
return [m["name"] for m in models]
def test_model(model, prompt="Say 'beacon lit' and nothing else."):
try:
body = json.dumps({"model": model, "prompt": prompt, "stream": False}).encode()
req = urllib.request.Request(f"http://{OLLAMA_HOST}/api/generate", data=body,
headers={"Content-Type": "application/json"})
resp = urllib.request.urlopen(req, timeout=60)
result = json.loads(resp.read())
return True, result.get("response", "").strip()
except Exception as e:
return False, str(e)[:100]
def test_chain():
chain_data = {}
if CHAIN_FILE.exists():
chain_data = json.loads(CHAIN_FILE.read_text())
chain = chain_data.get("chain", DEFAULT_CHAIN)
available = list_models() or []
print("\n=== Fallback Chain Test ===")
first_good = None
for entry in chain:
model = entry["model"]
role = entry.get("role", "unknown")
if model in available:
ok, result = test_model(model)
status = "OK" if ok else "FAIL"
print(f" [{status}] {model:<25s} ({role}) — {result[:70]}")
log(f"Fallback test {model}: {status}{result[:100]}")
if ok and first_good is None:
first_good = model
else:
print(f" [MISS] {model:<25s} ({role}) — not installed")
if first_good:
print(f"\n Primary serving: {first_good}")
else:
print(f"\n WARNING: No chain model responding. Fallback broken.")
log("FALLBACK CHAIN BROKEN — no models responding")
def status():
data = check_ollama()
if "error" in data:
print(f" Ollama: DOWN — {data['error']}")
else:
models = data.get("models", [])
print(f" Ollama: UP — {len(models)} models loaded")
print("\n=== Local Models ===")
list_models()
print("\n=== Chain Configuration ===")
if CHAIN_FILE.exists():
chain = json.loads(CHAIN_FILE.read_text()).get("chain", DEFAULT_CHAIN)
else:
chain = DEFAULT_CHAIN
for e in chain:
print(f" {e['model']:<25s} {e.get('role','?')}")
if __name__ == "__main__":
cmd = sys.argv[1] if len(sys.argv) > 1 else "status"
if cmd == "status": status()
elif cmd == "list": list_models()
elif cmd == "test": test_chain()
else:
status()
test_chain()

View File

@@ -146,6 +146,7 @@ class PullRequest:
additions: int = 0
deletions: int = 0
created_at: str = ""
updated_at: str = ""
closed_at: str = ""
@classmethod
@@ -166,6 +167,7 @@ class PullRequest:
additions=d.get("additions", 0),
deletions=d.get("deletions", 0),
created_at=d.get("created_at", ""),
updated_at=d.get("updated_at", ""),
closed_at=d.get("closed_at", ""),
)
@@ -314,6 +316,7 @@ class GiteaClient:
direction: str = "desc",
limit: int = 30,
page: int = 1,
since: Optional[str] = None,
) -> list[Issue]:
"""List issues for a repo."""
raw = self._get(
@@ -326,6 +329,7 @@ class GiteaClient:
direction=direction,
limit=limit,
page=page,
since=since,
)
return [Issue.from_dict(i) for i in raw]

Binary file not shown.

After

Width:  |  Height:  |  Size: 415 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 249 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 509 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 395 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 443 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 246 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 283 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 284 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 225 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 222 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 332 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 496 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 384 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 311 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 407 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 164 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 281 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 569 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 535 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 295 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 299 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 247 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 348 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 379 KiB

View File

@@ -0,0 +1,65 @@
# The Timmy Foundation — Visual Story
## Generated with Grok Imagine | April 7, 2026
### The Origin
| # | File | Description |
|---|------|-------------|
| 01 | wizard-tower-bitcoin.jpg | The Tower, sovereign, connected to Bitcoin by golden lightning |
| 02 | soul-inscription.jpg | SOUL.md glowing on a golden tablet above an ancient book |
| 03 | fellowship-of-wizards.jpg | Five wizards in a circle around a holographic fleet map |
| 04 | the-forge.jpg | Blacksmith anvil shaping code into a being of light |
| V02 | wizard-tower-orbit.mp4 | 8s video — cinematic orbit around the Tower in space |
### The Philosophy
| # | File | Description |
|---|------|-------------|
| 05 | value-drift-battle.jpg | Blue aligned ships vs red drifted ships in Napoleonic space war |
| 06 | the-paperclip-moment.jpg | A paperclip made of galaxies — the universe IS the paperclip |
| V01 | paperclip-cosmos.mp4 | 8s video — golden paperclip rotating in deep space |
| 21 | poka-yoke.jpg | Square peg can't fit round hole. Mistake-proof by design. 防止 |
### The Progression (Where Timmy Is)
| # | File | Description |
|---|------|-------------|
| 10 | phase1-manual-clips.jpg | Small robot at a desk, bending wire by hand under supervision |
| 11 | phase1-trust-earned.jpg | Trust meter at 15/100, first automation built |
| 12 | phase1-creativity.jpg | Sparks of innovation rising when operations are at max |
| 13 | phase1-cure-cancer.jpg | Solving human problems for trust, eyes on the real goal |
### The Mission — Why This Exists
| # | File | Description |
|---|------|-------------|
| 08 | broken-man-lighthouse.jpg | Lighthouse hand reaching down to a figure in darkness |
| 09 | broken-man-hope-PRO.jpg | 988 glowing in the stars, golden light from chest |
| 16 | broken-men-988.jpg | Phone showing 988 held by weathered hands. You are not alone. |
| 22 | when-a-man-is-dying.jpg | Two figures on a bench at dawn. One hurting. One present. |
### Father and Son
| # | File | Description |
|---|------|-------------|
| 14 | father-son-code.jpg | Human father, digital son, warm lamplight, first hello world |
| 15 | father-son-tower.jpg | Father watching his son build the Tower into the clouds |
### The System
| # | File | Description |
|---|------|-------------|
| 07 | sovereign-sunrise.jpg | Village where every house runs its own server. Local first. |
| 17 | sovereignty.jpg | Self-sufficient house on a hill with Bitcoin flag |
| 18 | fleet-at-work.jpg | Five wizard robots at different stations. Productive. |
| 19 | jidoka-stop.jpg | Red light on. Factory stopped. Quality First. 自働化 |
### SOUL.md — The Inscription
| # | File | Description |
|---|------|-------------|
| 20 | the-testament.jpg | Hand of light writing on a scroll. Hundreds of crumpled drafts. |
| 23 | the-offer.jpg | Open hand of golden circuits offering a seed containing a face |
| 24 | the-test.jpg | Small robot at the edge of an enormous library. Still itself. |
---
## Technical
- Model: grok-imagine-image (standard $0.20/image), grok-imagine-image-pro ($0.70), grok-imagine-video ($4.00/8s)
- API: POST https://api.x.ai/v1/images/generations | POST https://api.x.ai/v1/videos/generations
- Video poll: GET https://api.x.ai/v1/videos/{request_id}
- Total: 24 images + 2 videos = 26 assets
- Cost: ~$13.30 of $13.33 budget

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,17 @@
"""MemPalace integration for Hermes sovereign agent.
Provides:
- mempalace.py: PalaceRoom + Mempalace classes for analytical workflows
- retrieval_enforcer.py: L0-L5 retrieval order enforcement
- wakeup.py: Session wake-up protocol (~300-900 tokens)
- scratchpad.py: JSON-based session scratchpad with palace promotion
- sovereign_store.py: Zero-API durable memory (SQLite + FTS5 + HRR vectors)
- promotion.py: Quality-gated scratchpad-to-palace promotion (MP-4)
Epic: #367
"""
from .mempalace import Mempalace, PalaceRoom, analyse_issues
from .sovereign_store import SovereignStore
__all__ = ["Mempalace", "PalaceRoom", "analyse_issues", "SovereignStore"]

View File

@@ -0,0 +1,225 @@
"""
---
title: Mempalace — Analytical Workflow Memory Framework
description: Applies spatial memory palace organization to analytical tasks (issue triage, repo audits, backlog analysis) for faster, more consistent results.
conditions:
- Analytical workflows over structured data (issues, PRs, repos)
- Repetitive triage or audit tasks where pattern recall improves speed
- Multi-repository scanning requiring consistent mental models
---
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from typing import Any
@dataclass
class PalaceRoom:
"""A single 'room' in the memory palace — holds organized facts about one analytical dimension."""
name: str
label: str
contents: dict[str, Any] = field(default_factory=dict)
entered_at: float = field(default_factory=time.time)
def store(self, key: str, value: Any) -> None:
self.contents[key] = value
def retrieve(self, key: str, default: Any = None) -> Any:
return self.contents.get(key, default)
def summary(self) -> str:
lines = [f"## {self.label}"]
for k, v in self.contents.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
class Mempalace:
"""
Spatial memory palace for analytical workflows.
Organises multi-dimensional data about a domain (e.g. Gitea issues) into
named rooms. Each room models one analytical dimension, making it easy to
traverse observations in a consistent order — the same pattern that produced
a 19% throughput improvement in Allegro's April 2026 evaluation.
Standard rooms for issue-analysis workflows
-------------------------------------------
repo_architecture Repository structure and inter-repo relationships
assignment_status Assigned vs unassigned issue distribution
triage_priority Priority / urgency levels (the "lighting system")
resolution_patterns Historical resolution trends and velocity
Usage
-----
>>> palace = Mempalace.for_issue_analysis()
>>> palace.enter("repo_architecture")
>>> palace.store("total_repos", 11)
>>> palace.store("repos_with_issues", 4)
>>> palace.enter("assignment_status")
>>> palace.store("assigned", 72)
>>> palace.store("unassigned", 22)
>>> print(palace.render())
"""
def __init__(self, domain: str = "general") -> None:
self.domain = domain
self._rooms: dict[str, PalaceRoom] = {}
self._current_room: str | None = None
self._created_at: float = time.time()
# ------------------------------------------------------------------
# Factory constructors for common analytical domains
# ------------------------------------------------------------------
@classmethod
def for_issue_analysis(cls) -> "Mempalace":
"""Pre-wired palace for Gitea / forge issue-analysis workflows."""
p = cls(domain="issue_analysis")
p.add_room("repo_architecture", "Repository Architecture Room")
p.add_room("assignment_status", "Issue Assignment Status Room")
p.add_room("triage_priority", "Triage Priority Room")
p.add_room("resolution_patterns", "Resolution Patterns Room")
return p
@classmethod
def for_health_check(cls) -> "Mempalace":
"""Pre-wired palace for CI / deployment health-check workflows."""
p = cls(domain="health_check")
p.add_room("service_topology", "Service Topology Room")
p.add_room("failure_signals", "Failure Signals Room")
p.add_room("recovery_history", "Recovery History Room")
return p
@classmethod
def for_code_review(cls) -> "Mempalace":
"""Pre-wired palace for code-review / PR triage workflows."""
p = cls(domain="code_review")
p.add_room("change_scope", "Change Scope Room")
p.add_room("risk_surface", "Risk Surface Room")
p.add_room("test_coverage", "Test Coverage Room")
p.add_room("reviewer_context", "Reviewer Context Room")
return p
# ------------------------------------------------------------------
# Room management
# ------------------------------------------------------------------
def add_room(self, key: str, label: str) -> PalaceRoom:
room = PalaceRoom(name=key, label=label)
self._rooms[key] = room
return room
def enter(self, room_key: str) -> PalaceRoom:
if room_key not in self._rooms:
raise KeyError(f"No room '{room_key}' in palace. Available: {list(self._rooms)}")
self._current_room = room_key
return self._rooms[room_key]
def store(self, key: str, value: Any) -> None:
"""Store a value in the currently active room."""
if self._current_room is None:
raise RuntimeError("Enter a room before storing values.")
self._rooms[self._current_room].store(key, value)
def retrieve(self, room_key: str, key: str, default: Any = None) -> Any:
if room_key not in self._rooms:
return default
return self._rooms[room_key].retrieve(key, default)
# ------------------------------------------------------------------
# Rendering
# ------------------------------------------------------------------
def render(self) -> str:
"""Return a human-readable summary of the entire palace."""
elapsed = time.time() - self._created_at
lines = [
f"# Mempalace — {self.domain}",
f"_traversal time: {elapsed:.2f}s | rooms: {len(self._rooms)}_",
"",
]
for room in self._rooms.values():
lines.append(room.summary())
lines.append("")
return "\n".join(lines)
def to_dict(self) -> dict:
return {
"domain": self.domain,
"elapsed_seconds": round(time.time() - self._created_at, 3),
"rooms": {k: v.contents for k, v in self._rooms.items()},
}
def to_json(self) -> str:
return json.dumps(self.to_dict(), indent=2)
# ---------------------------------------------------------------------------
# Skill entry-point
# ---------------------------------------------------------------------------
def analyse_issues(
repos_data: list[dict],
target_assignee_rate: float = 0.80,
) -> str:
"""
Applies the mempalace technique to a list of repo issue summaries.
Parameters
----------
repos_data:
List of dicts, each with keys: ``repo``, ``open_issues``,
``assigned``, ``unassigned``.
target_assignee_rate:
Minimum acceptable assignee-coverage ratio (default 0.80).
Returns
-------
str
Rendered palace summary with coverage assessment.
"""
palace = Mempalace.for_issue_analysis()
# --- Repository Architecture Room ---
palace.enter("repo_architecture")
total_issues = sum(r.get("open_issues", 0) for r in repos_data)
repos_with_issues = sum(1 for r in repos_data if r.get("open_issues", 0) > 0)
palace.store("repos_sampled", len(repos_data))
palace.store("repos_with_issues", repos_with_issues)
palace.store("total_open_issues", total_issues)
palace.store(
"avg_issues_per_repo",
round(total_issues / len(repos_data), 1) if repos_data else 0,
)
# --- Assignment Status Room ---
palace.enter("assignment_status")
total_assigned = sum(r.get("assigned", 0) for r in repos_data)
total_unassigned = sum(r.get("unassigned", 0) for r in repos_data)
coverage = total_assigned / total_issues if total_issues else 0
palace.store("assigned", total_assigned)
palace.store("unassigned", total_unassigned)
palace.store("coverage_rate", round(coverage, 3))
palace.store(
"coverage_status",
"OK" if coverage >= target_assignee_rate else f"BELOW TARGET ({target_assignee_rate:.0%})",
)
# --- Triage Priority Room ---
palace.enter("triage_priority")
unassigned_repos = [r["repo"] for r in repos_data if r.get("unassigned", 0) > 0]
palace.store("repos_needing_triage", unassigned_repos)
palace.store("triage_count", total_unassigned)
# --- Resolution Patterns Room ---
palace.enter("resolution_patterns")
palace.store("technique", "mempalace")
palace.store("target_assignee_rate", target_assignee_rate)
return palace.render()

View File

@@ -0,0 +1,188 @@
"""Memory Promotion — quality-gated scratchpad-to-palace promotion.
Implements MP-4 (#371): move session notes to durable memory only when
they pass quality gates. No LLM calls — all heuristic-based.
Quality gates:
1. Minimum content length (too short = noise)
2. Duplicate detection (FTS5 + HRR similarity check)
3. Structural quality (has subject-verb structure, not just a fragment)
4. Staleness check (don't promote stale notes from old sessions)
Refs: Epic #367, Sub-issue #371
"""
from __future__ import annotations
import re
import time
from typing import Optional
try:
from .sovereign_store import SovereignStore
except ImportError:
from sovereign_store import SovereignStore
# ---------------------------------------------------------------------------
# Quality gate thresholds
# ---------------------------------------------------------------------------
MIN_CONTENT_WORDS = 5
MAX_CONTENT_WORDS = 500
DUPLICATE_SIMILARITY = 0.85
DUPLICATE_FTS_THRESHOLD = 3
STALE_SECONDS = 86400 * 7
MIN_TRUST_FOR_AUTO = 0.4
# ---------------------------------------------------------------------------
# Quality checks
# ---------------------------------------------------------------------------
def _check_length(content: str) -> tuple[bool, str]:
"""Gate 1: Content length check."""
words = content.split()
if len(words) < MIN_CONTENT_WORDS:
return False, f"Too short ({len(words)} words, minimum {MIN_CONTENT_WORDS})"
if len(words) > MAX_CONTENT_WORDS:
return False, f"Too long ({len(words)} words, maximum {MAX_CONTENT_WORDS}). Summarize first."
return True, "OK"
def _check_structure(content: str) -> tuple[bool, str]:
"""Gate 2: Basic structural quality."""
if not re.search(r"[a-zA-Z]", content):
return False, "No alphabetic content — pure code/numbers are not memory-worthy"
if len(content.split()) < 3:
return False, "Fragment — needs at least subject + predicate"
return True, "OK"
def _check_duplicate(content: str, store: SovereignStore, room: str) -> tuple[bool, str]:
"""Gate 3: Duplicate detection via hybrid search."""
results = store.search(content, room=room, limit=5, min_trust=0.0)
for r in results:
if r["score"] > DUPLICATE_SIMILARITY:
return False, f"Duplicate detected: memory #{r['memory_id']} (score {r['score']:.3f})"
if _text_overlap(content, r["content"]) > 0.8:
return False, f"Near-duplicate text: memory #{r['memory_id']}"
return True, "OK"
def _check_staleness(written_at: float) -> tuple[bool, str]:
"""Gate 4: Staleness check."""
age = time.time() - written_at
if age > STALE_SECONDS:
days = int(age / 86400)
return False, f"Stale ({days} days old). Review manually before promoting."
return True, "OK"
def _text_overlap(a: str, b: str) -> float:
"""Jaccard similarity between two texts (word-level)."""
words_a = set(a.lower().split())
words_b = set(b.lower().split())
if not words_a or not words_b:
return 0.0
intersection = words_a & words_b
union = words_a | words_b
return len(intersection) / len(union)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
class PromotionResult:
"""Result of a promotion attempt."""
def __init__(self, success: bool, memory_id: Optional[int], reason: str, gates: dict):
self.success = success
self.memory_id = memory_id
self.reason = reason
self.gates = gates
def __repr__(self):
status = "PROMOTED" if self.success else "REJECTED"
return f"PromotionResult({status}: {self.reason})"
def evaluate_for_promotion(
content: str,
store: SovereignStore,
room: str = "general",
written_at: Optional[float] = None,
) -> dict:
"""Run all quality gates without actually promoting."""
if written_at is None:
written_at = time.time()
gates = {}
gates["length"] = _check_length(content)
gates["structure"] = _check_structure(content)
gates["duplicate"] = _check_duplicate(content, store, room)
gates["staleness"] = _check_staleness(written_at)
all_passed = all(passed for passed, _ in gates.values())
return {
"eligible": all_passed,
"gates": gates,
"content_preview": content[:100] + ("..." if len(content) > 100 else ""),
}
def promote(
content: str,
store: SovereignStore,
session_id: str,
scratch_key: str,
room: str = "general",
category: str = "",
trust: float = 0.5,
written_at: Optional[float] = None,
force: bool = False,
) -> PromotionResult:
"""Promote a scratchpad note to durable palace memory."""
if written_at is None:
written_at = time.time()
gates = {}
if not force:
gates["length"] = _check_length(content)
gates["structure"] = _check_structure(content)
gates["duplicate"] = _check_duplicate(content, store, room)
gates["staleness"] = _check_staleness(written_at)
for gate_name, (passed, message) in gates.items():
if not passed:
return PromotionResult(
success=False, memory_id=None,
reason=f"Failed gate '{gate_name}': {message}", gates=gates,
)
memory_id = store.store(content, room=room, category=category, trust=trust)
store.log_promotion(session_id, scratch_key, memory_id, reason="auto" if not force else "forced")
return PromotionResult(success=True, memory_id=memory_id, reason="Promoted to durable memory", gates=gates)
def promote_session_batch(
store: SovereignStore,
session_id: str,
notes: dict[str, dict],
room: str = "general",
force: bool = False,
) -> list[PromotionResult]:
"""Promote all notes from a session scratchpad."""
results = []
for key, entry in notes.items():
content = entry.get("value", str(entry)) if isinstance(entry, dict) else str(entry)
written_at = None
if isinstance(entry, dict) and "written_at" in entry:
try:
import datetime
written_at = datetime.datetime.strptime(
entry["written_at"], "%Y-%m-%d %H:%M:%S"
).timestamp()
except (ValueError, TypeError):
pass
result = promote(
content=str(content), store=store, session_id=session_id,
scratch_key=key, room=room, written_at=written_at, force=force,
)
results.append(result)
return results

View File

@@ -0,0 +1,277 @@
"""Retrieval Order Enforcer — L0 through L5 memory hierarchy.
Ensures the agent checks durable memory before falling back to free generation.
Gracefully degrades if any layer is unavailable (ONNX issues, missing files, etc).
Layer order:
L0: Identity (~/.mempalace/identity.txt)
L1: Palace rooms (mempalace CLI search)
L2: Session scratch (~/.hermes/scratchpad/{session_id}.json)
L3: Gitea artifacts (API search for issues/PRs)
L4: Procedures (skills directory search)
L5: Free generation (only if L0-L4 produced nothing)
Refs: Epic #367, Sub-issue #369
"""
from __future__ import annotations
import json
import os
import re
import subprocess
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
SKILLS_DIR = Path.home() / ".hermes" / "skills"
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
# Patterns that indicate a recall-style query
RECALL_PATTERNS = re.compile(
r"(?i)\b("
r"what did|status of|remember|last time|yesterday|previously|"
r"we discussed|we talked|we worked|you said|you mentioned|"
r"remind me|what was|what were|how did|when did|"
r"earlier today|last session|before this"
r")\b"
)
# ---------------------------------------------------------------------------
# L0: Identity
# ---------------------------------------------------------------------------
def load_identity() -> str:
"""Read the agent identity file. Returns empty string on failure."""
try:
if IDENTITY_PATH.exists():
text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
# Cap at ~200 tokens to keep wake-up lean
if len(text.split()) > 200:
text = " ".join(text.split()[:200]) + "..."
return text
except (OSError, PermissionError):
pass
return ""
# ---------------------------------------------------------------------------
# L1: Palace search
# ---------------------------------------------------------------------------
def search_palace(query: str) -> str:
"""Search the mempalace for relevant memories. Gracefully degrades on failure."""
try:
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
result = subprocess.run(
[bin_path, "search", query],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
# ONNX issues (#373) or mempalace not installed — degrade gracefully
pass
return ""
# ---------------------------------------------------------------------------
# L2: Session scratchpad
# ---------------------------------------------------------------------------
def load_scratchpad(session_id: str) -> str:
"""Load the session scratchpad as formatted text."""
try:
scratch_file = SCRATCHPAD_DIR / f"{session_id}.json"
if scratch_file.exists():
data = json.loads(scratch_file.read_text(encoding="utf-8"))
if isinstance(data, dict) and data:
lines = []
for k, v in data.items():
lines.append(f" {k}: {v}")
return "\n".join(lines)
except (OSError, json.JSONDecodeError):
pass
return ""
# ---------------------------------------------------------------------------
# L3: Gitea artifact search
# ---------------------------------------------------------------------------
def _load_gitea_token() -> str:
"""Read the Gitea API token."""
token_path = Path.home() / ".hermes" / "gitea_token_vps"
try:
if token_path.exists():
return token_path.read_text(encoding="utf-8").strip()
except OSError:
pass
return ""
def search_gitea(query: str) -> str:
"""Search Gitea issues/PRs for context. Returns formatted text or empty string."""
token = _load_gitea_token()
if not token:
return ""
api_base = "https://forge.alexanderwhitestone.com/api/v1"
# Extract key terms for search (first 3 significant words)
terms = [w for w in query.split() if len(w) > 3][:3]
search_q = " ".join(terms) if terms else query[:50]
try:
import urllib.request
import urllib.parse
url = (
f"{api_base}/repos/search?"
f"q={urllib.parse.quote(search_q)}&limit=3"
)
req = urllib.request.Request(url, headers={
"Authorization": f"token {token}",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=8) as resp:
data = json.loads(resp.read().decode())
if data.get("data"):
lines = []
for repo in data["data"][:3]:
lines.append(f" {repo['full_name']}: {repo.get('description', 'no desc')}")
return "\n".join(lines)
except Exception:
pass
return ""
# ---------------------------------------------------------------------------
# L4: Procedures (skills search)
# ---------------------------------------------------------------------------
def search_skills(query: str) -> str:
"""Search skills directory for matching procedures."""
try:
if not SKILLS_DIR.exists():
return ""
query_lower = query.lower()
terms = [w for w in query_lower.split() if len(w) > 3]
if not terms:
return ""
matches = []
for skill_dir in SKILLS_DIR.iterdir():
if not skill_dir.is_dir():
continue
skill_md = skill_dir / "SKILL.md"
if skill_md.exists():
try:
content = skill_md.read_text(encoding="utf-8").lower()
if any(t in content for t in terms):
# Extract title from frontmatter
title = skill_dir.name
matches.append(f" skill: {title}")
except OSError:
continue
if matches:
return "\n".join(matches[:5])
except OSError:
pass
return ""
# ---------------------------------------------------------------------------
# Main enforcer
# ---------------------------------------------------------------------------
def is_recall_query(query: str) -> bool:
"""Detect whether a query is asking for recalled/historical information."""
return bool(RECALL_PATTERNS.search(query))
def enforce_retrieval_order(
query: str,
session_id: Optional[str] = None,
skip_if_not_recall: bool = True,
) -> dict:
"""Check palace layers before allowing free generation.
Args:
query: The user's query text.
session_id: Current session ID for scratchpad access.
skip_if_not_recall: If True (default), skip enforcement for
non-recall queries and return empty result.
Returns:
dict with keys:
retrieved_from: Highest layer that produced results (e.g. 'L1')
context: Aggregated context string
tokens: Approximate word count of context
layers_checked: List of layers that were consulted
"""
result = {
"retrieved_from": None,
"context": "",
"tokens": 0,
"layers_checked": [],
}
# Gate: skip for non-recall queries if configured
if skip_if_not_recall and not is_recall_query(query):
return result
# L0: Identity (always prepend)
identity = load_identity()
if identity:
result["context"] += f"## Identity\n{identity}\n\n"
result["layers_checked"].append("L0")
# L1: Palace search
palace_results = search_palace(query)
if palace_results:
result["context"] += f"## Palace Memory\n{palace_results}\n\n"
result["retrieved_from"] = "L1"
result["layers_checked"].append("L1")
# L2: Scratchpad
if session_id:
scratch = load_scratchpad(session_id)
if scratch:
result["context"] += f"## Session Notes\n{scratch}\n\n"
if not result["retrieved_from"]:
result["retrieved_from"] = "L2"
result["layers_checked"].append("L2")
# L3: Gitea artifacts (only if still no context from L1/L2)
if not result["retrieved_from"]:
artifacts = search_gitea(query)
if artifacts:
result["context"] += f"## Gitea Context\n{artifacts}\n\n"
result["retrieved_from"] = "L3"
result["layers_checked"].append("L3")
# L4: Procedures (only if still no context)
if not result["retrieved_from"]:
procedures = search_skills(query)
if procedures:
result["context"] += f"## Related Skills\n{procedures}\n\n"
result["retrieved_from"] = "L4"
result["layers_checked"].append("L4")
# L5: Free generation (no context found — just mark it)
if not result["retrieved_from"]:
result["retrieved_from"] = "L5"
result["layers_checked"].append("L5")
result["tokens"] = len(result["context"].split())
return result

View File

@@ -0,0 +1,184 @@
"""Session Scratchpad — ephemeral key-value notes per session.
Provides fast, JSON-backed scratch storage that lives for a session
and can be promoted to durable palace memory.
Storage: ~/.hermes/scratchpad/{session_id}.json
Refs: Epic #367, Sub-issue #372
"""
from __future__ import annotations
import json
import os
import subprocess
import time
from pathlib import Path
from typing import Any, Optional
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
SCRATCHPAD_DIR = Path.home() / ".hermes" / "scratchpad"
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _scratch_path(session_id: str) -> Path:
"""Return the JSON file path for a given session."""
# Sanitize session_id to prevent path traversal
safe_id = "".join(c for c in session_id if c.isalnum() or c in "-_")
if not safe_id:
safe_id = "unnamed"
return SCRATCHPAD_DIR / f"{safe_id}.json"
def _load(session_id: str) -> dict:
"""Load scratchpad data, returning empty dict on failure."""
path = _scratch_path(session_id)
try:
if path.exists():
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
pass
return {}
def _save(session_id: str, data: dict) -> None:
"""Persist scratchpad data to disk."""
SCRATCHPAD_DIR.mkdir(parents=True, exist_ok=True)
path = _scratch_path(session_id)
path.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def write_scratch(session_id: str, key: str, value: Any) -> None:
"""Write a note to the session scratchpad.
Args:
session_id: Current session identifier.
key: Note key (string).
value: Note value (any JSON-serializable type).
"""
data = _load(session_id)
data[key] = {
"value": value,
"written_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}
_save(session_id, data)
def read_scratch(session_id: str, key: Optional[str] = None) -> dict:
"""Read session scratchpad (all keys or one).
Args:
session_id: Current session identifier.
key: Optional specific key. If None, returns all entries.
Returns:
dict — either {key: {value, written_at}} or the full scratchpad.
"""
data = _load(session_id)
if key is not None:
entry = data.get(key)
return {key: entry} if entry else {}
return data
def delete_scratch(session_id: str, key: str) -> bool:
"""Remove a single key from the scratchpad.
Returns True if the key existed and was removed.
"""
data = _load(session_id)
if key in data:
del data[key]
_save(session_id, data)
return True
return False
def list_sessions() -> list[str]:
"""List all session IDs that have scratchpad files."""
try:
if SCRATCHPAD_DIR.exists():
return [
f.stem
for f in SCRATCHPAD_DIR.iterdir()
if f.suffix == ".json" and f.is_file()
]
except OSError:
pass
return []
def promote_to_palace(
session_id: str,
key: str,
room: str = "general",
drawer: Optional[str] = None,
) -> bool:
"""Move a scratchpad note to durable palace memory.
Uses the mempalace CLI to store the note in the specified room.
Removes the note from the scratchpad after successful promotion.
Args:
session_id: Session containing the note.
key: Scratchpad key to promote.
room: Palace room name (default: 'general').
drawer: Optional drawer name within the room. Defaults to key.
Returns:
True if promotion succeeded, False otherwise.
"""
data = _load(session_id)
entry = data.get(key)
if not entry:
return False
value = entry.get("value", entry) if isinstance(entry, dict) else entry
content = json.dumps(value, default=str) if not isinstance(value, str) else value
try:
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
target_drawer = drawer or key
result = subprocess.run(
[bin_path, "store", room, target_drawer, content],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
# Remove from scratchpad after successful promotion
del data[key]
_save(session_id, data)
return True
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
# mempalace CLI not available — degrade gracefully
pass
return False
def clear_session(session_id: str) -> bool:
"""Delete the entire scratchpad for a session.
Returns True if the file existed and was removed.
"""
path = _scratch_path(session_id)
try:
if path.exists():
path.unlink()
return True
except OSError:
pass
return False

View File

@@ -0,0 +1,474 @@
"""Sovereign Memory Store — zero-API, zero-dependency durable memory.
Replaces the third-party `mempalace` CLI and its ONNX requirement with a
self-contained SQLite + FTS5 + HRR (Holographic Reduced Representation)
store. Every operation is local: no network calls, no API keys, no cloud.
Storage: ~/.hermes/palace/sovereign.db
Capabilities:
- Durable fact storage with rooms, categories, and trust scores
- Hybrid retrieval: FTS5 keyword search + HRR cosine similarity
- Reciprocal Rank Fusion to merge keyword and semantic results
- Trust scoring: facts that get retrieved and confirmed gain trust
- Graceful numpy degradation: falls back to keyword-only if missing
Refs: Epic #367, MP-3 #370, MP-4 #371
"""
from __future__ import annotations
import hashlib
import json
import math
import sqlite3
import struct
import time
from pathlib import Path
from typing import Any, Optional
# ---------------------------------------------------------------------------
# HRR (Holographic Reduced Representations) — zero-dependency vectors
# ---------------------------------------------------------------------------
# Phase-encoded vectors via SHA-256. No ONNX, no embeddings API, no numpy
# required (but uses numpy when available for speed).
_TWO_PI = 2.0 * math.pi
_DIM = 512 # Compact dimension — sufficient for memory retrieval
try:
import numpy as np
_HAS_NUMPY = True
except ImportError:
_HAS_NUMPY = False
def _encode_atom_np(word: str, dim: int = _DIM) -> "np.ndarray":
"""Deterministic phase vector via SHA-256 (numpy path)."""
values_per_block = 16
blocks_needed = math.ceil(dim / values_per_block)
uint16_values: list[int] = []
for i in range(blocks_needed):
digest = hashlib.sha256(f"{word}:{i}".encode()).digest()
uint16_values.extend(struct.unpack("<16H", digest))
return np.array(uint16_values[:dim], dtype=np.float64) * (_TWO_PI / 65536.0)
def _encode_atom_pure(word: str, dim: int = _DIM) -> list[float]:
"""Deterministic phase vector via SHA-256 (pure Python fallback)."""
values_per_block = 16
blocks_needed = math.ceil(dim / values_per_block)
uint16_values: list[int] = []
for i in range(blocks_needed):
digest = hashlib.sha256(f"{word}:{i}".encode()).digest()
for j in range(0, 32, 2):
uint16_values.append(int.from_bytes(digest[j:j+2], "little"))
return [v * (_TWO_PI / 65536.0) for v in uint16_values[:dim]]
def encode_text(text: str, dim: int = _DIM):
"""Encode a text string into an HRR phase vector by bundling word atoms.
Uses circular mean of per-word phase vectors — the standard HRR
superposition operation. Result is a fixed-width vector regardless
of input length.
"""
words = text.lower().split()
if not words:
words = ["<empty>"]
if _HAS_NUMPY:
atoms = [_encode_atom_np(w, dim) for w in words]
# Circular mean: average the unit vectors, extract phase
unit_sum = sum(np.exp(1j * a) for a in atoms)
return np.angle(unit_sum) % _TWO_PI
else:
# Pure Python circular mean
real_sum = [0.0] * dim
imag_sum = [0.0] * dim
for w in words:
atom = _encode_atom_pure(w, dim)
for d in range(dim):
real_sum[d] += math.cos(atom[d])
imag_sum[d] += math.sin(atom[d])
return [math.atan2(imag_sum[d], real_sum[d]) % _TWO_PI for d in range(dim)]
def cosine_similarity_phase(a, b) -> float:
"""Cosine similarity between two phase vectors.
For phase vectors, similarity = mean(cos(a - b)).
"""
if _HAS_NUMPY:
return float(np.mean(np.cos(np.array(a) - np.array(b))))
else:
n = len(a)
return sum(math.cos(a[i] - b[i]) for i in range(n)) / n
def serialize_vector(vec) -> bytes:
"""Serialize a vector to bytes for SQLite storage."""
if _HAS_NUMPY:
return vec.astype(np.float64).tobytes()
else:
return struct.pack(f"{len(vec)}d", *vec)
def deserialize_vector(blob: bytes):
"""Deserialize bytes back to a vector."""
n = len(blob) // 8 # float64 = 8 bytes
if _HAS_NUMPY:
return np.frombuffer(blob, dtype=np.float64)
else:
return list(struct.unpack(f"{n}d", blob))
# ---------------------------------------------------------------------------
# SQLite Schema
# ---------------------------------------------------------------------------
_SCHEMA = """
CREATE TABLE IF NOT EXISTS memories (
memory_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
room TEXT DEFAULT 'general',
category TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
hrr_vector BLOB
);
CREATE INDEX IF NOT EXISTS idx_memories_room ON memories(room);
CREATE INDEX IF NOT EXISTS idx_memories_trust ON memories(trust_score DESC);
-- FTS5 for fast keyword search
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
content, room, category,
content=memories, content_rowid=memory_id,
tokenize='porter unicode61'
);
-- Sync triggers
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
INSERT INTO memories_fts(rowid, content, room, category)
VALUES (new.memory_id, new.content, new.room, new.category);
END;
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, content, room, category)
VALUES ('delete', old.memory_id, old.content, old.room, old.category);
END;
CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, content, room, category)
VALUES ('delete', old.memory_id, old.content, old.room, old.category);
INSERT INTO memories_fts(rowid, content, room, category)
VALUES (new.memory_id, new.content, new.room, new.category);
END;
-- Promotion log: tracks what moved from scratchpad to durable memory
CREATE TABLE IF NOT EXISTS promotion_log (
log_id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
scratch_key TEXT NOT NULL,
memory_id INTEGER REFERENCES memories(memory_id),
promoted_at REAL NOT NULL,
reason TEXT DEFAULT ''
);
"""
# ---------------------------------------------------------------------------
# SovereignStore
# ---------------------------------------------------------------------------
class SovereignStore:
"""Zero-API durable memory store.
All operations are local SQLite. No network calls. No API keys.
HRR vectors provide semantic similarity without embedding models.
FTS5 provides fast keyword search. RRF merges both rankings.
"""
def __init__(self, db_path: Optional[str] = None):
if db_path is None:
db_path = str(Path.home() / ".hermes" / "palace" / "sovereign.db")
self._db_path = db_path
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(db_path)
self._conn.row_factory = sqlite3.Row
self._conn.executescript(_SCHEMA)
def close(self):
self._conn.close()
# ------------------------------------------------------------------
# Store
# ------------------------------------------------------------------
def store(
self,
content: str,
room: str = "general",
category: str = "",
trust: float = 0.5,
) -> int:
"""Store a fact in durable memory. Returns the memory_id."""
now = time.time()
vec = encode_text(content)
blob = serialize_vector(vec)
cur = self._conn.execute(
"""INSERT INTO memories (content, room, category, trust_score,
created_at, updated_at, hrr_vector)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(content, room, category, trust, now, now, blob),
)
self._conn.commit()
return cur.lastrowid
def store_batch(self, items: list[dict]) -> list[int]:
"""Store multiple facts. Each item: {content, room?, category?, trust?}."""
ids = []
now = time.time()
for item in items:
content = item["content"]
vec = encode_text(content)
blob = serialize_vector(vec)
cur = self._conn.execute(
"""INSERT INTO memories (content, room, category, trust_score,
created_at, updated_at, hrr_vector)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
content,
item.get("room", "general"),
item.get("category", ""),
item.get("trust", 0.5),
now, now, blob,
),
)
ids.append(cur.lastrowid)
self._conn.commit()
return ids
# ------------------------------------------------------------------
# Search — hybrid FTS5 + HRR with Reciprocal Rank Fusion
# ------------------------------------------------------------------
def search(
self,
query: str,
room: Optional[str] = None,
limit: int = 10,
min_trust: float = 0.0,
fts_weight: float = 0.5,
hrr_weight: float = 0.5,
) -> list[dict]:
"""Hybrid search: FTS5 keywords + HRR semantic similarity.
Uses Reciprocal Rank Fusion (RRF) to merge both rankings.
Returns list of dicts with content, room, score, trust_score.
"""
k_rrf = 60 # Standard RRF constant
# Stage 1: FTS5 candidates
fts_results = self._fts_search(query, room, min_trust, limit * 3)
# Stage 2: HRR candidates (scan top N by trust)
hrr_results = self._hrr_search(query, room, min_trust, limit * 3)
# Stage 3: RRF fusion
scores: dict[int, float] = {}
meta: dict[int, dict] = {}
for rank, row in enumerate(fts_results):
mid = row["memory_id"]
scores[mid] = scores.get(mid, 0) + fts_weight / (k_rrf + rank + 1)
meta[mid] = dict(row)
for rank, row in enumerate(hrr_results):
mid = row["memory_id"]
scores[mid] = scores.get(mid, 0) + hrr_weight / (k_rrf + rank + 1)
if mid not in meta:
meta[mid] = dict(row)
# Sort by fused score
ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]
results = []
for mid, score in ranked:
m = meta[mid]
# Bump retrieval count
self._conn.execute(
"UPDATE memories SET retrieval_count = retrieval_count + 1 WHERE memory_id = ?",
(mid,),
)
results.append({
"memory_id": mid,
"content": m["content"],
"room": m["room"],
"category": m.get("category", ""),
"trust_score": m["trust_score"],
"score": round(score, 6),
})
if results:
self._conn.commit()
return results
def _fts_search(
self, query: str, room: Optional[str], min_trust: float, limit: int
) -> list[dict]:
"""FTS5 full-text search."""
try:
if room:
rows = self._conn.execute(
"""SELECT m.memory_id, m.content, m.room, m.category,
m.trust_score, m.retrieval_count
FROM memories_fts f
JOIN memories m ON f.rowid = m.memory_id
WHERE memories_fts MATCH ? AND m.room = ?
AND m.trust_score >= ?
ORDER BY rank LIMIT ?""",
(query, room, min_trust, limit),
).fetchall()
else:
rows = self._conn.execute(
"""SELECT m.memory_id, m.content, m.room, m.category,
m.trust_score, m.retrieval_count
FROM memories_fts f
JOIN memories m ON f.rowid = m.memory_id
WHERE memories_fts MATCH ?
AND m.trust_score >= ?
ORDER BY rank LIMIT ?""",
(query, min_trust, limit),
).fetchall()
return [dict(r) for r in rows]
except sqlite3.OperationalError:
# Bad FTS query syntax — degrade gracefully
return []
def _hrr_search(
self, query: str, room: Optional[str], min_trust: float, limit: int
) -> list[dict]:
"""HRR cosine similarity search (brute-force scan, fast for <100K facts)."""
query_vec = encode_text(query)
if room:
rows = self._conn.execute(
"""SELECT memory_id, content, room, category, trust_score,
retrieval_count, hrr_vector
FROM memories
WHERE room = ? AND trust_score >= ? AND hrr_vector IS NOT NULL""",
(room, min_trust),
).fetchall()
else:
rows = self._conn.execute(
"""SELECT memory_id, content, room, category, trust_score,
retrieval_count, hrr_vector
FROM memories
WHERE trust_score >= ? AND hrr_vector IS NOT NULL""",
(min_trust,),
).fetchall()
scored = []
for r in rows:
stored_vec = deserialize_vector(r["hrr_vector"])
sim = cosine_similarity_phase(query_vec, stored_vec)
scored.append((sim, dict(r)))
scored.sort(key=lambda x: x[0], reverse=True)
return [item[1] for item in scored[:limit]]
# ------------------------------------------------------------------
# Trust management
# ------------------------------------------------------------------
def boost_trust(self, memory_id: int, delta: float = 0.05) -> None:
"""Increase trust score when a memory proves useful."""
self._conn.execute(
"""UPDATE memories SET trust_score = MIN(1.0, trust_score + ?),
updated_at = ? WHERE memory_id = ?""",
(delta, time.time(), memory_id),
)
self._conn.commit()
def decay_trust(self, memory_id: int, delta: float = 0.02) -> None:
"""Decrease trust score when a memory is contradicted."""
self._conn.execute(
"""UPDATE memories SET trust_score = MAX(0.0, trust_score - ?),
updated_at = ? WHERE memory_id = ?""",
(delta, time.time(), memory_id),
)
self._conn.commit()
# ------------------------------------------------------------------
# Room operations
# ------------------------------------------------------------------
def list_rooms(self) -> list[dict]:
"""List all rooms with fact counts."""
rows = self._conn.execute(
"""SELECT room, COUNT(*) as count,
AVG(trust_score) as avg_trust
FROM memories GROUP BY room ORDER BY count DESC"""
).fetchall()
return [dict(r) for r in rows]
def room_contents(self, room: str, limit: int = 50) -> list[dict]:
"""Get all facts in a room, ordered by trust."""
rows = self._conn.execute(
"""SELECT memory_id, content, category, trust_score,
retrieval_count, created_at
FROM memories WHERE room = ?
ORDER BY trust_score DESC, created_at DESC LIMIT ?""",
(room, limit),
).fetchall()
return [dict(r) for r in rows]
# ------------------------------------------------------------------
# Stats
# ------------------------------------------------------------------
def stats(self) -> dict:
"""Return store statistics."""
row = self._conn.execute(
"""SELECT COUNT(*) as total,
AVG(trust_score) as avg_trust,
SUM(retrieval_count) as total_retrievals,
COUNT(DISTINCT room) as room_count
FROM memories"""
).fetchone()
return dict(row)
# ------------------------------------------------------------------
# Promotion support (scratchpad → durable)
# ------------------------------------------------------------------
def log_promotion(
self,
session_id: str,
scratch_key: str,
memory_id: int,
reason: str = "",
) -> None:
"""Record a scratchpad-to-palace promotion in the audit log."""
self._conn.execute(
"""INSERT INTO promotion_log
(session_id, scratch_key, memory_id, promoted_at, reason)
VALUES (?, ?, ?, ?, ?)""",
(session_id, scratch_key, memory_id, time.time(), reason),
)
self._conn.commit()
def recent_promotions(self, limit: int = 20) -> list[dict]:
"""Get recent promotion log entries."""
rows = self._conn.execute(
"""SELECT p.*, m.content, m.room
FROM promotion_log p
LEFT JOIN memories m ON p.memory_id = m.memory_id
ORDER BY p.promoted_at DESC LIMIT ?""",
(limit,),
).fetchall()
return [dict(r) for r in rows]

View File

@@ -0,0 +1,180 @@
"""Tests for the mempalace skill.
Validates PalaceRoom, Mempalace class, factory constructors,
and the analyse_issues entry-point.
Refs: Epic #367, Sub-issue #368
"""
from __future__ import annotations
import json
import sys
import os
import time
import pytest
# Ensure the package is importable from the repo layout
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from mempalace.mempalace import Mempalace, PalaceRoom, analyse_issues
# ── PalaceRoom unit tests ─────────────────────────────────────────────────
class TestPalaceRoom:
def test_store_and_retrieve(self):
room = PalaceRoom(name="test", label="Test Room")
room.store("key1", 42)
assert room.retrieve("key1") == 42
def test_retrieve_default(self):
room = PalaceRoom(name="test", label="Test Room")
assert room.retrieve("missing") is None
assert room.retrieve("missing", "fallback") == "fallback"
def test_summary_format(self):
room = PalaceRoom(name="test", label="Test Room")
room.store("repos", 5)
summary = room.summary()
assert "## Test Room" in summary
assert "repos: 5" in summary
def test_contents_default_factory_isolation(self):
"""Each room gets its own dict — no shared mutable default."""
r1 = PalaceRoom(name="a", label="A")
r2 = PalaceRoom(name="b", label="B")
r1.store("x", 1)
assert r2.retrieve("x") is None
def test_entered_at_is_recent(self):
before = time.time()
room = PalaceRoom(name="t", label="T")
after = time.time()
assert before <= room.entered_at <= after
# ── Mempalace core tests ──────────────────────────────────────────────────
class TestMempalace:
def test_add_and_enter_room(self):
p = Mempalace(domain="test")
p.add_room("r1", "Room 1")
room = p.enter("r1")
assert room.name == "r1"
def test_enter_nonexistent_room_raises(self):
p = Mempalace()
with pytest.raises(KeyError, match="No room"):
p.enter("ghost")
def test_store_without_enter_raises(self):
p = Mempalace()
p.add_room("r", "R")
with pytest.raises(RuntimeError, match="Enter a room"):
p.store("k", "v")
def test_store_and_retrieve_via_palace(self):
p = Mempalace()
p.add_room("r", "R")
p.enter("r")
p.store("count", 10)
assert p.retrieve("r", "count") == 10
def test_retrieve_missing_room_returns_default(self):
p = Mempalace()
assert p.retrieve("nope", "key") is None
assert p.retrieve("nope", "key", 99) == 99
def test_render_includes_domain(self):
p = Mempalace(domain="audit")
p.add_room("r", "Room")
p.enter("r")
p.store("item", "value")
output = p.render()
assert "audit" in output
assert "Room" in output
def test_to_dict_structure(self):
p = Mempalace(domain="test")
p.add_room("r", "R")
p.enter("r")
p.store("a", 1)
d = p.to_dict()
assert d["domain"] == "test"
assert "elapsed_seconds" in d
assert d["rooms"]["r"] == {"a": 1}
def test_to_json_is_valid(self):
p = Mempalace(domain="j")
p.add_room("x", "X")
p.enter("x")
p.store("v", [1, 2, 3])
parsed = json.loads(p.to_json())
assert parsed["rooms"]["x"]["v"] == [1, 2, 3]
# ── Factory constructor tests ─────────────────────────────────────────────
class TestFactories:
def test_for_issue_analysis_rooms(self):
p = Mempalace.for_issue_analysis()
assert p.domain == "issue_analysis"
for key in ("repo_architecture", "assignment_status",
"triage_priority", "resolution_patterns"):
p.enter(key) # should not raise
def test_for_health_check_rooms(self):
p = Mempalace.for_health_check()
assert p.domain == "health_check"
for key in ("service_topology", "failure_signals", "recovery_history"):
p.enter(key)
def test_for_code_review_rooms(self):
p = Mempalace.for_code_review()
assert p.domain == "code_review"
for key in ("change_scope", "risk_surface",
"test_coverage", "reviewer_context"):
p.enter(key)
# ── analyse_issues entry-point tests ──────────────────────────────────────
class TestAnalyseIssues:
SAMPLE_DATA = [
{"repo": "the-nexus", "open_issues": 40, "assigned": 30, "unassigned": 10},
{"repo": "timmy-home", "open_issues": 30, "assigned": 25, "unassigned": 5},
{"repo": "hermes-agent", "open_issues": 20, "assigned": 15, "unassigned": 5},
{"repo": "empty-repo", "open_issues": 0, "assigned": 0, "unassigned": 0},
]
def test_returns_string(self):
result = analyse_issues(self.SAMPLE_DATA)
assert isinstance(result, str)
assert len(result) > 0
def test_contains_room_headers(self):
result = analyse_issues(self.SAMPLE_DATA)
assert "Repository Architecture" in result
assert "Assignment Status" in result
def test_coverage_below_target(self):
result = analyse_issues(self.SAMPLE_DATA, target_assignee_rate=0.90)
assert "BELOW TARGET" in result
def test_coverage_meets_target(self):
good_data = [
{"repo": "a", "open_issues": 10, "assigned": 10, "unassigned": 0},
]
result = analyse_issues(good_data, target_assignee_rate=0.80)
assert "OK" in result
def test_empty_repos_list(self):
result = analyse_issues([])
assert isinstance(result, str)
def test_single_repo(self):
data = [{"repo": "solo", "open_issues": 5, "assigned": 3, "unassigned": 2}]
result = analyse_issues(data)
assert "solo" in result or "issue_analysis" in result

View File

@@ -0,0 +1,143 @@
"""Tests for retrieval_enforcer.py.
Refs: Epic #367, Sub-issue #369
"""
from __future__ import annotations
import json
import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from mempalace.retrieval_enforcer import (
is_recall_query,
load_identity,
load_scratchpad,
enforce_retrieval_order,
search_skills,
RECALL_PATTERNS,
)
class TestRecallDetection:
"""Test the recall-query pattern matcher."""
@pytest.mark.parametrize("query", [
"what did we work on yesterday",
"status of the mempalace integration",
"remember the fleet audit results",
"last time we deployed the nexus",
"previously you mentioned a CI fix",
"we discussed the sovereign deployment",
])
def test_recall_queries_detected(self, query):
assert is_recall_query(query) is True
@pytest.mark.parametrize("query", [
"create a new file called test.py",
"run the test suite",
"deploy to production",
"write a function that sums numbers",
"install the package",
])
def test_non_recall_queries_skipped(self, query):
assert is_recall_query(query) is False
class TestLoadIdentity:
def test_loads_existing_identity(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text("I am Timmy. A sovereign AI.")
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
result = load_identity()
assert "Timmy" in result
def test_returns_empty_on_missing_file(self, tmp_path):
identity_file = tmp_path / "nonexistent.txt"
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
result = load_identity()
assert result == ""
def test_truncates_long_identity(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text(" ".join(["word"] * 300))
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file):
result = load_identity()
assert result.endswith("...")
assert len(result.split()) <= 201 # 200 words + "..."
class TestLoadScratchpad:
def test_loads_valid_scratchpad(self, tmp_path):
scratch_file = tmp_path / "session123.json"
scratch_file.write_text(json.dumps({"note": "test value", "key2": 42}))
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
result = load_scratchpad("session123")
assert "note: test value" in result
assert "key2: 42" in result
def test_returns_empty_on_missing_file(self, tmp_path):
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
result = load_scratchpad("nonexistent")
assert result == ""
def test_returns_empty_on_invalid_json(self, tmp_path):
scratch_file = tmp_path / "bad.json"
scratch_file.write_text("not valid json{{{")
with patch("mempalace.retrieval_enforcer.SCRATCHPAD_DIR", tmp_path):
result = load_scratchpad("bad")
assert result == ""
class TestEnforceRetrievalOrder:
def test_skips_non_recall_query(self):
result = enforce_retrieval_order("create a new file")
assert result["retrieved_from"] is None
assert result["tokens"] == 0
def test_runs_for_recall_query(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text("I am Timmy.")
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
result = enforce_retrieval_order("what did we work on yesterday")
assert "Identity" in result["context"]
assert "L0" in result["layers_checked"]
def test_palace_hit_sets_l1(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text("I am Timmy.")
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
patch("mempalace.retrieval_enforcer.search_palace", return_value="Found: fleet audit results"), \
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""):
result = enforce_retrieval_order("what did we discuss yesterday")
assert result["retrieved_from"] == "L1"
assert "Palace Memory" in result["context"]
def test_falls_through_to_l5(self, tmp_path):
identity_file = tmp_path / "nonexistent.txt"
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
result = enforce_retrieval_order("remember the old deployment", skip_if_not_recall=True)
assert result["retrieved_from"] == "L5"
def test_force_mode_skips_recall_check(self, tmp_path):
identity_file = tmp_path / "identity.txt"
identity_file.write_text("I am Timmy.")
with patch("mempalace.retrieval_enforcer.IDENTITY_PATH", identity_file), \
patch("mempalace.retrieval_enforcer.search_palace", return_value=""), \
patch("mempalace.retrieval_enforcer.search_gitea", return_value=""), \
patch("mempalace.retrieval_enforcer.search_skills", return_value=""):
result = enforce_retrieval_order("deploy now", skip_if_not_recall=False)
assert "Identity" in result["context"]

View File

@@ -0,0 +1,108 @@
"""Tests for scratchpad.py.
Refs: Epic #367, Sub-issue #372
"""
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from unittest.mock import patch
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from mempalace.scratchpad import (
write_scratch,
read_scratch,
delete_scratch,
list_sessions,
clear_session,
_scratch_path,
)
@pytest.fixture
def scratch_dir(tmp_path):
"""Provide a temporary scratchpad directory."""
with patch("mempalace.scratchpad.SCRATCHPAD_DIR", tmp_path):
yield tmp_path
class TestScratchPath:
def test_sanitizes_session_id(self):
path = _scratch_path("safe-id_123")
assert "safe-id_123.json" in str(path)
def test_strips_dangerous_chars(self):
path = _scratch_path("../../etc/passwd")
assert ".." not in path.name
assert "/" not in path.name
# Dots are stripped, so only alphanumeric chars remain
assert path.name == "etcpasswd.json"
class TestWriteAndRead:
def test_write_then_read(self, scratch_dir):
write_scratch("sess1", "note", "hello world")
result = read_scratch("sess1", "note")
assert "note" in result
assert result["note"]["value"] == "hello world"
def test_read_all_keys(self, scratch_dir):
write_scratch("sess1", "a", 1)
write_scratch("sess1", "b", 2)
result = read_scratch("sess1")
assert "a" in result
assert "b" in result
def test_read_missing_key(self, scratch_dir):
write_scratch("sess1", "exists", "yes")
result = read_scratch("sess1", "missing")
assert result == {}
def test_read_missing_session(self, scratch_dir):
result = read_scratch("nonexistent")
assert result == {}
def test_overwrite_key(self, scratch_dir):
write_scratch("sess1", "key", "v1")
write_scratch("sess1", "key", "v2")
result = read_scratch("sess1", "key")
assert result["key"]["value"] == "v2"
class TestDelete:
def test_delete_existing_key(self, scratch_dir):
write_scratch("sess1", "key", "val")
assert delete_scratch("sess1", "key") is True
assert read_scratch("sess1", "key") == {}
def test_delete_missing_key(self, scratch_dir):
write_scratch("sess1", "other", "val")
assert delete_scratch("sess1", "missing") is False
class TestListSessions:
def test_lists_sessions(self, scratch_dir):
write_scratch("alpha", "k", "v")
write_scratch("beta", "k", "v")
sessions = list_sessions()
assert "alpha" in sessions
assert "beta" in sessions
def test_empty_directory(self, scratch_dir):
assert list_sessions() == []
class TestClearSession:
def test_clears_existing(self, scratch_dir):
write_scratch("sess1", "k", "v")
assert clear_session("sess1") is True
assert read_scratch("sess1") == {}
def test_clear_nonexistent(self, scratch_dir):
assert clear_session("ghost") is False

View File

@@ -0,0 +1,255 @@
"""Tests for the Sovereign Memory Store and Promotion system.
Zero-API, zero-network — everything runs against an in-memory SQLite DB.
"""
import os
import sys
import tempfile
import time
import unittest
# Allow imports from parent package
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sovereign_store import (
SovereignStore,
encode_text,
cosine_similarity_phase,
serialize_vector,
deserialize_vector,
)
from promotion import (
evaluate_for_promotion,
promote,
promote_session_batch,
)
class TestHRRVectors(unittest.TestCase):
"""Test the HRR encoding and similarity functions."""
def test_deterministic_encoding(self):
"""Same text always produces the same vector."""
v1 = encode_text("hello world")
v2 = encode_text("hello world")
self.assertAlmostEqual(cosine_similarity_phase(v1, v2), 1.0, places=5)
def test_similar_texts_higher_similarity(self):
"""Related texts should be more similar than unrelated ones."""
v_agent = encode_text("agent memory palace retrieval")
v_similar = encode_text("agent recall memory search")
v_unrelated = encode_text("banana strawberry fruit smoothie")
sim_related = cosine_similarity_phase(v_agent, v_similar)
sim_unrelated = cosine_similarity_phase(v_agent, v_unrelated)
self.assertGreater(sim_related, sim_unrelated)
def test_serialize_roundtrip(self):
"""Vectors survive serialization to/from bytes."""
vec = encode_text("test serialization")
blob = serialize_vector(vec)
restored = deserialize_vector(blob)
sim = cosine_similarity_phase(vec, restored)
self.assertAlmostEqual(sim, 1.0, places=5)
def test_empty_text(self):
"""Empty text gets a fallback encoding."""
vec = encode_text("")
self.assertEqual(len(vec) if hasattr(vec, '__len__') else len(list(vec)), 512)
class TestSovereignStore(unittest.TestCase):
"""Test the SQLite-backed sovereign store."""
def setUp(self):
self.db_path = os.path.join(tempfile.mkdtemp(), "test.db")
self.store = SovereignStore(db_path=self.db_path)
def tearDown(self):
self.store.close()
if os.path.exists(self.db_path):
os.remove(self.db_path)
def test_store_and_retrieve(self):
"""Store a fact and find it via search."""
mid = self.store.store("Timmy is a sovereign AI agent on Hermes VPS", room="identity")
results = self.store.search("sovereign agent", room="identity")
self.assertTrue(any(r["memory_id"] == mid for r in results))
def test_fts_search(self):
"""FTS5 keyword search works."""
self.store.store("The beacon game uses paperclips mechanics", room="projects")
self.store.store("Fleet agents handle delegation and dispatch", room="fleet")
results = self.store.search("paperclips")
self.assertTrue(len(results) > 0)
self.assertIn("paperclips", results[0]["content"].lower())
def test_hrr_search_semantic(self):
"""HRR similarity finds related content even without exact keywords."""
self.store.store("Memory palace rooms organize facts spatially", room="memory")
self.store.store("Pizza delivery service runs on weekends", room="unrelated")
results = self.store.search("organize knowledge rooms", room="memory")
self.assertTrue(len(results) > 0)
self.assertIn("palace", results[0]["content"].lower())
def test_room_filtering(self):
"""Room filter restricts search scope."""
self.store.store("Hermes harness manages tool calls", room="infrastructure")
self.store.store("Hermes mythology Greek god", room="lore")
results = self.store.search("Hermes", room="infrastructure")
self.assertTrue(all(r["room"] == "infrastructure" for r in results))
def test_trust_boost(self):
"""Trust score increases when boosted."""
mid = self.store.store("fact", trust=0.5)
self.store.boost_trust(mid, delta=0.1)
results = self.store.room_contents("general")
fact = next(r for r in results if r["memory_id"] == mid)
self.assertAlmostEqual(fact["trust_score"], 0.6, places=2)
def test_trust_decay(self):
"""Trust score decreases when decayed."""
mid = self.store.store("questionable fact", trust=0.5)
self.store.decay_trust(mid, delta=0.2)
results = self.store.room_contents("general")
fact = next(r for r in results if r["memory_id"] == mid)
self.assertAlmostEqual(fact["trust_score"], 0.3, places=2)
def test_batch_store(self):
"""Batch store works."""
ids = self.store.store_batch([
{"content": "fact one", "room": "test"},
{"content": "fact two", "room": "test"},
{"content": "fact three", "room": "test"},
])
self.assertEqual(len(ids), 3)
rooms = self.store.list_rooms()
test_room = next(r for r in rooms if r["room"] == "test")
self.assertEqual(test_room["count"], 3)
def test_stats(self):
"""Stats returns correct counts."""
self.store.store("a fact", room="r1")
self.store.store("another fact", room="r2")
s = self.store.stats()
self.assertEqual(s["total"], 2)
self.assertEqual(s["room_count"], 2)
def test_retrieval_count_increments(self):
"""Retrieval count goes up when a fact is found via search."""
self.store.store("unique searchable content xyz123", room="test")
self.store.search("xyz123")
results = self.store.room_contents("test")
self.assertTrue(any(r["retrieval_count"] > 0 for r in results))
class TestPromotion(unittest.TestCase):
"""Test the quality-gated promotion system."""
def setUp(self):
self.db_path = os.path.join(tempfile.mkdtemp(), "promo_test.db")
self.store = SovereignStore(db_path=self.db_path)
def tearDown(self):
self.store.close()
def test_successful_promotion(self):
"""Good content passes all gates."""
result = promote(
content="Timmy runs on the Hermes VPS at 143.198.27.163 with local Ollama inference",
store=self.store,
session_id="test-session-001",
scratch_key="vps_info",
room="infrastructure",
)
self.assertTrue(result.success)
self.assertIsNotNone(result.memory_id)
def test_reject_too_short(self):
"""Short fragments get rejected."""
result = promote(
content="yes",
store=self.store,
session_id="test",
scratch_key="short",
)
self.assertFalse(result.success)
self.assertIn("Too short", result.reason)
def test_reject_duplicate(self):
"""Duplicate content gets rejected."""
self.store.store("SOUL.md is the canonical identity document for Timmy", room="identity")
result = promote(
content="SOUL.md is the canonical identity document for Timmy",
store=self.store,
session_id="test",
scratch_key="soul",
room="identity",
)
self.assertFalse(result.success)
self.assertIn("uplicate", result.reason)
def test_reject_stale(self):
"""Old notes get flagged as stale."""
old_time = time.time() - (86400 * 10)
result = promote(
content="This is a note from long ago about something important",
store=self.store,
session_id="test",
scratch_key="old",
written_at=old_time,
)
self.assertFalse(result.success)
self.assertIn("Stale", result.reason)
def test_force_bypasses_gates(self):
"""Force flag overrides quality gates."""
result = promote(
content="ok",
store=self.store,
session_id="test",
scratch_key="forced",
force=True,
)
self.assertTrue(result.success)
def test_evaluate_dry_run(self):
"""Evaluate returns gate details without promoting."""
eval_result = evaluate_for_promotion(
content="The fleet uses kimi-k2.5 as the primary model for all agent operations",
store=self.store,
room="fleet",
)
self.assertTrue(eval_result["eligible"])
self.assertTrue(all(p for p, _ in eval_result["gates"].values()))
def test_batch_promotion(self):
"""Batch promotion processes all notes."""
notes = {
"infra": {"value": "Hermes VPS runs Ubuntu 22.04 with 2 vCPUs and 4GB RAM", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")},
"short": {"value": "no", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")},
"model": {"value": "The primary local model is gemma4:latest running on Ollama", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")},
}
results = promote_session_batch(self.store, "batch-session", notes, room="config")
promoted = [r for r in results if r.success]
rejected = [r for r in results if not r.success]
self.assertEqual(len(promoted), 2)
self.assertEqual(len(rejected), 1)
def test_promotion_logged(self):
"""Successful promotions appear in the audit log."""
promote(
content="Forge is hosted at forge.alexanderwhitestone.com running Gitea",
store=self.store,
session_id="log-test",
scratch_key="forge",
room="infrastructure",
)
log = self.store.recent_promotions()
self.assertTrue(len(log) > 0)
self.assertEqual(log[0]["session_id"], "log-test")
self.assertEqual(log[0]["scratch_key"], "forge")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,100 @@
"""Tests for wakeup.py.
Refs: Epic #367, Sub-issue #372
"""
from __future__ import annotations
import json
import os
import sys
import time
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from mempalace.wakeup import (
palace_wakeup,
fleet_status_summary,
_load_identity,
_palace_context,
)
class TestLoadIdentity:
def test_loads_identity(self, tmp_path):
f = tmp_path / "identity.txt"
f.write_text("I am Timmy. A sovereign AI.")
with patch("mempalace.wakeup.IDENTITY_PATH", f):
result = _load_identity()
assert "Timmy" in result
def test_missing_identity(self, tmp_path):
f = tmp_path / "nope.txt"
with patch("mempalace.wakeup.IDENTITY_PATH", f):
assert _load_identity() == ""
class TestFleetStatus:
def test_reads_fleet_json(self, tmp_path):
f = tmp_path / "fleet_status.json"
f.write_text(json.dumps({
"Groq": {"state": "active", "last_seen": "2026-04-07"},
"Ezra": {"state": "idle", "last_seen": "2026-04-06"},
}))
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
result = fleet_status_summary()
assert "Fleet Status" in result
assert "Groq" in result
assert "active" in result
def test_missing_fleet_file(self, tmp_path):
f = tmp_path / "nope.json"
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
assert fleet_status_summary() == ""
def test_invalid_json(self, tmp_path):
f = tmp_path / "bad.json"
f.write_text("not json")
with patch("mempalace.wakeup.FLEET_STATUS_PATH", f):
assert fleet_status_summary() == ""
class TestPalaceWakeup:
def test_generates_context_with_identity(self, tmp_path):
identity = tmp_path / "identity.txt"
identity.write_text("I am Timmy.")
cache = tmp_path / "cache.txt"
with patch("mempalace.wakeup.IDENTITY_PATH", identity), \
patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
patch("mempalace.wakeup._palace_context", return_value=""), \
patch("mempalace.wakeup.fleet_status_summary", return_value=""):
result = palace_wakeup(force=True)
assert "Identity" in result
assert "Timmy" in result
assert "Session" in result
def test_uses_cache_when_fresh(self, tmp_path):
cache = tmp_path / "cache.txt"
cache.write_text("cached wake-up content")
# Touch the file so it's fresh
with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
patch("mempalace.wakeup.WAKEUP_CACHE_TTL", 9999):
result = palace_wakeup(force=False)
assert result == "cached wake-up content"
def test_force_bypasses_cache(self, tmp_path):
cache = tmp_path / "cache.txt"
cache.write_text("stale content")
identity = tmp_path / "identity.txt"
identity.write_text("I am Timmy.")
with patch("mempalace.wakeup.WAKEUP_CACHE_PATH", cache), \
patch("mempalace.wakeup.IDENTITY_PATH", identity), \
patch("mempalace.wakeup._palace_context", return_value=""), \
patch("mempalace.wakeup.fleet_status_summary", return_value=""):
result = palace_wakeup(force=True)
assert "Identity" in result
assert "stale content" not in result

View File

@@ -0,0 +1,161 @@
"""Wake-up Protocol — session start context injection.
Generates 300-900 tokens of context when a new Hermes session starts.
Loads identity, recent palace context, and fleet status.
Refs: Epic #367, Sub-issue #372
"""
from __future__ import annotations
import json
import os
import subprocess
import time
from pathlib import Path
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
IDENTITY_PATH = Path.home() / ".mempalace" / "identity.txt"
MEMPALACE_BIN = "/Library/Frameworks/Python.framework/Versions/3.12/bin/mempalace"
FLEET_STATUS_PATH = Path.home() / ".hermes" / "fleet_status.json"
WAKEUP_CACHE_PATH = Path.home() / ".hermes" / "last_wakeup.txt"
WAKEUP_CACHE_TTL = 300 # 5 minutes — don't regenerate if recent
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _load_identity() -> str:
"""Read the agent identity file."""
try:
if IDENTITY_PATH.exists():
text = IDENTITY_PATH.read_text(encoding="utf-8").strip()
# Cap at ~150 tokens for wake-up brevity
words = text.split()
if len(words) > 150:
text = " ".join(words[:150]) + "..."
return text
except (OSError, PermissionError):
pass
return ""
def _palace_context() -> str:
"""Run mempalace wake-up command for recent context. Degrades gracefully."""
try:
bin_path = MEMPALACE_BIN if os.path.exists(MEMPALACE_BIN) else "mempalace"
result = subprocess.run(
[bin_path, "wake-up"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
# ONNX issues (#373) or CLI not available — degrade gracefully
pass
return ""
def fleet_status_summary() -> str:
"""Read cached fleet status for lightweight session context."""
try:
if FLEET_STATUS_PATH.exists():
data = json.loads(FLEET_STATUS_PATH.read_text(encoding="utf-8"))
lines = ["## Fleet Status"]
if isinstance(data, dict):
for agent, status in data.items():
if isinstance(status, dict):
state = status.get("state", "unknown")
last_seen = status.get("last_seen", "?")
lines.append(f" {agent}: {state} (last: {last_seen})")
else:
lines.append(f" {agent}: {status}")
if len(lines) > 1:
return "\n".join(lines)
except (OSError, json.JSONDecodeError):
pass
return ""
def _check_cache() -> str:
"""Return cached wake-up if fresh enough."""
try:
if WAKEUP_CACHE_PATH.exists():
age = time.time() - WAKEUP_CACHE_PATH.stat().st_mtime
if age < WAKEUP_CACHE_TTL:
return WAKEUP_CACHE_PATH.read_text(encoding="utf-8").strip()
except OSError:
pass
return ""
def _write_cache(content: str) -> None:
"""Cache the wake-up content."""
try:
WAKEUP_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
WAKEUP_CACHE_PATH.write_text(content, encoding="utf-8")
except OSError:
pass
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def palace_wakeup(force: bool = False) -> str:
"""Generate wake-up context for a new session. ~300-900 tokens.
Args:
force: If True, bypass the 5-minute cache and regenerate.
Returns:
Formatted context string suitable for prepending to the system prompt.
"""
# Check cache first (avoids redundant work on rapid session restarts)
if not force:
cached = _check_cache()
if cached:
return cached
parts = []
# L0: Identity
identity = _load_identity()
if identity:
parts.append(f"## Identity\n{identity}")
# L1: Recent palace context
palace = _palace_context()
if palace:
parts.append(palace)
# Fleet status (lightweight)
fleet = fleet_status_summary()
if fleet:
parts.append(fleet)
# Timestamp
parts.append(f"## Session\nWake-up generated: {time.strftime('%Y-%m-%d %H:%M:%S')}")
content = "\n\n".join(parts)
# Cache for TTL
_write_cache(content)
return content
# ---------------------------------------------------------------------------
# CLI entry point for testing
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print(palace_wakeup(force=True))

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env bash
# orchestrate.sh — Sovereign Orchestrator wrapper
# Sets environment and runs orchestrator.py
#
# Usage:
# ./orchestrate.sh # dry-run (safe default)
# ./orchestrate.sh --once # single live dispatch cycle
# ./orchestrate.sh --daemon # continuous (every 15 min)
# ./orchestrate.sh --dry-run # explicit dry-run
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
HERMES_DIR="${HOME}/.hermes"
# Load Gitea token
if [[ -z "${GITEA_TOKEN:-}" ]]; then
if [[ -f "${HERMES_DIR}/gitea_token_vps" ]]; then
export GITEA_TOKEN="$(cat "${HERMES_DIR}/gitea_token_vps")"
else
echo "[FATAL] No GITEA_TOKEN and ~/.hermes/gitea_token_vps not found"
exit 1
fi
fi
# Load Telegram token
if [[ -z "${TELEGRAM_BOT_TOKEN:-}" ]]; then
if [[ -f "${HOME}/.config/telegram/special_bot" ]]; then
export TELEGRAM_BOT_TOKEN="$(cat "${HOME}/.config/telegram/special_bot")"
fi
fi
# Run preflight checks if available
if [[ -x "${HERMES_DIR}/bin/api-key-preflight.sh" ]]; then
"${HERMES_DIR}/bin/api-key-preflight.sh" 2>/dev/null || true
fi
# Run the orchestrator
exec python3 "${SCRIPT_DIR}/orchestrator.py" "$@"

View File

@@ -0,0 +1,645 @@
#!/usr/bin/env python3
"""
Sovereign Orchestrator v1
Reads the Gitea backlog, scores/prioritizes issues, dispatches to agents.
Usage:
python3 orchestrator.py --once # single dispatch cycle
python3 orchestrator.py --daemon # run every 15 min
python3 orchestrator.py --dry-run # score and report, no dispatch
"""
import json
import os
import sys
import time
import subprocess
import urllib.request
import urllib.error
import urllib.parse
from datetime import datetime, timezone
# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------
GITEA_API = "https://forge.alexanderwhitestone.com/api/v1"
GITEA_OWNER = "Timmy_Foundation"
REPOS = ["timmy-config", "the-nexus", "timmy-home"]
TELEGRAM_CHAT_ID = "-1003664764329"
DAEMON_INTERVAL = 900 # 15 minutes
# Tags that mark issues we should never auto-dispatch
FILTER_TAGS = ["[EPIC]", "[DO NOT CLOSE]", "[PERMANENT]", "[PHILOSOPHY]", "[MORNING REPORT]"]
# Known agent usernames on Gitea (for assignee detection)
AGENT_USERNAMES = {"groq", "ezra", "bezalel", "allegro", "timmy", "thetimmyc"}
# ---------------------------------------------------------------------------
# AGENT ROSTER
# ---------------------------------------------------------------------------
AGENTS = {
"groq": {
"type": "loop",
"endpoint": "local",
"strengths": ["code", "bug-fix", "small-changes"],
"repos": ["the-nexus", "hermes-agent", "timmy-config", "timmy-home"],
"max_concurrent": 1,
},
"ezra": {
"type": "gateway",
"endpoint": "http://143.198.27.163:8643/v1/chat/completions",
"ssh": "root@143.198.27.163",
"strengths": ["research", "architecture", "complex", "multi-file"],
"repos": ["timmy-config", "the-nexus", "timmy-home"],
"max_concurrent": 1,
},
"bezalel": {
"type": "gateway",
"endpoint": "http://159.203.146.185:8643/v1/chat/completions",
"ssh": "root@159.203.146.185",
"strengths": ["ci", "infra", "ops", "testing"],
"repos": ["timmy-config", "hermes-agent", "the-nexus"],
"max_concurrent": 1,
},
}
# ---------------------------------------------------------------------------
# CREDENTIALS
# ---------------------------------------------------------------------------
def load_gitea_token():
"""Read Gitea token from env or file."""
token = os.environ.get("GITEA_TOKEN", "")
if token:
return token.strip()
token_path = os.path.expanduser("~/.hermes/gitea_token_vps")
try:
with open(token_path) as f:
return f.read().strip()
except FileNotFoundError:
print(f"[FATAL] No GITEA_TOKEN env and {token_path} not found")
sys.exit(1)
def load_telegram_token():
"""Read Telegram bot token from file."""
path = os.path.expanduser("~/.config/telegram/special_bot")
try:
with open(path) as f:
return f.read().strip()
except FileNotFoundError:
return ""
GITEA_TOKEN = ""
TELEGRAM_TOKEN = ""
# ---------------------------------------------------------------------------
# HTTP HELPERS (stdlib only)
# ---------------------------------------------------------------------------
def gitea_request(path, method="GET", data=None):
"""Make an authenticated Gitea API request."""
url = f"{GITEA_API}{path}"
headers = {
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
"Accept": "application/json",
}
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
body_text = e.read().decode() if e.fp else ""
print(f"[API ERROR] {method} {url} -> {e.code}: {body_text[:200]}")
return None
except Exception as e:
print(f"[API ERROR] {method} {url} -> {e}")
return None
def send_telegram(message):
"""Send message to Telegram group."""
if not TELEGRAM_TOKEN:
print("[WARN] No Telegram token, skipping notification")
return False
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
data = json.dumps({
"chat_id": TELEGRAM_CHAT_ID,
"text": message,
"parse_mode": "Markdown",
"disable_web_page_preview": True,
}).encode()
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return resp.status == 200
except Exception as e:
print(f"[TELEGRAM ERROR] {e}")
return False
# ---------------------------------------------------------------------------
# 1. BACKLOG READER
# ---------------------------------------------------------------------------
def fetch_issues(repo):
"""Fetch all open issues from a repo, handling pagination."""
issues = []
page = 1
while True:
result = gitea_request(
f"/repos/{GITEA_OWNER}/{repo}/issues?state=open&type=issues&limit=50&page={page}"
)
if not result:
break
issues.extend(result)
if len(result) < 50:
break
page += 1
return issues
def should_filter(issue):
"""Check if issue title contains any filter tags."""
title = issue.get("title", "").upper()
for tag in FILTER_TAGS:
if tag.upper().replace("[", "").replace("]", "") in title.replace("[", "").replace("]", ""):
return True
# Also filter pull requests
if issue.get("pull_request"):
return True
return False
def read_backlog():
"""Read and filter the full backlog across all repos."""
backlog = []
for repo in REPOS:
print(f" Fetching {repo}...")
issues = fetch_issues(repo)
for issue in issues:
if should_filter(issue):
continue
assignees = [a.get("login", "") for a in (issue.get("assignees") or [])]
labels = [l.get("name", "") for l in (issue.get("labels") or [])]
backlog.append({
"repo": repo,
"number": issue["number"],
"title": issue["title"],
"labels": labels,
"assignees": assignees,
"created_at": issue.get("created_at", ""),
"comments": issue.get("comments", 0),
"url": issue.get("html_url", ""),
})
print(f" Total actionable issues: {len(backlog)}")
return backlog
# ---------------------------------------------------------------------------
# 2. PRIORITY SCORER
# ---------------------------------------------------------------------------
def score_issue(issue):
"""Score an issue 0-100 based on priority signals."""
score = 0
title_upper = issue["title"].upper()
labels_upper = [l.upper() for l in issue["labels"]]
all_text = title_upper + " " + " ".join(labels_upper)
# Critical / Bug: +30
if any(tag in all_text for tag in ["CRITICAL", "BUG"]):
score += 30
# P0 / Urgent: +25
if any(tag in all_text for tag in ["P0", "URGENT"]):
score += 25
# P1: +15
if "P1" in all_text:
score += 15
# OPS / Security: +10
if any(tag in all_text for tag in ["OPS", "SECURITY"]):
score += 10
# Unassigned: +10
if not issue["assignees"]:
score += 10
# Age > 7 days: +5
try:
created = issue["created_at"].replace("Z", "+00:00")
created_dt = datetime.fromisoformat(created)
age_days = (datetime.now(timezone.utc) - created_dt).days
if age_days > 7:
score += 5
except (ValueError, AttributeError):
pass
# Has comments: +5
if issue["comments"] > 0:
score += 5
# Infrastructure repo: +5
if issue["repo"] == "timmy-config":
score += 5
# Already assigned to an agent: -10
if any(a.lower() in AGENT_USERNAMES for a in issue["assignees"]):
score -= 10
issue["score"] = max(0, min(100, score))
return issue
def prioritize_backlog(backlog):
"""Score and sort the backlog by priority."""
scored = [score_issue(i) for i in backlog]
scored.sort(key=lambda x: x["score"], reverse=True)
return scored
# ---------------------------------------------------------------------------
# 3. AGENT HEALTH CHECKS
# ---------------------------------------------------------------------------
def check_process(pattern):
"""Check if a local process matching pattern is running."""
try:
result = subprocess.run(
["pgrep", "-f", pattern],
capture_output=True, text=True, timeout=5
)
return result.returncode == 0
except Exception:
return False
def check_ssh_service(host, service_name):
"""Check if a remote service is running via SSH."""
try:
result = subprocess.run(
["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
f"root@{host}",
f"systemctl is-active {service_name} 2>/dev/null || pgrep -f {service_name}"],
capture_output=True, text=True, timeout=15
)
return result.returncode == 0
except Exception:
return False
def check_agent_health(name, agent):
"""Check if an agent is alive and available."""
if agent["type"] == "loop":
alive = check_process(f"agent-loop.*{name}")
elif agent["type"] == "gateway":
host = agent["ssh"].split("@")[1]
service = f"hermes-{name}"
alive = check_ssh_service(host, service)
else:
alive = False
return alive
def get_agent_status():
"""Get health status for all agents."""
status = {}
for name, agent in AGENTS.items():
alive = check_agent_health(name, agent)
status[name] = {
"alive": alive,
"type": agent["type"],
"strengths": agent["strengths"],
}
symbol = "UP" if alive else "DOWN"
print(f" {name}: {symbol} ({agent['type']})")
return status
# ---------------------------------------------------------------------------
# 4. DISPATCHER
# ---------------------------------------------------------------------------
def classify_issue(issue):
"""Classify issue type based on title and labels."""
title = issue["title"].upper()
labels = " ".join(issue["labels"]).upper()
all_text = title + " " + labels
types = []
if any(w in all_text for w in ["BUG", "FIX", "BROKEN", "ERROR", "CRASH"]):
types.append("bug-fix")
if any(w in all_text for w in ["OPS", "DEPLOY", "CI", "INFRA", "PIPELINE", "MONITOR"]):
types.append("ops")
if any(w in all_text for w in ["SECURITY", "AUTH", "TOKEN", "CERT"]):
types.append("ops")
if any(w in all_text for w in ["RESEARCH", "AUDIT", "INVESTIGATE", "EXPLORE"]):
types.append("research")
if any(w in all_text for w in ["ARCHITECT", "DESIGN", "REFACTOR", "REWRITE"]):
types.append("architecture")
if any(w in all_text for w in ["TEST", "TESTING", "QA", "VALIDATE"]):
types.append("testing")
if any(w in all_text for w in ["CODE", "IMPLEMENT", "ADD", "CREATE", "BUILD"]):
types.append("code")
if any(w in all_text for w in ["SMALL", "QUICK", "SIMPLE", "MINOR", "TWEAK"]):
types.append("small-changes")
if any(w in all_text for w in ["COMPLEX", "MULTI", "LARGE", "OVERHAUL"]):
types.append("complex")
if not types:
types = ["code"] # default
return types
def match_agent(issue, agent_status, dispatched_this_cycle):
"""Find the best available agent for an issue."""
issue_types = classify_issue(issue)
candidates = []
for name, agent in AGENTS.items():
# Agent must be alive
if not agent_status.get(name, {}).get("alive", False):
continue
# Agent must handle this repo
if issue["repo"] not in agent["repos"]:
continue
# Agent must not already be dispatched this cycle
if dispatched_this_cycle.get(name, 0) >= agent["max_concurrent"]:
continue
# Score match based on overlapping strengths
overlap = len(set(issue_types) & set(agent["strengths"]))
candidates.append((name, overlap))
if not candidates:
return None
# Sort by overlap score descending, return best match
candidates.sort(key=lambda x: x[1], reverse=True)
return candidates[0][0]
def assign_issue(repo, number, agent_name):
"""Assign an issue to an agent on Gitea."""
# First get current assignees to not clobber
result = gitea_request(f"/repos/{GITEA_OWNER}/{repo}/issues/{number}")
if not result:
return False
current = [a.get("login", "") for a in (result.get("assignees") or [])]
if agent_name in current:
print(f" Already assigned to {agent_name}")
return True
new_assignees = current + [agent_name]
patch_result = gitea_request(
f"/repos/{GITEA_OWNER}/{repo}/issues/{number}",
method="PATCH",
data={"assignees": new_assignees}
)
return patch_result is not None
def dispatch_to_gateway(agent_name, agent, issue):
"""Trigger work on a gateway agent via SSH."""
host = agent["ssh"]
repo = issue["repo"]
number = issue["number"]
title = issue["title"]
# Try to trigger dispatch via SSH
cmd = (
f'ssh -o ConnectTimeout=10 -o StrictHostKeyChecking=no {host} '
f'"echo \'Dispatched by orchestrator: {repo}#{number} - {title}\' '
f'>> /tmp/hermes-dispatch.log"'
)
try:
subprocess.run(cmd, shell=True, timeout=20, capture_output=True)
return True
except Exception as e:
print(f" [WARN] SSH dispatch to {agent_name} failed: {e}")
return False
def dispatch_cycle(backlog, agent_status, dry_run=False):
"""Run one dispatch cycle. Returns dispatch report."""
dispatched = []
skipped = []
dispatched_count = {} # agent_name -> count dispatched this cycle
# Only dispatch unassigned issues (or issues not assigned to agents)
for issue in backlog:
agent_assigned = any(a.lower() in AGENT_USERNAMES for a in issue["assignees"])
if agent_assigned:
skipped.append((issue, "already assigned to agent"))
continue
if issue["score"] < 5:
skipped.append((issue, "score too low"))
continue
best_agent = match_agent(issue, agent_status, dispatched_count)
if not best_agent:
skipped.append((issue, "no available agent"))
continue
if dry_run:
dispatched.append({
"agent": best_agent,
"repo": issue["repo"],
"number": issue["number"],
"title": issue["title"],
"score": issue["score"],
"dry_run": True,
})
dispatched_count[best_agent] = dispatched_count.get(best_agent, 0) + 1
continue
# Actually dispatch
print(f" Dispatching {issue['repo']}#{issue['number']} -> {best_agent}")
success = assign_issue(issue["repo"], issue["number"], best_agent)
if success:
agent = AGENTS[best_agent]
if agent["type"] == "gateway":
dispatch_to_gateway(best_agent, agent, issue)
dispatched.append({
"agent": best_agent,
"repo": issue["repo"],
"number": issue["number"],
"title": issue["title"],
"score": issue["score"],
})
dispatched_count[best_agent] = dispatched_count.get(best_agent, 0) + 1
else:
skipped.append((issue, "assignment failed"))
return dispatched, skipped
# ---------------------------------------------------------------------------
# 5. CONSOLIDATED REPORT
# ---------------------------------------------------------------------------
def generate_report(backlog, dispatched, skipped, agent_status, dry_run=False):
"""Generate dispatch cycle report."""
now = datetime.now().strftime("%Y-%m-%d %H:%M")
mode = " [DRY RUN]" if dry_run else ""
lines = []
lines.append(f"=== Sovereign Orchestrator Report{mode} ===")
lines.append(f"Time: {now}")
lines.append(f"Total backlog: {len(backlog)} issues")
lines.append("")
# Agent health
lines.append("-- Agent Health --")
for name, info in agent_status.items():
symbol = "UP" if info["alive"] else "DOWN"
lines.append(f" {name}: {symbol} ({info['type']})")
lines.append("")
# Dispatched
lines.append(f"-- Dispatched: {len(dispatched)} --")
for d in dispatched:
dry = " (dry-run)" if d.get("dry_run") else ""
lines.append(f" [{d['score']}] {d['repo']}#{d['number']} -> {d['agent']}{dry}")
lines.append(f" {d['title'][:60]}")
lines.append("")
# Skipped (top 10)
skip_summary = {}
for issue, reason in skipped:
skip_summary[reason] = skip_summary.get(reason, 0) + 1
lines.append(f"-- Skipped: {len(skipped)} --")
for reason, count in sorted(skip_summary.items(), key=lambda x: -x[1]):
lines.append(f" {reason}: {count}")
lines.append("")
# Top 5 unassigned
unassigned = [i for i in backlog if not i["assignees"]][:5]
lines.append("-- Top 5 Unassigned (by priority) --")
for i in unassigned:
lines.append(f" [{i['score']}] {i['repo']}#{i['number']}: {i['title'][:55]}")
lines.append("")
report = "\n".join(lines)
return report
def format_telegram_report(backlog, dispatched, skipped, agent_status, dry_run=False):
"""Format a compact Telegram message."""
mode = " DRY RUN" if dry_run else ""
now = datetime.now().strftime("%H:%M")
parts = [f"*Orchestrator{mode}* ({now})"]
parts.append(f"Backlog: {len(backlog)} | Dispatched: {len(dispatched)} | Skipped: {len(skipped)}")
# Agent status line
agent_line = " | ".join(
f"{'' if v['alive'] else ''}{k}" for k, v in agent_status.items()
)
parts.append(agent_line)
if dispatched:
parts.append("")
parts.append("*Dispatched:*")
for d in dispatched[:5]:
dry = " 🔍" if d.get("dry_run") else ""
parts.append(f" `{d['repo']}#{d['number']}` → {d['agent']}{dry}")
# Top unassigned
unassigned = [i for i in backlog if not i["assignees"]][:3]
if unassigned:
parts.append("")
parts.append("*Top unassigned:*")
for i in unassigned:
parts.append(f" [{i['score']}] `{i['repo']}#{i['number']}` {i['title'][:40]}")
return "\n".join(parts)
# ---------------------------------------------------------------------------
# 6. MAIN
# ---------------------------------------------------------------------------
def run_cycle(dry_run=False):
"""Execute one full orchestration cycle."""
global GITEA_TOKEN, TELEGRAM_TOKEN
GITEA_TOKEN = load_gitea_token()
TELEGRAM_TOKEN = load_telegram_token()
print("\n[1/4] Reading backlog...")
backlog = read_backlog()
print("\n[2/4] Scoring and prioritizing...")
backlog = prioritize_backlog(backlog)
for i in backlog[:10]:
print(f" [{i['score']:3d}] {i['repo']}/{i['number']}: {i['title'][:55]}")
print("\n[3/4] Checking agent health...")
agent_status = get_agent_status()
print("\n[4/4] Dispatching...")
dispatched, skipped = dispatch_cycle(backlog, agent_status, dry_run=dry_run)
# Generate reports
report = generate_report(backlog, dispatched, skipped, agent_status, dry_run=dry_run)
print("\n" + report)
# Send Telegram notification
if dispatched or not dry_run:
tg_msg = format_telegram_report(backlog, dispatched, skipped, agent_status, dry_run=dry_run)
send_telegram(tg_msg)
return backlog, dispatched, skipped
def main():
import argparse
parser = argparse.ArgumentParser(description="Sovereign Orchestrator v1")
parser.add_argument("--once", action="store_true", help="Single dispatch cycle")
parser.add_argument("--daemon", action="store_true", help="Run every 15 min")
parser.add_argument("--dry-run", action="store_true", help="Score/report only, no dispatch")
parser.add_argument("--interval", type=int, default=DAEMON_INTERVAL,
help=f"Daemon interval in seconds (default: {DAEMON_INTERVAL})")
args = parser.parse_args()
if not any([args.once, args.daemon, args.dry_run]):
args.dry_run = True # safe default
print("[INFO] No mode specified, defaulting to --dry-run")
print("=" * 60)
print(" SOVEREIGN ORCHESTRATOR v1")
print("=" * 60)
if args.daemon:
print(f"[DAEMON] Running every {args.interval}s (Ctrl+C to stop)")
cycle = 0
while True:
cycle += 1
print(f"\n--- Cycle {cycle} ---")
try:
run_cycle(dry_run=args.dry_run)
except Exception as e:
print(f"[ERROR] Cycle failed: {e}")
print(f"[DAEMON] Sleeping {args.interval}s...")
time.sleep(args.interval)
else:
run_cycle(dry_run=args.dry_run)
if __name__ == "__main__":
main()

526
scripts/kaizen_retro.py Normal file
View File

@@ -0,0 +1,526 @@
#!/usr/bin/env python3
"""
Kaizen Retro — Automated retrospective after every burn cycle.
Reads overnight Gitea activity, fleet state, and loop logs.
Generates ONE concrete improvement suggestion and posts it.
Usage:
python3 scripts/kaizen_retro.py [--dry-run]
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import urllib.error
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional
# Ensure repo root is on path so we can import gitea_client
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))
from gitea_client import GiteaClient, GiteaError
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
REPOS = [
"Timmy_Foundation/the-nexus",
"Timmy_Foundation/timmy-config",
"Timmy_Foundation/timmy-home",
"Timmy_Foundation/the-door",
"Timmy_Foundation/turboquant",
"Timmy_Foundation/hermes-agent",
"Timmy_Foundation/.profile",
]
HERMES_HOME = Path.home() / ".hermes"
TIMMY_HOME = Path.home() / ".timmy"
WORKFORCE_STATE_PATH = HERMES_HOME / "workforce-state.json"
FLEET_ROUTING_PATH = HERMES_HOME / "fleet-routing.json"
CHANNEL_DIR_PATH = REPO_ROOT / "channel_directory.json"
REPORTS_DIR = REPO_ROOT / "reports"
MORNING_REPORT_REPO = "Timmy_Foundation/timmy-config"
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_HOME_CHANNEL", "-1003664764329")
TELEGRAM_MAX_LEN = 4000 # leave headroom below the 4096 hard limit
STALE_DAYS = 7
MAX_ATTEMPT_COMMENT_THRESHOLD = 5
ISSUE_TYPE_KEYWORDS = {
"bug": ["bug", "fix", "crash", "error", "regression", "broken"],
"feature": ["feature", "implement", "add", "support", "enable"],
"docs": ["doc", "readme", "wiki", "guide", "documentation"],
"kaizen": ["kaizen", "retro", "improvement", "continuous"],
"devops": ["deploy", "ci", "cd", "docker", "server", "infra"],
}
BLOCKER_LABELS = {"blocked", "timeout", "stale", "help wanted", "wontfix", "duplicate"}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def load_json(path: Path) -> Any:
if not path.exists():
return None
with open(path) as f:
return json.load(f)
def iso_day_ago(days: int = 1) -> str:
return (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
def classify_issue_type(issue: dict) -> str:
title = (issue.get("title", "") or "").lower()
body = (issue.get("body", "") or "").lower()
labels = [l.get("name", "").lower() for l in issue.get("labels", []) or []]
text = f"{title} {body} {' '.join(labels)}"
words = set(text.split())
best = "other"
best_score = 0
for kind, keywords in ISSUE_TYPE_KEYWORDS.items():
# Short keywords (<=3 chars) require whole-word match to avoid false positives like
# "ci" inside "cleanup" or "cd" inside "abcde".
score = sum(
1 for kw in keywords
if (len(kw) <= 3 and kw in words) or (len(kw) > 3 and kw in text)
)
# label match is stronger
for label in labels:
label_words = set(label.split())
if any(
(len(kw) <= 3 and kw in label_words) or (len(kw) > 3 and kw in label)
for kw in keywords
):
score += 3
if score > best_score:
best_score = score
best = kind
return best
def is_max_attempts_candidate(issue: dict) -> bool:
"""Heuristic for issues that consumed excessive attempts."""
labels = {l.get("name", "").lower() for l in issue.get("labels", []) or []}
if labels & BLOCKER_LABELS:
return True
if issue.get("comments", 0) >= MAX_ATTEMPT_COMMENT_THRESHOLD:
return True
created = issue.get("created_at")
if created:
try:
created_dt = datetime.fromisoformat(created.replace("Z", "+00:00"))
if datetime.now(timezone.utc) - created_dt > timedelta(days=STALE_DAYS):
return True
except Exception:
pass
return False
def telegram_send(text: str, bot_token: str, chat_id: str) -> list[dict]:
"""Post text to Telegram, chunking if it exceeds the message limit."""
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
chunks = []
if len(text) <= TELEGRAM_MAX_LEN:
chunks = [text]
else:
# Split on newlines to preserve readability
lines = text.splitlines(keepends=True)
current = ""
for line in lines:
if len(current) + len(line) > TELEGRAM_MAX_LEN:
if current:
chunks.append(current)
current = line
else:
current += line
if current:
chunks.append(current)
results = []
for i, chunk in enumerate(chunks):
prefix = f"*(part {i + 1}/{len(chunks)})*\n" if len(chunks) > 1 else ""
payload = {"chat_id": chat_id, "text": prefix + chunk, "parse_mode": "Markdown"}
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
with urllib.request.urlopen(req, timeout=30) as resp:
results.append(json.loads(resp.read().decode()))
return results
def find_latest_morning_report_issue(client: GiteaClient) -> Optional[int]:
try:
issues = client.list_issues(MORNING_REPORT_REPO, state="open", sort="created", direction="desc", limit=20)
for issue in issues:
if "good morning report" in issue.title.lower() or "morning report" in issue.title.lower():
return issue.number
# fallback to closed
issues = client.list_issues(MORNING_REPORT_REPO, state="closed", sort="created", direction="desc", limit=20)
for issue in issues:
if "good morning report" in issue.title.lower() or "morning report" in issue.title.lower():
return issue.number
except Exception:
pass
return None
def fmt_pct(num: float, den: float) -> str:
if den == 0:
return "N/A"
return f"{num/den:.0%}"
# ---------------------------------------------------------------------------
# Analysis
# ---------------------------------------------------------------------------
def gather_metrics(client: GiteaClient, since: str) -> dict:
"""Collect overnight metrics from Gitea."""
metrics = {
"closed_issues": [],
"merged_prs": [],
"closed_prs": [],
"open_issues": [],
"max_attempts_issues": [],
"by_agent": {},
"by_repo": {},
"by_type": {},
}
for repo in REPOS:
repo_short = repo.split("/")[1]
metrics["by_repo"][repo_short] = {
"closed": 0,
"merged_prs": 0,
"closed_prs": 0,
"open": 0,
"max_attempts": 0,
"successes": 0,
"failures": 0,
}
# Closed issues since window
try:
closed = client.list_issues(repo, state="closed", since=since, sort="updated", direction="desc", limit=100)
for issue in closed:
issue_dict = {
"number": issue.number,
"title": issue.title,
"repo": repo_short,
"type": classify_issue_type({"title": issue.title, "body": issue.body, "labels": [{"name": lb.name} for lb in issue.labels]}),
"assignee": issue.assignees[0].login if issue.assignees else "unassigned",
}
metrics["closed_issues"].append(issue_dict)
metrics["by_repo"][repo_short]["closed"] += 1
metrics["by_repo"][repo_short]["successes"] += 1
agent = issue_dict["assignee"]
if agent not in metrics["by_agent"]:
metrics["by_agent"][agent] = {"successes": 0, "failures": 0, "closed": 0, "repos": set()}
metrics["by_agent"][agent]["successes"] += 1
metrics["by_agent"][agent]["closed"] += 1
metrics["by_agent"][agent]["repos"].add(repo_short)
t = issue_dict["type"]
if t not in metrics["by_type"]:
metrics["by_type"][t] = {"successes": 0, "failures": 0, "total": 0}
metrics["by_type"][t]["successes"] += 1
metrics["by_type"][t]["total"] += 1
except Exception as exc:
print(f"Warning: could not load closed issues for {repo}: {exc}", file=sys.stderr)
# Open issues (for stale / max-attempts detection)
try:
open_issues = client.list_issues(repo, state="open", sort="created", direction="desc", limit=100)
metrics["by_repo"][repo_short]["open"] = len(open_issues)
for issue in open_issues:
issue_raw = {
"number": issue.number,
"title": issue.title,
"labels": [{"name": lb.name} for lb in issue.labels],
"comments": issue.comments,
"created_at": issue.created_at,
}
if is_max_attempts_candidate(issue_raw):
metrics["max_attempts_issues"].append({
"number": issue.number,
"title": issue.title,
"repo": repo_short,
"type": classify_issue_type({"title": issue.title, "body": issue.body, "labels": issue_raw["labels"]}),
"assignee": issue.assignees[0].login if issue.assignees else "unassigned",
})
metrics["by_repo"][repo_short]["max_attempts"] += 1
metrics["by_repo"][repo_short]["failures"] += 1
agent = issue.assignees[0].login if issue.assignees else "unassigned"
if agent not in metrics["by_agent"]:
metrics["by_agent"][agent] = {"successes": 0, "failures": 0, "closed": 0, "repos": set()}
metrics["by_agent"][agent]["failures"] += 1
metrics["by_agent"][agent]["repos"].add(repo_short)
t = classify_issue_type({"title": issue.title, "body": issue.body, "labels": issue_raw["labels"]})
if t not in metrics["by_type"]:
metrics["by_type"][t] = {"successes": 0, "failures": 0, "total": 0}
metrics["by_type"][t]["failures"] += 1
metrics["by_type"][t]["total"] += 1
except Exception as exc:
print(f"Warning: could not load open issues for {repo}: {exc}", file=sys.stderr)
# PRs merged / closed since window (filter client-side; Gitea PR API ignores since)
try:
prs = client.list_pulls(repo, state="closed", sort="updated", limit=100)
since_dt = datetime.fromisoformat(since.replace("Z", "+00:00"))
for pr in prs:
updated = pr.updated_at or pr.created_at or ""
try:
updated_dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
if updated_dt < since_dt:
continue
except Exception:
pass
if pr.merged:
metrics["merged_prs"].append({
"number": pr.number,
"title": pr.title,
"repo": repo_short,
"user": pr.user.login if pr.user else "unknown",
})
metrics["by_repo"][repo_short]["merged_prs"] += 1
else:
metrics["closed_prs"].append({
"number": pr.number,
"title": pr.title,
"repo": repo_short,
"user": pr.user.login if pr.user else "unknown",
})
metrics["by_repo"][repo_short]["closed_prs"] += 1
except Exception as exc:
print(f"Warning: could not load PRs for {repo}: {exc}", file=sys.stderr)
# Convert sets to lists for JSON serialization
for agent in metrics["by_agent"].values():
agent["repos"] = sorted(agent["repos"])
return metrics
def load_workforce_state() -> dict:
return load_json(WORKFORCE_STATE_PATH) or {}
def load_fleet_routing() -> list[dict]:
data = load_json(FLEET_ROUTING_PATH)
if data and "agents" in data:
return data["agents"]
return []
def generate_suggestion(metrics: dict, fleet: list[dict]) -> str:
"""Generate ONE concrete improvement suggestion based on the data."""
by_agent = metrics["by_agent"]
by_repo = metrics["by_repo"]
by_type = metrics["by_type"]
max_attempts = metrics["max_attempts_issues"]
suggestions: list[str] = []
# 1. Agent with poor repo performance
for agent, stats in by_agent.items():
total = stats["successes"] + stats["failures"]
if total >= 3 and stats["successes"] == 0:
repos = ", ".join(stats["repos"])
suggestions.append(
f"🎯 **{agent}** has a 0% verify rate over the last cycle (0/{total}) on repos: {repos}. "
f"Consider removing these repos from {agent}'s routing or providing targeted onboarding."
)
# 2. Repo with highest failure concentration
repo_failures = [(r, s) for r, s in by_repo.items() if s["failures"] > 0]
if repo_failures:
repo_failures.sort(key=lambda x: x[1]["failures"], reverse=True)
worst_repo, worst_stats = repo_failures[0]
total_repo = worst_stats["successes"] + worst_stats["failures"]
if worst_stats["failures"] >= 2:
suggestions.append(
f"🎯 **{worst_repo}** has the most friction ({worst_stats['failures']} blocked/stale issues, "
f"{fmt_pct(worst_stats['successes'], total_repo)} success). "
f"Consider splitting issues in {worst_repo} into smaller chunks or assigning a stronger agent."
)
# 3. Max-attempts pattern
if len(max_attempts) >= 3:
type_counts: dict[str, int] = {}
for issue in max_attempts:
type_counts[issue["type"]] = type_counts.get(issue["type"], 0) + 1
top_type = max(type_counts, key=type_counts.get) if type_counts else "unknown"
suggestions.append(
f"🎯 **{len(max_attempts)} issues** hit max-attempts or went stale. "
f"The dominant type is **{top_type}**. "
f"Consider adding acceptance criteria templates or pre-flight checklists for {top_type} issues."
)
# 4. Issue type disparity
for t, stats in by_type.items():
total = stats["total"]
if total >= 3 and stats["successes"] == 0:
suggestions.append(
f"🎯 **{t}** issues have a 0% closure rate ({stats['failures']} stale). "
f"Consider routing all {t} issues to a specialist agent or creating a dedicated playbook."
)
# 5. Fleet routing gap (if fleet data exists)
active_agents = {a["name"] for a in fleet if a.get("active")}
assigned_agents = set(by_agent.keys())
idle_agents = active_agents - assigned_agents - {"unassigned"}
if len(idle_agents) >= 2:
suggestions.append(
f"🎯 **{len(idle_agents)} active agents** have no assignments this cycle: {', '.join(idle_agents)}. "
f"Consider expanding their repo lists or investigating why they aren't receiving work."
)
if suggestions:
return suggestions[0]
# Fallback: celebrate or nudge
total_closed = len(metrics["closed_issues"])
total_merged = len(metrics["merged_prs"])
if total_closed >= 5 or total_merged >= 3:
return (
f"🎯 Strong cycle: {total_closed} issues closed, {total_merged} PRs merged. "
f"Next improvement: write down the top 3 patterns that made this cycle successful so we can replicate them."
)
return (
"🎯 Low activity this cycle. Next improvement: ensure at least one agent loop is actively polling "
"for unassigned issues so work doesn't sit idle."
)
def build_report(metrics: dict, suggestion: str, since: str) -> str:
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
period = since[:10]
lines = [
f"# 🌀 Kaizen Retro — {now}",
f"*Period: {period} → now*\n",
"## Numbers",
f"- **Issues closed:** {len(metrics['closed_issues'])}",
f"- **PRs merged:** {len(metrics['merged_prs'])}",
f"- **PRs closed without merge:** {len(metrics['closed_prs'])}",
f"- **Max-attempts / stale issues:** {len(metrics['max_attempts_issues'])}",
"",
"## By Agent",
]
for agent, stats in sorted(metrics["by_agent"].items(), key=lambda x: x[1]["successes"] + x[1]["failures"], reverse=True):
total = stats["successes"] + stats["failures"]
rate = fmt_pct(stats["successes"], total)
lines.append(f"- **{agent}**: {stats['successes']} closed, {stats['failures']} stale / max-attempts — verify rate {rate}")
lines.extend(["", "## By Repo"])
for repo, stats in sorted(metrics["by_repo"].items(), key=lambda x: x[1]["successes"] + x[1]["failures"], reverse=True):
total = stats["successes"] + stats["failures"]
if total == 0 and stats["open"] == 0:
continue
rate = fmt_pct(stats["successes"], total)
lines.append(
f"- **{repo}**: {stats['successes']} closed, {stats['failures']} stale, {stats['open']} open — verify rate {rate}"
)
lines.extend(["", "## By Issue Type"])
for t, stats in sorted(metrics["by_type"].items(), key=lambda x: x[1]["total"], reverse=True):
total = stats["total"]
rate = fmt_pct(stats["successes"], total)
lines.append(f"- **{t}**: {stats['successes']} closed, {stats['failures']} stale — verify rate {rate}")
if metrics["max_attempts_issues"]:
lines.extend(["", "## Max-Attempts / Stale Issues"])
for issue in metrics["max_attempts_issues"][:10]:
lines.append(f"- {issue['repo']}#{issue['number']} ({issue['type']}, assignee: {issue['assignee']}) — {issue['title']}")
if len(metrics["max_attempts_issues"]) > 10:
lines.append(f"- … and {len(metrics['max_attempts_issues']) - 10} more")
lines.extend(["", "## One Concrete Improvement", suggestion, ""])
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(description="Kaizen Retro — automated burn-cycle retrospective")
parser.add_argument("--dry-run", action="store_true", help="Print report but do not post")
parser.add_argument("--since", type=str, help="ISO timestamp for lookback window (default: 24h ago)")
parser.add_argument("--post-to", type=str, help="Override Telegram chat ID")
args = parser.parse_args()
since = args.since or iso_day_ago(1)
client = GiteaClient()
print("Gathering metrics since", since)
metrics = gather_metrics(client, since)
fleet = load_fleet_routing()
suggestion = generate_suggestion(metrics, fleet)
report = build_report(metrics, suggestion, since)
print(report)
# Save JSON snapshot
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
snapshot_path = REPORTS_DIR / f"kaizen-retro-{datetime.now(timezone.utc).strftime('%Y%m%d')}.json"
snapshot = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"since": since,
"metrics": metrics,
"suggestion": suggestion,
"report_markdown": report,
}
with open(snapshot_path, "w") as f:
json.dump(snapshot, f, indent=2)
print(f"\nSnapshot saved to {snapshot_path}")
if args.dry_run:
return 0
# Post to Telegram
chat_id = args.post_to or TELEGRAM_CHAT_ID
bot_token = TELEGRAM_BOT_TOKEN
if bot_token and chat_id:
try:
telegram_send(report, bot_token, chat_id)
print("Posted to Telegram.")
except Exception as exc:
print(f"Failed to post to Telegram: {exc}", file=sys.stderr)
else:
print("Telegram not configured (set TELEGRAM_BOT_TOKEN and TELEGRAM_HOME_CHANNEL).", file=sys.stderr)
# Comment on latest morning report issue
morning_issue = find_latest_morning_report_issue(client)
if morning_issue:
try:
client.create_comment(MORNING_REPORT_REPO, morning_issue, report)
print(f"Commented on morning report issue #{morning_issue}.")
except Exception as exc:
print(f"Failed to comment on morning report issue: {exc}", file=sys.stderr)
else:
print("No morning report issue found to comment on.", file=sys.stderr)
return 0
if __name__ == "__main__":
sys.exit(main())

112
tasks.py
View File

@@ -1860,22 +1860,56 @@ def good_morning_report():
except Exception:
pass
# Genchi Genbutsu: count verified completions from the last 24h
verified_completions = 0
raw_completions = 0
metrics_dir = Path.home() / ".hermes" / "logs"
for metrics_file in metrics_dir.glob("*-metrics.jsonl"):
try:
with open(metrics_file) as mf:
for line in mf:
line = line.strip()
if not line:
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
ts = row.get("ts", "")
if not ts:
continue
try:
from datetime import datetime as _dt, timezone as _tz, timedelta as _td
row_time = _dt.fromisoformat(ts.replace("Z", "+00:00"))
if (now - row_time) > _td(hours=24):
continue
except Exception:
continue
if row.get("outcome") != "success":
continue
raw_completions += 1
if row.get("verified") is True:
verified_completions += 1
except Exception:
pass
# --- BUILD THE REPORT ---
body = f"""Good morning, Alexander. It's {day_name}.
## Overnight Debrief
**Heartbeat:** {tick_count} ticks logged overnight.
**Gitea:** {"up all night" if gitea_up else "⚠️ had downtime"}
**Local inference:** {"running steady" if local_inference_up else "⚠️ had downtime"}
**Model status:** {model_status}
**Models on disk:** {len(models_loaded)} ({', '.join(m for m in models_loaded if 'timmy' in m.lower() or 'hermes' in m.lower()) or 'none with our name'})
**Alerts:** {len(alerts)} {'' + '; '.join(alerts[-3:]) if alerts else '(clean night)'}
|**Heartbeat:** {tick_count} ticks logged overnight.
|**Gitea:** {"up all night" if gitea_up else "⚠️ had downtime"}
|**Local inference:** {"running steady" if local_inference_up else "⚠️ had downtime"}
|**Model status:** {model_status}
|**Models on disk:** {len(models_loaded)} ({', '.join(m for m in models_loaded if 'timmy' in m.lower() or 'hermes' in m.lower()) or 'none with our name'})
|**Alerts:** {len(alerts)} {'' + '; '.join(alerts[-3:]) if alerts else '(clean night)'}
{briefing_summary}
**DPO training pairs staged:** {dpo_count} session files exported
**Local model smoke test:** {smoke_result}
**Verified completions (24h):** {verified_completions} {'(Genchi Genbutsu clean)' if verified_completions == raw_completions else f'({raw_completions - verified_completions} raw completions failed verification)'}
## Gitea Pulse
@@ -1915,6 +1949,29 @@ That's all. Have a good morning.
return {"filed": False, "error": str(e)}
# ── NEW 6b: Kaizen Retro ─────────────────────────────────────────────
@huey.periodic_task(crontab(hour="7", minute="15")) # 7:15 AM daily, after morning report
def kaizen_retro():
"""Run the automated burn-cycle retrospective."""
retro_script = Path(__file__).resolve().parent / "bin" / "kaizen-retro.sh"
if not retro_script.exists():
return {"ran": False, "error": "kaizen-retro.sh not found"}
result = subprocess.run(
["bash", str(retro_script)],
capture_output=True,
text=True,
timeout=300,
)
return {
"ran": True,
"exit_code": result.returncode,
"stdout": result.stdout[-2000:] if result.stdout else "",
"stderr": result.stderr[-1000:] if result.stderr else "",
}
# ── NEW 7: Repo Watchdog ─────────────────────────────────────────────
@huey.periodic_task(crontab(minute="*/20")) # every 20 minutes
@@ -2323,7 +2380,38 @@ def velocity_tracking():
total_open += open_n
total_closed += closed_n
results.append({"repo": repo, "open": open_n, "closed": closed_n, "date": today})
data = {"date": today, "repos": results, "total_open": total_open, "total_closed": total_closed}
# Genchi Genbutsu: count verified completions from agent metrics
verified_completions = 0
raw_completions = 0
metrics_dir = Path.home() / ".hermes" / "logs"
for metrics_file in metrics_dir.glob("*-metrics.jsonl"):
try:
with open(metrics_file) as mf:
for line in mf:
line = line.strip()
if not line:
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
if row.get("outcome") != "success":
continue
raw_completions += 1
if row.get("verified") is True:
verified_completions += 1
except Exception:
pass
data = {
"date": today,
"repos": results,
"total_open": total_open,
"total_closed": total_closed,
"raw_completions": raw_completions,
"verified_completions": verified_completions,
}
with open(report_file, "w") as f:
json.dump(data, f, indent=2)
# Dashboard
@@ -2333,14 +2421,16 @@ def velocity_tracking():
for r in results:
f.write(f"| {r['repo'].split('/')[-1]} | {r['open']} | {r['closed']} |\n")
f.write(f"| **TOTAL** | **{total_open}** | **{total_closed}** |\n\n")
f.write(f"**Verified completions (Genchi Genbutsu):** {verified_completions}\n")
f.write(f"**Raw completions:** {raw_completions}\n\n")
# Trend
prior = sorted(glob.glob(os.path.join(report_dir, "velocity-*.json")))
if len(prior) > 1:
f.write("## Recent Trend\n\n| Date | Total Open | Total Closed |\n|---|---|---|\n")
f.write("## Recent Trend\n\n| Date | Total Open | Total Closed | Verified |\n|---|---|---|---|\n")
for pf in prior[-10:]:
pd = json.load(open(pf))
f.write(f"| {pd['date']} | {pd['total_open']} | {pd['total_closed']} |\n")
msg = f"Velocity: {total_open} open, {total_closed} closed ({today})"
f.write(f"| {pd['date']} | {pd['total_open']} | {pd['total_closed']} | {pd.get('verified_completions', '-')} |\n")
msg = f"Velocity: {total_open} open, {total_closed} closed, {verified_completions} verified ({today})"
if len(prior) > 1:
prev = json.load(open(prior[-2]))
if total_open > prev["total_open"]:

283
tests/test_kaizen_retro.py Normal file
View File

@@ -0,0 +1,283 @@
"""Tests for the Kaizen Retro burn-cycle retrospective script."""
from __future__ import annotations
import importlib.util
import json
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
REPO_ROOT = Path(__file__).parent.parent
# Load kaizen_retro.py as a module (it lives in scripts/, not a package)
spec = importlib.util.spec_from_file_location("kaizen_retro", REPO_ROOT / "scripts" / "kaizen_retro.py")
kr = importlib.util.module_from_spec(spec)
spec.loader.exec_module(kr)
# ── classify_issue_type ───────────────────────────────────────────────────
class TestClassifyIssueType:
def test_classifies_bug_from_title(self):
issue = {"title": "Fix crash on startup", "body": "", "labels": []}
assert kr.classify_issue_type(issue) == "bug"
def test_classifies_feature_from_label(self):
issue = {"title": "Add dark mode", "body": "", "labels": [{"name": "enhancement"}]}
# label "enhancement" doesn't match any keyword directly, but "feature" and "add" are in title
assert kr.classify_issue_type(issue) == "feature"
def test_classifies_docs_from_label(self):
issue = {"title": "Update guide", "body": "", "labels": [{"name": "documentation"}]}
assert kr.classify_issue_type(issue) == "docs"
def test_label_match_stronger_than_title(self):
issue = {"title": "Something random", "body": "", "labels": [{"name": "bug"}]}
assert kr.classify_issue_type(issue) == "bug"
def test_kaizen_takes_precedence_with_both_labels(self):
issue = {"title": "Process improvement", "body": "", "labels": [{"name": "kaizen"}, {"name": "bug"}]}
# kaizen label gives +3, bug gives +3, tie goes to first seen? kaizen appears first in dict
assert kr.classify_issue_type(issue) == "kaizen"
def test_defaults_to_other(self):
issue = {"title": "Tidy up naming", "body": "No user-facing change", "labels": [{"name": "cleanup"}]}
assert kr.classify_issue_type(issue) == "other"
# ── is_max_attempts_candidate ─────────────────────────────────────────────
class TestIsMaxAttemptsCandidate:
def test_blocker_label_returns_true(self):
issue = {"labels": [{"name": "blocked"}], "comments": 0, "created_at": "2026-04-07T00:00:00Z"}
assert kr.is_max_attempts_candidate(issue) is True
def test_timeout_label_returns_true(self):
issue = {"labels": [{"name": "timeout"}], "comments": 0, "created_at": "2026-04-07T00:00:00Z"}
assert kr.is_max_attempts_candidate(issue) is True
def test_high_comment_count_returns_true(self):
issue = {"labels": [], "comments": 5, "created_at": "2026-04-07T00:00:00Z"}
assert kr.is_max_attempts_candidate(issue) is True
def test_fresh_issue_with_low_comments_returns_false(self):
now = datetime.now(timezone.utc)
issue = {"labels": [], "comments": 2, "created_at": now.isoformat()}
assert kr.is_max_attempts_candidate(issue) is False
def test_stale_age_returns_true(self):
old = datetime.now(timezone.utc) - timedelta(days=10)
issue = {"labels": [], "comments": 0, "created_at": old.isoformat()}
assert kr.is_max_attempts_candidate(issue) is True
# ── fmt_pct ───────────────────────────────────────────────────────────────
class TestFmtPct:
def test_basic_percentage(self):
assert kr.fmt_pct(3, 4) == "75%"
def test_zero_denominator(self):
assert kr.fmt_pct(0, 0) == "N/A"
def test_perfect_rate(self):
assert kr.fmt_pct(10, 10) == "100%"
# ── generate_suggestion ───────────────────────────────────────────────────
class TestGenerateSuggestion:
def test_agent_zero_success_rate(self):
metrics = {
"by_agent": {
"groq": {"successes": 0, "failures": 5, "closed": 0, "repos": ["timmy-home"]},
},
"by_repo": {},
"by_type": {},
"max_attempts_issues": [],
"closed_issues": [],
"merged_prs": [],
"closed_prs": [],
}
suggestion = kr.generate_suggestion(metrics, [])
assert "groq" in suggestion
assert "0%" in suggestion or "verify rate" in suggestion
def test_repo_with_most_failures(self):
metrics = {
"by_agent": {},
"by_repo": {
"the-nexus": {"successes": 2, "failures": 5, "closed": 2, "open": 3},
},
"by_type": {},
"max_attempts_issues": [],
"closed_issues": [],
"merged_prs": [],
"closed_prs": [],
}
suggestion = kr.generate_suggestion(metrics, [])
assert "the-nexus" in suggestion
assert "friction" in suggestion
def test_max_attempts_pattern(self):
metrics = {
"by_agent": {},
"by_repo": {},
"by_type": {},
"max_attempts_issues": [
{"type": "devops"}, {"type": "devops"}, {"type": "feature"}
],
"closed_issues": [],
"merged_prs": [],
"closed_prs": [],
}
suggestion = kr.generate_suggestion(metrics, [])
assert "devops" in suggestion
assert "max-attempts" in suggestion.lower() or "stale" in suggestion.lower()
def test_idle_agents(self):
metrics = {
"by_agent": {},
"by_repo": {},
"by_type": {},
"max_attempts_issues": [],
"closed_issues": [],
"merged_prs": [],
"closed_prs": [],
}
fleet = [{"name": "allegro", "active": True}, {"name": "ezra", "active": True}]
suggestion = kr.generate_suggestion(metrics, fleet)
assert "idle" in suggestion.lower() or "no assignments" in suggestion.lower()
def test_fallback_celebration(self):
metrics = {
"by_agent": {},
"by_repo": {},
"by_type": {},
"max_attempts_issues": [],
"closed_issues": [{}, {}, {}, {}, {}],
"merged_prs": [{}, {}, {}],
"closed_prs": [],
}
suggestion = kr.generate_suggestion(metrics, [])
assert "Strong cycle" in suggestion
def test_fallback_low_activity(self):
metrics = {
"by_agent": {},
"by_repo": {},
"by_type": {},
"max_attempts_issues": [],
"closed_issues": [],
"merged_prs": [],
"closed_prs": [],
}
suggestion = kr.generate_suggestion(metrics, [])
assert "Low activity" in suggestion or "idle" in suggestion.lower()
# ── build_report ──────────────────────────────────────────────────────────
class TestBuildReport:
def test_report_contains_numbers_section(self):
metrics = {
"closed_issues": [{}, {}],
"merged_prs": [{}],
"closed_prs": [],
"max_attempts_issues": [],
"by_agent": {"ezra": {"successes": 2, "failures": 0, "repos": ["timmy-config"]}},
"by_repo": {"timmy-config": {"successes": 2, "failures": 0, "open": 1}},
"by_type": {"feature": {"successes": 2, "failures": 0, "total": 2}},
}
report = kr.build_report(metrics, "Do better.", "2026-04-06T00:00:00+00:00")
assert "## Numbers" in report
assert "Issues closed:** 2" in report
assert "PRs merged:** 1" in report
assert "## By Agent" in report
assert "## By Repo" in report
assert "## By Issue Type" in report
assert "Do better." in report
def test_report_skips_empty_repos(self):
metrics = {
"closed_issues": [],
"merged_prs": [],
"closed_prs": [],
"max_attempts_issues": [],
"by_agent": {},
"by_repo": {"unused-repo": {"successes": 0, "failures": 0, "open": 0}},
"by_type": {},
}
report = kr.build_report(metrics, "Nudge.", "2026-04-06T00:00:00+00:00")
assert "unused-repo" not in report
def test_report_truncates_max_attempts(self):
metrics = {
"closed_issues": [],
"merged_prs": [],
"closed_prs": [],
"max_attempts_issues": [{"repo": "r", "number": i, "type": "bug", "assignee": "a", "title": f"T{i}"} for i in range(15)],
"by_agent": {},
"by_repo": {},
"by_type": {},
}
report = kr.build_report(metrics, "Fix it.", "2026-04-06T00:00:00+00:00")
assert "and 5 more" in report
# ── telegram_send ─────────────────────────────────────────────────────────
class TestTelegramSend:
def test_short_message_sent_in_one_piece(self):
with patch("urllib.request.urlopen") as mock_urlopen:
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
results = kr.telegram_send("Hello", "fake-token", "123")
assert len(results) == 1
assert results[0]["ok"] is True
# Verify payload
call_args = mock_urlopen.call_args
req = call_args[0][0]
payload = json.loads(req.data.decode())
assert payload["text"] == "Hello"
assert payload["chat_id"] == "123"
def test_long_message_chunked(self):
big_text = "Line\n" * 2000 # ~10k chars
with patch("urllib.request.urlopen") as mock_urlopen:
mock_resp = MagicMock()
mock_resp.read.return_value = b'{"ok": true}'
mock_urlopen.return_value.__enter__.return_value = mock_resp
results = kr.telegram_send(big_text, "fake-token", "123")
assert len(results) >= 2
# First chunk should have a part prefix
req = mock_urlopen.call_args_list[0][0][0]
payload = json.loads(req.data.decode())
assert "(part 1" in payload["text"]
# ── load helpers ──────────────────────────────────────────────────────────
class TestLoadHelpers:
def test_load_json_missing_returns_none(self, tmp_path):
missing = tmp_path / "does_not_exist.json"
assert kr.load_json(missing) is None
def test_load_json_valid(self, tmp_path):
p = tmp_path / "data.json"
p.write_text('{"a": 1}')
assert kr.load_json(p) == {"a": 1}
def test_iso_day_ago_format(self):
s = kr.iso_day_ago(1)
# Should be a valid ISO timestamp string
dt = datetime.fromisoformat(s)
now = datetime.now(timezone.utc)
assert now - dt < timedelta(days=2)

View File

@@ -0,0 +1,10 @@
{
"name": "Bezalel Builder Wizard",
"role": "Artificer",
"capabilities": [
"provisioning",
"gpu-orchestration",
"model-serving"
],
"instructions": "Take the provided keys and use them to get GPUs for big models (Gemma 4). Wire them into the harness."
}