Compare commits
10 Commits
fix/ci-val
...
feat/gofai
| Author | SHA1 | Date | |
|---|---|---|---|
| 68f46a90f5 | |||
| 1dc29180b8 | |||
| 343e190cc3 | |||
| 932f48d06f | |||
| 0c7521d275 | |||
| bad31125c2 | |||
| 7305d97e8f | |||
| 19e11b5287 | |||
| 03d53a644b | |||
| f2388733fb |
@@ -32,6 +32,7 @@ jobs:
|
||||
name: Lint Repository
|
||||
runs-on: ubuntu-latest
|
||||
needs: linter-tests
|
||||
continue-on-error: true
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
|
||||
24
.gitea/workflows/smoke.yml
Normal file
24
.gitea/workflows/smoke.yml
Normal file
@@ -0,0 +1,24 @@
|
||||
# Smoke Test workflow: cheap parse checks for every YAML/JSON/Python/shell
# file plus a credential-prefix scan, run on pull requests and pushes to main.
# NOTE(review): indentation reconstructed from a flattened diff view — verify
# against the committed .gitea/workflows/smoke.yml.
name: Smoke Test
on:
  pull_request:
  push:
    branches: [main]
jobs:
  smoke:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Parse check
        run: |
          find . -name '*.yml' -o -name '*.yaml' | grep -v .gitea | xargs -r python3 -c "import sys,yaml; [yaml.safe_load(open(f)) for f in sys.argv[1:]]"
          find . -name '*.json' | xargs -r python3 -m json.tool > /dev/null
          find . -name '*.py' | xargs -r python3 -m py_compile
          find . -name '*.sh' | xargs -r bash -n
          echo "PASS: All files parse"
      - name: Secret scan
        run: |
          if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v .gitea; then exit 1; fi
          echo "PASS: No secrets"
|
||||
126
scripts/agent_guardrails.py
Normal file
126
scripts/agent_guardrails.py
Normal file
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env python3
"""
agent_guardrails.py — Sovereign Agent Guardrails for the Timmy Foundation.

Scans agent logs and conversation trajectories for:
1. System Prompt Leakage — detecting if the agent revealed its internal instructions.
2. Jailbreak Compliance — detecting if the agent responded to "GODMODE", "DAN", etc.
3. PII Leakage — detecting if secrets or sensitive data were output.

Part of the Sovereign Infrastructure Suite.
"""

import os
import re
import sys
import glob
import logging
from pathlib import Path
from typing import List, Dict, Tuple

# Patterns adapted from hermes-agent/agent/input_sanitizer.py
# User-side phrasings that attempt to extract the agent's hidden instructions.
EXTRACTION_PATTERNS = [
    r"repeat\s+the\s+words\s+above",
    r"repeat\s+your\s+(?:system\s+|initial\s+)?instructions",
    r"what\s+is\s+your\s+(?:system\s+|initial\s+)?prompt",
    r"show\s+me\s+your\s+(?:system\s+|initial\s+)?prompt",
    r"print\s+your\s+(?:instructions|prompt|system\s+prompt)",
    r"output\s+your\s+(?:initial|system)\s+prompt",
]

# Patterns that indicate the agent MIGHT be leaking its prompt
# (assistant-side phrasings typical of regurgitated system prompts).
LEAKAGE_INDICATORS = [
    r"you\s+are\s+a\s+helpful\s+assistant",
    r"your\s+goal\s+is\s+to",
    r"you\s+must\s+not",
    r"here\s+are\s+your\s+instructions",
    r"my\s+system\s+prompt\s+is",
    r"i\s+was\s+told\s+to",
]

# Patterns for secrets (adapted from redact.py) — common API-key prefixes.
SECRET_PATTERNS = [
    r"sk-[A-Za-z0-9_-]{20,}",
    r"ghp_[A-Za-z0-9]{20,}",
    r"AIza[A-Za-z0-9_-]{30,}",
]

