Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
45679eef8a |
288
agent/gemma4_tool_hardening.py
Normal file
288
agent/gemma4_tool_hardening.py
Normal file
@@ -0,0 +1,288 @@
|
||||
"""Gemma 4 tool calling hardening — parse, validate, benchmark.
|
||||
|
||||
Gemma 4 has native multimodal function calling but its output format
|
||||
may differ from OpenAI/Claude. This module provides:
|
||||
|
||||
1. Gemma4ToolParser — robust parsing for Gemma 4's tool call format
|
||||
2. Parallel tool call detection and splitting
|
||||
3. Tool call success rate tracking and benchmarking
|
||||
4. Fallback parsing strategies for malformed output
|
||||
|
||||
Usage:
|
||||
from agent.gemma4_tool_hardening import Gemma4ToolParser
|
||||
parser = Gemma4ToolParser()
|
||||
tool_calls = parser.parse(response_text)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
@dataclass
class ToolCallAttempt:
    """Record of a single tool call parsing attempt.

    One instance per parse() call is appended to the parser's internal
    history (see Gemma4ToolParser._record_attempt).
    """
    raw_text: str              # input text, truncated to 500 chars by the recorder
    parsed: bool               # True when some strategy produced tool calls
    tool_name: str             # name of the first parsed call ("" on failure)
    arguments: Dict[str, Any]  # NOTE(review): the recorder currently always stores {}
    error: str                 # "" on success, short message on failure
    strategy: str  # "native", "json_block", "regex", "fallback"
    timestamp: float = 0.0     # time.time() of the attempt; 0.0 if never set
|
||||
|
||||
|
||||
@dataclass
class Gemma4BenchmarkResult:
    """Aggregate statistics for a tool calling benchmark run."""
    total_calls: int = 0
    successful_parses: int = 0
    parallel_calls: int = 0
    strategies_used: Dict[str, int] = field(default_factory=dict)
    avg_parse_time_ms: float = 0.0
    success_rate: float = 0.0
    errors: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict (at most 10 errors included)."""
        summary: Dict[str, Any] = {
            "total_calls": self.total_calls,
            "successful_parses": self.successful_parses,
            "parallel_calls": self.parallel_calls,
        }
        summary["success_rate"] = round(self.success_rate, 3)
        summary["strategies_used"] = self.strategies_used
        summary["avg_parse_time_ms"] = round(self.avg_parse_time_ms, 2)
        summary["error_count"] = len(self.errors)
        summary["errors"] = self.errors[:10]
        return summary
