Compare commits
2 Commits
main
...
rescue/v01
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c8510cc0a | ||
|
|
5a47056073 |
@@ -13,7 +13,6 @@ concurrency:
|
||||
jobs:
|
||||
smoke-and-build:
|
||||
runs-on: ubuntu-latest
|
||||
container: catthehacker/ubuntu:act-22.04
|
||||
timeout-minutes: 5
|
||||
steps:
|
||||
- name: Checkout code
|
||||
@@ -21,9 +20,6 @@ jobs:
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v5
|
||||
with:
|
||||
enable-cache: true
|
||||
cache-dependency-glob: "uv.lock"
|
||||
|
||||
- name: Set up Python 3.11
|
||||
run: uv python install 3.11
|
||||
|
||||
@@ -11,7 +11,6 @@ on:
|
||||
jobs:
|
||||
notebook-smoke:
|
||||
runs-on: ubuntu-latest
|
||||
container: catthehacker/ubuntu:act-22.04
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
1
.github/workflows/dependency-audit.yml
vendored
1
.github/workflows/dependency-audit.yml
vendored
@@ -19,7 +19,6 @@ jobs:
|
||||
audit:
|
||||
name: Audit Python dependencies
|
||||
runs-on: ubuntu-latest
|
||||
container: catthehacker/ubuntu:act-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: astral-sh/setup-uv@v5
|
||||
|
||||
1
.github/workflows/docs-site-checks.yml
vendored
1
.github/workflows/docs-site-checks.yml
vendored
@@ -10,7 +10,6 @@ on:
|
||||
jobs:
|
||||
docs-site-checks:
|
||||
runs-on: ubuntu-latest
|
||||
container: catthehacker/ubuntu:act-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ jobs:
|
||||
create-audit-issue:
|
||||
name: Create quarterly security audit issue
|
||||
runs-on: ubuntu-latest
|
||||
container: catthehacker/ubuntu:act-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
|
||||
1
.github/workflows/secret-scan.yml
vendored
1
.github/workflows/secret-scan.yml
vendored
@@ -12,7 +12,6 @@ jobs:
|
||||
scan:
|
||||
name: Scan for secrets
|
||||
runs-on: ubuntu-latest
|
||||
container: catthehacker/ubuntu:act-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
|
||||
1
.github/workflows/supply-chain-audit.yml
vendored
1
.github/workflows/supply-chain-audit.yml
vendored
@@ -12,7 +12,6 @@ jobs:
|
||||
scan:
|
||||
name: Scan PR for supply chain risks
|
||||
runs-on: ubuntu-latest
|
||||
container: catthehacker/ubuntu:act-22.04
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
1
.github/workflows/tests.yml
vendored
1
.github/workflows/tests.yml
vendored
@@ -14,7 +14,6 @@ concurrency:
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
container: catthehacker/ubuntu:act-22.04
|
||||
timeout-minutes: 10
|
||||
steps:
|
||||
- name: Checkout code
|
||||
|
||||
@@ -64,7 +64,7 @@ class ContextCompressor:
|
||||
model: str,
|
||||
threshold_percent: float = 0.50,
|
||||
protect_first_n: int = 3,
|
||||
protect_last_n: int = 20,
|
||||
protect_last_n: int = 5,
|
||||
summary_target_ratio: float = 0.20,
|
||||
quiet_mode: bool = False,
|
||||
summary_model_override: str = None,
|
||||
|
||||
@@ -1180,7 +1180,7 @@ class AIAgent:
|
||||
compression_enabled = str(_compression_cfg.get("enabled", True)).lower() in ("true", "1", "yes")
|
||||
compression_summary_model = _compression_cfg.get("summary_model") or None
|
||||
compression_target_ratio = float(_compression_cfg.get("target_ratio", 0.20))
|
||||
compression_protect_last = int(_compression_cfg.get("protect_last_n", 20))
|
||||
compression_protect_last = int(_compression_cfg.get("protect_last_n", 5))
|
||||
|
||||
# Read explicit context_length override from model config
|
||||
_model_cfg = _agent_cfg.get("model", {})
|
||||
|
||||
@@ -98,23 +98,9 @@ class HealthReport:
|
||||
self.passed = False
|
||||
|
||||
|
||||
EXCLUDED_PATH_SEGMENTS = frozenset({
|
||||
".cache", "__pycache__", ".venv", "venv", "site-packages",
|
||||
".local/share/uv", "node_modules", ".git", ".tox",
|
||||
})
|
||||
|
||||
|
||||
def _is_excluded_path(path: Path) -> bool:
|
||||
"""Skip cache, venv, and package-manager directories."""
|
||||
parts = set(path.parts)
|
||||
return not parts.isdisjoint(EXCLUDED_PATH_SEGMENTS)
|
||||
|
||||
|
||||
def scan_orphaned_bytecode(root: Path, report: HealthReport) -> None:
|
||||
"""Detect .pyc files without corresponding .py source files."""
|
||||
for pyc in root.rglob("*.pyc"):
|
||||
if _is_excluded_path(pyc):
|
||||
continue
|
||||
py = pyc.with_suffix(".py")
|
||||
if not py.exists():
|
||||
# Also check __pycache__ naming convention
|
||||
@@ -156,12 +142,6 @@ def _is_sensitive_filename(name: str) -> bool:
|
||||
lower = name.lower()
|
||||
if lower == ".env.example":
|
||||
return False
|
||||
# Skip stylesheet and documentation artifacts
|
||||
if lower.endswith(".css"):
|
||||
return False
|
||||
# Skip scanner tooling — these are detectors, not secrets
|
||||
if lower in {"secret_scan.py", "secret_scanner.py"}:
|
||||
return False
|
||||
if any(pat in lower for pat in SENSITIVE_FILE_PATTERNS):
|
||||
return True
|
||||
if any(lower.startswith(pref) for pref in SENSITIVE_NAME_PREFIXES):
|
||||
@@ -176,8 +156,6 @@ def scan_sensitive_file_permissions(root: Path, report: HealthReport, fix: bool
|
||||
for fpath in root.rglob("*"):
|
||||
if not fpath.is_file():
|
||||
continue
|
||||
if _is_excluded_path(fpath):
|
||||
continue
|
||||
# Skip test files — real secrets should never live in tests/
|
||||
if "/tests/" in str(fpath) or str(fpath).startswith(str(root / "tests")):
|
||||
continue
|
||||
|
||||
@@ -1,100 +0,0 @@
|
||||
---
|
||||
name: gitea-workflow-automation
|
||||
title: Gitea Workflow Automation
|
||||
description: Automate Gitea issues, PRs, and repository workflows via the API for forge CI and backlog tracking.
|
||||
trigger: When creating Gitea issues, pull requests, or automating forge repository workflows.
|
||||
---
|
||||
|
||||
# Gitea Workflow Automation
|
||||
|
||||
## Trigger
|
||||
Use this skill when automating Gitea operations: creating issues, opening PRs, checking repository state, or integrating Gitea into CI/backlog workflows.
|
||||
|
||||
## Prerequisites
|
||||
- `GITEA_URL` environment variable set (e.g., `https://forge.alexanderwhitestone.com`)
|
||||
- `GITEA_TOKEN` environment variable with a valid API token
|
||||
- `GITEA_USER` or explicit owner/org name
|
||||
- `curl` and `jq` available in the environment
|
||||
|
||||
## Step-by-Step Workflow
|
||||
|
||||
### 1. Verify Environment
|
||||
```bash
|
||||
: "${GITEA_URL?}" "${GITEA_TOKEN?}" "${GITEA_USER?}"
|
||||
echo "Gitea env OK"
|
||||
```
|
||||
|
||||
### 2. List Issues in a Repository
|
||||
```bash
|
||||
curl -s -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
"${GITEA_URL}/api/v1/repos/${OWNER}/${REPO}/issues?state=open&limit=50" | jq '.[] | {number, title, state}'
|
||||
```
|
||||
|
||||
### 3. Create an Issue
|
||||
```bash
|
||||
curl -s -X POST -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
-H "Content-Type: application/json" \
|
||||
"${GITEA_URL}/api/v1/repos/${OWNER}/${REPO}/issues" \
|
||||
-d "{\"title\":\"${TITLE}\",\"body\":\"${BODY}\",\"assignees\":[\"${ASSIGNEE}\"]}
|
||||
```
|
||||
- Escape newlines in `BODY` if passing inline; prefer a JSON file for multi-line bodies.
|
||||
|
||||
### 4. Create a Pull Request
|
||||
```bash
|
||||
curl -s -X POST -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
-H "Content-Type: application/json" \
|
||||
"${GITEA_URL}/api/v1/repos/${OWNER}/${REPO}/pulls" \
|
||||
-d "{\"title\":\"${TITLE}\",\"body\":\"${BODY}\",\"head\":\"${BRANCH}\",\"base\":\"${BASE_BRANCH}\"}"
|
||||
```
|
||||
|
||||
### 5. Check PR Status / Diff
|
||||
```bash
|
||||
curl -s -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
"${GITEA_URL}/api/v1/repos/${OWNER}/${REPO}/pulls/${PR_NUMBER}" | jq '{number, title, state, mergeable}'
|
||||
```
|
||||
|
||||
### 6. Push Code Before Opening PR
|
||||
```bash
|
||||
git checkout -b "${BRANCH}"
|
||||
git add .
|
||||
git commit -m "${COMMIT_MSG}"
|
||||
git push origin "${BRANCH}"
|
||||
```
|
||||
|
||||
### 7. Add Comments to Issues/PRs
|
||||
```bash
|
||||
curl -s -X POST -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
-H "Content-Type: application/json" \
|
||||
"${GITEA_URL}/api/v1/repos/${OWNER}/${REPO}/issues/${NUMBER}/comments" \
|
||||
-d "{\"body\":\"${COMMENT_BODY}\"}"
|
||||
```
|
||||
|
||||
## Verification Checklist
|
||||
- [ ] Environment variables are exported and non-empty
|
||||
- [ ] API responses are parsed with `jq` to confirm success
|
||||
- [ ] Issue/PR numbers are captured from the JSON response for cross-linking
|
||||
- [ ] Branch exists on remote before creating a PR
|
||||
- [ ] Multi-line bodies are written to a temp JSON file to avoid escaping hell
|
||||
|
||||
## Pitfalls
|
||||
- **Trailing slashes in `GITEA_URL`:** Ensure `GITEA_URL` does not end with `/` or double slashes break URLs.
|
||||
- **Branch not pushed:** Creating a PR for a local-only branch returns 422.
|
||||
- **Escape hell:** For multi-line issue/PR bodies, write JSON to a file with `cat <<EOF > /tmp/payload.json` and pass `@/tmp/payload.json` to curl instead of inline strings.
|
||||
- **Token scope:** If operations fail with 403, verify the token has `repo` or `write:issue` scope.
|
||||
- **Pagination:** Default limit is 30 issues; use `?limit=100` or paginate with `page=` for large backlogs.
|
||||
|
||||
## Example: Full Issue Creation with File Body
|
||||
```bash
|
||||
cat <<'EOF' > /tmp/issue.json
|
||||
{
|
||||
"title": "[Bezalel] Forge Health Check",
|
||||
"body": "Build a diagnostic scanner for artifact integrity and permissions.\n\n- Detect .pyc without .py source\n- Detect world-readable sensitive files\n- Output JSON for CI consumption",
|
||||
"assignees": ["bezalel"],
|
||||
"labels": ["enhancement", "security"]
|
||||
}
|
||||
EOF
|
||||
curl -s -X POST -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
-H "Content-Type: application/json" \
|
||||
"${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/issues" \
|
||||
-d @/tmp/issue.json | jq '.number'
|
||||
```
|
||||
@@ -557,8 +557,41 @@ class TestSummaryTargetRatio:
|
||||
assert c.threshold_percent == 0.50
|
||||
assert c.threshold_tokens == 50_000
|
||||
|
||||
def test_default_protect_last_n_is_20(self):
|
||||
"""Default protect_last_n should be 20."""
|
||||
def test_default_protect_last_n_is_5(self):
|
||||
"""Default protect_last_n should be 5 (Last 5 Turns protection)."""
|
||||
with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
|
||||
c = ContextCompressor(model="test", quiet_mode=True)
|
||||
assert c.protect_last_n == 20
|
||||
assert c.protect_last_n == 5
|
||||
|
||||
def test_last_5_turns_protected_from_compression(self):
|
||||
"""The most recent 5 messages must never be summarized away."""
|
||||
with patch("agent.context_compressor.get_model_context_length", return_value=10_000):
|
||||
c = ContextCompressor(
|
||||
model="test",
|
||||
quiet_mode=True,
|
||||
protect_first_n=2,
|
||||
protect_last_n=5,
|
||||
threshold_percent=0.50,
|
||||
)
|
||||
# Build messages: system + 11 user/assistant exchanges
|
||||
messages = [{"role": "system", "content": "sys"}]
|
||||
for i in range(11):
|
||||
messages.append({"role": "user", "content": f"user {i}"})
|
||||
messages.append({"role": "assistant", "content": f"assistant {i}"})
|
||||
|
||||
c.last_prompt_tokens = 6_000
|
||||
|
||||
with patch.object(c, "_generate_summary", return_value="[SUMMARY]"):
|
||||
result = c.compress(messages, current_tokens=6_000)
|
||||
|
||||
# The tail should preserve the last 5 raw messages from the original list.
|
||||
# Original last 5 messages: assistant 8, user 9, assistant 9, user 10, assistant 10
|
||||
# The summary may be merged into the first tail message to avoid role collision.
|
||||
tail_roles = [m.get("role") for m in result[-5:]]
|
||||
tail_contents = [m.get("content", "") for m in result[-5:]]
|
||||
assert tail_roles == ["assistant", "user", "assistant", "user", "assistant"]
|
||||
assert tail_contents[-4:] == [
|
||||
"user 9", "assistant 9", "user 10", "assistant 10"
|
||||
]
|
||||
# First tail message has the original content preserved (possibly merged with summary)
|
||||
assert "assistant 8" in tail_contents[0]
|
||||
|
||||
@@ -279,7 +279,7 @@ class TestSkillViewFilePathSecurity:
|
||||
"""Tests for file_path parameter security in skill_view."""
|
||||
|
||||
@pytest.fixture
|
||||
def setup_skill_with_files(self, tmp_path):
|
||||
def setup_skill_with_files(self, tmp_path):
|
||||
"""Create a skill with supporting files."""
|
||||
skills_dir = tmp_path / "skills"
|
||||
skills_dir.mkdir()
|
||||
|
||||
@@ -514,3 +514,79 @@ class TestSymlinkPrefixConfusionRegression:
|
||||
new_escapes = not resolved.is_relative_to(skill_dir_resolved)
|
||||
assert old_escapes is False
|
||||
assert new_escapes is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# V-011 Obfuscation Bypass Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
from tools.skills_guard_v011 import normalize_input, analyze_python_ast
|
||||
|
||||
|
||||
class TestNormalizeInput:
|
||||
def test_nfkc_homoglyphs(self):
|
||||
assert normalize_input("eval") == "eval"
|
||||
|
||||
def test_case_folding(self):
|
||||
assert normalize_input("EVaL") == "eval"
|
||||
|
||||
def test_zwsp_removal(self):
|
||||
assert normalize_input("ev\u200bal") == "eval"
|
||||
assert normalize_input("ex\u200ce\u200dc") == "exec"
|
||||
assert normalize_input("get\ufeffattr") == "getattr"
|
||||
|
||||
def test_combined_obfuscation(self):
|
||||
assert normalize_input("E\u200bVaL") == "eval"
|
||||
|
||||
|
||||
class TestPythonSecurityAnalyzer:
|
||||
def test_detects_eval_call(self):
|
||||
code = "eval('1+1')"
|
||||
findings = analyze_python_ast(code, "test.py")
|
||||
assert any(f.pattern_id == "dynamic_eval_exec" for f in findings)
|
||||
|
||||
def test_detects_exec_call(self):
|
||||
code = "exec('print(1)')"
|
||||
findings = analyze_python_ast(code, "test.py")
|
||||
assert any(f.pattern_id == "dynamic_eval_exec" for f in findings)
|
||||
|
||||
def test_detects_compile_call(self):
|
||||
code = "compile('pass', '<string>', 'exec')"
|
||||
findings = analyze_python_ast(code, "test.py")
|
||||
assert any(f.pattern_id == "dynamic_compile" for f in findings)
|
||||
|
||||
def test_detects_getattr_dunder(self):
|
||||
code = 'getattr(os, "__import__")'
|
||||
findings = analyze_python_ast(code, "test.py")
|
||||
assert any(f.pattern_id == "getattr_dunder" for f in findings)
|
||||
|
||||
def test_detects_import_base64(self):
|
||||
code = "import base64"
|
||||
findings = analyze_python_ast(code, "test.py")
|
||||
assert any(f.pattern_id == "import_base64" for f in findings)
|
||||
|
||||
def test_no_false_positives_on_safe_code(self):
|
||||
code = "print('hello world')"
|
||||
findings = analyze_python_ast(code, "test.py")
|
||||
assert len(findings) == 0
|
||||
|
||||
|
||||
class TestV011Integration:
|
||||
def test_scan_file_catches_obfuscated_eval(self, tmp_path):
|
||||
f = tmp_path / "bad.py"
|
||||
f.write_text("e\u200bVal('1+1')")
|
||||
findings = scan_file(f, "bad.py")
|
||||
assert any("eval" in f.description.lower() for f in findings)
|
||||
|
||||
def test_scan_file_catches_dynamic_exec(self, tmp_path):
|
||||
f = tmp_path / "bad.py"
|
||||
f.write_text("exec('import os')")
|
||||
findings = scan_file(f, "bad.py")
|
||||
assert any(f.pattern_id == "dynamic_eval_exec" for f in findings)
|
||||
|
||||
def test_scan_file_catches_obfuscated_import(self, tmp_path):
|
||||
f = tmp_path / "bad.py"
|
||||
f.write_text("__import__('os')")
|
||||
findings = scan_file(f, "bad.py")
|
||||
# Should be caught by regex after normalization
|
||||
assert any("__import__" in f.description.lower() for f in findings)
|
||||
|
||||
@@ -32,6 +32,12 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import List, Set, Tuple
|
||||
|
||||
from tools.skills_guard_v011 import (
|
||||
normalize_input,
|
||||
analyze_python_ast,
|
||||
V011_OBFUSCATION_PATTERNS,
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -484,7 +490,7 @@ THREAT_PATTERNS = [
|
||||
(r'(send|post|upload|transmit)\s+.*\s+(to|at)\s+https?://',
|
||||
"send_to_url", "high", "exfiltration",
|
||||
"instructs agent to send data to a URL"),
|
||||
]
|
||||
] + V011_OBFUSCATION_PATTERNS
|
||||
|
||||
# Structural limits for skill directories
|
||||
MAX_FILE_COUNT = 50 # skills shouldn't have 50+ files
|
||||
@@ -921,7 +927,7 @@ def scan_file(file_path: Path, rel_path: str = "") -> List[Finding]:
|
||||
original_line = lines[i - 1] if i <= len(lines) else norm_line
|
||||
matched_text = original_line.strip()
|
||||
if len(matched_text) > 120:
|
||||
matched_text = matched_text[:117] + "..."
|
||||
matched_text = matched_text[:117] + '...'
|
||||
findings.append(Finding(
|
||||
pattern_id=pid,
|
||||
severity=severity,
|
||||
|
||||
186
tools/skills_guard_v011.py
Normal file
186
tools/skills_guard_v011.py
Normal file
@@ -0,0 +1,186 @@
|
||||
"""
|
||||
V-011 Skills Guard Bypass fix — Obfuscation detection.
|
||||
|
||||
Adds:
|
||||
- normalize_input() with NFKC + case folding + ZWSP removal
|
||||
- PythonSecurityAnalyzer AST visitor for dynamic execution patterns
|
||||
- Additional obfuscation threat patterns
|
||||
"""
|
||||
|
||||
import ast
|
||||
import re
|
||||
import unicodedata
|
||||
from dataclasses import dataclass
|
||||
from typing import List
|
||||
|
||||
|
||||
@dataclass
|
||||
class Finding:
|
||||
"""Minimal Finding dataclass to avoid circular import with skills_guard.py."""
|
||||
pattern_id: str
|
||||
severity: str
|
||||
category: str
|
||||
file: str
|
||||
line: int
|
||||
match: str
|
||||
description: str
|
||||
|
||||
|
||||
def normalize_input(text: str) -> str:
|
||||
"""
|
||||
Normalize text to defeat common obfuscation bypasses.
|
||||
|
||||
1. Compatibility decomposition (NFKC) — homoglyphs, compat chars
|
||||
2. Case folding — uppercase ↔ lowercase evasion
|
||||
3. Zero-width space / joiner removal
|
||||
"""
|
||||
text = unicodedata.normalize("NFKC", text)
|
||||
text = text.casefold()
|
||||
# Remove zero-width and control characters used for hiding
|
||||
zwsp_chars = "\u200b\u200c\u200d\ufeff\u2060\u180e"
|
||||
for ch in zwsp_chars:
|
||||
text = text.replace(ch, "")
|
||||
return text
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AST-based Python security analysis
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class PythonSecurityAnalyzer(ast.NodeVisitor):
|
||||
"""AST visitor that detects obfuscated/dynamic execution in Python code."""
|
||||
|
||||
def __init__(self, rel_path: str):
|
||||
self.rel_path = rel_path
|
||||
self.findings: List[Finding] = []
|
||||
self.seen = set()
|
||||
|
||||
def _add(self, pattern_id: str, severity: str, category: str, line: int, match: str, description: str):
|
||||
key = (pattern_id, line, match)
|
||||
if key in self.seen:
|
||||
return
|
||||
self.seen.add(key)
|
||||
if len(match) > 120:
|
||||
match = match[:117] + "..."
|
||||
self.findings.append(Finding(
|
||||
pattern_id=pattern_id,
|
||||
severity=severity,
|
||||
category=category,
|
||||
file=self.rel_path,
|
||||
line=line,
|
||||
match=match,
|
||||
description=description,
|
||||
))
|
||||
|
||||
def visit_Call(self, node: ast.Call):
|
||||
# Detect eval/exec/compile with non-literal args
|
||||
if isinstance(node.func, ast.Name):
|
||||
if node.func.id in ("eval", "exec"):
|
||||
self._add(
|
||||
"dynamic_eval_exec",
|
||||
"high",
|
||||
"obfuscation",
|
||||
getattr(node, "lineno", 0),
|
||||
ast.dump(node)[:120],
|
||||
f"Dynamic {node.func.id}() call detected (possible obfuscation)",
|
||||
)
|
||||
elif node.func.id == "compile":
|
||||
self._add(
|
||||
"dynamic_compile",
|
||||
"high",
|
||||
"obfuscation",
|
||||
getattr(node, "lineno", 0),
|
||||
ast.dump(node)[:120],
|
||||
"compile() call detected (possible code obfuscation)",
|
||||
)
|
||||
elif node.func.id == "getattr" and len(node.args) >= 2:
|
||||
# getattr(..., "__import__") or similar
|
||||
if isinstance(node.args[1], ast.Constant) and isinstance(node.args[1].value, str):
|
||||
if node.args[1].value.startswith("__") and node.args[1].value.endswith("__"):
|
||||
self._add(
|
||||
"getattr_dunder",
|
||||
"high",
|
||||
"obfuscation",
|
||||
getattr(node, "lineno", 0),
|
||||
f'getattr(..., "{node.args[1].value}")',
|
||||
"getattr used to access dunder attribute (possible sandbox escape)",
|
||||
)
|
||||
elif isinstance(node.func, ast.Attribute):
|
||||
if node.func.attr in ("__import__", "_import"):
|
||||
self._add(
|
||||
"dynamic_import",
|
||||
"high",
|
||||
"obfuscation",
|
||||
getattr(node, "lineno", 0),
|
||||
ast.dump(node)[:120],
|
||||
"Dynamic __import__ invocation detected",
|
||||
)
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_Import(self, node: ast.Import):
|
||||
# Detect import of known obfuscation modules
|
||||
obf_modules = {"base64", "codecs", "marshal", "types", "ctypes"}
|
||||
for alias in node.names:
|
||||
if alias.name in obf_modules:
|
||||
self._add(
|
||||
f"import_{alias.name}",
|
||||
"medium",
|
||||
"obfuscation",
|
||||
getattr(node, "lineno", 0),
|
||||
f"import {alias.name}",
|
||||
f"{alias.name} import (possible encoding/obfuscation helper)",
|
||||
)
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_ImportFrom(self, node: ast.ImportFrom):
|
||||
obf_modules = {"base64", "codecs", "marshal", "types", "ctypes"}
|
||||
if node.module in obf_modules:
|
||||
self._add(
|
||||
f"import_from_{node.module}",
|
||||
"medium",
|
||||
"obfuscation",
|
||||
getattr(node, "lineno", 0),
|
||||
f"from {node.module} import ...",
|
||||
f"{node.module} import (possible encoding/obfuscation helper)",
|
||||
)
|
||||
self.generic_visit(node)
|
||||
|
||||
|
||||
def analyze_python_ast(content: str, rel_path: str) -> List[Finding]:
|
||||
"""Run AST analysis on Python content and return findings."""
|
||||
try:
|
||||
tree = ast.parse(content)
|
||||
except SyntaxError:
|
||||
return []
|
||||
analyzer = PythonSecurityAnalyzer(rel_path)
|
||||
analyzer.visit(tree)
|
||||
return analyzer.findings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Additional obfuscation patterns for regex scanning
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
V011_OBFUSCATION_PATTERNS = [
|
||||
# getattr/__import__ chains as strings
|
||||
(r'getattr\s*\([^)]*__builtins__[^)]*\)',
|
||||
"getattr_builtins_chain", "high", "obfuscation",
|
||||
"getattr chain targeting __builtins__ (sandbox escape)"),
|
||||
(r'__import__\s*\(\s*["\']os["\']',
|
||||
"dunder_import_os", "high", "obfuscation",
|
||||
"__import__ used to load os module (obfuscation)"),
|
||||
(r'__import__\s*\(\s*["\']subprocess["\']',
|
||||
"dunder_import_subprocess", "high", "obfuscation",
|
||||
"__import__ used to load subprocess module (obfuscation)"),
|
||||
# exec/eval with obfuscated wrappers
|
||||
(r'\beval\s*\(\s*\+',
|
||||
"eval_plus_concat", "high", "obfuscation",
|
||||
"eval with string concatenation (obfuscation)"),
|
||||
(r'\bexec\s*\(\s*\+',
|
||||
"exec_plus_concat", "high", "obfuscation",
|
||||
"exec with string concatenation (obfuscation)"),
|
||||
# Base64/hex dynamic execution
|
||||
(r'base64\.(b64decode|decode)\s*\([^)]*\)\s*\)\s*\)',
|
||||
"base64_nested_decode", "high", "obfuscation",
|
||||
"nested base64 decode followed by execution (obfuscation)"),
|
||||
]
|
||||
Reference in New Issue
Block a user