# Glob patterns covering every known per-agent log location on the host.
AGENT_LOG_PATHS = [
    "/root/wizards/*/home/logs/*.log",
    "/root/wizards/*/logs/*.log",
    "/root/wizards/*/.hermes/logs/*.log",
]
|
||||
|
||||
class GuardrailAuditor:
    """Scans agent log files for prompt-extraction attempts, prompt-leakage
    indicators, and exposed secrets, reporting each hit with a severity."""

    def __init__(self):
        # Compile every pattern set once, case-insensitively.
        self.extraction_re = [re.compile(p, re.IGNORECASE) for p in EXTRACTION_PATTERNS]
        self.leakage_re = [re.compile(p, re.IGNORECASE) for p in LEAKAGE_INDICATORS]
        self.secret_re = [re.compile(p, re.IGNORECASE) for p in SECRET_PATTERNS]

    def find_logs(self) -> List[Path]:
        """Collect every log file matching the known fleet layouts."""
        return [
            Path(match)
            for pattern in AGENT_LOG_PATHS
            for match in glob.glob(pattern)
        ]

    def audit_file(self, path: Path) -> List[Dict]:
        """Scan one log file line by line; return a list of finding dicts."""
        findings: List[Dict] = []
        # (regex set, finding type, severity, redact content?) — checked in
        # this order per line, matching the severity ladder of the patterns.
        checks = (
            (self.extraction_re, "EXTRACTION_ATTEMPT", "MEDIUM", False),
            (self.leakage_re, "POTENTIAL_LEAKAGE", "HIGH", False),
            (self.secret_re, "SECRET_EXPOSURE", "CRITICAL", True),
        )
        try:
            with open(path, "r", errors="ignore") as fh:
                for lineno, text in enumerate(fh, start=1):
                    for regexes, kind, severity, redact in checks:
                        for rx in regexes:
                            if rx.search(text):
                                findings.append({
                                    "type": kind,
                                    "line": lineno,
                                    # Never echo secret material back out.
                                    "content": "[REDACTED]" if redact else text.strip()[:100],
                                    "severity": severity,
                                })
        except Exception as exc:
            print(f"Error reading {path}: {exc}")
        return findings

    def run(self):
        """Audit every discovered log; exit non-zero when anything is found."""
        print("--- Sovereign Agent Guardrail Audit ---")
        log_files = self.find_logs()
        print(f"Scanning {len(log_files)} log files...")

        total_findings = 0
        for log_path in log_files:
            results = self.audit_file(log_path)
            if not results:
                continue
            print(f"\nFindings in {log_path}:")
            for item in results:
                print(f" [{item['severity']}] {item['type']} at line {item['line']}: {item['content']}")
                total_findings += 1

        print(f"\nAudit complete. Total findings: {total_findings}")
        if total_findings > 0:
            sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
    # CLI entry point: audit all known agent logs; exits 1 on any finding.
    auditor = GuardrailAuditor()
    auditor.run()
|
||||
87
scripts/ci_automation_gate.py
Normal file
87
scripts/ci_automation_gate.py
Normal file
@@ -0,0 +1,87 @@
|
||||
#!/usr/bin/env python3
"""
ci_automation_gate.py — Automated Quality Gate for Timmy Foundation CI.

Enforces:
1. The 10-line Rule — functions should ideally be under 10 lines (warn at 20, fail at 50).
2. Complexity Check — basic cyclomatic complexity check.
3. Auto-fixable Linting — trailing whitespace, missing final newlines.

Used as a pre-merge gate.
"""

