Compare commits

1 commit

| Author | SHA1 | Date |
|---|---|---|
|  | 45679eef8a |  |
agent/gemma4_tool_hardening.py (new file, 288 lines)
@@ -0,0 +1,288 @@
````python
"""Gemma 4 tool calling hardening — parse, validate, benchmark.

Gemma 4 has native multimodal function calling but its output format
may differ from OpenAI/Claude. This module provides:

1. Gemma4ToolParser — robust parsing for Gemma 4's tool call format
2. Parallel tool call detection and splitting
3. Tool call success rate tracking and benchmarking
4. Fallback parsing strategies for malformed output

Usage:
    from agent.gemma4_tool_hardening import Gemma4ToolParser
    parser = Gemma4ToolParser()
    tool_calls = parser.parse(response_text)
"""

from __future__ import annotations

import json
import re
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ToolCallAttempt:
    """Record of a single tool call parsing attempt."""
    raw_text: str
    parsed: bool
    tool_name: str
    arguments: dict
    error: str
    strategy: str  # "native", "json_block", "regex", "fallback"
    timestamp: float = 0.0


@dataclass
class Gemma4BenchmarkResult:
    """Result of a tool calling benchmark run."""
    total_calls: int = 0
    successful_parses: int = 0
    parallel_calls: int = 0
    strategies_used: Dict[str, int] = field(default_factory=dict)
    avg_parse_time_ms: float = 0.0
    success_rate: float = 0.0
    errors: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        return {
            "total_calls": self.total_calls,
            "successful_parses": self.successful_parses,
            "parallel_calls": self.parallel_calls,
            "success_rate": round(self.success_rate, 3),
            "strategies_used": self.strategies_used,
            "avg_parse_time_ms": round(self.avg_parse_time_ms, 2),
            "error_count": len(self.errors),
            "errors": self.errors[:10],
        }


class Gemma4ToolParser:
    """Robust tool call parser for Gemma 4 output format.

    Tries multiple parsing strategies in order:
    1. Native OpenAI format (standard tool_calls)
    2. JSON code blocks (```json ... ```)
    3. Regex extraction (function_name + arguments patterns)
    4. Heuristic fallback (best-effort extraction)
    """

    # Patterns for Gemma 4 tool call formats
    _JSON_BLOCK_PATTERN = re.compile(
        r'```(?:json)?\s*\n?(.*?)\n?```',
        re.DOTALL | re.IGNORECASE,
    )
    _FUNCTION_CALL_PATTERN = re.compile(
        r'(?:function|tool|call)[:\s]*(\w+)\s*\(\s*({.*?})\s*\)',
        re.DOTALL | re.IGNORECASE,
    )
    _GEMMA_INLINE_PATTERN = re.compile(
        r'\[(?:tool_call|function_call)\]\s*(\w+)\s*:\s*({.*?})',
        re.DOTALL | re.IGNORECASE,
    )

    def __init__(self):
        self._attempts: List[ToolCallAttempt] = []
        self._benchmark = Gemma4BenchmarkResult()

    @property
    def benchmark(self) -> Gemma4BenchmarkResult:
        return self._benchmark

    def parse(self, response_text: str, expected_tools: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Parse tool calls from model response using multiple strategies.

        Returns list of tool call dicts in OpenAI format:
        [{"id": "...", "type": "function", "function": {"name": "...", "arguments": "..."}}]
        """
        t0 = time.monotonic()
        self._benchmark.total_calls += 1

        # Strategy 1: Native OpenAI format
        result = self._try_native_parse(response_text)
        if result:
            self._record_attempt(response_text, True, result, "native")
            self._benchmark.successful_parses += 1
            if len(result) > 1:
                self._benchmark.parallel_calls += 1
            self._benchmark.strategies_used["native"] = self._benchmark.strategies_used.get("native", 0) + 1
            self._update_timing(t0)
            return result

        # Strategy 2: JSON code blocks
        result = self._try_json_block_parse(response_text, expected_tools)
        if result:
            self._record_attempt(response_text, True, result, "json_block")
            self._benchmark.successful_parses += 1
            if len(result) > 1:
                self._benchmark.parallel_calls += 1
            self._benchmark.strategies_used["json_block"] = self._benchmark.strategies_used.get("json_block", 0) + 1
            self._update_timing(t0)
            return result

        # Strategy 3: Regex extraction
        result = self._try_regex_parse(response_text)
        if result:
            self._record_attempt(response_text, True, result, "regex")
            self._benchmark.successful_parses += 1
            self._benchmark.strategies_used["regex"] = self._benchmark.strategies_used.get("regex", 0) + 1
            self._update_timing(t0)
            return result

        # Strategy 4: Heuristic fallback
        result = self._try_heuristic_parse(response_text, expected_tools)
        if result:
            self._record_attempt(response_text, True, result, "fallback")
            self._benchmark.successful_parses += 1
            self._benchmark.strategies_used["fallback"] = self._benchmark.strategies_used.get("fallback", 0) + 1
            self._update_timing(t0)
            return result

        # All strategies failed
        self._record_attempt(response_text, False, [], "none")
        self._benchmark.errors.append(f"Failed to parse: {response_text[:200]}")
        self._update_timing(t0)
        return []

    def _try_native_parse(self, text: str) -> List[Dict[str, Any]]:
        """Try parsing standard OpenAI tool_calls JSON."""
        try:
            data = json.loads(text)
            if isinstance(data, dict) and "tool_calls" in data:
                return data["tool_calls"]
            if isinstance(data, list):
                if all(isinstance(item, dict) and "function" in item for item in data):
                    return data
        except json.JSONDecodeError:
            pass
        return []

    def _try_json_block_parse(self, text: str, expected_tools: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Extract tool calls from JSON code blocks."""
        matches = self._JSON_BLOCK_PATTERN.findall(text)
        calls = []
        for match in matches:
            try:
                data = json.loads(match.strip())
                if isinstance(data, dict):
                    if "name" in data and "arguments" in data:
                        calls.append(self._to_openai_format(data["name"], data["arguments"]))
                    elif "function" in data and "arguments" in data:
                        calls.append(self._to_openai_format(data["function"], data["arguments"]))
                elif isinstance(data, list):
                    for item in data:
                        if isinstance(item, dict) and "name" in item:
                            args = item.get("arguments", item.get("args", {}))
                            calls.append(self._to_openai_format(item["name"], args))
            except json.JSONDecodeError:
                continue
        return calls

    def _try_regex_parse(self, text: str) -> List[Dict[str, Any]]:
        """Extract tool calls using regex patterns."""
        calls = []

        # Pattern: function_name({...})
        for match in self._FUNCTION_CALL_PATTERN.finditer(text):
            name = match.group(1)
            args_str = match.group(2)
            try:
                args = json.loads(args_str)
                calls.append(self._to_openai_format(name, args))
            except json.JSONDecodeError:
                continue

        # Pattern: [tool_call] name: {...}
        for match in self._GEMMA_INLINE_PATTERN.finditer(text):
            name = match.group(1)
            args_str = match.group(2)
            try:
                args = json.loads(args_str)
                calls.append(self._to_openai_format(name, args))
            except json.JSONDecodeError:
                continue

        return calls

    def _try_heuristic_parse(self, text: str, expected_tools: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Best-effort heuristic extraction."""
        if not expected_tools:
            return []

        calls = []
        for tool_name in expected_tools:
            # Look for tool name near JSON-like content
            pattern = re.compile(
                rf'{re.escape(tool_name)}\s*[\(:]\s*({{[^}}]+}})',
                re.IGNORECASE,
            )
            match = pattern.search(text)
            if match:
                try:
                    args = json.loads(match.group(1))
                    calls.append(self._to_openai_format(tool_name, args))
                except json.JSONDecodeError:
                    pass

        return calls

    def _to_openai_format(self, name: str, arguments: Any) -> Dict[str, Any]:
        """Convert to OpenAI tool call format."""
        args_str = json.dumps(arguments) if isinstance(arguments, dict) else str(arguments)
        return {
            "id": f"call_{uuid.uuid4().hex[:24]}",
            "type": "function",
            "function": {
                "name": name,
                "arguments": args_str,
            },
        }

    def _record_attempt(self, text: str, success: bool, result: list, strategy: str):
        self._attempts.append(ToolCallAttempt(
            raw_text=text[:500],
            parsed=success,
            tool_name=result[0]["function"]["name"] if result else "",
            arguments={},
            error="" if success else "parse failed",
            strategy=strategy,
            timestamp=time.time(),
        ))

    def _update_timing(self, t0: float):
        elapsed = (time.monotonic() - t0) * 1000
        n = self._benchmark.total_calls
        self._benchmark.avg_parse_time_ms = (
            (self._benchmark.avg_parse_time_ms * (n - 1) + elapsed) / n
        )
        self._benchmark.success_rate = (
            self._benchmark.successful_parses / n if n > 0 else 0
        )

    def format_report(self) -> str:
        """Format benchmark report."""
        b = self._benchmark
        lines = [
            "Gemma 4 Tool Calling Benchmark",
            "=" * 40,
            f"Total attempts: {b.total_calls}",
            f"Successful parses: {b.successful_parses}",
            f"Success rate: {b.success_rate:.1%}",
            f"Parallel calls: {b.parallel_calls}",
            f"Avg parse time: {b.avg_parse_time_ms:.2f}ms",
            "",
            "Strategies used:",
        ]
        for strategy, count in sorted(b.strategies_used.items(), key=lambda x: -x[1]):
            lines.append(f"  {strategy}: {count}")

        if b.errors:
            lines.append("")
            lines.append(f"Errors ({len(b.errors)}):")
            for err in b.errors[:5]:
                lines.append(f"  {err[:100]}")

        return "\n".join(lines)
````
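To make the strategy cascade concrete, here is a short hypothetical driver — not part of the commit — that feeds one example of each supported format through the parser and prints the benchmark report. The sample strings are invented for illustration; real Gemma 4 output may differ.

````python
# Hypothetical demo of the strategy cascade; the inputs below are invented.
import json

from agent.gemma4_tool_hardening import Gemma4ToolParser

parser = Gemma4ToolParser()

# Strategy 1: native OpenAI-style tool_calls JSON
native = json.dumps({"tool_calls": [{"id": "call_1", "type": "function",
                                     "function": {"name": "read_file",
                                                  "arguments": '{"path": "a.py"}'}}]})
samples = [
    native,
    # Strategy 2: a ```json fenced block
    'Calling the tool now:\n```json\n{"name": "terminal", "arguments": {"command": "ls"}}\n```',
    # Strategy 3: inline function-call syntax caught by regex
    'I will call read_file({"path": "b.py"}) now.',
    # No tool call at all — recorded as a parse failure in the benchmark
    'Just a plain prose answer.',
]

for text in samples:
    calls = parser.parse(text, expected_tools=["read_file", "terminal"])
    print([c["function"]["name"] for c in calls])

# Success rate should come out at 3/4, with one strategy counted per parse
print(parser.format_report())
````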
agent/self_modify.py (deleted, 302 lines)
@@ -1,302 +0,0 @@
````python
"""Self-Modifying Prompt Engine — agent learns from its own failures.

Analyzes session transcripts, identifies failure patterns, and generates
prompt patches to prevent future failures.

The loop: fail → analyze → rewrite → retry → verify improvement.

Usage:
    from agent.self_modify import PromptLearner
    learner = PromptLearner()
    patterns = learner.analyze_session(session_data)
    learner.apply_patches(learner.generate_patches(patterns))
"""

from __future__ import annotations

import json
import logging
import os
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
PATCHES_DIR = HERMES_HOME / "prompt_patches"
ROLLBACK_DIR = HERMES_HOME / "prompt_rollback"


@dataclass
class FailurePattern:
    """A detected failure pattern in session transcripts."""
    pattern_type: str  # retry_loop, timeout, hallucination, context_loss, tool_failure
    description: str
    frequency: int
    example_messages: List[str] = field(default_factory=list)
    suggested_fix: str = ""


@dataclass
class PromptPatch:
    """A modification to the system prompt based on failure analysis."""
    id: str
    failure_type: str
    original_rule: str
    new_rule: str
    confidence: float
    applied_at: Optional[float] = None
    reverted: bool = False


# Failure detection patterns
FAILURE_SIGNALS = {
    "retry_loop": {
        "patterns": [
            r"(?i)retry(?:ing)?\s*(?:attempt|again)",
            r"(?i)failed.*retrying",
            r"(?i)error.*again",
            r"(?i)attempt\s+\d+\s*(?:of|/)\s*\d+",
        ],
        "description": "Agent stuck in retry loop",
    },
    "timeout": {
        "patterns": [
            r"(?i)timed?\s*out",
            r"(?i)deadline\s+exceeded",
            r"(?i)took\s+(?:too\s+)?long",
        ],
        "description": "Operation timed out",
    },
    "hallucination": {
        "patterns": [
            r"(?i)i\s+(?:don't|do\s+not)\s+(?:have|see|find)\s+(?:any|that|this)\s+(?:information|data|file)",
            r"(?i)the\s+file\s+doesn't\s+exist",
            r"(?i)i\s+(?:made|invented|fabricated)\s+(?:that\s+up|this)",
        ],
        "description": "Agent hallucinated or fabricated information",
    },
    "context_loss": {
        "patterns": [
            r"(?i)i\s+(?:don't|do\s+not)\s+(?:remember|recall|know)\s+(?:what|where|when|how)",
            r"(?i)could\s+you\s+remind\s+me",
            r"(?i)what\s+were\s+we\s+(?:doing|working|talking)\s+(?:on|about)",
        ],
        "description": "Agent lost context from earlier in conversation",
    },
    "tool_failure": {
        "patterns": [
            r"(?i)tool\s+(?:call|execution)\s+failed",
            r"(?i)command\s+not\s+found",
            r"(?i)permission\s+denied",
            r"(?i)no\s+such\s+file",
        ],
        "description": "Tool execution failed",
    },
}

# Prompt improvement templates
PROMPT_FIXES = {
    "retry_loop": (
        "If an operation fails more than twice, stop retrying. "
        "Report the failure and ask the user for guidance. "
        "Do not enter retry loops — they waste tokens."
    ),
    "timeout": (
        "For operations that may take long, set a timeout and report "
        "progress. If an operation takes more than 30 seconds, report "
        "what you've done so far and ask if you should continue."
    ),
    "hallucination": (
        "If you cannot find information, say 'I don't know' or "
        "'I couldn't find that.' Never fabricate information. "
        "If a file doesn't exist, say so — don't guess its contents."
    ),
    "context_loss": (
        "When you need context from earlier in the conversation, "
        "use session_search to find it. Don't ask the user to repeat themselves."
    ),
    "tool_failure": (
        "If a tool fails, check the error message and try a different approach. "
        "Don't retry the exact same command — diagnose first."
    ),
}


class PromptLearner:
    """Analyze session transcripts and generate prompt improvements."""

    def __init__(self):
        PATCHES_DIR.mkdir(parents=True, exist_ok=True)
        ROLLBACK_DIR.mkdir(parents=True, exist_ok=True)

    def analyze_session(self, session_data: dict) -> List[FailurePattern]:
        """Analyze a session for failure patterns.

        Args:
            session_data: Session dict with 'messages' list.

        Returns:
            List of detected failure patterns.
        """
        messages = session_data.get("messages", [])
        patterns_found: Dict[str, FailurePattern] = {}

        for msg in messages:
            content = str(msg.get("content", ""))
            role = msg.get("role", "")

            # Only analyze assistant messages and tool results
            if role not in ("assistant", "tool"):
                continue

            for failure_type, config in FAILURE_SIGNALS.items():
                for pattern in config["patterns"]:
                    if re.search(pattern, content):
                        if failure_type not in patterns_found:
                            patterns_found[failure_type] = FailurePattern(
                                pattern_type=failure_type,
                                description=config["description"],
                                frequency=0,
                                suggested_fix=PROMPT_FIXES.get(failure_type, ""),
                            )
                        patterns_found[failure_type].frequency += 1
                        if len(patterns_found[failure_type].example_messages) < 3:
                            patterns_found[failure_type].example_messages.append(
                                content[:200]
                            )
                        break  # One match per message per type is enough

        return list(patterns_found.values())

    def generate_patches(self, patterns: List[FailurePattern],
                         min_confidence: float = 0.7) -> List[PromptPatch]:
        """Generate prompt patches from failure patterns.

        Args:
            patterns: Detected failure patterns.
            min_confidence: Minimum confidence to generate a patch.

        Returns:
            List of prompt patches.
        """
        patches = []
        for pattern in patterns:
            # Confidence based on frequency
            if pattern.frequency >= 3:
                confidence = 0.9
            elif pattern.frequency >= 2:
                confidence = 0.75
            else:
                confidence = 0.5

            if confidence < min_confidence:
                continue

            if not pattern.suggested_fix:
                continue

            patch = PromptPatch(
                id=f"{pattern.pattern_type}-{int(time.time())}",
                failure_type=pattern.pattern_type,
                original_rule="(missing — no existing rule for this pattern)",
                new_rule=pattern.suggested_fix,
                confidence=confidence,
            )
            patches.append(patch)

        return patches

    def apply_patches(self, patches: List[PromptPatch],
                      prompt_path: Optional[str] = None) -> int:
        """Apply patches to the system prompt.

        Args:
            patches: Patches to apply.
            prompt_path: Path to prompt file (default: ~/.hermes/system_prompt.md)

        Returns:
            Number of patches applied.
        """
        if prompt_path is None:
            prompt_path = str(HERMES_HOME / "system_prompt.md")

        prompt_file = Path(prompt_path)

        # Backup current prompt
        if prompt_file.exists():
            backup = ROLLBACK_DIR / f"{prompt_file.name}.{int(time.time())}.bak"
            backup.write_text(prompt_file.read_text())

        # Read current prompt
        current = prompt_file.read_text() if prompt_file.exists() else ""

        # Apply patches
        applied = 0
        additions = []
        for patch in patches:
            if patch.new_rule not in current:
                additions.append(f"\n## Auto-learned: {patch.failure_type}\n{patch.new_rule}")
                patch.applied_at = time.time()
                applied += 1

        if additions:
            new_content = current + "\n".join(additions)
            prompt_file.write_text(new_content)

        # Log patches
        patches_file = PATCHES_DIR / f"patches-{int(time.time())}.json"
        with open(patches_file, "w") as f:
            json.dump([p.__dict__ for p in patches], f, indent=2, default=str)

        logger.info("Applied %d prompt patches", applied)
        return applied

    def rollback_last(self, prompt_path: Optional[str] = None) -> bool:
        """Rollback to the most recent backup.

        Args:
            prompt_path: Path to prompt file.

        Returns:
            True if rollback succeeded.
        """
        if prompt_path is None:
            prompt_path = str(HERMES_HOME / "system_prompt.md")

        backups = sorted(ROLLBACK_DIR.glob("*.bak"), reverse=True)
        if not backups:
            logger.warning("No backups to rollback to")
            return False

        latest = backups[0]
        Path(prompt_path).write_text(latest.read_text())
        logger.info("Rolled back to %s", latest.name)
        return True

    def learn_from_session(self, session_data: dict) -> Dict[str, Any]:
        """Full learning cycle: analyze → patch → apply.

        Args:
            session_data: Session dict.

        Returns:
            Summary of what was learned and applied.
        """
        patterns = self.analyze_session(session_data)
        patches = self.generate_patches(patterns)
        applied = self.apply_patches(patches)

        return {
            "patterns_detected": len(patterns),
            "patches_generated": len(patches),
            "patches_applied": applied,
            "patterns": [
                {"type": p.pattern_type, "frequency": p.frequency, "description": p.description}
                for p in patterns
            ],
        }
````
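For the record, since this commit removes the module: below is a minimal sketch of how the PromptLearner API fit together, runnable only against the pre-commit tree. The transcript is synthetic, and the sketch deliberately stops before apply_patches, which writes to ~/.hermes/system_prompt.md.

````python
# Synthetic-transcript sketch of the removed PromptLearner API (pre-commit tree).
from agent.self_modify import PromptLearner

session = {
    "messages": [
        {"role": "assistant", "content": "Command failed. Retrying attempt 2 of 3..."},
        {"role": "assistant", "content": "Still failing, retrying again."},
        {"role": "tool", "content": "bash: permission denied"},
    ]
}

# Note: the constructor creates ~/.hermes/prompt_patches and prompt_rollback.
learner = PromptLearner()

patterns = learner.analyze_session(session)   # two retry_loop hits, one tool_failure
patches = learner.generate_patches(patterns)  # frequency >= 2 clears min_confidence=0.7
for p in patches:
    print(p.failure_type, p.confidence, p.new_rule[:60])
# Stopping here; learner.apply_patches(patches) would edit the live system prompt.
````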
tests/test_gemma4_tool_hardening.py (new file, 94 lines)
@@ -0,0 +1,94 @@
````python
"""Tests for Gemma 4 tool calling hardening."""

import json
import pytest
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from agent.gemma4_tool_hardening import Gemma4ToolParser, Gemma4BenchmarkResult


class TestNativeParse:
    def test_standard_tool_calls(self):
        parser = Gemma4ToolParser()
        text = json.dumps({"tool_calls": [{"id": "call_1", "type": "function", "function": {"name": "read_file", "arguments": '{"path": "test.py"}'}}]})
        result = parser.parse(text)
        assert len(result) == 1
        assert result[0]["function"]["name"] == "read_file"

    def test_list_format(self):
        parser = Gemma4ToolParser()
        text = json.dumps([{"id": "c1", "type": "function", "function": {"name": "terminal", "arguments": '{"command": "ls"}'}}])
        result = parser.parse(text)
        assert len(result) == 1


class TestJsonBlockParse:
    def test_json_code_block(self):
        parser = Gemma4ToolParser()
        text = 'Here is the tool call:\n```json\n{"name": "read_file", "arguments": {"path": "test.py"}}\n```'
        result = parser.parse(text)
        assert len(result) == 1
        assert result[0]["function"]["name"] == "read_file"

    def test_multiple_json_blocks(self):
        parser = Gemma4ToolParser()
        text = '```json\n{"name": "read_file", "arguments": {"path": "a.py"}}\n```\n```json\n{"name": "read_file", "arguments": {"path": "b.py"}}\n```'
        result = parser.parse(text)
        assert len(result) == 2

    def test_list_in_json_block(self):
        parser = Gemma4ToolParser()
        text = '```json\n[{"name": "terminal", "arguments": {"command": "ls"}}]\n```'
        result = parser.parse(text)
        assert len(result) == 1


class TestRegexParse:
    def test_function_call_pattern(self):
        parser = Gemma4ToolParser()
        text = 'I will call read_file({"path": "test.py"}) now.'
        result = parser.parse(text)
        assert len(result) == 1
        assert result[0]["function"]["name"] == "read_file"

    def test_gemma_inline_pattern(self):
        parser = Gemma4ToolParser()
        text = '[tool_call] terminal: {"command": "pwd"}'
        result = parser.parse(text)
        assert len(result) == 1


class TestHeuristicParse:
    def test_heuristic_with_expected_tools(self):
        parser = Gemma4ToolParser()
        text = 'Calling read_file({"path": "config.yaml"}) now'
        result = parser.parse(text, expected_tools=["read_file"])
        assert len(result) == 1

    def test_heuristic_without_expected_tools(self):
        parser = Gemma4ToolParser()
        text = 'Some text with {"key": "value"} but no tool name'
        result = parser.parse(text)
        assert len(result) == 0


class TestBenchmark:
    def test_benchmark_counts(self):
        parser = Gemma4ToolParser()
        parser.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
        parser.parse('```json\n{"name": "y", "arguments": {}}\n```')
        parser.parse('no tool call here')
        b = parser.benchmark
        assert b.total_calls == 3
        assert b.successful_parses == 2
        assert abs(b.success_rate - 2/3) < 0.01

    def test_report_format(self):
        parser = Gemma4ToolParser()
        parser.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
        report = parser.format_report()
        assert "Gemma 4 Tool Calling Benchmark" in report
        assert "native" in report
````
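The suite is self-contained: the `sys.path` insert makes the `agent` package importable without installation, so `pytest tests/test_gemma4_tool_hardening.py -q` should run it from the repository root.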