Compare commits
18 Commits
purge/open
...
perplexity
| Author | SHA1 | Date | |
|---|---|---|---|
| 55fc678dc3 | |||
| 77a95d0ca1 | |||
| 9677785d8a | |||
| a5ac4cc675 | |||
| d801c5bc78 | |||
| 90dbd8212c | |||
| a1d1359deb | |||
| a91d7e5f4f | |||
| 92415ce18c | |||
| 3040938c46 | |||
| 99af3526ce | |||
| af3ba9d594 | |||
| 7813871296 | |||
| de83f1fda8 | |||
|
|
6863d9c0c5 | ||
|
|
b49a0abf39 | ||
|
|
72de3eebdf | ||
| f9388f6875 |
@@ -59,7 +59,21 @@ jobs:
|
||||
- name: Flake8 critical errors only
|
||||
run: |
|
||||
flake8 --select=E9,F63,F7,F82 --show-source --statistics \
|
||||
scripts/ allegro/ cron/ || true
|
||||
scripts/ bin/ tests/
|
||||
|
||||
python-test:
|
||||
name: Python Test Suite
|
||||
runs-on: ubuntu-latest
|
||||
needs: python-check
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.11'
|
||||
- name: Install test dependencies
|
||||
run: pip install pytest pyyaml
|
||||
- name: Run tests
|
||||
run: python3 -m pytest tests/ -v --tb=short
|
||||
|
||||
shell-lint:
|
||||
name: Shell Script Lint
|
||||
@@ -70,7 +84,7 @@ jobs:
|
||||
run: sudo apt-get install -y shellcheck
|
||||
- name: Lint shell scripts
|
||||
run: |
|
||||
find . -name '*.sh' -print0 | xargs -0 -r shellcheck --severity=error || true
|
||||
find . -name '*.sh' -not -path './.git/*' -print0 | xargs -0 -r shellcheck --severity=error
|
||||
|
||||
cron-validate:
|
||||
name: Cron Syntax Check
|
||||
|
||||
120
bin/conflict_detector.py
Normal file
120
bin/conflict_detector.py
Normal file
@@ -0,0 +1,120 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Merge Conflict Detector — catches sibling PRs that will conflict.
|
||||
|
||||
When multiple PRs branch from the same base commit and touch the same files,
|
||||
merging one invalidates the others. This script detects that pattern
|
||||
before it creates a rebase cascade.
|
||||
|
||||
Usage:
|
||||
python3 conflict_detector.py # Check all repos
|
||||
python3 conflict_detector.py --repo OWNER/REPO # Check one repo
|
||||
|
||||
Environment:
|
||||
GITEA_URL — Gitea instance URL
|
||||
GITEA_TOKEN — API token
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import urllib.request
|
||||
from collections import defaultdict
|
||||
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
|
||||
REPOS = [
|
||||
"Timmy_Foundation/the-nexus",
|
||||
"Timmy_Foundation/timmy-config",
|
||||
"Timmy_Foundation/timmy-home",
|
||||
"Timmy_Foundation/fleet-ops",
|
||||
"Timmy_Foundation/hermes-agent",
|
||||
"Timmy_Foundation/the-beacon",
|
||||
]
|
||||
|
||||
def api(path):
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
req = urllib.request.Request(url)
|
||||
if GITEA_TOKEN:
|
||||
req.add_header("Authorization", f"token {GITEA_TOKEN}")
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
return json.loads(resp.read())
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def check_repo(repo):
|
||||
"""Find sibling PRs that touch the same files."""
|
||||
prs = api(f"/repos/{repo}/pulls?state=open&limit=50")
|
||||
if not prs:
|
||||
return []
|
||||
|
||||
# Group PRs by base commit
|
||||
by_base = defaultdict(list)
|
||||
for pr in prs:
|
||||
base_sha = pr.get("merge_base", pr.get("base", {}).get("sha", "unknown"))
|
||||
by_base[base_sha].append(pr)
|
||||
|
||||
conflicts = []
|
||||
|
||||
for base_sha, siblings in by_base.items():
|
||||
if len(siblings) < 2:
|
||||
continue
|
||||
|
||||
# Get files for each sibling
|
||||
file_map = {}
|
||||
for pr in siblings:
|
||||
files = api(f"/repos/{repo}/pulls/{pr['number']}/files")
|
||||
if files:
|
||||
file_map[pr['number']] = set(f['filename'] for f in files)
|
||||
|
||||
# Find overlapping file sets
|
||||
pr_nums = list(file_map.keys())
|
||||
for i in range(len(pr_nums)):
|
||||
for j in range(i+1, len(pr_nums)):
|
||||
a, b = pr_nums[i], pr_nums[j]
|
||||
overlap = file_map[a] & file_map[b]
|
||||
if overlap:
|
||||
conflicts.append({
|
||||
"repo": repo,
|
||||
"pr_a": a,
|
||||
"pr_b": b,
|
||||
"base": base_sha[:8],
|
||||
"files": sorted(overlap),
|
||||
"title_a": next(p["title"] for p in siblings if p["number"] == a),
|
||||
"title_b": next(p["title"] for p in siblings if p["number"] == b),
|
||||
})
|
||||
|
||||
return conflicts
|
||||
|
||||
def main():
|
||||
repos = REPOS
|
||||
if "--repo" in sys.argv:
|
||||
idx = sys.argv.index("--repo") + 1
|
||||
if idx < len(sys.argv):
|
||||
repos = [sys.argv[idx]]
|
||||
|
||||
all_conflicts = []
|
||||
for repo in repos:
|
||||
conflicts = check_repo(repo)
|
||||
all_conflicts.extend(conflicts)
|
||||
|
||||
if not all_conflicts:
|
||||
print("No sibling PR conflicts detected. Queue is clean.")
|
||||
return 0
|
||||
|
||||
print(f"Found {len(all_conflicts)} potential merge conflicts:")
|
||||
print()
|
||||
for c in all_conflicts:
|
||||
print(f" {c['repo']}:")
|
||||
print(f" PR #{c['pr_a']} vs #{c['pr_b']} (base: {c['base']})")
|
||||
print(f" #{c['pr_a']}: {c['title_a'][:60]}")
|
||||
print(f" #{c['pr_b']}: {c['title_b'][:60]}")
|
||||
print(f" Overlapping files: {', '.join(c['files'])}")
|
||||
print(f" → Merge one first, then rebase the other.")
|
||||
print()
|
||||
|
||||
return 1
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
11
scripts/captcha_bypass_handler.py
Normal file
11
scripts/captcha_bypass_handler.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import json
|
||||
from hermes_tools import browser_navigate, browser_vision
|
||||
|
||||
def bypass_captcha():
|
||||
analysis = browser_vision(
|
||||
question="Solve the CAPTCHA on the current page. Provide the solution text or coordinate clicks required. Provide a PASS/FAIL."
|
||||
)
|
||||
return {"status": "PASS" if "PASS" in analysis.upper() else "FAIL", "solution": analysis}
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(json.dumps(bypass_captcha(), indent=2))
|
||||
11
scripts/diagram_meaning_extractor.py
Normal file
11
scripts/diagram_meaning_extractor.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import json
|
||||
from hermes_tools import browser_navigate, browser_vision
|
||||
|
||||
def extract_meaning():
|
||||
analysis = browser_vision(
|
||||
question="Analyze the provided diagram. Extract the core logic flow and map it to a 'Meaning Kernel' (entity -> relationship -> entity). Provide output in JSON."
|
||||
)
|
||||
return {"analysis": analysis}
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(json.dumps(extract_meaning(), indent=2))
|
||||
390
scripts/fleet-dashboard.py
Executable file
390
scripts/fleet-dashboard.py
Executable file
@@ -0,0 +1,390 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
fleet-dashboard.py -- Timmy Foundation Fleet Status Dashboard.
|
||||
|
||||
One-page terminal dashboard showing:
|
||||
1. Gitea: open PRs, open issues, recent merges
|
||||
2. VPS health: SSH reachability, service status, disk usage
|
||||
3. Cron jobs: scheduled jobs, last run status
|
||||
|
||||
Usage:
|
||||
python3 scripts/fleet-dashboard.py
|
||||
python3 scripts/fleet-dashboard.py --json # machine-readable output
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
GITEA_BASE = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_API = f"{GITEA_BASE}/api/v1"
|
||||
GITEA_ORG = "Timmy_Foundation"
|
||||
|
||||
# Key repos to check for PRs/issues
|
||||
REPOS = [
|
||||
"timmy-config",
|
||||
"the-nexus",
|
||||
"hermes-agent",
|
||||
"the-forge",
|
||||
"timmy-sandbox",
|
||||
]
|
||||
|
||||
# VPS fleet
|
||||
VPS_HOSTS = {
|
||||
"ezra": {
|
||||
"ip": "143.198.27.163",
|
||||
"ssh_user": "root",
|
||||
"services": ["nginx", "gitea", "docker"],
|
||||
},
|
||||
"allegro": {
|
||||
"ip": "167.99.126.228",
|
||||
"ssh_user": "root",
|
||||
"services": ["hermes-agent"],
|
||||
},
|
||||
"bezalel": {
|
||||
"ip": "159.203.146.185",
|
||||
"ssh_user": "root",
|
||||
"services": ["hermes-agent", "evennia"],
|
||||
},
|
||||
}
|
||||
|
||||
CRON_JOBS_FILE = Path(__file__).parent.parent / "cron" / "jobs.json"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _gitea_token() -> str:
|
||||
for p in [
|
||||
Path.home() / ".hermes" / "gitea_token",
|
||||
Path.home() / ".hermes" / "gitea_token_vps",
|
||||
Path.home() / ".config" / "gitea" / "token",
|
||||
]:
|
||||
if p.exists():
|
||||
return p.read_text().strip()
|
||||
return ""
|
||||
|
||||
|
||||
def _gitea_get(path: str, params: dict | None = None) -> list | dict:
|
||||
url = f"{GITEA_API}{path}"
|
||||
if params:
|
||||
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
|
||||
if qs:
|
||||
url += f"?{qs}"
|
||||
req = urllib.request.Request(url)
|
||||
token = _gitea_token()
|
||||
if token:
|
||||
req.add_header("Authorization", f"token {token}")
|
||||
req.add_header("Accept", "application/json")
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
return json.loads(resp.read())
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
def check_gitea_health() -> dict:
|
||||
"""Ping Gitea and collect PR/issue stats."""
|
||||
result = {"reachable": False, "version": "", "repos": {}, "totals": {}}
|
||||
|
||||
# Ping
|
||||
data = _gitea_get("/version")
|
||||
if isinstance(data, dict) and "error" not in data:
|
||||
result["reachable"] = True
|
||||
result["version"] = data.get("version", "unknown")
|
||||
elif isinstance(data, dict) and "error" in data:
|
||||
return result
|
||||
|
||||
total_open_prs = 0
|
||||
total_open_issues = 0
|
||||
total_recent_merges = 0
|
||||
cutoff = (datetime.now(timezone.utc) - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
for repo in REPOS:
|
||||
repo_path = f"/repos/{GITEA_ORG}/{repo}"
|
||||
repo_info = {"prs": [], "issues": [], "recent_merges": 0}
|
||||
|
||||
# Open PRs
|
||||
prs = _gitea_get(f"{repo_path}/pulls", {"state": "open", "limit": "10", "sort": "newest"})
|
||||
if isinstance(prs, list):
|
||||
for pr in prs:
|
||||
repo_info["prs"].append({
|
||||
"number": pr.get("number"),
|
||||
"title": pr.get("title", "")[:60],
|
||||
"user": pr.get("user", {}).get("login", "unknown"),
|
||||
"created": pr.get("created_at", "")[:10],
|
||||
})
|
||||
total_open_prs += len(prs)
|
||||
|
||||
# Open issues (excluding PRs)
|
||||
issues = _gitea_get(f"{repo_path}/issues", {
|
||||
"state": "open", "type": "issues", "limit": "10", "sort": "newest"
|
||||
})
|
||||
if isinstance(issues, list):
|
||||
for iss in issues:
|
||||
repo_info["issues"].append({
|
||||
"number": iss.get("number"),
|
||||
"title": iss.get("title", "")[:60],
|
||||
"user": iss.get("user", {}).get("login", "unknown"),
|
||||
"created": iss.get("created_at", "")[:10],
|
||||
})
|
||||
total_open_issues += len(issues)
|
||||
|
||||
# Recent merges (closed PRs)
|
||||
merged = _gitea_get(f"{repo_path}/pulls", {"state": "closed", "limit": "20", "sort": "newest"})
|
||||
if isinstance(merged, list):
|
||||
recent = [p for p in merged if p.get("merged") and p.get("closed_at", "") >= cutoff]
|
||||
repo_info["recent_merges"] = len(recent)
|
||||
total_recent_merges += len(recent)
|
||||
|
||||
result["repos"][repo] = repo_info
|
||||
|
||||
result["totals"] = {
|
||||
"open_prs": total_open_prs,
|
||||
"open_issues": total_open_issues,
|
||||
"recent_merges_7d": total_recent_merges,
|
||||
}
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# VPS health helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_ssh(ip: str, timeout: int = 5) -> bool:
|
||||
try:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(timeout)
|
||||
result = sock.connect_ex((ip, 22))
|
||||
sock.close()
|
||||
return result == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def check_service(ip: str, user: str, service: str) -> str:
|
||||
"""Check if a systemd service is active on remote host."""
|
||||
cmd = f"ssh -o StrictHostKeyChecking=no -o ConnectTimeout=8 {user}@{ip} 'systemctl is-active {service} 2>/dev/null || echo inactive'"
|
||||
try:
|
||||
proc = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=15)
|
||||
return proc.stdout.strip() or "unknown"
|
||||
except subprocess.TimeoutExpired:
|
||||
return "timeout"
|
||||
except Exception:
|
||||
return "error"
|
||||
|
||||
|
||||
def check_disk(ip: str, user: str) -> dict:
|
||||
cmd = f"ssh -o StrictHostKeyChecking=no -o ConnectTimeout=8 {user}@{ip} 'df -h / | tail -1'"
|
||||
try:
|
||||
proc = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=15)
|
||||
if proc.returncode == 0:
|
||||
parts = proc.stdout.strip().split()
|
||||
if len(parts) >= 5:
|
||||
return {"total": parts[1], "used": parts[2], "avail": parts[3], "pct": parts[4]}
|
||||
except Exception:
|
||||
pass
|
||||
return {"total": "?", "used": "?", "avail": "?", "pct": "?"}
|
||||
|
||||
|
||||
def check_vps_health() -> dict:
|
||||
result = {}
|
||||
for name, cfg in VPS_HOSTS.items():
|
||||
ip = cfg["ip"]
|
||||
ssh_up = check_ssh(ip)
|
||||
entry = {"ip": ip, "ssh": ssh_up, "services": {}, "disk": {}}
|
||||
if ssh_up:
|
||||
for svc in cfg.get("services", []):
|
||||
entry["services"][svc] = check_service(ip, cfg["ssh_user"], svc)
|
||||
entry["disk"] = check_disk(ip, cfg["ssh_user"])
|
||||
result[name] = entry
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cron job status
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_cron_jobs() -> list[dict]:
|
||||
jobs = []
|
||||
if not CRON_JOBS_FILE.exists():
|
||||
return [{"name": "jobs.json", "status": "FILE NOT FOUND"}]
|
||||
try:
|
||||
data = json.loads(CRON_JOBS_FILE.read_text())
|
||||
for job in data.get("jobs", []):
|
||||
jobs.append({
|
||||
"name": job.get("name", "unnamed"),
|
||||
"schedule": job.get("schedule_display", job.get("schedule", {}).get("display", "?")),
|
||||
"enabled": job.get("enabled", False),
|
||||
"state": job.get("state", "unknown"),
|
||||
"completed": job.get("repeat", {}).get("completed", 0),
|
||||
"last_status": job.get("last_status") or "never run",
|
||||
"last_error": job.get("last_error"),
|
||||
})
|
||||
except Exception as e:
|
||||
jobs.append({"name": "jobs.json", "status": f"PARSE ERROR: {e}"})
|
||||
return jobs
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Terminal rendering
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
BOLD = "\033[1m"
|
||||
DIM = "\033[2m"
|
||||
GREEN = "\033[32m"
|
||||
RED = "\033[31m"
|
||||
YELLOW = "\033[33m"
|
||||
CYAN = "\033[36m"
|
||||
RESET = "\033[0m"
|
||||
|
||||
|
||||
def _ok(val: bool) -> str:
|
||||
return f"{GREEN}UP{RESET}" if val else f"{RED}DOWN{RESET}"
|
||||
|
||||
|
||||
def _svc_icon(status: str) -> str:
|
||||
s = status.lower().strip()
|
||||
if s in ("active", "running"):
|
||||
return f"{GREEN}active{RESET}"
|
||||
elif s in ("inactive", "dead", "failed"):
|
||||
return f"{RED}{s}{RESET}"
|
||||
elif s == "timeout":
|
||||
return f"{YELLOW}timeout{RESET}"
|
||||
else:
|
||||
return f"{YELLOW}{s}{RESET}"
|
||||
|
||||
|
||||
def render_dashboard(gitea: dict, vps: dict, cron: list[dict]) -> str:
|
||||
lines = []
|
||||
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
|
||||
lines.append("")
|
||||
lines.append(f"{BOLD}{'=' * 72}{RESET}")
|
||||
lines.append(f"{BOLD} TIMMY FOUNDATION -- FLEET STATUS DASHBOARD{RESET}")
|
||||
lines.append(f"{DIM} Generated: {now}{RESET}")
|
||||
lines.append(f"{BOLD}{'=' * 72}{RESET}")
|
||||
|
||||
# ── Section 1: Gitea ──────────────────────────────────────────────────
|
||||
lines.append("")
|
||||
lines.append(f"{BOLD}{CYAN} [1] GITEA{RESET}")
|
||||
lines.append(f" {'-' * 68}")
|
||||
if gitea.get("reachable"):
|
||||
lines.append(f" Status: {GREEN}REACHABLE{RESET} (version {gitea.get('version', '?')})")
|
||||
t = gitea.get("totals", {})
|
||||
lines.append(f" Totals: {t.get('open_prs', 0)} open PRs | {t.get('open_issues', 0)} open issues | {t.get('recent_merges_7d', 0)} merges (7d)")
|
||||
lines.append("")
|
||||
for repo_name, repo in gitea.get("repos", {}).items():
|
||||
prs = repo.get("prs", [])
|
||||
issues = repo.get("issues", [])
|
||||
merges = repo.get("recent_merges", 0)
|
||||
lines.append(f" {BOLD}{repo_name}{RESET} ({len(prs)} PRs, {len(issues)} issues, {merges} merges/7d)")
|
||||
for pr in prs[:5]:
|
||||
lines.append(f" PR #{pr['number']:>4} {pr['title'][:50]:<50} {DIM}{pr['user']}{RESET} {pr['created']}")
|
||||
for iss in issues[:3]:
|
||||
lines.append(f" IS #{iss['number']:>4} {iss['title'][:50]:<50} {DIM}{iss['user']}{RESET} {iss['created']}")
|
||||
else:
|
||||
lines.append(f" Status: {RED}UNREACHABLE{RESET}")
|
||||
|
||||
# ── Section 2: VPS Health ─────────────────────────────────────────────
|
||||
lines.append("")
|
||||
lines.append(f"{BOLD}{CYAN} [2] VPS HEALTH{RESET}")
|
||||
lines.append(f" {'-' * 68}")
|
||||
lines.append(f" {'Host':<12} {'IP':<18} {'SSH':<8} {'Disk':<12} {'Services'}")
|
||||
lines.append(f" {'-' * 12} {'-' * 17} {'-' * 7} {'-' * 11} {'-' * 30}")
|
||||
for name, info in vps.items():
|
||||
ssh_str = _ok(info["ssh"])
|
||||
disk = info.get("disk", {})
|
||||
disk_str = disk.get("pct", "?")
|
||||
if disk_str != "?":
|
||||
pct_val = int(disk_str.rstrip("%"))
|
||||
if pct_val >= 90:
|
||||
disk_str = f"{RED}{disk_str}{RESET}"
|
||||
elif pct_val >= 75:
|
||||
disk_str = f"{YELLOW}{disk_str}{RESET}"
|
||||
else:
|
||||
disk_str = f"{GREEN}{disk_str}{RESET}"
|
||||
svc_parts = []
|
||||
for svc, status in info.get("services", {}).items():
|
||||
svc_parts.append(f"{svc}:{_svc_icon(status)}")
|
||||
svc_str = " ".join(svc_parts) if svc_parts else f"{DIM}n/a{RESET}"
|
||||
lines.append(f" {name:<12} {info['ip']:<18} {ssh_str:<18} {disk_str:<22} {svc_str}")
|
||||
|
||||
# ── Section 3: Cron Jobs ──────────────────────────────────────────────
|
||||
lines.append("")
|
||||
lines.append(f"{BOLD}{CYAN} [3] CRON JOBS{RESET}")
|
||||
lines.append(f" {'-' * 68}")
|
||||
lines.append(f" {'Name':<28} {'Schedule':<16} {'State':<12} {'Last':<12} {'Runs'}")
|
||||
lines.append(f" {'-' * 27} {'-' * 15} {'-' * 11} {'-' * 11} {'-' * 5}")
|
||||
for job in cron:
|
||||
name = job.get("name", "?")[:27]
|
||||
sched = job.get("schedule", "?")[:15]
|
||||
state = job.get("state", "?")
|
||||
if state == "scheduled":
|
||||
state_str = f"{GREEN}{state}{RESET}"
|
||||
elif state == "paused":
|
||||
state_str = f"{YELLOW}{state}{RESET}"
|
||||
else:
|
||||
state_str = state
|
||||
last = job.get("last_status", "never")[:11]
|
||||
if last == "ok":
|
||||
last_str = f"{GREEN}{last}{RESET}"
|
||||
elif last in ("error", "never run"):
|
||||
last_str = f"{RED}{last}{RESET}"
|
||||
else:
|
||||
last_str = last
|
||||
runs = job.get("completed", 0)
|
||||
enabled = job.get("enabled", False)
|
||||
marker = " " if enabled else f"{DIM}(disabled){RESET}"
|
||||
lines.append(f" {name:<28} {sched:<16} {state_str:<22} {last_str:<22} {runs} {marker}")
|
||||
|
||||
# ── Footer ────────────────────────────────────────────────────────────
|
||||
lines.append("")
|
||||
lines.append(f"{BOLD}{'=' * 72}{RESET}")
|
||||
lines.append(f"{DIM} python3 scripts/fleet-dashboard.py | timmy-config{RESET}")
|
||||
lines.append(f"{BOLD}{'=' * 72}{RESET}")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
|
||||
json_mode = "--json" in sys.argv
|
||||
|
||||
if not json_mode:
|
||||
print(f"\n {DIM}Collecting fleet data...{RESET}\n", file=sys.stderr)
|
||||
|
||||
gitea = check_gitea_health()
|
||||
vps = check_vps_health()
|
||||
cron = check_cron_jobs()
|
||||
|
||||
if json_mode:
|
||||
output = {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"gitea": gitea,
|
||||
"vps": vps,
|
||||
"cron": cron,
|
||||
}
|
||||
print(json.dumps(output, indent=2))
|
||||
else:
|
||||
print(render_dashboard(gitea, vps, cron))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
12
scripts/foundation_accessibility_audit.py
Normal file
12
scripts/foundation_accessibility_audit.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import json
|
||||
from hermes_tools import browser_navigate, browser_vision
|
||||
|
||||
def audit_accessibility():
|
||||
browser_navigate(url="https://timmyfoundation.org")
|
||||
analysis = browser_vision(
|
||||
question="Perform an accessibility audit. Check for: 1) Color contrast, 2) Font legibility, 3) Missing alt text for images. Provide a report with FAIL/PASS."
|
||||
)
|
||||
return {"status": "PASS" if "PASS" in analysis.upper() else "FAIL", "analysis": analysis}
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(json.dumps(audit_accessibility(), indent=2))
|
||||
12
scripts/matrix_glitch_detect.py
Normal file
12
scripts/matrix_glitch_detect.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import json
|
||||
from hermes_tools import browser_navigate, browser_vision
|
||||
|
||||
def detect_glitches():
|
||||
browser_navigate(url="https://matrix.alexanderwhitestone.com")
|
||||
analysis = browser_vision(
|
||||
question="Scan the 3D world for visual artifacts, floating assets, or z-fighting. List all coordinates/descriptions of glitches found. Provide a PASS/FAIL."
|
||||
)
|
||||
return {"status": "PASS" if "PASS" in analysis.upper() else "FAIL", "analysis": analysis}
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(json.dumps(detect_glitches(), indent=2))
|
||||
20
scripts/nexus_smoke_test.py
Normal file
20
scripts/nexus_smoke_test.py
Normal file
@@ -0,0 +1,20 @@
|
||||
import json
|
||||
from hermes_tools import browser_navigate, browser_vision
|
||||
|
||||
def run_smoke_test():
|
||||
print("Navigating to The Nexus...")
|
||||
browser_navigate(url="https://nexus.alexanderwhitestone.com")
|
||||
|
||||
print("Performing visual verification...")
|
||||
analysis = browser_vision(
|
||||
question="Is the Nexus landing page rendered correctly? Check for: 1) The Tower logo, 2) The main entry portal, 3) Absence of 404/Error messages. Provide a clear PASS or FAIL."
|
||||
)
|
||||
|
||||
result = {
|
||||
"status": "PASS" if "PASS" in analysis.upper() else "FAIL",
|
||||
"analysis": analysis
|
||||
}
|
||||
return result
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(json.dumps(run_smoke_test(), indent=2))
|
||||
@@ -48,6 +48,34 @@ class SelfHealer:
|
||||
self.log(f" [ERROR] Failed to run remote command on {host}: {e}")
|
||||
return None
|
||||
|
||||
def confirm(self, prompt: str) -> bool:
|
||||
"""Ask for confirmation unless --yes flag is set."""
|
||||
if self.yes:
|
||||
return True
|
||||
while True:
|
||||
response = input(f"{prompt} [y/N] ").strip().lower()
|
||||
if response in ("y", "yes"):
|
||||
return True
|
||||
if response in ("n", "no", ""):
|
||||
return False
|
||||
print("Please answer 'y' or 'n'.")
|
||||
|
||||
def check_llama_server(self, host: str):
|
||||
ip = FLEET[host]["ip"]
|
||||
port = FLEET[host]["port"]
|
||||
try:
|
||||
requests.get(f"http://{ip}:{port}/health", timeout=2)
|
||||
except requests.RequestException:
|
||||
self.log(f" [!] llama-server down on {host}.")
|
||||
if self.dry_run:
|
||||
self.log(f" [DRY-RUN] Would restart llama-server on {host}")
|
||||
else:
|
||||
if self.confirm(f" Restart llama-server on {host}?"):
|
||||
self.log(f" Restarting llama-server on {host}...")
|
||||
self.run_remote(host, "systemctl restart llama-server")
|
||||
else:
|
||||
self.log(f" Skipped restart on {host}.")
|
||||
|
||||
def check_disk_space(self, host: str):
|
||||
res = self.run_remote(host, "df -h / | tail -1 | awk '{print $5}' | sed 's/%//'")
|
||||
if res and res.returncode == 0:
|
||||
|
||||
12
scripts/tower_visual_mapper.py
Normal file
12
scripts/tower_visual_mapper.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import json
|
||||
from hermes_tools import browser_navigate, browser_vision
|
||||
|
||||
def map_tower():
|
||||
browser_navigate(url="https://tower.alexanderwhitestone.com")
|
||||
analysis = browser_vision(
|
||||
question="Map the visual architecture of The Tower. Identify key rooms and their relative positions. Output as a coordinate map."
|
||||
)
|
||||
return {"map": analysis}
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(json.dumps(map_tower(), indent=2))
|
||||
11
scripts/visual_pr_reviewer.py
Normal file
11
scripts/visual_pr_reviewer.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import json
|
||||
from hermes_tools import browser_navigate, browser_vision
|
||||
|
||||
def review_pr():
|
||||
analysis = browser_vision(
|
||||
question="Compare the two provided screenshots of the UI. Does the 'After' match the design spec? List all discrepancies. Provide a PASS/FAIL."
|
||||
)
|
||||
return {"status": "PASS" if "PASS" in analysis.upper() else "FAIL", "analysis": analysis}
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(json.dumps(review_pr(), indent=2))
|
||||
1
test_write.txt
Normal file
1
test_write.txt
Normal file
@@ -0,0 +1 @@
|
||||
惦-
|
||||
@@ -3,6 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
@@ -60,3 +61,35 @@ class TestMainCli:
|
||||
|
||||
healer_cls.assert_called_once_with(dry_run=False, confirm_kill=True, yes=True)
|
||||
healer.run.assert_called_once_with()
|
||||
|
||||
def test_real_default_dry_run_path_completes(self, monkeypatch, capsys):
|
||||
class FakeExecutor:
|
||||
def __init__(self):
|
||||
self.calls = []
|
||||
|
||||
def run_script(self, host, command, *, local=False, timeout=None):
|
||||
self.calls.append((host, command, local, timeout))
|
||||
if command.startswith("df -h /"):
|
||||
return subprocess.CompletedProcess(command, 0, stdout="42\n", stderr="")
|
||||
if command.startswith("free -m"):
|
||||
return subprocess.CompletedProcess(command, 0, stdout="12.5\n", stderr="")
|
||||
if command.startswith("ps aux"):
|
||||
return subprocess.CompletedProcess(command, 0, stdout="", stderr="")
|
||||
raise AssertionError(f"unexpected command: {command}")
|
||||
|
||||
fake_executor = FakeExecutor()
|
||||
monkeypatch.setattr(sh, "FLEET", {"mac": {"ip": "127.0.0.1", "port": 8080}})
|
||||
monkeypatch.setattr(sh.requests, "get", lambda url, timeout: object())
|
||||
monkeypatch.setattr(sh, "VerifiedSSHExecutor", lambda: fake_executor)
|
||||
|
||||
sh.main([])
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert "Starting self-healing cycle (DRY-RUN mode)." in out
|
||||
assert "Auditing mac..." in out
|
||||
assert "Cycle complete." in out
|
||||
assert fake_executor.calls == [
|
||||
("127.0.0.1", "df -h / | tail -1 | awk '{print $5}' | sed 's/%//'", True, 15),
|
||||
("127.0.0.1", "free -m | awk '/^Mem:/{print $3/$2 * 100}'", True, 15),
|
||||
("127.0.0.1", "ps aux --sort=-%cpu | awk 'NR>1 && $3>80 {print $2, $11, $3}'", True, 15),
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user