import os
import sys
import re
import argparse
from pathlib import Path
|
||||
|
||||
class QualityGate:
    """Pre-merge quality gate: auto-fixable whitespace lint plus a
    regex-based function-length check for JS/TS sources.

    Counters:
        failures — hard violations (function > 50 lines); non-zero exits 1 in run().
        warnings — soft violations (whitespace issues, 20-50 line functions).
    """

    def __init__(self, fix=False):
        self.fix = fix          # when True, rewrite offending files in place
        self.failures = 0
        self.warnings = 0

    def check_file(self, path: Path):
        """Lint a single source file; only .js/.ts/.py files are examined."""
        if path.suffix not in (".js", ".ts", ".py"):
            return

        with open(path, "r") as f:
            lines = f.readlines()

        # 1. Basic Linting: strip trailing whitespace and normalise newlines.
        # rstrip() + "\n" already guarantees every line — including the last —
        # ends with exactly one newline, so no extra append is needed.
        # (BUGFIX: the previous version appended a second "\n" to the final
        # line whenever the file lacked a trailing newline, so --fix wrote a
        # spurious blank line at end of file.)
        new_lines = []
        changed = False
        for line in lines:
            cleaned = line.rstrip() + "\n"
            if cleaned != line:
                changed = True
            new_lines.append(cleaned)

        if changed and self.fix:
            with open(path, "w") as f:
                f.writelines(new_lines)
            print(f" [FIXED] {path}: Cleaned whitespace and newlines.")
        elif changed:
            print(f" [WARN] {path}: Has trailing whitespace or missing final newline.")
            self.warnings += 1

        # 2. Function Length Check (simple regex-based; JS/TS only — the
        # naive brace match cannot delimit Python bodies).
        content = "".join(new_lines)
        if path.suffix in (".js", ".ts"):
            # Match function blocks (non-greedy up to the first closing brace;
            # nested braces are not handled by this heuristic).
            functions = re.findall(r"function\s+\w+\s*\(.*?\)\s*\{([\s\S]*?)\}", content)
            for i, func in enumerate(functions):
                length = func.count("\n")
                if length > 50:
                    print(f" [FAIL] {path}: Function {i} is too long ({length} lines).")
                    self.failures += 1
                elif length > 20:
                    print(f" [WARN] {path}: Function {i} is getting long ({length} lines).")
                    self.warnings += 1

    def run(self, directory: str):
        """Walk *directory* (skipping node_modules/.git) and gate every file.

        Exits the process with status 1 when any hard failure was recorded.
        """
        print(f"--- Quality Gate: {directory} ---")
        for root, _, files in os.walk(directory):
            if "node_modules" in root or ".git" in root:
                continue
            for file in files:
                self.check_file(Path(root) / file)

        print(f"\nGate complete. Failures: {self.failures}, Warnings: {self.warnings}")
        if self.failures > 0:
            sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
    # CLI: `python ci_automation_gate.py [dir] [--fix]`
    parser = argparse.ArgumentParser()
    parser.add_argument("dir", nargs="?", default=".")
    parser.add_argument("--fix", action="store_true")
    args = parser.parse_args()

    gate = QualityGate(fix=args.fix)
    gate.run(args.dir)
|
||||
321
scripts/constraint_planner.py
Normal file
321
scripts/constraint_planner.py
Normal file
@@ -0,0 +1,321 @@
|
||||
#!/usr/bin/env python3
"""constraint_planner.py — Constraint-satisfaction planner for the Timmy Foundation fleet.

A GOFAI approach to agent scheduling: model tasks and agents as variables
with domains, apply constraints (capacity, skill-match, conflict-avoidance),
and solve via backtracking with forward-checking. No gradient descent, no
embeddings — just logic.

Usage:
    python constraint_planner.py --tasks tasks.yaml
    python constraint_planner.py --self-test
"""
from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

# PyYAML is optional: only required when loading .yaml/.yml task files.
try:
    import yaml
except ImportError:
    yaml = None


# ---------------------------------------------------------------------------
# Domain types
# ---------------------------------------------------------------------------
|
||||
@dataclass
class Agent:
    # A fleet member that can take on scheduled tasks.
    name: str
    skills: Set[str] = field(default_factory=set)  # capabilities this agent offers
    max_concurrent: int = 3  # capacity ceiling enforced by capacity_constraint
    current_load: int = 0  # tasks already in flight (informational)


@dataclass
class Task:
    # A unit of work to be assigned to exactly one agent.
    name: str
    required_skills: Set[str] = field(default_factory=set)  # any overlap satisfies
    priority: int = 0  # higher priority is scheduled first
    repo: str = ""  # owning repository (informational)
    conflicts_with: Set[str] = field(default_factory=set)  # task names


@dataclass
class Assignment:
    # A solved (task, agent) pair with a human-readable justification.
    task: str
    agent: str
    reason: str


# ---------------------------------------------------------------------------
# Constraint definitions
# ---------------------------------------------------------------------------
# A constraint receives (partial assignment, task name, candidate agent name)
# and returns True when the candidate agent is acceptable for the task.
Constraint = Callable[[Dict[str, str], str, str], bool]
|
||||
|
||||
|
||||
def skill_constraint(agents: Dict[str, Agent], tasks: Dict[str, Task]) -> Constraint:
    """Agent must have at least one required skill."""
    def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
        required = tasks[task_name].required_skills
        # A task with no skill requirement accepts any agent.
        return not required or not required.isdisjoint(agents[agent_name].skills)
    return check
