Compare commits
1 Commits
fix/813
...
fix/749-v2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f7f89e15ff |
@@ -1,302 +0,0 @@
|
||||
"""Self-Modifying Prompt Engine — agent learns from its own failures.
|
||||
|
||||
Analyzes session transcripts, identifies failure patterns, and generates
|
||||
prompt patches to prevent future failures.
|
||||
|
||||
The loop: fail → analyze → rewrite → retry → verify improvement.
|
||||
|
||||
Usage:
|
||||
from agent.self_modify import PromptLearner
|
||||
learner = PromptLearner()
|
||||
patches = learner.analyze_session(session_id)
|
||||
learner.apply_patches(patches)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
PATCHES_DIR = HERMES_HOME / "prompt_patches"
|
||||
ROLLBACK_DIR = HERMES_HOME / "prompt_rollback"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FailurePattern:
|
||||
"""A detected failure pattern in session transcripts."""
|
||||
pattern_type: str # retry_loop, timeout, error_hallucination, context_loss
|
||||
description: str
|
||||
frequency: int
|
||||
example_messages: List[str] = field(default_factory=list)
|
||||
suggested_fix: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class PromptPatch:
|
||||
"""A modification to the system prompt based on failure analysis."""
|
||||
id: str
|
||||
failure_type: str
|
||||
original_rule: str
|
||||
new_rule: str
|
||||
confidence: float
|
||||
applied_at: Optional[float] = None
|
||||
reverted: bool = False
|
||||
|
||||
|
||||
# Failure detection patterns
|
||||
FAILURE_SIGNALS = {
|
||||
"retry_loop": {
|
||||
"patterns": [
|
||||
r"(?i)retry(?:ing)?\s*(?:attempt|again)",
|
||||
r"(?i)failed.*retrying",
|
||||
r"(?i)error.*again",
|
||||
r"(?i)attempt\s+\d+\s*(?:of|/)\s*\d+",
|
||||
],
|
||||
"description": "Agent stuck in retry loop",
|
||||
},
|
||||
"timeout": {
|
||||
"patterns": [
|
||||
r"(?i)timed?\s*out",
|
||||
r"(?i)deadline\s+exceeded",
|
||||
r"(?i)took\s+(?:too\s+)?long",
|
||||
],
|
||||
"description": "Operation timed out",
|
||||
},
|
||||
"hallucination": {
|
||||
"patterns": [
|
||||
r"(?i)i\s+(?:don't|do\s+not)\s+(?:have|see|find)\s+(?:any|that|this)\s+(?:information|data|file)",
|
||||
r"(?i)the\s+file\s+doesn't\s+exist",
|
||||
r"(?i)i\s+(?:made|invented|fabricated)\s+(?:that\s+up|this)",
|
||||
],
|
||||
"description": "Agent hallucinated or fabricated information",
|
||||
},
|
||||
"context_loss": {
|
||||
"patterns": [
|
||||
r"(?i)i\s+(?:don't|do\s+not)\s+(?:remember|recall|know)\s+(?:what|where|when|how)",
|
||||
r"(?i)could\s+you\s+remind\s+me",
|
||||
r"(?i)what\s+were\s+we\s+(?:doing|working|talking)\s+(?:on|about)",
|
||||
],
|
||||
"description": "Agent lost context from earlier in conversation",
|
||||
},
|
||||
"tool_failure": {
|
||||
"patterns": [
|
||||
r"(?i)tool\s+(?:call|execution)\s+failed",
|
||||
r"(?i)command\s+not\s+found",
|
||||
r"(?i)permission\s+denied",
|
||||
r"(?i)no\s+such\s+file",
|
||||
],
|
||||
"description": "Tool execution failed",
|
||||
},
|
||||
}
|
||||
|
||||
# Prompt improvement templates
|
||||
PROMPT_FIXES = {
|
||||
"retry_loop": (
|
||||
"If an operation fails more than twice, stop retrying. "
|
||||
"Report the failure and ask the user for guidance. "
|
||||
"Do not enter retry loops — they waste tokens."
|
||||
),
|
||||
"timeout": (
|
||||
"For operations that may take long, set a timeout and report "
|
||||
"progress. If an operation takes more than 30 seconds, report "
|
||||
"what you've done so far and ask if you should continue."
|
||||
),
|
||||
"hallucination": (
|
||||
"If you cannot find information, say 'I don't know' or "
|
||||
"'I couldn't find that.' Never fabricate information. "
|
||||
"If a file doesn't exist, say so — don't guess its contents."
|
||||
),
|
||||
"context_loss": (
|
||||
"When you need context from earlier in the conversation, "
|
||||
"use session_search to find it. Don't ask the user to repeat themselves."
|
||||
),
|
||||
"tool_failure": (
|
||||
"If a tool fails, check the error message and try a different approach. "
|
||||
"Don't retry the exact same command — diagnose first."
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class PromptLearner:
|
||||
"""Analyze session transcripts and generate prompt improvements."""
|
||||
|
||||
def __init__(self):
|
||||
PATCHES_DIR.mkdir(parents=True, exist_ok=True)
|
||||
ROLLBACK_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def analyze_session(self, session_data: dict) -> List[FailurePattern]:
|
||||
"""Analyze a session for failure patterns.
|
||||
|
||||
Args:
|
||||
session_data: Session dict with 'messages' list.
|
||||
|
||||
Returns:
|
||||
List of detected failure patterns.
|
||||
"""
|
||||
messages = session_data.get("messages", [])
|
||||
patterns_found: Dict[str, FailurePattern] = {}
|
||||
|
||||
for msg in messages:
|
||||
content = str(msg.get("content", ""))
|
||||
role = msg.get("role", "")
|
||||
|
||||
# Only analyze assistant messages and tool results
|
||||
if role not in ("assistant", "tool"):
|
||||
continue
|
||||
|
||||
for failure_type, config in FAILURE_SIGNALS.items():
|
||||
for pattern in config["patterns"]:
|
||||
if re.search(pattern, content):
|
||||
if failure_type not in patterns_found:
|
||||
patterns_found[failure_type] = FailurePattern(
|
||||
pattern_type=failure_type,
|
||||
description=config["description"],
|
||||
frequency=0,
|
||||
suggested_fix=PROMPT_FIXES.get(failure_type, ""),
|
||||
)
|
||||
patterns_found[failure_type].frequency += 1
|
||||
if len(patterns_found[failure_type].example_messages) < 3:
|
||||
patterns_found[failure_type].example_messages.append(
|
||||
content[:200]
|
||||
)
|
||||
break # One match per message per type is enough
|
||||
|
||||
return list(patterns_found.values())
|
||||
|
||||
def generate_patches(self, patterns: List[FailurePattern],
|
||||
min_confidence: float = 0.7) -> List[PromptPatch]:
|
||||
"""Generate prompt patches from failure patterns.
|
||||
|
||||
Args:
|
||||
patterns: Detected failure patterns.
|
||||
min_confidence: Minimum confidence to generate a patch.
|
||||
|
||||
Returns:
|
||||
List of prompt patches.
|
||||
"""
|
||||
patches = []
|
||||
for pattern in patterns:
|
||||
# Confidence based on frequency
|
||||
if pattern.frequency >= 3:
|
||||
confidence = 0.9
|
||||
elif pattern.frequency >= 2:
|
||||
confidence = 0.75
|
||||
else:
|
||||
confidence = 0.5
|
||||
|
||||
if confidence < min_confidence:
|
||||
continue
|
||||
|
||||
if not pattern.suggested_fix:
|
||||
continue
|
||||
|
||||
patch = PromptPatch(
|
||||
id=f"{pattern.pattern_type}-{int(time.time())}",
|
||||
failure_type=pattern.pattern_type,
|
||||
original_rule="(missing — no existing rule for this pattern)",
|
||||
new_rule=pattern.suggested_fix,
|
||||
confidence=confidence,
|
||||
)
|
||||
patches.append(patch)
|
||||
|
||||
return patches
|
||||
|
||||
def apply_patches(self, patches: List[PromptPatch],
|
||||
prompt_path: Optional[str] = None) -> int:
|
||||
"""Apply patches to the system prompt.
|
||||
|
||||
Args:
|
||||
patches: Patches to apply.
|
||||
prompt_path: Path to prompt file (default: ~/.hermes/system_prompt.md)
|
||||
|
||||
Returns:
|
||||
Number of patches applied.
|
||||
"""
|
||||
if prompt_path is None:
|
||||
prompt_path = str(HERMES_HOME / "system_prompt.md")
|
||||
|
||||
prompt_file = Path(prompt_path)
|
||||
|
||||
# Backup current prompt
|
||||
if prompt_file.exists():
|
||||
backup = ROLLBACK_DIR / f"{prompt_file.name}.{int(time.time())}.bak"
|
||||
backup.write_text(prompt_file.read_text())
|
||||
|
||||
# Read current prompt
|
||||
current = prompt_file.read_text() if prompt_file.exists() else ""
|
||||
|
||||
# Apply patches
|
||||
applied = 0
|
||||
additions = []
|
||||
for patch in patches:
|
||||
if patch.new_rule not in current:
|
||||
additions.append(f"\n## Auto-learned: {patch.failure_type}\n{patch.new_rule}")
|
||||
patch.applied_at = time.time()
|
||||
applied += 1
|
||||
|
||||
if additions:
|
||||
new_content = current + "\n".join(additions)
|
||||
prompt_file.write_text(new_content)
|
||||
|
||||
# Log patches
|
||||
patches_file = PATCHES_DIR / f"patches-{int(time.time())}.json"
|
||||
with open(patches_file, "w") as f:
|
||||
json.dump([p.__dict__ for p in patches], f, indent=2, default=str)
|
||||
|
||||
logger.info("Applied %d prompt patches", applied)
|
||||
return applied
|
||||
|
||||
def rollback_last(self, prompt_path: Optional[str] = None) -> bool:
|
||||
"""Rollback to the most recent backup.
|
||||
|
||||
Args:
|
||||
prompt_path: Path to prompt file.
|
||||
|
||||
Returns:
|
||||
True if rollback succeeded.
|
||||
"""
|
||||
if prompt_path is None:
|
||||
prompt_path = str(HERMES_HOME / "system_prompt.md")
|
||||
|
||||
backups = sorted(ROLLBACK_DIR.glob("*.bak"), reverse=True)
|
||||
if not backups:
|
||||
logger.warning("No backups to rollback to")
|
||||
return False
|
||||
|
||||
latest = backups[0]
|
||||
Path(prompt_path).write_text(latest.read_text())
|
||||
logger.info("Rolled back to %s", latest.name)
|
||||
return True
|
||||
|
||||
def learn_from_session(self, session_data: dict) -> Dict[str, Any]:
|
||||
"""Full learning cycle: analyze → patch → apply.
|
||||
|
||||
Args:
|
||||
session_data: Session dict.
|
||||
|
||||
Returns:
|
||||
Summary of what was learned and applied.
|
||||
"""
|
||||
patterns = self.analyze_session(session_data)
|
||||
patches = self.generate_patches(patterns)
|
||||
applied = self.apply_patches(patches)
|
||||
|
||||
return {
|
||||
"patterns_detected": len(patterns),
|
||||
"patches_generated": len(patches),
|
||||
"patches_applied": applied,
|
||||
"patterns": [
|
||||
{"type": p.pattern_type, "frequency": p.frequency, "description": p.description}
|
||||
for p in patterns
|
||||
],
|
||||
}
|
||||
136
tests/test_batch_executor.py
Normal file
136
tests/test_batch_executor.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""Tests for batch tool execution — Issue #749."""
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from tools.batch_executor import (
|
||||
ToolSafety, ToolCall, BatchResult,
|
||||
classify_tool_safety, classify_calls,
|
||||
execute_batch_sync, get_tool_safety_report
|
||||
)
|
||||
|
||||
|
||||
class TestClassification:
|
||||
def test_parallel_safe_read(self):
|
||||
assert classify_tool_safety("file_read") == ToolSafety.PARALLEL_SAFE
|
||||
|
||||
def test_sequential_write(self):
|
||||
assert classify_tool_safety("file_write") == ToolSafety.SEQUENTIAL
|
||||
|
||||
def test_destructive_terminal(self):
|
||||
assert classify_tool_safety("terminal") == ToolSafety.DESTRUCTIVE
|
||||
|
||||
def test_unknown_defaults_sequential(self):
|
||||
assert classify_tool_safety("unknown_tool") == ToolSafety.SEQUENTIAL
|
||||
|
||||
def test_prefix_match(self):
|
||||
assert classify_tool_safety("file_read_special") == ToolSafety.PARALLEL_SAFE
|
||||
|
||||
|
||||
class TestClassifyCalls:
|
||||
def test_classifies_multiple(self):
|
||||
calls = [
|
||||
{"name": "file_read", "arguments": "{}"},
|
||||
{"name": "file_write", "arguments": "{}"},
|
||||
{"name": "terminal", "arguments": "{}"},
|
||||
]
|
||||
result = classify_calls(calls)
|
||||
assert len(result) == 3
|
||||
assert result[0].safety == ToolSafety.PARALLEL_SAFE
|
||||
assert result[1].safety == ToolSafety.SEQUENTIAL
|
||||
assert result[2].safety == ToolSafety.DESTRUCTIVE
|
||||
|
||||
|
||||
class TestBatchExecution:
|
||||
def test_parallel_execution(self):
|
||||
"""Parallel-safe calls should execute faster than sequential."""
|
||||
import time
|
||||
|
||||
def slow_executor(name, args):
|
||||
time.sleep(0.1)
|
||||
return f"result_{name}"
|
||||
|
||||
calls = [
|
||||
{"name": "file_read", "arguments": "{}"},
|
||||
{"name": "file_search", "arguments": "{}"},
|
||||
{"name": "web_search", "arguments": "{}"},
|
||||
]
|
||||
|
||||
start = time.time()
|
||||
result = execute_batch_sync(calls, slow_executor)
|
||||
duration = time.time() - start
|
||||
|
||||
# Should be faster than 0.3s (3 * 0.1) since parallel
|
||||
assert duration < 0.25
|
||||
assert result.parallel_count == 3
|
||||
assert len(result.errors) == 0
|
||||
|
||||
def test_sequential_execution(self):
|
||||
"""Sequential calls should execute one at a time."""
|
||||
import time
|
||||
|
||||
def slow_executor(name, args):
|
||||
time.sleep(0.05)
|
||||
return f"result_{name}"
|
||||
|
||||
calls = [
|
||||
{"name": "file_write", "arguments": "{}"},
|
||||
{"name": "file_patch", "arguments": "{}"},
|
||||
]
|
||||
|
||||
start = time.time()
|
||||
result = execute_batch_sync(calls, slow_executor)
|
||||
duration = time.time() - start
|
||||
|
||||
# Should take at least 0.1s (2 * 0.05) since sequential
|
||||
assert duration >= 0.1
|
||||
assert result.sequential_count == 2
|
||||
|
||||
def test_mixed_execution(self):
|
||||
"""Mixed calls: parallel first, then sequential."""
|
||||
calls = [
|
||||
{"name": "file_read", "arguments": "{}"},
|
||||
{"name": "file_write", "arguments": "{}"},
|
||||
{"name": "web_search", "arguments": "{}"},
|
||||
]
|
||||
|
||||
def executor(name, args):
|
||||
return f"result_{name}"
|
||||
|
||||
result = execute_batch_sync(calls, executor)
|
||||
assert result.parallel_count == 2
|
||||
assert result.sequential_count == 1
|
||||
assert len(result.errors) == 0
|
||||
|
||||
def test_error_handling(self):
|
||||
"""Errors in one call shouldn't stop others."""
|
||||
def failing_executor(name, args):
|
||||
if name == "file_write":
|
||||
raise Exception("Write failed")
|
||||
return "ok"
|
||||
|
||||
calls = [
|
||||
{"name": "file_read", "arguments": "{}"},
|
||||
{"name": "file_write", "arguments": "{}"},
|
||||
]
|
||||
|
||||
result = execute_batch_sync(calls, failing_executor)
|
||||
assert len(result.errors) == 1
|
||||
assert "file_write" in result.errors[0]
|
||||
|
||||
|
||||
class TestSafetyReport:
|
||||
def test_report_format(self):
|
||||
calls = [
|
||||
ToolCall(name="file_read", args={}, safety=ToolSafety.PARALLEL_SAFE, duration=0.1),
|
||||
ToolCall(name="file_write", args={}, safety=ToolSafety.SEQUENTIAL, duration=0.2),
|
||||
]
|
||||
report = get_tool_safety_report(calls)
|
||||
assert "Parallel-safe: 1" in report
|
||||
assert "Sequential: 1" in report
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
pytest.main([__file__, "-v"])
|
||||
280
tools/batch_executor.py
Normal file
280
tools/batch_executor.py
Normal file
@@ -0,0 +1,280 @@
|
||||
"""Batch tool execution with parallel safety checks.
|
||||
|
||||
Classifies tool calls as parallel-safe vs sequential and executes
|
||||
parallel-safe calls concurrently while keeping destructive ops serialized.
|
||||
|
||||
Issue #749: feat: batch tool execution with parallel safety checks
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ToolSafety(Enum):
|
||||
"""Safety classification for tool calls."""
|
||||
PARALLEL_SAFE = "parallel_safe" # Can run concurrently
|
||||
SEQUENTIAL = "sequential" # Must run one at a time
|
||||
DESTRUCTIVE = "destructive" # Destructive, needs approval
|
||||
|
||||
|
||||
# Tool safety classifications
|
||||
_TOOL_SAFETY: Dict[str, ToolSafety] = {
|
||||
# Parallel-safe: reads, searches, non-destructive
|
||||
"file_read": ToolSafety.PARALLEL_SAFE,
|
||||
"file_search": ToolSafety.PARALLEL_SAFE,
|
||||
"web_search": ToolSafety.PARALLEL_SAFE,
|
||||
"web_extract": ToolSafety.PARALLEL_SAFE,
|
||||
"browser_snapshot": ToolSafety.PARALLEL_SAFE,
|
||||
"browser_vision": ToolSafety.PARALLEL_SAFE,
|
||||
"browser_get_images": ToolSafety.PARALLEL_SAFE,
|
||||
"skill_view": ToolSafety.PARALLEL_SAFE,
|
||||
"memory_search": ToolSafety.PARALLEL_SAFE,
|
||||
"memory_recall": ToolSafety.PARALLEL_SAFE,
|
||||
"session_search": ToolSafety.PARALLEL_SAFE,
|
||||
|
||||
# Sequential: writes, edits, state changes
|
||||
"file_write": ToolSafety.SEQUENTIAL,
|
||||
"file_patch": ToolSafety.SEQUENTIAL,
|
||||
"file_append": ToolSafety.SEQUENTIAL,
|
||||
"browser_navigate": ToolSafety.SEQUENTIAL,
|
||||
"browser_click": ToolSafety.SEQUENTIAL,
|
||||
"browser_type": ToolSafety.SEQUENTIAL,
|
||||
"browser_scroll": ToolSafety.SEQUENTIAL,
|
||||
"memory_store": ToolSafety.SEQUENTIAL,
|
||||
"memory_update": ToolSafety.SEQUENTIAL,
|
||||
"cronjob": ToolSafety.SEQUENTIAL,
|
||||
"send_message": ToolSafety.SEQUENTIAL,
|
||||
|
||||
# Destructive: needs approval
|
||||
"terminal": ToolSafety.DESTRUCTIVE,
|
||||
"execute_code": ToolSafety.DESTRUCTIVE,
|
||||
"browser_execute_js": ToolSafety.DESTRUCTIVE,
|
||||
"delegate_task": ToolSafety.DESTRUCTIVE,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolCall:
|
||||
"""A single tool call with metadata."""
|
||||
name: str
|
||||
args: Dict[str, Any]
|
||||
call_id: str = ""
|
||||
safety: ToolSafety = ToolSafety.SEQUENTIAL
|
||||
result: Optional[Any] = None
|
||||
error: Optional[str] = None
|
||||
duration: float = 0.0
|
||||
started_at: float = 0.0
|
||||
completed_at: float = 0.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class BatchResult:
|
||||
"""Result of batch tool execution."""
|
||||
calls: List[ToolCall] = field(default_factory=list)
|
||||
parallel_count: int = 0
|
||||
sequential_count: int = 0
|
||||
total_duration: float = 0.0
|
||||
errors: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
def classify_tool_safety(tool_name: str) -> ToolSafety:
|
||||
"""Classify a tool call's safety level."""
|
||||
# Check exact match first
|
||||
if tool_name in _TOOL_SAFETY:
|
||||
return _TOOL_SAFETY[tool_name]
|
||||
|
||||
# Check prefix matches
|
||||
for pattern, safety in _TOOL_SAFETY.items():
|
||||
if tool_name.startswith(pattern):
|
||||
return safety
|
||||
|
||||
# Default to sequential for unknown tools
|
||||
return ToolSafety.SEQUENTIAL
|
||||
|
||||
|
||||
def classify_calls(tool_calls: List[Dict[str, Any]]) -> List[ToolCall]:
|
||||
"""Classify a list of tool calls by safety level."""
|
||||
calls = []
|
||||
for i, tc in enumerate(tool_calls):
|
||||
name = tc.get("name", tc.get("function", {}).get("name", ""))
|
||||
args = tc.get("arguments", tc.get("function", {}).get("arguments", {}))
|
||||
if isinstance(args, str):
|
||||
import json
|
||||
try:
|
||||
args = json.loads(args)
|
||||
except Exception:
|
||||
args = {}
|
||||
|
||||
call_id = tc.get("id", f"call_{i}")
|
||||
safety = classify_tool_safety(name)
|
||||
|
||||
calls.append(ToolCall(
|
||||
name=name,
|
||||
args=args,
|
||||
call_id=call_id,
|
||||
safety=safety,
|
||||
))
|
||||
|
||||
return calls
|
||||
|
||||
|
||||
async def execute_parallel(
|
||||
calls: List[ToolCall],
|
||||
executor: Callable[[str, Dict[str, Any]], Any],
|
||||
) -> List[ToolCall]:
|
||||
"""Execute parallel-safe calls concurrently."""
|
||||
async def run_call(call: ToolCall) -> ToolCall:
|
||||
call.started_at = time.time()
|
||||
try:
|
||||
# Run in thread pool to avoid blocking
|
||||
loop = asyncio.get_event_loop()
|
||||
result = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: executor(call.name, call.args),
|
||||
)
|
||||
call.result = result
|
||||
except Exception as e:
|
||||
call.error = str(e)
|
||||
logger.error(f"Parallel call {call.name} failed: {e}")
|
||||
finally:
|
||||
call.completed_at = time.time()
|
||||
call.duration = call.completed_at - call.started_at
|
||||
return call
|
||||
|
||||
# Execute all parallel-safe calls concurrently
|
||||
tasks = [run_call(call) for call in calls]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Handle exceptions from gather
|
||||
processed = []
|
||||
for i, result in enumerate(results):
|
||||
if isinstance(result, Exception):
|
||||
calls[i].error = str(result)
|
||||
calls[i].completed_at = time.time()
|
||||
calls[i].duration = calls[i].completed_at - calls[i].started_at
|
||||
processed.append(calls[i])
|
||||
else:
|
||||
processed.append(result)
|
||||
|
||||
return processed
|
||||
|
||||
|
||||
async def execute_sequential(
|
||||
calls: List[ToolCall],
|
||||
executor: Callable[[str, Dict[str, Any]], Any],
|
||||
) -> List[ToolCall]:
|
||||
"""Execute sequential/destructive calls one at a time."""
|
||||
for call in calls:
|
||||
call.started_at = time.time()
|
||||
try:
|
||||
result = executor(call.name, call.args)
|
||||
call.result = result
|
||||
except Exception as e:
|
||||
call.error = str(e)
|
||||
logger.error(f"Sequential call {call.name} failed: {e}")
|
||||
finally:
|
||||
call.completed_at = time.time()
|
||||
call.duration = call.completed_at - call.started_at
|
||||
|
||||
return calls
|
||||
|
||||
|
||||
async def execute_batch(
|
||||
tool_calls: List[Dict[str, Any]],
|
||||
executor: Callable[[str, Dict[str, Any]], Any],
|
||||
max_parallel: int = 5,
|
||||
) -> BatchResult:
|
||||
"""Execute a batch of tool calls with parallel safety checks.
|
||||
|
||||
Args:
|
||||
tool_calls: List of tool call dicts (OpenAI format)
|
||||
executor: Function to execute a single tool call (name, args) -> result
|
||||
max_parallel: Maximum concurrent parallel calls
|
||||
|
||||
Returns:
|
||||
BatchResult with all call results and timing info
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
# Classify all calls
|
||||
calls = classify_calls(tool_calls)
|
||||
|
||||
# Split by safety level
|
||||
parallel_calls = [c for c in calls if c.safety == ToolSafety.PARALLEL_SAFE]
|
||||
sequential_calls = [c for c in calls if c.safety != ToolSafety.PARALLEL_SAFE]
|
||||
|
||||
result = BatchResult(
|
||||
calls=calls,
|
||||
parallel_count=len(parallel_calls),
|
||||
sequential_count=len(sequential_calls),
|
||||
)
|
||||
|
||||
# Execute parallel calls concurrently
|
||||
if parallel_calls:
|
||||
logger.info(f"Executing {len(parallel_calls)} parallel-safe calls concurrently")
|
||||
|
||||
# Batch into chunks of max_parallel
|
||||
for i in range(0, len(parallel_calls), max_parallel):
|
||||
chunk = parallel_calls[i:i + max_parallel]
|
||||
await execute_parallel(chunk, executor)
|
||||
|
||||
# Execute sequential calls one at a time
|
||||
if sequential_calls:
|
||||
logger.info(f"Executing {len(sequential_calls)} sequential calls")
|
||||
await execute_sequential(sequential_calls, executor)
|
||||
|
||||
# Collect errors
|
||||
for call in calls:
|
||||
if call.error:
|
||||
result.errors.append(f"{call.name}: {call.error}")
|
||||
|
||||
result.total_duration = time.time() - start_time
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def execute_batch_sync(
|
||||
tool_calls: List[Dict[str, Any]],
|
||||
executor: Callable[[str, Dict[str, Any]], Any],
|
||||
max_parallel: int = 5,
|
||||
) -> BatchResult:
|
||||
"""Synchronous wrapper for execute_batch."""
|
||||
return asyncio.run(execute_batch(tool_calls, executor, max_parallel))
|
||||
|
||||
|
||||
def get_tool_safety_report(calls: List[ToolCall]) -> str:
|
||||
"""Generate a human-readable safety report."""
|
||||
parallel = [c for c in calls if c.safety == ToolSafety.PARALLEL_SAFE]
|
||||
sequential = [c for c in calls if c.safety == ToolSafety.SEQUENTIAL]
|
||||
destructive = [c for c in calls if c.safety == ToolSafety.DESTRUCTIVE]
|
||||
|
||||
lines = ["Tool Safety Report:"]
|
||||
lines.append(f" Parallel-safe: {len(parallel)}")
|
||||
lines.append(f" Sequential: {len(sequential)}")
|
||||
lines.append(f" Destructive: {len(destructive)}")
|
||||
|
||||
if parallel:
|
||||
lines.append("\nParallel-safe calls:")
|
||||
for c in parallel:
|
||||
status = "✓" if not c.error else "✗"
|
||||
lines.append(f" {status} {c.name} ({c.duration:.2f}s)")
|
||||
|
||||
if sequential:
|
||||
lines.append("\nSequential calls:")
|
||||
for c in sequential:
|
||||
status = "✓" if not c.error else "✗"
|
||||
lines.append(f" {status} {c.name} ({c.duration:.2f}s)")
|
||||
|
||||
if destructive:
|
||||
lines.append("\nDestructive calls:")
|
||||
for c in destructive:
|
||||
status = "✓" if not c.error else "✗"
|
||||
lines.append(f" {status} {c.name} ({c.duration:.2f}s)")
|
||||
|
||||
return "\n".join(lines)
|
||||
Reference in New Issue
Block a user