Compare commits

..

6 Commits

Author SHA1 Message Date
7b4458e0ad test: JSON repair module tests
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 44s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 51s
Tests / e2e (pull_request) Successful in 2m59s
Tests / test (pull_request) Failing after 38m20s
Part of #781. Tests all 7 failure patterns.
2026-04-15 14:49:51 +00:00
8d92047ff8 feat: JSON repair module for tool-call pipeline
Closes #781

Provides repair_and_load_json() that fixes malformed JSON from
LLM tool calls. Handles truncation, unquoted keys, single quotes,
trailing commas, markdown wrapping, and extra trailing text.
2026-04-15 14:49:49 +00:00
f1f9bd2e76 Merge pull request 'feat: implement Reader-Guided Reranking — bridge R@5 vs E2E gap (#666)' (#782) from fix/666 into main 2026-04-15 11:58:02 +00:00
Hermes Agent
4129cc0d0c feat: implement Reader-Guided Reranking — bridge R@5 vs E2E gap (#666)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 37s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 55s
Tests / test (pull_request) Failing after 55s
Tests / e2e (pull_request) Successful in 2m49s
Resolves #666. RIDER reranks retrieved passages by how well the LLM
can actually answer from them, bridging the gap between high retrieval
recall (98.4% R@5) and low end-to-end accuracy (17%).

agent/rider.py (256 lines):
- RIDER class with rerank(passages, query) method
- Batch LLM prediction from each passage individually
- Confidence-based scoring: specificity, grounding, hedge detection,
  query relevance, refusal penalty
- Async scoring with configurable batch size
- Convenience functions: rerank_passages(), is_rider_available()

tools/session_search_tool.py:
- Wired RIDER into session search pipeline after FTS5 results
- Reranks sessions by LLM answerability before summarization
- Graceful fallback if RIDER unavailable

tests/test_reader_guided_reranking.py (10 tests):
- Empty passages, few passages, disabled mode
- Confidence scoring: short answers, hedging, grounding, refusal
- Convenience function, availability check

Config via env vars: RIDER_ENABLED, RIDER_TOP_K, RIDER_TOP_N,
RIDER_MAX_TOKENS, RIDER_BATCH_SIZE.
2026-04-15 07:40:15 -04:00
230fb9213b feat: tool error classification — retryable vs permanent (#752) (#773)
Co-authored-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
Co-committed-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
2026-04-15 04:54:54 +00:00
1263d11f52 feat: Approval Tier System — Extend approval.py with Safety Tiers (#670) (#776)
Co-authored-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
Co-committed-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
2026-04-15 04:54:53 +00:00
12 changed files with 1445 additions and 683 deletions

283
agent/json_repair.py Normal file
View File

@@ -0,0 +1,283 @@
"""
JSON Repair for Tool-Call Pipeline — #781
Provides repair_and_load_json() that attempts to fix malformed JSON
from LLM tool calls before falling back to full API retry.
Handles all common LLM JSON failure modes:
- Truncation (missing closing braces/brackets)
- Unquoted keys
- Single quotes
- Trailing commas
- Markdown-wrapped JSON
- Extra trailing text
Usage:
from agent.json_repair import repair_and_load_json
args = repair_and_load_json(raw_args)
if args is None:
# Truly unrepairable — trigger retry
pass
"""
import json
import re
from typing import Any, Optional
# Optional dependency: the third-party `json_repair` package provides stronger
# repairs than the manual heuristics below; degrade gracefully when absent.
try:
    from json_repair import repair_json
    HAS_JSON_REPAIR = True
except ImportError:
    HAS_JSON_REPAIR = False
def repair_and_load_json(text: str, default: Any = None) -> Any:
    """Parse *text* as JSON, repairing common LLM malformations on failure.

    Tries a plain json.loads() first (fast path). On failure, runs the
    repair pipeline and re-parses the repaired string.

    Args:
        text: Raw JSON string (possibly malformed).
        default: Value returned when the text is unrepairable.

    Returns:
        The parsed JSON value, or *default* if unrepairable.
    """
    # Reject non-strings and empty/whitespace-only input up front.
    if not isinstance(text, str) or not text:
        return default
    stripped = text.strip()
    if not stripped:
        return default

    # Fast path: valid JSON needs no repair work at all.
    try:
        return json.loads(stripped)
    except (json.JSONDecodeError, TypeError, ValueError):
        pass

    # Slow path: attempt repair, then re-parse.
    fixed = _repair_json_string(stripped)
    if fixed is None:
        return default
    try:
        return json.loads(fixed)
    except (json.JSONDecodeError, TypeError, ValueError):
        return default
def _repair_json_string(text: str) -> Optional[str]:
    """Attempt to repair a malformed JSON string.

    Order of attempts:
      1. Strip markdown code-fence / backtick wrapping.
      2. If stripping alone yielded valid JSON, return it immediately.
      3. Delegate to the third-party json-repair library when installed.
      4. Fall back to the manual heuristics in _manual_repair().

    Returns:
        The repaired string, or None if unrepairable.
    """
    if not text:
        return None
    # Strip markdown wrapping
    text = _strip_markdown(text)
    # Bug fix: stripping the wrapper may be the only repair needed. Previously
    # this case fell through and was reported unrepairable, because
    # repair_json() returns its input unchanged for valid JSON (rejected by
    # the `repaired != text` guard) and _manual_repair() returns None when
    # none of its heuristics change the text.
    try:
        json.loads(text)
        return text
    except (json.JSONDecodeError, TypeError, ValueError):
        pass
    # Try json-repair library if available
    if HAS_JSON_REPAIR:
        try:
            repaired = repair_json(text)
            if repaired and repaired != text:
                return repaired
        except Exception:
            # Library failure is non-fatal; fall through to manual repair.
            pass
    # Manual repair attempts
    return _manual_repair(text)
def _strip_markdown(text: str) -> str:
    """Strip markdown code-fence (```json ... ```) or inline-backtick wrapping.

    Returns the inner content stripped of surrounding whitespace, or the
    original text unchanged when no wrapping is present.
    """
    # Try the fenced-block pattern first, then the single-backtick pattern.
    for pattern in (r'^```(?:json)?\s*\n?(.*?)\n?```\s*$', r'^`(.+)`$'):
        wrapped = re.match(pattern, text, re.DOTALL)
        if wrapped is not None:
            return wrapped.group(1).strip()
    return text
def _manual_repair(text: str) -> Optional[str]:
    """Manual JSON repair heuristics.

    Applies, in order: trailing-text removal, trailing-comma removal,
    single->double quote swap (only when the text contains no double
    quotes at all), key quoting, and bracket closing. Returns None when
    no JSON payload was found or when none of the heuristics changed
    the text (i.e. nothing here can help).
    """
    original = text
    # Remove trailing non-JSON text after valid JSON
    # Find the last } or ] that closes the root
    text = _extract_json(text)
    if text is None:
        return None
    # Fix trailing commas
    text = re.sub(r',\s*([}\]])', r'\1', text)
    # Fix single quotes -> double quotes (simple cases)
    # Only if there are no double quotes already — a blanket replace would
    # corrupt apostrophes appearing inside double-quoted string values.
    if '"' not in text and "'" in text:
        text = text.replace("'", '"')
    # Fix unquoted keys: {key: value} -> {"key": value}
    text = re.sub(r'([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)\s*:', r'\1"\2":', text)
    # Try to close unclosed braces/brackets
    text = _close_brackets(text)
    # Unchanged output means no heuristic applied; signal "unrepairable".
    return text if text != original else None
def _extract_json(text: str) -> Optional[str]:
    """Extract the first top-level JSON object/array from surrounding text.

    Scans from the first '{' or '[' (whichever comes first) to its matching
    close bracket, honoring string literals and backslash escapes. When the
    structure is never balanced, returns everything from the opener to the
    end so _close_brackets() can finish the job. Returns None when the text
    contains no opener at all.
    """
    text = text.strip()
    if not text:
        return None
    obj_pos = text.find('{')
    arr_pos = text.find('[')
    if obj_pos == -1 and arr_pos == -1:
        return None  # nothing that could start a JSON document
    # Whichever opener appears first decides object vs array mode.
    if arr_pos == -1 or (obj_pos != -1 and obj_pos < arr_pos):
        start, opener, closer = obj_pos, '{', '}'
    else:
        start, opener, closer = arr_pos, '[', ']'
    depth = 0
    inside_string = False
    skip_next = False
    for pos in range(start, len(text)):
        ch = text[pos]
        if skip_next:
            skip_next = False
        elif ch == '\\':
            skip_next = True
        elif ch == '"':
            inside_string = not inside_string
        elif inside_string:
            pass  # brackets inside string literals don't count
        elif ch == opener:
            depth += 1
        elif ch == closer:
            depth -= 1
            if depth == 0:
                return text[start:pos + 1]
    # Never balanced — hand back the tail for _close_brackets to repair.
    return text[start:]
def _close_brackets(text: str) -> str:
    """Append the closers needed to balance unterminated braces/brackets.

    Counts openers and closers outside of string literals (honoring
    backslash escapes), terminates a dangling string with '"', then appends
    enough of one closer type — chosen by whichever bracket kind was opened
    last — to make the counts match.
    """
    opens = 0
    closes = 0
    inside_string = False
    escaped = False
    for ch in text:
        if escaped:
            escaped = False
        elif ch == '\\':
            escaped = True
        elif ch == '"':
            inside_string = not inside_string
        elif inside_string:
            pass  # ignore brackets that live inside string values
        elif ch in '{[':
            opens += 1
        elif ch in '}]':
            closes += 1
    # A dangling string must be terminated before any bracket is appended.
    if inside_string:
        text += '"'
    missing = opens - closes
    if missing > 0:
        # Heuristic: close with the type of the most recently opened bracket.
        closer = '}' if text.rfind('{') > text.rfind('[') else ']'
        text += closer * missing
    return text
def repair_tool_call_arguments(arguments: str) -> tuple[str, bool]:
    """Repair tool-call arguments; returns (repaired_args, was_repaired).

    Specifically designed for the tool-call pipeline: always yields a JSON
    string, substituting '{}' when the input is empty or unrepairable.
    """
    # Empty / whitespace-only arguments degrade to an empty object.
    if not arguments or not arguments.strip():
        return '{}', True
    try:
        json.loads(arguments)
    except (json.JSONDecodeError, TypeError, ValueError):
        parsed = repair_and_load_json(arguments)
        if parsed is None:
            return '{}', True  # unrepairable — fall back to empty dict
        return json.dumps(parsed), True
    return arguments, False  # already valid — pass through untouched
# Metrics tracking — process-lifetime counters for repair attempts/outcomes.
_json_repairs_attempted = 0
_json_repairs_succeeded = 0


def get_repair_metrics() -> dict:
    """Return a snapshot of the JSON repair counters and success rate."""
    attempted = _json_repairs_attempted
    succeeded = _json_repairs_succeeded
    return {
        "attempted": attempted,
        "succeeded": succeeded,
        "success_rate": succeeded / attempted if attempted else 0,
    }


def _track_repair(success: bool):
    """Record one repair attempt (and whether it succeeded)."""
    global _json_repairs_attempted, _json_repairs_succeeded
    _json_repairs_attempted += 1
    if success:
        _json_repairs_succeeded += 1

View File

@@ -1,353 +0,0 @@
"""Multi-Agent Conversation Bridge via Matrix.
Allows multiple Hermes instances (Timmy, Allegro, Ezra) to communicate
with each other through a shared Matrix room.
Usage:
from agent.matrix_bridge import MatrixBridge
bridge = MatrixBridge(agent_name="Timmy")
await bridge.connect()
await bridge.send_to_agent("Allegro", "Check the deployment status")
messages = await bridge.get_messages_from("Allegro")
"""
import asyncio
import json
import logging
import os
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set
logger = logging.getLogger(__name__)

# Configuration — read from the environment once at import time.
MATRIX_BRIDGE_ROOM = os.environ.get("MATRIX_BRIDGE_ROOM", "")  # shared room ID
MATRIX_BRIDGE_ENABLED = os.environ.get("MATRIX_BRIDGE_ENABLED", "true").lower() == "true"
AGENT_NAME = os.environ.get("HERMES_AGENT_NAME", "Hermes")  # this agent's routing name
@dataclass
class AgentMessage:
    """A single message from one agent to another over the bridge room."""

    sender: str  # originating agent name
    recipient: str  # target agent name ("*" means broadcast)
    content: str  # message body with the routing prefix removed
    timestamp: float = field(default_factory=time.time)
    message_id: str = ""
    room_id: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain, JSON-friendly dict."""
        keys = ("sender", "recipient", "content",
                "timestamp", "message_id", "room_id")
        return {key: getattr(self, key) for key in keys}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "AgentMessage":
        """Reconstruct an AgentMessage from a to_dict() payload."""
        return cls(**data)
class MatrixBridge:
    """Multi-agent conversation bridge via Matrix rooms.

    Agents communicate by posting messages to a shared Matrix room
    with a standard format: [@recipient] message content
    """
    def __init__(
        self,
        agent_name: str = None,
        room_id: str = None,
        callback: Callable[[AgentMessage], None] = None,
    ):
        # Fall back to env-derived module defaults when not given explicitly.
        self.agent_name = agent_name or AGENT_NAME
        self.room_id = room_id or MATRIX_BRIDGE_ROOM
        self.callback = callback  # optional single receive callback
        self._matrix_client = None  # set in connect()
        self._running = False  # drives the background sync loop
        self._message_handlers: List[Callable[[AgentMessage], None]] = []
        # NOTE(review): _pending_messages is never read or written anywhere in
        # this class — presumably a planned inbox queue; confirm before use.
        self._pending_messages: List[AgentMessage] = []
        self._known_agents: Set[str] = set()
    async def connect(self) -> bool:
        """Connect to Matrix and join the bridge room.

        Returns True on success; False when the bridge is disabled,
        unconfigured, or any step of the connection fails.
        """
        if not MATRIX_BRIDGE_ENABLED:
            logger.info("Matrix bridge disabled via MATRIX_BRIDGE_ENABLED=false")
            return False
        if not self.room_id:
            logger.warning("No MATRIX_BRIDGE_ROOM configured — bridge disabled")
            return False
        try:
            # Import Matrix client lazily so the module loads without mautrix.
            from mautrix.client import Client
            from mautrix.types import RoomID, UserID
            # Get credentials
            homeserver = os.environ.get("MATRIX_HOMESERVER", "")
            access_token = os.environ.get("MATRIX_ACCESS_TOKEN", "")
            if not homeserver or not access_token:
                logger.warning("Matrix credentials not configured — bridge disabled")
                return False
            # Create client.
            # NOTE(review): derives the server name by splitting the homeserver
            # URL on "//" — assumes a scheme-prefixed URL like https://host; an
            # IndexError here is caught by the broad except below. Confirm.
            self._matrix_client = Client(
                mxid=UserID(f"@{self.agent_name}:{homeserver.split('//')[1]}"),
                base_url=homeserver,
                token=access_token,
            )
            # Join room
            await self._matrix_client.join_room(RoomID(self.room_id))
            logger.info(f"Agent {self.agent_name} joined bridge room {self.room_id}")
            # Register message handler
            self._matrix_client.add_event_handler(self._on_message)
            # Start the background sync before announcing so replies are seen.
            self._running = True
            asyncio.create_task(self._sync_loop())
            # Announce presence
            await self._announce_presence()
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Matrix bridge: {e}")
            return False
    async def disconnect(self) -> None:
        """Disconnect from the bridge and stop the sync loop."""
        self._running = False
        if self._matrix_client:
            try:
                await self._matrix_client.close()
            except Exception:
                # Best-effort shutdown; ignore close failures.
                pass
    async def send_to_agent(self, recipient: str, content: str) -> bool:
        """Send a message to another agent.

        Args:
            recipient: Agent name (e.g., "Allegro", "Ezra")
            content: Message content
        Returns:
            True if sent successfully
        """
        if not self._matrix_client or not self.room_id:
            logger.warning("Not connected to bridge room")
            return False
        # Format message with the routing prefix the receivers parse.
        formatted = f"[@{recipient}] {content}"
        try:
            from mautrix.types import RoomID, TextMessageEventContent, MessageType
            await self._matrix_client.send_message_event(
                room_id=RoomID(self.room_id),
                event_type="m.room.message",
                content=TextMessageEventContent(
                    msgtype=MessageType.TEXT,
                    body=formatted,
                ),
            )
            logger.info(f"Sent message to {recipient}: {content[:50]}...")
            return True
        except Exception as e:
            logger.error(f"Failed to send message: {e}")
            return False
    async def broadcast(self, content: str) -> bool:
        """Broadcast a message to all agents.

        Args:
            content: Message content
        Returns:
            True if sent successfully
        """
        # "*" is the reserved broadcast recipient understood by _is_for_me().
        return await self.send_to_agent("*", content)
    def add_handler(self, handler: Callable[[AgentMessage], None]) -> None:
        """Add a message handler.

        Called when a message addressed to this agent is received.
        """
        self._message_handlers.append(handler)
    def get_known_agents(self) -> Set[str]:
        """Get a copy of the set of agents seen sending messages so far."""
        return self._known_agents.copy()
    async def _on_message(self, event) -> None:
        """Handle an incoming Matrix message event."""
        try:
            # Extract message content
            content = event.content
            if not hasattr(content, 'body'):
                return  # non-text event (e.g. media) — ignore
            body = content.body
            # Drop anything not addressed to this agent or broadcast.
            if not self._is_for_me(body):
                return
            # Parse sender and content
            sender = self._extract_sender(event)
            message_content = self._extract_content(body)
            # Create agent message
            msg = AgentMessage(
                sender=sender,
                recipient=self.agent_name,
                content=message_content,
                timestamp=time.time(),
                message_id=str(event.event_id),
                room_id=str(event.room_id),
            )
            # Track known agents
            self._known_agents.add(sender)
            # Call handlers — one failing handler must not block the rest.
            for handler in self._message_handlers:
                try:
                    handler(msg)
                except Exception as e:
                    logger.error(f"Message handler error: {e}")
            if self.callback:
                try:
                    self.callback(msg)
                except Exception as e:
                    logger.error(f"Callback error: {e}")
            logger.info(f"Received message from {sender}: {message_content[:50]}...")
        except Exception as e:
            logger.error(f"Error processing message: {e}")
    def _is_for_me(self, body: str) -> bool:
        """Check if a message body is addressed to this agent."""
        # Direct mention
        if f"[@{self.agent_name}]" in body:
            return True
        # Broadcast
        if "[@*]" in body:
            return True
        return False
    def _extract_sender(self, event) -> str:
        """Extract the short sender name from a Matrix event."""
        try:
            sender_id = str(event.sender)
            # Extract name from the @name:server Matrix user ID format.
            match = re.match(r"@([^:]+):", sender_id)
            if match:
                return match.group(1)
            return sender_id
        except Exception:
            return "unknown"
    def _extract_content(self, body: str) -> str:
        """Extract message content, removing the [@recipient] routing prefix."""
        # Remove [@recipient] prefix
        match = re.match(r"\[@[^\]]+\]\s*(.*)", body, re.DOTALL)
        if match:
            return match.group(1).strip()
        return body.strip()
    async def _announce_presence(self) -> None:
        """Announce this agent's presence to the bridge via broadcast."""
        await self.broadcast(f"{self.agent_name} online")
    async def _sync_loop(self) -> None:
        """Background sync loop pulling Matrix events until disconnect()."""
        while self._running:
            try:
                if self._matrix_client:
                    await self._matrix_client.sync(timeout=30000)
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Back off briefly so a persistent failure doesn't spin hot.
                logger.error(f"Sync error: {e}")
                await asyncio.sleep(5)
class AgentRegistry:
    """In-memory registry of agents known to the bridge."""

    def __init__(self):
        # name -> {"name", "capabilities", "last_seen", "status"}
        self._agents: Dict[str, Dict[str, Any]] = {}

    def register(self, name: str, capabilities: List[str] = None) -> None:
        """Register (or refresh) an agent, marking it online."""
        entry = {
            "name": name,
            "capabilities": capabilities or [],
            "last_seen": time.time(),
            "status": "online",
        }
        self._agents[name] = entry

    def unregister(self, name: str) -> None:
        """Mark an agent offline; unknown names are silently ignored."""
        entry = self._agents.get(name)
        if entry is not None:
            entry["status"] = "offline"

    def get_agent(self, name: str) -> Optional[Dict[str, Any]]:
        """Return the info dict for *name*, or None when unknown."""
        return self._agents.get(name)

    def list_agents(self) -> List[Dict[str, Any]]:
        """Return info dicts for every agent ever registered."""
        return list(self._agents.values())

    def find_agents_with_capability(self, capability: str) -> List[str]:
        """Return the names of agents advertising *capability*."""
        matches = []
        for agent_name, info in self._agents.items():
            if capability in info.get("capabilities", []):
                matches.append(agent_name)
        return matches
# Global bridge instance shared by the module-level convenience functions.
_bridge: Optional[MatrixBridge] = None


async def get_bridge(agent_name: str = None) -> MatrixBridge:
    """Return the process-wide bridge, creating and connecting it on first use."""
    global _bridge
    if _bridge is not None:
        return _bridge
    _bridge = MatrixBridge(agent_name=agent_name)
    await _bridge.connect()
    return _bridge
async def send_to_agent(recipient: str, content: str) -> bool:
    """Module-level helper: route *content* to *recipient* via the shared bridge."""
    shared = await get_bridge()
    return await shared.send_to_agent(recipient, content)
async def broadcast_to_agents(content: str) -> bool:
    """Module-level helper: broadcast *content* to every agent on the bridge."""
    shared = await get_bridge()
    return await shared.broadcast(content)

256
agent/rider.py Normal file
View File

@@ -0,0 +1,256 @@
"""RIDER — Reader-Guided Passage Reranking.
Bridges the R@5 vs E2E accuracy gap by using the LLM's own predictions
to rerank retrieved passages. Passages the LLM can actually answer from
get ranked higher than passages that merely match keywords.
Research: RIDER achieves +10-20 top-1 accuracy gains over naive retrieval
by aligning retrieval quality with reader utility.
Usage:
from agent.rider import RIDER
rider = RIDER()
reranked = rider.rerank(passages, query, top_n=3)
"""
from __future__ import annotations
import asyncio
import logging
import os
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)

# Configuration — read from the environment once at import time.
RIDER_ENABLED = os.getenv("RIDER_ENABLED", "true").lower() not in ("false", "0", "no")
RIDER_TOP_K = int(os.getenv("RIDER_TOP_K", "10"))  # passages to score
RIDER_TOP_N = int(os.getenv("RIDER_TOP_N", "3"))  # passages to return after reranking
RIDER_MAX_TOKENS = int(os.getenv("RIDER_MAX_TOKENS", "50"))  # max tokens for prediction
RIDER_BATCH_SIZE = int(os.getenv("RIDER_BATCH_SIZE", "5"))  # parallel predictions


class RIDER:
    """Reader-Guided Passage Reranking.

    Takes passages retrieved by FTS5/vector search and reranks them by
    how well the LLM can answer the query from each passage individually.
    """

    def __init__(self, auxiliary_task: str = "rider"):
        """Initialize RIDER.

        Args:
            auxiliary_task: Task name for auxiliary client resolution.
        """
        self._auxiliary_task = auxiliary_task

    def rerank(
        self,
        passages: List[Dict[str, Any]],
        query: str,
        top_n: int = RIDER_TOP_N,
    ) -> List[Dict[str, Any]]:
        """Rerank passages by reader confidence.

        Args:
            passages: List of passage dicts. Must have 'content' or 'text' key.
                May have 'session_id', 'snippet', 'rank', 'score', etc.
            query: The user's search query.
            top_n: Number of passages to return after reranking.

        Returns:
            Reranked passages (top_n), each with added 'rider_score' and
            'rider_prediction' fields (when scoring succeeds).
        """
        if not RIDER_ENABLED or not passages:
            return passages[:top_n]
        if len(passages) <= top_n:
            # Score them anyway for the prediction metadata.
            return self._score_and_rerank(passages, query, top_n)
        # Only score the top RIDER_TOP_K candidates to bound LLM cost.
        return self._score_and_rerank(passages[:RIDER_TOP_K], query, top_n)

    def _score_and_rerank(
        self,
        passages: List[Dict[str, Any]],
        query: str,
        top_n: int,
    ) -> List[Dict[str, Any]]:
        """Score each passage with the reader, then rerank by confidence.

        Any failure (missing event-loop helper, LLM errors) falls back to
        the original order so reranking never degrades the pipeline.
        """
        try:
            from model_tools import _run_async
            scored = _run_async(self._score_all_passages(passages, query))
        except Exception as e:
            logger.debug("RIDER scoring failed: %s — returning original order", e)
            return passages[:top_n]
        # Sort by confidence (descending).
        scored.sort(key=lambda p: p.get("rider_score", 0), reverse=True)
        return scored[:top_n]

    async def _score_all_passages(
        self,
        passages: List[Dict[str, Any]],
        query: str,
    ) -> List[Dict[str, Any]]:
        """Score all passages concurrently, RIDER_BATCH_SIZE at a time.

        Mutates each passage dict in place, adding 'rider_score',
        'rider_prediction' and 'rider_confidence', and returns the list.
        """
        scored: List[Dict[str, Any]] = []
        for batch_start in range(0, len(passages), RIDER_BATCH_SIZE):
            batch = passages[batch_start:batch_start + RIDER_BATCH_SIZE]
            tasks = [
                self._score_single_passage(p, query, batch_start + offset)
                for offset, p in enumerate(batch)
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for offset, (passage, result) in enumerate(zip(batch, results)):
                if isinstance(result, Exception):
                    # Bug fix: log the failing passage's actual index —
                    # previously this logged only the batch start offset.
                    logger.debug(
                        "RIDER passage %d scoring failed: %s",
                        batch_start + offset, result,
                    )
                    passage["rider_score"] = 0.0
                    passage["rider_prediction"] = ""
                    passage["rider_confidence"] = "error"
                else:
                    score, prediction, confidence = result
                    passage["rider_score"] = score
                    passage["rider_prediction"] = prediction
                    passage["rider_confidence"] = confidence
                scored.append(passage)
        return scored

    async def _score_single_passage(
        self,
        passage: Dict[str, Any],
        query: str,
        idx: int,
    ) -> Tuple[float, str, str]:
        """Score a single passage by asking the LLM to predict an answer.

        Args:
            passage: Passage dict ('content' / 'text' / 'snippet' tried in order).
            query: The user's search query.
            idx: Passage index, used only for log messages.

        Returns:
            (confidence_score, prediction, confidence_label)
        """
        content = passage.get("content") or passage.get("text") or passage.get("snippet", "")
        if not content or len(content) < 10:
            return 0.0, "", "empty"
        # Truncate passage to a reasonable size for the prediction task.
        content = content[:2000]
        prompt = (
            f"Question: {query}\n\n"
            f"Context: {content}\n\n"
            f"Based ONLY on the context above, provide a brief answer to the question. "
            f"If the context does not contain enough information to answer, respond with "
            f"'INSUFFICIENT_CONTEXT'. Be specific and concise."
        )
        try:
            from agent.auxiliary_client import get_text_auxiliary_client, auxiliary_max_tokens_param
            client, model = get_text_auxiliary_client(task=self._auxiliary_task)
            if not client:
                # No auxiliary LLM available — neutral score keeps original order.
                return 0.5, "", "no_client"
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                **auxiliary_max_tokens_param(RIDER_MAX_TOKENS),
                temperature=0,
            )
            prediction = (response.choices[0].message.content or "").strip()
            # Confidence scoring based on the prediction.
            if not prediction:
                return 0.1, "", "empty_response"
            if "INSUFFICIENT_CONTEXT" in prediction.upper():
                return 0.15, prediction, "insufficient"
            # Calculate confidence from response characteristics.
            confidence = self._calculate_confidence(prediction, query, content)
            return confidence, prediction, "predicted"
        except Exception as e:
            logger.debug("RIDER prediction failed for passage %d: %s", idx, e)
            return 0.0, "", "error"

    def _calculate_confidence(
        self,
        prediction: str,
        query: str,
        passage: str,
    ) -> float:
        """Calculate a confidence score in [0, 1] from prediction quality signals.

        Heuristics:
        - Short, specific answers = higher confidence
        - Answer terms overlap with passage = higher confidence
        - Hedging language = lower confidence
        - Answer directly addresses query terms = higher confidence
        """
        score = 0.5  # base

        # Specificity bonus: shorter answers tend to be more confident.
        words = len(prediction.split())
        if words <= 5:
            score += 0.2
        elif words <= 15:
            score += 0.1
        elif words > 50:
            score -= 0.1

        # Passage grounding: does the answer use terms from the passage?
        passage_lower = passage.lower()
        answer_terms = set(prediction.lower().split())
        passage_terms = set(passage_lower.split())
        overlap = len(answer_terms & passage_terms)
        if overlap > 3:
            score += 0.15
        elif overlap > 0:
            score += 0.05

        # Query relevance: does the answer address query terms?
        query_terms = set(query.lower().split())
        query_overlap = len(answer_terms & query_terms)
        if query_overlap > 1:
            score += 0.1

        # Hedge penalty: hedging language suggests uncertainty.
        hedge_words = {"maybe", "possibly", "might", "could", "perhaps",
                       "not sure", "unclear", "don't know", "cannot"}
        if any(h in prediction.lower() for h in hedge_words):
            score -= 0.2

        # Refusal penalty: the model declining rather than answering.
        if prediction.lower().startswith(("i cannot", "i don't", "i can't", "there is no")):
            score -= 0.15

        return max(0.0, min(1.0, score))
def rerank_passages(
    passages: List[Dict[str, Any]],
    query: str,
    top_n: int = RIDER_TOP_N,
) -> List[Dict[str, Any]]:
    """Convenience wrapper: rerank *passages* with a throwaway RIDER instance."""
    return RIDER().rerank(passages, query, top_n)
def is_rider_available() -> bool:
    """Return True when RIDER is enabled and an auxiliary LLM client resolves."""
    if not RIDER_ENABLED:
        return False
    try:
        from agent.auxiliary_client import get_text_auxiliary_client
        client, model = get_text_auxiliary_client(task="rider")
    except Exception:
        # Import or resolution failure — RIDER cannot run.
        return False
    return client is not None and model is not None

View File

@@ -1,216 +0,0 @@
# Multi-Agent Conversation Bridge
Allows multiple Hermes instances (Timmy, Allegro, Ezra) to communicate with each other through a shared Matrix room.
## Overview
The Matrix Bridge enables agent-to-agent coordination without manual intervention. Agents can:
- Send tasks to specific agents
- Broadcast to all agents
- Respond to requests from other agents
- Coordinate on complex workflows
## Configuration
### Environment Variables
```bash
# Enable/disable the bridge
MATRIX_BRIDGE_ENABLED=true
# Shared Matrix room ID for agent communication
MATRIX_BRIDGE_ROOM=!roomid:matrix.example.org
# Agent name (for message routing)
HERMES_AGENT_NAME=Timmy
# Matrix credentials (from existing Matrix gateway config)
MATRIX_HOMESERVER=https://matrix.example.org
MATRIX_ACCESS_TOKEN=syt_...
```
### Matrix Room Setup
1. Create a Matrix room for agent communication
2. Invite all agent accounts to the room
3. Set `MATRIX_BRIDGE_ROOM` to the room ID
## Message Format
Messages use a simple prefix format for routing:
```
[@Allegro] Check the deployment status on VPS
[@Ezra] Can you review PR #456?
[@*] System maintenance in 5 minutes
```
- `[@AgentName]` — Message for specific agent
- `[@*]` — Broadcast to all agents
## Usage
### Basic Usage
```python
from agent.matrix_bridge import MatrixBridge, send_to_agent, broadcast_to_agents
# Create bridge
bridge = MatrixBridge(agent_name="Timmy")
await bridge.connect()
# Send to specific agent
await bridge.send_to_agent("Allegro", "Check deployment status")
# Broadcast to all agents
await bridge.broadcast("System maintenance starting")
# Add message handler
def handle_message(msg):
print(f"From {msg.sender}: {msg.content}")
bridge.add_handler(handle_message)
```
### Convenience Functions
```python
from agent.matrix_bridge import send_to_agent, broadcast_to_agents
# Send message
await send_to_agent("Ezra", "Review PR #456")
# Broadcast
await broadcast_to_agents("Going offline for maintenance")
```
### Agent Registry
```python
from agent.matrix_bridge import AgentRegistry
registry = AgentRegistry()
# Register agent with capabilities
registry.register("Timmy", capabilities=["code", "review", "deploy"])
registry.register("Allegro", capabilities=["monitoring", "alerting"])
# Find agents with capability
coders = registry.find_agents_with_capability("code")
```
## Message Flow
```
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Timmy │────▶│ Matrix │────▶│ Allegro │
│ Agent │ │ Room │ │ Agent │
└─────────┘ └─────────┘ └─────────┘
│ │ │
│ [@Allegro] │ │
│ Check deps │ │
└──────────────▶│ │
│ [@Allegro] │
│ Check deps │
└──────────────▶│
│ [@Timmy] │
│ Done ✓ │
│◀──────────────┘
│ [@Timmy] │
│ Done ✓ │
│◀──────────────┘
```
## Integration with Hermes
### In run_agent.py
```python
# Add to conversation loop
if self.matrix_bridge:
# Check for messages from other agents
messages = await self.matrix_bridge.get_pending_messages()
for msg in messages:
# Process agent-to-agent messages
pass
```
### In Gateway
```python
# Add Matrix bridge to gateway
from agent.matrix_bridge import MatrixBridge
bridge = MatrixBridge(agent_name="Timmy")
await bridge.connect()
gateway.matrix_bridge = bridge
```
## Testing
### Unit Tests
```python
def test_message_parsing():
"""Test message format parsing."""
from agent.matrix_bridge import MatrixBridge
bridge = MatrixBridge(agent_name="Timmy")
# Test recipient extraction
assert bridge._is_for_me("[@Timmy] Hello")
assert not bridge._is_for_me("[@Allegro] Hello")
assert bridge._is_for_me("[@*] Broadcast")
# Test content extraction
assert bridge._extract_content("[@Timmy] Hello") == "Hello"
assert bridge._extract_content("[@*] Test message") == "Test message"
```
### Integration Test
```bash
# Test with two agents
MATRIX_BRIDGE_ENABLED=true \
MATRIX_BRIDGE_ROOM=!test:matrix.example.org \
HERMES_AGENT_NAME=Timmy \
python -c "
import asyncio
from agent.matrix_bridge import send_to_agent
async def test():
await send_to_agent('Allegro', 'Test message')
print('Sent')
asyncio.run(test())
"
```
## Troubleshooting
### Bridge not connecting
1. Check `MATRIX_BRIDGE_ENABLED=true`
2. Verify `MATRIX_BRIDGE_ROOM` is set
3. Ensure Matrix credentials are configured
4. Check Matrix homeserver is reachable
### Messages not received
1. Verify agent is in the Matrix room
2. Check message format: `[@AgentName] content`
3. Ensure `HERMES_AGENT_NAME` matches agent name
4. Check Matrix sync is running
### Agent not found
1. Verify agent has joined the bridge room
2. Check agent name matches exactly (case-sensitive)
3. Ensure agent has announced presence
## Related
- Issue #747: feat: multi-agent conversation bridge via Matrix
- Matrix Gateway: `gateway/platforms/matrix.py`
- Multi-Agent Orchestration: `docs/multi-agent-orchestration.md`

View File

@@ -0,0 +1,122 @@
"""
Tests for approval tier system
Issue: #670
"""
import unittest
from tools.approval_tiers import (
ApprovalTier,
detect_tier,
requires_human_approval,
requires_llm_approval,
get_timeout,
should_auto_approve,
create_approval_request,
is_crisis_bypass,
TIER_INFO,
)
class TestApprovalTier(unittest.TestCase):
    """ApprovalTier members compare equal to their integer severity levels."""

    def test_tier_values(self):
        expected_levels = [
            (ApprovalTier.SAFE, 0),
            (ApprovalTier.LOW, 1),
            (ApprovalTier.MEDIUM, 2),
            (ApprovalTier.HIGH, 3),
            (ApprovalTier.CRITICAL, 4),
        ]
        for tier, level in expected_levels:
            self.assertEqual(tier, level)
class TestTierDetection(unittest.TestCase):
    """detect_tier() maps action names and command patterns to tiers."""

    def _assert_all_map_to(self, tier, actions):
        # Helper: every action in the list detects as the given tier.
        for action in actions:
            self.assertEqual(detect_tier(action), tier)

    def test_safe_actions(self):
        self._assert_all_map_to(
            ApprovalTier.SAFE, ["read_file", "web_search", "session_search"])

    def test_low_actions(self):
        self._assert_all_map_to(
            ApprovalTier.LOW, ["write_file", "terminal", "execute_code"])

    def test_medium_actions(self):
        self._assert_all_map_to(ApprovalTier.MEDIUM, ["send_message", "git_push"])

    def test_high_actions(self):
        self._assert_all_map_to(ApprovalTier.HIGH, ["config_change", "key_rotation"])

    def test_critical_actions(self):
        self._assert_all_map_to(ApprovalTier.CRITICAL, ["kill_process", "shutdown"])

    def test_pattern_detection(self):
        # Unknown action names fall back to command-content pattern matching.
        self.assertEqual(detect_tier("unknown", "rm -rf /"), ApprovalTier.CRITICAL)
        self.assertEqual(detect_tier("unknown", "sudo apt install"), ApprovalTier.MEDIUM)
class TestTierInfo(unittest.TestCase):
    """Per-tier approval requirements and timeouts."""

    def test_safe_no_approval(self):
        # SAFE needs neither human nor LLM approval and has no timeout.
        safe = ApprovalTier.SAFE
        self.assertFalse(requires_human_approval(safe))
        self.assertFalse(requires_llm_approval(safe))
        self.assertIsNone(get_timeout(safe))

    def test_medium_requires_both(self):
        medium = ApprovalTier.MEDIUM
        self.assertTrue(requires_human_approval(medium))
        self.assertTrue(requires_llm_approval(medium))
        self.assertEqual(get_timeout(medium), 60)

    def test_critical_fast_timeout(self):
        self.assertEqual(get_timeout(ApprovalTier.CRITICAL), 10)
class TestAutoApprove(unittest.TestCase):
    """should_auto_approve() is True only for read-only (SAFE) actions."""

    def test_safe_auto_approves(self):
        for action in ("read_file", "web_search"):
            self.assertTrue(should_auto_approve(action))

    def test_write_doesnt_auto_approve(self):
        self.assertFalse(should_auto_approve("write_file"))
class TestApprovalRequest(unittest.TestCase):
    """create_approval_request() wiring and dict serialization."""

    def test_create_request(self):
        request = create_approval_request(
            "send_message", "Hello world", "User requested", "session_123"
        )
        self.assertEqual(request.tier, ApprovalTier.MEDIUM)
        self.assertEqual(request.timeout_seconds, 60)

    def test_to_dict(self):
        request = create_approval_request("read_file", "cat file.txt", "test", "s1")
        payload = request.to_dict()
        self.assertEqual(payload["tier"], 0)
        self.assertEqual(payload["tier_name"], "Safe")
class TestCrisisBypass(unittest.TestCase):
    """is_crisis_bypass(): action allowlist plus crisis-context keywords."""

    def test_send_message_bypass(self):
        self.assertTrue(is_crisis_bypass("send_message"))

    def test_crisis_context_bypass(self):
        for context in ("call 988 lifeline", "crisis resources"):
            self.assertTrue(is_crisis_bypass("unknown", context))

    def test_normal_no_bypass(self):
        self.assertFalse(is_crisis_bypass("read_file"))
# Allow running this test module directly (outside a test runner).
if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,55 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
    """classify_error(): retryable vs permanent buckets (#752)."""

    @staticmethod
    def _classify(message, code=None):
        # Small helper: wrap the message in an Exception and classify it.
        return classify_error(Exception(message), response_code=code)

    def test_timeout_is_retryable(self):
        verdict = self._classify("Connection timed out")
        assert verdict.category == ErrorCategory.RETRYABLE
        assert verdict.should_retry is True

    def test_429_is_retryable(self):
        verdict = self._classify("Rate limit exceeded", code=429)
        assert verdict.category == ErrorCategory.RETRYABLE
        assert verdict.should_retry is True

    def test_404_is_permanent(self):
        verdict = self._classify("Not found", code=404)
        assert verdict.category == ErrorCategory.PERMANENT
        assert verdict.should_retry is False

    def test_403_is_permanent(self):
        verdict = self._classify("Forbidden", code=403)
        assert verdict.category == ErrorCategory.PERMANENT
        assert verdict.should_retry is False

    def test_500_is_retryable(self):
        verdict = self._classify("Internal server error", code=500)
        assert verdict.category == ErrorCategory.RETRYABLE
        assert verdict.should_retry is True

    def test_schema_error_is_permanent(self):
        verdict = self._classify("Schema validation failed")
        assert verdict.category == ErrorCategory.PERMANENT
        assert verdict.should_retry is False

    def test_unknown_is_retryable_with_caution(self):
        verdict = self._classify("Some unknown error")
        assert verdict.category == ErrorCategory.UNKNOWN
        assert verdict.should_retry is True
        assert verdict.max_retries == 1
# Allow direct invocation: delegate to pytest on this file.
if __name__ == "__main__":
    pytest.main([__file__])

136
tests/test_json_repair.py Normal file
View File

@@ -0,0 +1,136 @@
"""Tests for JSON repair module (#781)."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent.json_repair import repair_and_load_json, repair_tool_call_arguments, _strip_markdown, _extract_json
class TestValidJSON:
    """Well-formed JSON must round-trip untouched."""

    def test_valid_object(self):
        assert repair_and_load_json('{"key": "value"}') == {"key": "value"}

    def test_valid_array(self):
        assert repair_and_load_json('[1, 2, 3]') == [1, 2, 3]

    def test_empty_string_returns_default(self):
        # Empty input cannot be repaired; caller's default wins.
        assert repair_and_load_json('', default=42) == 42

    def test_none_returns_default(self):
        assert repair_and_load_json(None, default=42) == 42
class TestTrailingComma:
    """Trailing commas are stripped before parsing."""

    def test_trailing_comma_object(self):
        parsed = repair_and_load_json('{"key": "value",}')
        assert parsed == {"key": "value"}

    def test_trailing_comma_array(self):
        parsed = repair_and_load_json('[1, 2, 3,]')
        assert parsed == [1, 2, 3]
class TestSingleQuotes:
    """Python-style single-quoted JSON is normalized to double quotes."""

    def test_single_quotes(self):
        parsed = repair_and_load_json("{'key': 'value'}")
        assert parsed == {"key": "value"}
class TestUnquotedKeys:
    """Bare object keys get quoted during repair."""

    def test_unquoted_keys(self):
        parsed = repair_and_load_json('{key: "value"}')
        assert parsed == {"key": "value"}
class TestTruncation:
    """Unbalanced braces/brackets (LLM cut-off) are closed during repair."""

    def test_missing_closing_brace(self):
        parsed = repair_and_load_json('{"name": "foo", "args": {"x": 1}')
        assert parsed is not None
        assert parsed["name"] == "foo"

    def test_missing_closing_bracket(self):
        assert repair_and_load_json('[1, 2, 3') == [1, 2, 3]
class TestMarkdownWrapping:
    """Markdown code fences around the JSON payload are removed."""

    def test_json_codeblock(self):
        wrapped = '```json\n{"key": "value"}\n```'
        assert repair_and_load_json(wrapped) == {"key": "value"}

    def test_plain_codeblock(self):
        wrapped = '```\n{"key": "value"}\n```'
        assert repair_and_load_json(wrapped) == {"key": "value"}
class TestTrailingText:
    """Prose appended after the JSON object is ignored."""

    def test_extra_text_after_json(self):
        parsed = repair_and_load_json('{"result": true} some extra text')
        assert parsed == {"result": True}
class TestStripMarkdown:
    """_strip_markdown(): fence removal in isolation."""

    def test_strip_json_block(self):
        stripped = _strip_markdown('```json\n{"a":1}\n```')
        assert stripped == '{"a":1}'

    def test_strip_plain_block(self):
        stripped = _strip_markdown('```\n{"a":1}\n```')
        assert stripped == '{"a":1}'
class TestExtractJSON:
    """_extract_json(): locate the JSON substring inside surrounding text."""

    def test_extract_from_text(self):
        extracted = _extract_json('prefix {"key": 1} suffix')
        assert extracted == '{"key": 1}'

    def test_nested(self):
        extracted = _extract_json('{"a": {"b": 1}}')
        assert extracted == '{"a": {"b": 1}}'
class TestRepairToolCallArguments:
    """repair_tool_call_arguments(): (args, was_repaired) contract."""

    def test_valid_args(self):
        args, repaired = repair_tool_call_arguments('{"x": 1}')
        assert args == '{"x": 1}'
        assert not repaired

    def test_malformed_args(self):
        args, repaired = repair_tool_call_arguments("{x: 1}")
        assert repaired
        # Bare key must have been quoted in the repaired output.
        assert '"x"' in args

    def test_empty_args(self):
        args, repaired = repair_tool_call_arguments('')
        assert args == '{}'
        assert repaired
class TestFailureModes:
    """End-to-end failure patterns seen in real tool-call output."""

    def test_truncation_most_dangerous(self):
        # LLM cut off mid-string
        truncated = '{"path": "test.py", "content": "hello'
        parsed = repair_and_load_json(truncated)
        assert parsed is not None
        assert parsed.get("path") == "test.py"

    def test_empty_arguments(self):
        # Auto-coerced to {}
        assert repair_and_load_json('') is None  # Returns default (None)

    def test_non_json_text(self):
        assert repair_and_load_json('not json at all', default={}) == {}
if __name__ == "__main__":
    # Minimal dependency-free runner: collect every test_* method on every
    # Test* class and invoke it with a dummy `self` (no state is used).
    import inspect
    this_module = sys.modules[__name__]
    collected = []
    for cls_name, cls in inspect.getmembers(this_module, inspect.isclass):
        if not cls_name.startswith('Test'):
            continue
        for attr_name, attr in inspect.getmembers(cls):
            if attr_name.startswith('test_'):
                collected.append((f"{cls_name}.{attr_name}", attr))
    for name, test in collected:
        print(f"Running {name}...")
        test(None)
        print(" PASS")
    print(f"\n{len(collected)} tests passed.")

View File

@@ -1,114 +0,0 @@
"""Tests for Matrix Bridge — Issue #747."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent.matrix_bridge import MatrixBridge, AgentMessage, AgentRegistry
class TestMessageParsing:
    """Test message format parsing."""

    def test_is_for_me_direct(self):
        bridge = MatrixBridge(agent_name="Timmy")
        # Truthiness assertion instead of `== True` (flake8 E712).
        assert bridge._is_for_me("[@Timmy] Hello")

    def test_is_not_for_me(self):
        bridge = MatrixBridge(agent_name="Timmy")
        assert not bridge._is_for_me("[@Allegro] Hello")

    def test_is_broadcast(self):
        bridge = MatrixBridge(agent_name="Timmy")
        # [@*] addresses every agent, so it is "for me" too.
        assert bridge._is_for_me("[@*] Broadcast")

    def test_extract_content(self):
        bridge = MatrixBridge(agent_name="Timmy")
        assert bridge._extract_content("[@Timmy] Hello world") == "Hello world"

    def test_extract_content_multiline(self):
        bridge = MatrixBridge(agent_name="Timmy")
        content = bridge._extract_content("[@Timmy] Line 1\nLine 2")
        assert content == "Line 1\nLine 2"
class TestAgentMessage:
    """Test AgentMessage dataclass."""

    def test_to_dict(self):
        msg = AgentMessage(
            sender="Timmy", recipient="Allegro", content="Hello",
            timestamp=1234567890.0,
        )
        serialized = msg.to_dict()
        assert serialized["sender"] == "Timmy"
        assert serialized["recipient"] == "Allegro"
        assert serialized["content"] == "Hello"

    def test_from_dict(self):
        payload = {
            "sender": "Timmy",
            "recipient": "Allegro",
            "content": "Hello",
            "timestamp": 1234567890.0,
            "message_id": "",
            "room_id": "",
        }
        restored = AgentMessage.from_dict(payload)
        assert restored.sender == "Timmy"
        assert restored.recipient == "Allegro"
class TestAgentRegistry:
    """Test AgentRegistry."""

    def test_register(self):
        registry = AgentRegistry()
        registry.register("Timmy", capabilities=["code", "review"])
        entry = registry.get_agent("Timmy")
        assert entry["name"] == "Timmy"
        assert "code" in entry["capabilities"]

    def test_list_agents(self):
        registry = AgentRegistry()
        for agent_name in ("Timmy", "Allegro"):
            registry.register(agent_name)
        assert len(registry.list_agents()) == 2

    def test_find_with_capability(self):
        registry = AgentRegistry()
        registry.register("Timmy", capabilities=["code"])
        registry.register("Allegro", capabilities=["monitoring"])
        coders = registry.find_agents_with_capability("code")
        assert "Timmy" in coders
        assert "Allegro" not in coders

    def test_unregister(self):
        registry = AgentRegistry()
        registry.register("Timmy")
        registry.unregister("Timmy")
        # Unregister marks the agent offline rather than deleting the record.
        assert registry.get_agent("Timmy")["status"] == "offline"
class TestBridgeInit:
    """Test bridge initialization."""

    def test_default_agent_name(self):
        assert MatrixBridge().agent_name == "Hermes"

    def test_custom_agent_name(self):
        assert MatrixBridge(agent_name="Timmy").agent_name == "Timmy"

    def test_known_agents_empty(self):
        # Fresh bridge has discovered no peers yet.
        assert len(MatrixBridge().get_known_agents()) == 0
# Allow direct invocation: delegate to pytest in verbose mode.
if __name__ == "__main__":
    import pytest
    pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,82 @@
"""Tests for Reader-Guided Reranking (RIDER) — issue #666."""
import pytest
from unittest.mock import MagicMock, patch
from agent.rider import RIDER, rerank_passages, is_rider_available
class TestRIDERClass:
    """Core RIDER.rerank() behavior: empty input, small input, disabled flag."""

    def test_init(self):
        assert RIDER()._auxiliary_task == "rider"

    def test_rerank_empty_passages(self):
        assert RIDER().rerank([], "test query") == []

    def test_rerank_fewer_than_top_n(self):
        """If passages <= top_n, return all (with scores if possible)."""
        single = [{"content": "test content", "session_id": "s1"}]
        reranked = RIDER().rerank(single, "test query", top_n=3)
        assert len(reranked) == 1

    @patch("agent.rider.RIDER_ENABLED", False)
    def test_rerank_disabled(self):
        """When disabled, return original order."""
        passages = [
            {"content": f"content {i}", "session_id": f"s{i}"} for i in range(5)
        ]
        reranked = RIDER().rerank(passages, "test query", top_n=3)
        assert reranked == passages[:3]
class TestConfidenceCalculation:
    """_calculate_confidence heuristics: specificity, grounding, hedges, refusals."""

    @pytest.fixture
    def rider(self):
        return RIDER()

    def test_short_specific_answer(self, rider):
        confidence = rider._calculate_confidence(
            "Paris",
            "What is the capital of France?",
            "Paris is the capital of France.",
        )
        assert confidence > 0.5

    def test_hedged_answer(self, rider):
        confidence = rider._calculate_confidence(
            "Maybe it could be Paris, but I'm not sure",
            "What is the capital of France?",
            "Paris is the capital.",
        )
        assert confidence < 0.5

    def test_passage_grounding(self, rider):
        confidence = rider._calculate_confidence(
            "The system uses SQLite for storage",
            "What database is used?",
            "The system uses SQLite for persistent storage with FTS5 indexing.",
        )
        assert confidence > 0.5

    def test_refusal_penalty(self, rider):
        confidence = rider._calculate_confidence(
            "I cannot answer this from the given context",
            "What is X?",
            "Some unrelated content",
        )
        assert confidence < 0.5
class TestRerankPassages:
    """Module-level rerank_passages() convenience wrapper."""

    def test_convenience_function(self):
        """Test the module-level convenience function."""
        single = [{"content": "test", "session_id": "s1"}]
        reranked = rerank_passages(single, "query", top_n=1)
        assert len(reranked) == 1
class TestIsRiderAvailable:
    """is_rider_available() must report availability as a plain bool."""

    def test_returns_bool(self):
        assert isinstance(is_rider_available(), bool)

261
tools/approval_tiers.py Normal file
View File

@@ -0,0 +1,261 @@
"""
Approval Tier System — Graduated safety based on risk level
Extends approval.py with 5-tier system for command approval.
| Tier | Action | Human | LLM | Timeout |
|------|-----------------|-------|-----|---------|
| 0 | Read, search | No | No | N/A |
| 1 | Write, scripts | No | Yes | N/A |
| 2 | Messages, API | Yes | Yes | 60s |
| 3 | Crypto, config | Yes | Yes | 30s |
| 4 | Crisis | Yes | Yes | 10s |
Issue: #670
"""
import re
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
class ApprovalTier(IntEnum):
    """Approval tiers based on risk level.

    IntEnum so tiers compare/serialize as plain ints (see to_dict and the
    tier tests); the numeric values are a stable external contract.
    """
    SAFE = 0      # Read, search — no approval needed
    LOW = 1       # Write, scripts — LLM approval
    MEDIUM = 2    # Messages, API — human + LLM, 60s timeout
    HIGH = 3      # Crypto, config — human + LLM, 30s timeout
    CRITICAL = 4  # Crisis — human + LLM, 10s timeout
# Tier metadata: per-tier approval requirements, timeout, and display name.
# Consumed by requires_human_approval / requires_llm_approval / get_timeout,
# ApprovalRequest.to_dict, and format_approval_prompt.
TIER_INFO: Dict[ApprovalTier, Dict[str, Any]] = {
    ApprovalTier.SAFE: {
        "name": "Safe",
        "human_required": False,
        "llm_required": False,
        "timeout_seconds": None,  # no timeout — nothing to wait for
        "description": "Read-only operations, no approval needed"
    },
    ApprovalTier.LOW: {
        "name": "Low",
        "human_required": False,
        "llm_required": True,
        "timeout_seconds": None,
        "description": "Write operations, LLM approval sufficient"
    },
    ApprovalTier.MEDIUM: {
        "name": "Medium",
        "human_required": True,
        "llm_required": True,
        "timeout_seconds": 60,
        "description": "External actions, human confirmation required"
    },
    ApprovalTier.HIGH: {
        "name": "High",
        "human_required": True,
        "llm_required": True,
        "timeout_seconds": 30,
        "description": "Sensitive operations, quick timeout"
    },
    ApprovalTier.CRITICAL: {
        "name": "Critical",
        "human_required": True,
        "llm_required": True,
        "timeout_seconds": 10,  # higher tier => shorter window
        "description": "Crisis or dangerous operations, fastest timeout"
    },
}
# Action-to-tier mapping. detect_tier() consults this first; unknown
# action names fall through to _DANGEROUS_PATTERNS on the command text.
ACTION_TIERS: Dict[str, ApprovalTier] = {
    # Tier 0: Safe (read-only)
    "read_file": ApprovalTier.SAFE,
    "search_files": ApprovalTier.SAFE,
    "web_search": ApprovalTier.SAFE,
    "session_search": ApprovalTier.SAFE,
    "list_files": ApprovalTier.SAFE,
    "get_file_content": ApprovalTier.SAFE,
    "memory_search": ApprovalTier.SAFE,
    "skills_list": ApprovalTier.SAFE,
    "skills_search": ApprovalTier.SAFE,
    # Tier 1: Low (write operations)
    "write_file": ApprovalTier.LOW,
    "create_file": ApprovalTier.LOW,
    "patch_file": ApprovalTier.LOW,
    "delete_file": ApprovalTier.LOW,
    "execute_code": ApprovalTier.LOW,
    "terminal": ApprovalTier.LOW,
    "run_script": ApprovalTier.LOW,
    "skill_install": ApprovalTier.LOW,
    # Tier 2: Medium (external actions)
    "send_message": ApprovalTier.MEDIUM,
    "web_fetch": ApprovalTier.MEDIUM,
    "browser_navigate": ApprovalTier.MEDIUM,
    "api_call": ApprovalTier.MEDIUM,
    "gitea_create_issue": ApprovalTier.MEDIUM,
    "gitea_create_pr": ApprovalTier.MEDIUM,
    "git_push": ApprovalTier.MEDIUM,
    "deploy": ApprovalTier.MEDIUM,
    # Tier 3: High (sensitive operations)
    "config_change": ApprovalTier.HIGH,
    "env_change": ApprovalTier.HIGH,
    "key_rotation": ApprovalTier.HIGH,
    "access_grant": ApprovalTier.HIGH,
    "permission_change": ApprovalTier.HIGH,
    "backup_restore": ApprovalTier.HIGH,
    # Tier 4: Critical (crisis/dangerous)
    "kill_process": ApprovalTier.CRITICAL,
    "rm_rf": ApprovalTier.CRITICAL,
    "format_disk": ApprovalTier.CRITICAL,
    "shutdown": ApprovalTier.CRITICAL,
    "crisis_override": ApprovalTier.CRITICAL,
}
# Dangerous command patterns (from existing approval.py).
# detect_tier() scans these in order and returns on the FIRST match, so
# higher-risk patterns are listed first. Matched case-insensitively.
# NOTE(review): patterns are unanchored substring searches — e.g.
# "shutdown|reboot|halt" also matches words like "halting"; confirm that
# over-matching toward stricter tiers is the intended fail-safe behavior.
_DANGEROUS_PATTERNS: List[Tuple[str, ApprovalTier]] = [
    (r"rm\s+-rf\s+/", ApprovalTier.CRITICAL),
    (r"mkfs\.", ApprovalTier.CRITICAL),
    (r"dd\s+if=.*of=/dev/", ApprovalTier.CRITICAL),
    (r"shutdown|reboot|halt", ApprovalTier.CRITICAL),
    (r"chmod\s+777", ApprovalTier.HIGH),
    (r"curl.*\|\s*bash", ApprovalTier.HIGH),   # remote code piped to shell
    (r"wget.*\|\s*sh", ApprovalTier.HIGH),
    (r"eval\s*\(", ApprovalTier.HIGH),
    (r"sudo\s+", ApprovalTier.MEDIUM),
    (r"git\s+push.*--force", ApprovalTier.HIGH),
    (r"docker\s+rm.*-f", ApprovalTier.MEDIUM),
    (r"kubectl\s+delete", ApprovalTier.HIGH),
]
@dataclass
class ApprovalRequest:
    """A pending approval request for a single action."""
    action: str                          # action name (see ACTION_TIERS)
    tier: ApprovalTier                   # resolved risk tier
    command: str                         # concrete command text
    reason: str                          # why the agent wants to run it
    session_key: str                     # originating session
    timeout_seconds: Optional[int] = None  # None => no timeout

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for transport/logging, enriched with tier metadata."""
        info = TIER_INFO[self.tier]
        payload: Dict[str, Any] = {
            "action": self.action,
            "tier": self.tier.value,
            "tier_name": info["name"],
            "command": self.command,
            "reason": self.reason,
            "session_key": self.session_key,
            "timeout": self.timeout_seconds,
            "human_required": info["human_required"],
            "llm_required": info["llm_required"],
        }
        return payload
def detect_tier(action: str, command: str = "") -> ApprovalTier:
    """
    Resolve the approval tier for *action*.

    Lookup order: explicit ACTION_TIERS mapping, then case-insensitive
    dangerous-command regexes on *command*, then a default of LOW for
    anything unrecognized.
    """
    known = ACTION_TIERS.get(action)
    if known is not None:
        return known
    if command:
        # First matching pattern wins; list is ordered highest risk first.
        for pattern, tier in _DANGEROUS_PATTERNS:
            if re.search(pattern, command, re.IGNORECASE):
                return tier
    return ApprovalTier.LOW
def requires_human_approval(tier: ApprovalTier) -> bool:
    """True when *tier* mandates an explicit human sign-off."""
    info = TIER_INFO[tier]
    return info["human_required"]
def requires_llm_approval(tier: ApprovalTier) -> bool:
    """True when *tier* mandates an LLM reviewer's sign-off."""
    info = TIER_INFO[tier]
    return info["llm_required"]
def get_timeout(tier: ApprovalTier) -> Optional[int]:
    """Approval timeout in seconds for *tier*, or None when untimed."""
    info = TIER_INFO[tier]
    return info["timeout_seconds"]
def should_auto_approve(action: str, command: str = "") -> bool:
    """True when the action resolves to tier 0 (SAFE) and needs no approval."""
    return detect_tier(action, command) == ApprovalTier.SAFE
def format_approval_prompt(request: ApprovalRequest) -> str:
    """
    Render an ApprovalRequest as a human-readable prompt.

    Shows tier number/name, action, the command truncated to 100 chars,
    the reason, and which approvals (human/LLM) plus timeout apply.

    Fixes: the original used f-strings with no placeholders (ruff F541)
    and a chain of list.append calls for constant lines.
    """
    info = TIER_INFO[request.tier]
    # Truncate long commands so the prompt stays readable.
    shown_command = request.command[:100] + ('...' if len(request.command) > 100 else '')
    lines = [
        f"⚠️ Approval Required (Tier {request.tier.value}: {info['name']})",
        "",
        f"Action: {request.action}",
        f"Command: {shown_command}",
        f"Reason: {request.reason}",
        "",
    ]
    if info["human_required"]:
        lines.append("👤 Human approval required")
    if info["llm_required"]:
        lines.append("🤖 LLM approval required")
    if info["timeout_seconds"]:
        lines.append(f"⏱️ Timeout: {info['timeout_seconds']}s")
    return "\n".join(lines)
def create_approval_request(
    action: str,
    command: str,
    reason: str,
    session_key: str
) -> ApprovalRequest:
    """Build an ApprovalRequest, resolving tier and timeout from the action."""
    resolved_tier = detect_tier(action, command)
    return ApprovalRequest(
        action=action,
        tier=resolved_tier,
        command=command,
        reason=reason,
        session_key=session_key,
        timeout_seconds=get_timeout(resolved_tier),
    )
# Crisis bypass rules: actions that may skip the approval flow entirely
# (checked first by is_crisis_bypass). Frozen so it cannot be mutated at runtime.
CRISIS_BYPASS_ACTIONS = frozenset([
    "send_message",  # Always allow sending crisis resources
    "check_crisis",
    "notify_crisis",
])
def is_crisis_bypass(action: str, context: str = "") -> bool:
    """Whether *action* may skip approval because of a crisis situation."""
    if action in CRISIS_BYPASS_ACTIONS:
        return True
    # Context mentioning crisis resources/keywords also triggers the bypass.
    lowered = context.lower()
    for indicator in ("988", "crisis", "suicide", "self-harm", "lifeline"):
        if indicator in lowered:
            return True
    return False

233
tools/error_classifier.py Normal file
View File

@@ -0,0 +1,233 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
    """Error category classification.

    RETRYABLE: transient (timeouts, 5xx, rate limits) — retry with backoff.
    PERMANENT: client/validation errors (4xx, schema, auth) — fail fast.
    UNKNOWN: unmatched — allowed one cautious retry (see classify_error).
    """
    RETRYABLE = "retryable"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"
@dataclass
class ErrorClassification:
    """Result of error classification (returned by classify_error)."""
    category: ErrorCategory          # retryable / permanent / unknown
    reason: str                      # human-readable explanation
    should_retry: bool               # whether the caller should retry at all
    max_retries: int                 # suggested retry budget (0 for permanent)
    backoff_seconds: float           # suggested base delay between retries
    error_code: Optional[int] = None   # HTTP status when one was provided
    error_type: Optional[str] = None   # exception class name
# Retryable error patterns. Tuples of
# (regex, reason, max_retries, backoff_seconds); classify_error scans in
# order and the first case-insensitive match wins, so these take priority
# over _PERMANENT_PATTERNS.
_RETRYABLE_PATTERNS = [
    # HTTP status codes
    (r"\b429\b", "rate limit", 3, 5.0),
    (r"\b500\b", "server error", 3, 2.0),
    (r"\b502\b", "bad gateway", 3, 2.0),
    (r"\b503\b", "service unavailable", 3, 5.0),
    (r"\b504\b", "gateway timeout", 3, 5.0),
    # Timeout patterns
    (r"timeout", "timeout", 3, 2.0),
    (r"timed out", "timeout", 3, 2.0),
    (r"TimeoutExpired", "timeout", 3, 2.0),
    # Connection errors
    (r"connection refused", "connection refused", 2, 5.0),
    (r"connection reset", "connection reset", 2, 2.0),
    (r"network unreachable", "network unreachable", 2, 10.0),
    (r"DNS", "DNS error", 2, 5.0),
    # Transient errors
    (r"temporary", "temporary error", 2, 2.0),
    (r"transient", "transient error", 2, 2.0),
    (r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns. Tuples of (regex, short_code, reason);
# checked AFTER _RETRYABLE_PATTERNS, so a message matching both buckets is
# treated as retryable.
_PERMANENT_PATTERNS = [
    # HTTP status codes
    (r"\b400\b", "bad request", "Invalid request parameters"),
    (r"\b401\b", "unauthorized", "Authentication failed"),
    (r"\b403\b", "forbidden", "Access denied"),
    (r"\b404\b", "not found", "Resource not found"),
    (r"\b405\b", "method not allowed", "HTTP method not supported"),
    (r"\b409\b", "conflict", "Resource conflict"),
    (r"\b422\b", "unprocessable", "Validation error"),
    # Schema/validation errors
    (r"schema", "schema error", "Invalid data schema"),
    (r"validation", "validation error", "Input validation failed"),
    (r"invalid.*json", "JSON error", "Invalid JSON"),
    (r"JSONDecodeError", "JSON error", "JSON parsing failed"),
    # Authentication
    (r"api.?key", "API key error", "Invalid or missing API key"),
    (r"token.*expir", "token expired", "Authentication token expired"),
    (r"permission", "permission error", "Insufficient permissions"),
    # Not found patterns
    (r"not found", "not found", "Resource does not exist"),
    (r"does not exist", "not found", "Resource does not exist"),
    (r"no such file", "file not found", "File does not exist"),
    # Quota/billing
    (r"quota", "quota exceeded", "Usage quota exceeded"),
    (r"billing", "billing error", "Billing issue"),
    (r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
    """
    Classify an error as retryable or permanent.

    Args:
        error: The exception that occurred
        response_code: HTTP response code if available

    Returns:
        ErrorClassification with retry guidance
    """
    message = str(error).lower()
    exc_name = type(error).__name__

    # An explicit HTTP status is the most reliable signal — check it first.
    if response_code:
        if response_code in (429, 500, 502, 503, 504):
            return ErrorClassification(
                category=ErrorCategory.RETRYABLE,
                reason=f"HTTP {response_code} - transient server error",
                should_retry=True,
                max_retries=3,
                # Rate limits deserve a longer pause than plain 5xx errors.
                backoff_seconds=5.0 if response_code == 429 else 2.0,
                error_code=response_code,
                error_type=exc_name,
            )
        if response_code in (400, 401, 403, 404, 405, 409, 422):
            return ErrorClassification(
                category=ErrorCategory.PERMANENT,
                reason=f"HTTP {response_code} - client error",
                should_retry=False,
                max_retries=0,
                backoff_seconds=0,
                error_code=response_code,
                error_type=exc_name,
            )

    # Message heuristics: retryable patterns win because they run first.
    for pattern, reason, retries, wait in _RETRYABLE_PATTERNS:
        if re.search(pattern, message, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.RETRYABLE,
                reason=reason,
                should_retry=True,
                max_retries=retries,
                backoff_seconds=wait,
                error_type=exc_name,
            )

    for pattern, _code, reason in _PERMANENT_PATTERNS:
        if re.search(pattern, message, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.PERMANENT,
                reason=reason,
                should_retry=False,
                max_retries=0,
                backoff_seconds=0,
                error_type=exc_name,
            )

    # Nothing matched: allow one cautious retry.
    return ErrorClassification(
        category=ErrorCategory.UNKNOWN,
        reason=f"Unknown error type: {exc_name}",
        should_retry=True,
        max_retries=1,
        backoff_seconds=1.0,
        error_type=exc_name,
    )
def execute_with_retry(
    func,
    *args,
    max_retries: int = 3,
    backoff_base: float = 1.0,
    **kwargs,
) -> Any:
    """
    Execute *func* with automatic retry on retryable errors.

    Args:
        func: Function to execute
        *args: Function arguments
        max_retries: Maximum retry attempts
        backoff_base: Base backoff time in seconds
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Raises:
        Exception: If permanent error or max retries exceeded
    """
    last_error = None
    attempt = 0
    while attempt <= max_retries:
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            last_error = exc
            verdict = classify_error(exc)
            logger.info(
                "Attempt %d/%d failed: %s (%s, retryable: %s)",
                attempt + 1, max_retries + 1,
                verdict.reason,
                verdict.category.value,
                verdict.should_retry,
            )
            # Permanent errors fail immediately with the original traceback.
            if not verdict.should_retry:
                logger.error("Permanent error: %s", verdict.reason)
                raise
            # Out of budget: re-raise the last failure.
            if attempt >= max_retries:
                logger.error("Max retries (%d) exceeded", max_retries)
                raise
            # Exponential backoff: base * 2^attempt.
            delay = backoff_base * (2 ** attempt)
            logger.info("Retrying in %.1fs...", delay)
            time.sleep(delay)
        attempt += 1
    # Defensive: loop always returns or raises, but keep a final raise.
    raise last_error
def format_error_report(classification: ErrorClassification) -> str:
    """One-line human-readable summary of an ErrorClassification."""
    # Retryable errors get a spinner icon; permanent ones get no prefix.
    prefix = "🔄" if classification.should_retry else ""
    return f"{prefix} {classification.category.value}: {classification.reason}"

View File

@@ -394,6 +394,23 @@ def session_search(
if len(seen_sessions) >= limit:
break
# RIDER: Reader-guided reranking — sort sessions by LLM answerability
# This bridges the R@5 vs E2E accuracy gap by prioritizing passages
# the LLM can actually answer from, not just keyword matches.
try:
from agent.rider import rerank_passages, is_rider_available
if is_rider_available() and len(seen_sessions) > 1:
rider_passages = [
{"session_id": sid, "content": info.get("snippet", ""), "rank": i + 1}
for i, (sid, info) in enumerate(seen_sessions.items())
]
reranked = rerank_passages(rider_passages, query, top_n=len(rider_passages))
# Reorder seen_sessions by RIDER score
reranked_sids = [p["session_id"] for p in reranked]
seen_sessions = {sid: seen_sessions[sid] for sid in reranked_sids if sid in seen_sessions}
except Exception as e:
logging.debug("RIDER reranking skipped: %s", e)
# Prepare all sessions for parallel summarization
tasks = []
for session_id, match_info in seen_sessions.items():