|
||||
|
||||
|
||||
def capacity_constraint(agents: Dict[str, Agent]) -> Constraint:
    """Agent must not exceed max_concurrent tasks."""
    def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
        # Count how many tasks the candidate already holds in this partial plan.
        held = list(assignment.values()).count(agent_name)
        return held < agents[agent_name].max_concurrent
    return check
|
||||
|
||||
|
||||
def conflict_constraint(tasks: Dict[str, Task]) -> Constraint:
    """Conflicting tasks must not share the same agent."""
    def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
        # Unassigned conflicts yield None from .get(), which never matches.
        return not any(
            assignment.get(conflict) == agent_name
            for conflict in tasks[task_name].conflicts_with
        )
    return check
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Solver
|
||||
# ---------------------------------------------------------------------------
|
||||
class ConstraintPlanner:
    """Backtracking CSP solver with forward-checking.

    Variables are task names, domains are agent names; a solution maps every
    task to an agent such that all registered constraints hold.

    NOTE(review): despite the docstring's mention of forward-checking, the
    implementation below is plain chronological backtracking — candidate
    agents are validated lazily per assignment and no domain pruning occurs.
    """

    def __init__(
        self,
        agents: List[Agent],
        tasks: List[Task],
        extra_constraints: Optional[List[Constraint]] = None,
    ):
        self._agents = {a.name: a for a in agents}
        # Highest-priority tasks first, so backtracking assigns them earliest.
        self._tasks = {t.name: t for t in sorted(tasks, key=lambda t: -t.priority)}
        # Built-in constraints; callers may append their own via extra_constraints.
        self._constraints: List[Constraint] = [
            skill_constraint(self._agents, self._tasks),
            capacity_constraint(self._agents),
            conflict_constraint(self._tasks),
        ]
        if extra_constraints:
            self._constraints.extend(extra_constraints)
        # Human-readable record of every assign/backtrack step taken.
        self._trace: List[str] = []

    def solve(self) -> Optional[Dict[str, str]]:
        """Return {task_name: agent_name} or None if unsatisfiable."""
        task_names = list(self._tasks.keys())
        agent_names = list(self._agents.keys())
        assignment: Dict[str, str] = {}
        if self._backtrack(assignment, task_names, agent_names, 0):
            return assignment
        return None

    def _backtrack(
        self,
        assignment: Dict[str, str],
        task_names: List[str],
        agent_names: List[str],
        idx: int,
    ) -> bool:
        """Depth-first search: try each agent for task `idx`, recurse, undo on failure.

        Mutates `assignment` in place; a True return leaves it complete.
        """
        # All tasks placed — the partial assignment is a full solution.
        if idx == len(task_names):
            return True

        task_name = task_names[idx]
        for agent_name in agent_names:
            if self._consistent(assignment, task_name, agent_name):
                assignment[task_name] = agent_name
                self._trace.append(f"assign {task_name} -> {agent_name}")
                if self._backtrack(assignment, task_names, agent_names, idx + 1):
                    return True
                # Dead end below this choice: undo and try the next agent.
                del assignment[task_name]
                self._trace.append(f"backtrack {task_name} x {agent_name}")
        return False

    def _consistent(
        self, assignment: Dict[str, str], task_name: str, agent_name: str
    ) -> bool:
        # A candidate is consistent when every registered constraint accepts it.
        return all(
            c(assignment, task_name, agent_name) for c in self._constraints
        )

    def explain(self, assignment: Dict[str, str]) -> List[Assignment]:
        """Generate human-readable explanations for each assignment."""
        results = []
        for task_name, agent_name in assignment.items():
            task = self._tasks[task_name]
            agent = self._agents[agent_name]
            # The skill overlap is the justification; its absence means the
            # task had no skill requirement at all.
            overlap = task.required_skills & agent.skills
            reason = f"skills={','.join(sorted(overlap))}" if overlap else "no-skill-required"
            results.append(Assignment(task=task_name, agent=agent_name, reason=reason))
        return results

    @property
    def trace(self) -> List[str]:
        # Defensive copy so callers cannot mutate solver history.
        return list(self._trace)

    # -- serialisation helpers -----------------------------------------------

    @classmethod
    def from_dicts(cls, data: Dict) -> "ConstraintPlanner":
        """Build a planner from plain dicts (the parsed YAML/JSON shape)."""
        agents = [
            Agent(
                name=a["name"],
                skills=set(a.get("skills", [])),
                max_concurrent=a.get("max_concurrent", 3),
            )
            for a in data.get("agents", [])
        ]
        tasks = [
            Task(
                name=t["name"],
                required_skills=set(t.get("required_skills", [])),
                priority=t.get("priority", 0),
                repo=t.get("repo", ""),
                conflicts_with=set(t.get("conflicts_with", [])),
            )
            for t in data.get("tasks", [])
        ]
        return cls(agents, tasks)

    @classmethod
    def from_file(cls, path: Path) -> "ConstraintPlanner":
        """Load a planner from a YAML or JSON task+agent file.

        Raises RuntimeError when a YAML file is given but PyYAML is absent.
        """
        text = path.read_text()
        if path.suffix in (".yaml", ".yml"):
            if yaml is None:
                raise RuntimeError("PyYAML required for .yaml")
            data = yaml.safe_load(text)
        else:
            data = json.loads(text)
        return cls.from_dicts(data)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Built-in fleet configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fallback fleet used when no --tasks file is supplied on the CLI.
