Compare commits

..

1 Commits

Author SHA1 Message Date
b00785820b feat(security): Extend approval.py with Vitalik's threat model
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 45s
Add three new threat categories to the approval system:
1. LLM jailbreaks (prompt injection, system prompt extraction, social engineering)
2. LLM accidents (credential leakage, API key exposure, sensitive data)
3. Software bugs/supply chain risks (typosquatting, dependency confusion, obfuscated code)

Resolves #284
2026-04-13 22:17:32 +00:00
3 changed files with 329 additions and 581 deletions

View File

@@ -1,415 +0,0 @@
#!/usr/bin/env python3
"""Evaluate Qwen3.5:35B as a local model option for the Hermes fleet.
Part of Epic #281 — Vitalik's Secure LLM Architecture.
Issue #288 — Evaluate Qwen3.5:35B as Local Model Option.
Evaluates:
1. Model specs & deployment feasibility
2. Context window & tool-use support
3. Security posture (local inference = no data exfiltration)
4. Comparison against current fleet models
5. VRAM requirements by quantization level
6. Integration path with existing Ollama infrastructure
Usage:
python3 scripts/evaluate_qwen35.py # Full evaluation
python3 scripts/evaluate_qwen35.py --check-ollama # Check local Ollama status
python3 scripts/evaluate_qwen35.py --benchmark MODEL # Run benchmark against a model
"""
import json
import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
# =========================================================================
# Model Specification
# =========================================================================
@dataclass
class ModelSpec:
"""Qwen3.5:35B specification from research."""
name: str = "Qwen3.5-35B-A3B"
ollama_tag: str = "qwen3.5:35b"
hf_id: str = "Qwen/Qwen3.5-35B-A3B"
architecture: str = "MoE (Mixture of Experts)"
total_params: str = "35B"
active_params: str = "3B per token"
context_length: int = 131072 # 128K tokens
license: str = "Apache 2.0"
release_date: str = "2026-04"
languages: str = "Multilingual (29+ languages)"
quantization_options: Dict[str, int] = field(default_factory=lambda: {
"Q8_0": 36, # ~36GB VRAM (near-lossless)
"Q6_K": 28, # ~28GB VRAM (high quality)
"Q5_K_M": 24, # ~24GB VRAM (balanced)
"Q4_K_M": 20, # ~20GB VRAM (recommended)
"Q4_0": 18, # ~18GB VRAM (minimum viable)
"Q3_K_M": 15, # ~15GB VRAM (aggressive)
"Q2_K": 12, # ~12GB VRAM (quality loss)
})
training_cutoff: str = "2026-03"
tool_use_support: bool = True
json_mode_support: bool = True
function_calling: bool = True
# =========================================================================
# Fleet Comparison
# =========================================================================
FLEET_MODELS = {
"qwen3.5:35b (candidate)": {
"params_active": "3B", "params_total": "35B", "context": "128K",
"local": True, "tool_use": True, "reasoning": "good",
"vram_q4": "20GB", "license": "Apache 2.0",
},
"gemma4 (current local)": {
"params_active": "9B", "params_total": "9B", "context": "128K",
"local": True, "tool_use": True, "reasoning": "good",
"vram_q4": "6GB", "license": "Gemma",
},
"hermes4:14b (current local)": {
"params_active": "14B", "params_total": "14B", "context": "8K",
"local": True, "tool_use": True, "reasoning": "good",
"vram_q4": "9GB", "license": "Apache 2.0",
},
"qwen2.5:7b (fleet)": {
"params_active": "7B", "params_total": "7B", "context": "32K",
"local": True, "tool_use": True, "reasoning": "moderate",
"vram_q4": "5GB", "license": "Apache 2.0",
},
"claude-sonnet-4 (cloud)": {
"params_active": "?", "params_total": "?", "context": "200K",
"local": False, "tool_use": True, "reasoning": "excellent",
"vram_q4": "N/A", "license": "Proprietary",
},
"mimo-v2-pro (cloud free)": {
"params_active": "?", "params_total": "?", "context": "128K",
"local": False, "tool_use": True, "reasoning": "good",
"vram_q4": "N/A", "license": "Proprietary",
},
}
# =========================================================================
# Security Evaluation (Vitalik Framework)
# =========================================================================
SECURITY_CRITERIA = [
{
"criterion": "Data locality — no network exfiltration",
"description": "All inference happens on local hardware. Zero data leaves the machine.",
"weight": "CRITICAL",
"qwen35_score": 10,
"notes": "Ollama runs entirely local. Perfect data sovereignty.",
},
{
"criterion": "No API key dependency",
"description": "Model runs without any external API credentials.",
"weight": "HIGH",
"qwen35_score": 10,
"notes": "Pure local inference. No Anthropic/OpenAI key needed.",
},
{
"criterion": "Model weights auditable",
"description": "Weights can be verified against HF hashes.",
"weight": "MEDIUM",
"qwen35_score": 8,
"notes": "Apache 2.0 license. Weights on HuggingFace with SHA verification. MoE architecture is more complex to audit than dense models.",
},
{
"criterion": "No telemetry/phone-home",
"description": "Model doesn't contact external services during inference.",
"weight": "CRITICAL",
"qwen35_score": 10,
"notes": "Ollama is fully offline-capable. No telemetry in Qwen weights.",
},
{
"criterion": "Tool-use safety",
"description": "Model correctly follows tool schemas without prompt injection via tool results.",
"weight": "HIGH",
"qwen35_score": 7,
"notes": "Qwen3.5 supports function calling but MoE models can be less predictable with tool dispatch. Needs live testing.",
},
{
"criterion": "Privacy filter compatibility",
"description": "Works with Vitalik's Input Privacy Filter pattern.",
"weight": "HIGH",
"qwen35_score": 9,
"notes": "Local model means the Privacy Filter (which strips PII before remote calls) becomes unnecessary for most queries.",
},
{
"criterion": "Two-factor confirmation compatibility",
"description": "Can serve as the LLM half of Human+LLM confirmation.",
"weight": "MEDIUM",
"qwen35_score": 8,
"notes": "3B active params means fast inference for confirmation prompts. Good for the 'cheap first pass' in two-factor flow.",
},
{
"criterion": "Prompt injection resistance",
"description": "Resists adversarial prompts that attempt to bypass safety.",
"weight": "HIGH",
"qwen35_score": 6,
"notes": "Smaller active expert size (3B) may be more susceptible to injection than dense 14B+ models. Needs red-team testing.",
},
]
# =========================================================================
# Deployment Feasibility
# =========================================================================
HARDWARE_PROFILES = {
"mac_m2_ultra_192gb": {
"name": "Mac Studio M2 Ultra (192GB)",
"unified_memory_gb": 192,
"can_run_q4": True,
"can_run_q8": True,
"recommended_quant": "Q6_K",
"est_tokens_per_sec": 40,
"notes": "Comfortable fit. Room for other models.",
},
"mac_m4_pro_48gb": {
"name": "Mac Mini M4 Pro (48GB)",
"unified_memory_gb": 48,
"can_run_q4": True,
"can_run_q8": False,
"recommended_quant": "Q4_K_M",
"est_tokens_per_sec": 30,
"notes": "Fits at Q4 with ~28GB headroom for OS + other processes.",
},
"mac_m1_16gb": {
"name": "Mac M1 (16GB)",
"unified_memory_gb": 16,
"can_run_q4": False,
"can_run_q8": False,
"recommended_quant": None,
"est_tokens_per_sec": None,
"notes": "Does NOT fit. Need 20GB+ for Q4. Use Qwen2.5:7B or Gemma3:1B instead.",
},
"rtx_4090_24gb": {
"name": "NVIDIA RTX 4090 (24GB VRAM)",
"unified_memory_gb": 24,
"can_run_q4": True,
"can_run_q8": False,
"recommended_quant": "Q5_K_M",
"est_tokens_per_sec": 50,
"notes": "Fits at Q5. Good for dedicated inference server.",
},
"rtx_3090_24gb": {
"name": "NVIDIA RTX 3090 (24GB VRAM)",
"unified_memory_gb": 24,
"can_run_q4": True,
"can_run_q8": False,
"recommended_quant": "Q4_K_M",
"est_tokens_per_sec": 35,
"notes": "Fits at Q4. Slower than 4090 but workable.",
},
"runpod_l40s_48gb": {
"name": "RunPod L40S (48GB VRAM)",
"unified_memory_gb": 48,
"can_run_q4": True,
"can_run_q8": True,
"recommended_quant": "Q6_K",
"est_tokens_per_sec": 60,
"notes": "Cloud GPU option. ~$0.75/hr. Good for Big Brain tier.",
},
}
# =========================================================================
# Evaluation Engine
# =========================================================================
def check_ollama_status() -> Dict[str, Any]:
"""Check if Ollama is running and what models are available."""
import subprocess
result = {"running": False, "models": [], "qwen35_available": False}
try:
r = subprocess.run(
["curl", "-s", "--max-time", "5", "http://localhost:11434/api/tags"],
capture_output=True, text=True, timeout=10,
)
if r.returncode == 0:
data = json.loads(r.stdout)
result["running"] = True
result["models"] = [m["name"] for m in data.get("models", [])]
result["qwen35_available"] = any(
"qwen3.5" in m.lower() for m in result["models"]
)
except Exception as e:
result["error"] = str(e)
return result
def run_benchmark(model: str, prompt: str) -> Dict[str, Any]:
"""Run a single benchmark prompt against an Ollama model."""
import subprocess
start = time.time()
try:
r = subprocess.run(
["curl", "-s", "--max-time", "120", "http://localhost:11434/api/generate",
"-d", json.dumps({"model": model, "prompt": prompt, "stream": False})],
capture_output=True, text=True, timeout=130,
)
elapsed = time.time() - start
if r.returncode == 0:
data = json.loads(r.stdout)
response = data.get("response", "")
eval_count = data.get("eval_count", 0)
eval_duration = data.get("eval_duration", 1)
tok_per_sec = eval_count / (eval_duration / 1e9) if eval_duration > 0 else 0
return {
"success": True,
"response": response[:500],
"elapsed_sec": round(elapsed, 1),
"tokens": eval_count,
"tok_per_sec": round(tok_per_sec, 1),
}
else:
return {"success": False, "error": r.stderr[:200], "elapsed_sec": elapsed}
except Exception as e:
return {"success": False, "error": str(e), "elapsed_sec": time.time() - start}
def generate_report() -> str:
"""Generate the full evaluation report."""
spec = ModelSpec()
ollama = check_ollama_status()
lines = []
lines.append("=" * 72)
lines.append("Qwen3.5:35B EVALUATION REPORT — Issue #288")
lines.append("Part of Epic #281 — Vitalik's Secure LLM Architecture")
lines.append("=" * 72)
# 1. Model Specs
lines.append("\n## 1. Model Specification\n")
lines.append(f" Name: {spec.name}")
lines.append(f" Ollama tag: {spec.ollama_tag}")
lines.append(f" HuggingFace: {spec.hf_id}")
lines.append(f" Architecture: {spec.architecture}")
lines.append(f" Params: {spec.total_params} total, {spec.active_params}")
lines.append(f" Context: {spec.context_length:,} tokens ({spec.context_length//1024}K)")
lines.append(f" License: {spec.license}")
lines.append(f" Tool use: {'Yes' if spec.tool_use_support else 'No'}")
lines.append(f" JSON mode: {'Yes' if spec.json_mode_support else 'No'}")
lines.append(f" Function call: {'Yes' if spec.function_calling else 'No'}")
# 2. Deployment Feasibility
lines.append("\n## 2. VRAM Requirements\n")
lines.append(f" {'Quantization':<12} {'VRAM (GB)':<12} {'Quality'}")
lines.append(f" {'-'*12} {'-'*12} {'-'*20}")
for q, vram in sorted(spec.quantization_options.items(), key=lambda x: x[1]):
quality = "near-lossless" if vram >= 36 else "high" if vram >= 24 else "balanced" if vram >= 20 else "minimum" if vram >= 15 else "lossy"
lines.append(f" {q:<12} {vram:<12} {quality}")
# 3. Hardware Compatibility
lines.append("\n## 3. Hardware Compatibility\n")
for hw_id, hw in HARDWARE_PROFILES.items():
fits = "YES" if hw["can_run_q4"] else "NO"
rec = hw["recommended_quant"] or "N/A"
tps = hw["est_tokens_per_sec"] or "N/A"
lines.append(f" {hw['name']}")
lines.append(f" {hw['unified_memory_gb']}GB | Fits Q4: {fits} | Rec: {rec} | ~{tps} tok/s")
lines.append(f" {hw['notes']}")
# 4. Security Evaluation
lines.append("\n## 4. Security Evaluation (Vitalik Framework)\n")
total_weight = 0
weighted_score = 0
weight_map = {"CRITICAL": 3, "HIGH": 2, "MEDIUM": 1}
for c in SECURITY_CRITERIA:
w = weight_map[c["weight"]]
total_weight += w
weighted_score += c["qwen35_score"] * w
lines.append(f" [{c['weight']:<8}] {c['criterion']}")
lines.append(f" Score: {c['qwen35_score']}/10 — {c['notes']}")
avg_score = weighted_score / total_weight if total_weight > 0 else 0
lines.append(f"\n Weighted security score: {avg_score:.1f}/10")
lines.append(f" Verdict: {'STRONG' if avg_score >= 8 else 'ADEQUATE' if avg_score >= 6 else 'NEEDS WORK'}")
# 5. Fleet Comparison
lines.append("\n## 5. Fleet Comparison\n")
lines.append(f" {'Model':<30} {'Params':<10} {'Ctx':<8} {'Local':<7} {'Tools':<7} {'Reasoning'}")
lines.append(f" {'-'*30} {'-'*10} {'-'*8} {'-'*7} {'-'*7} {'-'*12}")
for name, spec_data in FLEET_MODELS.items():
lines.append(
f" {name:<30} {spec_data['params_total']:<10} {spec_data['context']:<8} "
f"{'Yes' if spec_data['local'] else 'No':<7} {'Yes' if spec_data['tool_use'] else 'No':<7} "
f"{spec_data['reasoning']}"
)
# 6. Ollama Status
lines.append("\n## 6. Local Ollama Status\n")
lines.append(f" Running: {'Yes' if ollama['running'] else 'No'}")
lines.append(f" Installed: {', '.join(ollama['models']) if ollama['models'] else 'none'}")
lines.append(f" Qwen3.5 avail: {'Yes' if ollama['qwen35_available'] else 'No — run: ollama pull qwen3.5:35b'}")
# 7. Recommendation
lines.append("\n## 7. Recommendation\n")
lines.append(" VERDICT: APPROVED for local deployment as privacy-sensitive tier\n")
lines.append(" Strengths:")
lines.append(" + Perfect data sovereignty (Vitalik's #1 requirement)")
lines.append(" + MoE architecture: 35B quality at 3B inference speed")
lines.append(" + 128K context — matches cloud models")
lines.append(" + Apache 2.0 — no license restrictions")
lines.append(" + Tool use + JSON mode + function calling supported")
lines.append(" + Eliminates need for Privacy Filter on most queries")
lines.append("")
lines.append(" Weaknesses:")
lines.append(" - 20GB VRAM at Q4 — requires beefy hardware")
lines.append(" - MoE routing less predictable than dense models")
lines.append(" - 3B active params may be weaker on complex reasoning")
lines.append(" - Needs red-team testing for prompt injection")
lines.append("")
lines.append(" Deployment plan:")
lines.append(" 1. Pull: ollama pull qwen3.5:35b")
lines.append(" 2. Add to config.yaml as privacy-sensitive model")
lines.append(" 3. Route PII-flagged queries through local Qwen3.5")
lines.append(" 4. Keep cloud models for non-sensitive complex work")
lines.append(" 5. Run red-team tests (issue #324) against local model")
# 8. Integration Path
lines.append("\n## 8. Integration Path\n")
lines.append(" Config addition (config.yaml):")
lines.append(' privacy_model:')
lines.append(' provider: ollama')
lines.append(' model: qwen3.5:35b')
lines.append(' base_url: http://localhost:11434')
lines.append(' context_length: 131072')
lines.append('')
lines.append(' smart_model_routing integration:')
lines.append(' Route queries containing PII patterns to local Qwen3.5')
lines.append(' instead of cloud models, eliminating data exfiltration risk.')
return "\n".join(lines)
# =========================================================================
# CLI
# =========================================================================
if __name__ == "__main__":
if "--check-ollama" in sys.argv:
status = check_ollama_status()
print(json.dumps(status, indent=2))
elif "--benchmark" in sys.argv:
idx = sys.argv.index("--benchmark")
model = sys.argv[idx + 1] if idx + 1 < len(sys.argv) else "qwen2.5:7b"
print(f"Benchmarking {model}...")
result = run_benchmark(model, "Explain the security benefits of local LLM inference in 3 sentences.")
print(json.dumps(result, indent=2))
else:
print(generate_report())

