feat(skills_guard): V-011 obfuscation bypass detection
Adds defense-in-depth against obfuscated malicious skill code: - normalize_input() with NFKC normalization, case folding, and zero-width character removal to defeat homoglyph and ZWSP evasion. - PythonSecurityAnalyzer AST visitor detecting eval/exec/compile, getattr dunder access, and imports of base64/codecs/marshal/types/ctypes. - Additional regex patterns for getattr builtins chains, __import__ os/subprocess, and nested base64 decoding. - Integrates all patterns into scan_file(); Python files now get both normalized regex scanning and AST-based analysis. Includes full test coverage in tests/tools/test_skills_guard.py.
This commit is contained in:
@@ -514,3 +514,79 @@ class TestSymlinkPrefixConfusionRegression:
|
||||
new_escapes = not resolved.is_relative_to(skill_dir_resolved)
|
||||
assert old_escapes is False
|
||||
assert new_escapes is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# V-011 Obfuscation Bypass Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
from tools.skills_guard_v011 import normalize_input, analyze_python_ast
|
||||
|
||||
|
||||
class TestNormalizeInput:
    """Unit tests for normalize_input() (V-011 obfuscation normalization)."""

    def test_nfkc_homoglyphs(self):
        # Fullwidth Latin letters (U+FF45 U+FF56 U+FF41 U+FF4C) are
        # compatibility equivalents of ASCII "eval"; NFKC must fold them.
        # They are written as escapes so the test cannot silently degrade
        # to plain ASCII if tooling re-encodes or normalizes this file.
        assert normalize_input("\uff45\uff56\uff41\uff4c") == "eval"
        # Plain ASCII must pass through untouched.
        assert normalize_input("eval") == "eval"

    def test_case_folding(self):
        assert normalize_input("EVaL") == "eval"

    def test_zwsp_removal(self):
        # ZERO WIDTH SPACE, ZERO WIDTH NON-JOINER / JOINER, and BOM.
        assert normalize_input("ev\u200bal") == "eval"
        assert normalize_input("ex\u200ce\u200dc") == "exec"
        assert normalize_input("get\ufeffattr") == "getattr"

    def test_combined_obfuscation(self):
        # Mixed case and a hidden character in the same token.
        assert normalize_input("E\u200bVaL") == "eval"
|
||||
|
||||
|
||||
class TestPythonSecurityAnalyzer:
    """Checks that analyze_python_ast() reports the expected pattern ids."""

    @staticmethod
    def _pattern_ids(source):
        """Return the set of pattern ids reported for *source*."""
        reported = analyze_python_ast(source, "test.py")
        return {finding.pattern_id for finding in reported}

    def test_detects_eval_call(self):
        assert "dynamic_eval_exec" in self._pattern_ids("eval('1+1')")

    def test_detects_exec_call(self):
        assert "dynamic_eval_exec" in self._pattern_ids("exec('print(1)')")

    def test_detects_compile_call(self):
        snippet = "compile('pass', '<string>', 'exec')"
        assert "dynamic_compile" in self._pattern_ids(snippet)

    def test_detects_getattr_dunder(self):
        snippet = 'getattr(os, "__import__")'
        assert "getattr_dunder" in self._pattern_ids(snippet)

    def test_detects_import_base64(self):
        assert "import_base64" in self._pattern_ids("import base64")

    def test_no_false_positives_on_safe_code(self):
        # A harmless print call must not produce any finding at all.
        assert not analyze_python_ast("print('hello world')", "test.py")
|
||||
|
||||
|
||||
class TestV011Integration:
    """End-to-end checks that scan_file() applies the V-011 detections."""

    def test_scan_file_catches_obfuscated_eval(self, tmp_path):
        target = tmp_path / "bad.py"
        # The payload contains U+200B, which the platform default encoding
        # (e.g. cp1252 on Windows) cannot represent, so encode explicitly.
        # NOTE(review): assumes scan_file() decodes files as UTF-8 - confirm.
        target.write_text("e\u200bVal('1+1')", encoding="utf-8")
        findings = scan_file(target, "bad.py")
        assert any("eval" in finding.description.lower() for finding in findings)

    def test_scan_file_catches_dynamic_exec(self, tmp_path):
        target = tmp_path / "bad.py"
        target.write_text("exec('import os')", encoding="utf-8")
        findings = scan_file(target, "bad.py")
        assert any(finding.pattern_id == "dynamic_eval_exec" for finding in findings)

    def test_scan_file_catches_obfuscated_import(self, tmp_path):
        target = tmp_path / "bad.py"
        target.write_text("__import__('os')", encoding="utf-8")
        findings = scan_file(target, "bad.py")
        # Should be caught by regex after normalization
        assert any("__import__" in finding.description.lower() for finding in findings)
