Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
7c38007094 feat(memory): add grounded observation synthesis layer
All checks were successful
Lint / lint (pull_request) Successful in 16s
2026-04-22 10:59:40 -04:00
12 changed files with 564 additions and 878 deletions

View File

@@ -1396,8 +1396,6 @@ def normalize_anthropic_response(
"tool_use": "tool_calls",
"max_tokens": "length",
"stop_sequence": "stop",
"refusal": "content_filter",
"model_context_window_exceeded": "length",
}
finish_reason = stop_reason_map.get(response.stop_reason, "stop")
@@ -1411,42 +1409,3 @@ def normalize_anthropic_response(
),
finish_reason,
)
def normalize_anthropic_response_v2(
response,
strip_tool_prefix: bool = False,
) -> "NormalizedResponse":
"""Normalize Anthropic response to NormalizedResponse.
Wraps the existing normalize_anthropic_response() and maps its output
to the shared transport types. This allows incremental migration
without disturbing the legacy call sites.
"""
from agent.transports.types import NormalizedResponse, build_tool_call
assistant_msg, finish_reason = normalize_anthropic_response(response, strip_tool_prefix)
tool_calls = None
if assistant_msg.tool_calls:
tool_calls = [
build_tool_call(
id=tc.id,
name=tc.function.name,
arguments=tc.function.arguments,
)
for tc in assistant_msg.tool_calls
]
provider_data = {}
if getattr(assistant_msg, "reasoning_details", None):
provider_data["reasoning_details"] = assistant_msg.reasoning_details
return NormalizedResponse(
content=assistant_msg.content,
tool_calls=tool_calls,
finish_reason=finish_reason,
reasoning=getattr(assistant_msg, "reasoning", None),
usage=None,
provider_data=provider_data or None,
)

View File

@@ -1,57 +0,0 @@
"""Transport layer types and registry for provider response normalization.
Usage:
from agent.transports import get_transport
transport = get_transport("anthropic_messages")
result = transport.normalize_response(raw_response)
"""
from agent.transports.types import ( # noqa: F401
NormalizedResponse,
ToolCall,
Usage,
build_tool_call,
map_finish_reason,
)
_REGISTRY: dict = {}
def register_transport(api_mode: str, transport_cls: type) -> None:
"""Register a transport class for an api_mode string."""
_REGISTRY[api_mode] = transport_cls
def get_transport(api_mode: str):
"""Get a transport instance for the given api_mode.
Returns None if no transport is registered for this api_mode.
This allows gradual migration — call sites can check for None
and fall back to the legacy code path.
"""
if not _REGISTRY:
_discover_transports()
cls = _REGISTRY.get(api_mode)
if cls is None:
return None
return cls()
def _discover_transports() -> None:
"""Import all transport modules to trigger auto-registration."""
try:
import agent.transports.anthropic # noqa: F401
except ImportError:
pass
try:
import agent.transports.codex # noqa: F401
except ImportError:
pass
try:
import agent.transports.chat_completions # noqa: F401
except ImportError:
pass
try:
import agent.transports.bedrock # noqa: F401
except ImportError:
pass

View File

@@ -1,95 +0,0 @@
"""Anthropic Messages API transport.
Delegates to the existing adapter functions in agent/anthropic_adapter.py.
This transport owns format conversion and normalization — NOT client lifecycle.
"""
from typing import Any, Dict, List, Optional
from agent.transports.base import ProviderTransport
from agent.transports.types import NormalizedResponse
class AnthropicTransport(ProviderTransport):
"""Transport for api_mode='anthropic_messages'."""
@property
def api_mode(self) -> str:
return "anthropic_messages"
def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> Any:
from agent.anthropic_adapter import convert_messages_to_anthropic
base_url = kwargs.get("base_url")
return convert_messages_to_anthropic(messages, base_url=base_url)
def convert_tools(self, tools: List[Dict[str, Any]]) -> Any:
from agent.anthropic_adapter import convert_tools_to_anthropic
return convert_tools_to_anthropic(tools)
def build_kwargs(
self,
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
**params,
) -> Dict[str, Any]:
from agent.anthropic_adapter import build_anthropic_kwargs
return build_anthropic_kwargs(
model=model,
messages=messages,
tools=tools,
max_tokens=params.get("max_tokens", 16384),
reasoning_config=params.get("reasoning_config"),
tool_choice=params.get("tool_choice"),
is_oauth=params.get("is_oauth", False),
preserve_dots=params.get("preserve_dots", False),
context_length=params.get("context_length"),
base_url=params.get("base_url"),
fast_mode=params.get("fast_mode", False),
)
def normalize_response(self, response: Any, **kwargs) -> NormalizedResponse:
from agent.anthropic_adapter import normalize_anthropic_response_v2
strip_tool_prefix = kwargs.get("strip_tool_prefix", False)
return normalize_anthropic_response_v2(response, strip_tool_prefix=strip_tool_prefix)
def validate_response(self, response: Any) -> bool:
if response is None:
return False
content_blocks = getattr(response, "content", None)
if not isinstance(content_blocks, list):
return False
if not content_blocks:
return False
return True
def extract_cache_stats(self, response: Any):
usage = getattr(response, "usage", None)
if usage is None:
return None
cached = getattr(usage, "cache_read_input_tokens", 0) or 0
written = getattr(usage, "cache_creation_input_tokens", 0) or 0
if cached or written:
return {"cached_tokens": cached, "creation_tokens": written}
return None
_STOP_REASON_MAP = {
"end_turn": "stop",
"tool_use": "tool_calls",
"max_tokens": "length",
"stop_sequence": "stop",
"refusal": "content_filter",
"model_context_window_exceeded": "length",
}
def map_finish_reason(self, raw_reason: str) -> str:
return self._STOP_REASON_MAP.get(raw_reason, "stop")
from agent.transports import register_transport # noqa: E402
register_transport("anthropic_messages", AnthropicTransport)

View File