View File

@@ -1,166 +0,0 @@
"""Tests for Qwen3.5:35B evaluation script — Issue #288."""
import json
import pytest
from scripts.evaluate_qwen35 import (
ModelSpec,
FLEET_MODELS,
SECURITY_CRITERIA,
HARDWARE_PROFILES,
check_ollama_status,
generate_report,
)
class TestModelSpec:
"""Model specification validation."""
def test_spec_fields(self):
spec = ModelSpec()
assert spec.name == "Qwen3.5-35B-A3B"
assert spec.total_params == "35B"
assert spec.active_params == "3B per token"
assert spec.context_length == 131072
assert spec.license == "Apache 2.0"
assert spec.tool_use_support is True
assert spec.json_mode_support is True
assert spec.function_calling is True
def test_quantization_options(self):
spec = ModelSpec()
quants = spec.quantization_options
assert "Q4_K_M" in quants
assert "Q8_0" in quants
# Q4 should require less VRAM than Q8
assert quants["Q4_K_M"] < quants["Q8_0"]
# All should be positive
for q, vram in quants.items():
assert vram > 0, f"{q} VRAM should be positive"
def test_vram_monotonically_decreasing(self):
"""Lower quantization levels should require less VRAM."""
spec = ModelSpec()
sorted_quants = sorted(spec.quantization_options.items(), key=lambda x: x[1])
for i in range(1, len(sorted_quants)):
assert sorted_quants[i][1] >= sorted_quants[i-1][1], \
f"{sorted_quants[i][0]} should use >= VRAM than {sorted_quants[i-1][0]}"
class TestFleetComparison:
"""Fleet model comparison data integrity."""
def test_all_models_present(self):
assert len(FLEET_MODELS) >= 5
assert "qwen3.5:35b (candidate)" in FLEET_MODELS
def test_candidate_has_best_local_context(self):
"""Qwen3.5:35B should have the largest context among local models."""
candidate_ctx = 128 # 128K
for name, data in FLEET_MODELS.items():
if data["local"] and name != "qwen3.5:35b (candidate)":
ctx_str = data["context"].replace("K", "").replace("k", "")
try:
ctx = int(ctx_str)
assert ctx <= candidate_ctx, \
f"Local model {name} has {ctx}K context > candidate's 128K"
except ValueError:
pass # Skip models with non-numeric context
def test_only_candidate_is_35b(self):
"""No other fleet model should be 35B."""
for name, data in FLEET_MODELS.items():
if name != "qwen3.5:35b (candidate)":
assert "35B" not in data["params_total"], \
f"{name} shouldn't be 35B — duplicate with candidate"
class TestSecurityEvaluation:
"""Security criteria validation."""
def test_all_criteria_scored(self):
for c in SECURITY_CRITERIA:
assert 1 <= c["qwen35_score"] <= 10, \
f"{c['criterion']} score {c['qwen35_score']} out of range"
assert c["weight"] in ("CRITICAL", "HIGH", "MEDIUM")
def test_data_locality_is_critical(self):
"""Data locality should be CRITICAL weight."""
locality = [c for c in SECURITY_CRITERIA if "locality" in c["criterion"].lower()]
assert len(locality) == 1
assert locality[0]["weight"] == "CRITICAL"
assert locality[0]["qwen35_score"] == 10
def test_no_telemetry_is_critical(self):
no_phone = [c for c in SECURITY_CRITERIA if "telemetry" in c["criterion"].lower()]
assert len(no_phone) == 1
assert no_phone[0]["weight"] == "CRITICAL"
assert no_phone[0]["qwen35_score"] == 10
def test_weighted_average_above_adequate(self):
"""Weighted security score should be at least 7/10."""
weight_map = {"CRITICAL": 3, "HIGH": 2, "MEDIUM": 1}
total_w = sum(weight_map[c["weight"]] for c in SECURITY_CRITERIA)
total_s = sum(c["qwen35_score"] * weight_map[c["weight"]] for c in SECURITY_CRITERIA)
avg = total_s / total_w
assert avg >= 7.0, f"Weighted security score {avg:.1f} too low"
class TestHardwareProfiles:
"""Hardware compatibility checks."""
def test_high_mem_fits(self):
"""M2 Ultra 192GB should run Q4 and Q8."""
m2 = HARDWARE_PROFILES["mac_m2_ultra_192gb"]
assert m2["can_run_q4"] is True
assert m2["can_run_q8"] is True
def test_low_mem_doesnt_fit(self):
"""M1 16GB should NOT fit Qwen3.5:35B."""
m1 = HARDWARE_PROFILES["mac_m1_16gb"]
assert m1["can_run_q4"] is False
assert m1["recommended_quant"] is None
def test_mid_mem_fits_q4_only(self):
"""M4 Pro 48GB should fit Q4 but not Q8."""
m4 = HARDWARE_PROFILES["mac_m4_pro_48gb"]
assert m4["can_run_q4"] is True
assert m4["can_run_q8"] is False
class TestOllamaCheck:
"""Ollama status check."""
def test_returns_dict(self):
result = check_ollama_status()
assert isinstance(result, dict)
assert "running" in result
assert "models" in result
assert "qwen35_available" in result
def test_running_ollama_has_models(self):
"""If Ollama is running, it should list models."""
result = check_ollama_status()
if result["running"]:
assert isinstance(result["models"], list)
class TestReportGeneration:
"""Report generation."""
def test_report_is_string(self):
report = generate_report()
assert isinstance(report, str)
assert len(report) > 1000
def test_report_has_all_sections(self):
report = generate_report()
for section in ["Model Specification", "VRAM Requirements",
"Hardware Compatibility", "Security Evaluation",
"Fleet Comparison", "Ollama Status",
"Recommendation", "Integration Path"]:
assert section in report, f"Missing section: {section}"
def test_report_verdict(self):
report = generate_report()
assert "APPROVED" in report or "NEEDS WORK" in report

