Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
d1fb50bf2f feat: add Anthropic transport abstraction slice (#951)
All checks were successful
Lint / lint (pull_request) Successful in 8s
- add transport registry, shared transport dataclasses, and AnthropicTransport
- add normalize_anthropic_response_v2 as the bridge from existing Anthropic normalization to shared transport types
- extend Anthropic stop-reason mapping for refusal and model_context_window_exceeded
- add targeted transport and v2 normalization regression tests

Closes #951
Refs #949
2026-04-22 11:20:20 -04:00
13 changed files with 1208 additions and 914 deletions

View File

@@ -1396,6 +1396,8 @@ def normalize_anthropic_response(
"tool_use": "tool_calls",
"max_tokens": "length",
"stop_sequence": "stop",
"refusal": "content_filter",
"model_context_window_exceeded": "length",
}
finish_reason = stop_reason_map.get(response.stop_reason, "stop")
@@ -1409,3 +1411,42 @@ def normalize_anthropic_response(
),
finish_reason,
)
def normalize_anthropic_response_v2(
response,
strip_tool_prefix: bool = False,
) -> "NormalizedResponse":
"""Normalize Anthropic response to NormalizedResponse.
Wraps the existing normalize_anthropic_response() and maps its output
to the shared transport types. This allows incremental migration
without disturbing the legacy call sites.
"""
from agent.transports.types import NormalizedResponse, build_tool_call
assistant_msg, finish_reason = normalize_anthropic_response(response, strip_tool_prefix)
tool_calls = None
if assistant_msg.tool_calls:
tool_calls = [
build_tool_call(
id=tc.id,
name=tc.function.name,
arguments=tc.function.arguments,
)
for tc in assistant_msg.tool_calls
]
provider_data = {}
if getattr(assistant_msg, "reasoning_details", None):
provider_data["reasoning_details"] = assistant_msg.reasoning_details
return NormalizedResponse(
content=assistant_msg.content,
tool_calls=tool_calls,
finish_reason=finish_reason,
reasoning=getattr(assistant_msg, "reasoning", None),
usage=None,
provider_data=provider_data or None,
)

View File

@@ -0,0 +1,57 @@
"""Transport layer types and registry for provider response normalization.
Usage:
from agent.transports import get_transport
transport = get_transport("anthropic_messages")
result = transport.normalize_response(raw_response)
"""
from agent.transports.types import ( # noqa: F401
NormalizedResponse,
ToolCall,
Usage,
build_tool_call,
map_finish_reason,
)
_REGISTRY: dict = {}
def register_transport(api_mode: str, transport_cls: type) -> None:
"""Register a transport class for an api_mode string."""
_REGISTRY[api_mode] = transport_cls
def get_transport(api_mode: str):
"""Get a transport instance for the given api_mode.
Returns None if no transport is registered for this api_mode.
This allows gradual migration — call sites can check for None
and fall back to the legacy code path.
"""
if not _REGISTRY:
_discover_transports()
cls = _REGISTRY.get(api_mode)
if cls is None:
return None
return cls()
def _discover_transports() -> None:
"""Import all transport modules to trigger auto-registration."""
try:
import agent.transports.anthropic # noqa: F401
except ImportError:
pass
try:
import agent.transports.codex # noqa: F401
except ImportError:
pass
try:
import agent.transports.chat_completions # noqa: F401
except ImportError:
pass
try:
import agent.transports.bedrock # noqa: F401
except ImportError:
pass

View File

@@ -0,0 +1,95 @@
"""Anthropic Messages API transport.
Delegates to the existing adapter functions in agent/anthropic_adapter.py.
This transport owns format conversion and normalization — NOT client lifecycle.
"""
from typing import Any, Dict, List, Optional
from agent.transports.base import ProviderTransport
from agent.transports.types import NormalizedResponse
class AnthropicTransport(ProviderTransport):
"""Transport for api_mode='anthropic_messages'."""
@property
def api_mode(self) -> str:
return "anthropic_messages"
def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> Any:
from agent.anthropic_adapter import convert_messages_to_anthropic
base_url = kwargs.get("base_url")
return convert_messages_to_anthropic(messages, base_url=base_url)
def convert_tools(self, tools: List[Dict[str, Any]]) -> Any:
from agent.anthropic_adapter import convert_tools_to_anthropic
return convert_tools_to_anthropic(tools)
def build_kwargs(
self,
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
**params,
) -> Dict[str, Any]:
from agent.anthropic_adapter import build_anthropic_kwargs
return build_anthropic_kwargs(
model=model,
messages=messages,
tools=tools,
max_tokens=params.get("max_tokens", 16384),
reasoning_config=params.get("reasoning_config"),
tool_choice=params.get("tool_choice"),
is_oauth=params.get("is_oauth", False),
preserve_dots=params.get("preserve_dots", False),
context_length=params.get("context_length"),
base_url=params.get("base_url"),
fast_mode=params.get("fast_mode", False),
)
def normalize_response(self, response: Any, **kwargs) -> NormalizedResponse:
from agent.anthropic_adapter import normalize_anthropic_response_v2
strip_tool_prefix = kwargs.get("strip_tool_prefix", False)
return normalize_anthropic_response_v2(response, strip_tool_prefix=strip_tool_prefix)
def validate_response(self, response: Any) -> bool:
if response is None:
return False
content_blocks = getattr(response, "content", None)
if not isinstance(content_blocks, list):
return False
if not content_blocks:
return False
return True
def extract_cache_stats(self, response: Any):
usage = getattr(response, "usage", None)
if usage is None:
return None
cached = getattr(usage, "cache_read_input_tokens", 0) or 0
written = getattr(usage, "cache_creation_input_tokens", 0) or 0
if cached or written:
return {"cached_tokens": cached, "creation_tokens": written}
return None
_STOP_REASON_MAP = {
"end_turn": "stop",
"tool_use": "tool_calls",
"max_tokens": "length",
"stop_sequence": "stop",
"refusal": "content_filter",
"model_context_window_exceeded": "length",
}
def map_finish_reason(self, raw_reason: str) -> str:
return self._STOP_REASON_MAP.get(raw_reason, "stop")
from agent.transports import register_transport # noqa: E402
register_transport("anthropic_messages", AnthropicTransport)

