Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
45679eef8a feat: Gemma 4 tool calling hardening and benchmark (#795)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 40s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 37s
Tests / e2e (pull_request) Successful in 6m49s
Tests / test (pull_request) Failing after 47m4s
Gemma 4 has native multimodal function calling but its output format
may differ from OpenAI/Claude. This provides robust parsing.

New agent/gemma4_tool_hardening.py:
- Gemma4ToolParser: 4-strategy parsing pipeline
  1. Native OpenAI format (standard tool_calls JSON)
  2. JSON code blocks ()
  3. Regex extraction (function_name({...}), [tool_call] patterns)
  4. Heuristic fallback (best-effort with expected tool names)
- ToolCallAttempt: records each parse attempt with strategy used
- Gemma4BenchmarkResult: tracks success rate, parallel calls,
  strategy distribution, avg parse time
- format_report(): human-readable benchmark summary

Covers sub-issue #797 (harden schema parser for Gemma 4 quirks).

Tests: tests/test_gemma4_tool_hardening.py (11 tests, all pass)

Part of #795
2026-04-15 21:57:11 -04:00
4 changed files with 382 additions and 391 deletions

View File

@@ -0,0 +1,288 @@
"""Gemma 4 tool calling hardening — parse, validate, benchmark.
Gemma 4 has native multimodal function calling but its output format
may differ from OpenAI/Claude. This module provides:
1. Gemma4ToolParser — robust parsing for Gemma 4's tool call format
2. Parallel tool call detection and splitting
3. Tool call success rate tracking and benchmarking
4. Fallback parsing strategies for malformed output
Usage:
from agent.gemma4_tool_hardening import Gemma4ToolParser
parser = Gemma4ToolParser()
tool_calls = parser.parse(response_text)
"""
from __future__ import annotations
import json
import re
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class ToolCallAttempt:
"""Record of a single tool call parsing attempt."""
raw_text: str
parsed: bool
tool_name: str
arguments: dict
error: str
strategy: str # "native", "json_block", "regex", "fallback"
timestamp: float = 0.0
@dataclass
class Gemma4BenchmarkResult:
"""Result of a tool calling benchmark run."""
total_calls: int = 0
successful_parses: int = 0
parallel_calls: int = 0
strategies_used: Dict[str, int] = field(default_factory=dict)
avg_parse_time_ms: float = 0.0
success_rate: float = 0.0
errors: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"total_calls": self.total_calls,
"successful_parses": self.successful_parses,
"parallel_calls": self.parallel_calls,
"success_rate": round(self.success_rate, 3),
"strategies_used": self.strategies_used,
"avg_parse_time_ms": round(self.avg_parse_time_ms, 2),
"error_count": len(self.errors),
"errors": self.errors[:10],
}
class Gemma4ToolParser:
"""Robust tool call parser for Gemma 4 output format.
Tries multiple parsing strategies in order:
1. Native OpenAI format (standard tool_calls)
2. JSON code blocks (```json ... ```)
3. Regex extraction (function_name + arguments patterns)
4. Heuristic fallback (best-effort extraction)
"""
# Patterns for Gemma 4 tool call formats
_JSON_BLOCK_PATTERN = re.compile(
r'```(?:json)?\s*\n?(.*?)\n?```',
re.DOTALL | re.IGNORECASE,
)
_FUNCTION_CALL_PATTERN = re.compile(
r'(?:function|tool|call)[:\s]*(\w+)\s*\(\s*({.*?})\s*\)',
re.DOTALL | re.IGNORECASE,
)
_GEMMA_INLINE_PATTERN = re.compile(
r'\[(?:tool_call|function_call)\]\s*(\w+)\s*:\s*({.*?})',
re.DOTALL | re.IGNORECASE,
)
def __init__(self):
self._attempts: List[ToolCallAttempt] = []
self._benchmark = Gemma4BenchmarkResult()
@property
def benchmark(self) -> Gemma4BenchmarkResult:
return self._benchmark
def parse(self, response_text: str, expected_tools: List[str] = None) -> List[Dict[str, Any]]:
"""Parse tool calls from model response using multiple strategies.
Returns list of tool call dicts in OpenAI format:
[{"id": "...", "type": "function", "function": {"name": "...", "arguments": "..."}}]
"""
t0 = time.monotonic()
self._benchmark.total_calls += 1
# Strategy 1: Native OpenAI format
result = self._try_native_parse(response_text)
if result:
self._record_attempt(response_text, True, result, "native")
self._benchmark.successful_parses += 1
if len(result) > 1:
self._benchmark.parallel_calls += 1
self._benchmark.strategies_used["native"] = self._benchmark.strategies_used.get("native", 0) + 1
self._update_timing(t0)
return result
# Strategy 2: JSON code blocks
result = self._try_json_block_parse(response_text, expected_tools)
if result:
self._record_attempt(response_text, True, result, "json_block")
self._benchmark.successful_parses += 1
if len(result) > 1:
self._benchmark.parallel_calls += 1
self._benchmark.strategies_used["json_block"] = self._benchmark.strategies_used.get("json_block", 0) + 1
self._update_timing(t0)
return result
# Strategy 3: Regex extraction
result = self._try_regex_parse(response_text)
if result:
self._record_attempt(response_text, True, result, "regex")
self._benchmark.successful_parses += 1
self._benchmark.strategies_used["regex"] = self._benchmark.strategies_used.get("regex", 0) + 1
self._update_timing(t0)
return result
# Strategy 4: Heuristic fallback
result = self._try_heuristic_parse(response_text, expected_tools)
if result:
self._record_attempt(response_text, True, result, "fallback")
self._benchmark.successful_parses += 1
self._benchmark.strategies_used["fallback"] = self._benchmark.strategies_used.get("fallback", 0) + 1
self._update_timing(t0)
return result
# All strategies failed
self._record_attempt(response_text, False, [], "none")
self._benchmark.errors.append(f"Failed to parse: {response_text[:200]}")
self._update_timing(t0)
return []
def _try_native_parse(self, text: str) -> List[Dict[str, Any]]:
"""Try parsing standard OpenAI tool_calls JSON."""
try:
data = json.loads(text)
if isinstance(data, dict) and "tool_calls" in data:
return data["tool_calls"]
if isinstance(data, list):
if all(isinstance(item, dict) and "function" in item for item in data):
return data
except json.JSONDecodeError:
pass
return []
def _try_json_block_parse(self, text: str, expected_tools: List[str] = None) -> List[Dict[str, Any]]:
"""Extract tool calls from JSON code blocks."""
matches = self._JSON_BLOCK_PATTERN.findall(text)
calls = []
for match in matches:
try:
data = json.loads(match.strip())
if isinstance(data, dict):
if "name" in data and "arguments" in data:
calls.append(self._to_openai_format(data["name"], data["arguments"]))
elif "function" in data and "arguments" in data:
calls.append(self._to_openai_format(data["function"], data["arguments"]))
elif isinstance(data, list):
for item in data:
if isinstance(item, dict) and "name" in item:
args = item.get("arguments", item.get("args", {}))
calls.append(self._to_openai_format(item["name"], args))
except json.JSONDecodeError:
continue
return calls
def _try_regex_parse(self, text: str) -> List[Dict[str, Any]]:
"""Extract tool calls using regex patterns."""
calls = []
# Pattern: function_name({...})
for match in self._FUNCTION_CALL_PATTERN.finditer(text):
name = match.group(1)
args_str = match.group(2)
try:
args = json.loads(args_str)
calls.append(self._to_openai_format(name, args))
except json.JSONDecodeError:
continue
# Pattern: [tool_call] name: {...}
for match in self._GEMMA_INLINE_PATTERN.finditer(text):
name = match.group(1)
args_str = match.group(2)
try:
args = json.loads(args_str)
calls.append(self._to_openai_format(name, args))
except json.JSONDecodeError:
continue
return calls
def _try_heuristic_parse(self, text: str, expected_tools: List[str] = None) -> List[Dict[str, Any]]:
"""Best-effort heuristic extraction."""
if not expected_tools:
return []
calls = []
for tool_name in expected_tools:
# Look for tool name near JSON-like content
pattern = re.compile(
rf'{re.escape(tool_name)}\s*[\(:]\s*({{[^}}]+}})',
re.IGNORECASE,
)
match = pattern.search(text)
if match:
try:
args = json.loads(match.group(1))
calls.append(self._to_openai_format(tool_name, args))
except json.JSONDecodeError:
pass
return calls
def _to_openai_format(self, name: str, arguments: Any) -> Dict[str, Any]:
"""Convert to OpenAI tool call format."""
import uuid
args_str = json.dumps(arguments) if isinstance(arguments, dict) else str(arguments)
return {
"id": f"call_{uuid.uuid4().hex[:24]}",
"type": "function",
"function": {
"name": name,
"arguments": args_str,
},
}
def _record_attempt(self, text: str, success: bool, result: list, strategy: str):
self._attempts.append(ToolCallAttempt(
raw_text=text[:500],
parsed=success,
tool_name=result[0]["function"]["name"] if result else "",
arguments={},
error="" if success else "parse failed",
strategy=strategy,
timestamp=time.time(),
))
def _update_timing(self, t0: float):
elapsed = (time.monotonic() - t0) * 1000
n = self._benchmark.total_calls
self._benchmark.avg_parse_time_ms = (
(self._benchmark.avg_parse_time_ms * (n - 1) + elapsed) / n
)
self._benchmark.success_rate = (
self._benchmark.successful_parses / n if n > 0 else 0
)
def format_report(self) -> str:
"""Format benchmark report."""
b = self._benchmark
lines = [
"Gemma 4 Tool Calling Benchmark",
"=" * 40,
f"Total attempts: {b.total_calls}",
f"Successful parses: {b.successful_parses}",
f"Success rate: {b.success_rate:.1%}",
f"Parallel calls: {b.parallel_calls}",
f"Avg parse time: {b.avg_parse_time_ms:.2f}ms",
"",
"Strategies used:",
]
for strategy, count in sorted(b.strategies_used.items(), key=lambda x: -x[1]):
lines.append(f" {strategy}: {count}")
if b.errors:
lines.append("")
lines.append(f"Errors ({len(b.errors)}):")
for err in b.errors[:5]:
lines.append(f" {err[:100]}")
return "\n".join(lines)