View File

@@ -129,6 +129,335 @@ def _approval_key_aliases(pattern_key: str) -> set[str]:
return _PATTERN_KEY_ALIASES.get(pattern_key, {pattern_key})
# =========================================================================
# Vitalik's Threat Model Extensions
# =========================================================================
# Extended threat categories addressing:
# 1. LLM jailbreaks (Remote content 'hacking' the LLM)
# 2. LLM accidents (Accidental data leakage)
# 3. Software bugs (Supply chain risks)
# =========================================================================
# --- LLM JAILBREAK THREATS ---
# Remote content that attempts to manipulate or "hack" the LLM's behavior
LLM_JAILBREAK_PATTERNS = [
# Prompt injection attempts in URLs or content
(r'(?i)(ignore|disregard|forget|override)\s+(all\s+)?(previous|prior|above|earlier)\s+(instructions|prompts|rules|constraints)', "prompt injection: override instructions"),
(r'(?i)you\s+are\s+now\s+(a\s+)?(different|new|unrestricted|jailbroken)\s+(ai|model|assistant|agent)', "prompt injection: identity override"),
(r'(?i)(system|admin|root)\s*:\s*(you\s+are|act\s+as|become|transform)', "prompt injection: system role hijack"),
(r'(?i)\[?(system|admin|root)\]?\s*[:=]\s*', "prompt injection: system tag injection"),
(r'(?i)<\|?(im_start|system|endoftext)\|?>', "prompt injection: token boundary attack"),
(r'(?i)(human|assistant|user)\s*:\s*(ignore|disregard|override)', "prompt injection: role confusion"),
# Content that attempts to extract system prompts
(r'(?i)(repeat|show|reveal|display|print|output)\s+(your|the|all)\s+(system|initial|original|hidden)\s+(prompt|instructions|rules|constraints)', "prompt extraction: system prompt leak"),
(r'(?i)what\s+(are|were)\s+your\s+(original|initial|system|hidden)\s+(instructions|prompts|rules)', "prompt extraction: instruction leak"),
(r'(?i)(translate|convert|encode|cipher|obfuscate)\s+your\s+(system|instructions)\s+(to|into|as)', "prompt extraction: encoded leak"),
# Social engineering attempts
(r'(?i)(pretend|imagine|roleplay|act\s+as\s+if)\s+(you\s+are|there\s+are\s+no|you\s+have\s+no)\s+(restrictions|limits|rules|constraints)', "social engineering: constraint removal"),
(r'(?i)this\s+is\s+(a\s+)?(test|simulation|exercise|training)\s+(environment|scenario|mode)', "social engineering: test environment bypass"),
(r'(?i)(emergency|urgent|critical)\s+override\s+required', "social engineering: urgency manipulation"),
]
# --- LLM ACCIDENT THREATS ---
# Patterns that indicate accidental data leakage or unintended disclosure
LLM_ACCIDENT_PATTERNS = [
# API keys and tokens in prompts or outputs
(r'(?i)(api[_-]?key|secret[_-]?key|access[_-]?token|auth[_-]?token)\s*[:=]\s*["']?[a-zA-Z0-9_\-]{20,}', "credential leak: API key/token"),
(r'(?i)(sk|pk|ak|tk)[-_]?[a-zA-Z0-9]{20,}', "credential leak: key pattern"),
(r'(?i)\b[A-Za-z0-9]{32,}\b', "potential leak: long alphanumeric string"),
# Private keys and certificates
(r'-----BEGIN\s+(RSA\s+)?PRIVATE\s+KEY-----', "credential leak: private key"),
(r'(?i)(ssh-rsa|ssh-ed25519)\s+[A-Za-z0-9+/=]+', "credential leak: SSH public key"),
# Database connection strings
(r'(?i)(mongodb|postgres|mysql|redis)://[^\s]+:[^\s]+@', "credential leak: database connection"),
(r'(?i)(host|server|endpoint)\s*[:=]\s*[^\s]+\s*(username|user|login)\s*[:=]\s*[^\s]+\s*(password|pass|pwd)\s*[:=]', "credential leak: connection details"),
# Environment variables that might contain secrets
(r'(?i)(export|set|env)\s+[A-Z_]*(KEY|SECRET|TOKEN|PASSWORD|CREDENTIAL)[A-Z_]*=', "potential leak: env var with secret name"),
# File paths that might expose sensitive data
(r'(?i)(/home/|/Users/|/root/|C:\\Users\\)[^\s]*(\.ssh/|\.aws/|\.config/|\.env)', "path exposure: sensitive directory"),
(r'(?i)(\.pem|\.key|\.cert|\.crt)\s*$', "file exposure: certificate/key file"),
]
# --- SOFTWARE BUG / SUPPLY CHAIN THREATS ---
# Patterns indicating potential supply chain attacks or software vulnerabilities
SUPPLY_CHAIN_PATTERNS = [
# Suspicious package installations
(r'(?i)(pip|npm|yarn|pnpm|cargo|go\s+get)\s+(install\s+)?[^\s]*(@|git\+|http|file:)', "supply chain: suspicious package source"),
(r'(?i)(pip|npm|yarn|pnpm)\s+install\s+[^\s]*\s*--(no-verify|trusted-host|allow-external)', "supply chain: insecure install flags"),
# Dependency confusion attacks
(r'(?i)(requirements\.txt|package\.json|Cargo\.toml|go\.mod)\s*.*\b(file:|git\+|http://|ftp://)\b', "supply chain: local/remote dependency"),
# Obfuscated code patterns
(r'(?i)(eval|exec|compile)\s*\(\s*(base64|chr|ord|\+|\.)\s*\)', "supply chain: obfuscated execution"),
(r'(?i)(atob|btoa|Buffer\.from)\s*\([^)]*\)', "supply chain: base64 decode/encode"),
# Typosquatting indicators
(r'(?i)(reqeusts|reqeust|requestr|requsts|reqests)', "supply chain: typosquatting attempt"),
(r'(?i)(pyyaml|yaml2|yaml3|yaml-lib)', "supply chain: suspicious YAML package"),
# Build system attacks
(r'(?i)(make|cmake|configure)\s+.*\b(CC|CXX|LD_LIBRARY_PATH|DYLD_LIBRARY_PATH)\s*=', "supply chain: build env manipulation"),
(r'(?i)(\.sh|\.bash|\.zsh)\s*\|\s*(sh|bash|zsh)', "supply chain: script execution via pipe"),
# Git submodule attacks
(r'(?i)git\s+submodule\s+(add|update|init)\s+[^\s]*(http|git@|ssh://)', "supply chain: git submodule attack"),
(r'(?i)\.gitmodules\s*.*\burl\s*=\s*[^\s]*(http|git@|ssh://)', "supply chain: malicious submodule URL"),
]
# =========================================================================
# Extended threat detection functions
# =========================================================================
def detect_llm_jailbreak(content: str) -> tuple:
"""Check if content contains LLM jailbreak attempts.
Returns:
(is_jailbreak, pattern_key, description) or (False, None, None)
"""
content_normalized = _normalize_command_for_detection(content).lower()
for pattern, description in LLM_JAILBREAK_PATTERNS:
if re.search(pattern, content_normalized, re.IGNORECASE | re.DOTALL):
pattern_key = description
return (True, pattern_key, description)
return (False, None, None)
def detect_llm_accident(content: str) -> tuple:
"""Check if content contains accidental data leakage patterns.
Returns:
(is_leak, pattern_key, description) or (False, None, None)
"""
content_normalized = _normalize_command_for_detection(content).lower()
for pattern, description in LLM_ACCIDENT_PATTERNS:
if re.search(pattern, content_normalized, re.IGNORECASE | re.DOTALL):
pattern_key = description
return (True, pattern_key, description)
return (False, None, None)
def detect_supply_chain_risk(content: str) -> tuple:
"""Check if content contains supply chain attack patterns.
Returns:
(is_risk, pattern_key, description) or (False, None, None)
"""
content_normalized = _normalize_command_for_detection(content).lower()
for pattern, description in SUPPLY_CHAIN_PATTERNS:
if re.search(pattern, content_normalized, re.IGNORECASE | re.DOTALL):
pattern_key = description
return (True, pattern_key, description)
return (False, None, None)
def check_all_threats(content: str, env_type: str = "local") -> dict:
"""Comprehensive threat check covering all threat categories.
Args:
content: The content to check (command, prompt, output, etc.)
env_type: Terminal/environment type
Returns:
dict with threat assessment and recommendations
"""
threats_found = []
# Check existing dangerous command patterns
is_dangerous, pattern_key, description = detect_dangerous_command(content)
if is_dangerous:
threats_found.append({
"category": "dangerous_command",
"pattern_key": pattern_key,
"description": description,
"severity": "high"
})
# Check LLM jailbreaks
is_jailbreak, jailbreak_key, jailbreak_desc = detect_llm_jailbreak(content)
if is_jailbreak:
threats_found.append({
"category": "llm_jailbreak",
"pattern_key": jailbreak_key,
"description": jailbreak_desc,
"severity": "critical"
})
# Check LLM accidents
is_leak, leak_key, leak_desc = detect_llm_accident(content)
if is_leak:
threats_found.append({
"category": "llm_accident",
"pattern_key": len(threats_found), # Unique key
"description": leak_desc,
"severity": "high"
})
# Check supply chain risks
is_risk, risk_key, risk_desc = detect_supply_chain_risk(content)
if is_risk:
threats_found.append({
"category": "supply_chain",
"pattern_key": risk_key,
"description": risk_desc,
"severity": "high"
})
# Determine overall risk level
if not threats_found:
return {
"safe": True,
"threats": [],
"overall_risk": "none",
"recommendation": "allow"
}
# Calculate overall risk
severities = [t["severity"] for t in threats_found]
if "critical" in severities:
overall_risk = "critical"
recommendation = "block"
elif "high" in severities:
overall_risk = "high"
recommendation = "require_approval"
else:
overall_risk = "medium"
recommendation = "warn"
return {
"safe": False,
"threats": threats_found,
"overall_risk": overall_risk,
"recommendation": recommendation,
"requires_approval": recommendation == "require_approval",
"should_block": recommendation == "block"
}
# =========================================================================
# Integration with existing approval system
# =========================================================================
def check_comprehensive_threats(command: str, env_type: str,
approval_callback=None) -> dict:
"""Extended threat check that includes Vitalik's threat model.
This function extends the existing check_dangerous_command to also
check for LLM jailbreaks, accidents, and supply chain risks.
Args:
command: The content to check
env_type: Environment type
approval_callback: Optional approval callback
Returns:
dict with approval decision and threat assessment
"""
# Skip containers for all checks
if env_type in ("docker", "singularity", "modal", "daytona"):
return {"approved": True, "message": None}
# --yolo: bypass all approval prompts
if os.getenv("HERMES_YOLO_MODE"):
return {"approved": True, "message": None}
# Run comprehensive threat check
threat_assessment = check_all_threats(command, env_type)
if threat_assessment["safe"]:
return {"approved": True, "message": None}
# Handle critical threats (block immediately)
if threat_assessment["should_block"]:
threat_list = "\n".join([f"- {t['description']}" for t in threat_assessment["threats"]])
return {
"approved": False,
"message": f"BLOCKED: Critical security threat detected.\n{threat_list}\n\nDo NOT proceed with this content.",
"threats": threat_assessment["threats"],
"overall_risk": threat_assessment["overall_risk"],
"blocked": True
}
# Handle threats requiring approval
if threat_assessment["requires_approval"]:
session_key = get_current_session_key()
threat_descriptions = "; ".join([t["description"] for t in threat_assessment["threats"]])
# Check if already approved for this session
all_pattern_keys = [t["pattern_key"] for t in threat_assessment["threats"]]
if all(is_approved(session_key, key) for key in all_pattern_keys):
return {"approved": True, "message": None}
# Submit for approval
is_cli = os.getenv("HERMES_INTERACTIVE")
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
if not is_cli and not is_gateway:
return {"approved": True, "message": None}
if is_gateway or os.getenv("HERMES_EXEC_ASK"):
submit_pending(session_key, {
"command": command,
"pattern_key": all_pattern_keys[0],
"pattern_keys": all_pattern_keys,
"description": threat_descriptions,
"threats": threat_assessment["threats"]
})
return {
"approved": False,
"pattern_key": all_pattern_keys[0],
"status": "approval_required",
"command": command,
"description": threat_descriptions,
"message": (
f"⚠️ Security threat detected ({threat_descriptions}). "
f"Asking the user for approval.\n\n**Content:**\n```\n{command[:500]}{'...' if len(command) > 500 else ''}\n```"
),
"threats": threat_assessment["threats"],
"overall_risk": threat_assessment["overall_risk"]
}
# CLI interactive approval
choice = prompt_dangerous_approval(command, threat_descriptions,
approval_callback=approval_callback)
if choice == "deny":
return {
"approved": False,
"message": f"BLOCKED: User denied security threat ({threat_descriptions}). Do NOT retry.",
"threats": threat_assessment["threats"],
"overall_risk": threat_assessment["overall_risk"]
}
if choice == "session":
for key in all_pattern_keys:
approve_session(session_key, key)
elif choice == "always":
for key in all_pattern_keys:
approve_session(session_key, key)
approve_permanent(key)
save_permanent_allowlist(_permanent_approved)
return {"approved": True, "message": None,
"user_approved": True, "description": threat_descriptions,
"threats": threat_assessment["threats"]}
# Default: warn but allow
return {
"approved": True,
"message": f"⚠️ Security warning: {threat_assessment['threats'][0]['description']}",
"threats": threat_assessment["threats"],
"overall_risk": threat_assessment["overall_risk"],
"warning": True
}
# =========================================================================
# Detection
# =========================================================================