61
agent/transports/base.py Normal file
View File

@@ -0,0 +1,61 @@
"""Abstract base for provider transports.
A transport owns the data path for one api_mode:
convert_messages → convert_tools → build_kwargs → normalize_response
It does NOT own: client construction, streaming, credential refresh,
prompt caching, interrupt handling, or retry logic. Those stay on AIAgent.
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from agent.transports.types import NormalizedResponse
class ProviderTransport(ABC):
"""Base class for provider-specific format conversion and normalization."""
@property
@abstractmethod
def api_mode(self) -> str:
"""The api_mode string this transport handles."""
...
@abstractmethod
def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> Any:
"""Convert OpenAI-format messages to provider-native format."""
...
@abstractmethod
def convert_tools(self, tools: List[Dict[str, Any]]) -> Any:
"""Convert OpenAI-format tool definitions to provider-native format."""
...
@abstractmethod
def build_kwargs(
self,
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
**params,
) -> Dict[str, Any]:
"""Build the complete provider kwargs dict."""
...
@abstractmethod
def normalize_response(self, response: Any, **kwargs) -> NormalizedResponse:
"""Normalize a raw provider response to the shared NormalizedResponse type."""
...
def validate_response(self, response: Any) -> bool:
"""Optional structural validation for raw responses."""
return True
def extract_cache_stats(self, response: Any) -> Optional[Dict[str, int]]:
"""Optional cache stats extraction."""
return None
def map_finish_reason(self, raw_reason: str) -> str:
"""Optional stop-reason mapping. Defaults to passthrough."""
return raw_reason

58
agent/transports/types.py Normal file
View File

@@ -0,0 +1,58 @@
"""Shared types for normalized provider responses."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class ToolCall:
"""A normalized tool call from any provider."""
id: Optional[str]
name: str
arguments: str
provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
@dataclass
class Usage:
"""Token usage from an API response."""
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
cached_tokens: int = 0
@dataclass
class NormalizedResponse:
"""Normalized API response from any provider."""
content: Optional[str]
tool_calls: Optional[List[ToolCall]]
finish_reason: str
reasoning: Optional[str] = None
usage: Optional[Usage] = None
provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
def build_tool_call(
id: Optional[str],
name: str,
arguments: Any,
**provider_fields: Any,
) -> ToolCall:
"""Build a ToolCall, auto-serialising dict arguments."""
args_str = json.dumps(arguments) if isinstance(arguments, dict) else str(arguments)
provider_data = dict(provider_fields) if provider_fields else None
return ToolCall(id=id, name=name, arguments=args_str, provider_data=provider_data)
def map_finish_reason(reason: Optional[str], mapping: Dict[str, str]) -> str:
"""Translate a provider-specific stop reason to the normalized set."""
if reason is None:
return "stop"
return mapping.get(reason, "stop")

View File