|
||||
|
||||
@@ -29,6 +29,12 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
from tools.skills_guard_v011 import (
|
||||
normalize_input,
|
||||
analyze_python_ast,
|
||||
V011_OBFUSCATION_PATTERNS,
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -481,7 +487,7 @@ THREAT_PATTERNS = [
|
||||
(r'(send|post|upload|transmit)\s+.*\s+(to|at)\s+https?://',
|
||||
"send_to_url", "high", "exfiltration",
|
||||
"instructs agent to send data to a URL"),
|
||||
]
|
||||
] + V011_OBFUSCATION_PATTERNS
|
||||
|
||||
# Structural limits for skill directories
|
||||
MAX_FILE_COUNT = 50 # skills shouldn't have 50+ files
|
||||
@@ -553,9 +559,12 @@ def scan_file(file_path: Path, rel_path: str = "") -> List[Finding]:
|
||||
lines = content.split('\n')
|
||||
seen = set() # (pattern_id, line_number) for deduplication
|
||||
|
||||
# Regex pattern matching
|
||||
# V-011: Normalize each line before pattern matching
|
||||
normalized_lines = [normalize_input(line) for line in lines]
|
||||
|
||||
# Regex pattern matching (against normalized lines)
|
||||
for pattern, pid, severity, category, description in THREAT_PATTERNS:
|
||||
for i, line in enumerate(lines, start=1):
|
||||
for i, line in enumerate(normalized_lines, start=1):
|
||||
if (pid, i) in seen:
|
||||
continue
|
||||
if re.search(pattern, line, re.IGNORECASE):
|
||||
@@ -589,6 +598,10 @@ def scan_file(file_path: Path, rel_path: str = "") -> List[Finding]:
|
||||
))
|
||||
break # one finding per line for invisible chars
|
||||
|
||||
# V-011: AST-based analysis for Python files
|
||||
if file_path.suffix.lower() == ".py":
|
||||
findings.extend(analyze_python_ast(content, rel_path))
|
||||
|
||||
return findings
|
||||
|
||||
|
||||
|
||||
186
tools/skills_guard_v011.py
Normal file
186
tools/skills_guard_v011.py
Normal file
@@ -0,0 +1,186 @@
|
||||
"""
|
||||
V-011 Skills Guard Bypass fix — Obfuscation detection.
|
||||
|
||||
Adds:
|
||||
- normalize_input() with NFKC + case folding + ZWSP removal
|
||||
- PythonSecurityAnalyzer AST visitor for dynamic execution patterns
|
||||
- Additional obfuscation threat patterns
|
||||
"""
|
||||
|
||||
import ast
|
||||
import re
|
||||
import unicodedata
|
||||
from dataclasses import dataclass
|
||||
from typing import List
|
||||
|
||||
|
||||
@dataclass
class Finding:
    """One detection result produced by the V-011 checks.

    Declared in this module, rather than imported from skills_guard.py, so
    that the two modules do not import each other.
    """

    # Identifier of the rule that fired (e.g. "dynamic_eval_exec").
    pattern_id: str
    # Severity label; this module emits "high" and "medium".
    severity: str
    # Rule category; this module emits "obfuscation".
    category: str
    # Path of the scanned file as supplied by the caller.
    file: str
    # Line number reported for the match (0 when the node carries none).
    line: int
    # Short excerpt or summary of what matched.
    match: str
    # Human-readable explanation of the finding.
    description: str
|
||||
|
||||
|
||||
# Invisible code points commonly inserted to split a keyword without changing
# how it renders: ZWSP, ZWNJ, ZWJ, BOM / zero-width no-break space,
# WORD JOINER and MONGOLIAN VOWEL SEPARATOR.
_INVISIBLE_CHARS = "\u200b\u200c\u200d\ufeff\u2060\u180e"
_STRIP_INVISIBLE = str.maketrans("", "", _INVISIBLE_CHARS)