@@ -1,61 +0,0 @@
"""Abstract base for provider transports.
A transport owns the data path for one api_mode:
convert_messages → convert_tools → build_kwargs → normalize_response
It does NOT own: client construction, streaming, credential refresh,
prompt caching, interrupt handling, or retry logic. Those stay on AIAgent.
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from agent.transports.types import NormalizedResponse
class ProviderTransport(ABC):
"""Base class for provider-specific format conversion and normalization."""
@property
@abstractmethod
def api_mode(self) -> str:
"""The api_mode string this transport handles."""
...
@abstractmethod
def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> Any:
"""Convert OpenAI-format messages to provider-native format."""
...
@abstractmethod
def convert_tools(self, tools: List[Dict[str, Any]]) -> Any:
"""Convert OpenAI-format tool definitions to provider-native format."""
...
@abstractmethod
def build_kwargs(
self,
model: str,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
**params,
) -> Dict[str, Any]:
"""Build the complete provider kwargs dict."""
...
@abstractmethod
def normalize_response(self, response: Any, **kwargs) -> NormalizedResponse:
"""Normalize a raw provider response to the shared NormalizedResponse type."""
...
def validate_response(self, response: Any) -> bool:
"""Optional structural validation for raw responses."""
return True
def extract_cache_stats(self, response: Any) -> Optional[Dict[str, int]]:
"""Optional cache stats extraction."""
return None
def map_finish_reason(self, raw_reason: str) -> str:
"""Optional stop-reason mapping. Defaults to passthrough."""
return raw_reason

View File

@@ -1,58 +0,0 @@
"""Shared types for normalized provider responses."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class ToolCall:
"""A normalized tool call from any provider."""
id: Optional[str]
name: str
arguments: str
provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
@dataclass
class Usage:
"""Token usage from an API response."""
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
cached_tokens: int = 0
@dataclass
class NormalizedResponse:
"""Normalized API response from any provider."""
content: Optional[str]
tool_calls: Optional[List[ToolCall]]
finish_reason: str
reasoning: Optional[str] = None
usage: Optional[Usage] = None
provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
def build_tool_call(
id: Optional[str],
name: str,
arguments: Any,
**provider_fields: Any,
) -> ToolCall:
"""Build a ToolCall, auto-serialising dict arguments."""
args_str = json.dumps(arguments) if isinstance(arguments, dict) else str(arguments)
provider_data = dict(provider_fields) if provider_fields else None
return ToolCall(id=id, name=name, arguments=args_str, provider_data=provider_data)
def map_finish_reason(reason: Optional[str], mapping: Dict[str, str]) -> str:
"""Translate a provider-specific stop reason to the normalized set."""
if reason is None:
return "stop"
return mapping.get(reason, "stop")

View File

