Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
d1fb50bf2f feat: add Anthropic transport abstraction slice (#951)
All checks were successful
Lint / lint (pull_request) Successful in 8s
- add transport registry, shared transport dataclasses, and AnthropicTransport
- add normalize_anthropic_response_v2 as the bridge from existing Anthropic normalization to shared transport types
- extend Anthropic stop-reason mapping for refusal and model_context_window_exceeded
- add targeted transport and v2 normalization regression tests

Closes #951
Refs #949
2026-04-22 11:20:20 -04:00
16 changed files with 864 additions and 486 deletions

View File

@@ -1396,6 +1396,8 @@ def normalize_anthropic_response(
"tool_use": "tool_calls",
"max_tokens": "length",
"stop_sequence": "stop",
"refusal": "content_filter",
"model_context_window_exceeded": "length",
}
finish_reason = stop_reason_map.get(response.stop_reason, "stop")
@@ -1409,3 +1411,42 @@ def normalize_anthropic_response(
),
finish_reason,
)
def normalize_anthropic_response_v2(
response,
strip_tool_prefix: bool = False,
) -> "NormalizedResponse":
"""Normalize Anthropic response to NormalizedResponse.
Wraps the existing normalize_anthropic_response() and maps its output
to the shared transport types. This allows incremental migration
without disturbing the legacy call sites.
"""
from agent.transports.types import NormalizedResponse, build_tool_call
assistant_msg, finish_reason = normalize_anthropic_response(response, strip_tool_prefix)
tool_calls = None
if assistant_msg.tool_calls:
tool_calls = [
build_tool_call(
id=tc.id,
name=tc.function.name,
arguments=tc.function.arguments,
)
for tc in assistant_msg.tool_calls
]
provider_data = {}
if getattr(assistant_msg, "reasoning_details", None):
provider_data["reasoning_details"] = assistant_msg.reasoning_details
return NormalizedResponse(
content=assistant_msg.content,
tool_calls=tool_calls,
finish_reason=finish_reason,
reasoning=getattr(assistant_msg, "reasoning", None),
usage=None,
provider_data=provider_data or None,
)

View File

@@ -1,69 +0,0 @@
"""First-class context snapshot artifacts for live runtime memory evaluation."""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
from hermes_constants import get_hermes_home
_SAFE_SEGMENT_RE = re.compile(r"[^A-Za-z0-9_.-]+")
class ContextSnapshotRecorder:
"""Write per-call prompt-composition artifacts for a Hermes session."""
def __init__(self, session_id: str, *, enabled: bool = False, base_dir: str | Path | None = None):
self.session_id = session_id or "session"
self.enabled = bool(enabled)
self.base_dir = Path(base_dir) if base_dir else get_hermes_home() / "reports" / "context_snapshots"
@property
def session_dir(self) -> Path:
safe_session = _SAFE_SEGMENT_RE.sub("_", self.session_id).strip("._") or "session"
return self.base_dir / safe_session
def record_call(
self,
api_call_count: int,
*,
system_prompt: str,
memory_provider_system_prompt: str = "",
memory_prefetch_raw: str = "",
memory_context_block: str = "",
api_user_message: str = "",
api_messages: list[dict[str, Any]] | None = None,
metadata: dict[str, Any] | None = None,
) -> Path | None:
if not self.enabled:
return None
call_dir = self.session_dir / f"call_{api_call_count:03d}"
call_dir.mkdir(parents=True, exist_ok=True)
self._write_text(call_dir / "system_prompt.txt", system_prompt or "")
self._write_text(call_dir / "memory_provider_system_prompt.txt", memory_provider_system_prompt or "")
self._write_text(call_dir / "memory_prefetch_raw.txt", memory_prefetch_raw or "")
self._write_text(call_dir / "memory_context_block.txt", memory_context_block or "")
self._write_text(call_dir / "api_user_message.txt", api_user_message or "")
self._write_json(call_dir / "api_messages.json", api_messages or [])
self._write_json(
call_dir / "metadata.json",
{
"session_id": self.session_id,
"api_call_count": api_call_count,
**(metadata or {}),
},
)
return call_dir
@staticmethod
def _write_text(path: Path, content: str) -> None:
path.write_text(content, encoding="utf-8")
@staticmethod
def _write_json(path: Path, payload: Any) -> None:
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")

View File

@@ -0,0 +1,57 @@
"""Transport layer types and registry for provider response normalization.
Usage:
from agent.transports import get_transport
transport = get_transport("anthropic_messages")
result = transport.normalize_response(raw_response)
"""
from agent.transports.types import ( # noqa: F401
NormalizedResponse,
ToolCall,
Usage,
build_tool_call,
map_finish_reason,
)
_REGISTRY: dict = {}
def register_transport(api_mode: str, transport_cls: type) -> None:
"""Register a transport class for an api_mode string."""
_REGISTRY[api_mode] = transport_cls
def get_transport(api_mode: str):
"""Get a transport instance for the given api_mode.
Returns None if no transport is registered for this api_mode.
This allows gradual migration — call sites can check for None
and fall back to the legacy code path.
"""
if not _REGISTRY:
_discover_transports()
cls = _REGISTRY.get(api_mode)
if cls is None:
return None
return cls()
def _discover_transports() -> None:
"""Import all transport modules to trigger auto-registration."""
try:
import agent.transports.anthropic # noqa: F401
except ImportError:
pass
try:
import agent.transports.codex # noqa: F401
except ImportError:
pass
try:
import agent.transports.chat_completions # noqa: F401
except ImportError:
pass
try:
import agent.transports.bedrock # noqa: F401
except ImportError:
pass

View File

