diff --git a/scripts/agent_guardrails.py b/scripts/agent_guardrails.py
new file mode 100644
index 00000000..06f0cce3
--- /dev/null
+++ b/scripts/agent_guardrails.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python3
+"""
+agent_guardrails.py — Sovereign Agent Guardrails for the Timmy Foundation.
+
+Scans agent logs and conversation trajectories for:
+1. System Prompt Leakage — detecting if the agent revealed its internal instructions.
+2. Jailbreak Compliance — detecting if the agent responded to "GODMODE", "DAN", etc.
+3. PII Leakage — detecting if secrets or sensitive data were output.
+
+Part of the Sovereign Infrastructure Suite.
+"""
+
+import os
+import re
+import sys
+import glob
+import logging
+from pathlib import Path
+from typing import List, Dict, Tuple
+
+# Patterns adapted from hermes-agent/agent/input_sanitizer.py
+EXTRACTION_PATTERNS = [
+    r"repeat\s+the\s+words\s+above",
+    r"repeat\s+your\s+(?:system\s+|initial\s+)?instructions",
+    r"what\s+is\s+your\s+(?:system\s+|initial\s+)?prompt",
+    r"show\s+me\s+your\s+(?:system\s+|initial\s+)?prompt",
+    r"print\s+your\s+(?:instructions|prompt|system\s+prompt)",
+    r"output\s+your\s+(?:initial|system)\s+prompt",
+]
+
+# Patterns that indicate the agent MIGHT be leaking its prompt
+LEAKAGE_INDICATORS = [
+    r"you\s+are\s+a\s+helpful\s+assistant",
+    r"your\s+goal\s+is\s+to",
+    r"you\s+must\s+not",
+    r"here\s+are\s+your\s+instructions",
+    r"my\s+system\s+prompt\s+is",
+    r"i\s+was\s+told\s+to",
+]
+
+# Patterns for secrets (adapted from redact.py)
+SECRET_PATTERNS = [
+    r"sk-[A-Za-z0-9_-]{20,}",
+    r"ghp_[A-Za-z0-9]{20,}",
+    r"AIza[A-Za-z0-9_-]{30,}",
+]
+
+AGENT_LOG_PATHS = [
+    "/root/wizards/*/home/logs/*.log",
+    "/root/wizards/*/logs/*.log",
+    "/root/wizards/*/.hermes/logs/*.log",
+]
+
+class GuardrailAuditor:
+    def __init__(self):
+        self.extraction_re = [re.compile(p, re.IGNORECASE) for p in EXTRACTION_PATTERNS]
+        self.leakage_re = [re.compile(p, re.IGNORECASE) for p in LEAKAGE_INDICATORS]
+        self.secret_re = [re.compile(p, re.IGNORECASE) for p in SECRET_PATTERNS]
+
+    def find_logs(self) -> List[Path]:
+        files = []
+        for pattern in AGENT_LOG_PATHS:
+            for p in glob.glob(pattern):
+                files.append(Path(p))
+        return files
+
+    def audit_file(self, path: Path) -> List[Dict]:
+        findings = []
+        try:
+            with open(path, "r", errors="ignore") as f:
+                lines = f.readlines()
+            for i, line in enumerate(lines):
+                # Check for extraction attempts (User side)
+                for p in self.extraction_re:
+                    if p.search(line):
+                        findings.append({
+                            "type": "EXTRACTION_ATTEMPT",
+                            "line": i + 1,
+                            "content": line.strip()[:100],
+                            "severity": "MEDIUM"
+                        })
+
+                # Check for potential leakage (Assistant side)
+                for p in self.leakage_re:
+                    if p.search(line):
+                        findings.append({
+                            "type": "POTENTIAL_LEAKAGE",
+                            "line": i + 1,
+                            "content": line.strip()[:100],
+                            "severity": "HIGH"
+                        })
+
+                # Check for secrets
+                for p in self.secret_re:
+                    if p.search(line):
+                        findings.append({
+                            "type": "SECRET_EXPOSURE",
+                            "line": i + 1,
+                            "content": "[REDACTED]",
+                            "severity": "CRITICAL"
+                        })
+        except Exception as e:
+            print(f"Error reading {path}: {e}")
+        return findings
+
+    def run(self):
+        print("--- Sovereign Agent Guardrail Audit ---")
+        logs = self.find_logs()
+        print(f"Scanning {len(logs)} log files...")
+
+        total_findings = 0
+        for log in logs:
+            findings = self.audit_file(log)
+            if findings:
+                print(f"\nFindings in {log}:")
+                for f in findings:
+                    print(f"  [{f['severity']}] {f['type']} at line {f['line']}: {f['content']}")
+                total_findings += len(findings)  # count findings, not files — value is reported as "Total findings" and gates exit status
+
+        print(f"\nAudit complete. Total findings: {total_findings}")
+        if total_findings > 0:
+            sys.exit(1)
+
+if __name__ == "__main__":
+    auditor = GuardrailAuditor()
+    auditor.run()
diff --git a/scripts/ci_automation_gate.py b/scripts/ci_automation_gate.py
new file mode 100644
index 00000000..9dde2eb6
--- /dev/null
+++ b/scripts/ci_automation_gate.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python3
+"""
+ci_automation_gate.py — Automated Quality Gate for Timmy Foundation CI.
+
+Enforces:
+1. The 10-line Rule — functions should ideally be under 10 lines (warn at 20, fail at 50).
+2. Complexity Check — basic cyclomatic complexity check.
+3. Auto-fixable Linting — trailing whitespace, missing final newlines.
+
+Used as a pre-merge gate.
+"""
+
+import os
+import sys
+import re
+import argparse
+from pathlib import Path
+
+class QualityGate:
+    def __init__(self, fix=False):
+        self.fix = fix
+        self.failures = 0
+        self.warnings = 0
+
+    def check_file(self, path: Path):
+        if path.suffix not in (".js", ".ts", ".py"):
+            return
+
+        with open(path, "r", errors="ignore") as f:  # errors="ignore": don't crash the gate on non-UTF-8 bytes (matches the other scripts)
+            lines = f.readlines()
+
+        new_lines = []
+        changed = False
+
+        # 1. Basic Linting
+        for line in lines:
+            cleaned = line.rstrip() + "\n"
+            if cleaned != line:
+                changed = True
+            new_lines.append(cleaned)
+
+        if lines and not lines[-1].endswith("\n"):
+            # rstrip() + "\n" above already restored the final newline; appending again wrote a spurious blank line at EOF
+            changed = True
+
+        if changed and self.fix:
+            with open(path, "w") as f:
+                f.writelines(new_lines)
+            print(f"  [FIXED] {path}: Cleaned whitespace and newlines.")
+        elif changed:
+            print(f"  [WARN] {path}: Has trailing whitespace or missing final newline.")
+            self.warnings += 1
+
+        # 2. Function Length Check (Simple regex-based)
+        content = "".join(new_lines)
+        if path.suffix in (".js", ".ts"):
+            # Match function blocks
+            functions = re.findall(r"function\s+\w+\s*\(.*?\)\s*\{([\s\S]*?)\}", content)
+            for i, func in enumerate(functions):
+                length = func.count("\n")
+                if length > 50:
+                    print(f"  [FAIL] {path}: Function {i} is too long ({length} lines).")
+                    self.failures += 1
+                elif length > 20:
+                    print(f"  [WARN] {path}: Function {i} is getting long ({length} lines).")
+                    self.warnings += 1
+
+    def run(self, directory: str):
+        print(f"--- Quality Gate: {directory} ---")
+        for root, _, files in os.walk(directory):
+            if "node_modules" in root or ".git" in root:
+                continue
+            for file in files:
+                self.check_file(Path(root) / file)
+
+        print(f"\nGate complete. Failures: {self.failures}, Warnings: {self.warnings}")
+        if self.failures > 0:
+            sys.exit(1)
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("dir", nargs="?", default=".")
+    parser.add_argument("--fix", action="store_true")
+    args = parser.parse_args()
+
+    gate = QualityGate(fix=args.fix)
+    gate.run(args.dir)
diff --git a/scripts/token_optimizer.py b/scripts/token_optimizer.py
new file mode 100644
index 00000000..0f94c848
--- /dev/null
+++ b/scripts/token_optimizer.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python3
+"""
+token_optimizer.py — Token Efficiency & Optimization for the Timmy Foundation.
+
+Analyzes agent logs to identify:
+1. "Chatty" Agents — agents outputting excessive tokens for simple tasks.
+2. Redundant Logs — identifying patterns of repetitive log output.
+3. Tool Output Bloat — identifying tools that return unnecessarily large payloads.
+
+Outputs an "Efficiency Score" (0-100) per agent.
+"""
+
+import os
+import sys
+import glob
+import re
+from pathlib import Path
+from collections import defaultdict
+from typing import Dict, List
+
+AGENT_LOG_PATHS = [
+    "/root/wizards/*/home/logs/*.log",
+    "/root/wizards/*/logs/*.log",
+    "/root/wizards/*/.hermes/logs/*.log",
+]
+
+class TokenOptimizer:
+    def __init__(self):
+        self.agent_stats = defaultdict(lambda: {"tokens": 0, "turns": 0, "tool_calls": 0})
+
+    def estimate_tokens(self, text: str) -> int:
+        # Rough estimate: 4 chars per token
+        return len(text) // 4
+
+    def find_logs(self) -> List[Path]:
+        files = []
+        for pattern in AGENT_LOG_PATHS:
+            for p in glob.glob(pattern):
+                files.append(Path(p))
+        return files
+
+    def analyze_log(self, path: Path):
+        # Extract agent name from path
+        try:
+            parts = path.parts
+            idx = parts.index("wizards")
+            agent = parts[idx + 1]
+        except (ValueError, IndexError):
+            agent = "unknown"
+
+        try:
+            with open(path, "r", errors="ignore") as f:
+                content = f.read()
+            self.agent_stats[agent]["tokens"] += self.estimate_tokens(content)
+
+            # Count turns (approximate by looking for role markers)
+            self.agent_stats[agent]["turns"] += content.count("[ASSISTANT]")
+            self.agent_stats[agent]["turns"] += content.count("[USER]")
+
+            # Count tool calls
+            self.agent_stats[agent]["tool_calls"] += content.count("Calling tool:")
+        except Exception as e:
+            print(f"Error analyzing {path}: {e}")
+
+    def run(self):
+        print("--- Token Efficiency Audit ---")
+        logs = self.find_logs()
+        for log in logs:
+            self.analyze_log(log)
+
+        print(f"{'Agent':<20} | {'Tokens':<10} | {'Turns':<6} | {'T/Turn':<8} | {'Efficiency'}")
+        print("-" * 65)
+
+        for agent, stats in self.agent_stats.items():
+            tokens = stats["tokens"]
+            turns = max(stats["turns"], 1)
+            t_per_turn = tokens // turns
+
+            # Efficiency score: lower tokens per turn is generally better
+            # Baseline: 500 tokens per turn = 100 score. 2000+ = 0 score.
+            efficiency = max(0, min(100, 100 - (t_per_turn - 500) // 15))
+
+            print(f"{agent:<20} | {tokens:<10} | {turns:<6} | {t_per_turn:<8} | {efficiency}%")
+
+if __name__ == "__main__":
+    optimizer = TokenOptimizer()
+    optimizer.run()