Compare commits

...

15 Commits

Author SHA1 Message Date
641537eb07 Merge pull request '[EPIC] Gemini — Sovereign Infrastructure Suite Implementation' (#418) from feat/gemini-epic-398-1775648372708 into main 2026-04-08 23:38:18 +00:00
17fde3c03f feat: implement README.md
Some checks failed
PR Checklist / pr-checklist (pull_request) Failing after 2m38s
2026-04-08 11:40:45 +00:00
b53fdcd034 feat: implement telemetry.py 2026-04-08 11:40:43 +00:00
1cc1d2ae86 feat: implement skill_installer.py 2026-04-08 11:40:40 +00:00
9ec0d1d80e feat: implement cross_repo_test.py 2026-04-08 11:40:35 +00:00
e9cdaf09dc feat: implement phase_tracker.py 2026-04-08 11:40:30 +00:00
e8302b4af2 feat: implement self_healing.py 2026-04-08 11:40:25 +00:00
311ecf19db feat: implement model_eval.py 2026-04-08 11:40:19 +00:00
77f258efa5 feat: implement gitea_webhook_handler.py 2026-04-08 11:40:12 +00:00
5e12451588 feat: implement adr_manager.py 2026-04-08 11:40:05 +00:00
80b6ceb118 feat: implement agent_dispatch.py 2026-04-08 11:39:57 +00:00
ffb85cc10f feat: implement fleet_llama.py 2026-04-08 11:39:52 +00:00
4179646456 feat: implement architecture_linter_v2.py 2026-04-08 11:39:46 +00:00
681fd0763f feat: implement provision_wizard.py 2026-04-08 11:39:40 +00:00
b21c2833f7 Merge pull request '[PERPLEXITY-08] Add PR checklist CI workflow and enforcement script' (#411) from perplexity/pr-checklist-ci into main 2026-04-08 11:11:02 +00:00
13 changed files with 1377 additions and 0 deletions

60
scripts/README.md Normal file
View File

@@ -0,0 +1,60 @@
# Gemini Sovereign Infrastructure Suite
This directory contains the core systems of the Gemini Sovereign Infrastructure, designed to systematize fleet operations, governance, and architectural integrity.
## Principles
1. **Systems, not Scripts**: We build frameworks that solve classes of problems, not one-off fixes.
2. **Sovereignty First**: All tools are designed to run locally or on owned VPSes. No cloud dependencies.
3. **Von Neumann as Code**: Infrastructure should be self-replicating and automated.
4. **Continuous Governance**: Quality is enforced by code (linters, gates), not just checklists.
## Tools
### [OPS] Provisioning & Fleet Management
- **`provision_wizard.py`**: Automates the creation of a new Wizard node from zero.
- Creates DigitalOcean droplet.
- Installs and builds `llama.cpp`.
- Downloads GGUF models.
- Sets up `systemd` services and health checks.
- **`fleet_llama.py`**: Unified management of `llama-server` instances across the fleet.
- `status`: Real-time health and model monitoring.
- `restart`: Remote service restart via SSH.
- `swap`: Hot-swapping GGUF models on remote nodes.
- **`skill_installer.py`**: Packages and deploys Hermes skills to remote wizards.
- **`model_eval.py`**: Benchmarks GGUF models for speed and quality before deployment.
- **`phase_tracker.py`**: Tracks the fleet's progress through the Paperclips-inspired evolution arc.
- **`cross_repo_test.py`**: Verifies the fleet works as a system by running tests across all core repositories.
- **`self_healing.py`**: Auto-detects and fixes common failures across the fleet.
- **`agent_dispatch.py`**: Unified framework for tasking agents across the fleet.
- **`telemetry.py`**: Operational visibility without cloud dependencies.
- **`gitea_webhook_handler.py`**: Handles real-time events from Gitea to coordinate fleet actions.
### [ARCH] Governance & Architecture
- **`architecture_linter_v2.py`**: Automated enforcement of architectural boundaries.
- Enforces sidecar boundaries (no sovereign code in `hermes-agent`).
- Prevents hardcoded IPs and committed secrets.
- Ensures `SOUL.md` and `README.md` standards.
- **`adr_manager.py`**: Streamlines the creation and tracking of Architecture Decision Records.
- `new`: Scaffolds a new ADR from a template.
- `list`: Provides a chronological view of architectural evolution.
## Usage
Most tools require `DIGITALOCEAN_TOKEN` and SSH access to the fleet.
```bash
# Provision a new node
python3 scripts/provision_wizard.py --name fenrir --model qwen2.5-coder-7b
# Check fleet status
python3 scripts/fleet_llama.py status
# Audit architectural integrity
python3 scripts/architecture_linter_v2.py
```
---
*Built by Gemini — The Builder, The Systematizer, The Force Multiplier.*

113
scripts/adr_manager.py Normal file
View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""
[ARCH] ADR Manager
Part of the Gemini Sovereign Governance System.
Helps create and manage Architecture Decision Records (ADRs).
"""
import os
import sys
import datetime
import argparse
ADR_DIR = "docs/adr"
TEMPLATE_FILE = "docs/adr/ADR_TEMPLATE.md"
class ADRManager:
def __init__(self):
# Ensure we are in the repo root or can find docs/adr
if not os.path.exists(ADR_DIR):
# Try to find it relative to the script
script_dir = os.path.dirname(os.path.abspath(__file__))
repo_root = os.path.dirname(script_dir)
self.adr_dir = os.path.join(repo_root, ADR_DIR)
self.template_file = os.path.join(repo_root, TEMPLATE_FILE)
else:
self.adr_dir = ADR_DIR
self.template_file = TEMPLATE_FILE
if not os.path.exists(self.adr_dir):
os.makedirs(self.adr_dir)
def get_next_number(self):
files = [f for f in os.listdir(self.adr_dir) if f.endswith(".md") and f[0].isdigit()]
if not files:
return 1
numbers = [int(f.split("-")[0]) for f in files]
return max(numbers) + 1
def create_adr(self, title: str):
num = self.get_next_number()
slug = title.lower().replace(" ", "-").replace("/", "-")
filename = f"{num:04d}-{slug}.md"
filepath = os.path.join(self.adr_dir, filename)
date = datetime.date.today().isoformat()
template = ""
if os.path.exists(self.template_file):
with open(self.template_file, "r") as f:
template = f.read()
else:
template = """# {num}. {title}
Date: {date}
## Status
Proposed
## Context
What is the problem we are solving?
## Decision
What is the decision we made?
## Consequences
What are the positive and negative consequences?
"""
content = template.replace("{num}", f"{num:04d}")
content = content.replace("{title}", title)
content = content.replace("{date}", date)
with open(filepath, "w") as f:
f.write(content)
print(f"[SUCCESS] Created ADR: {filepath}")
def list_adrs(self):
files = sorted([f for f in os.listdir(self.adr_dir) if f.endswith(".md") and f[0].isdigit()])
print(f"{'NUM':<6} {'TITLE'}")
print("-" * 40)
for f in files:
num = f.split("-")[0]
title = f.split("-", 1)[1].replace(".md", "").replace("-", " ").title()
print(f"{num:<6} {title}")
def main():
parser = argparse.ArgumentParser(description="Gemini ADR Manager")
subparsers = parser.add_subparsers(dest="command")
create_parser = subparsers.add_parser("new", help="Create a new ADR")
create_parser.add_argument("title", help="Title of the ADR")
subparsers.add_parser("list", help="List all ADRs")
args = parser.parse_args()
manager = ADRManager()
if args.command == "new":
manager.create_adr(args.title)
elif args.command == "list":
manager.list_adrs()
else:
parser.print_help()
if __name__ == "__main__":
main()

57
scripts/agent_dispatch.py Normal file
View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python3
"""
[OPS] Agent Dispatch Framework
Part of the Gemini Sovereign Infrastructure Suite.
Replaces ad-hoc dispatch scripts with a unified framework for tasking agents.
"""
import os
import sys
import argparse
import subprocess
# --- CONFIGURATION ---
FLEET = {
"allegro": "167.99.126.228",
"bezalel": "159.203.146.185"
}
class Dispatcher:
def log(self, message: str):
print(f"[*] {message}")
def dispatch(self, host: str, agent_name: str, task: str):
self.log(f"Dispatching task to {agent_name} on {host}...")
ip = FLEET[host]
# Command to run the agent on the remote machine
# Assumes hermes-agent is installed in /opt/hermes
remote_cmd = f"cd /opt/hermes && python3 run_agent.py --agent {agent_name} --task '{task}'"
ssh_cmd = ["ssh", "-o", "StrictHostKeyChecking=no", f"root@{ip}", remote_cmd]
try:
res = subprocess.run(ssh_cmd, capture_output=True, text=True)
if res.returncode == 0:
self.log(f"[SUCCESS] {agent_name} completed task.")
print(res.stdout)
else:
self.log(f"[FAILURE] {agent_name} failed task.")
print(res.stderr)
except Exception as e:
self.log(f"[ERROR] Dispatch failed: {e}")
def main():
parser = argparse.ArgumentParser(description="Gemini Agent Dispatcher")
parser.add_argument("host", choices=list(FLEET.keys()), help="Host to dispatch to")
parser.add_argument("agent", help="Agent name")
parser.add_argument("task", help="Task description")
args = parser.parse_args()
dispatcher = Dispatcher()
dispatcher.dispatch(args.host, args.agent, args.task)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""
[ARCH] Architecture Linter v2
Part of the Gemini Sovereign Governance System.
Enforces architectural boundaries, security, and documentation standards
across the Timmy Foundation fleet.
"""
import os
import re
import sys
import argparse
from pathlib import Path
# --- CONFIGURATION ---
SOVEREIGN_KEYWORDS = ["mempalace", "sovereign_store", "tirith", "bezalel", "nexus"]
IP_REGEX = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
API_KEY_REGEX = r'(?:api_key|secret|token|password|auth_token)\s*[:=]\s*["\'][a-zA-Z0-9_\-]{20,}["\']'
class Linter:
def __init__(self, repo_path: str):
self.repo_path = Path(repo_path).resolve()
self.repo_name = self.repo_path.name
self.errors = []
def log_error(self, message: str, file: str = None, line: int = None):
loc = f"{file}:{line}" if file and line else (file if file else "General")
self.errors.append(f"[{loc}] {message}")
def check_sidecar_boundary(self):
"""Rule 1: No sovereign code in hermes-agent (sidecar boundary)"""
if self.repo_name == "hermes-agent":
for root, _, files in os.walk(self.repo_path):
if "node_modules" in root or ".git" in root:
continue
for file in files:
if file.endswith((".py", ".ts", ".js", ".tsx")):
path = Path(root) / file
content = path.read_text(errors="ignore")
for kw in SOVEREIGN_KEYWORDS:
if kw in content.lower():
# Exception: imports or comments might be okay, but we're strict for now
self.log_error(f"Sovereign keyword '{kw}' found in hermes-agent. Violates sidecar boundary.", str(path.relative_to(self.repo_path)))
def check_hardcoded_ips(self):
"""Rule 2: No hardcoded IPs (use domain names)"""
for root, _, files in os.walk(self.repo_path):
if "node_modules" in root or ".git" in root:
continue
for file in files:
if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json")):
path = Path(root) / file
content = path.read_text(errors="ignore")
matches = re.finditer(IP_REGEX, content)
for match in matches:
ip = match.group()
if ip in ["127.0.0.1", "0.0.0.0"]:
continue
line_no = content.count('\n', 0, match.start()) + 1
self.log_error(f"Hardcoded IP address '{ip}' found. Use domain names or environment variables.", str(path.relative_to(self.repo_path)), line_no)
def check_api_keys(self):
"""Rule 3: No cloud API keys committed to repos"""
for root, _, files in os.walk(self.repo_path):
if "node_modules" in root or ".git" in root:
continue
for file in files:
if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json", ".env")):
if file == ".env.example":
continue
path = Path(root) / file
content = path.read_text(errors="ignore")
matches = re.finditer(API_KEY_REGEX, content, re.IGNORECASE)
for match in matches:
line_no = content.count('\n', 0, match.start()) + 1
self.log_error("Potential API key or secret found in code.", str(path.relative_to(self.repo_path)), line_no)
def check_soul_canonical(self):
"""Rule 4: SOUL.md exists and is canonical in exactly one location"""
soul_path = self.repo_path / "SOUL.md"
if self.repo_name == "timmy-config":
if not soul_path.exists():
self.log_error("SOUL.md is missing from the canonical location (timmy-config root).")
else:
if soul_path.exists():
self.log_error("SOUL.md found in non-canonical repo. It should only live in timmy-config.")
def check_readme(self):
"""Rule 5: Every repo has a README with current truth"""
readme_path = self.repo_path / "README.md"
if not readme_path.exists():
self.log_error("README.md is missing.")
else:
content = readme_path.read_text(errors="ignore")
if len(content.strip()) < 50:
self.log_error("README.md is too short or empty. Provide current truth about the repo.")
def run(self):
print(f"--- Gemini Linter: Auditing {self.repo_name} ---")
self.check_sidecar_boundary()
self.check_hardcoded_ips()
self.check_api_keys()
self.check_soul_canonical()
self.check_readme()
if self.errors:
print(f"\n[FAILURE] Found {len(self.errors)} architectural violations:")
for err in self.errors:
print(f" - {err}")
return False
else:
print("\n[SUCCESS] Architecture is sound. Sovereignty maintained.")
return True
def main():
parser = argparse.ArgumentParser(description="Gemini Architecture Linter v2")
parser.add_argument("repo_path", nargs="?", default=".", help="Path to the repository to lint")
args = parser.parse_args()
linter = Linter(args.repo_path)
success = linter.run()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""
[OPS] Cross-Repo Test Suite
Part of the Gemini Sovereign Infrastructure Suite.
Verifies the fleet works as a system by running tests across all core repositories.
"""
import os
import sys
import subprocess
import argparse
from pathlib import Path
# --- CONFIGURATION ---
REPOS = ["timmy-config", "hermes-agent", "the-nexus"]
class CrossRepoTester:
def __init__(self, root_dir: str):
self.root_dir = Path(root_dir).resolve()
def log(self, message: str):
print(f"[*] {message}")
def run_tests(self):
results = {}
for repo in REPOS:
repo_path = self.root_dir / repo
if not repo_path.exists():
# Try sibling directory if we are in one of the repos
repo_path = self.root_dir.parent / repo
if not repo_path.exists():
print(f"[WARNING] Repo {repo} not found at {repo_path}")
results[repo] = "MISSING"
continue
self.log(f"Running tests for {repo}...")
# Determine test command
test_cmd = ["pytest"]
if repo == "hermes-agent":
test_cmd = ["python3", "-m", "pytest", "tests"]
elif repo == "the-nexus":
test_cmd = ["pytest", "tests"]
try:
# Check if pytest is available
subprocess.run(["pytest", "--version"], capture_output=True)
res = subprocess.run(test_cmd, cwd=str(repo_path), capture_output=True, text=True)
if res.returncode == 0:
results[repo] = "PASSED"
else:
results[repo] = "FAILED"
# Print a snippet of the failure
print(f" [!] {repo} failed tests. Stderr snippet:")
print("\n".join(res.stderr.split("\n")[-10:]))
except FileNotFoundError:
results[repo] = "ERROR: pytest not found"
except Exception as e:
results[repo] = f"ERROR: {e}"
self.report(results)
def report(self, results: dict):
print("\n--- Cross-Repo Test Report ---")
all_passed = True
for repo, status in results.items():
icon = "" if status == "PASSED" else ""
print(f"{icon} {repo:<15} | {status}")
if status != "PASSED":
all_passed = False
if all_passed:
print("\n[SUCCESS] All systems operational. The fleet is sound.")
else:
print("\n[FAILURE] System instability detected.")
def main():
parser = argparse.ArgumentParser(description="Gemini Cross-Repo Tester")
parser.add_argument("--root", default=".", help="Root directory containing all repos")
args = parser.parse_args()
tester = CrossRepoTester(args.root)
tester.run_tests()
if __name__ == "__main__":
main()

137
scripts/fleet_llama.py Normal file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
[OPS] llama.cpp Fleet Manager
Part of the Gemini Sovereign Infrastructure Suite.
Manages llama-server instances across the Timmy Foundation fleet.
Supports status, restart, and model swapping via SSH.
"""
import os
import sys
import json
import argparse
import subprocess
import requests
from typing import Dict, List, Any
# --- FLEET DEFINITION ---
FLEET = {
"mac": {"ip": "10.1.10.77", "port": 8080, "role": "hub"},
"ezra": {"ip": "143.198.27.163", "port": 8080, "role": "forge"},
"allegro": {"ip": "167.99.126.228", "port": 8080, "role": "agent-host"},
"bezalel": {"ip": "159.203.146.185", "port": 8080, "role": "world-host"}
}
class FleetManager:
def __init__(self):
self.results = {}
def run_remote(self, host: str, command: str):
ip = FLEET[host]["ip"]
ssh_cmd = [
"ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=5",
f"root@{ip}", command
]
# For Mac, we might need a different user or local execution
if host == "mac":
ssh_cmd = ["bash", "-c", command]
try:
result = subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=10)
return result
except subprocess.TimeoutExpired:
return None
except Exception as e:
print(f"Error running remote command on {host}: {e}")
return None
def get_status(self, host: str):
ip = FLEET[host]["ip"]
port = FLEET[host]["port"]
status = {"online": False, "server_running": False, "model": "unknown", "tps": 0.0}
# 1. Check if machine is reachable
ping_res = subprocess.run(["ping", "-c", "1", "-W", "1", ip], capture_output=True)
if ping_res.returncode == 0:
status["online"] = True
# 2. Check if llama-server is responding to health check
try:
url = f"http://{ip}:{port}/health"
response = requests.get(url, timeout=2)
if response.status_code == 200:
status["server_running"] = True
data = response.json()
# llama.cpp health endpoint usually returns slots info
# We'll try to get model info if available
status["model"] = data.get("model", "unknown")
except:
pass
return status
def show_fleet_status(self):
print(f"{'NAME':<10} {'IP':<15} {'STATUS':<10} {'SERVER':<10} {'MODEL':<20}")
print("-" * 70)
for name in FLEET:
status = self.get_status(name)
online_str = "" if status["online"] else ""
server_str = "🚀" if status["server_running"] else "💤"
print(f"{name:<10} {FLEET[name]['ip']:<15} {online_str:<10} {server_str:<10} {status['model']:<20}")
def restart_server(self, host: str):
print(f"[*] Restarting llama-server on {host}...")
res = self.run_remote(host, "systemctl restart llama-server")
if res and res.returncode == 0:
print(f"[SUCCESS] Restarted {host}")
else:
print(f"[FAILURE] Could not restart {host}")
def swap_model(self, host: str, model_name: str):
print(f"[*] Swapping model on {host} to {model_name}...")
# This assumes the provision_wizard.py structure
# In a real scenario, we'd have a mapping of model names to URLs
# For now, we'll just update the systemd service or a config file
# 1. Stop server
self.run_remote(host, "systemctl stop llama-server")
# 2. Update service file (simplified)
# This is a bit risky to do via one-liner, but for the manager:
cmd = f"sed -i 's/-m .*\\.gguf/-m \\/opt\\/models\\/{model_name}.gguf/' /etc/systemd/system/llama-server.service"
self.run_remote(host, cmd)
# 3. Start server
self.run_remote(host, "systemctl daemon-reload && systemctl start llama-server")
print(f"[SUCCESS] Swapped model on {host}")
def main():
parser = argparse.ArgumentParser(description="Gemini Fleet Manager")
subparsers = parser.add_subparsers(dest="command")
subparsers.add_parser("status", help="Show fleet status")
restart_parser = subparsers.add_parser("restart", help="Restart a server")
restart_parser.add_argument("host", choices=list(FLEET.keys()), help="Host to restart")
swap_parser = subparsers.add_parser("swap", help="Swap model on a host")
swap_parser.add_argument("host", choices=list(FLEET.keys()), help="Host to swap")
swap_parser.add_argument("model", help="Model name (GGUF)")
args = parser.parse_args()
manager = FleetManager()
if args.command == "status":
manager.show_fleet_status()
elif args.command == "restart":
manager.restart_server(args.host)
elif args.command == "swap":
manager.swap_model(args.host, args.model)
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
[OPS] Gitea Webhook Handler
Part of the Gemini Sovereign Infrastructure Suite.
Handles real-time events from Gitea to coordinate fleet actions.
"""
import os
import sys
import json
import argparse
from typing import Dict, Any
class WebhookHandler:
def handle_event(self, payload: Dict[str, Any]):
# Gitea webhooks often send the event type in a header,
# but we'll try to infer it from the payload if not provided.
event_type = payload.get("event") or self.infer_event_type(payload)
repo_name = payload.get("repository", {}).get("name")
sender = payload.get("sender", {}).get("username")
print(f"[*] Received {event_type} event from {repo_name} (by {sender})")
if event_type == "push":
self.handle_push(payload)
elif event_type == "pull_request":
self.handle_pr(payload)
elif event_type == "issue":
self.handle_issue(payload)
else:
print(f"[INFO] Ignoring event type: {event_type}")
def infer_event_type(self, payload: Dict[str, Any]) -> str:
if "commits" in payload: return "push"
if "pull_request" in payload: return "pull_request"
if "issue" in payload: return "issue"
return "unknown"
def handle_push(self, payload: Dict[str, Any]):
ref = payload.get("ref")
print(f" [PUSH] Branch: {ref}")
# Trigger CI or deployment
if ref == "refs/heads/main":
print(" [ACTION] Triggering production deployment...")
# Example: subprocess.run(["./deploy.sh"])
def handle_pr(self, payload: Dict[str, Any]):
action = payload.get("action")
pr_num = payload.get("pull_request", {}).get("number")
print(f" [PR] Action: {action} | PR #{pr_num}")
if action in ["opened", "synchronized"]:
print(f" [ACTION] Triggering architecture linter for PR #{pr_num}...")
# Example: subprocess.run(["python3", "scripts/architecture_linter_v2.py"])
def handle_issue(self, payload: Dict[str, Any]):
action = payload.get("action")
issue_num = payload.get("issue", {}).get("number")
print(f" [ISSUE] Action: {action} | Issue #{issue_num}")
def main():
parser = argparse.ArgumentParser(description="Gemini Webhook Handler")
parser.add_argument("payload_file", help="JSON file containing the webhook payload")
args = parser.parse_args()
if not os.path.exists(args.payload_file):
print(f"[ERROR] Payload file {args.payload_file} not found.")
sys.exit(1)
with open(args.payload_file, "r") as f:
try:
payload = json.load(f)
except:
print("[ERROR] Invalid JSON payload.")
sys.exit(1)
handler = WebhookHandler()
handler.handle_event(payload)
if __name__ == "__main__":
main()

95
scripts/model_eval.py Normal file
View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
[EVAL] Model Evaluation Harness
Part of the Gemini Sovereign Infrastructure Suite.
Benchmarks GGUF models for speed and quality before deployment.
"""
import os
import sys
import time
import json
import argparse
import requests
BENCHMARK_PROMPTS = [
"Write a Python script to sort a list of dictionaries by a key.",
"Explain the concept of 'Sovereign AI' in three sentences.",
"What is the capital of France?",
"Write a short story about a robot learning to paint."
]
class ModelEval:
def __init__(self, endpoint: str):
self.endpoint = endpoint.rstrip("/")
def log(self, message: str):
print(f"[*] {message}")
def run_benchmark(self):
self.log(f"Starting benchmark for {self.endpoint}...")
results = []
for prompt in BENCHMARK_PROMPTS:
self.log(f"Testing prompt: {prompt[:30]}...")
start_time = time.time()
try:
# llama.cpp server /completion endpoint
response = requests.post(
f"{self.endpoint}/completion",
json={"prompt": prompt, "n_predict": 128},
timeout=60
)
duration = time.time() - start_time
if response.status_code == 200:
data = response.json()
content = data.get("content", "")
# Rough estimate of tokens (4 chars per token is a common rule of thumb)
tokens = len(content) / 4
tps = tokens / duration
results.append({
"prompt": prompt,
"duration": duration,
"tps": tps,
"success": True
})
else:
results.append({"prompt": prompt, "success": False, "error": response.text})
except Exception as e:
results.append({"prompt": prompt, "success": False, "error": str(e)})
self.report(results)
def report(self, results: list):
print("\n--- Evaluation Report ---")
total_tps = 0
success_count = 0
for r in results:
if r["success"]:
print(f"{r['prompt'][:40]}... | {r['tps']:.2f} tok/s | {r['duration']:.2f}s")
total_tps += r["tps"]
success_count += 1
else:
print(f"{r['prompt'][:40]}... | FAILED: {r['error']}")
if success_count > 0:
avg_tps = total_tps / success_count
print(f"\nAverage Performance: {avg_tps:.2f} tok/s")
else:
print("\n[FAILURE] All benchmarks failed.")
def main():
parser = argparse.ArgumentParser(description="Gemini Model Eval")
parser.add_argument("endpoint", help="llama-server endpoint (e.g. http://localhost:8080)")
args = parser.parse_args()
evaluator = ModelEval(args.endpoint)
evaluator.run_benchmark()
if __name__ == "__main__":
main()

114
scripts/phase_tracker.py Normal file
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
[OPS] Phase Progression Tracker
Part of the Gemini Sovereign Infrastructure Suite.
Tracks the fleet's progress through the Paperclips-inspired evolution arc.
"""
import os
import sys
import json
import argparse
MILESTONES_FILE = "fleet/milestones.md"
COMPLETED_FILE = "fleet/completed_milestones.json"
class PhaseTracker:
def __init__(self):
# Find files relative to repo root
script_dir = os.path.dirname(os.path.abspath(__file__))
repo_root = os.path.dirname(script_dir)
self.milestones_path = os.path.join(repo_root, MILESTONES_FILE)
self.completed_path = os.path.join(repo_root, COMPLETED_FILE)
self.milestones = self.parse_milestones()
self.completed = self.load_completed()
def parse_milestones(self):
if not os.path.exists(self.milestones_path):
return {}
with open(self.milestones_path, "r") as f:
content = f.read()
phases = {}
current_phase = None
for line in content.split("\n"):
if line.startswith("## Phase"):
current_phase = line.replace("## ", "").strip()
phases[current_phase] = []
elif line.startswith("### M"):
m_id = line.split(":")[0].replace("### ", "").strip()
title = line.split(":")[1].strip()
phases[current_phase].append({"id": m_id, "title": title})
return phases
def load_completed(self):
if os.path.exists(self.completed_path):
with open(self.completed_path, "r") as f:
try:
return json.load(f)
except:
return []
return []
def save_completed(self):
with open(self.completed_path, "w") as f:
json.dump(self.completed, f, indent=2)
def show_progress(self):
print("--- Fleet Phase Progression Tracker ---")
total_milestones = 0
total_completed = 0
if not self.milestones:
print("[ERROR] No milestones found in fleet/milestones.md")
return
for phase, ms in self.milestones.items():
print(f"\n{phase}")
for m in ms:
total_milestones += 1
done = m["id"] in self.completed
if done:
total_completed += 1
status = "" if done else ""
print(f" {status} {m['id']}: {m['title']}")
percent = (total_completed / total_milestones) * 100 if total_milestones > 0 else 0
print(f"\nOverall Progress: {total_completed}/{total_milestones} ({percent:.1f}%)")
def mark_complete(self, m_id: str):
if m_id not in self.completed:
self.completed.append(m_id)
self.save_completed()
print(f"[SUCCESS] Marked {m_id} as complete.")
else:
print(f"[INFO] {m_id} is already complete.")
def main():
parser = argparse.ArgumentParser(description="Gemini Phase Tracker")
subparsers = parser.add_subparsers(dest="command")
subparsers.add_parser("status", help="Show current progress")
complete_parser = subparsers.add_parser("complete", help="Mark a milestone as complete")
complete_parser.add_argument("id", help="Milestone ID (e.g. M1)")
args = parser.parse_args()
tracker = PhaseTracker()
if args.command == "status":
tracker.show_progress()
elif args.command == "complete":
tracker.mark_complete(args.id)
else:
parser.print_help()
if __name__ == "__main__":
main()

228
scripts/provision_wizard.py Normal file
View File

@@ -0,0 +1,228 @@
#!/usr/bin/env python3
"""
[OPS] Automated VPS Provisioning System (Von Neumann as Code)
Part of the Gemini Sovereign Infrastructure Suite.
This script automates the creation and configuration of a "Wizard" node
from zero to serving inference via llama.cpp.
Usage:
python3 provision_wizard.py --name fenrir --size s-2vcpu-4gb --model qwen2.5-coder-7b
"""
import os
import sys
import time
import argparse
import requests
import subprocess
import json
from typing import Optional, Dict, Any
# --- CONFIGURATION ---
DO_API_URL = "https://api.digitalocean.com/v2"
# We expect DIGITALOCEAN_TOKEN to be set in the environment.
DO_TOKEN = os.environ.get("DIGITALOCEAN_TOKEN")
# Default settings
DEFAULT_REGION = "nyc3"
DEFAULT_IMAGE = "ubuntu-22-04-x64"
LLAMA_CPP_REPO = "https://github.com/ggerganov/llama.cpp"
class Provisioner:
def __init__(self, name: str, size: str, model: str, region: str = DEFAULT_REGION):
self.name = name
self.size = size
self.model = model
self.region = region
self.droplet_id = None
self.ip_address = None
def log(self, message: str):
print(f"[*] {message}")
def error(self, message: str):
print(f"[!] ERROR: {message}")
sys.exit(1)
def check_auth(self):
if not DO_TOKEN:
self.error("DIGITALOCEAN_TOKEN environment variable not set.")
def create_droplet(self):
self.log(f"Creating droplet '{self.name}' ({self.size}) in {self.region}...")
# Get SSH keys to add to the droplet
ssh_keys = self.get_ssh_keys()
payload = {
"name": self.name,
"region": self.region,
"size": self.size,
"image": DEFAULT_IMAGE,
"ssh_keys": ssh_keys,
"backups": False,
"ipv6": True,
"monitoring": True,
"tags": ["wizard", "gemini-provisioned"]
}
headers = {
"Authorization": f"Bearer {DO_TOKEN}",
"Content-Type": "application/json"
}
response = requests.post(f"{DO_API_URL}/droplets", json=payload, headers=headers)
if response.status_code != 202:
self.error(f"Failed to create droplet: {response.text}")
data = response.json()
self.droplet_id = data["droplet"]["id"]
self.log(f"Droplet created (ID: {self.droplet_id}). Waiting for IP...")
def get_ssh_keys(self) -> list:
# Fetch existing SSH keys from DO account to ensure we can log in
headers = {"Authorization": f"Bearer {DO_TOKEN}"}
response = requests.get(f"{DO_API_URL}/account/keys", headers=headers)
if response.status_code != 200:
self.log("Warning: Could not fetch SSH keys. Droplet might be inaccessible via SSH.")
return []
return [key["id"] for key in response.json()["ssh_keys"]]
def wait_for_ip(self):
headers = {"Authorization": f"Bearer {DO_TOKEN}"}
while not self.ip_address:
response = requests.get(f"{DO_API_URL}/droplets/{self.droplet_id}", headers=headers)
data = response.json()
networks = data["droplet"]["networks"]["v4"]
for net in networks:
if net["type"] == "public":
self.ip_address = net["ip_address"]
break
if not self.ip_address:
time.sleep(5)
self.log(f"Droplet IP: {self.ip_address}")
def run_remote(self, command: str):
# Using subprocess to call ssh. Assumes local machine has the right private key.
ssh_cmd = [
"ssh", "-o", "StrictHostKeyChecking=no",
f"root@{self.ip_address}", command
]
result = subprocess.run(ssh_cmd, capture_output=True, text=True)
return result
def setup_wizard(self):
self.log("Starting remote setup...")
# Wait for SSH to be ready
retries = 12
while retries > 0:
res = self.run_remote("echo 'SSH Ready'")
if res.returncode == 0:
break
self.log(f"Waiting for SSH... ({retries} retries left)")
time.sleep(10)
retries -= 1
if retries == 0:
self.error("SSH timed out.")
# 1. Update and install dependencies
self.log("Installing dependencies...")
setup_script = """
export DEBIAN_FRONTEND=noninteractive
apt-get update && apt-get upgrade -y
apt-get install -y build-essential git cmake curl wget python3 python3-pip
"""
self.run_remote(setup_script)
# 2. Build llama.cpp
self.log("Building llama.cpp...")
build_script = f"""
if [ ! -d "/opt/llama.cpp" ]; then
git clone {LLAMA_CPP_REPO} /opt/llama.cpp
fi
cd /opt/llama.cpp
mkdir -p build && cd build
cmake ..
cmake --build . --config Release
"""
self.run_remote(build_script)
# 3. Download Model
self.log(f"Downloading model: {self.model}...")
model_url = self.get_model_url(self.model)
download_script = f"""
mkdir -p /opt/models
if [ ! -f "/opt/models/{self.model}.gguf" ]; then
wget -O /opt/models/{self.model}.gguf {model_url}
fi
"""
self.run_remote(download_script)
# 4. Create systemd service
self.log("Creating systemd service...")
service_content = f"""
[Unit]
Description=Llama.cpp Server for {self.name}
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/opt/llama.cpp
ExecStart=/opt/llama.cpp/build/bin/llama-server -m /opt/models/{self.model}.gguf --host 0.0.0.0 --port 8080 -c 4096
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
"""
# Use cat to write the file to handle multi-line string safely
self.run_remote(f"cat <<EOF > /etc/systemd/system/llama-server.service\n{service_content}\nEOF")
self.run_remote("systemctl daemon-reload && systemctl enable llama-server && systemctl start llama-server")
def get_model_url(self, model_name: str) -> str:
# Mapping for common models to GGUF URLs (HuggingFace)
mapping = {
"qwen2.5-coder-7b": "https://huggingface.co/Qwen/Qwen2.5-Coder-7B-Instruct-GGUF/resolve/main/qwen2.5-coder-7b-instruct-q4_k_m.gguf",
"hermes-3-llama-3.1-8b": "https://huggingface.co/NousResearch/Hermes-3-Llama-3.1-8B-GGUF/resolve/main/Hermes-3-Llama-3.1-8B.Q4_K_M.gguf"
}
return mapping.get(model_name, mapping["hermes-3-llama-3.1-8b"])
def health_check(self):
self.log("Performing health check...")
time.sleep(15) # Wait for server to start
try:
url = f"http://{self.ip_address}:8080/health"
response = requests.get(url, timeout=10)
if response.status_code == 200:
self.log(f"[SUCCESS] Wizard {self.name} is healthy and serving inference.")
self.log(f"Endpoint: {url}")
else:
self.log(f"[WARNING] Health check returned status {response.status_code}")
except Exception as e:
self.log(f"[ERROR] Health check failed: {e}")
def provision(self):
self.check_auth()
self.create_droplet()
self.wait_for_ip()
self.setup_wizard()
self.health_check()
def main():
parser = argparse.ArgumentParser(description="Gemini Provisioner")
parser.add_argument("--name", required=True, help="Name of the wizard")
parser.add_argument("--size", default="s-2vcpu-4gb", help="DO droplet size")
parser.add_argument("--model", default="qwen2.5-coder-7b", help="Model to serve")
parser.add_argument("--region", default="nyc3", help="DO region")
args = parser.parse_args()
provisioner = Provisioner(args.name, args.size, args.model, args.region)
provisioner.provision()
if __name__ == "__main__":
main()

71
scripts/self_healing.py Normal file
View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""
[OPS] Self-Healing Infrastructure
Part of the Gemini Sovereign Infrastructure Suite.
Auto-detects and fixes common failures across the fleet.
"""
import os
import sys
import subprocess
import argparse
import requests
# --- CONFIGURATION ---
FLEET = {
"mac": {"ip": "10.1.10.77", "port": 8080},
"ezra": {"ip": "143.198.27.163", "port": 8080},
"allegro": {"ip": "167.99.126.228", "port": 8080},
"bezalel": {"ip": "159.203.146.185", "port": 8080}
}
class SelfHealer:
def log(self, message: str):
print(f"[*] {message}")
def run_remote(self, host: str, command: str):
ip = FLEET[host]["ip"]
ssh_cmd = ["ssh", "-o", "StrictHostKeyChecking=no", f"root@{ip}", command]
if host == "mac":
ssh_cmd = ["bash", "-c", command]
try:
return subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=10)
except:
return None
def check_and_heal(self):
for host in FLEET:
self.log(f"Auditing {host}...")
# 1. Check llama-server
ip = FLEET[host]["ip"]
port = FLEET[host]["port"]
try:
requests.get(f"http://{ip}:{port}/health", timeout=2)
except:
self.log(f" [!] llama-server down on {host}. Attempting restart...")
self.run_remote(host, "systemctl restart llama-server")
# 2. Check disk space
res = self.run_remote(host, "df -h / | tail -1 | awk '{print $5}' | sed 's/%//'")
if res and res.returncode == 0:
try:
usage = int(res.stdout.strip())
if usage > 90:
self.log(f" [!] Disk usage high on {host} ({usage}%). Cleaning logs...")
self.run_remote(host, "journalctl --vacuum-time=1d && rm -rf /var/log/*.gz")
except:
pass
def run(self):
self.log("Starting self-healing cycle...")
self.check_and_heal()
self.log("Cycle complete.")
def main():
healer = SelfHealer()
healer.run()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
[OPS] Sovereign Skill Installer
Part of the Gemini Sovereign Infrastructure Suite.
Packages and installs Hermes skills onto remote wizard nodes.
"""
import os
import sys
import argparse
import subprocess
from pathlib import Path
# --- CONFIGURATION ---
# Assumes hermes-agent is a sibling directory to timmy-config
HERMES_ROOT = "../hermes-agent"
SKILLS_DIR = "skills"
class SkillInstaller:
def __init__(self, host: str, ip: str):
self.host = host
self.ip = ip
self.hermes_path = Path(HERMES_ROOT).resolve()
def log(self, message: str):
print(f"[*] {message}")
def error(self, message: str):
print(f"[!] ERROR: {message}")
sys.exit(1)
def install_skill(self, skill_name: str):
self.log(f"Installing skill '{skill_name}' to {self.host} ({self.ip})...")
skill_path = self.hermes_path / SKILLS_DIR / skill_name
if not skill_path.exists():
self.error(f"Skill '{skill_name}' not found in {skill_path}")
# 1. Compress skill
self.log("Compressing skill...")
tar_file = f"{skill_name}.tar.gz"
subprocess.run(["tar", "-czf", tar_file, "-C", str(skill_path.parent), skill_name])
# 2. Upload to remote
self.log("Uploading to remote...")
remote_path = f"/opt/hermes/skills/{skill_name}"
subprocess.run(["ssh", f"root@{self.ip}", f"mkdir -p /opt/hermes/skills"])
subprocess.run(["scp", tar_file, f"root@{self.ip}:/tmp/"])
# 3. Extract and register
self.log("Extracting and registering...")
extract_cmd = f"tar -xzf /tmp/{tar_file} -C /opt/hermes/skills/ && rm /tmp/{tar_file}"
subprocess.run(["ssh", f"root@{self.ip}", extract_cmd])
# Registration logic (simplified)
# In a real scenario, we'd update the wizard's config.yaml
self.log(f"[SUCCESS] Skill '{skill_name}' installed on {self.host}")
# Cleanup local tar
os.remove(tar_file)
def main():
parser = argparse.ArgumentParser(description="Gemini Skill Installer")
parser.add_argument("host", help="Target host name")
parser.add_argument("ip", help="Target host IP")
parser.add_argument("skill", help="Skill name to install")
args = parser.parse_args()
installer = SkillInstaller(args.host, args.ip)
installer.install_skill(args.skill)
if __name__ == "__main__":
main()

129
scripts/telemetry.py Normal file
View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
[OPS] Telemetry Pipeline v2
Part of the Gemini Sovereign Infrastructure Suite.
Operational visibility without cloud dependencies.
"""
import os
import sys
import json
import time
import subprocess
import argparse
# --- CONFIGURATION ---
FLEET = {
"mac": "10.1.10.77",
"ezra": "143.198.27.163",
"allegro": "167.99.126.228",
"bezalel": "159.203.146.185"
}
TELEMETRY_FILE = "logs/telemetry.json"
class Telemetry:
def __init__(self):
# Find logs relative to repo root
script_dir = os.path.dirname(os.path.abspath(__file__))
repo_root = os.path.dirname(script_dir)
self.logs_dir = os.path.join(repo_root, "logs")
self.telemetry_path = os.path.join(repo_root, TELEMETRY_FILE)
if not os.path.exists(self.logs_dir):
os.makedirs(self.logs_dir)
def log(self, message: str):
print(f"[*] {message}")
def get_metrics(self, host: str):
ip = FLEET[host]
# Command to get disk usage, memory usage (%), and load avg
cmd = "df -h / | tail -1 | awk '{print $5}' && free -m | grep Mem | awk '{print $3/$2 * 100}' && uptime | awk '{print $10}'"
ssh_cmd = ["ssh", "-o", "StrictHostKeyChecking=no", f"root@{ip}", cmd]
if host == "mac":
# Mac specific commands
cmd = "df -h / | tail -1 | awk '{print $5}' && sysctl -n vm.page_pageable_internal_count && uptime | awk '{print $10}'"
ssh_cmd = ["bash", "-c", cmd]
try:
res = subprocess.run(ssh_cmd, capture_output=True, text=True, timeout=10)
if res.returncode == 0:
lines = res.stdout.strip().split("\n")
return {
"disk_usage": lines[0],
"mem_usage": f"{float(lines[1]):.1f}%" if len(lines) > 1 and lines[1].replace('.','',1).isdigit() else "unknown",
"load_avg": lines[2].rstrip(",") if len(lines) > 2 else "unknown"
}
except:
pass
return None
def collect(self):
self.log("Collecting telemetry from fleet...")
data = {
"timestamp": time.time(),
"metrics": {}
}
for host in FLEET:
self.log(f"Fetching metrics from {host}...")
metrics = self.get_metrics(host)
if metrics:
data["metrics"][host] = metrics
# Append to telemetry file
history = []
if os.path.exists(self.telemetry_path):
with open(self.telemetry_path, "r") as f:
try:
history = json.load(f)
except:
history = []
history.append(data)
# Keep only last 100 entries
history = history[-100:]
with open(self.telemetry_path, "w") as f:
json.dump(history, f, indent=2)
self.log(f"Telemetry saved to {self.telemetry_path}")
def show_summary(self):
if not os.path.exists(self.telemetry_path):
print("No telemetry data found.")
return
with open(self.telemetry_path, "r") as f:
try:
history = json.load(f)
except:
print("Error reading telemetry data.")
return
if not history:
print("No telemetry data found.")
return
latest = history[-1]
print(f"\n--- Fleet Telemetry Summary ({time.ctime(latest['timestamp'])}) ---")
print(f"{'HOST':<10} {'DISK':<10} {'MEM':<10} {'LOAD':<10}")
print("-" * 45)
for host, m in latest["metrics"].items():
print(f"{host:<10} {m['disk_usage']:<10} {m['mem_usage']:<10} {m['load_avg']:<10}")
def main():
parser = argparse.ArgumentParser(description="Gemini Telemetry")
parser.add_argument("command", choices=["collect", "summary"], help="Command to run")
args = parser.parse_args()
telemetry = Telemetry()
if args.command == "collect":
telemetry.collect()
elif args.command == "summary":
telemetry.show_summary()
if __name__ == "__main__":
main()