@@ -0,0 +1,95 @@
"""Anthropic Messages API transport.
Delegates to the existing adapter functions in agent/anthropic_adapter.py.
This transport owns format conversion and normalization — NOT client lifecycle.
"""
from typing import Any, Dict, List, Optional
from agent.transports.base import ProviderTransport
from agent.transports.types import NormalizedResponse
class AnthropicTransport(ProviderTransport):
"""Transport for api_mode='anthropic_messages'."""
@property
def api_mode(self) -> str:
return "anthropic_messages"
def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> Any:
from agent.anthropic_adapter import convert_messages_to_anthropic
base_url = kwargs.get("base_url")
return convert_messages_to_anthropic(messages, base_url=base_url)
def convert_tools(self, tools: List[Dict[str, Any]]) -> Any:
from agent.anthropic_adapter import convert_tools_to_anthropic
return convert_tools_to_anthropic(tools)
def build_kwargs(
self,
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
**params,
) -> Dict[str, Any]:
from agent.anthropic_adapter import build_anthropic_kwargs
return build_anthropic_kwargs(
model=model,
messages=messages,
tools=tools,
max_tokens=params.get("max_tokens", 16384),
reasoning_config=params.get("reasoning_config"),
tool_choice=params.get("tool_choice"),
is_oauth=params.get("is_oauth", False),
preserve_dots=params.get("preserve_dots", False),
context_length=params.get("context_length"),
base_url=params.get("base_url"),
fast_mode=params.get("fast_mode", False),
)
def normalize_response(self, response: Any, **kwargs) -> NormalizedResponse:
from agent.anthropic_adapter import normalize_anthropic_response_v2
strip_tool_prefix = kwargs.get("strip_tool_prefix", False)
return normalize_anthropic_response_v2(response, strip_tool_prefix=strip_tool_prefix)
def validate_response(self, response: Any) -> bool:
if response is None:
return False
content_blocks = getattr(response, "content", None)
if not isinstance(content_blocks, list):
return False
if not content_blocks:
return False
return True
def extract_cache_stats(self, response: Any):
usage = getattr(response, "usage", None)
if usage is None:
return None
cached = getattr(usage, "cache_read_input_tokens", 0) or 0
written = getattr(usage, "cache_creation_input_tokens", 0) or 0
if cached or written:
return {"cached_tokens": cached, "creation_tokens": written}
return None
_STOP_REASON_MAP = {
"end_turn": "stop",
"tool_use": "tool_calls",
"max_tokens": "length",
"stop_sequence": "stop",
"refusal": "content_filter",
"model_context_window_exceeded": "length",
}
def map_finish_reason(self, raw_reason: str) -> str:
return self._STOP_REASON_MAP.get(raw_reason, "stop")
from agent.transports import register_transport # noqa: E402
register_transport("anthropic_messages", AnthropicTransport)

61
agent/transports/base.py Normal file
View File

@@ -0,0 +1,61 @@
"""Abstract base for provider transports.
A transport owns the data path for one api_mode:
convert_messages → convert_tools → build_kwargs → normalize_response
It does NOT own: client construction, streaming, credential refresh,
prompt caching, interrupt handling, or retry logic. Those stay on AIAgent.
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from agent.transports.types import NormalizedResponse
class ProviderTransport(ABC):
"""Base class for provider-specific format conversion and normalization."""
@property
@abstractmethod
def api_mode(self) -> str:
"""The api_mode string this transport handles."""
...
@abstractmethod
def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> Any:
"""Convert OpenAI-format messages to provider-native format."""
...
@abstractmethod
def convert_tools(self, tools: List[Dict[str, Any]]) -> Any:
"""Convert OpenAI-format tool definitions to provider-native format."""
...
@abstractmethod
def build_kwargs(
self,
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
**params,
) -> Dict[str, Any]:
"""Build the complete provider kwargs dict."""
...
@abstractmethod
def normalize_response(self, response: Any, **kwargs) -> NormalizedResponse:
"""Normalize a raw provider response to the shared NormalizedResponse type."""
...
def validate_response(self, response: Any) -> bool:
"""Optional structural validation for raw responses."""
return True
def extract_cache_stats(self, response: Any) -> Optional[Dict[str, int]]:
"""Optional cache stats extraction."""
return None
def map_finish_reason(self, raw_reason: str) -> str:
"""Optional stop-reason mapping. Defaults to passthrough."""
return raw_reason

58
agent/transports/types.py Normal file
View File

@@ -0,0 +1,58 @@
"""Shared types for normalized provider responses."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class ToolCall:
"""A normalized tool call from any provider."""
id: Optional[str]
name: str
arguments: str
provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
@dataclass
class Usage:
"""Token usage from an API response."""
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
cached_tokens: int = 0
@dataclass
class NormalizedResponse:
"""Normalized API response from any provider."""
content: Optional[str]
tool_calls: Optional[List[ToolCall]]
finish_reason: str
reasoning: Optional[str] = None
usage: Optional[Usage] = None
provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
def build_tool_call(
id: Optional[str],
name: str,
arguments: Any,
**provider_fields: Any,
) -> ToolCall:
"""Build a ToolCall, auto-serialising dict arguments."""
args_str = json.dumps(arguments) if isinstance(arguments, dict) else str(arguments)
provider_data = dict(provider_fields) if provider_fields else None
return ToolCall(id=id, name=name, arguments=args_str, provider_data=provider_data)
def map_finish_reason(reason: Optional[str], mapping: Dict[str, str]) -> str:
"""Translate a provider-specific stop reason to the normalized set."""
if reason is None:
return "stop"
return mapping.get(reason, "stop")

View File