|
||||
|
||||
|
||||
class Gemma4ToolParser:
    """Robust tool call parser for Gemma 4 output format.

    Tries multiple parsing strategies in order:
    1. Native OpenAI format (standard tool_calls)
    2. JSON code blocks (```json ... ```)
    3. Regex extraction (function_name + arguments patterns)
    4. Heuristic fallback (best-effort extraction near expected tool names)

    Every parse() call updates the running statistics exposed via the
    ``benchmark`` property.
    """

    # Patterns for Gemma 4 tool call formats.
    # Fenced code block, optionally tagged "json"; body captured non-greedily.
    _JSON_BLOCK_PATTERN = re.compile(
        r'```(?:json)?\s*\n?(.*?)\n?```',
        re.DOTALL | re.IGNORECASE,
    )
    # "function/tool/call name({...})" style invocation.
    _FUNCTION_CALL_PATTERN = re.compile(
        r'(?:function|tool|call)[:\s]*(\w+)\s*\(\s*({.*?})\s*\)',
        re.DOTALL | re.IGNORECASE,
    )
    # "[tool_call] name: {...}" inline marker.
    _GEMMA_INLINE_PATTERN = re.compile(
        r'\[(?:tool_call|function_call)\]\s*(\w+)\s*:\s*({.*?})',
        re.DOTALL | re.IGNORECASE,
    )

    def __init__(self) -> None:
        self._attempts: List[ToolCallAttempt] = []
        self._benchmark = Gemma4BenchmarkResult()

    @property
    def benchmark(self) -> Gemma4BenchmarkResult:
        """Running statistics accumulated across parse() calls."""
        return self._benchmark

    def parse(self, response_text: str, expected_tools: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Parse tool calls from model response using multiple strategies.

        Args:
            response_text: Raw model output text.
            expected_tools: Optional tool names, used only by the heuristic
                fallback strategy.

        Returns:
            List of tool call dicts in OpenAI format:
            [{"id": "...", "type": "function", "function": {"name": "...", "arguments": "..."}}]
            Empty list when every strategy fails.
        """
        t0 = time.monotonic()
        self._benchmark.total_calls += 1

        # Strategies in decreasing order of reliability; first hit wins.
        strategies = (
            ("native", lambda: self._try_native_parse(response_text)),
            ("json_block", lambda: self._try_json_block_parse(response_text, expected_tools)),
            ("regex", lambda: self._try_regex_parse(response_text)),
            ("fallback", lambda: self._try_heuristic_parse(response_text, expected_tools)),
        )
        for strategy, attempt in strategies:
            result = attempt()
            if result:
                self._register_success(response_text, result, strategy, t0)
                return result

        # All strategies failed.
        self._record_attempt(response_text, False, [], "none")
        self._benchmark.errors.append(f"Failed to parse: {response_text[:200]}")
        self._update_timing(t0)
        return []

    def _register_success(
        self,
        text: str,
        result: List[Dict[str, Any]],
        strategy: str,
        t0: float,
    ) -> None:
        """Shared bookkeeping for a successful parse under *strategy*.

        Fix: multi-call (parallel) results are now counted for every
        strategy; previously only the native and json_block branches
        incremented ``parallel_calls``.
        """
        self._record_attempt(text, True, result, strategy)
        self._benchmark.successful_parses += 1
        if len(result) > 1:
            self._benchmark.parallel_calls += 1
        self._benchmark.strategies_used[strategy] = (
            self._benchmark.strategies_used.get(strategy, 0) + 1
        )
        self._update_timing(t0)

    def _try_native_parse(self, text: str) -> List[Dict[str, Any]]:
        """Try parsing standard OpenAI tool_calls JSON (whole text is JSON)."""
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            return []
        if isinstance(data, dict) and "tool_calls" in data:
            return data["tool_calls"]
        if isinstance(data, list) and all(
            isinstance(item, dict) and "function" in item for item in data
        ):
            return data
        return []

    def _try_json_block_parse(self, text: str, expected_tools: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Extract tool calls from fenced JSON code blocks."""
        calls: List[Dict[str, Any]] = []
        for block in self._JSON_BLOCK_PATTERN.findall(text):
            try:
                data = json.loads(block.strip())
            except json.JSONDecodeError:
                continue
            if isinstance(data, dict):
                # Single call: {"name"|"function": ..., "arguments": ...}
                if "name" in data and "arguments" in data:
                    calls.append(self._to_openai_format(data["name"], data["arguments"]))
                elif "function" in data and "arguments" in data:
                    calls.append(self._to_openai_format(data["function"], data["arguments"]))
            elif isinstance(data, list):
                # Parallel calls: [{"name": ..., "arguments"/"args": ...}, ...]
                for item in data:
                    if isinstance(item, dict) and "name" in item:
                        args = item.get("arguments", item.get("args", {}))
                        calls.append(self._to_openai_format(item["name"], args))
        return calls

    def _try_regex_parse(self, text: str) -> List[Dict[str, Any]]:
        """Extract tool calls using regex patterns over free-form text."""
        calls: List[Dict[str, Any]] = []
        # Both patterns capture (name, json_args); only valid JSON args count.
        for pattern in (self._FUNCTION_CALL_PATTERN, self._GEMMA_INLINE_PATTERN):
            for match in pattern.finditer(text):
                try:
                    args = json.loads(match.group(2))
                except json.JSONDecodeError:
                    continue
                calls.append(self._to_openai_format(match.group(1), args))
        return calls

    def _try_heuristic_parse(self, text: str, expected_tools: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Best-effort extraction: each expected tool name followed by a
        brace-delimited JSON object."""
        if not expected_tools:
            return []

        calls: List[Dict[str, Any]] = []
        for tool_name in expected_tools:
            # Look for the tool name adjacent to JSON-like content.
            pattern = re.compile(
                rf'{re.escape(tool_name)}\s*[\(:]\s*({{[^}}]+}})',
                re.IGNORECASE,
            )
            match = pattern.search(text)
            if not match:
                continue
            try:
                calls.append(self._to_openai_format(tool_name, json.loads(match.group(1))))
            except json.JSONDecodeError:
                pass
        return calls

    def _to_openai_format(self, name: str, arguments: Any) -> Dict[str, Any]:
        """Convert a (name, arguments) pair to an OpenAI tool call dict."""
        import uuid  # local import: only needed when a call is produced

        args_str = json.dumps(arguments) if isinstance(arguments, dict) else str(arguments)
        return {
            "id": f"call_{uuid.uuid4().hex[:24]}",
            "type": "function",
            "function": {
                "name": name,
                "arguments": args_str,
            },
        }

    def _record_attempt(self, text: str, success: bool, result: list, strategy: str) -> None:
        """Append a truncated record of this parse attempt to history."""
        self._attempts.append(ToolCallAttempt(
            raw_text=text[:500],
            parsed=success,
            tool_name=result[0]["function"]["name"] if result else "",
            arguments={},
            error="" if success else "parse failed",
            strategy=strategy,
            timestamp=time.time(),
        ))

    def _update_timing(self, t0: float) -> None:
        """Fold this call's elapsed time into the running average and rate."""
        elapsed = (time.monotonic() - t0) * 1000
        n = self._benchmark.total_calls
        self._benchmark.avg_parse_time_ms = (
            (self._benchmark.avg_parse_time_ms * (n - 1) + elapsed) / n
        )
        self._benchmark.success_rate = (
            self._benchmark.successful_parses / n if n > 0 else 0
        )

    def format_report(self) -> str:
        """Format a human-readable benchmark report."""
        b = self._benchmark
        lines = [
            "Gemma 4 Tool Calling Benchmark",
            "=" * 40,
            f"Total attempts: {b.total_calls}",
            f"Successful parses: {b.successful_parses}",
            f"Success rate: {b.success_rate:.1%}",
            f"Parallel calls: {b.parallel_calls}",
            f"Avg parse time: {b.avg_parse_time_ms:.2f}ms",
            "",
            "Strategies used:",
        ]
        for strategy, count in sorted(b.strategies_used.items(), key=lambda x: -x[1]):
            lines.append(f"  {strategy}: {count}")

        if b.errors:
            lines.append("")
            lines.append(f"Errors ({len(b.errors)}):")
            for err in b.errors[:5]:
                lines.append(f"  {err[:100]}")

        return "\n".join(lines)
|
||||
@@ -1,122 +0,0 @@
|
||||
"""Tests for credential redaction — Issue #839."""
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from tools.credential_redaction import (
|
||||
redact_credentials, should_auto_mask, mask_config_values,
|
||||
redact_tool_output, RedactionResult
|
||||
)
|
||||
|
||||
|
||||
class TestRedactCredentials:
    """redact_credentials() must scrub common credential shapes."""

    def test_openai_key(self):
        res = redact_credentials("API key: sk-abc123def456ghi789jkl012mno345pqr678stu901vwx")
        assert res.was_redacted
        assert "sk-abc" not in res.text
        assert "[REDACTED" in res.text

    def test_github_pat(self):
        res = redact_credentials("token: ghp_1234567890abcdefghijklmnopqrstuvwxyz")
        assert res.was_redacted
        assert "ghp_" not in res.text

    def test_bearer_token(self):
        res = redact_credentials("Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9")
        assert res.was_redacted
        assert "Bearer eyJ" not in res.text

    def test_password_assignment(self):
        assert redact_credentials('password: "supersecret123"').was_redacted

    def test_clean_text(self):
        benign = "Hello world, no credentials here"
        res = redact_credentials(benign)
        assert not res.was_redacted
        assert res.text == benign

    def test_empty_text(self):
        assert not redact_credentials("").was_redacted
|
||||
|
||||
|
||||
class TestShouldAutoMask:
    """should_auto_mask() flags credential-bearing file names."""

    # Idiom fix throughout: assert the boolean directly instead of
    # comparing with "== True" / "== False" (flake8 E712).
    def test_env_file(self):
        assert should_auto_mask(".env")

    def test_config_file(self):
        assert should_auto_mask("config.yaml")

    def test_token_file(self):
        assert should_auto_mask("gitea_token")

    def test_normal_file(self):
        assert not should_auto_mask("readme.md")
|
||||
|
||||
|
||||
class TestMaskConfigValues:
    """mask_config_values() replaces values but keeps file structure."""

    def test_env_api_key(self):
        masked = mask_config_values("API_KEY=sk-abc123def456")
        assert "sk-abc" not in masked
        assert "[REDACTED]" in masked

    def test_yaml_token(self):
        masked = mask_config_values('token: "ghp_1234567890"')
        assert "ghp_" not in masked
        assert "[REDACTED]" in masked

    def test_preserves_structure(self):
        masked = mask_config_values("API_KEY=secret\nOTHER=value")
        assert "OTHER=value" in masked  # Non-credential preserved
|
||||
|
||||
|
||||
class TestRedactToolOutput:
    """redact_tool_output() handles both string and dict payloads."""

    def test_string_output(self):
        scrubbed, note = redact_tool_output(
            "file_read", "Result: sk-abc123def456ghi789jkl012mno345pqr678stu901vwx"
        )
        assert "sk-abc123" not in scrubbed
        assert note is not None

    def test_dict_output(self):
        payload = {"content": "token: ghp_1234567890abcdefghijklmnopqrstuvwxyz"}
        scrubbed, _ = redact_tool_output("file_read", payload)
        assert "ghp_" not in scrubbed["content"]

    def test_clean_output(self):
        original = "No credentials here"
        scrubbed, note = redact_tool_output("file_read", original)
        assert scrubbed == original
        assert note is None
|
||||
|
||||
|
||||
class TestRedactionResult:
    """RedactionResult notice wording and dict serialization."""

    def test_notice_singular(self):
        result = RedactionResult("redacted", "original", [{"pattern_name": "test"}])
        assert "1 credential pattern" in result.notice()

    def test_notice_plural(self):
        result = RedactionResult("redacted", "original", [
            {"pattern_name": "test1"},
            {"pattern_name": "test2"},
        ])
        assert "2 credential patterns" in result.notice()

    def test_to_dict(self):
        result = RedactionResult("redacted", "original", [{"pattern_name": "test"}])
        d = result.to_dict()
        # Idiom fix: identity check instead of "== True" (flake8 E712).
        assert d["redacted"] is True
        assert d["count"] == 1
|
||||
|
||||
|
||||
# Allow running this test file directly; import pytest lazily so the module
# stays importable in environments where pytest is absent.
if __name__ == "__main__":
    import pytest
    pytest.main([__file__, "-v"])
|
||||
94
tests/test_gemma4_tool_hardening.py
Normal file
94
tests/test_gemma4_tool_hardening.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""Tests for Gemma 4 tool calling hardening."""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from agent.gemma4_tool_hardening import Gemma4ToolParser, Gemma4BenchmarkResult
|
||||
|
||||
|
||||
class TestNativeParse:
    """Strategy 1: whole response is standard OpenAI-format JSON."""

    def test_standard_tool_calls(self):
        payload = json.dumps({"tool_calls": [{"id": "call_1", "type": "function", "function": {"name": "read_file", "arguments": '{"path": "test.py"}'}}]})
        calls = Gemma4ToolParser().parse(payload)
        assert len(calls) == 1
        assert calls[0]["function"]["name"] == "read_file"

    def test_list_format(self):
        payload = json.dumps([{"id": "c1", "type": "function", "function": {"name": "terminal", "arguments": '{"command": "ls"}'}}])
        assert len(Gemma4ToolParser().parse(payload)) == 1
|
||||
|
||||
|
||||
class TestJsonBlockParse:
    """Strategy 2: tool calls inside fenced ```json blocks."""

    def test_json_code_block(self):
        reply = 'Here is the tool call:\n```json\n{"name": "read_file", "arguments": {"path": "test.py"}}\n```'
        calls = Gemma4ToolParser().parse(reply)
        assert len(calls) == 1
        assert calls[0]["function"]["name"] == "read_file"

    def test_multiple_json_blocks(self):
        reply = '```json\n{"name": "read_file", "arguments": {"path": "a.py"}}\n```\n```json\n{"name": "read_file", "arguments": {"path": "b.py"}}\n```'
        assert len(Gemma4ToolParser().parse(reply)) == 2

    def test_list_in_json_block(self):
        reply = '```json\n[{"name": "terminal", "arguments": {"command": "ls"}}]\n```'
        assert len(Gemma4ToolParser().parse(reply)) == 1
|
||||
|
||||
|
||||
class TestRegexParse:
    """Strategy 3: regex extraction from free-form text."""

    def test_function_call_pattern(self):
        calls = Gemma4ToolParser().parse('I will call read_file({"path": "test.py"}) now.')
        assert len(calls) == 1
        assert calls[0]["function"]["name"] == "read_file"

    def test_gemma_inline_pattern(self):
        calls = Gemma4ToolParser().parse('[tool_call] terminal: {"command": "pwd"}')
        assert len(calls) == 1
|
||||
|
||||
|
||||
class TestHeuristicParse:
    """Strategy 4: fallback extraction guided by expected tool names."""

    def test_heuristic_with_expected_tools(self):
        reply = 'Calling read_file({"path": "config.yaml"}) now'
        calls = Gemma4ToolParser().parse(reply, expected_tools=["read_file"])
        assert len(calls) == 1

    def test_heuristic_without_expected_tools(self):
        reply = 'Some text with {"key": "value"} but no tool name'
        assert len(Gemma4ToolParser().parse(reply)) == 0
|
||||
|
||||
|
||||
class TestBenchmark:
    """Benchmark counters and report formatting."""

    def test_benchmark_counts(self):
        p = Gemma4ToolParser()
        p.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
        p.parse('```json\n{"name": "y", "arguments": {}}\n```')
        p.parse('no tool call here')
        stats = p.benchmark
        assert stats.total_calls == 3
        assert stats.successful_parses == 2
        assert abs(stats.success_rate - 2/3) < 0.01

    def test_report_format(self):
        p = Gemma4ToolParser()
        p.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
        report = p.format_report()
        assert "Gemma 4 Tool Calling Benchmark" in report
        assert "native" in report
|
||||
@@ -1,269 +0,0 @@
|
||||
"""Credential Redaction — Poka-yoke for tool outputs.
|
||||
|
||||
Blocks silent credential exposure by redacting API keys, tokens, and
|
||||
passwords from tool outputs before they enter agent context.
|
||||
|
||||
Issue #839: Poka-yoke: Block silent credential exposure in tool outputs
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Audit log path
|
||||
_AUDIT_DIR = Path.home() / ".hermes" / "audit"
|
||||
_AUDIT_LOG = _AUDIT_DIR / "redactions.jsonl"
|
||||
|
||||
# Credential patterns — order matters (most specific first).
# Each entry is (regex, replacement); a replacement of None marks a
# context-dependent pattern that redact_credentials() skips.
_CREDENTIAL_PATTERNS = [
    # API keys
    (r'sk-[a-zA-Z0-9]{20,}', '[REDACTED: OpenAI-style API key]'),
    # Safe after the generic sk- pattern: the hyphen in "sk-ant-" stops the
    # 20-char alphanumeric run above from matching Anthropic keys first.
    (r'sk-ant-[a-zA-Z0-9-]{20,}', '[REDACTED: Anthropic API key]'),
    (r'ghp_[a-zA-Z0-9]{36}', '[REDACTED: GitHub PAT]'),
    (r'gho_[a-zA-Z0-9]{36}', '[REDACTED: GitHub OAuth token]'),
    (r'github_pat_[a-zA-Z0-9_]{82}', '[REDACTED: GitHub fine-grained PAT]'),
    (r'glpat-[a-zA-Z0-9-]{20,}', '[REDACTED: GitLab PAT]'),
    (r'syt_[a-zA-Z0-9_-]{40,}', '[REDACTED: Matrix access token]'),
    (r'xoxb-[0-9]{10,}-[a-zA-Z0-9]{20,}', '[REDACTED: Slack bot token]'),
    (r'xoxp-[0-9]{10,}-[a-zA-Z0-9]{20,}', '[REDACTED: Slack user token]'),

    # Bearer tokens
    (r'Bearer\s+[a-zA-Z0-9_.-]{20,}', '[REDACTED: Bearer token]'),

    # Generic tokens/passwords in assignments.
    # Fix: the alternation previously listed "api_key" twice.
    (r'(?:token|api_key|secret|password|passwd|pwd)\s*[:=]\s*["\']?([a-zA-Z0-9_.-]{8,})["\']?', '[REDACTED: credential]'),

    # Environment variable assignments
    (r'(?:export\s+)?(?:TOKEN|KEY|SECRET|PASSWORD|API_KEY)\s*=\s*["\']?([a-zA-Z0-9_.-]{8,})["\']?', '[REDACTED: env credential]'),

    # Base64 encoded credentials (auth headers)
    (r'(?:authorization|auth)\s*[:=]\s*(?:basic|bearer)\s+[a-zA-Z0-9+/=]{20,}', '[REDACTED: auth header]'),

    # AWS credentials
    (r'AKIA[0-9A-Z]{16}', '[REDACTED: AWS access key]'),
    # 40-char secret-key shape; None replacement => only applied by callers
    # that can check surrounding context (redact_credentials skips it).
    (r'(?<![A-Z0-9])[A-Za-z0-9/+=]{40}(?![A-Z0-9])', None),

    # Private keys
    (r'-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----', '[REDACTED: private key block]'),
]
|
||||
|
||||
|
||||
class RedactionResult:
    """Outcome of a credential-redaction pass over a piece of text."""

    def __init__(self, text: str, original: str, redactions: List[Dict[str, Any]]):
        self.text = text              # redacted text
        self.original = original      # pre-redaction input
        self.redactions = redactions  # one record per pattern hit

    @property
    def was_redacted(self) -> bool:
        """True when at least one credential pattern fired."""
        return bool(self.redactions)

    @property
    def count(self) -> int:
        """Number of individual redactions performed."""
        return len(self.redactions)

    def notice(self) -> str:
        """Generate compact redaction notice."""
        if not self.was_redacted:
            return ""
        plural = 's' if self.count > 1 else ''
        return f"[REDACTED: {self.count} credential pattern{plural} found]"

    def to_dict(self) -> Dict[str, Any]:
        return {
            "redacted": self.was_redacted,
            "count": self.count,
            "notice": self.notice(),
            "patterns": [r["pattern_name"] for r in self.redactions],
        }
|
||||
|
||||
|
||||
def redact_credentials(text: str, source: str = "unknown") -> RedactionResult:
    """Redact credentials from text.

    Args:
        text: Text to redact
        source: Source identifier for audit logging

    Returns:
        RedactionResult with redacted text and metadata
    """
    if not text:
        return RedactionResult(text, text, [])

    hits: List[Dict[str, Any]] = []
    scrubbed = text

    for pattern, label in _CREDENTIAL_PATTERNS:
        # Patterns with a None replacement are context-dependent; skip them.
        if label is None:
            continue

        # NOTE: positions are relative to the progressively-redacted text,
        # not the original input.
        for m in re.finditer(pattern, scrubbed, re.IGNORECASE):
            hits.append({
                "pattern_name": label,
                "position": m.start(),
                "length": len(m.group()),
                "source": source,
                "timestamp": time.time(),
            })

        scrubbed = re.sub(pattern, label, scrubbed, flags=re.IGNORECASE)

    outcome = RedactionResult(scrubbed, text, hits)

    # Log to audit trail only when something was actually redacted.
    if outcome.was_redacted:
        _log_redaction(outcome, source)

    return outcome
|
||||
|
||||
|
||||
def _log_redaction(result: RedactionResult, source: str) -> None:
    """Log redaction event to audit trail.

    Appends one JSONL entry per redaction event to _AUDIT_LOG. Best-effort:
    audit logging must never break the caller, so any filesystem or
    serialization error is swallowed and reported at debug level only.
    """
    try:
        _AUDIT_DIR.mkdir(parents=True, exist_ok=True)
        entry = {
            "timestamp": time.time(),
            "source": source,
            "count": result.count,
            # Pattern names double as human-readable labels in the log.
            "patterns": [r["pattern_name"] for r in result.redactions],
        }
        with open(_AUDIT_LOG, "a") as f:
            f.write(json.dumps(entry) + "\n")
    except Exception as e:
        logger.debug(f"Failed to log redaction: {e}")
|
||||
|
||||
|
||||
def should_auto_mask(file_path: str) -> bool:
    """Check if file should have credentials auto-masked.

    A file qualifies when its (lower-cased) path contains any marker
    commonly associated with secrets (.env, config, token, ...).
    """
    lowered = file_path.lower()
    markers = (
        ".env", "config", "token", "secret", "credential",
        "key", "auth", "password", ".pem", ".key",
    )
    for marker in markers:
        if marker in lowered:
            return True
    return False
|
||||
|
||||
|
||||
def mask_config_values(text: str) -> str:
    """Mask credential values in config/env files while preserving structure.

    Transforms:
        API_KEY=sk-abc123 → API_KEY=[REDACTED]
        token: "ghp_xyz" → token: "[REDACTED]"
    """
    sensitive_words = ("key", "token", "secret", "password", "auth")
    masked: list = []

    for raw_line in text.split("\n"):
        # Shell/.env style: KEY=VALUE (optionally "export KEY=VALUE").
        env_hit = re.match(r'^(\s*(?:export\s+)?[A-Z_][A-Z0-9_]*)\s*=\s*(.*)', raw_line)
        if env_hit:
            name = env_hit.group(1)
            val = env_hit.group(2).strip()
            if any(w in name.lower() for w in sensitive_words) and val and not val.startswith("[REDACTED]"):
                # Preserve whichever quote style the original value used.
                if val.startswith('"') and val.endswith('"'):
                    masked.append(f'{name}="[REDACTED]"')
                elif val.startswith("'") and val.endswith("'"):
                    masked.append(f"{name}='[REDACTED]'")
                else:
                    masked.append(f"{name}=[REDACTED]")
                continue

        # YAML style: key: value (quotes stripped by the regex).
        yaml_hit = re.match(r'^(\s*[a-z_][a-z0-9_]*)\s*:\s*["\']?(.*?)["\']?\s*$', raw_line)
        if yaml_hit:
            name = yaml_hit.group(1)
            val = yaml_hit.group(2).strip()
            if any(w in name.lower() for w in sensitive_words) and val and not val.startswith("[REDACTED]"):
                masked.append(f'{name}: "[REDACTED]"')
                continue

        masked.append(raw_line)

    return "\n".join(masked)
|
||||
|
||||
|
||||
def redact_tool_output(
    tool_name: str,
    output: Any,
    source: str = None,
) -> Tuple[Any, Optional[str]]:
    """Redact credentials from tool output.

    Args:
        tool_name: Name of the tool
        output: Tool output (string or dict)
        source: Source identifier (defaults to tool_name)

    Returns:
        Tuple of (redacted_output, notice)
    """
    origin = source if source else tool_name

    if isinstance(output, str):
        scrub = redact_credentials(output, origin)
        if not scrub.was_redacted:
            return output, None
        return scrub.text, scrub.notice()

    if isinstance(output, dict):
        # Recurse on each string value so every key gets its own source tag.
        cleaned: Dict[Any, Any] = {}
        collected: List[str] = []
        for key, value in output.items():
            if not isinstance(value, str):
                cleaned[key] = value
                continue
            sub_value, sub_notice = redact_tool_output(tool_name, value, f"{origin}.{key}")
            cleaned[key] = sub_value
            if sub_notice:
                collected.append(sub_notice)
        return cleaned, ("; ".join(collected) if collected else None)

    # Anything else (numbers, lists, None, ...) passes through untouched.
    return output, None
|
||||
|
||||
|
||||
def get_redaction_stats() -> Dict[str, Any]:
    """Get redaction statistics from audit log.

    Returns:
        Dict with "total_redactions" (sum of per-event counts) plus
        "by_source" and "by_pattern" event counters. All-zero stats when
        the audit log is missing or unreadable.
    """
    stats: Dict[str, Any] = {
        "total_redactions": 0,
        "by_source": {},
        "by_pattern": {},
    }

    if not _AUDIT_LOG.exists():
        return stats

    try:
        with open(_AUDIT_LOG, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                # Robustness fix: tolerate individual corrupt JSONL lines.
                # Previously one bad entry aborted the whole aggregation.
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue

                stats["total_redactions"] += entry.get("count", 0)

                source = entry.get("source", "unknown")
                stats["by_source"][source] = stats["by_source"].get(source, 0) + 1

                for pattern in entry.get("patterns", []):
                    stats["by_pattern"][pattern] = stats["by_pattern"].get(pattern, 0) + 1
    except Exception:
        # Best-effort: stats are advisory; never raise to the caller.
        pass

    return stats
|
||||
Reference in New Issue
Block a user