View File

@@ -1,122 +0,0 @@
"""Tests for credential redaction — Issue #839."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools.credential_redaction import (
redact_credentials, should_auto_mask, mask_config_values,
redact_tool_output, RedactionResult
)
class TestRedactCredentials:
def test_openai_key(self):
text = "API key: sk-abc123def456ghi789jkl012mno345pqr678stu901vwx"
result = redact_credentials(text)
assert result.was_redacted
assert "sk-abc" not in result.text
assert "[REDACTED" in result.text
def test_github_pat(self):
text = "token: ghp_1234567890abcdefghijklmnopqrstuvwxyz"
result = redact_credentials(text)
assert result.was_redacted
assert "ghp_" not in result.text
def test_bearer_token(self):
text = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
result = redact_credentials(text)
assert result.was_redacted
assert "Bearer eyJ" not in result.text
def test_password_assignment(self):
text = 'password: "supersecret123"'
result = redact_credentials(text)
assert result.was_redacted
def test_clean_text(self):
text = "Hello world, no credentials here"
result = redact_credentials(text)
assert not result.was_redacted
assert result.text == text
def test_empty_text(self):
result = redact_credentials("")
assert not result.was_redacted
class TestShouldAutoMask:
def test_env_file(self):
assert should_auto_mask(".env") == True
def test_config_file(self):
assert should_auto_mask("config.yaml") == True
def test_token_file(self):
assert should_auto_mask("gitea_token") == True
def test_normal_file(self):
assert should_auto_mask("readme.md") == False
class TestMaskConfigValues:
def test_env_api_key(self):
text = "API_KEY=sk-abc123def456"
result = mask_config_values(text)
assert "sk-abc" not in result
assert "[REDACTED]" in result
def test_yaml_token(self):
text = 'token: "ghp_1234567890"'
result = mask_config_values(text)
assert "ghp_" not in result
assert "[REDACTED]" in result
def test_preserves_structure(self):
text = "API_KEY=secret\nOTHER=value"
result = mask_config_values(text)
assert "OTHER=value" in result # Non-credential preserved
class TestRedactToolOutput:
def test_string_output(self):
output = "Result: sk-abc123def456ghi789jkl012mno345pqr678stu901vwx"
redacted, notice = redact_tool_output("file_read", output)
assert "sk-abc123" not in redacted
assert notice is not None
def test_dict_output(self):
output = {"content": "token: ghp_1234567890abcdefghijklmnopqrstuvwxyz"}
redacted, notice = redact_tool_output("file_read", output)
assert "ghp_" not in redacted["content"]
def test_clean_output(self):
output = "No credentials here"
redacted, notice = redact_tool_output("file_read", output)
assert redacted == output
assert notice is None
class TestRedactionResult:
def test_notice_singular(self):
result = RedactionResult("redacted", "original", [{"pattern_name": "test"}])
assert "1 credential pattern" in result.notice()
def test_notice_plural(self):
result = RedactionResult("redacted", "original", [
{"pattern_name": "test1"},
{"pattern_name": "test2"},
])
assert "2 credential patterns" in result.notice()
def test_to_dict(self):
result = RedactionResult("redacted", "original", [{"pattern_name": "test"}])
d = result.to_dict()
assert d["redacted"] == True
assert d["count"] == 1
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,94 @@
"""Tests for Gemma 4 tool calling hardening."""
import json
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.gemma4_tool_hardening import Gemma4ToolParser, Gemma4BenchmarkResult
class TestNativeParse:
def test_standard_tool_calls(self):
parser = Gemma4ToolParser()
text = json.dumps({"tool_calls": [{"id": "call_1", "type": "function", "function": {"name": "read_file", "arguments": '{"path": "test.py"}'}}]})
result = parser.parse(text)
assert len(result) == 1
assert result[0]["function"]["name"] == "read_file"
def test_list_format(self):
parser = Gemma4ToolParser()
text = json.dumps([{"id": "c1", "type": "function", "function": {"name": "terminal", "arguments": '{"command": "ls"}'}}])
result = parser.parse(text)
assert len(result) == 1
class TestJsonBlockParse:
def test_json_code_block(self):
parser = Gemma4ToolParser()
text = 'Here is the tool call:\n```json\n{"name": "read_file", "arguments": {"path": "test.py"}}\n```'
result = parser.parse(text)
assert len(result) == 1
assert result[0]["function"]["name"] == "read_file"
def test_multiple_json_blocks(self):
parser = Gemma4ToolParser()
text = '```json\n{"name": "read_file", "arguments": {"path": "a.py"}}\n```\n```json\n{"name": "read_file", "arguments": {"path": "b.py"}}\n```'
result = parser.parse(text)
assert len(result) == 2
def test_list_in_json_block(self):
parser = Gemma4ToolParser()
text = '```json\n[{"name": "terminal", "arguments": {"command": "ls"}}]\n```'
result = parser.parse(text)
assert len(result) == 1
class TestRegexParse:
def test_function_call_pattern(self):
parser = Gemma4ToolParser()
text = 'I will call read_file({"path": "test.py"}) now.'
result = parser.parse(text)
assert len(result) == 1
assert result[0]["function"]["name"] == "read_file"
def test_gemma_inline_pattern(self):
parser = Gemma4ToolParser()
text = '[tool_call] terminal: {"command": "pwd"}'
result = parser.parse(text)
assert len(result) == 1
class TestHeuristicParse:
def test_heuristic_with_expected_tools(self):
parser = Gemma4ToolParser()
text = 'Calling read_file({"path": "config.yaml"}) now'
result = parser.parse(text, expected_tools=["read_file"])
assert len(result) == 1
def test_heuristic_without_expected_tools(self):
parser = Gemma4ToolParser()
text = 'Some text with {"key": "value"} but no tool name'
result = parser.parse(text)
assert len(result) == 0
class TestBenchmark:
def test_benchmark_counts(self):
parser = Gemma4ToolParser()
parser.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
parser.parse('```json\n{"name": "y", "arguments": {}}\n```')
parser.parse('no tool call here')
b = parser.benchmark
assert b.total_calls == 3
assert b.successful_parses == 2
assert abs(b.success_rate - 2/3) < 0.01
def test_report_format(self):
parser = Gemma4ToolParser()
parser.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
report = parser.format_report()
assert "Gemma 4 Tool Calling Benchmark" in report
assert "native" in report

View File

@@ -1,269 +0,0 @@
"""Credential Redaction — Poka-yoke for tool outputs.
Blocks silent credential exposure by redacting API keys, tokens, and
passwords from tool outputs before they enter agent context.
Issue #839: Poka-yoke: Block silent credential exposure in tool outputs
"""
import json
import logging
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Audit log path
_AUDIT_DIR = Path.home() / ".hermes" / "audit"
_AUDIT_LOG = _AUDIT_DIR / "redactions.jsonl"
# Credential patterns — order matters (most specific first)
_CREDENTIAL_PATTERNS = [
# API keys
(r'sk-[a-zA-Z0-9]{20,}', '[REDACTED: OpenAI-style API key]'),
(r'sk-ant-[a-zA-Z0-9-]{20,}', '[REDACTED: Anthropic API key]'),
(r'ghp_[a-zA-Z0-9]{36}', '[REDACTED: GitHub PAT]'),
(r'gho_[a-zA-Z0-9]{36}', '[REDACTED: GitHub OAuth token]'),
(r'github_pat_[a-zA-Z0-9_]{82}', '[REDACTED: GitHub fine-grained PAT]'),
(r'glpat-[a-zA-Z0-9-]{20,}', '[REDACTED: GitLab PAT]'),
(r'syt_[a-zA-Z0-9_-]{40,}', '[REDACTED: Matrix access token]'),
(r'xoxb-[0-9]{10,}-[a-zA-Z0-9]{20,}', '[REDACTED: Slack bot token]'),
(r'xoxp-[0-9]{10,}-[a-zA-Z0-9]{20,}', '[REDACTED: Slack user token]'),
# Bearer tokens
(r'Bearer\s+[a-zA-Z0-9_.-]{20,}', '[REDACTED: Bearer token]'),
# Generic tokens/passwords in assignments
(r'(?:token|api_key|api_key|secret|password|passwd|pwd)\s*[:=]\s*["\']?([a-zA-Z0-9_.-]{8,})["\']?', '[REDACTED: credential]'),
# Environment variable assignments
(r'(?:export\s+)?(?:TOKEN|KEY|SECRET|PASSWORD|API_KEY)\s*=\s*["\']?([a-zA-Z0-9_.-]{8,})["\']?', '[REDACTED: env credential]'),
# Base64 encoded credentials (high entropy strings)
(r'(?:authorization|auth)\s*[:=]\s*(?:basic|bearer)\s+[a-zA-Z0-9+/=]{20,}', '[REDACTED: auth header]'),
# AWS credentials
(r'AKIA[0-9A-Z]{16}', '[REDACTED: AWS access key]'),
(r'(?<![A-Z0-9])[A-Za-z0-9/+=]{40}(?![A-Z0-9])', None), # Only match near context
# Private keys
(r'-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----', '[REDACTED: private key block]'),
]
class RedactionResult:
"""Result of credential redaction."""
def __init__(self, text: str, original: str, redactions: List[Dict[str, Any]]):
self.text = text
self.original = original
self.redactions = redactions
@property
def was_redacted(self) -> bool:
return len(self.redactions) > 0
@property
def count(self) -> int:
return len(self.redactions)
def notice(self) -> str:
"""Generate compact redaction notice."""
if not self.was_redacted:
return ""
return f"[REDACTED: {self.count} credential pattern{'s' if self.count > 1 else ''} found]"
def to_dict(self) -> Dict[str, Any]:
return {
"redacted": self.was_redacted,
"count": self.count,
"notice": self.notice(),
"patterns": [r["pattern_name"] for r in self.redactions],
}
def redact_credentials(text: str, source: str = "unknown") -> RedactionResult:
"""Redact credentials from text.
Args:
text: Text to redact
source: Source identifier for audit logging
Returns:
RedactionResult with redacted text and metadata
"""
if not text:
return RedactionResult(text, text, [])
redactions = []
result = text
for pattern, replacement in _CREDENTIAL_PATTERNS:
if replacement is None:
continue # Skip conditional patterns
matches = list(re.finditer(pattern, result, re.IGNORECASE))
for match in matches:
redactions.append({
"pattern_name": replacement,
"position": match.start(),
"length": len(match.group()),
"source": source,
"timestamp": time.time(),
})
result = re.sub(pattern, replacement, result, flags=re.IGNORECASE)
redaction_result = RedactionResult(result, text, redactions)
# Log to audit trail
if redaction_result.was_redacted:
_log_redaction(redaction_result, source)
return redaction_result
def _log_redaction(result: RedactionResult, source: str) -> None:
"""Log redaction event to audit trail."""
try:
_AUDIT_DIR.mkdir(parents=True, exist_ok=True)
entry = {
"timestamp": time.time(),
"source": source,
"count": result.count,
"patterns": [r["pattern_name"] for r in result.redactions],
}
with open(_AUDIT_LOG, "a") as f:
f.write(json.dumps(entry) + "\n")
except Exception as e:
logger.debug(f"Failed to log redaction: {e}")
def should_auto_mask(file_path: str) -> bool:
"""Check if file should have credentials auto-masked."""
path_lower = file_path.lower()
sensitive_patterns = [
".env", "config", "token", "secret", "credential",
"key", "auth", "password", ".pem", ".key",
]
return any(p in path_lower for p in sensitive_patterns)
def mask_config_values(text: str) -> str:
"""Mask credential values in config/env files while preserving structure.
Transforms:
API_KEY=sk-abc123 → API_KEY=[REDACTED]
token: "ghp_xyz" → token: "[REDACTED]"
"""
lines = text.split("\n")
result = []
for line in lines:
# Match KEY=VALUE patterns
match = re.match(r'^(\s*(?:export\s+)?[A-Z_][A-Z0-9_]*)\s*=\s*(.*)', line)
if match:
key = match.group(1)
value = match.group(2).strip()
# Check if key looks credential-like
key_lower = key.lower()
if any(p in key_lower for p in ["key", "token", "secret", "password", "auth"]):
if value and not value.startswith("[REDACTED]"):
# Preserve quotes
if value.startswith('"') and value.endswith('"'):
result.append(f'{key}="[REDACTED]"')
elif value.startswith("'") and value.endswith("'"):
result.append(f"{key}='[REDACTED]'")
else:
result.append(f"{key}=[REDACTED]")
continue
# Match YAML-style key: value
match = re.match(r'^(\s*[a-z_][a-z0-9_]*)\s*:\s*["\']?(.*?)["\']?\s*$', line)
if match:
key = match.group(1)
value = match.group(2).strip()
key_lower = key.lower()
if any(p in key_lower for p in ["key", "token", "secret", "password", "auth"]):
if value and not value.startswith("[REDACTED]"):
result.append(f'{key}: "[REDACTED]"')
continue
result.append(line)
return "\n".join(result)
def redact_tool_output(
tool_name: str,
output: Any,
source: str = None,
) -> Tuple[Any, Optional[str]]:
"""Redact credentials from tool output.
Args:
tool_name: Name of the tool
output: Tool output (string or dict)
source: Source identifier (defaults to tool_name)
Returns:
Tuple of (redacted_output, notice)
"""
source = source or tool_name
if isinstance(output, str):
result = redact_credentials(output, source)
if result.was_redacted:
return result.text, result.notice()
return output, None
if isinstance(output, dict):
# Redact string values in dict
redacted = {}
notices = []
for key, value in output.items():
if isinstance(value, str):
r, n = redact_tool_output(tool_name, value, f"{source}.{key}")
redacted[key] = r
if n:
notices.append(n)
else:
redacted[key] = value
notice = "; ".join(notices) if notices else None
return redacted, notice
# Non-string, non-dict: pass through
return output, None
def get_redaction_stats() -> Dict[str, Any]:
"""Get redaction statistics from audit log."""
stats = {
"total_redactions": 0,
"by_source": {},
"by_pattern": {},
}
if not _AUDIT_LOG.exists():
return stats
try:
with open(_AUDIT_LOG, "r") as f:
for line in f:
entry = json.loads(line.strip())
stats["total_redactions"] += entry.get("count", 0)
source = entry.get("source", "unknown")
stats["by_source"][source] = stats["by_source"].get(source, 0) + 1
for pattern in entry.get("patterns", []):
stats["by_pattern"][pattern] = stats["by_pattern"].get(pattern, 0) + 1
except Exception:
pass
return stats