@@ -55,7 +55,7 @@ FACT_STORE_SCHEMA = {
"properties": {
"action": {
"type": "string",
"enum": ["add", "search", "probe", "related", "reason", "contradict", "trace", "update", "remove", "list"],
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"],
},
"content": {"type": "string", "description": "Fact content (required for 'add')."},
"query": {"type": "string", "description": "Search query (required for 'search')."},
@@ -67,13 +67,6 @@ FACT_STORE_SCHEMA = {
"trust_delta": {"type": "number", "description": "Trust adjustment for 'update'."},
"min_trust": {"type": "number", "description": "Minimum trust filter (default: 0.3)."},
"limit": {"type": "integer", "description": "Max results (default: 10)."},
"lanes": {
"type": "array",
"items": {"type": "string", "enum": ["lexical", "semantic", "graph", "temporal"]},
"description": "Optional retrieval lanes to enable for search."
},
"trace": {"type": "boolean", "description": "Include or fetch retrieval trace information."},
"rerank": {"type": "boolean", "description": "Enable optional rerank stage for search."},
},
"required": ["action"],
},
@@ -126,9 +119,6 @@ class HolographicMemoryProvider(MemoryProvider):
self._store = None
self._retriever = None
self._min_trust = float(self._config.get("min_trust_threshold", 0.3))
self._retrieval_lanes = self._parse_retrieval_lanes(self._config.get("retrieval_lanes"))
self._enable_rerank = str(self._config.get("enable_rerank", "true")).lower() != "false"
self._last_retrieval_trace: dict | None = None
@property
def name(self) -> str:
@@ -154,14 +144,6 @@ class HolographicMemoryProvider(MemoryProvider):
except Exception:
pass
def _parse_retrieval_lanes(self, value) -> list[str]:
if isinstance(value, str):
value = [part.strip() for part in value.split(",") if part.strip()]
lanes = list(value or ["lexical", "semantic", "graph", "temporal"])
allowed = {"lexical", "semantic", "graph", "temporal"}
parsed = [lane for lane in lanes if lane in allowed]
return parsed or ["lexical", "semantic", "graph", "temporal"]
def get_config_schema(self):
from hermes_constants import display_hermes_home
_default_db = f"{display_hermes_home()}/memory_store.db"
@@ -170,10 +152,6 @@ class HolographicMemoryProvider(MemoryProvider):
{"key": "auto_extract", "description": "Auto-extract facts at session end", "default": "false", "choices": ["true", "false"]},
{"key": "default_trust", "description": "Default trust score for new facts", "default": "0.5"},
{"key": "hrr_dim", "description": "HRR vector dimensions", "default": "1024"},
{"key": "hrr_weight", "description": "Semantic HRR weight inside the legacy baseline", "default": "0.3"},
{"key": "temporal_decay_half_life", "description": "Temporal decay half-life in days (0 disables baseline decay)", "default": "0"},
{"key": "retrieval_lanes", "description": "Comma-separated retrieval lanes (lexical,semantic,graph,temporal)", "default": "lexical,semantic,graph,temporal"},
{"key": "enable_rerank", "description": "Enable optional local rerank stage", "default": "true", "choices": ["true", "false"]},
]
def initialize(self, session_id: str, **kwargs) -> None:
@@ -191,8 +169,6 @@ class HolographicMemoryProvider(MemoryProvider):
hrr_dim = int(self._config.get("hrr_dim", 1024))
hrr_weight = float(self._config.get("hrr_weight", 0.3))
temporal_decay = int(self._config.get("temporal_decay_half_life", 0))
self._retrieval_lanes = self._parse_retrieval_lanes(self._config.get("retrieval_lanes", self._retrieval_lanes))
self._enable_rerank = str(self._config.get("enable_rerank", self._enable_rerank)).lower() != "false"
self._store = MemoryStore(db_path=db_path, default_trust=default_trust, hrr_dim=hrr_dim)
self._retriever = FactRetriever(
@@ -200,8 +176,6 @@ class HolographicMemoryProvider(MemoryProvider):
temporal_decay_half_life=temporal_decay,
hrr_weight=hrr_weight,
hrr_dim=hrr_dim,
retrieval_lanes=self._retrieval_lanes,
enable_rerank=self._enable_rerank,
)
self._session_id = session_id
@@ -232,23 +206,13 @@ class HolographicMemoryProvider(MemoryProvider):
if not self._retriever or not query:
return ""
try:
payload = self._retriever.search_with_trace(
query,
min_trust=self._min_trust,
limit=5,
lanes=self._retrieval_lanes,
rerank=self._enable_rerank,
)
self._last_retrieval_trace = payload["trace"]
results = payload["results"]
results = self._retriever.search(query, min_trust=self._min_trust, limit=5)
if not results:
return ""
lines = []
for r in results:
trust = r.get("trust_score", r.get("trust", 0))
lanes = ",".join(r.get("matched_lanes", []))
lane_suffix = f" [{lanes}]" if lanes else ""
lines.append(f"- [{trust:.1f}] {r.get('content', '')}{lane_suffix}")
lines.append(f"- [{trust:.1f}] {r.get('content', '')}")
return "## Holographic Memory\n" + "\n".join(lines)
except Exception as e:
logger.debug("Holographic prefetch failed: %s", e)
@@ -306,39 +270,14 @@ class HolographicMemoryProvider(MemoryProvider):
return json.dumps({"fact_id": fact_id, "status": "added"})
elif action == "search":
lanes = args.get("lanes")
rerank = args.get("rerank")
with_trace = bool(args.get("trace", False))
if with_trace:
payload = retriever.search_with_trace(
args["query"],
category=args.get("category"),
min_trust=float(args.get("min_trust", self._min_trust)),
limit=int(args.get("limit", 10)),
lanes=lanes,
rerank=rerank,
)
self._last_retrieval_trace = payload["trace"]
return json.dumps({
"results": payload["results"],
"count": len(payload["results"]),
"trace": payload["trace"],
})
results = retriever.search(
args["query"],
category=args.get("category"),
min_trust=float(args.get("min_trust", self._min_trust)),
limit=int(args.get("limit", 10)),
lanes=lanes,
rerank=rerank,
)
self._last_retrieval_trace = retriever.last_trace
return json.dumps({"results": results, "count": len(results)})
elif action == "trace":
return json.dumps({"trace": self._last_retrieval_trace or retriever.last_trace or {}})
elif action == "probe":
results = retriever.probe(
args["entity"],
@@ -384,8 +323,7 @@ class HolographicMemoryProvider(MemoryProvider):
return json.dumps({"updated": updated})
elif action == "remove":
removed = store.remove_fact(int(args["fact_id"])
)
removed = store.remove_fact(int(args["fact_id"]))
return json.dumps({"removed": removed})
elif action == "list":

File diff suppressed because it is too large Load Diff

View File

@@ -83,7 +83,6 @@ _TRUST_MAX = 1.0
# Entity extraction patterns
_RE_CAPITALIZED = re.compile(r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\b')
_RE_SINGLE_PROPER = re.compile(r'\b([A-Z][A-Za-z0-9_-]{2,})\b')
_RE_DOUBLE_QUOTE = re.compile(r'"([^"]+)"')
_RE_SINGLE_QUOTE = re.compile(r"'([^']+)'")
_RE_AKA = re.compile(
@@ -415,13 +414,6 @@ class MemoryStore:
for m in _RE_CAPITALIZED.finditer(text):
_add(m.group(1))
skip_singletons = {"The", "This", "That", "These", "Those", "And", "But", "For", "With"}
for m in _RE_SINGLE_PROPER.finditer(text):
candidate = m.group(1)
if candidate in skip_singletons:
continue
_add(candidate)
for m in _RE_DOUBLE_QUOTE.finditer(text):
_add(m.group(1))

View File

@@ -0,0 +1,213 @@
"""Regression tests: normalize_anthropic_response_v2 vs v1.
Constructs mock Anthropic responses and asserts that the v2 function
(returning NormalizedResponse) produces identical field values to the
original v1 function (returning SimpleNamespace + finish_reason).
"""
from types import SimpleNamespace
import pytest
from agent.anthropic_adapter import (
normalize_anthropic_response,
normalize_anthropic_response_v2,
)
from agent.transports.types import NormalizedResponse
def _text_block(text: str):
return SimpleNamespace(type="text", text=text)
def _thinking_block(thinking: str, signature: str = "sig_abc"):
return SimpleNamespace(type="thinking", thinking=thinking, signature=signature)
def _tool_use_block(id: str, name: str, input: dict):
return SimpleNamespace(type="tool_use", id=id, name=name, input=input)
def _response(content_blocks, stop_reason="end_turn"):
return SimpleNamespace(
content=content_blocks,
stop_reason=stop_reason,
usage=SimpleNamespace(input_tokens=10, output_tokens=5),
)
class TestTextOnly:
def setup_method(self):
self.resp = _response([_text_block("Hello world")])
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_type(self):
assert isinstance(self.v2, NormalizedResponse)
def test_content_matches(self):
assert self.v2.content == self.v1_msg.content
def test_finish_reason_matches(self):
assert self.v2.finish_reason == self.v1_finish
def test_no_tool_calls(self):
assert self.v2.tool_calls is None
assert self.v1_msg.tool_calls is None
def test_no_reasoning(self):
assert self.v2.reasoning is None
assert self.v1_msg.reasoning is None
class TestWithToolCalls:
def setup_method(self):
self.resp = _response(
[
_text_block("I'll check that"),
_tool_use_block("toolu_abc", "terminal", {"command": "ls"}),
_tool_use_block("toolu_def", "read_file", {"path": "/tmp"}),
],
stop_reason="tool_use",
)
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_finish_reason(self):
assert self.v2.finish_reason == "tool_calls"
assert self.v1_finish == "tool_calls"
def test_tool_call_count(self):
assert len(self.v2.tool_calls) == 2
assert len(self.v1_msg.tool_calls) == 2
def test_tool_call_ids_match(self):
for i in range(2):
assert self.v2.tool_calls[i].id == self.v1_msg.tool_calls[i].id
def test_tool_call_names_match(self):
assert self.v2.tool_calls[0].name == "terminal"
assert self.v2.tool_calls[1].name == "read_file"
for i in range(2):
assert self.v2.tool_calls[i].name == self.v1_msg.tool_calls[i].function.name
def test_tool_call_arguments_match(self):
for i in range(2):
assert self.v2.tool_calls[i].arguments == self.v1_msg.tool_calls[i].function.arguments
def test_content_preserved(self):
assert self.v2.content == self.v1_msg.content
assert "check that" in self.v2.content
class TestWithThinking:
def setup_method(self):
self.resp = _response([
_thinking_block("Let me think about this carefully..."),
_text_block("The answer is 42."),
])
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_reasoning_matches(self):
assert self.v2.reasoning == self.v1_msg.reasoning
assert "think about this" in self.v2.reasoning
def test_reasoning_details_in_provider_data(self):
v1_details = self.v1_msg.reasoning_details
v2_details = self.v2.provider_data.get("reasoning_details") if self.v2.provider_data else None
assert v1_details is not None
assert v2_details is not None
assert len(v2_details) == len(v1_details)
def test_content_excludes_thinking(self):
assert self.v2.content == "The answer is 42."
class TestMixed:
def setup_method(self):
self.resp = _response(
[
_thinking_block("Planning my approach..."),
_text_block("I'll run the command"),
_tool_use_block("toolu_xyz", "terminal", {"command": "pwd"}),
],
stop_reason="tool_use",
)
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_all_fields_present(self):
assert self.v2.content is not None
assert self.v2.tool_calls is not None
assert self.v2.reasoning is not None
assert self.v2.finish_reason == "tool_calls"
def test_content_matches(self):
assert self.v2.content == self.v1_msg.content
def test_reasoning_matches(self):
assert self.v2.reasoning == self.v1_msg.reasoning
def test_tool_call_matches(self):
assert self.v2.tool_calls[0].id == self.v1_msg.tool_calls[0].id
assert self.v2.tool_calls[0].name == self.v1_msg.tool_calls[0].function.name
class TestStopReasons:
@pytest.mark.parametrize("stop_reason,expected", [
("end_turn", "stop"),
("tool_use", "tool_calls"),
("max_tokens", "length"),
("stop_sequence", "stop"),
("refusal", "content_filter"),
("model_context_window_exceeded", "length"),
("unknown_future_reason", "stop"),
])
def test_stop_reason_mapping(self, stop_reason, expected):
resp = _response([_text_block("x")], stop_reason=stop_reason)
_v1_msg, v1_finish = normalize_anthropic_response(resp)
v2 = normalize_anthropic_response_v2(resp)
assert v2.finish_reason == v1_finish == expected
class TestStripToolPrefix:
def test_prefix_stripped(self):
resp = _response(
[_tool_use_block("toolu_1", "mcp_terminal", {"cmd": "ls"})],
stop_reason="tool_use",
)
v1_msg, _ = normalize_anthropic_response(resp, strip_tool_prefix=True)
v2 = normalize_anthropic_response_v2(resp, strip_tool_prefix=True)
assert v1_msg.tool_calls[0].function.name == "terminal"
assert v2.tool_calls[0].name == "terminal"
def test_prefix_kept(self):
resp = _response(
[_tool_use_block("toolu_1", "mcp_terminal", {"cmd": "ls"})],
stop_reason="tool_use",
)
v1_msg, _ = normalize_anthropic_response(resp, strip_tool_prefix=False)
v2 = normalize_anthropic_response_v2(resp, strip_tool_prefix=False)
assert v1_msg.tool_calls[0].function.name == "mcp_terminal"
assert v2.tool_calls[0].name == "mcp_terminal"
class TestEdgeCases:
def test_empty_content_blocks(self):
resp = _response([])
v1_msg, _v1_finish = normalize_anthropic_response(resp)
v2 = normalize_anthropic_response_v2(resp)
assert v2.content == v1_msg.content
assert v2.content is None
def test_no_reasoning_details_means_none_provider_data(self):
resp = _response([_text_block("hi")])
v2 = normalize_anthropic_response_v2(resp)
assert v2.provider_data is None
def test_v2_returns_dataclass_not_namespace(self):
resp = _response([_text_block("hi")])
v2 = normalize_anthropic_response_v2(resp)
assert isinstance(v2, NormalizedResponse)
assert not isinstance(v2, SimpleNamespace)

View File

@@ -0,0 +1,208 @@
"""Tests for the transport ABC, registry, and AnthropicTransport."""
from types import SimpleNamespace
import pytest
from agent.transports import _REGISTRY, get_transport, register_transport
from agent.transports.base import ProviderTransport
from agent.transports.types import NormalizedResponse
class TestProviderTransportABC:
def test_cannot_instantiate_abc(self):
with pytest.raises(TypeError):
ProviderTransport()
def test_concrete_must_implement_all_abstract(self):
class Incomplete(ProviderTransport):
@property
def api_mode(self):
return "test"
with pytest.raises(TypeError):
Incomplete()
def test_minimal_concrete(self):
class Minimal(ProviderTransport):
@property
def api_mode(self):
return "test_minimal"
def convert_messages(self, messages, **kw):
return messages
def convert_tools(self, tools):
return tools
def build_kwargs(self, model, messages, tools=None, **params):
return {"model": model, "messages": messages}
def normalize_response(self, response, **kw):
return NormalizedResponse(content="ok", tool_calls=None, finish_reason="stop")
t = Minimal()
assert t.api_mode == "test_minimal"
assert t.validate_response(None) is True
assert t.extract_cache_stats(None) is None
assert t.map_finish_reason("end_turn") == "end_turn"
class TestTransportRegistry:
def test_get_unregistered_returns_none(self):
assert get_transport("nonexistent_mode") is None
def test_anthropic_registered_on_import(self):
import agent.transports.anthropic # noqa: F401
t = get_transport("anthropic_messages")
assert t is not None
assert t.api_mode == "anthropic_messages"
def test_register_and_get(self):
class DummyTransport(ProviderTransport):
@property
def api_mode(self):
return "dummy_test"
def convert_messages(self, messages, **kw):
return messages
def convert_tools(self, tools):
return tools
def build_kwargs(self, model, messages, tools=None, **params):
return {}
def normalize_response(self, response, **kw):
return NormalizedResponse(content=None, tool_calls=None, finish_reason="stop")
register_transport("dummy_test", DummyTransport)
t = get_transport("dummy_test")
assert t.api_mode == "dummy_test"
_REGISTRY.pop("dummy_test", None)
class TestAnthropicTransport:
@pytest.fixture
def transport(self):
import agent.transports.anthropic # noqa: F401
return get_transport("anthropic_messages")
def test_api_mode(self, transport):
assert transport.api_mode == "anthropic_messages"
def test_convert_tools_simple(self, transport):
tools = [{
"type": "function",
"function": {
"name": "test_tool",
"description": "A test",
"parameters": {"type": "object", "properties": {}},
},
}]
result = transport.convert_tools(tools)
assert len(result) == 1
assert result[0]["name"] == "test_tool"
assert "input_schema" in result[0]
def test_validate_response_none(self, transport):
assert transport.validate_response(None) is False
def test_validate_response_empty_content(self, transport):
r = SimpleNamespace(content=[])
assert transport.validate_response(r) is False
def test_validate_response_valid(self, transport):
r = SimpleNamespace(content=[SimpleNamespace(type="text", text="hello")])
assert transport.validate_response(r) is True
def test_map_finish_reason(self, transport):
assert transport.map_finish_reason("end_turn") == "stop"
assert transport.map_finish_reason("tool_use") == "tool_calls"
assert transport.map_finish_reason("max_tokens") == "length"
assert transport.map_finish_reason("stop_sequence") == "stop"
assert transport.map_finish_reason("refusal") == "content_filter"
assert transport.map_finish_reason("model_context_window_exceeded") == "length"
assert transport.map_finish_reason("unknown") == "stop"
def test_extract_cache_stats_none_usage(self, transport):
r = SimpleNamespace(usage=None)
assert transport.extract_cache_stats(r) is None
def test_extract_cache_stats_with_cache(self, transport):
usage = SimpleNamespace(cache_read_input_tokens=100, cache_creation_input_tokens=50)
r = SimpleNamespace(usage=usage)
result = transport.extract_cache_stats(r)
assert result == {"cached_tokens": 100, "creation_tokens": 50}
def test_extract_cache_stats_zero(self, transport):
usage = SimpleNamespace(cache_read_input_tokens=0, cache_creation_input_tokens=0)
r = SimpleNamespace(usage=usage)
assert transport.extract_cache_stats(r) is None
def test_normalize_response_text(self, transport):
r = SimpleNamespace(
content=[SimpleNamespace(type="text", text="Hello world")],
stop_reason="end_turn",
usage=SimpleNamespace(input_tokens=10, output_tokens=5),
model="claude-sonnet-4-6",
)
nr = transport.normalize_response(r)
assert isinstance(nr, NormalizedResponse)
assert nr.content == "Hello world"
assert nr.tool_calls is None or nr.tool_calls == []
assert nr.finish_reason == "stop"
def test_normalize_response_tool_calls(self, transport):
r = SimpleNamespace(
content=[
SimpleNamespace(type="tool_use", id="toolu_123", name="terminal", input={"command": "ls"}),
],
stop_reason="tool_use",
usage=SimpleNamespace(input_tokens=10, output_tokens=20),
model="claude-sonnet-4-6",
)
nr = transport.normalize_response(r)
assert nr.finish_reason == "tool_calls"
assert len(nr.tool_calls) == 1
tc = nr.tool_calls[0]
assert tc.name == "terminal"
assert tc.id == "toolu_123"
assert '"command"' in tc.arguments
def test_normalize_response_thinking(self, transport):
r = SimpleNamespace(
content=[
SimpleNamespace(type="thinking", thinking="Let me think..."),
SimpleNamespace(type="text", text="The answer is 42"),
],
stop_reason="end_turn",
usage=SimpleNamespace(input_tokens=10, output_tokens=15),
model="claude-sonnet-4-6",
)
nr = transport.normalize_response(r)
assert nr.content == "The answer is 42"
assert nr.reasoning == "Let me think..."
def test_build_kwargs_returns_dict(self, transport):
messages = [{"role": "user", "content": "Hello"}]
kw = transport.build_kwargs(
model="claude-sonnet-4-6",
messages=messages,
max_tokens=1024,
)
assert isinstance(kw, dict)
assert "model" in kw
assert "max_tokens" in kw
assert "messages" in kw
def test_convert_messages_extracts_system(self, transport):
messages = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Hi"},
]
system, msgs = transport.convert_messages(messages)
assert system is not None
assert len(msgs) >= 1

View File

@@ -0,0 +1,130 @@
"""Tests for agent/transports/types.py — dataclass construction + helpers."""
import json
from agent.transports.types import (
NormalizedResponse,
ToolCall,
Usage,
build_tool_call,
map_finish_reason,
)
class TestToolCall:
def test_basic_construction(self):
tc = ToolCall(id="call_abc", name="terminal", arguments='{"cmd": "ls"}')
assert tc.id == "call_abc"
assert tc.name == "terminal"
assert tc.arguments == '{"cmd": "ls"}'
assert tc.provider_data is None
def test_none_id(self):
tc = ToolCall(id=None, name="read_file", arguments="{}")
assert tc.id is None
def test_provider_data(self):
tc = ToolCall(
id="call_x",
name="t",
arguments="{}",
provider_data={"call_id": "call_x", "response_item_id": "fc_x"},
)
assert tc.provider_data["call_id"] == "call_x"
assert tc.provider_data["response_item_id"] == "fc_x"
class TestUsage:
def test_defaults(self):
u = Usage()
assert u.prompt_tokens == 0
assert u.completion_tokens == 0
assert u.total_tokens == 0
assert u.cached_tokens == 0
def test_explicit(self):
u = Usage(prompt_tokens=100, completion_tokens=50, total_tokens=150, cached_tokens=80)
assert u.total_tokens == 150
class TestNormalizedResponse:
def test_text_only(self):
r = NormalizedResponse(content="hello", tool_calls=None, finish_reason="stop")
assert r.content == "hello"
assert r.tool_calls is None
assert r.finish_reason == "stop"
assert r.reasoning is None
assert r.usage is None
assert r.provider_data is None
def test_with_tool_calls(self):
tcs = [ToolCall(id="call_1", name="terminal", arguments='{"cmd":"pwd"}')]
r = NormalizedResponse(content=None, tool_calls=tcs, finish_reason="tool_calls")
assert r.finish_reason == "tool_calls"
assert len(r.tool_calls) == 1
assert r.tool_calls[0].name == "terminal"
def test_with_reasoning(self):
r = NormalizedResponse(
content="answer",
tool_calls=None,
finish_reason="stop",
reasoning="I thought about it",
)
assert r.reasoning == "I thought about it"
def test_with_provider_data(self):
r = NormalizedResponse(
content=None,
tool_calls=None,
finish_reason="stop",
provider_data={"reasoning_details": [{"type": "thinking", "thinking": "hmm"}]},
)
assert r.provider_data["reasoning_details"][0]["type"] == "thinking"
class TestBuildToolCall:
def test_dict_arguments_serialized(self):
tc = build_tool_call(id="call_1", name="terminal", arguments={"cmd": "ls"})
assert tc.arguments == json.dumps({"cmd": "ls"})
assert tc.provider_data is None
def test_string_arguments_passthrough(self):
tc = build_tool_call(id="call_2", name="read_file", arguments='{"path": "/tmp"}')
assert tc.arguments == '{"path": "/tmp"}'
def test_provider_fields(self):
tc = build_tool_call(
id="call_3",
name="terminal",
arguments="{}",
call_id="call_3",
response_item_id="fc_3",
)
assert tc.provider_data == {"call_id": "call_3", "response_item_id": "fc_3"}
def test_none_id(self):
tc = build_tool_call(id=None, name="t", arguments="{}")
assert tc.id is None
class TestMapFinishReason:
ANTHROPIC_MAP = {
"end_turn": "stop",
"tool_use": "tool_calls",
"max_tokens": "length",
"stop_sequence": "stop",
"refusal": "content_filter",
}
def test_known_reason(self):
assert map_finish_reason("end_turn", self.ANTHROPIC_MAP) == "stop"
assert map_finish_reason("tool_use", self.ANTHROPIC_MAP) == "tool_calls"
assert map_finish_reason("max_tokens", self.ANTHROPIC_MAP) == "length"
assert map_finish_reason("refusal", self.ANTHROPIC_MAP) == "content_filter"
def test_unknown_reason_defaults_to_stop(self):
assert map_finish_reason("something_new", self.ANTHROPIC_MAP) == "stop"
def test_none_reason(self):
assert map_finish_reason(None, self.ANTHROPIC_MAP) == "stop"

View File

@@ -1,56 +0,0 @@
{
"facts": [
{
"content": "Alexander Whitestone aka Rockachopa.",
"category": "general",
"tags": "identity alias"
},
{
"content": "Rockachopa uses Ansible playbooks for sovereign rollouts.",
"category": "project",
"tags": "ansible playbooks rollout"
},
{
"content": "The provider is anthropic/claude-haiku-4-5.",
"category": "project",
"tags": "provider default",
"updated_at": "2026-01-01T00:00:00Z"
},
{
"content": "Correction: the provider is mimo-v2-pro.",
"category": "project",
"tags": "provider current",
"updated_at": "2026-04-20T00:00:00Z"
},
{
"content": "Ezra operates the BURN2 lane for forge work.",
"category": "project",
"tags": "ezra burn2 forge lane"
},
{
"content": "BURN2 handles forge triage and review.",
"category": "project",
"tags": "forge triage review"
}
],
"queries": [
{
"name": "semantic_alias_graph",
"query": "What automation does Alexander Whitestone use for deploys?",
"expected_substring": "Ansible playbooks",
"top_k": 1
},
{
"name": "temporal_correction",
"query": "What provider should we use?",
"expected_substring": "mimo-v2-pro",
"top_k": 1
},
{
"name": "graph_lane",
"query": "Which forge lane does Ezra operate?",
"expected_substring": "BURN2 lane",
"top_k": 1
}
]
}

View File

@@ -1,116 +0,0 @@
"""Tests for multi-path holographic retrieval fusion and traceability."""
from __future__ import annotations
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
from plugins.memory.holographic import HolographicMemoryProvider
from plugins.memory.holographic.retrieval import FactRetriever, format_benchmark_report
from plugins.memory.holographic.store import MemoryStore
_FIXTURE_PATH = Path(__file__).resolve().parents[2] / "fixtures" / "holographic_recall_matrix.json"
def _fixture() -> dict:
return json.loads(_FIXTURE_PATH.read_text())
def _seed_store(tmp_path) -> MemoryStore:
store = MemoryStore(db_path=tmp_path / "memory_store.db")
for fact in _fixture()["facts"]:
fact_id = store.add_fact(fact["content"], category=fact["category"], tags=fact.get("tags", ""))
if fact.get("updated_at"):
store._conn.execute(
"UPDATE facts SET created_at = ?, updated_at = ? WHERE fact_id = ?",
(fact["updated_at"], fact["updated_at"], fact_id),
)
store._conn.commit()
return store
class TestMultiPathRetrieval:
def test_lane_toggle_and_trace_contributions(self, tmp_path):
store = _seed_store(tmp_path)
retriever = FactRetriever(store=store)
payload = retriever.search_with_trace(
"Which forge lane does Ezra operate?",
limit=3,
lanes=["lexical", "graph"],
)
assert payload["trace"]["lanes_run"] == ["lexical", "graph"]
assert payload["results"]
top = payload["results"][0]
assert "BURN2 lane" in top["content"]
assert "graph" in top["lane_contributions"]
assert set(top["lane_contributions"]).issubset({"lexical", "graph"})
def test_trace_available_for_failed_recall(self, tmp_path):
store = _seed_store(tmp_path)
retriever = FactRetriever(store=store)
payload = retriever.search_with_trace(
"nonexistent memory topic xyz123",
limit=3,
lanes=["lexical", "semantic", "graph", "temporal"],
)
assert payload["results"] == []
assert payload["trace"]["fused_count"] == 0
assert payload["trace"]["lane_hits"]["lexical"] == 0
assert payload["trace"]["lane_hits"]["semantic"] == 0
def test_benchmark_prompt_matrix_shows_gain_over_baseline(self, tmp_path):
store = _seed_store(tmp_path)
retriever = FactRetriever(store=store)
report = retriever.benchmark_prompt_matrix(_fixture()["queries"], limit=3)
assert report["fused_top1_hits"] > report["baseline_top1_hits"]
assert report["improvement"] > 0
rendered = format_benchmark_report(report)
assert "Prompt matrix benchmark" in rendered
assert "semantic_alias_graph" in rendered
assert "improvement" in rendered.lower()
class TestHolographicProviderTrace:
def test_prefetch_records_trace_and_trace_action_returns_it(self, tmp_path):
provider = HolographicMemoryProvider(
config={
"db_path": str(tmp_path / "provider.db"),
"retrieval_lanes": ["lexical", "semantic", "graph", "temporal"],
"enable_rerank": True,
}
)
provider.initialize("test-session")
seed_store = _seed_store(tmp_path / "seed")
rows = seed_store.list_facts(min_trust=0.0, limit=20)
for row in rows:
provider._store.add_fact(row["content"], category=row["category"], tags=row.get("tags", ""))
if row["content"].startswith("The provider is anthropic"):
provider._store._conn.execute(
"UPDATE facts SET created_at = ?, updated_at = ? WHERE content = ?",
("2026-01-01T00:00:00Z", "2026-01-01T00:00:00Z", row["content"]),
)
elif row["content"].startswith("Correction: the provider is mimo"):
provider._store._conn.execute(
"UPDATE facts SET created_at = ?, updated_at = ? WHERE content = ?",
("2026-04-20T00:00:00Z", "2026-04-20T00:00:00Z", row["content"]),
)
provider._store._conn.commit()
block = provider.prefetch("What provider should we use?")
assert "Holographic Memory" in block
assert "mimo-v2-pro" in block
trace_payload = json.loads(provider.handle_tool_call("fact_store", {"action": "trace"}))
assert trace_payload["trace"]["query"] == "What provider should we use?"
assert trace_payload["trace"]["rerank_applied"] in {True, False}
assert trace_payload["trace"]["lane_hits"]["temporal"] >= 1