@@ -1,132 +0,0 @@
# Hindsight local eval homes for live Hermes runtime testing
Issue: #1010
Parent: #985
This document defines a reproducible, profile-scoped evaluation layout for baseline / MemPalace / Hindsight comparisons without requiring Hindsight Cloud.
## Eval home layout
Use three separate `HERMES_HOME` directories so each run has isolated config, memory, sessions, and artifacts.
```text
~/.hermes/profiles/atlas-baseline/
config.yaml
.env
MEMORY.md
USER.md
reports/context_snapshots/
~/.hermes/profiles/atlas-mempalace/
config.yaml
.env
MEMORY.md
USER.md
reports/context_snapshots/
plugins/ # if a local MemPalace plugin is installed for this eval lane
~/.hermes/profiles/atlas-hindsight/
config.yaml
.env
MEMORY.md
USER.md
hindsight/config.json
reports/context_snapshots/
```
## Hindsight local config
The Hindsight provider already loads config from `$HERMES_HOME/hindsight/config.json` first. For the local eval lane, prefer `local_embedded` so Hermes can bring up a local Hindsight daemon without cloud signup.
Example `~/.hermes/profiles/atlas-hindsight/hindsight/config.json`:
```json
{
"mode": "local_embedded",
"memory_mode": "context",
"recall_prefetch_method": "recall",
"llm_provider": "ollama",
"llm_model": "gemma3:12b",
"api_url": "http://localhost:8888"
}
```
Notes:
- `local_embedded` avoids any Hindsight Cloud dependency.
- If `profile` is omitted, Hermes now derives a stable local Hindsight profile name from the active profile identity / `HERMES_HOME` instead of collapsing all local runs into the shared legacy `hermes` profile.
- `local_external` remains valid if you already run a local Hindsight server yourself.
## Runtime switching procedure
Switch by exporting `HERMES_HOME` before launching Hermes.
### 1. Baseline
```bash
export HERMES_HOME="$HOME/.hermes/profiles/atlas-baseline"
unset HERMES_CONTEXT_SNAPSHOTS
hermes chat
```
### 2. MemPalace lane
```bash
export HERMES_HOME="$HOME/.hermes/profiles/atlas-mempalace"
export HERMES_CONTEXT_SNAPSHOTS=1
hermes chat
```
### 3. Hindsight lane
```bash
export HERMES_HOME="$HOME/.hermes/profiles/atlas-hindsight"
export HERMES_CONTEXT_SNAPSHOTS=1
hermes chat
```
## Raw artifact capture
When `HERMES_CONTEXT_SNAPSHOTS=1` is enabled, Hermes writes first-class prompt-composition artifacts under the active home by default.
Artifact tree:
```text
$HERMES_HOME/reports/context_snapshots/<session-id>/call_001/
system_prompt.txt
memory_provider_system_prompt.txt
memory_prefetch_raw.txt
memory_context_block.txt
api_user_message.txt
api_messages.json
metadata.json
```
Minimum files a benchmark should inspect:
- `system_prompt.txt`
- `memory_prefetch_raw.txt`
- `memory_context_block.txt`
- `api_user_message.txt`
- `api_messages.json`
These prove:
- what the system prompt was
- what the provider prefetched
- what entered `<memory-context>`
- what the final API user message looked like
- what full payload reached the model
## Follow-on benchmark workflow
A benchmark issue can now consume this path without redoing integration work:
1. pick one eval home (`atlas-baseline`, `atlas-mempalace`, `atlas-hindsight`)
2. export the corresponding `HERMES_HOME`
3. run Hermes on the same prompt set
4. compare the snapshot artifacts in `reports/context_snapshots/`
5. score recall quality and answer quality separately
## Why this is sovereign
- no hosted Hindsight Cloud dependency is required
- the Hindsight config is profile-scoped under `hindsight/config.json`
- the runtime artifacts stay under the active `HERMES_HOME`
- switching between baseline / MemPalace / Hindsight is just a `HERMES_HOME` swap

View File

@@ -178,25 +178,6 @@ def _load_config() -> dict:
}
def _derive_local_profile_name(agent_identity: str = "", hermes_home: str = "") -> str:
"""Return a stable profile name for local embedded Hindsight storage.
Prefer the active Hermes profile identity when available, otherwise fall back
to the basename of the active HERMES_HOME path. This prevents all local
Hindsight eval homes from sharing the legacy default profile name "hermes".
"""
from pathlib import Path
import re
raw = (agent_identity or "").strip()
if not raw and hermes_home:
raw = Path(hermes_home).name.strip()
if not raw:
raw = "hermes"
safe = re.sub(r"[^A-Za-z0-9_.-]+", "-", raw).strip(".-_")
return safe or "hermes"
# ---------------------------------------------------------------------------
# MemoryProvider implementation
# ---------------------------------------------------------------------------
@@ -487,8 +468,6 @@ class HindsightMemoryProvider(MemoryProvider):
def initialize(self, session_id: str, **kwargs) -> None:
self._session_id = session_id
hermes_home = str(kwargs.get("hermes_home") or "")
agent_identity = str(kwargs.get("agent_identity") or "")
# Check client version and auto-upgrade if needed
try:
@@ -521,11 +500,6 @@ class HindsightMemoryProvider(MemoryProvider):
# "local" is a legacy alias for "local_embedded"
if self._mode == "local":
self._mode = "local_embedded"
if self._mode == "local_embedded" and not self._config.get("profile"):
self._config["profile"] = _derive_local_profile_name(
agent_identity=agent_identity,
hermes_home=hermes_home,
)
self._api_key = self._config.get("apiKey") or self._config.get("api_key") or os.environ.get("HINDSIGHT_API_KEY", "")
default_url = _DEFAULT_LOCAL_URL if self._mode in ("local_embedded", "local_external") else _DEFAULT_API_URL
self._api_url = self._config.get("api_url") or os.environ.get("HINDSIGHT_API_URL", default_url)

View File