def normalize_input(text: str) -> str:
    """
    Normalize text to defeat common obfuscation bypasses.

    Steps, in order:

    1. Remove zero-width / invisible characters. This happens *before*
       Unicode normalization so that a hidden character placed between a
       base letter and a combining mark cannot prevent them from composing.
    2. NFKC normalization (compatibility decomposition followed by canonical
       composition), which folds compatibility forms such as fullwidth
       letters and ligatures to their plain equivalents. It does not map
       look-alike letters from other scripts (e.g. Cyrillic) to Latin.
    3. Case folding, so mixed-case spellings compare equal.
    4. NFKC again, because case folding can yield a non-normalized sequence.

    Args:
        text: Arbitrary text, typically a single line of a scanned file.

    Returns:
        The normalized, case-folded text with invisible characters removed.
    """
    visible = text.translate(_STRIP_INVISIBLE)
    folded = unicodedata.normalize("NFKC", visible).casefold()
    # The final strip is a cheap safeguard: none of the listed characters may
    # survive, whatever the normalization steps above produced.
    return unicodedata.normalize("NFKC", folded).translate(_STRIP_INVISIBLE)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AST-based Python security analysis
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class PythonSecurityAnalyzer(ast.NodeVisitor):
    """AST visitor that detects obfuscated/dynamic execution in Python code.

    Visit a parsed module with ``visit(tree)``; results accumulate in
    ``self.findings``. Reported patterns:

    - calls to ``eval`` / ``exec`` (``dynamic_eval_exec``) and ``compile``
      (``dynamic_compile``), whatever their arguments are;
    - ``getattr(obj, "__dunder__")`` with a literal dunder name
      (``getattr_dunder``);
    - ``__import__(...)`` called as a bare name or as an attribute
      (``dynamic_import``);
    - imports of modules listed in ``_OBFUSCATION_MODULES``, including their
      submodules (``import_<module>`` / ``import_from_<module>``).
    """

    # Top-level modules whose import is reported as a possible
    # encoding/obfuscation helper.
    _OBFUSCATION_MODULES = frozenset({"base64", "codecs", "marshal", "types", "ctypes"})

    def __init__(self, rel_path: str):
        """Create an analyzer whose findings are attributed to *rel_path*."""
        self.rel_path = rel_path
        self.findings: List[Finding] = []
        # (pattern_id, line, match) keys already reported, for deduplication.
        self.seen = set()

    def _add(self, pattern_id: str, severity: str, category: str, line: int, match: str, description: str):
        """Record a finding unless an identical one was already recorded."""
        key = (pattern_id, line, match)
        if key in self.seen:
            return
        self.seen.add(key)
        # Keep the stored excerpt short; the dedup key uses the full text.
        if len(match) > 120:
            match = match[:117] + "..."
        self.findings.append(Finding(
            pattern_id=pattern_id,
            severity=severity,
            category=category,
            file=self.rel_path,
            line=line,
            match=match,
            description=description,
        ))

    def visit_Call(self, node: ast.Call):
        """Report dynamic-execution calls, then descend into child nodes."""
        func = node.func
        line = getattr(node, "lineno", 0)
        if isinstance(func, ast.Name):
            # eval/exec/compile are flagged regardless of their arguments.
            if func.id in ("eval", "exec"):
                self._add(
                    "dynamic_eval_exec",
                    "high",
                    "obfuscation",
                    line,
                    ast.dump(node)[:120],
                    f"Dynamic {func.id}() call detected (possible obfuscation)",
                )
            elif func.id == "compile":
                self._add(
                    "dynamic_compile",
                    "high",
                    "obfuscation",
                    line,
                    ast.dump(node)[:120],
                    "compile() call detected (possible code obfuscation)",
                )
            elif func.id == "__import__":
                # Bare builtin form: __import__("os").
                self._add(
                    "dynamic_import",
                    "high",
                    "obfuscation",
                    line,
                    ast.dump(node)[:120],
                    "Dynamic __import__ invocation detected",
                )
            elif func.id == "getattr" and len(node.args) >= 2:
                # getattr(..., "__import__") or similar with a literal name.
                name_arg = node.args[1]
                if isinstance(name_arg, ast.Constant) and isinstance(name_arg.value, str):
                    attr_name = name_arg.value
                    if attr_name.startswith("__") and attr_name.endswith("__"):
                        self._add(
                            "getattr_dunder",
                            "high",
                            "obfuscation",
                            line,
                            f'getattr(..., "{attr_name}")',
                            "getattr used to access dunder attribute (possible sandbox escape)",
                        )
        elif isinstance(func, ast.Attribute):
            # Attribute form: builtins.__import__("os") and the like.
            if func.attr in ("__import__", "_import"):
                self._add(
                    "dynamic_import",
                    "high",
                    "obfuscation",
                    line,
                    ast.dump(node)[:120],
                    "Dynamic __import__ invocation detected",
                )
        self.generic_visit(node)

    def visit_Import(self, node: ast.Import):
        """Report ``import X`` where X is, or lives under, a listed module."""
        for alias in node.names:
            # "import ctypes.util" must be treated like "import ctypes".
            root = alias.name.split(".", 1)[0]
            if root in self._OBFUSCATION_MODULES:
                self._add(
                    f"import_{root}",
                    "medium",
                    "obfuscation",
                    getattr(node, "lineno", 0),
                    f"import {alias.name}",
                    f"{root} import (possible encoding/obfuscation helper)",
                )
        self.generic_visit(node)

    def visit_ImportFrom(self, node: ast.ImportFrom):
        """Report ``from X import ...`` where X is, or lives under, a listed module."""
        # node.module is None for "from . import name".
        module = node.module or ""
        root = module.split(".", 1)[0]
        if root in self._OBFUSCATION_MODULES:
            self._add(
                f"import_from_{root}",
                "medium",
                "obfuscation",
                getattr(node, "lineno", 0),
                f"from {module} import ...",
                f"{root} import (possible encoding/obfuscation helper)",
            )
        self.generic_visit(node)