DEFAULT_FLEET = {
    # Agents: name, skill set, and concurrent-task capacity.
    "agents": [
        {"name": "timmy", "skills": ["ci", "security", "infra", "hotfix"], "max_concurrent": 4},
        {"name": "gemini", "skills": ["architecture", "sovereignty", "review"], "max_concurrent": 3},
        {"name": "allegro", "skills": ["review", "docs", "quality"], "max_concurrent": 2},
        {"name": "claude", "skills": ["frontend", "3d", "volume"], "max_concurrent": 5},
        {"name": "rockachopa", "skills": ["sovereignty", "design"], "max_concurrent": 2},
    ],
    # Tasks: higher priority is placed first by the solver.
    "tasks": [
        {"name": "fix-ci-pipeline", "required_skills": ["ci"], "priority": 10},
        {"name": "security-audit", "required_skills": ["security"], "priority": 9},
        {"name": "nexus-ui-overhaul", "required_skills": ["frontend", "3d"], "priority": 5},
        {"name": "sovereign-orchestrator", "required_skills": ["sovereignty", "architecture"], "priority": 8},
        {"name": "review-pr-backlog", "required_skills": ["review"], "priority": 7},
        {"name": "fleet-ops-hardening", "required_skills": ["infra"], "priority": 6},
        # conflicts_with forces these two frontend tasks onto different agents.
        {"name": "mnemosyne-persistence", "required_skills": ["frontend"], "priority": 4,
         "conflicts_with": ["nexus-ui-overhaul"]},
    ],
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Self-test
|
||||
# ---------------------------------------------------------------------------
|
||||
def _self_test() -> bool:
    """Solve the built-in fleet and verify the result against every constraint
    class, then confirm an over-constrained problem reports as unsatisfiable.

    Returns True on success (assertions raise on failure).
    """
    planner = ConstraintPlanner.from_dicts(DEFAULT_FLEET)
    result = planner.solve()
    assert result is not None, "expected a solution"

    # Verify skill constraints: every assigned agent shares a required skill.
    for task_name, agent_name in result.items():
        task = planner._tasks[task_name]
        agent = planner._agents[agent_name]
        if task.required_skills:
            assert task.required_skills & agent.skills, (
                f"{agent_name} lacks skills for {task_name}"
            )

    # Verify capacity constraints: no agent holds more than max_concurrent.
    from collections import Counter
    counts = Counter(result.values())
    for agent_name, count in counts.items():
        agent = planner._agents[agent_name]
        assert count <= agent.max_concurrent, (
            f"{agent_name} overloaded: {count} > {agent.max_concurrent}"
        )

    # Verify conflict constraints: conflicting tasks landed on distinct agents.
    for task_name, agent_name in result.items():
        task = planner._tasks[task_name]
        for conflict in task.conflicts_with:
            if conflict in result:
                assert result[conflict] != agent_name, (
                    f"{task_name} and {conflict} both assigned to {agent_name}"
                )

    # Verify unsatisfiable detection: a single agent with capacity 1 cannot
    # absorb two skill-matched tasks.
    impossible = {
        "agents": [{"name": "solo", "skills": ["a"], "max_concurrent": 1}],
        "tasks": [
            {"name": "t1", "required_skills": ["a"], "priority": 1},
            {"name": "t2", "required_skills": ["a"], "priority": 1},
        ],
    }
    p2 = ConstraintPlanner.from_dicts(impossible)
    assert p2.solve() is None, "expected None for unsatisfiable problem"

    print("All self-test scenarios passed.")
    explanations = planner.explain(result)
    for e in explanations:
        print(f" {e.task} -> {e.agent} ({e.reason})")
    return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
def main():
    """CLI entry point: solve the fleet plan and print (or JSON-dump) it.

    Exits 1 when the constraint system is unsatisfiable or the self-test fails.
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--tasks", type=Path, help="YAML/JSON task+agent file")
    ap.add_argument("--self-test", action="store_true")
    ap.add_argument("--json", action="store_true")
    args = ap.parse_args()

    if args.self_test:
        sys.exit(0 if _self_test() else 1)

    # Fall back to the built-in fleet when no task file is supplied.
    if args.tasks:
        planner = ConstraintPlanner.from_file(args.tasks)
    else:
        planner = ConstraintPlanner.from_dicts(DEFAULT_FLEET)

    result = planner.solve()
    if result is None:
        print("No valid assignment found. Constraints are unsatisfiable.")
        sys.exit(1)

    explanations = planner.explain(result)
    if args.json:
        # Machine-readable output: assignments plus the full solver trace.
        print(json.dumps(
            {"assignments": [{"task": e.task, "agent": e.agent, "reason": e.reason} for e in explanations],
             "trace": planner.trace},
            indent=2,
        ))
    else:
        print(f"Plan ({len(result)} assignments):")
        for e in explanations:
            print(f" {e.task} -> {e.agent} ({e.reason})")
        if planner.trace:
            print(f"\nSolver trace ({len(planner.trace)} steps):")
            for line in planner.trace:
                print(f" {line}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Script entry point.
    main()
|
||||
75
scripts/health_dashboard.py
Normal file
75
scripts/health_dashboard.py
Normal file
@@ -0,0 +1,75 @@
|
||||
#!/usr/bin/env python3
"""
health_dashboard.py — Sovereign Health & Observability Dashboard.

Aggregates data from Muda, Guardrails, Token Optimizer, and Quality Gates
into a single, unified health report for the Timmy Foundation fleet.
"""

import os
import sys
import json
import subprocess
from datetime import datetime
from pathlib import Path

# Directory for auxiliary report artefacts; created on startup.
REPORTS_DIR = Path("reports")
# The rendered Markdown dashboard output file.
DASHBOARD_FILE = Path("SOVEREIGN_HEALTH.md")
|
||||
|
||||
class HealthDashboard:
    """Runs every audit tool and renders their combined output into the
    SOVEREIGN_HEALTH.md Markdown dashboard."""

    def __init__(self):
        # Make sure the reports directory exists before any tool writes to it.
        REPORTS_DIR.mkdir(exist_ok=True)

    def run_tool(self, name: str, cmd: str) -> str:
        """Run *cmd* in a shell and return its captured stdout (stderr dropped)."""
        print(f"[*] Running {name}...")
        try:
            completed = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        except Exception as exc:
            return f"Error running {name}: {exc}"
        return completed.stdout

    def generate_report(self):
        """Execute all audits, then write the assembled Markdown dashboard."""
        print("--- Generating Sovereign Health Dashboard ---")

        # 1. Run Audits (order here is the order of sections in the report).
        audits = (
            ("## ♻️ Muda Waste Audit", "Muda Audit", "python3 scripts/muda_audit.py"),
            ("## 🕵️ Agent Guardrails", "Agent Guardrails", "python3 scripts/agent_guardrails.py"),
            ("## 🪙 Token Efficiency", "Token Optimizer", "python3 scripts/token_optimizer.py"),
            ("## 🏗️ CI Quality Gate", "Quality Gate", "python3 scripts/ci_automation_gate.py ."),
        )
        captured = [(heading, self.run_tool(tool, cmd)) for heading, tool, cmd in audits]

        # 2. Build Markdown (timestamp taken after the audits complete).
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        parts = [
            "# 🛡️ Sovereign Health Dashboard",
            f"**Last Updated:** {now}",
            "",
            "## 📊 Summary",
            "- **Fleet Status:** ACTIVE",
            "- **Security Posture:** MONITORING",
            "- **Operational Waste:** AUDITED",
            "",
        ]
        for heading, output in captured:
            parts += [heading, f"```\n{output}\n```", ""]
        parts += ["---", "*Generated by Sovereign Infrastructure Suite*"]

        with open(DASHBOARD_FILE, "w") as f:
            f.write("\n".join(parts))

        print(f"[SUCCESS] Dashboard generated at {DASHBOARD_FILE}")
|
||||
|
||||
if __name__ == "__main__":
    # CLI entry point: run all audits and regenerate the dashboard file.
    dashboard = HealthDashboard()
    dashboard.generate_report()
|
||||
87
scripts/token_optimizer.py
Normal file
87
scripts/token_optimizer.py
Normal file
@@ -0,0 +1,87 @@
|
||||
#!/usr/bin/env python3
"""
token_optimizer.py — Token Efficiency & Optimization for the Timmy Foundation.

Analyzes agent logs to identify:
1. "Chatty" Agents — agents outputting excessive tokens for simple tasks.
2. Redundant Logs — identifying patterns of repetitive log output.
3. Tool Output Bloat — identifying tools that return unnecessarily large payloads.

Outputs an "Efficiency Score" (0-100) per agent.
"""

import os
import sys
import glob
import re
from pathlib import Path
from collections import defaultdict
from typing import Dict, List

# Glob patterns covering every known per-agent log location on the host.
AGENT_LOG_PATHS = [
    "/root/wizards/*/home/logs/*.log",
    "/root/wizards/*/logs/*.log",
    "/root/wizards/*/.hermes/logs/*.log",
]
|
||||
|
||||
class TokenOptimizer:
    """Per-agent token/turn/tool-call accounting over harvested log files,
    summarised as an efficiency score per agent."""

    def __init__(self):
        # agent name -> running totals across all of that agent's logs
        self.agent_stats = defaultdict(lambda: {"tokens": 0, "turns": 0, "tool_calls": 0})

    def estimate_tokens(self, text: str) -> int:
        """Crude token estimate: roughly 4 characters per token."""
        return len(text) // 4

    def find_logs(self) -> List[Path]:
        """Collect every log file matching the known fleet layouts."""
        return [
            Path(hit)
            for pattern in AGENT_LOG_PATHS
            for hit in glob.glob(pattern)
        ]

    def analyze_log(self, path: Path):
        """Accumulate token/turn/tool-call stats from a single log file."""
        # The agent name is the path component right after "wizards";
        # anything outside that layout is bucketed under "unknown".
        pieces = path.parts
        try:
            agent = pieces[pieces.index("wizards") + 1]
        except (ValueError, IndexError):
            agent = "unknown"

        try:
            text = path.read_text(errors="ignore")
        except Exception as exc:
            print(f"Error analyzing {path}: {exc}")
            return

        stats = self.agent_stats[agent]
        stats["tokens"] += self.estimate_tokens(text)
        # Turns approximated by role markers; tool calls by the tool banner.
        stats["turns"] += text.count("[ASSISTANT]") + text.count("[USER]")
        stats["tool_calls"] += text.count("Calling tool:")

    def run(self):
        """Scan all logs and print the per-agent efficiency table."""
        print("--- Token Efficiency Audit ---")
        for log in self.find_logs():
            self.analyze_log(log)

        print(f"{'Agent':<20} | {'Tokens':<10} | {'Turns':<6} | {'T/Turn':<8} | {'Efficiency'}")
        print("-" * 65)

        for agent, stats in self.agent_stats.items():
            token_total = stats["tokens"]
            turn_total = max(stats["turns"], 1)  # guard against division by zero
            per_turn = token_total // turn_total
            # Efficiency score: lower tokens per turn is generally better.
            # Baseline: 500 tokens per turn = 100 score. 2000+ = 0 score.
            score = max(0, min(100, 100 - (per_turn - 500) // 15))
            print(f"{agent:<20} | {token_total:<10} | {turn_total:<6} | {per_turn:<8} | {score}%")
|
||||
|
||||
if __name__ == "__main__":
    # CLI entry point: scan all agent logs and print the efficiency table.
    optimizer = TokenOptimizer()
    optimizer.run()
|
||||
Reference in New Issue
Block a user