@@ -26,6 +26,7 @@ from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
from .store import MemoryStore
from .retrieval import FactRetriever
from .observations import ObservationSynthesizer
logger = logging.getLogger(__name__)
@@ -37,28 +38,29 @@ logger = logging.getLogger(__name__)
FACT_STORE_SCHEMA = {
"name": "fact_store",
"description": (
"Deep structured memory with algebraic reasoning. "
"Deep structured memory with algebraic reasoning and grounded observation synthesis. "
"Use alongside the memory tool — memory for always-on context, "
"fact_store for deep recall and compositional queries.\n\n"
"fact_store for deep recall, compositional queries, and higher-order observations.\n\n"
"ACTIONS (simple → powerful):\n"
"• add — Store a fact the user would expect you to remember.\n"
"• search — Keyword lookup ('editor config', 'deploy process').\n"
"• probe — Entity recall: ALL facts about a person/thing.\n"
"• related — What connects to an entity? Structural adjacency.\n"
"• reason — Compositional: facts connected to MULTIPLE entities simultaneously.\n"
"• observe — Synthesized higher-order observations backed by supporting facts.\n"
"• contradict — Memory hygiene: find facts making conflicting claims.\n"
"• update/remove/list — CRUD operations.\n\n"
"IMPORTANT: Before answering questions about the user, ALWAYS probe or reason first."
"IMPORTANT: Before answering questions about the user, ALWAYS probe/reason/observe first."
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"],
"enum": ["add", "search", "probe", "related", "reason", "observe", "contradict", "update", "remove", "list"],
},
"content": {"type": "string", "description": "Fact content (required for 'add')."},
"query": {"type": "string", "description": "Search query (required for 'search')."},
"query": {"type": "string", "description": "Search query (required for 'search'/'observe')."},
"entity": {"type": "string", "description": "Entity name for 'probe'/'related'."},
"entities": {"type": "array", "items": {"type": "string"}, "description": "Entity names for 'reason'."},
"fact_id": {"type": "integer", "description": "Fact ID for 'update'/'remove'."},
@@ -66,6 +68,12 @@ FACT_STORE_SCHEMA = {
"tags": {"type": "string", "description": "Comma-separated tags."},
"trust_delta": {"type": "number", "description": "Trust adjustment for 'update'."},
"min_trust": {"type": "number", "description": "Minimum trust filter (default: 0.3)."},
"min_confidence": {"type": "number", "description": "Minimum observation confidence (default: 0.6)."},
"observation_type": {
"type": "string",
"enum": ["recurring_preference", "stable_direction", "behavioral_pattern"],
"description": "Optional observation type filter for 'observe'.",
},
"limit": {"type": "integer", "description": "Max results (default: 10)."},
},
"required": ["action"],
@@ -118,7 +126,9 @@ class HolographicMemoryProvider(MemoryProvider):
self._config = config or _load_plugin_config()
self._store = None
self._retriever = None
self._observation_synth = None
self._min_trust = float(self._config.get("min_trust_threshold", 0.3))
self._observation_min_confidence = float(self._config.get("observation_min_confidence", 0.6))
@property
def name(self) -> str:
@@ -177,6 +187,7 @@ class HolographicMemoryProvider(MemoryProvider):
hrr_weight=hrr_weight,
hrr_dim=hrr_dim,
)
self._observation_synth = ObservationSynthesizer(self._store)
self._session_id = session_id
def system_prompt_block(self) -> str:
@@ -193,30 +204,76 @@ class HolographicMemoryProvider(MemoryProvider):
"# Holographic Memory\n"
"Active. Empty fact store — proactively add facts the user would expect you to remember.\n"
"Use fact_store(action='add') to store durable structured facts about people, projects, preferences, decisions.\n"
"Use fact_store(action='observe') to synthesize higher-order observations with evidence.\n"
"Use fact_feedback to rate facts after using them (trains trust scores)."
)
return (
f"# Holographic Memory\n"
f"Active. {total} facts stored with entity resolution and trust scoring.\n"
f"Use fact_store to search, probe entities, reason across entities, or add facts.\n"
f"Use fact_store to search, probe entities, reason across entities, or synthesize observations.\n"
f"Use fact_feedback to rate facts after using them (trains trust scores)."
)
def prefetch(self, query: str, *, session_id: str = "") -> str:
if not self._retriever or not query:
if not query:
return ""
parts = []
raw_results = []
try:
results = self._retriever.search(query, min_trust=self._min_trust, limit=5)
if not results:
return ""
if self._retriever:
raw_results = self._retriever.search(query, min_trust=self._min_trust, limit=5)
except Exception as e:
logger.debug("Holographic prefetch fact search failed: %s", e)
raw_results = []
observations = []
try:
if self._observation_synth:
observations = self._observation_synth.observe(
query,
min_confidence=self._observation_min_confidence,
limit=3,
refresh=True,
)
except Exception as e:
logger.debug("Holographic prefetch observation search failed: %s", e)
observations = []
if not raw_results and observations:
seen_fact_ids = set()
evidence_backfill = []
for observation in observations:
for evidence in observation.get("evidence", []):
fact_id = evidence.get("fact_id")
if fact_id in seen_fact_ids:
continue
seen_fact_ids.add(fact_id)
evidence_backfill.append(evidence)
raw_results = evidence_backfill[:5]
if raw_results:
lines = []
for r in results:
for r in raw_results:
trust = r.get("trust_score", r.get("trust", 0))
lines.append(f"- [{trust:.1f}] {r.get('content', '')}")
return "## Holographic Memory\n" + "\n".join(lines)
except Exception as e:
logger.debug("Holographic prefetch failed: %s", e)
return ""
parts.append("## Holographic Memory\n" + "\n".join(lines))
if observations:
lines = []
for observation in observations:
evidence_ids = ", ".join(
f"#{item['fact_id']}" for item in observation.get("evidence", [])[:3]
) or "none"
lines.append(
f"- [{observation.get('confidence', 0.0):.2f}] "
f"{observation.get('observation_type', 'observation')}: "
f"{observation.get('summary', '')} "
f"(evidence: {evidence_ids})"
)
parts.append("## Holographic Observations\n" + "\n".join(lines))
return "\n\n".join(parts)
def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
# Holographic memory stores explicit facts via tools, not auto-sync.
@@ -252,6 +309,7 @@ class HolographicMemoryProvider(MemoryProvider):
def shutdown(self) -> None:
self._store = None
self._retriever = None
self._observation_synth = None
# -- Tool handlers -------------------------------------------------------
@@ -305,6 +363,19 @@ class HolographicMemoryProvider(MemoryProvider):
)
return json.dumps({"results": results, "count": len(results)})
elif action == "observe":
synthesizer = self._observation_synth
if not synthesizer:
return tool_error("Observation synthesizer is not initialized")
observations = synthesizer.observe(
args.get("query", ""),
observation_type=args.get("observation_type"),
min_confidence=float(args.get("min_confidence", self._observation_min_confidence)),
limit=int(args.get("limit", 10)),
refresh=True,
)
return json.dumps({"observations": observations, "count": len(observations)})
elif action == "contradict":
results = retriever.contradict(
category=args.get("category"),

View File

@@ -0,0 +1,249 @@
"""Higher-order observation synthesis for holographic memory.
Builds grounded observations from accumulated facts and keeps them in a
separate retrieval layer with explicit evidence links back to supporting facts.
"""
from __future__ import annotations
import re
from typing import Any
from .store import MemoryStore
_TOKEN_RE = re.compile(r"[a-z0-9_]+")
_HIGHER_ORDER_CUES = {
"prefer",
"preference",
"preferences",
"style",
"pattern",
"patterns",
"behavior",
"behaviour",
"habit",
"habits",
"workflow",
"direction",
"trajectory",
"strategy",
"tend",
"usually",
}
_OBSERVATION_PATTERNS = [
{
"observation_type": "recurring_preference",
"subject": "communication_style",
"categories": {"user_pref", "general"},
"labels": {
"concise": ["concise", "terse", "brief", "short", "no fluff"],
"result_first": ["result-only", "result only", "outcome only", "quick", "quickly"],
"silent_ops": ["silent", "no status", "no repetitive status", "no questions"],
},
"summary_prefix": "Recurring preference",
},
{
"observation_type": "stable_direction",
"subject": "project_direction",
"categories": {"project", "general", "tool"},
"labels": {
"local_first": ["local-first", "local first", "local-only", "local only", "ollama", "own hardware"],
"gitea_first": ["gitea-first", "gitea first", "forge", "pull request", "pr flow", "issue flow"],
"ansible": ["ansible", "playbook", "role", "deploy via ansible"],
},
"summary_prefix": "Stable direction",
},
{
"observation_type": "behavioral_pattern",
"subject": "operator_workflow",
"categories": {"general", "project", "tool", "user_pref"},
"labels": {
"commit_early": ["commit early", "commits early", "commit after", "wip commit"],
"pr_first": ["open pr", "push a pr", "pull request", "pr immediately", "create pr"],
"dedup_guard": ["no dupes", "no duplicates", "avoid duplicate", "existing pr"],
},
"summary_prefix": "Behavioral pattern",
},
]
_TYPE_QUERY_HINTS = {
"recurring_preference": {"prefer", "preference", "style", "communication", "likes", "wants"},
"stable_direction": {"direction", "trajectory", "strategy", "project", "roadmap", "moving"},
"behavioral_pattern": {"pattern", "behavior", "workflow", "habit", "operator", "agent", "usually"},
}
class ObservationSynthesizer:
"""Synthesizes grounded observations from facts and retrieves them by query."""
def __init__(self, store: MemoryStore):
self.store = store
def synthesize(
self,
*,
persist: bool = True,
min_confidence: float = 0.6,
limit: int = 10,
) -> list[dict[str, Any]]:
facts = self.store.list_facts(min_trust=0.0, limit=1000)
observations: list[dict[str, Any]] = []
for pattern in _OBSERVATION_PATTERNS:
candidate = self._build_candidate(pattern, facts, min_confidence=min_confidence)
if not candidate:
continue
if persist:
candidate["observation_id"] = self.store.upsert_observation(
candidate["observation_type"],
candidate["subject"],
candidate["summary"],
candidate["confidence"],
candidate["evidence_fact_ids"],
metadata=candidate["metadata"],
)
candidate["evidence"] = self._expand_evidence(candidate["evidence_fact_ids"])
candidate["evidence_count"] = len(candidate["evidence"])
candidate.pop("evidence_fact_ids", None)
observations.append(candidate)
observations.sort(
key=lambda item: (item["confidence"], item.get("evidence_count", 0)),
reverse=True,
)
return observations[:limit]
def observe(
self,
query: str = "",
*,
observation_type: str | None = None,
min_confidence: float = 0.6,
limit: int = 10,
refresh: bool = True,
) -> list[dict[str, Any]]:
if refresh:
self.synthesize(persist=True, min_confidence=min_confidence, limit=limit)
observations = self.store.list_observations(
observation_type=observation_type,
min_confidence=min_confidence,
limit=max(limit * 4, 20),
)
if not observations:
return []
if not query:
return observations[:limit]
query_tokens = self._tokenize(query)
is_higher_order = bool(query_tokens & _HIGHER_ORDER_CUES)
ranked: list[dict[str, Any]] = []
for item in observations:
searchable = " ".join(
[
item.get("summary", ""),
item.get("subject", ""),
item.get("observation_type", ""),
" ".join(item.get("metadata", {}).get("labels", [])),
]
)
overlap = self._overlap_score(query_tokens, self._tokenize(searchable))
type_bonus = self._type_bonus(query_tokens, item.get("observation_type", ""))
if overlap <= 0 and type_bonus <= 0 and not is_higher_order:
continue
ranked_item = dict(item)
ranked_item["score"] = round(item.get("confidence", 0.0) + overlap + type_bonus, 3)
ranked.append(ranked_item)
if not ranked and is_higher_order:
ranked = [
{**item, "score": round(float(item.get("confidence", 0.0)), 3)}
for item in observations
]
ranked.sort(
key=lambda item: (item.get("score", 0.0), item.get("confidence", 0.0), item.get("evidence_count", 0)),
reverse=True,
)
return ranked[:limit]
def _build_candidate(
self,
pattern: dict[str, Any],
facts: list[dict[str, Any]],
*,
min_confidence: float,
) -> dict[str, Any] | None:
matched_fact_ids: set[int] = set()
matched_labels: dict[str, set[int]] = {label: set() for label in pattern["labels"]}
for fact in facts:
if fact.get("category") not in pattern["categories"]:
continue
haystack = f"{fact.get('content', '')} {fact.get('tags', '')}".lower()
local_match = False
for label, keywords in pattern["labels"].items():
if any(keyword in haystack for keyword in keywords):
matched_labels[label].add(int(fact["fact_id"]))
local_match = True
if local_match:
matched_fact_ids.add(int(fact["fact_id"]))
if len(matched_fact_ids) < 2:
return None
active_labels = sorted(label for label, ids in matched_labels.items() if ids)
confidence = min(0.95, 0.35 + 0.12 * len(matched_fact_ids) + 0.08 * len(active_labels))
confidence = round(confidence, 3)
if confidence < min_confidence:
return None
label_summary = ", ".join(label.replace("_", "-") for label in active_labels)
subject_text = pattern["subject"].replace("_", " ")
summary = (
f"{pattern['summary_prefix']}: {subject_text} trends toward {label_summary} "
f"based on {len(matched_fact_ids)} supporting facts."
)
return {
"observation_type": pattern["observation_type"],
"subject": pattern["subject"],
"summary": summary,
"confidence": confidence,
"metadata": {
"labels": active_labels,
"evidence_count": len(matched_fact_ids),
},
"evidence_fact_ids": sorted(matched_fact_ids),
}
def _expand_evidence(self, fact_ids: list[int]) -> list[dict[str, Any]]:
facts_by_id = {
fact["fact_id"]: fact
for fact in self.store.list_facts(min_trust=0.0, limit=1000)
}
return [facts_by_id[fact_id] for fact_id in fact_ids if fact_id in facts_by_id]
@staticmethod
def _tokenize(text: str) -> set[str]:
return set(_TOKEN_RE.findall(text.lower()))
@staticmethod
def _overlap_score(query_tokens: set[str], text_tokens: set[str]) -> float:
if not query_tokens or not text_tokens:
return 0.0
overlap = query_tokens & text_tokens
if not overlap:
return 0.0
return round(len(overlap) / max(len(query_tokens), 1), 3)
@staticmethod
def _type_bonus(query_tokens: set[str], observation_type: str) -> float:
hints = _TYPE_QUERY_HINTS.get(observation_type, set())
if not hints:
return 0.0
return 0.25 if query_tokens & hints else 0.0

View File

@@ -3,6 +3,7 @@ SQLite-backed fact store with entity resolution and trust scoring.
Single-user Hermes memory store plugin.
"""
import json
import re
import sqlite3
import threading
@@ -73,6 +74,28 @@ CREATE TABLE IF NOT EXISTS memory_banks (
fact_count INTEGER DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS observations (
observation_id INTEGER PRIMARY KEY AUTOINCREMENT,
observation_type TEXT NOT NULL,
subject TEXT NOT NULL,
summary TEXT NOT NULL,
confidence REAL DEFAULT 0.0,
metadata_json TEXT DEFAULT '{}',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(observation_type, subject)
);
CREATE TABLE IF NOT EXISTS observation_evidence (
observation_id INTEGER REFERENCES observations(observation_id) ON DELETE CASCADE,
fact_id INTEGER REFERENCES facts(fact_id) ON DELETE CASCADE,
evidence_weight REAL DEFAULT 1.0,
PRIMARY KEY (observation_id, fact_id)
);
CREATE INDEX IF NOT EXISTS idx_observations_type ON observations(observation_type);
CREATE INDEX IF NOT EXISTS idx_observations_confidence ON observations(confidence DESC);
"""
# Trust adjustment constants
@@ -128,6 +151,7 @@ class MemoryStore:
def _init_db(self) -> None:
"""Create tables, indexes, and triggers if they do not exist. Enable WAL mode."""
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA foreign_keys=ON")
self._conn.executescript(_SCHEMA)
# Migrate: add hrr_vector column if missing (safe for existing databases)
columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()}
@@ -346,6 +370,115 @@ class MemoryStore:
rows = self._conn.execute(sql, params).fetchall()
return [self._row_to_dict(r) for r in rows]
def upsert_observation(
self,
observation_type: str,
subject: str,
summary: str,
confidence: float,
evidence_fact_ids: list[int],
metadata: dict | None = None,
) -> int:
"""Create or update a synthesized observation and its evidence links."""
with self._lock:
metadata_json = json.dumps(metadata or {}, sort_keys=True)
self._conn.execute(
"""
INSERT INTO observations (
observation_type, subject, summary, confidence, metadata_json
)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(observation_type, subject) DO UPDATE SET
summary = excluded.summary,
confidence = excluded.confidence,
metadata_json = excluded.metadata_json,
updated_at = CURRENT_TIMESTAMP
""",
(observation_type, subject, summary, confidence, metadata_json),
)
row = self._conn.execute(
"""
SELECT observation_id
FROM observations
WHERE observation_type = ? AND subject = ?
""",
(observation_type, subject),
).fetchone()
observation_id = int(row["observation_id"])
self._conn.execute(
"DELETE FROM observation_evidence WHERE observation_id = ?",
(observation_id,),
)
unique_fact_ids = sorted({int(fid) for fid in evidence_fact_ids})
if unique_fact_ids:
self._conn.executemany(
"""
INSERT OR IGNORE INTO observation_evidence (observation_id, fact_id)
VALUES (?, ?)
""",
[(observation_id, fact_id) for fact_id in unique_fact_ids],
)
self._conn.commit()
return observation_id
def list_observations(
self,
observation_type: str | None = None,
min_confidence: float = 0.0,
limit: int = 50,
) -> list[dict]:
"""List synthesized observations with expanded supporting evidence."""
with self._lock:
params: list = [min_confidence]
observation_clause = ""
if observation_type is not None:
observation_clause = "AND observation_type = ?"
params.append(observation_type)
params.append(limit)
rows = self._conn.execute(
f"""
SELECT observation_id, observation_type, subject, summary, confidence,
metadata_json, created_at, updated_at,
(
SELECT COUNT(*)
FROM observation_evidence oe
WHERE oe.observation_id = observations.observation_id
) AS evidence_count
FROM observations
WHERE confidence >= ?
{observation_clause}
ORDER BY confidence DESC, updated_at DESC
LIMIT ?
""",
params,
).fetchall()
results = []
for row in rows:
item = dict(row)
try:
item["metadata"] = json.loads(item.pop("metadata_json") or "{}")
except json.JSONDecodeError:
item["metadata"] = {}
item["evidence"] = self._get_observation_evidence(int(item["observation_id"]))
results.append(item)
return results
def _get_observation_evidence(self, observation_id: int) -> list[dict]:
rows = self._conn.execute(
"""
SELECT f.fact_id, f.content, f.category, f.tags, f.trust_score,
f.retrieval_count, f.helpful_count, f.created_at, f.updated_at
FROM observation_evidence oe
JOIN facts f ON f.fact_id = oe.fact_id
WHERE oe.observation_id = ?
ORDER BY f.trust_score DESC, f.updated_at DESC
""",
(observation_id,),
).fetchall()
return [self._row_to_dict(row) for row in rows]
def record_feedback(self, fact_id: int, helpful: bool) -> dict:
"""Record user feedback and adjust trust asymmetrically.

View File

@@ -1,213 +0,0 @@
"""Regression tests: normalize_anthropic_response_v2 vs v1.
Constructs mock Anthropic responses and asserts that the v2 function
(returning NormalizedResponse) produces identical field values to the
original v1 function (returning SimpleNamespace + finish_reason).
"""
from types import SimpleNamespace
import pytest
from agent.anthropic_adapter import (
normalize_anthropic_response,
normalize_anthropic_response_v2,
)
from agent.transports.types import NormalizedResponse
def _text_block(text: str):
return SimpleNamespace(type="text", text=text)
def _thinking_block(thinking: str, signature: str = "sig_abc"):
return SimpleNamespace(type="thinking", thinking=thinking, signature=signature)
def _tool_use_block(id: str, name: str, input: dict):
return SimpleNamespace(type="tool_use", id=id, name=name, input=input)
def _response(content_blocks, stop_reason="end_turn"):
return SimpleNamespace(
content=content_blocks,
stop_reason=stop_reason,
usage=SimpleNamespace(input_tokens=10, output_tokens=5),
)
class TestTextOnly:
def setup_method(self):
self.resp = _response([_text_block("Hello world")])
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_type(self):
assert isinstance(self.v2, NormalizedResponse)
def test_content_matches(self):
assert self.v2.content == self.v1_msg.content
def test_finish_reason_matches(self):
assert self.v2.finish_reason == self.v1_finish
def test_no_tool_calls(self):
assert self.v2.tool_calls is None
assert self.v1_msg.tool_calls is None
def test_no_reasoning(self):
assert self.v2.reasoning is None
assert self.v1_msg.reasoning is None
class TestWithToolCalls:
def setup_method(self):
self.resp = _response(
[
_text_block("I'll check that"),
_tool_use_block("toolu_abc", "terminal", {"command": "ls"}),
_tool_use_block("toolu_def", "read_file", {"path": "/tmp"}),
],
stop_reason="tool_use",
)
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_finish_reason(self):
assert self.v2.finish_reason == "tool_calls"
assert self.v1_finish == "tool_calls"
def test_tool_call_count(self):
assert len(self.v2.tool_calls) == 2
assert len(self.v1_msg.tool_calls) == 2
def test_tool_call_ids_match(self):
for i in range(2):
assert self.v2.tool_calls[i].id == self.v1_msg.tool_calls[i].id
def test_tool_call_names_match(self):
assert self.v2.tool_calls[0].name == "terminal"
assert self.v2.tool_calls[1].name == "read_file"
for i in range(2):
assert self.v2.tool_calls[i].name == self.v1_msg.tool_calls[i].function.name
def test_tool_call_arguments_match(self):
for i in range(2):
assert self.v2.tool_calls[i].arguments == self.v1_msg.tool_calls[i].function.arguments
def test_content_preserved(self):
assert self.v2.content == self.v1_msg.content
assert "check that" in self.v2.content
class TestWithThinking:
def setup_method(self):
self.resp = _response([
_thinking_block("Let me think about this carefully..."),
_text_block("The answer is 42."),
])
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_reasoning_matches(self):
assert self.v2.reasoning == self.v1_msg.reasoning
assert "think about this" in self.v2.reasoning
def test_reasoning_details_in_provider_data(self):
v1_details = self.v1_msg.reasoning_details
v2_details = self.v2.provider_data.get("reasoning_details") if self.v2.provider_data else None
assert v1_details is not None
assert v2_details is not None
assert len(v2_details) == len(v1_details)
def test_content_excludes_thinking(self):
assert self.v2.content == "The answer is 42."
class TestMixed:
def setup_method(self):
self.resp = _response(
[
_thinking_block("Planning my approach..."),
_text_block("I'll run the command"),
_tool_use_block("toolu_xyz", "terminal", {"command": "pwd"}),
],
stop_reason="tool_use",
)
self.v1_msg, self.v1_finish = normalize_anthropic_response(self.resp)
self.v2 = normalize_anthropic_response_v2(self.resp)
def test_all_fields_present(self):
assert self.v2.content is not None
assert self.v2.tool_calls is not None
assert self.v2.reasoning is not None
assert self.v2.finish_reason == "tool_calls"
def test_content_matches(self):
assert self.v2.content == self.v1_msg.content
def test_reasoning_matches(self):
assert self.v2.reasoning == self.v1_msg.reasoning
def test_tool_call_matches(self):
assert self.v2.tool_calls[0].id == self.v1_msg.tool_calls[0].id
assert self.v2.tool_calls[0].name == self.v1_msg.tool_calls[0].function.name
class TestStopReasons:
@pytest.mark.parametrize("stop_reason,expected", [
("end_turn", "stop"),
("tool_use", "tool_calls"),
("max_tokens", "length"),
("stop_sequence", "stop"),
("refusal", "content_filter"),
("model_context_window_exceeded", "length"),
("unknown_future_reason", "stop"),
])
def test_stop_reason_mapping(self, stop_reason, expected):
resp = _response([_text_block("x")], stop_reason=stop_reason)
_v1_msg, v1_finish = normalize_anthropic_response(resp)
v2 = normalize_anthropic_response_v2(resp)
assert v2.finish_reason == v1_finish == expected
class TestStripToolPrefix:
def test_prefix_stripped(self):
resp = _response(
[_tool_use_block("toolu_1", "mcp_terminal", {"cmd": "ls"})],
stop_reason="tool_use",
)
v1_msg, _ = normalize_anthropic_response(resp, strip_tool_prefix=True)
v2 = normalize_anthropic_response_v2(resp, strip_tool_prefix=True)
assert v1_msg.tool_calls[0].function.name == "terminal"
assert v2.tool_calls[0].name == "terminal"
def test_prefix_kept(self):
resp = _response(
[_tool_use_block("toolu_1", "mcp_terminal", {"cmd": "ls"})],
stop_reason="tool_use",
)
v1_msg, _ = normalize_anthropic_response(resp, strip_tool_prefix=False)
v2 = normalize_anthropic_response_v2(resp, strip_tool_prefix=False)
assert v1_msg.tool_calls[0].function.name == "mcp_terminal"
assert v2.tool_calls[0].name == "mcp_terminal"
class TestEdgeCases:
def test_empty_content_blocks(self):
resp = _response([])
v1_msg, _v1_finish = normalize_anthropic_response(resp)
v2 = normalize_anthropic_response_v2(resp)
assert v2.content == v1_msg.content
assert v2.content is None
def test_no_reasoning_details_means_none_provider_data(self):
resp = _response([_text_block("hi")])
v2 = normalize_anthropic_response_v2(resp)
assert v2.provider_data is None
def test_v2_returns_dataclass_not_namespace(self):
resp = _response([_text_block("hi")])
v2 = normalize_anthropic_response_v2(resp)
assert isinstance(v2, NormalizedResponse)
assert not isinstance(v2, SimpleNamespace)

View File

@@ -1,208 +0,0 @@
"""Tests for the transport ABC, registry, and AnthropicTransport."""
from types import SimpleNamespace
import pytest
from agent.transports import _REGISTRY, get_transport, register_transport
from agent.transports.base import ProviderTransport
from agent.transports.types import NormalizedResponse
class TestProviderTransportABC:
def test_cannot_instantiate_abc(self):
with pytest.raises(TypeError):
ProviderTransport()
def test_concrete_must_implement_all_abstract(self):
class Incomplete(ProviderTransport):
@property
def api_mode(self):
return "test"
with pytest.raises(TypeError):
Incomplete()
def test_minimal_concrete(self):
class Minimal(ProviderTransport):
@property
def api_mode(self):
return "test_minimal"
def convert_messages(self, messages, **kw):
return messages
def convert_tools(self, tools):
return tools
def build_kwargs(self, model, messages, tools=None, **params):
return {"model": model, "messages": messages}
def normalize_response(self, response, **kw):
return NormalizedResponse(content="ok", tool_calls=None, finish_reason="stop")
t = Minimal()
assert t.api_mode == "test_minimal"
assert t.validate_response(None) is True
assert t.extract_cache_stats(None) is None
assert t.map_finish_reason("end_turn") == "end_turn"
class TestTransportRegistry:
def test_get_unregistered_returns_none(self):
assert get_transport("nonexistent_mode") is None
def test_anthropic_registered_on_import(self):
import agent.transports.anthropic # noqa: F401
t = get_transport("anthropic_messages")
assert t is not None
assert t.api_mode == "anthropic_messages"
def test_register_and_get(self):
class DummyTransport(ProviderTransport):
@property
def api_mode(self):
return "dummy_test"
def convert_messages(self, messages, **kw):
return messages
def convert_tools(self, tools):
return tools
def build_kwargs(self, model, messages, tools=None, **params):
return {}
def normalize_response(self, response, **kw):
return NormalizedResponse(content=None, tool_calls=None, finish_reason="stop")
register_transport("dummy_test", DummyTransport)
t = get_transport("dummy_test")
assert t.api_mode == "dummy_test"
_REGISTRY.pop("dummy_test", None)
class TestAnthropicTransport:
@pytest.fixture
def transport(self):
import agent.transports.anthropic # noqa: F401
return get_transport("anthropic_messages")
def test_api_mode(self, transport):
assert transport.api_mode == "anthropic_messages"
def test_convert_tools_simple(self, transport):
tools = [{
"type": "function",
"function": {
"name": "test_tool",
"description": "A test",
"parameters": {"type": "object", "properties": {}},
},
}]
result = transport.convert_tools(tools)
assert len(result) == 1
assert result[0]["name"] == "test_tool"
assert "input_schema" in result[0]
def test_validate_response_none(self, transport):
assert transport.validate_response(None) is False
def test_validate_response_empty_content(self, transport):
r = SimpleNamespace(content=[])
assert transport.validate_response(r) is False
def test_validate_response_valid(self, transport):
r = SimpleNamespace(content=[SimpleNamespace(type="text", text="hello")])
assert transport.validate_response(r) is True
def test_map_finish_reason(self, transport):
assert transport.map_finish_reason("end_turn") == "stop"
assert transport.map_finish_reason("tool_use") == "tool_calls"
assert transport.map_finish_reason("max_tokens") == "length"
assert transport.map_finish_reason("stop_sequence") == "stop"
assert transport.map_finish_reason("refusal") == "content_filter"
assert transport.map_finish_reason("model_context_window_exceeded") == "length"
assert transport.map_finish_reason("unknown") == "stop"
def test_extract_cache_stats_none_usage(self, transport):
r = SimpleNamespace(usage=None)
assert transport.extract_cache_stats(r) is None
def test_extract_cache_stats_with_cache(self, transport):
usage = SimpleNamespace(cache_read_input_tokens=100, cache_creation_input_tokens=50)
r = SimpleNamespace(usage=usage)
result = transport.extract_cache_stats(r)
assert result == {"cached_tokens": 100, "creation_tokens": 50}
def test_extract_cache_stats_zero(self, transport):
usage = SimpleNamespace(cache_read_input_tokens=0, cache_creation_input_tokens=0)
r = SimpleNamespace(usage=usage)
assert transport.extract_cache_stats(r) is None
def test_normalize_response_text(self, transport):
r = SimpleNamespace(
content=[SimpleNamespace(type="text", text="Hello world")],
stop_reason="end_turn",
usage=SimpleNamespace(input_tokens=10, output_tokens=5),
model="claude-sonnet-4-6",
)
nr = transport.normalize_response(r)
assert isinstance(nr, NormalizedResponse)
assert nr.content == "Hello world"
assert nr.tool_calls is None or nr.tool_calls == []
assert nr.finish_reason == "stop"
def test_normalize_response_tool_calls(self, transport):
r = SimpleNamespace(
content=[
SimpleNamespace(type="tool_use", id="toolu_123", name="terminal", input={"command": "ls"}),
],
stop_reason="tool_use",
usage=SimpleNamespace(input_tokens=10, output_tokens=20),
model="claude-sonnet-4-6",
)
nr = transport.normalize_response(r)
assert nr.finish_reason == "tool_calls"
assert len(nr.tool_calls) == 1
tc = nr.tool_calls[0]
assert tc.name == "terminal"
assert tc.id == "toolu_123"
assert '"command"' in tc.arguments
def test_normalize_response_thinking(self, transport):
r = SimpleNamespace(
content=[
SimpleNamespace(type="thinking", thinking="Let me think..."),
SimpleNamespace(type="text", text="The answer is 42"),
],
stop_reason="end_turn",
usage=SimpleNamespace(input_tokens=10, output_tokens=15),
model="claude-sonnet-4-6",
)
nr = transport.normalize_response(r)
assert nr.content == "The answer is 42"
assert nr.reasoning == "Let me think..."
def test_build_kwargs_returns_dict(self, transport):
messages = [{"role": "user", "content": "Hello"}]
kw = transport.build_kwargs(
model="claude-sonnet-4-6",
messages=messages,
max_tokens=1024,
)
assert isinstance(kw, dict)
assert "model" in kw
assert "max_tokens" in kw
assert "messages" in kw
def test_convert_messages_extracts_system(self, transport):
messages = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Hi"},
]
system, msgs = transport.convert_messages(messages)
assert system is not None
assert len(msgs) >= 1

View File

@@ -1,130 +0,0 @@
"""Tests for agent/transports/types.py — dataclass construction + helpers."""
import json
from agent.transports.types import (
NormalizedResponse,
ToolCall,
Usage,
build_tool_call,
map_finish_reason,
)
class TestToolCall:
def test_basic_construction(self):
tc = ToolCall(id="call_abc", name="terminal", arguments='{"cmd": "ls"}')
assert tc.id == "call_abc"
assert tc.name == "terminal"
assert tc.arguments == '{"cmd": "ls"}'
assert tc.provider_data is None
def test_none_id(self):
tc = ToolCall(id=None, name="read_file", arguments="{}")
assert tc.id is None
def test_provider_data(self):
tc = ToolCall(
id="call_x",
name="t",
arguments="{}",
provider_data={"call_id": "call_x", "response_item_id": "fc_x"},
)
assert tc.provider_data["call_id"] == "call_x"
assert tc.provider_data["response_item_id"] == "fc_x"
class TestUsage:
def test_defaults(self):
u = Usage()
assert u.prompt_tokens == 0
assert u.completion_tokens == 0
assert u.total_tokens == 0
assert u.cached_tokens == 0
def test_explicit(self):
u = Usage(prompt_tokens=100, completion_tokens=50, total_tokens=150, cached_tokens=80)
assert u.total_tokens == 150
class TestNormalizedResponse:
def test_text_only(self):
r = NormalizedResponse(content="hello", tool_calls=None, finish_reason="stop")
assert r.content == "hello"
assert r.tool_calls is None
assert r.finish_reason == "stop"
assert r.reasoning is None
assert r.usage is None
assert r.provider_data is None
def test_with_tool_calls(self):
tcs = [ToolCall(id="call_1", name="terminal", arguments='{"cmd":"pwd"}')]
r = NormalizedResponse(content=None, tool_calls=tcs, finish_reason="tool_calls")
assert r.finish_reason == "tool_calls"
assert len(r.tool_calls) == 1
assert r.tool_calls[0].name == "terminal"
def test_with_reasoning(self):
r = NormalizedResponse(
content="answer",
tool_calls=None,
finish_reason="stop",
reasoning="I thought about it",
)
assert r.reasoning == "I thought about it"
def test_with_provider_data(self):
r = NormalizedResponse(
content=None,
tool_calls=None,
finish_reason="stop",
provider_data={"reasoning_details": [{"type": "thinking", "thinking": "hmm"}]},
)
assert r.provider_data["reasoning_details"][0]["type"] == "thinking"
class TestBuildToolCall:
def test_dict_arguments_serialized(self):
tc = build_tool_call(id="call_1", name="terminal", arguments={"cmd": "ls"})
assert tc.arguments == json.dumps({"cmd": "ls"})
assert tc.provider_data is None
def test_string_arguments_passthrough(self):
tc = build_tool_call(id="call_2", name="read_file", arguments='{"path": "/tmp"}')
assert tc.arguments == '{"path": "/tmp"}'
def test_provider_fields(self):
tc = build_tool_call(
id="call_3",
name="terminal",
arguments="{}",
call_id="call_3",
response_item_id="fc_3",
)
assert tc.provider_data == {"call_id": "call_3", "response_item_id": "fc_3"}
def test_none_id(self):
tc = build_tool_call(id=None, name="t", arguments="{}")
assert tc.id is None
class TestMapFinishReason:
ANTHROPIC_MAP = {
"end_turn": "stop",
"tool_use": "tool_calls",
"max_tokens": "length",
"stop_sequence": "stop",
"refusal": "content_filter",
}
def test_known_reason(self):
assert map_finish_reason("end_turn", self.ANTHROPIC_MAP) == "stop"
assert map_finish_reason("tool_use", self.ANTHROPIC_MAP) == "tool_calls"
assert map_finish_reason("max_tokens", self.ANTHROPIC_MAP) == "length"
assert map_finish_reason("refusal", self.ANTHROPIC_MAP) == "content_filter"
def test_unknown_reason_defaults_to_stop(self):
assert map_finish_reason("something_new", self.ANTHROPIC_MAP) == "stop"
def test_none_reason(self):
assert map_finish_reason(None, self.ANTHROPIC_MAP) == "stop"

View File

@@ -0,0 +1,96 @@
import json
import pytest
from plugins.memory.holographic import HolographicMemoryProvider
from plugins.memory.holographic.store import MemoryStore
@pytest.fixture()
def store(tmp_path):
db_path = tmp_path / "memory.db"
s = MemoryStore(db_path=str(db_path), default_trust=0.5)
yield s
s.close()
@pytest.fixture()
def provider(tmp_path):
p = HolographicMemoryProvider(
config={
"db_path": str(tmp_path / "memory.db"),
"default_trust": 0.5,
}
)
p.initialize(session_id="test-session")
yield p
if p._store:
p._store.close()
class TestObservationSynthesis:
def test_observe_action_persists_observation_with_evidence_links(self, provider):
fact_ids = [
provider._store.add_fact('User prefers concise status updates', category='user_pref'),
provider._store.add_fact('User wants result-only replies with no fluff', category='user_pref'),
]
result = json.loads(
provider.handle_tool_call(
'fact_store',
{
'action': 'observe',
'query': 'What communication style does the user prefer?',
'limit': 5,
},
)
)
assert result['count'] == 1
observation = result['observations'][0]
assert observation['observation_type'] == 'recurring_preference'
assert observation['confidence'] >= 0.6
assert sorted(item['fact_id'] for item in observation['evidence']) == sorted(fact_ids)
stored = provider._store.list_observations(limit=10)
assert len(stored) == 1
assert stored[0]['observation_type'] == 'recurring_preference'
assert stored[0]['evidence_count'] == 2
assert len(provider._store.list_facts(limit=10)) == 2
def test_observe_action_synthesizes_three_observation_types(self, provider):
provider._store.add_fact('User prefers concise updates', category='user_pref')
provider._store.add_fact('User wants result-only communication', category='user_pref')
provider._store.add_fact('Project is moving to a local-first deployment model', category='project')
provider._store.add_fact('Project direction stays Gitea-first for issue and PR flow', category='project')
provider._store.add_fact('Operator always commits early before moving on', category='general')
provider._store.add_fact('Operator pushes a PR immediately after each meaningful fix', category='general')
result = json.loads(provider.handle_tool_call('fact_store', {'action': 'observe', 'limit': 10}))
types = {item['observation_type'] for item in result['observations']}
assert {'recurring_preference', 'stable_direction', 'behavioral_pattern'} <= types
def test_single_fact_does_not_create_overconfident_observation(self, provider):
provider._store.add_fact('User prefers concise updates', category='user_pref')
result = json.loads(
provider.handle_tool_call(
'fact_store',
{'action': 'observe', 'query': 'What does the user prefer?', 'limit': 5},
)
)
assert result['count'] == 0
assert provider._store.list_observations(limit=10) == []
def test_prefetch_surfaces_observations_as_separate_layer(self, provider):
provider._store.add_fact('User prefers concise updates', category='user_pref')
provider._store.add_fact('User wants result-only communication', category='user_pref')
prefetch = provider.prefetch('What communication style does the user prefer?')
assert '## Holographic Observations' in prefetch
assert '## Holographic Memory' in prefetch
assert 'recurring_preference' in prefetch
assert 'evidence' in prefetch.lower()