@@ -604,8 +604,6 @@ class AIAgent:
checkpoint_max_snapshots: int = 50,
pass_session_id: bool = False,
persist_session: bool = True,
context_snapshots_enabled: bool | None = None,
context_snapshots_dir: str | None = None,
):
"""
Initialize the AI Agent.
@@ -1131,43 +1129,6 @@ class AIAgent:
except Exception:
_agent_cfg = {}
def _is_enabled(value):
if isinstance(value, bool):
return value
return str(value).strip().lower() in {"1", "true", "yes", "on"}
_debug_cfg = _agent_cfg.get("debug", {}) if isinstance(_agent_cfg, dict) else {}
if not isinstance(_debug_cfg, dict):
_debug_cfg = {}
_snapshot_cfg = _debug_cfg.get("context_snapshots", {})
if not isinstance(_snapshot_cfg, dict):
_snapshot_cfg = {}
_snapshots_env = os.getenv("HERMES_CONTEXT_SNAPSHOTS")
_snapshots_dir_env = os.getenv("HERMES_CONTEXT_SNAPSHOTS_DIR")
if context_snapshots_enabled is None:
if _snapshots_env is not None:
self._context_snapshots_enabled = _is_enabled(_snapshots_env)
else:
self._context_snapshots_enabled = _is_enabled(_snapshot_cfg.get("enabled", False))
else:
self._context_snapshots_enabled = bool(context_snapshots_enabled)
self._context_snapshots_dir = (
context_snapshots_dir
or _snapshots_dir_env
or _snapshot_cfg.get("dir")
or None
)
try:
from agent.context_snapshots import ContextSnapshotRecorder
self._context_snapshot_recorder = ContextSnapshotRecorder(
session_id=self.session_id,
enabled=self._context_snapshots_enabled,
base_dir=self._context_snapshots_dir,
)
except Exception as _snapshot_err:
logger.debug("Context snapshot recorder init failed: %s", _snapshot_err)
self._context_snapshot_recorder = None
# Persistent memory (MEMORY.md + USER.md) -- loaded from disk
self._memory_store = None
self._memory_enabled = False
@@ -8183,17 +8144,12 @@ class AIAgent:
# Use original_user_message (clean input) — user_message may contain
# injected skill content that bloats / breaks provider queries.
_ext_prefetch_cache = ""
_memory_provider_prompt_cache = ""
if self._memory_manager:
try:
_query = original_user_message if isinstance(original_user_message, str) else ""
_ext_prefetch_cache = self._memory_manager.prefetch_all(_query) or ""
except Exception:
pass
try:
_memory_provider_prompt_cache = self._memory_manager.build_system_prompt() or ""
except Exception:
pass
while (api_call_count < self.max_iterations and self.iteration_budget.remaining > 0) or self._budget_grace_call:
# Reset per-turn checkpoint dedup so each iteration can take one snapshot
@@ -8261,8 +8217,6 @@ class AIAgent:
# However, providers like Moonshot AI require a separate 'reasoning_content' field
# on assistant messages with tool_calls. We handle both cases here.
api_messages = []
_current_api_user_message = ""
_current_memory_context_block = ""
for idx, msg in enumerate(messages):
api_msg = msg.copy()
@@ -8277,15 +8231,12 @@ class AIAgent:
_fenced = build_memory_context_block(_ext_prefetch_cache)
if _fenced:
_injections.append(_fenced)
_current_memory_context_block = _fenced
if _plugin_user_context:
_injections.append(_plugin_user_context)
if _injections:
_base = api_msg.get("content", "")
if isinstance(_base, str):
api_msg["content"] = _base + "\n\n" + "\n\n".join(_injections)
if isinstance(api_msg.get("content"), str):
_current_api_user_message = api_msg["content"]
# For ALL assistant messages, pass reasoning back to the API
# This ensures multi-turn reasoning context is preserved
@@ -8320,13 +8271,7 @@ class AIAgent:
from agent.privacy_filter import PrivacyFilter
pf = PrivacyFilter()
# Sanitize messages before they reach the provider
_pf_result = pf.sanitize_messages(api_messages)
if isinstance(_pf_result, tuple):
api_messages, _pf_report = _pf_result
if getattr(pf, "last_report", None) is None:
pf.last_report = _pf_report
else:
api_messages = _pf_result
api_messages = pf.sanitize_messages(api_messages)
if pf.last_report and pf.last_report.had_redactions:
logger.info(f"Privacy Filter: Redacted sensitive data from turn payload. Details: {pf.last_report.summary()}")
except Exception as e:
@@ -8397,27 +8342,6 @@ class AIAgent:
new_tcs.append(tc)
am["tool_calls"] = new_tcs
if self._context_snapshot_recorder:
try:
self._context_snapshot_recorder.record_call(
api_call_count,
system_prompt=effective_system,
memory_provider_system_prompt=_memory_provider_prompt_cache,
memory_prefetch_raw=_ext_prefetch_cache,
memory_context_block=_current_memory_context_block,
api_user_message=_current_api_user_message,
api_messages=api_messages,
metadata={
"model": self.model,
"provider": self.provider,
"platform": self.platform or "",
"api_mode": self.api_mode,
"memory_providers": [p.name for p in getattr(self._memory_manager, "providers", [])],
},
)
except Exception as _snapshot_err:
logger.debug("Context snapshot capture failed: %s", _snapshot_err)
# Calculate approximate request size for logging
total_chars = sum(len(str(msg)) for msg in api_messages)
approx_tokens = estimate_messages_tokens_rough(api_messages)

View File

@@ -0,0 +1,213 @@
"""Regression tests: normalize_anthropic_response_v2 vs v1.
Constructs mock Anthropic responses and asserts that the v2 function
(returning NormalizedResponse) produces identical field values to the
original v1 function (returning SimpleNamespace + finish_reason).
"""
from types import SimpleNamespace
import pytest
from agent.anthropic_adapter import (
normalize_anthropic_response,
normalize_anthropic_response_v2,
)
from agent.transports.types import NormalizedResponse
def _text_block(text: str):
return SimpleNamespace(type="text", text=text)
def _thinking_block(thinking: str, signature: str = "sig_abc"):
return SimpleNamespace(type="thinking", thinking=thinking, signature=signature)
def _tool_use_block(id: str, name: str, input: dict):
return SimpleNamespace(type="tool_use", id=id, name=name, input=input)
def _response(content_blocks, stop_reason="end_turn"):
return SimpleNamespace(
content=content_blocks,
stop_reason=stop_reason,
usage=SimpleNamespace(input_tokens=10, output_tokens=5),
)
class TestTextOnly:
def setup_method(self):
self.resp = _response([_text_block("Hello world")])
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_type(self):
assert isinstance(self.v2, NormalizedResponse)
def test_content_matches(self):
assert self.v2.content == self.v1_msg.content
def test_finish_reason_matches(self):
assert self.v2.finish_reason == self.v1_finish
def test_no_tool_calls(self):
assert self.v2.tool_calls is None
assert self.v1_msg.tool_calls is None
def test_no_reasoning(self):
assert self.v2.reasoning is None
assert self.v1_msg.reasoning is None
class TestWithToolCalls:
def setup_method(self):
self.resp = _response(
[
_text_block("I'll check that"),
_tool_use_block("toolu_abc", "terminal", {"command": "ls"}),
_tool_use_block("toolu_def", "read_file", {"path": "/tmp"}),
],
stop_reason="tool_use",
)
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_finish_reason(self):
assert self.v2.finish_reason == "tool_calls"
assert self.v1_finish == "tool_calls"
def test_tool_call_count(self):
assert len(self.v2.tool_calls) == 2
assert len(self.v1_msg.tool_calls) == 2
def test_tool_call_ids_match(self):
for i in range(2):
assert self.v2.tool_calls[i].id == self.v1_msg.tool_calls[i].id
def test_tool_call_names_match(self):
assert self.v2.tool_calls[0].name == "terminal"
assert self.v2.tool_calls[1].name == "read_file"
for i in range(2):
assert self.v2.tool_calls[i].name == self.v1_msg.tool_calls[i].function.name
def test_tool_call_arguments_match(self):
for i in range(2):
assert self.v2.tool_calls[i].arguments == self.v1_msg.tool_calls[i].function.arguments
def test_content_preserved(self):
assert self.v2.content == self.v1_msg.content
assert "check that" in self.v2.content
class TestWithThinking:
def setup_method(self):
self.resp = _response([
_thinking_block("Let me think about this carefully..."),
_text_block("The answer is 42."),
])
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_reasoning_matches(self):
assert self.v2.reasoning == self.v1_msg.reasoning
assert "think about this" in self.v2.reasoning
def test_reasoning_details_in_provider_data(self):
v1_details = self.v1_msg.reasoning_details
v2_details = self.v2.provider_data.get("reasoning_details") if self.v2.provider_data else None
assert v1_details is not None
assert v2_details is not None
assert len(v2_details) == len(v1_details)
def test_content_excludes_thinking(self):
assert self.v2.content == "The answer is 42."
class TestMixed:
def setup_method(self):
self.resp = _response(
[
_thinking_block("Planning my approach..."),
_text_block("I'll run the command"),
_tool_use_block("toolu_xyz", "terminal", {"command": "pwd"}),
],
stop_reason="tool_use",
)
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_all_fields_present(self):
assert self.v2.content is not None
assert self.v2.tool_calls is not None
assert self.v2.reasoning is not None
assert self.v2.finish_reason == "tool_calls"
def test_content_matches(self):
assert self.v2.content == self.v1_msg.content
def test_reasoning_matches(self):
assert self.v2.reasoning == self.v1_msg.reasoning
def test_tool_call_matches(self):
assert self.v2.tool_calls[0].id == self.v1_msg.tool_calls[0].id
assert self.v2.tool_calls[0].name == self.v1_msg.tool_calls[0].function.name
class TestStopReasons:
@pytest.mark.parametrize("stop_reason,expected", [
("end_turn", "stop"),
("tool_use", "tool_calls"),
("max_tokens", "length"),
("stop_sequence", "stop"),
("refusal", "content_filter"),
("model_context_window_exceeded", "length"),
("unknown_future_reason", "stop"),
])
def test_stop_reason_mapping(self, stop_reason, expected):
resp = _response([_text_block("x")], stop_reason=stop_reason)
_v1_msg, v1_finish = normalize_anthropic_response(resp)
v2 = normalize_anthropic_response_v2(resp)
assert v2.finish_reason == v1_finish == expected
class TestStripToolPrefix:
def test_prefix_stripped(self):
resp = _response(
[_tool_use_block("toolu_1", "mcp_terminal", {"cmd": "ls"})],
stop_reason="tool_use",
)
v1_msg, _ = normalize_anthropic_response(resp, strip_tool_prefix=True)
v2 = normalize_anthropic_response_v2(resp, strip_tool_prefix=True)
assert v1_msg.tool_calls[0].function.name == "terminal"
assert v2.tool_calls[0].name == "terminal"
def test_prefix_kept(self):
resp = _response(
[_tool_use_block("toolu_1", "mcp_terminal", {"cmd": "ls"})],
stop_reason="tool_use",
)
v1_msg, _ = normalize_anthropic_response(resp, strip_tool_prefix=False)
v2 = normalize_anthropic_response_v2(resp, strip_tool_prefix=False)
assert v1_msg.tool_calls[0].function.name == "mcp_terminal"
assert v2.tool_calls[0].name == "mcp_terminal"
class TestEdgeCases:
def test_empty_content_blocks(self):
resp = _response([])
v1_msg, _v1_finish = normalize_anthropic_response(resp)
v2 = normalize_anthropic_response_v2(resp)
assert v2.content == v1_msg.content
assert v2.content is None
def test_no_reasoning_details_means_none_provider_data(self):
resp = _response([_text_block("hi")])
v2 = normalize_anthropic_response_v2(resp)
assert v2.provider_data is None
def test_v2_returns_dataclass_not_namespace(self):
resp = _response([_text_block("hi")])
v2 = normalize_anthropic_response_v2(resp)
assert isinstance(v2, NormalizedResponse)
assert not isinstance(v2, SimpleNamespace)

View File

@@ -1,43 +0,0 @@
from pathlib import Path
from agent.context_snapshots import ContextSnapshotRecorder
def test_disabled_recorder_writes_nothing(tmp_path):
recorder = ContextSnapshotRecorder(session_id="session-1", enabled=False, base_dir=tmp_path)
out = recorder.record_call(
1,
system_prompt="system",
api_messages=[{"role": "user", "content": "hello"}],
)
assert out is None
assert not (tmp_path / "session-1").exists()
def test_enabled_recorder_writes_expected_artifacts(tmp_path):
recorder = ContextSnapshotRecorder(session_id="session-1", enabled=True, base_dir=tmp_path)
out = recorder.record_call(
1,
system_prompt="system prompt",
memory_provider_system_prompt="# Hindsight Memory\nActive.",
memory_prefetch_raw="- remembered fact",
memory_context_block="<memory-context>\nremembered\n</memory-context>",
api_user_message="What do I prefer?\n\n<memory-context>\nremembered\n</memory-context>",
api_messages=[
{"role": "system", "content": "system prompt"},
{"role": "user", "content": "What do I prefer?"},
],
metadata={"provider": "openai", "memory_providers": ["builtin", "hindsight"]},
)
assert out == tmp_path / "session-1" / "call_001"
assert (out / "system_prompt.txt").read_text(encoding="utf-8") == "system prompt"
assert (out / "memory_provider_system_prompt.txt").read_text(encoding="utf-8").startswith("# Hindsight Memory")
assert (out / "memory_prefetch_raw.txt").read_text(encoding="utf-8") == "- remembered fact"
assert "<memory-context>" in (out / "memory_context_block.txt").read_text(encoding="utf-8")
assert "What do I prefer?" in (out / "api_user_message.txt").read_text(encoding="utf-8")
assert (out / "api_messages.json").read_text(encoding="utf-8").startswith("[")
assert '"hindsight"' in (out / "metadata.json").read_text(encoding="utf-8")

View File

@@ -0,0 +1,208 @@
"""Tests for the transport ABC, registry, and AnthropicTransport."""
from types import SimpleNamespace
import pytest
from agent.transports import _REGISTRY, get_transport, register_transport
from agent.transports.base import ProviderTransport
from agent.transports.types import NormalizedResponse
class TestProviderTransportABC:
def test_cannot_instantiate_abc(self):
with pytest.raises(TypeError):
ProviderTransport()
def test_concrete_must_implement_all_abstract(self):
class Incomplete(ProviderTransport):
@property
def api_mode(self):
return "test"
with pytest.raises(TypeError):
Incomplete()
def test_minimal_concrete(self):
class Minimal(ProviderTransport):
@property
def api_mode(self):
return "test_minimal"
def convert_messages(self, messages, **kw):
return messages
def convert_tools(self, tools):
return tools
def build_kwargs(self, model, messages, tools=None, **params):
return {"model": model, "messages": messages}
def normalize_response(self, response, **kw):
return NormalizedResponse(content="ok", tool_calls=None, finish_reason="stop")
t = Minimal()
assert t.api_mode == "test_minimal"
assert t.validate_response(None) is True
assert t.extract_cache_stats(None) is None
assert t.map_finish_reason("end_turn") == "end_turn"
class TestTransportRegistry:
def test_get_unregistered_returns_none(self):
assert get_transport("nonexistent_mode") is None
def test_anthropic_registered_on_import(self):
import agent.transports.anthropic # noqa: F401
t = get_transport("anthropic_messages")
assert t is not None
assert t.api_mode == "anthropic_messages"
def test_register_and_get(self):
class DummyTransport(ProviderTransport):
@property
def api_mode(self):
return "dummy_test"
def convert_messages(self, messages, **kw):
return messages
def convert_tools(self, tools):
return tools
def build_kwargs(self, model, messages, tools=None, **params):
return {}
def normalize_response(self, response, **kw):
return NormalizedResponse(content=None, tool_calls=None, finish_reason="stop")
register_transport("dummy_test", DummyTransport)
t = get_transport("dummy_test")
assert t.api_mode == "dummy_test"
_REGISTRY.pop("dummy_test", None)
class TestAnthropicTransport:
@pytest.fixture
def transport(self):
import agent.transports.anthropic # noqa: F401
return get_transport("anthropic_messages")
def test_api_mode(self, transport):
assert transport.api_mode == "anthropic_messages"
def test_convert_tools_simple(self, transport):
tools = [{
"type": "function",
"function": {
"name": "test_tool",
"description": "A test",
"parameters": {"type": "object", "properties": {}},
},
}]
result = transport.convert_tools(tools)
assert len(result) == 1
assert result[0]["name"] == "test_tool"
assert "input_schema" in result[0]
def test_validate_response_none(self, transport):
assert transport.validate_response(None) is False
def test_validate_response_empty_content(self, transport):
r = SimpleNamespace(content=[])
assert transport.validate_response(r) is False
def test_validate_response_valid(self, transport):
r = SimpleNamespace(content=[SimpleNamespace(type="text", text="hello")])
assert transport.validate_response(r) is True
def test_map_finish_reason(self, transport):
assert transport.map_finish_reason("end_turn") == "stop"
assert transport.map_finish_reason("tool_use") == "tool_calls"
assert transport.map_finish_reason("max_tokens") == "length"
assert transport.map_finish_reason("stop_sequence") == "stop"
assert transport.map_finish_reason("refusal") == "content_filter"
assert transport.map_finish_reason("model_context_window_exceeded") == "length"
assert transport.map_finish_reason("unknown") == "stop"
def test_extract_cache_stats_none_usage(self, transport):
r = SimpleNamespace(usage=None)
assert transport.extract_cache_stats(r) is None
def test_extract_cache_stats_with_cache(self, transport):
usage = SimpleNamespace(cache_read_input_tokens=100, cache_creation_input_tokens=50)
r = SimpleNamespace(usage=usage)
result = transport.extract_cache_stats(r)
assert result == {"cached_tokens": 100, "creation_tokens": 50}
def test_extract_cache_stats_zero(self, transport):
usage = SimpleNamespace(cache_read_input_tokens=0, cache_creation_input_tokens=0)
r = SimpleNamespace(usage=usage)
assert transport.extract_cache_stats(r) is None
def test_normalize_response_text(self, transport):
r = SimpleNamespace(
content=[SimpleNamespace(type="text", text="Hello world")],
stop_reason="end_turn",
usage=SimpleNamespace(input_tokens=10, output_tokens=5),
model="claude-sonnet-4-6",
)
nr = transport.normalize_response(r)
assert isinstance(nr, NormalizedResponse)
assert nr.content == "Hello world"
assert nr.tool_calls is None or nr.tool_calls == []
assert nr.finish_reason == "stop"
def test_normalize_response_tool_calls(self, transport):
r = SimpleNamespace(
content=[
SimpleNamespace(type="tool_use", id="toolu_123", name="terminal", input={"command": "ls"}),
],
stop_reason="tool_use",
usage=SimpleNamespace(input_tokens=10, output_tokens=20),
model="claude-sonnet-4-6",
)
nr = transport.normalize_response(r)
assert nr.finish_reason == "tool_calls"
assert len(nr.tool_calls) == 1
tc = nr.tool_calls[0]
assert tc.name == "terminal"
assert tc.id == "toolu_123"
assert '"command"' in tc.arguments
def test_normalize_response_thinking(self, transport):
r = SimpleNamespace(
content=[
SimpleNamespace(type="thinking", thinking="Let me think..."),
SimpleNamespace(type="text", text="The answer is 42"),
],
stop_reason="end_turn",
usage=SimpleNamespace(input_tokens=10, output_tokens=15),
model="claude-sonnet-4-6",
)
nr = transport.normalize_response(r)
assert nr.content == "The answer is 42"
assert nr.reasoning == "Let me think..."
def test_build_kwargs_returns_dict(self, transport):
messages = [{"role": "user", "content": "Hello"}]
kw = transport.build_kwargs(
model="claude-sonnet-4-6",
messages=messages,
max_tokens=1024,
)
assert isinstance(kw, dict)
assert "model" in kw
assert "max_tokens" in kw
assert "messages" in kw
def test_convert_messages_extracts_system(self, transport):
messages = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Hi"},
]
system, msgs = transport.convert_messages(messages)
assert system is not None
assert len(msgs) >= 1

View File

@@ -0,0 +1,130 @@
"""Tests for agent/transports/types.py — dataclass construction + helpers."""
import json
from agent.transports.types import (
NormalizedResponse,
ToolCall,
Usage,
build_tool_call,
map_finish_reason,
)
class TestToolCall:
def test_basic_construction(self):
tc = ToolCall(id="call_abc", name="terminal", arguments='{"cmd": "ls"}')
assert tc.id == "call_abc"
assert tc.name == "terminal"
assert tc.arguments == '{"cmd": "ls"}'
assert tc.provider_data is None
def test_none_id(self):
tc = ToolCall(id=None, name="read_file", arguments="{}")
assert tc.id is None
def test_provider_data(self):
tc = ToolCall(
id="call_x",
name="t",
arguments="{}",
provider_data={"call_id": "call_x", "response_item_id": "fc_x"},
)
assert tc.provider_data["call_id"] == "call_x"
assert tc.provider_data["response_item_id"] == "fc_x"
class TestUsage:
def test_defaults(self):
u = Usage()
assert u.prompt_tokens == 0
assert u.completion_tokens == 0
assert u.total_tokens == 0
assert u.cached_tokens == 0
def test_explicit(self):
u = Usage(prompt_tokens=100, completion_tokens=50, total_tokens=150, cached_tokens=80)
assert u.total_tokens == 150
class TestNormalizedResponse:
def test_text_only(self):
r = NormalizedResponse(content="hello", tool_calls=None, finish_reason="stop")
assert r.content == "hello"
assert r.tool_calls is None
assert r.finish_reason == "stop"
assert r.reasoning is None
assert r.usage is None
assert r.provider_data is None
def test_with_tool_calls(self):
tcs = [ToolCall(id="call_1", name="terminal", arguments='{"cmd":"pwd"}')]
r = NormalizedResponse(content=None, tool_calls=tcs, finish_reason="tool_calls")
assert r.finish_reason == "tool_calls"
assert len(r.tool_calls) == 1
assert r.tool_calls[0].name == "terminal"
def test_with_reasoning(self):
r = NormalizedResponse(
content="answer",
tool_calls=None,
finish_reason="stop",
reasoning="I thought about it",
)
assert r.reasoning == "I thought about it"
def test_with_provider_data(self):
r = NormalizedResponse(
content=None,
tool_calls=None,
finish_reason="stop",
provider_data={"reasoning_details": [{"type": "thinking", "thinking": "hmm"}]},
)
assert r.provider_data["reasoning_details"][0]["type"] == "thinking"
class TestBuildToolCall:
def test_dict_arguments_serialized(self):
tc = build_tool_call(id="call_1", name="terminal", arguments={"cmd": "ls"})
assert tc.arguments == json.dumps({"cmd": "ls"})
assert tc.provider_data is None
def test_string_arguments_passthrough(self):
tc = build_tool_call(id="call_2", name="read_file", arguments='{"path": "/tmp"}')
assert tc.arguments == '{"path": "/tmp"}'
def test_provider_fields(self):
tc = build_tool_call(
id="call_3",
name="terminal",
arguments="{}",
call_id="call_3",
response_item_id="fc_3",
)
assert tc.provider_data == {"call_id": "call_3", "response_item_id": "fc_3"}
def test_none_id(self):
tc = build_tool_call(id=None, name="t", arguments="{}")
assert tc.id is None
class TestMapFinishReason:
ANTHROPIC_MAP = {
"end_turn": "stop",
"tool_use": "tool_calls",
"max_tokens": "length",
"stop_sequence": "stop",
"refusal": "content_filter",
}
def test_known_reason(self):
assert map_finish_reason("end_turn", self.ANTHROPIC_MAP) == "stop"
assert map_finish_reason("tool_use", self.ANTHROPIC_MAP) == "tool_calls"
assert map_finish_reason("max_tokens", self.ANTHROPIC_MAP) == "length"
assert map_finish_reason("refusal", self.ANTHROPIC_MAP) == "content_filter"
def test_unknown_reason_defaults_to_stop(self):
assert map_finish_reason("something_new", self.ANTHROPIC_MAP) == "stop"
def test_none_reason(self):
assert map_finish_reason(None, self.ANTHROPIC_MAP) == "stop"

View File

@@ -596,26 +596,3 @@ class TestAvailability:
monkeypatch.setenv("HINDSIGHT_MODE", "local")
p = HindsightMemoryProvider()
assert p.is_available()
def test_local_embedded_profile_defaults_to_agent_identity(self, tmp_path, monkeypatch):
config_path = tmp_path / "hindsight" / "config.json"
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(json.dumps({
"mode": "local_embedded",
"llm_provider": "ollama",
"llm_model": "gemma3:12b",
}))
monkeypatch.setattr(
"plugins.memory.hindsight.get_hermes_home",
lambda: tmp_path,
)
p = HindsightMemoryProvider()
p.initialize(
session_id="test-session",
hermes_home=str(tmp_path / "profiles" / "atlas-hindsight"),
platform="cli",
agent_identity="atlas-hindsight",
)
assert p._config["profile"] == "atlas-hindsight"

View File

@@ -1,94 +0,0 @@
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import importlib
import sys
import types
def _make_tool_defs(*names: str) -> list:
return [
{
"type": "function",
"function": {
"name": n,
"description": f"{n} tool",
"parameters": {"type": "object", "properties": {}},
},
}
for n in names
]
def _mock_response(content="Done", finish_reason="stop"):
msg = SimpleNamespace(content=content, tool_calls=None)
choice = SimpleNamespace(message=msg, finish_reason=finish_reason)
return SimpleNamespace(choices=[choice], usage=SimpleNamespace(prompt_tokens=1, completion_tokens=1, total_tokens=2))
def _load_ai_agent():
sys.modules.setdefault("agent.auxiliary_client", types.SimpleNamespace(call_llm=lambda *a, **k: ""))
run_agent = importlib.import_module("run_agent")
return run_agent.AIAgent
def test_run_conversation_writes_context_snapshot_artifacts(tmp_path):
AIAgent = _load_ai_agent()
class _FakePrivacyFilter:
def __init__(self):
self.last_report = None
def sanitize_messages(self, messages):
return list(messages)
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("hermes_cli.plugins.invoke_hook", return_value=[]),
patch.dict(sys.modules, {"agent.privacy_filter": types.SimpleNamespace(PrivacyFilter=_FakePrivacyFilter)}),
):
agent = AIAgent(
api_key="test-key-1234567890",
base_url="https://example.com/v1",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
context_snapshots_enabled=True,
context_snapshots_dir=str(tmp_path),
)
agent.client = MagicMock()
agent.client.chat.completions.create.return_value = _mock_response(content="Done")
agent._build_system_prompt = MagicMock(return_value="Core system prompt")
agent._memory_manager = MagicMock()
agent._memory_manager.prefetch_all.return_value = "- remembered preference"
agent._memory_manager.build_system_prompt.return_value = "# Hindsight Memory\nActive."
agent._memory_manager.providers = [
SimpleNamespace(name="builtin"),
SimpleNamespace(name="hindsight"),
]
result = agent.run_conversation("What do I prefer?")
assert result["final_response"] == "Done"
call_dir = tmp_path / agent.session_id / "call_001"
assert call_dir.exists()
assert (call_dir / "system_prompt.txt").read_text(encoding="utf-8") == "Core system prompt"
assert (call_dir / "memory_provider_system_prompt.txt").read_text(encoding="utf-8").startswith("# Hindsight Memory")
assert (call_dir / "memory_prefetch_raw.txt").read_text(encoding="utf-8") == "- remembered preference"
assert "<memory-context>" in (call_dir / "memory_context_block.txt").read_text(encoding="utf-8")
api_user_message = (call_dir / "api_user_message.txt").read_text(encoding="utf-8")
assert "What do I prefer?" in api_user_message
assert "remembered preference" in api_user_message
api_messages = (call_dir / "api_messages.json").read_text(encoding="utf-8")
assert '"role": "system"' in api_messages
assert '"role": "user"' in api_messages
metadata = (call_dir / "metadata.json").read_text(encoding="utf-8")
assert '"hindsight"' in metadata

View File

@@ -1,22 +0,0 @@
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DOC = ROOT / "docs" / "hindsight-local-eval.md"
def test_hindsight_local_eval_doc_exists_and_covers_switching():
assert DOC.exists(), "missing Hindsight local eval doc"
text = DOC.read_text(encoding="utf-8")
for snippet in (
"atlas-baseline",
"atlas-mempalace",
"atlas-hindsight",
"HERMES_HOME",
"HERMES_CONTEXT_SNAPSHOTS",
"memory_prefetch_raw.txt",
"api_user_message.txt",
"local_embedded",
"hindsight/config.json",
):
assert snippet in text