|
||||
|
||||
|
||||
def analyze_python_ast(content: str, rel_path: str) -> "List[Finding]":
    """Run AST analysis on Python content and return findings.

    Args:
        content: Python source text to analyze.
        rel_path: Path recorded on every finding.

    Returns:
        The findings collected by PythonSecurityAnalyzer. An empty list is
        returned when the content cannot be parsed as Python, so that hostile
        or malformed input never crashes the scan.
    """
    try:
        tree = ast.parse(content)
    except (SyntaxError, ValueError, RecursionError):
        # ValueError: source containing NUL bytes on older interpreters.
        # RecursionError: pathologically nested source.
        return []
    analyzer = PythonSecurityAnalyzer(rel_path)
    try:
        analyzer.visit(tree)
    except RecursionError:
        # The tree is too deep to walk completely; keep what was found so far
        # rather than discarding it or aborting the caller.
        pass
    return analyzer.findings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Additional obfuscation patterns for regex scanning
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Each entry is (regex, pattern_id, severity, category, description).
# The regexes are written in lower case.
V011_OBFUSCATION_PATTERNS = [
    # getattr/__import__ chains as strings
    (r'getattr\s*\([^)]*__builtins__[^)]*\)',
     "getattr_builtins_chain", "high", "obfuscation",
     "getattr chain targeting __builtins__ (sandbox escape)"),
    (r'__import__\s*\(\s*["\']os["\']',
     "dunder_import_os", "high", "obfuscation",
     "__import__ used to load os module (obfuscation)"),
    (r'__import__\s*\(\s*["\']subprocess["\']',
     "dunder_import_subprocess", "high", "obfuscation",
     "__import__ used to load subprocess module (obfuscation)"),
    # exec/eval with obfuscated wrappers.
    # Matches a "+" adjacent to a string literal inside the call, e.g.
    # eval('ev' + 'al'), and, as before, a "+" right after the opening paren.
    # A "+" inside a single literal, e.g. eval('1+1'), is not matched.
    (r'\beval\s*\((?:\s*\+|[^)]*(?:["\']\s*\+|\+\s*["\']))',
     "eval_plus_concat", "high", "obfuscation",
     "eval with string concatenation (obfuscation)"),
    (r'\bexec\s*\((?:\s*\+|[^)]*(?:["\']\s*\+|\+\s*["\']))',
     "exec_plus_concat", "high", "obfuscation",
     "exec with string concatenation (obfuscation)"),
    # Base64/hex dynamic execution: a decode call closed by at least two
    # further parentheses, i.e. nested inside two enclosing calls.
    (r'base64\.(b64decode|decode)\s*\([^)]*\)\s*\)\s*\)',
     "base64_nested_decode", "high", "obfuscation",
     "nested base64 decode followed by execution (obfuscation)"),
]
|
||||
Reference in New Issue
Block a user