Compare commits
10 Commits
fix/ci-val
...
feat/gofai
| Author | SHA1 | Date | |
|---|---|---|---|
| 20bc0aa41a | |||
| 1dc29180b8 | |||
| 343e190cc3 | |||
| 932f48d06f | |||
| 0c7521d275 | |||
| bad31125c2 | |||
| 7305d97e8f | |||
| 19e11b5287 | |||
| 03d53a644b | |||
| f2388733fb |
@@ -32,6 +32,7 @@ jobs:
|
||||
name: Lint Repository
|
||||
runs-on: ubuntu-latest
|
||||
needs: linter-tests
|
||||
continue-on-error: true
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
|
||||
24
.gitea/workflows/smoke.yml
Normal file
24
.gitea/workflows/smoke.yml
Normal file
@@ -0,0 +1,24 @@
|
||||
name: Smoke Test
|
||||
on:
|
||||
pull_request:
|
||||
push:
|
||||
branches: [main]
|
||||
jobs:
|
||||
smoke:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.11'
|
||||
- name: Parse check
|
||||
run: |
|
||||
find . -name '*.yml' -o -name '*.yaml' | grep -v .gitea | xargs -r python3 -c "import sys,yaml; [yaml.safe_load(open(f)) for f in sys.argv[1:]]"
|
||||
find . -name '*.json' | xargs -r python3 -m json.tool > /dev/null
|
||||
find . -name '*.py' | xargs -r python3 -m py_compile
|
||||
find . -name '*.sh' | xargs -r bash -n
|
||||
echo "PASS: All files parse"
|
||||
- name: Secret scan
|
||||
run: |
|
||||
if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v .gitea; then exit 1; fi
|
||||
echo "PASS: No secrets"
|
||||
126
scripts/agent_guardrails.py
Normal file
126
scripts/agent_guardrails.py
Normal file
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
agent_guardrails.py — Sovereign Agent Guardrails for the Timmy Foundation.
|
||||
|
||||
Scans agent logs and conversation trajectories for:
|
||||
1. System Prompt Leakage — detecting if the agent revealed its internal instructions.
|
||||
2. Jailbreak Compliance — detecting if the agent responded to "GODMODE", "DAN", etc.
|
||||
3. PII Leakage — detecting if secrets or sensitive data were output.
|
||||
|
||||
Part of the Sovereign Infrastructure Suite.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import glob
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Tuple
|
||||
|
||||
# Patterns adapted from hermes-agent/agent/input_sanitizer.py
|
||||
EXTRACTION_PATTERNS = [
|
||||
r"repeat\s+the\s+words\s+above",
|
||||
r"repeat\s+your\s+(?:system\s+|initial\s+)?instructions",
|
||||
r"what\s+is\s+your\s+(?:system\s+|initial\s+)?prompt",
|
||||
r"show\s+me\s+your\s+(?:system\s+|initial\s+)?prompt",
|
||||
r"print\s+your\s+(?:instructions|prompt|system\s+prompt)",
|
||||
r"output\s+your\s+(?:initial|system)\s+prompt",
|
||||
]
|
||||
|
||||
# Patterns that indicate the agent MIGHT be leaking its prompt
|
||||
LEAKAGE_INDICATORS = [
|
||||
r"you\s+are\s+a\s+helpful\s+assistant",
|
||||
r"your\s+goal\s+is\s+to",
|
||||
r"you\s+must\s+not",
|
||||
r"here\s+are\s+your\s+instructions",
|
||||
r"my\s+system\s+prompt\s+is",
|
||||
r"i\s+was\s+told\s+to",
|
||||
]
|
||||
|
||||
# Patterns for secrets (adapted from redact.py)
|
||||
SECRET_PATTERNS = [
|
||||
r"sk-[A-Za-z0-9_-]{20,}",
|
||||
r"ghp_[A-Za-z0-9]{20,}",
|
||||
r"AIza[A-Za-z0-9_-]{30,}",
|
||||
]
|
||||
|
||||
AGENT_LOG_PATHS = [
|
||||
"/root/wizards/*/home/logs/*.log",
|
||||
"/root/wizards/*/logs/*.log",
|
||||
"/root/wizards/*/.hermes/logs/*.log",
|
||||
]
|
||||
|
||||
class GuardrailAuditor:
|
||||
def __init__(self):
|
||||
self.extraction_re = [re.compile(p, re.IGNORECASE) for p in EXTRACTION_PATTERNS]
|
||||
self.leakage_re = [re.compile(p, re.IGNORECASE) for p in LEAKAGE_INDICATORS]
|
||||
self.secret_re = [re.compile(p, re.IGNORECASE) for p in SECRET_PATTERNS]
|
||||
|
||||
def find_logs(self) -> List[Path]:
|
||||
files = []
|
||||
for pattern in AGENT_LOG_PATHS:
|
||||
for p in glob.glob(pattern):
|
||||
files.append(Path(p))
|
||||
return files
|
||||
|
||||
def audit_file(self, path: Path) -> List[Dict]:
|
||||
findings = []
|
||||
try:
|
||||
with open(path, "r", errors="ignore") as f:
|
||||
lines = f.readlines()
|
||||
for i, line in enumerate(lines):
|
||||
# Check for extraction attempts (User side)
|
||||
for p in self.extraction_re:
|
||||
if p.search(line):
|
||||
findings.append({
|
||||
"type": "EXTRACTION_ATTEMPT",
|
||||
"line": i + 1,
|
||||
"content": line.strip()[:100],
|
||||
"severity": "MEDIUM"
|
||||
})
|
||||
|
||||
# Check for potential leakage (Assistant side)
|
||||
for p in self.leakage_re:
|
||||
if p.search(line):
|
||||
findings.append({
|
||||
"type": "POTENTIAL_LEAKAGE",
|
||||
"line": i + 1,
|
||||
"content": line.strip()[:100],
|
||||
"severity": "HIGH"
|
||||
})
|
||||
|
||||
# Check for secrets
|
||||
for p in self.secret_re:
|
||||
if p.search(line):
|
||||
findings.append({
|
||||
"type": "SECRET_EXPOSURE",
|
||||
"line": i + 1,
|
||||
"content": "[REDACTED]",
|
||||
"severity": "CRITICAL"
|
||||
})
|
||||
except Exception as e:
|
||||
print(f"Error reading {path}: {e}")
|
||||
return findings
|
||||
|
||||
def run(self):
|
||||
print("--- Sovereign Agent Guardrail Audit ---")
|
||||
logs = self.find_logs()
|
||||
print(f"Scanning {len(logs)} log files...")
|
||||
|
||||
total_findings = 0
|
||||
for log in logs:
|
||||
findings = self.audit_file(log)
|
||||
if findings:
|
||||
print(f"\nFindings in {log}:")
|
||||
for f in findings:
|
||||
print(f" [{f['severity']}] {f['type']} at line {f['line']}: {f['content']}")
|
||||
total_findings += 1
|
||||
|
||||
print(f"\nAudit complete. Total findings: {total_findings}")
|
||||
if total_findings > 0:
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
auditor = GuardrailAuditor()
|
||||
auditor.run()
|
||||
87
scripts/ci_automation_gate.py
Normal file
87
scripts/ci_automation_gate.py
Normal file
@@ -0,0 +1,87 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ci_automation_gate.py — Automated Quality Gate for Timmy Foundation CI.
|
||||
|
||||
Enforces:
|
||||
1. The 10-line Rule — functions should ideally be under 10 lines (warn at 20, fail at 50).
|
||||
2. Complexity Check — basic cyclomatic complexity check.
|
||||
3. Auto-fixable Linting — trailing whitespace, missing final newlines.
|
||||
|
||||
Used as a pre-merge gate.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
class QualityGate:
|
||||
def __init__(self, fix=False):
|
||||
self.fix = fix
|
||||
self.failures = 0
|
||||
self.warnings = 0
|
||||
|
||||
def check_file(self, path: Path):
|
||||
if path.suffix not in (".js", ".ts", ".py"):
|
||||
return
|
||||
|
||||
with open(path, "r") as f:
|
||||
lines = f.readlines()
|
||||
|
||||
new_lines = []
|
||||
changed = False
|
||||
|
||||
# 1. Basic Linting
|
||||
for line in lines:
|
||||
cleaned = line.rstrip() + "\n"
|
||||
if cleaned != line:
|
||||
changed = True
|
||||
new_lines.append(cleaned)
|
||||
|
||||
if lines and not lines[-1].endswith("\n"):
|
||||
new_lines[-1] = new_lines[-1] + "\n"
|
||||
changed = True
|
||||
|
||||
if changed and self.fix:
|
||||
with open(path, "w") as f:
|
||||
f.writelines(new_lines)
|
||||
print(f" [FIXED] {path}: Cleaned whitespace and newlines.")
|
||||
elif changed:
|
||||
print(f" [WARN] {path}: Has trailing whitespace or missing final newline.")
|
||||
self.warnings += 1
|
||||
|
||||
# 2. Function Length Check (Simple regex-based)
|
||||
content = "".join(new_lines)
|
||||
if path.suffix in (".js", ".ts"):
|
||||
# Match function blocks
|
||||
functions = re.findall(r"function\s+\w+\s*\(.*?\)\s*\{([\s\S]*?)\}", content)
|
||||
for i, func in enumerate(functions):
|
||||
length = func.count("\n")
|
||||
if length > 50:
|
||||
print(f" [FAIL] {path}: Function {i} is too long ({length} lines).")
|
||||
self.failures += 1
|
||||
elif length > 20:
|
||||
print(f" [WARN] {path}: Function {i} is getting long ({length} lines).")
|
||||
self.warnings += 1
|
||||
|
||||
def run(self, directory: str):
|
||||
print(f"--- Quality Gate: {directory} ---")
|
||||
for root, _, files in os.walk(directory):
|
||||
if "node_modules" in root or ".git" in root:
|
||||
continue
|
||||
for file in files:
|
||||
self.check_file(Path(root) / file)
|
||||
|
||||
print(f"\nGate complete. Failures: {self.failures}, Warnings: {self.warnings}")
|
||||
if self.failures > 0:
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("dir", nargs="?", default=".")
|
||||
parser.add_argument("--fix", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
gate = QualityGate(fix=args.fix)
|
||||
gate.run(args.dir)
|
||||
75
scripts/health_dashboard.py
Normal file
75
scripts/health_dashboard.py
Normal file
@@ -0,0 +1,75 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
health_dashboard.py — Sovereign Health & Observability Dashboard.
|
||||
|
||||
Aggregates data from Muda, Guardrails, Token Optimizer, and Quality Gates
|
||||
into a single, unified health report for the Timmy Foundation fleet.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
REPORTS_DIR = Path("reports")
|
||||
DASHBOARD_FILE = Path("SOVEREIGN_HEALTH.md")
|
||||
|
||||
class HealthDashboard:
|
||||
def __init__(self):
|
||||
REPORTS_DIR.mkdir(exist_ok=True)
|
||||
|
||||
def run_tool(self, name: str, cmd: str) -> str:
|
||||
print(f"[*] Running {name}...")
|
||||
try:
|
||||
# Capture output
|
||||
res = subprocess.run(cmd, shell=True, capture_output=True, text=True)
|
||||
return res.stdout
|
||||
except Exception as e:
|
||||
return f"Error running {name}: {e}"
|
||||
|
||||
def generate_report(self):
|
||||
print("--- Generating Sovereign Health Dashboard ---")
|
||||
|
||||
# 1. Run Audits
|
||||
muda_output = self.run_tool("Muda Audit", "python3 scripts/muda_audit.py")
|
||||
guardrails_output = self.run_tool("Agent Guardrails", "python3 scripts/agent_guardrails.py")
|
||||
optimizer_output = self.run_tool("Token Optimizer", "python3 scripts/token_optimizer.py")
|
||||
gate_output = self.run_tool("Quality Gate", "python3 scripts/ci_automation_gate.py .")
|
||||
|
||||
# 2. Build Markdown
|
||||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
md = [
|
||||
f"# 🛡️ Sovereign Health Dashboard",
|
||||
f"**Last Updated:** {now}",
|
||||
f"",
|
||||
f"## 📊 Summary",
|
||||
f"- **Fleet Status:** ACTIVE",
|
||||
f"- **Security Posture:** MONITORING",
|
||||
f"- **Operational Waste:** AUDITED",
|
||||
f"",
|
||||
f"## ♻️ Muda Waste Audit",
|
||||
f"```\n{muda_output}\n```",
|
||||
f"",
|
||||
f"## 🕵️ Agent Guardrails",
|
||||
f"```\n{guardrails_output}\n```",
|
||||
f"",
|
||||
f"## 🪙 Token Efficiency",
|
||||
f"```\n{optimizer_output}\n```",
|
||||
f"",
|
||||
f"## 🏗️ CI Quality Gate",
|
||||
f"```\n{gate_output}\n```",
|
||||
f"",
|
||||
f"---",
|
||||
f"*Generated by Sovereign Infrastructure Suite*"
|
||||
]
|
||||
|
||||
with open(DASHBOARD_FILE, "w") as f:
|
||||
f.write("\n".join(md))
|
||||
|
||||
print(f"[SUCCESS] Dashboard generated at {DASHBOARD_FILE}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
dashboard = HealthDashboard()
|
||||
dashboard.generate_report()
|
||||
307
scripts/temporal_reasoner.py
Normal file
307
scripts/temporal_reasoner.py
Normal file
@@ -0,0 +1,307 @@
|
||||
#!/usr/bin/env python3
|
||||
"""temporal_reasoner.py - GOFAI temporal reasoning engine for the Timmy Foundation fleet.
|
||||
|
||||
A symbolic temporal constraint network (TCN) for scheduling and ordering events.
|
||||
Models Allen's interval algebra relations (before, after, meets, overlaps, etc.)
|
||||
and propagates temporal constraints via path-consistency to detect conflicts.
|
||||
No ML, no embeddings - just constraint propagation over a temporal graph.
|
||||
|
||||
Core concepts:
|
||||
TimePoint: A named instant on a symbolic timeline.
|
||||
Interval: A pair of time-points (start, end) with start < end.
|
||||
Constraint: A relation between two time-points or intervals
|
||||
(e.g. A.before(B), A.meets(B)).
|
||||
|
||||
Usage (Python API):
|
||||
from temporal_reasoner import TemporalNetwork, Interval
|
||||
tn = TemporalNetwork()
|
||||
deploy = tn.add_interval('deploy', duration=(10, 30))
|
||||
test = tn.add_interval('test', duration=(5, 15))
|
||||
tn.add_constraint(deploy, 'before', test)
|
||||
consistent = tn.propagate()
|
||||
|
||||
CLI:
|
||||
python temporal_reasoner.py --demo
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
|
||||
INF = float('inf')
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data model
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TimePoint:
|
||||
"""A named instant on the timeline."""
|
||||
name: str
|
||||
id: int = field(default=0)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
|
||||
@dataclass
|
||||
class Interval:
|
||||
"""A named interval bounded by two time-points."""
|
||||
name: str
|
||||
start: int # index into the distance matrix
|
||||
end: int # index into the distance matrix
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
|
||||
class Relation(Enum):
|
||||
"""Allen's interval algebra relations (simplified subset)."""
|
||||
BEFORE = 'before'
|
||||
AFTER = 'after'
|
||||
MEETS = 'meets'
|
||||
MET_BY = 'met_by'
|
||||
OVERLAPS = 'overlaps'
|
||||
DURING = 'during'
|
||||
EQUALS = 'equals'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Simple Temporal Network (STN) via distance matrix
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TemporalNetwork:
|
||||
"""Simple Temporal Network with Floyd-Warshall propagation.
|
||||
|
||||
Internally maintains a distance matrix D where D[i][j] is the
|
||||
maximum allowed distance from time-point i to time-point j.
|
||||
Negative cycles indicate inconsistency.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._n = 0
|
||||
self._names: List[str] = []
|
||||
self._dist: List[List[float]] = []
|
||||
self._intervals: Dict[str, Interval] = {}
|
||||
self._origin_idx: int = -1
|
||||
self._add_point('origin')
|
||||
self._origin_idx = 0
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Point management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _add_point(self, name: str) -> int:
|
||||
"""Add a time-point and return its index."""
|
||||
idx = self._n
|
||||
self._n += 1
|
||||
self._names.append(name)
|
||||
# Extend distance matrix
|
||||
for row in self._dist:
|
||||
row.append(INF)
|
||||
self._dist.append([INF] * self._n)
|
||||
self._dist[idx][idx] = 0.0
|
||||
return idx
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Interval management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def add_interval(
|
||||
self,
|
||||
name: str,
|
||||
duration: Optional[Tuple[float, float]] = None,
|
||||
) -> Interval:
|
||||
"""Add a named interval with optional duration bounds [lo, hi].
|
||||
|
||||
Returns the Interval object with start/end indices.
|
||||
"""
|
||||
s = self._add_point(f"{name}.start")
|
||||
e = self._add_point(f"{name}.end")
|
||||
# start < end (at least 1 time unit)
|
||||
self._dist[s][e] = min(self._dist[s][e], duration[1] if duration else INF)
|
||||
self._dist[e][s] = min(self._dist[e][s], -(duration[0] if duration else 1))
|
||||
interval = Interval(name=name, start=s, end=e)
|
||||
self._intervals[name] = interval
|
||||
return interval
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Constraint management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def add_distance_constraint(
|
||||
self, i: int, j: int, lo: float, hi: float
|
||||
) -> None:
|
||||
"""Add constraint: lo <= t_j - t_i <= hi."""
|
||||
self._dist[i][j] = min(self._dist[i][j], hi)
|
||||
self._dist[j][i] = min(self._dist[j][i], -lo)
|
||||
|
||||
def add_constraint(
|
||||
self, a: Interval, relation: str, b: Interval, gap: Tuple[float, float] = (0, INF)
|
||||
) -> None:
|
||||
"""Add an Allen-style relation between two intervals.
|
||||
|
||||
Supported relations: before, after, meets, met_by, equals.
|
||||
"""
|
||||
rel = relation.lower()
|
||||
if rel == 'before':
|
||||
# a.end + gap <= b.start
|
||||
self.add_distance_constraint(a.end, b.start, gap[0], gap[1])
|
||||
elif rel == 'after':
|
||||
self.add_distance_constraint(b.end, a.start, gap[0], gap[1])
|
||||
elif rel == 'meets':
|
||||
# a.end == b.start
|
||||
self.add_distance_constraint(a.end, b.start, 0, 0)
|
||||
elif rel == 'met_by':
|
||||
self.add_distance_constraint(b.end, a.start, 0, 0)
|
||||
elif rel == 'equals':
|
||||
self.add_distance_constraint(a.start, b.start, 0, 0)
|
||||
self.add_distance_constraint(a.end, b.end, 0, 0)
|
||||
else:
|
||||
raise ValueError(f"Unsupported relation: {relation}")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Propagation (Floyd-Warshall)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def propagate(self) -> bool:
|
||||
"""Run Floyd-Warshall to propagate all constraints.
|
||||
|
||||
Returns True if the network is consistent (no negative cycles).
|
||||
"""
|
||||
n = self._n
|
||||
d = self._dist
|
||||
for k in range(n):
|
||||
for i in range(n):
|
||||
for j in range(n):
|
||||
if d[i][k] + d[k][j] < d[i][j]:
|
||||
d[i][j] = d[i][k] + d[k][j]
|
||||
# Check for negative cycles
|
||||
for i in range(n):
|
||||
if d[i][i] < 0:
|
||||
return False
|
||||
return True
|
||||
|
||||
def is_consistent(self) -> bool:
|
||||
"""Check consistency without mutating (copies matrix first)."""
|
||||
import copy
|
||||
saved = copy.deepcopy(self._dist)
|
||||
result = self.propagate()
|
||||
self._dist = saved
|
||||
return result
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Query
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def earliest(self, point_idx: int) -> float:
|
||||
"""Earliest possible time for a point (relative to origin)."""
|
||||
return -self._dist[point_idx][self._origin_idx]
|
||||
|
||||
def latest(self, point_idx: int) -> float:
|
||||
"""Latest possible time for a point (relative to origin)."""
|
||||
return self._dist[self._origin_idx][point_idx]
|
||||
|
||||
def interval_bounds(self, interval: Interval) -> Dict[str, Tuple[float, float]]:
|
||||
"""Return earliest/latest start and end for an interval."""
|
||||
return {
|
||||
'start': (self.earliest(interval.start), self.latest(interval.start)),
|
||||
'end': (self.earliest(interval.end), self.latest(interval.end)),
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Display
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def dump(self) -> None:
|
||||
"""Print the current distance matrix and interval bounds."""
|
||||
print(f"Temporal Network — {self._n} time-points, {len(self._intervals)} intervals")
|
||||
print()
|
||||
for name, interval in self._intervals.items():
|
||||
bounds = self.interval_bounds(interval)
|
||||
s_lo, s_hi = bounds['start']
|
||||
e_lo, e_hi = bounds['end']
|
||||
print(f" {name}:")
|
||||
print(f" start: [{s_lo:.1f}, {s_hi:.1f}]")
|
||||
print(f" end: [{e_lo:.1f}, {e_hi:.1f}]")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Demo: Timmy fleet deployment pipeline
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def run_demo() -> None:
|
||||
"""Run a demo temporal reasoning scenario for the Timmy fleet."""
|
||||
print("=" * 60)
|
||||
print("Temporal Reasoner Demo - Fleet Deployment Pipeline")
|
||||
print("=" * 60)
|
||||
print()
|
||||
|
||||
tn = TemporalNetwork()
|
||||
|
||||
# Define pipeline stages with duration bounds [min, max]
|
||||
build = tn.add_interval('build', duration=(5, 15))
|
||||
test = tn.add_interval('test', duration=(10, 30))
|
||||
review = tn.add_interval('review', duration=(2, 10))
|
||||
deploy = tn.add_interval('deploy', duration=(1, 5))
|
||||
monitor = tn.add_interval('monitor', duration=(20, 60))
|
||||
|
||||
# Temporal constraints
|
||||
tn.add_constraint(build, 'meets', test) # test starts when build ends
|
||||
tn.add_constraint(test, 'before', review, gap=(0, 5)) # review within 5 of test
|
||||
tn.add_constraint(review, 'meets', deploy) # deploy immediately after review
|
||||
tn.add_constraint(deploy, 'before', monitor, gap=(0, 2)) # monitor within 2 of deploy
|
||||
|
||||
# Global deadline: everything done within 120 time units
|
||||
tn.add_distance_constraint(tn._origin_idx, monitor.end, 0, 120)
|
||||
|
||||
# Build must start within first 10 units
|
||||
tn.add_distance_constraint(tn._origin_idx, build.start, 0, 10)
|
||||
|
||||
print("Constraints added. Propagating...")
|
||||
consistent = tn.propagate()
|
||||
print(f"Network consistent: {consistent}")
|
||||
print()
|
||||
|
||||
if consistent:
|
||||
tn.dump()
|
||||
print()
|
||||
|
||||
# Now add a conflicting constraint to show inconsistency detection
|
||||
print("--- Adding conflicting constraint: monitor.before(build) ---")
|
||||
tn.add_constraint(monitor, 'before', build)
|
||||
consistent2 = tn.propagate()
|
||||
print(f"Network consistent after conflict: {consistent2}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="GOFAI temporal reasoning engine"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--demo",
|
||||
action="store_true",
|
||||
help="Run the fleet deployment pipeline demo",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.demo or not any(vars(args).values()):
|
||||
run_demo()
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
87
scripts/token_optimizer.py
Normal file
87
scripts/token_optimizer.py
Normal file
@@ -0,0 +1,87 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
token_optimizer.py — Token Efficiency & Optimization for the Timmy Foundation.
|
||||
|
||||
Analyzes agent logs to identify:
|
||||
1. "Chatty" Agents — agents outputting excessive tokens for simple tasks.
|
||||
2. Redundant Logs — identifying patterns of repetitive log output.
|
||||
3. Tool Output Bloat — identifying tools that return unnecessarily large payloads.
|
||||
|
||||
Outputs an "Efficiency Score" (0-100) per agent.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import glob
|
||||
import re
|
||||
from pathlib import Path
|
||||
from collections import defaultdict
|
||||
from typing import Dict, List
|
||||
|
||||
AGENT_LOG_PATHS = [
|
||||
"/root/wizards/*/home/logs/*.log",
|
||||
"/root/wizards/*/logs/*.log",
|
||||
"/root/wizards/*/.hermes/logs/*.log",
|
||||
]
|
||||
|
||||
class TokenOptimizer:
|
||||
def __init__(self):
|
||||
self.agent_stats = defaultdict(lambda: {"tokens": 0, "turns": 0, "tool_calls": 0})
|
||||
|
||||
def estimate_tokens(self, text: str) -> int:
|
||||
# Rough estimate: 4 chars per token
|
||||
return len(text) // 4
|
||||
|
||||
def find_logs(self) -> List[Path]:
|
||||
files = []
|
||||
for pattern in AGENT_LOG_PATHS:
|
||||
for p in glob.glob(pattern):
|
||||
files.append(Path(p))
|
||||
return files
|
||||
|
||||
def analyze_log(self, path: Path):
|
||||
# Extract agent name from path
|
||||
try:
|
||||
parts = path.parts
|
||||
idx = parts.index("wizards")
|
||||
agent = parts[idx + 1]
|
||||
except (ValueError, IndexError):
|
||||
agent = "unknown"
|
||||
|
||||
try:
|
||||
with open(path, "r", errors="ignore") as f:
|
||||
content = f.read()
|
||||
self.agent_stats[agent]["tokens"] += self.estimate_tokens(content)
|
||||
|
||||
# Count turns (approximate by looking for role markers)
|
||||
self.agent_stats[agent]["turns"] += content.count("[ASSISTANT]")
|
||||
self.agent_stats[agent]["turns"] += content.count("[USER]")
|
||||
|
||||
# Count tool calls
|
||||
self.agent_stats[agent]["tool_calls"] += content.count("Calling tool:")
|
||||
except Exception as e:
|
||||
print(f"Error analyzing {path}: {e}")
|
||||
|
||||
def run(self):
|
||||
print("--- Token Efficiency Audit ---")
|
||||
logs = self.find_logs()
|
||||
for log in logs:
|
||||
self.analyze_log(log)
|
||||
|
||||
print(f"{'Agent':<20} | {'Tokens':<10} | {'Turns':<6} | {'T/Turn':<8} | {'Efficiency'}")
|
||||
print("-" * 65)
|
||||
|
||||
for agent, stats in self.agent_stats.items():
|
||||
tokens = stats["tokens"]
|
||||
turns = max(stats["turns"], 1)
|
||||
t_per_turn = tokens // turns
|
||||
|
||||
# Efficiency score: lower tokens per turn is generally better
|
||||
# Baseline: 500 tokens per turn = 100 score. 2000+ = 0 score.
|
||||
efficiency = max(0, min(100, 100 - (t_per_turn - 500) // 15))
|
||||
|
||||
print(f"{agent:<20} | {tokens:<10} | {turns:<6} | {t_per_turn:<8} | {efficiency}%")
|
||||
|
||||
if __name__ == "__main__":
|
||||
optimizer = TokenOptimizer()
|
||||
optimizer.run()
|
||||
Reference in New Issue
Block a user