diff --git a/hermes-sovereign/orchestrator/orchestrate.sh b/hermes-sovereign/orchestrator/orchestrate.sh new file mode 100755 index 00000000..8af52ebc --- /dev/null +++ b/hermes-sovereign/orchestrator/orchestrate.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# orchestrate.sh — Sovereign Orchestrator wrapper +# Sets environment and runs orchestrator.py +# +# Usage: +# ./orchestrate.sh # dry-run (safe default) +# ./orchestrate.sh --once # single live dispatch cycle +# ./orchestrate.sh --daemon # continuous (every 15 min) +# ./orchestrate.sh --dry-run # explicit dry-run + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +HERMES_DIR="${HOME}/.hermes" + +# Load Gitea token +if [[ -z "${GITEA_TOKEN:-}" ]]; then + if [[ -f "${HERMES_DIR}/gitea_token_vps" ]]; then + export GITEA_TOKEN="$(cat "${HERMES_DIR}/gitea_token_vps")" + else + echo "[FATAL] No GITEA_TOKEN and ~/.hermes/gitea_token_vps not found" + exit 1 + fi +fi + +# Load Telegram token +if [[ -z "${TELEGRAM_BOT_TOKEN:-}" ]]; then + if [[ -f "${HOME}/.config/telegram/special_bot" ]]; then + export TELEGRAM_BOT_TOKEN="$(cat "${HOME}/.config/telegram/special_bot")" + fi +fi + +# Run preflight checks if available +if [[ -x "${HERMES_DIR}/bin/api-key-preflight.sh" ]]; then + "${HERMES_DIR}/bin/api-key-preflight.sh" 2>/dev/null || true +fi + +# Run the orchestrator +exec python3 "${SCRIPT_DIR}/orchestrator.py" "$@" diff --git a/hermes-sovereign/orchestrator/orchestrator.py b/hermes-sovereign/orchestrator/orchestrator.py new file mode 100755 index 00000000..075e5d59 --- /dev/null +++ b/hermes-sovereign/orchestrator/orchestrator.py @@ -0,0 +1,645 @@ +#!/usr/bin/env python3 +""" +Sovereign Orchestrator v1 +Reads the Gitea backlog, scores/prioritizes issues, dispatches to agents. + +Usage: + python3 orchestrator.py --once # single dispatch cycle + python3 orchestrator.py --daemon # run every 15 min + python3 orchestrator.py --dry-run # score and report, no dispatch +""" + +import json +import os +import sys +import time +import subprocess +import urllib.request +import urllib.error +import urllib.parse +from datetime import datetime, timezone + +# --------------------------------------------------------------------------- +# CONFIG +# --------------------------------------------------------------------------- + +GITEA_API = "https://forge.alexanderwhitestone.com/api/v1" +GITEA_OWNER = "Timmy_Foundation" +REPOS = ["timmy-config", "the-nexus", "timmy-home"] + +TELEGRAM_CHAT_ID = "-1003664764329" +DAEMON_INTERVAL = 900 # 15 minutes + +# Tags that mark issues we should never auto-dispatch +FILTER_TAGS = ["[EPIC]", "[DO NOT CLOSE]", "[PERMANENT]", "[PHILOSOPHY]", "[MORNING REPORT]"] + +# Known agent usernames on Gitea (for assignee detection) +AGENT_USERNAMES = {"groq", "ezra", "bezalel", "allegro", "timmy", "thetimmyc"} + +# --------------------------------------------------------------------------- +# AGENT ROSTER +# --------------------------------------------------------------------------- + +AGENTS = { + "groq": { + "type": "loop", + "endpoint": "local", + "strengths": ["code", "bug-fix", "small-changes"], + "repos": ["the-nexus", "hermes-agent", "timmy-config", "timmy-home"], + "max_concurrent": 1, + }, + "ezra": { + "type": "gateway", + "endpoint": "http://143.198.27.163:8643/v1/chat/completions", + "ssh": "root@143.198.27.163", + "strengths": ["research", "architecture", "complex", "multi-file"], + "repos": ["timmy-config", "the-nexus", "timmy-home"], + "max_concurrent": 1, + }, + "bezalel": { + "type": "gateway", + "endpoint": "http://159.203.146.185:8643/v1/chat/completions", + "ssh": "root@159.203.146.185", + "strengths": ["ci", "infra", "ops", "testing"], + "repos": ["timmy-config", "hermes-agent", "the-nexus"], + "max_concurrent": 1, + }, +} + +# --------------------------------------------------------------------------- +# CREDENTIALS +# --------------------------------------------------------------------------- + +def load_gitea_token(): + """Read Gitea token from env or file.""" + token = os.environ.get("GITEA_TOKEN", "") + if token: + return token.strip() + token_path = os.path.expanduser("~/.hermes/gitea_token_vps") + try: + with open(token_path) as f: + return f.read().strip() + except FileNotFoundError: + print(f"[FATAL] No GITEA_TOKEN env and {token_path} not found") + sys.exit(1) + + +def load_telegram_token(): + """Read Telegram bot token from file.""" + path = os.path.expanduser("~/.config/telegram/special_bot") + try: + with open(path) as f: + return f.read().strip() + except FileNotFoundError: + return "" + + +GITEA_TOKEN = "" +TELEGRAM_TOKEN = "" + +# --------------------------------------------------------------------------- +# HTTP HELPERS (stdlib only) +# --------------------------------------------------------------------------- + +def gitea_request(path, method="GET", data=None): + """Make an authenticated Gitea API request.""" + url = f"{GITEA_API}{path}" + headers = { + "Authorization": f"token {GITEA_TOKEN}", + "Content-Type": "application/json", + "Accept": "application/json", + } + body = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=body, headers=headers, method=method) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + body_text = e.read().decode() if e.fp else "" + print(f"[API ERROR] {method} {url} -> {e.code}: {body_text[:200]}") + return None + except Exception as e: + print(f"[API ERROR] {method} {url} -> {e}") + return None + + +def send_telegram(message): + """Send message to Telegram group.""" + if not TELEGRAM_TOKEN: + print("[WARN] No Telegram token, skipping notification") + return False + url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage" + data = json.dumps({ + "chat_id": TELEGRAM_CHAT_ID, + "text": message, + "parse_mode": "Markdown", + "disable_web_page_preview": True, + }).encode() + req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + return resp.status == 200 + except Exception as e: + print(f"[TELEGRAM ERROR] {e}") + return False + + +# --------------------------------------------------------------------------- +# 1. BACKLOG READER +# --------------------------------------------------------------------------- + +def fetch_issues(repo): + """Fetch all open issues from a repo, handling pagination.""" + issues = [] + page = 1 + while True: + result = gitea_request( + f"/repos/{GITEA_OWNER}/{repo}/issues?state=open&type=issues&limit=50&page={page}" + ) + if not result: + break + issues.extend(result) + if len(result) < 50: + break + page += 1 + return issues + + +def should_filter(issue): + """Check if issue title contains any filter tags.""" + title = issue.get("title", "").upper() + for tag in FILTER_TAGS: + if tag.upper().replace("[", "").replace("]", "") in title.replace("[", "").replace("]", ""): + return True + # Also filter pull requests + if issue.get("pull_request"): + return True + return False + + +def read_backlog(): + """Read and filter the full backlog across all repos.""" + backlog = [] + for repo in REPOS: + print(f" Fetching {repo}...") + issues = fetch_issues(repo) + for issue in issues: + if should_filter(issue): + continue + assignees = [a.get("login", "") for a in (issue.get("assignees") or [])] + labels = [l.get("name", "") for l in (issue.get("labels") or [])] + backlog.append({ + "repo": repo, + "number": issue["number"], + "title": issue["title"], + "labels": labels, + "assignees": assignees, + "created_at": issue.get("created_at", ""), + "comments": issue.get("comments", 0), + "url": issue.get("html_url", ""), + }) + print(f" Total actionable issues: {len(backlog)}") + return backlog + + +# --------------------------------------------------------------------------- +# 2. PRIORITY SCORER +# --------------------------------------------------------------------------- + +def score_issue(issue): + """Score an issue 0-100 based on priority signals.""" + score = 0 + title_upper = issue["title"].upper() + labels_upper = [l.upper() for l in issue["labels"]] + all_text = title_upper + " " + " ".join(labels_upper) + + # Critical / Bug: +30 + if any(tag in all_text for tag in ["CRITICAL", "BUG"]): + score += 30 + + # P0 / Urgent: +25 + if any(tag in all_text for tag in ["P0", "URGENT"]): + score += 25 + + # P1: +15 + if "P1" in all_text: + score += 15 + + # OPS / Security: +10 + if any(tag in all_text for tag in ["OPS", "SECURITY"]): + score += 10 + + # Unassigned: +10 + if not issue["assignees"]: + score += 10 + + # Age > 7 days: +5 + try: + created = issue["created_at"].replace("Z", "+00:00") + created_dt = datetime.fromisoformat(created) + age_days = (datetime.now(timezone.utc) - created_dt).days + if age_days > 7: + score += 5 + except (ValueError, AttributeError): + pass + + # Has comments: +5 + if issue["comments"] > 0: + score += 5 + + # Infrastructure repo: +5 + if issue["repo"] == "timmy-config": + score += 5 + + # Already assigned to an agent: -10 + if any(a.lower() in AGENT_USERNAMES for a in issue["assignees"]): + score -= 10 + + issue["score"] = max(0, min(100, score)) + return issue + + +def prioritize_backlog(backlog): + """Score and sort the backlog by priority.""" + scored = [score_issue(i) for i in backlog] + scored.sort(key=lambda x: x["score"], reverse=True) + return scored + + +# --------------------------------------------------------------------------- +# 3. AGENT HEALTH CHECKS +# --------------------------------------------------------------------------- + +def check_process(pattern): + """Check if a local process matching pattern is running.""" + try: + result = subprocess.run( + ["pgrep", "-f", pattern], + capture_output=True, text=True, timeout=5 + ) + return result.returncode == 0 + except Exception: + return False + + +def check_ssh_service(host, service_name): + """Check if a remote service is running via SSH.""" + try: + result = subprocess.run( + ["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no", + f"root@{host}", + f"systemctl is-active {service_name} 2>/dev/null || pgrep -f {service_name}"], + capture_output=True, text=True, timeout=15 + ) + return result.returncode == 0 + except Exception: + return False + + +def check_agent_health(name, agent): + """Check if an agent is alive and available.""" + if agent["type"] == "loop": + alive = check_process(f"agent-loop.*{name}") + elif agent["type"] == "gateway": + host = agent["ssh"].split("@")[1] + service = f"hermes-{name}" + alive = check_ssh_service(host, service) + else: + alive = False + return alive + + +def get_agent_status(): + """Get health status for all agents.""" + status = {} + for name, agent in AGENTS.items(): + alive = check_agent_health(name, agent) + status[name] = { + "alive": alive, + "type": agent["type"], + "strengths": agent["strengths"], + } + symbol = "UP" if alive else "DOWN" + print(f" {name}: {symbol} ({agent['type']})") + return status + + +# --------------------------------------------------------------------------- +# 4. DISPATCHER +# --------------------------------------------------------------------------- + +def classify_issue(issue): + """Classify issue type based on title and labels.""" + title = issue["title"].upper() + labels = " ".join(issue["labels"]).upper() + all_text = title + " " + labels + + types = [] + if any(w in all_text for w in ["BUG", "FIX", "BROKEN", "ERROR", "CRASH"]): + types.append("bug-fix") + if any(w in all_text for w in ["OPS", "DEPLOY", "CI", "INFRA", "PIPELINE", "MONITOR"]): + types.append("ops") + if any(w in all_text for w in ["SECURITY", "AUTH", "TOKEN", "CERT"]): + types.append("ops") + if any(w in all_text for w in ["RESEARCH", "AUDIT", "INVESTIGATE", "EXPLORE"]): + types.append("research") + if any(w in all_text for w in ["ARCHITECT", "DESIGN", "REFACTOR", "REWRITE"]): + types.append("architecture") + if any(w in all_text for w in ["TEST", "TESTING", "QA", "VALIDATE"]): + types.append("testing") + if any(w in all_text for w in ["CODE", "IMPLEMENT", "ADD", "CREATE", "BUILD"]): + types.append("code") + if any(w in all_text for w in ["SMALL", "QUICK", "SIMPLE", "MINOR", "TWEAK"]): + types.append("small-changes") + if any(w in all_text for w in ["COMPLEX", "MULTI", "LARGE", "OVERHAUL"]): + types.append("complex") + + if not types: + types = ["code"] # default + + return types + + +def match_agent(issue, agent_status, dispatched_this_cycle): + """Find the best available agent for an issue.""" + issue_types = classify_issue(issue) + candidates = [] + + for name, agent in AGENTS.items(): + # Agent must be alive + if not agent_status.get(name, {}).get("alive", False): + continue + + # Agent must handle this repo + if issue["repo"] not in agent["repos"]: + continue + + # Agent must not already be dispatched this cycle + if dispatched_this_cycle.get(name, 0) >= agent["max_concurrent"]: + continue + + # Score match based on overlapping strengths + overlap = len(set(issue_types) & set(agent["strengths"])) + candidates.append((name, overlap)) + + if not candidates: + return None + + # Sort by overlap score descending, return best match + candidates.sort(key=lambda x: x[1], reverse=True) + return candidates[0][0] + + +def assign_issue(repo, number, agent_name): + """Assign an issue to an agent on Gitea.""" + # First get current assignees to not clobber + result = gitea_request(f"/repos/{GITEA_OWNER}/{repo}/issues/{number}") + if not result: + return False + + current = [a.get("login", "") for a in result.get("assignees", [])] + if agent_name in current: + print(f" Already assigned to {agent_name}") + return True + + new_assignees = current + [agent_name] + patch_result = gitea_request( + f"/repos/{GITEA_OWNER}/{repo}/issues/{number}", + method="PATCH", + data={"assignees": new_assignees} + ) + return patch_result is not None + + +def dispatch_to_gateway(agent_name, agent, issue): + """Trigger work on a gateway agent via SSH.""" + host = agent["ssh"] + repo = issue["repo"] + number = issue["number"] + title = issue["title"] + + # Try to trigger dispatch via SSH + cmd = ( + f'ssh -o ConnectTimeout=10 -o StrictHostKeyChecking=no {host} ' + f'"echo \'Dispatched by orchestrator: {repo}#{number} - {title}\' ' + f'>> /tmp/hermes-dispatch.log"' + ) + try: + subprocess.run(cmd, shell=True, timeout=20, capture_output=True) + return True + except Exception as e: + print(f" [WARN] SSH dispatch to {agent_name} failed: {e}") + return False + + +def dispatch_cycle(backlog, agent_status, dry_run=False): + """Run one dispatch cycle. Returns dispatch report.""" + dispatched = [] + skipped = [] + dispatched_count = {} # agent_name -> count dispatched this cycle + + # Only dispatch unassigned issues (or issues not assigned to agents) + for issue in backlog: + agent_assigned = any(a.lower() in AGENT_USERNAMES for a in issue["assignees"]) + + if agent_assigned: + skipped.append((issue, "already assigned to agent")) + continue + + if issue["score"] < 5: + skipped.append((issue, "score too low")) + continue + + best_agent = match_agent(issue, agent_status, dispatched_count) + if not best_agent: + skipped.append((issue, "no available agent")) + continue + + if dry_run: + dispatched.append({ + "agent": best_agent, + "repo": issue["repo"], + "number": issue["number"], + "title": issue["title"], + "score": issue["score"], + "dry_run": True, + }) + dispatched_count[best_agent] = dispatched_count.get(best_agent, 0) + 1 + continue + + # Actually dispatch + print(f" Dispatching {issue['repo']}#{issue['number']} -> {best_agent}") + success = assign_issue(issue["repo"], issue["number"], best_agent) + if success: + agent = AGENTS[best_agent] + if agent["type"] == "gateway": + dispatch_to_gateway(best_agent, agent, issue) + + dispatched.append({ + "agent": best_agent, + "repo": issue["repo"], + "number": issue["number"], + "title": issue["title"], + "score": issue["score"], + }) + dispatched_count[best_agent] = dispatched_count.get(best_agent, 0) + 1 + else: + skipped.append((issue, "assignment failed")) + + return dispatched, skipped + + +# --------------------------------------------------------------------------- +# 5. CONSOLIDATED REPORT +# --------------------------------------------------------------------------- + +def generate_report(backlog, dispatched, skipped, agent_status, dry_run=False): + """Generate dispatch cycle report.""" + now = datetime.now().strftime("%Y-%m-%d %H:%M") + mode = " [DRY RUN]" if dry_run else "" + + lines = [] + lines.append(f"=== Sovereign Orchestrator Report{mode} ===") + lines.append(f"Time: {now}") + lines.append(f"Total backlog: {len(backlog)} issues") + lines.append("") + + # Agent health + lines.append("-- Agent Health --") + for name, info in agent_status.items(): + symbol = "UP" if info["alive"] else "DOWN" + lines.append(f" {name}: {symbol} ({info['type']})") + lines.append("") + + # Dispatched + lines.append(f"-- Dispatched: {len(dispatched)} --") + for d in dispatched: + dry = " (dry-run)" if d.get("dry_run") else "" + lines.append(f" [{d['score']}] {d['repo']}#{d['number']} -> {d['agent']}{dry}") + lines.append(f" {d['title'][:60]}") + lines.append("") + + # Skipped (top 10) + skip_summary = {} + for issue, reason in skipped: + skip_summary[reason] = skip_summary.get(reason, 0) + 1 + lines.append(f"-- Skipped: {len(skipped)} --") + for reason, count in sorted(skip_summary.items(), key=lambda x: -x[1]): + lines.append(f" {reason}: {count}") + lines.append("") + + # Top 5 unassigned + unassigned = [i for i in backlog if not i["assignees"]][:5] + lines.append("-- Top 5 Unassigned (by priority) --") + for i in unassigned: + lines.append(f" [{i['score']}] {i['repo']}#{i['number']}: {i['title'][:55]}") + lines.append("") + + report = "\n".join(lines) + return report + + +def format_telegram_report(backlog, dispatched, skipped, agent_status, dry_run=False): + """Format a compact Telegram message.""" + mode = " DRY RUN" if dry_run else "" + now = datetime.now().strftime("%H:%M") + + parts = [f"*Orchestrator{mode}* ({now})"] + parts.append(f"Backlog: {len(backlog)} | Dispatched: {len(dispatched)} | Skipped: {len(skipped)}") + + # Agent status line + agent_line = " | ".join( + f"{'✅' if v['alive'] else '❌'}{k}" for k, v in agent_status.items() + ) + parts.append(agent_line) + + if dispatched: + parts.append("") + parts.append("*Dispatched:*") + for d in dispatched[:5]: + dry = " 🔍" if d.get("dry_run") else "" + parts.append(f" `{d['repo']}#{d['number']}` → {d['agent']}{dry}") + + # Top unassigned + unassigned = [i for i in backlog if not i["assignees"]][:3] + if unassigned: + parts.append("") + parts.append("*Top unassigned:*") + for i in unassigned: + parts.append(f" [{i['score']}] `{i['repo']}#{i['number']}` {i['title'][:40]}") + + return "\n".join(parts) + + +# --------------------------------------------------------------------------- +# 6. MAIN +# --------------------------------------------------------------------------- + +def run_cycle(dry_run=False): + """Execute one full orchestration cycle.""" + global GITEA_TOKEN, TELEGRAM_TOKEN + GITEA_TOKEN = load_gitea_token() + TELEGRAM_TOKEN = load_telegram_token() + + print("\n[1/4] Reading backlog...") + backlog = read_backlog() + + print("\n[2/4] Scoring and prioritizing...") + backlog = prioritize_backlog(backlog) + for i in backlog[:10]: + print(f" [{i['score']:3d}] {i['repo']}/{i['number']}: {i['title'][:55]}") + + print("\n[3/4] Checking agent health...") + agent_status = get_agent_status() + + print("\n[4/4] Dispatching...") + dispatched, skipped = dispatch_cycle(backlog, agent_status, dry_run=dry_run) + + # Generate reports + report = generate_report(backlog, dispatched, skipped, agent_status, dry_run=dry_run) + print("\n" + report) + + # Send Telegram notification + if dispatched or not dry_run: + tg_msg = format_telegram_report(backlog, dispatched, skipped, agent_status, dry_run=dry_run) + send_telegram(tg_msg) + + return backlog, dispatched, skipped + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="Sovereign Orchestrator v1") + parser.add_argument("--once", action="store_true", help="Single dispatch cycle") + parser.add_argument("--daemon", action="store_true", help="Run every 15 min") + parser.add_argument("--dry-run", action="store_true", help="Score/report only, no dispatch") + parser.add_argument("--interval", type=int, default=DAEMON_INTERVAL, + help=f"Daemon interval in seconds (default: {DAEMON_INTERVAL})") + args = parser.parse_args() + + if not any([args.once, args.daemon, args.dry_run]): + args.dry_run = True # safe default + print("[INFO] No mode specified, defaulting to --dry-run") + + print("=" * 60) + print(" SOVEREIGN ORCHESTRATOR v1") + print("=" * 60) + + if args.daemon: + print(f"[DAEMON] Running every {args.interval}s (Ctrl+C to stop)") + cycle = 0 + while True: + cycle += 1 + print(f"\n--- Cycle {cycle} ---") + try: + run_cycle(dry_run=args.dry_run) + except Exception as e: + print(f"[ERROR] Cycle failed: {e}") + print(f"[DAEMON] Sleeping {args.interval}s...") + time.sleep(args.interval) + else: + run_cycle(dry_run=args.dry_run) + + +if __name__ == "__main__": + main()