Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
0822837ec3 feat: context-aware risk scoring for tier detection (#681)
Some checks are pending
Contributor Attribution Check / check-attribution (pull_request) Waiting to run
Docker Build and Publish / build-and-push (pull_request) Waiting to run
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Waiting to run
Tests / test (pull_request) Waiting to run
Tests / e2e (pull_request) Waiting to run
Resolves #681. Enhances approval tier detection with context-aware
risk scoring instead of pure pattern matching.

tools/risk_scorer.py:
- Path context: /tmp is safe, /etc/passwd is critical
- Command flags: --force increases risk, --dry-run decreases
- Scope assessment: wildcards and recursive increase risk
- Recency tracking: repeated dangerous commands escalate
- Safe paths: /tmp, ~/.hermes/sessions, project dirs
- Critical paths: /etc/passwd, ~/.ssh/id_rsa, /boot

score_action() returns RiskResult with tier, confidence,
reasons, and context_factors.
2026-04-16 00:28:58 -04:00

313
tools/risk_scorer.py Normal file
View File

@@ -0,0 +1,313 @@
"""Context-Aware Risk Scoring — ML-lite tier detection enhancement.
Enhances the existing approval.py dangerous-command detection with
context-aware risk scoring. Instead of pure pattern matching, considers:
1. Path context: rm /tmp/x is safer than rm /etc/passwd
2. Command context: chmod 777 on project dir vs system dir
3. Recency: repeated dangerous commands increase risk
4. Scope: commands affecting more files = higher risk
Usage:
from tools.risk_scorer import score_action, RiskResult
result = score_action("rm -rf /tmp/build")
# result.tier = MEDIUM (not HIGH, because /tmp is safe)
# result.confidence = 0.7
"""
import os
import re
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
# Risk tiers (aligned with approval_tiers.py)
class RiskTier(IntEnum):
    """Ordered risk levels; IntEnum so tiers compare numerically (SAFE < LOW < ... < CRITICAL)."""
    SAFE = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class RiskResult:
    """Result of risk scoring."""
    tier: RiskTier  # final tier after path/flag/scope adjustments
    confidence: float  # 0.0 to 1.0
    reasons: List[str] = field(default_factory=list)  # human-readable justifications
    context_factors: Dict[str, Any] = field(default_factory=dict)  # raw scoring inputs (path_risk, modifier, scope, recency, paths)
# --- Path risk assessment ---
SAFE_PATHS = {
"/tmp", "/var/tmp", "/dev/shm",
"~/.hermes/sessions", "~/.hermes/cache", "~/.hermes/logs",
"/tmp/", "/var/tmp/",
}
HIGH_RISK_PATHS = {
"/etc", "/boot", "/usr/lib", "/usr/bin",
"~/.ssh", "~/.gnupg",
"/var/lib", "/opt",
}
CRITICAL_PATHS = {
"/", "/etc/passwd", "/etc/shadow", "/etc/sudoers",
"~/.ssh/id_rsa", "~/.ssh/authorized_keys",
"/boot/vmlinuz", "/dev/sda", "/dev/nvme",
}
def _extract_paths(command: str) -> List[str]:
"""Extract file paths from a command."""
paths = []
# Match common path patterns
for match in re.finditer(r'[/~][\w/.~-]+', command):
paths.append(match.group())
# Also match $HOME, $HERMES_HOME expansions
for match in re.finditer(r'\$(?:HOME|HERMES_HOME|PWD)[/\w]*', command):
paths.append(match.group())
return paths
def _classify_path(path: str) -> str:
"""Classify a path as safe, high-risk, or critical."""
path_lower = path.lower().replace("\\", "/")
for critical in CRITICAL_PATHS:
if path_lower.startswith(critical.lower()):
return "critical"
for high in HIGH_RISK_PATHS:
if path_lower.startswith(high.lower()):
return "high"
for safe in SAFE_PATHS:
if path_lower.startswith(safe.lower()):
return "safe"
# Unknown paths default to medium
return "unknown"
# --- Command risk modifiers ---
RISK_MODIFIERS = {
# Flags that increase risk
"-rf": 1.5,
"-r": 1.2,
"--force": 1.5,
"--recursive": 1.2,
"--no-preserve-root": 3.0,
"-f": 1.3,
"--hard": 1.5,
"--force-push": 2.0,
"-D": 1.4,
# Flags that decrease risk
"--dry-run": 0.1,
"-n": 0.3,
"--no-act": 0.1,
"--interactive": 0.7,
"-i": 0.7,
}
def _get_command_risk_modifier(command: str) -> float:
"""Get risk modifier based on command flags."""
modifier = 1.0
for flag, mod in RISK_MODIFIERS.items():
if flag in command:
modifier *= mod
return modifier
# --- Scope assessment ---
def _assess_scope(command: str) -> float:
    """Estimate how broadly *command* reaches (files/systems touched).

    Returns a multiplier in [1.0, 5.0]. Wildcards, recursive flags,
    find/xargs pipelines, and multiple path targets each widen scope.
    """
    factor = 1.0
    # Shell globbing can touch an unknown number of files.
    if any(ch in command for ch in "*?"):
        factor *= 2.0
    # Recursive flags (-r / -rf) walk entire trees.
    if re.search(r'-r[f]?\b', command) is not None:
        factor *= 1.5
    # find -exec / find | xargs pipelines fan out over many files.
    if "find" in command and ("exec" in command or "xargs" in command):
        factor *= 2.0
    # More than two path-like arguments means multiple targets.
    if len(_extract_paths(command)) > 2:
        factor *= 1.3
    # Cap at 5x so scope alone cannot dominate the score.
    return min(factor, 5.0)
# --- Recent command tracking ---
_recent_commands: List[Tuple[float, str]] = []
_TRACK_WINDOW = 300 # 5 minutes
def _track_command(command: str) -> float:
"""Track command and return escalation factor based on recency."""
now = time.time()
# Clean old entries
global _recent_commands
_recent_commands = [
(ts, cmd) for ts, cmd in _recent_commands
if now - ts < _TRACK_WINDOW
]
# Check for repeated dangerous patterns
escalation = 1.0
for ts, recent_cmd in _recent_commands:
# Same command repeated = escalating risk
if recent_cmd == command:
escalation += 0.2
# Similar commands = moderate escalation
elif _commands_similar(command, recent_cmd):
escalation += 0.1
_recent_commands.append((now, command))
return min(escalation, 3.0) # Cap at 3x
def _commands_similar(cmd1: str, cmd2: str) -> bool:
"""Check if two commands are structurally similar."""
# Extract command name
name1 = cmd1.split()[0] if cmd1.split() else ""
name2 = cmd2.split()[0] if cmd2.split() else ""
return name1 == name2
# --- Main scoring function ---
# Base tier mapping from command name. Commands not listed default to
# SAFE in score_action(); path/flag/scope context can then raise the tier.
COMMAND_BASE_TIERS = {
    # Destructive filesystem / device operations
    "rm": RiskTier.HIGH,
    "chmod": RiskTier.MEDIUM,
    "chown": RiskTier.HIGH,
    "mkfs": RiskTier.CRITICAL,  # formats a device — irreversible
    "dd": RiskTier.HIGH,
    # Process / service control
    "kill": RiskTier.HIGH,
    "pkill": RiskTier.HIGH,
    "systemctl": RiskTier.HIGH,
    # Routine development tools
    "git": RiskTier.LOW,
    "sed": RiskTier.LOW,
    "cp": RiskTier.LOW,
    "mv": RiskTier.LOW,
    "python3": RiskTier.LOW,
    "pip": RiskTier.LOW,
    "npm": RiskTier.LOW,
    # Infrastructure tooling
    "docker": RiskTier.MEDIUM,
    "ansible": RiskTier.HIGH,
}
def score_action(action: str, context: Optional[Dict[str, Any]] = None) -> RiskResult:
    """Score an action's risk level with context awareness.

    Considers:
    - Command base risk (COMMAND_BASE_TIERS)
    - Path context (safe vs critical paths)
    - Command flags (force, recursive, dry-run)
    - Scope (wildcards, multiple targets)
    - Recency (repeated commands escalate)

    Args:
        action: The shell command / action string to score.
        context: Reserved for future use; currently unused.

    Returns:
        RiskResult with tier, confidence, and reasons.
    """
    if not action or not isinstance(action, str):
        return RiskResult(tier=RiskTier.SAFE, confidence=1.0, reasons=["empty input"])
    parts = action.strip().split()
    if not parts:
        return RiskResult(tier=RiskTier.SAFE, confidence=1.0, reasons=["empty command"])
    cmd_name = parts[0].split("/")[-1]  # strip any directory prefix, e.g. /usr/bin/rm -> rm

    # Dry-run overrides everything: no actual changes will be made.
    # BUGFIX: checked up front so a dry-run is never fed into the recency
    # tracker (it used to escalate later real commands), and "-n" is now
    # matched as a token — the old '"-n " in action' test missed a
    # trailing "-n".
    if "--dry-run" in action or "-n" in parts:
        return RiskResult(
            tier=RiskTier.SAFE,
            confidence=0.95,
            reasons=["dry-run mode — no actual changes"],
            context_factors={"dry_run": True},
        )

    # Base tier from command name.
    base_tier = COMMAND_BASE_TIERS.get(cmd_name, RiskTier.SAFE)

    # Path risk assessment: keep the riskiest path seen.
    paths = _extract_paths(action)
    risk_order = {"safe": 0, "unknown": 1, "high": 2, "critical": 3}  # hoisted out of the loop
    max_path_risk = "safe"
    riskiest_path: Optional[str] = None
    for path in paths:
        path_risk = _classify_path(path)
        if risk_order.get(path_risk, 0) > risk_order.get(max_path_risk, 0):
            max_path_risk = path_risk
            riskiest_path = path  # BUGFIX: remember the offending path, not paths[0]

    reasons: List[str] = []

    # Path-based tier adjustment.
    if max_path_risk == "critical":
        base_tier = RiskTier.CRITICAL
        reasons.append(f"Critical path detected: {riskiest_path or 'unknown'}")
    elif max_path_risk == "high":
        if base_tier < RiskTier.HIGH:
            base_tier = RiskTier.HIGH
        reasons.append(f"High-risk path: {riskiest_path or 'unknown'}")
    elif max_path_risk == "safe" and paths:
        # Downgrade only when the command actually names paths and all of
        # them are safe. BUGFIX: previously a command with *no* path
        # arguments (e.g. "kill 1234") fell into this branch and was
        # wrongly downgraded from HIGH to MEDIUM.
        if base_tier > RiskTier.MEDIUM:
            base_tier = RiskTier.MEDIUM
            reasons.append("Safe path context — risk downgraded")

    # Contextual multipliers (surfaced in reasons and context_factors).
    modifier = _get_command_risk_modifier(action)
    scope = _assess_scope(action)
    recency = _track_command(action)

    # Confidence reflects how well the path context is understood.
    confidence = 0.8  # base confidence
    if max_path_risk == "safe":
        confidence = 0.9
    elif max_path_risk == "unknown":
        confidence = 0.6
    elif max_path_risk == "critical":
        confidence = 0.95

    # Reasons from the multipliers.
    if modifier > 1.5:
        reasons.append(f"Force/recursive flags (modifier: {modifier:.1f}x)")
    if scope > 1.5:
        reasons.append(f"Wide scope (wildcards/multiple targets, {scope:.1f}x)")
    if recency > 1.2:
        reasons.append(f"Repeated command pattern ({recency:.1f}x escalation)")
    if not reasons:
        reasons.append(f"Command '{cmd_name}' classified as {base_tier.name}")

    return RiskResult(
        tier=base_tier,
        confidence=round(confidence, 2),
        reasons=reasons,
        context_factors={
            "path_risk": max_path_risk,
            "modifier": round(modifier, 2),
            "scope": round(scope, 2),
            "recency": round(recency, 2),
            "paths": paths,
        },
    )