Compare commits


1 commit

Author: Alexander Whitestone
SHA1: 755e7513a1
Message: docs: add grounded tensorzero evaluation packet (#860)
- add a script that inventories Hermes routing/evaluation surfaces relevant to a TensorZero cutover
- generate a markdown and JSON evaluation packet for issue #860
- score gateway replacement, config migration, canary rollout, session feedback, and eval-suite readiness
- add focused regression tests for touchpoint scanning, requirement scoring, and report rendering

Refs #860
Date: 2026-04-22 11:33:31 -04:00
10 changed files with 2028 additions and 1078 deletions


@@ -1,546 +1,197 @@
"""Session compaction with structured fact extraction.
"""Session compaction with fact extraction.
Before compressing conversation context, extract durable facts with enough
structure to survive retrieval: source/provenance, temporal anchors,
normalized canonical keys, and contradiction groups.
Before compressing conversation context, extracts durable facts
(user preferences, corrections, project details) and saves them
to the fact store so they survive compression.
Usage:
from agent.session_compactor import extract_and_save_facts
facts = extract_and_save_facts(messages)
"""
from __future__ import annotations
import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple
logger = logging.getLogger(__name__)
_DEPLOY_METHOD_RE = re.compile(r"\bdeploy(?:ing)?\s+(?:via|through|with)\s+([A-Za-z0-9_./+-]+)", re.IGNORECASE)
_WATCHDOG_CAP_RE = re.compile(
r"\b(?:the\s+)?([A-Za-z0-9_-]+(?:\s+watchdog)?)\s+(?:caps|limits)\s+dispatches(?:\s+per\s+cycle)?\s+to\s+([0-9]+)",
re.IGNORECASE,
)
_PROVIDER_RE = re.compile(
r"\bprovider\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._/-]+)",
re.IGNORECASE,
)
_MODEL_RE = re.compile(
r"\bmodel\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._:/-]+)",
re.IGNORECASE,
)
_PORT_RE = re.compile(r"\bport\s+(?:is|should\s+be)\s+([0-9]+)", re.IGNORECASE)
_PROJECT_USES_RE = re.compile(r"\b(?:the\s+)?project\s+(?:uses|needs|requires)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_PREFERENCE_RE = re.compile(r"\bI\s+(?:prefer|like|want|need)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_CONSTRAINT_RE = re.compile(r"\b(?:do\s+not|don't)\s+(?:ever\s+|again\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
_DECISION_RE = re.compile(r"\b(?:we|the\s+team)\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
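A quick sanity check of what these patterns capture, as an editorial sketch (the matched strings are illustrative, not repo data; it assumes the regexes above are importable from `agent.session_compactor`):

```python
from agent.session_compactor import _PROVIDER_RE, _WATCHDOG_CAP_RE

# Group 1 is the watchdog name, group 2 the numeric cap.
m = _WATCHDOG_CAP_RE.search("The fleet watchdog caps dispatches per cycle to 25")
assert m and m.group(1) == "fleet watchdog" and m.group(2) == "25"

# Group 1 is the provider identifier.
p = _PROVIDER_RE.search("provider should stay openrouter")
assert p and p.group(1) == "openrouter"
```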
@dataclass
class ExtractedFact:
"""A durable fact extracted from conversation."""
category: str
entity: str
content: str
confidence: float
source_turn: int
"""A fact extracted from conversation."""
category: str # "user_pref", "correction", "project", "tool_quirk", "general"
entity: str # what the fact is about
content: str # the fact itself
confidence: float # 0.0-1.0
source_turn: int # which message turn it came from
timestamp: float = 0.0
source_role: str = "user"
source_text: str = ""
normalized_content: str = ""
canonical_key: str = ""
relation: str = "general"
contradiction_group: str = ""
status: str = "active"
provenance: str = ""
observed_at: str = ""
evidence: List[Dict[str, Any]] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
if not self.timestamp:
self.timestamp = time.time()
if not self.observed_at:
self.observed_at = _iso_from_timestamp(self.timestamp)
if not self.normalized_content:
self.normalized_content = _normalize_value(self.content)
if not self.provenance:
self.provenance = f"conversation:{self.source_role}:{self.source_turn}"
if not self.canonical_key:
self.canonical_key = _canonical_key(self.entity, self.relation, self.normalized_content)
if not self.evidence:
self.evidence = [
{
"source_role": self.source_role,
"source_turn": self.source_turn,
"source_text": self.source_text or self.content,
"observed_at": self.observed_at,
"provenance": self.provenance,
}
]
self.metadata = dict(self.metadata or {})
self.metadata.setdefault("entity", self.entity)
self.metadata.setdefault("relation", self.relation)
self.metadata.setdefault("value", self.content)
self.metadata.setdefault("normalized_value", self.normalized_content)
self.metadata.setdefault("provenance", [self.provenance])
self.metadata.setdefault("evidence", list(self.evidence))
self.metadata.setdefault("observation_count", len(self.evidence))
self.metadata.setdefault("duplicate_count", max(0, self.metadata["observation_count"] - 1))
if self.contradiction_group:
self.metadata.setdefault("contradiction_group", self.contradiction_group)
self.metadata.setdefault("status", self.status)
def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[ExtractedFact]:
"""Extract durable facts from conversation messages.
Scans conversation turns for preferences, decisions, corrections, and
operational state. Raw candidates are normalized into canonical facts so
near-duplicates merge and contradictions remain inspectable.
"""
raw_candidates: list[ExtractedFact] = []
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role not in {"user", "assistant"}:
# Only scan user messages and assistant responses with corrections
if role not in ("user", "assistant"):
continue
if not content or not isinstance(content, str):
continue
if len(content) < 10:
continue
# Skip tool results and system messages
if role == "assistant" and msg.get("tool_calls"):
continue
if not isinstance(content, str) or len(content.strip()) < 10:
continue
timestamp, observed_at = _message_time(msg)
raw_candidates.extend(
_extract_from_text(
content.strip(),
turn_idx=turn_idx,
role=role,
timestamp=timestamp,
observed_at=observed_at,
)
)
return _normalize_candidates(raw_candidates)
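A usage sketch under illustrative inputs: two turns that set conflicting providers yield two normalized facts that share the `config.provider` contradiction group (provider names are placeholders):

```python
messages = [
    {"role": "user", "content": "Provider should stay openrouter for now."},
    {"role": "user", "content": "Actually the provider should stay litellm."},
]
facts = extract_facts_from_messages(messages)
providers = [f for f in facts if f.relation == "config.provider"]
# Both survive normalization as separate rows, flagged for inspection.
assert len(providers) == 2
assert all(f.status == "contradiction" for f in providers)
```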
def evaluate_extraction_quality(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Return before/after metrics for raw vs normalized extraction quality."""
raw_candidates: list[ExtractedFact] = []
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role not in {"user", "assistant"}:
continue
if role == "assistant" and msg.get("tool_calls"):
continue
if not isinstance(content, str) or len(content.strip()) < 10:
continue
timestamp, observed_at = _message_time(msg)
raw_candidates.extend(
_extract_from_text(
content.strip(),
turn_idx=turn_idx,
role=role,
timestamp=timestamp,
observed_at=observed_at,
)
)
normalized = _normalize_candidates(raw_candidates)
raw_count = len(raw_candidates)
normalized_count = len(normalized)
contradiction_groups = {
fact.contradiction_group
for fact in normalized
if fact.status == "contradiction" and fact.contradiction_group
}
duplicate_count = max(0, raw_count - normalized_count)
noise_reduction = (duplicate_count / raw_count) if raw_count else 0.0
return {
"raw_candidates": raw_count,
"normalized_facts": normalized_count,
"duplicates_merged": duplicate_count,
"contradiction_groups": len(contradiction_groups),
"noise_reduction": round(noise_reduction, 3),
}
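Continuing the extraction sketch above, the quality metrics for those two messages would come out as:

```python
metrics = evaluate_extraction_quality(messages)
assert metrics["raw_candidates"] == 2       # one provider fact per turn
assert metrics["normalized_facts"] == 2     # distinct values, no merge
assert metrics["duplicates_merged"] == 0
assert metrics["contradiction_groups"] == 1  # both share "config.provider"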
def _extract_from_text(
text: str,
*,
turn_idx: int,
role: str,
timestamp: float,
observed_at: str,
) -> List[ExtractedFact]:
"""Extract raw fact candidates from a single text block."""
facts: list[ExtractedFact] = []
if role != "user":
return facts
deploy_match = _DEPLOY_METHOD_RE.search(text)
if deploy_match:
method = deploy_match.group(1).strip()
facts.append(
_build_fact(
category="project.decision",
entity="project",
relation="workflow.deploy_method",
value=method,
content=f"Deploy via {method}",
confidence=0.88,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
watchdog_match = _WATCHDOG_CAP_RE.search(text)
if watchdog_match:
watchdog = watchdog_match.group(1).strip()
cap = watchdog_match.group(2).strip()
facts.append(
_build_fact(
category="project.operational",
entity=_normalize_entity(watchdog),
relation="fleet.dispatch_cap",
value=cap,
content=f"{watchdog} caps dispatches per cycle to {cap}",
confidence=0.92,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
provider_match = _PROVIDER_RE.search(text)
if provider_match:
provider = provider_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.provider",
value=provider,
content=f"Provider should stay {provider}",
confidence=0.91,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
model_match = _MODEL_RE.search(text)
if model_match:
model = model_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.model",
value=model,
content=f"Model should stay {model}",
confidence=0.9,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
port_match = _PORT_RE.search(text)
if port_match:
port = port_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.port",
value=port,
content=f"Port is {port}",
confidence=0.9,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
project_match = _PROJECT_USES_RE.search(text)
if project_match:
value = project_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="project.stack",
entity="project",
relation="project.stack",
value=value,
content=f"Project uses {value}",
confidence=0.74,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
preference_match = _PREFERENCE_RE.search(text)
if preference_match:
value = preference_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="user_pref.preference",
entity="user",
relation="user.preference",
value=value,
content=value,
confidence=0.72,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
constraint_match = _CONSTRAINT_RE.search(text)
if constraint_match:
value = constraint_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="user_pref.constraint",
entity="user",
relation="user.constraint",
value=value,
content=f"Do not {value}",
confidence=0.82,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
decision_match = _DECISION_RE.search(text)
if decision_match:
value = decision_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="project.decision",
entity="project",
relation="project.decision",
value=value,
content=f"Decision: {value}",
confidence=0.79,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
return facts
def _build_fact(
*,
category: str,
entity: str,
relation: str,
value: str,
content: str,
confidence: float,
source_turn: int,
source_role: str,
source_text: str,
timestamp: float,
observed_at: str,
unique_slot: bool,
) -> ExtractedFact:
normalized_value = _normalize_value(value.rstrip(".!?"))
value = value.rstrip(".!?")
content = content.rstrip(".!?")
provenance = f"conversation:{source_role}:{source_turn}"
contradiction_group = relation if unique_slot else ""
evidence = [
{
"source_role": source_role,
"source_turn": source_turn,
"source_text": source_text,
"observed_at": observed_at,
"provenance": provenance,
}
]
metadata = {
"entity": entity,
"relation": relation,
"value": value,
"normalized_value": normalized_value,
"provenance": [provenance],
"evidence": list(evidence),
"observation_count": 1,
"duplicate_count": 0,
"status": "active",
}
if contradiction_group:
metadata["contradiction_group"] = contradiction_group
return ExtractedFact(
category=category,
entity=entity,
content=content,
confidence=confidence,
source_turn=source_turn,
timestamp=timestamp,
source_role=source_role,
source_text=source_text,
normalized_content=normalized_value,
canonical_key=_canonical_key(entity, relation, normalized_value),
relation=relation,
contradiction_group=contradiction_group,
status="active",
provenance=provenance,
observed_at=observed_at,
evidence=evidence,
metadata=metadata,
)
def _normalize_candidates(candidates: List[ExtractedFact]) -> List[ExtractedFact]:
"""Merge duplicates and mark contradictions while preserving evidence."""
by_key: dict[str, ExtractedFact] = {}
contradiction_groups: dict[str, list[ExtractedFact]] = {}
for candidate in candidates:
existing = by_key.get(candidate.canonical_key)
if existing is not None:
by_key[candidate.canonical_key] = _merge_fact(existing, candidate)
continue
by_key[candidate.canonical_key] = candidate
if candidate.contradiction_group:
contradiction_groups.setdefault(candidate.contradiction_group, []).append(candidate)
for group, facts in contradiction_groups.items():
canonical_keys = {fact.canonical_key for fact in facts}
if len(canonical_keys) <= 1:
continue
for fact in facts:
fact.status = "contradiction"
fact.metadata["status"] = "contradiction"
fact.metadata["contradiction_group"] = group
fact.metadata["contradiction_keys"] = sorted(canonical_keys - {fact.canonical_key})
return sorted(by_key.values(), key=lambda fact: (fact.source_turn, fact.timestamp, fact.canonical_key))
def _merge_fact(existing: ExtractedFact, incoming: ExtractedFact) -> ExtractedFact:
existing.confidence = max(existing.confidence, incoming.confidence)
existing.timestamp = min(existing.timestamp, incoming.timestamp)
existing.source_turn = min(existing.source_turn, incoming.source_turn)
if not existing.observed_at or (incoming.observed_at and incoming.observed_at < existing.observed_at):
existing.observed_at = incoming.observed_at
existing.provenance = min(existing.provenance, incoming.provenance)
provenance = _ordered_unique(existing.metadata.get("provenance", []), incoming.metadata.get("provenance", []))
evidence = _merge_evidence(existing.metadata.get("evidence", []), incoming.metadata.get("evidence", []))
observation_count = int(existing.metadata.get("observation_count", len(existing.evidence) or 1))
observation_count += int(incoming.metadata.get("observation_count", len(incoming.evidence) or 1))
existing.evidence = evidence
existing.metadata["provenance"] = provenance
existing.metadata["evidence"] = evidence
existing.metadata["observation_count"] = observation_count
existing.metadata["duplicate_count"] = max(0, observation_count - 1)
existing.metadata["status"] = existing.status
return existing
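A merge sketch: two observations of the same slot collapse to one fact whose evidence trail grows (the helper arguments are illustrative values for this module's own `_build_fact`):

```python
def _port_fact(turn: int, confidence: float) -> ExtractedFact:
    return _build_fact(
        category="project.config", entity="project", relation="config.port",
        value="8080", content="Port is 8080", confidence=confidence,
        source_turn=turn, source_role="user", source_text="port is 8080",
        timestamp=float(turn), observed_at="", unique_slot=True,
    )

merged = _normalize_candidates([_port_fact(1, 0.8), _port_fact(5, 0.9)])
assert len(merged) == 1                               # same canonical key
assert merged[0].confidence == 0.9                    # max of the two
assert merged[0].metadata["observation_count"] == 2   # evidence accumulated
```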
def save_facts_to_store(facts: List[ExtractedFact], fact_store_fn=None) -> int:
"""Save extracted facts to the fact store.
If a callback is supplied, prefer the structured signature but fall back to
the legacy four-argument callback for compatibility.
Args:
facts: List of extracted facts.
fact_store_fn: Optional callable(category, entity, content, trust).
If None, uses the holographic fact store if available.
Returns:
Number of facts saved.
"""
saved = 0
for fact in facts:
payload = {
"category": _store_category(fact.category),
"entity": fact.entity,
"content": fact.content,
"trust": fact.confidence,
"metadata": dict(fact.metadata),
"canonical_key": fact.canonical_key,
"observed_at": fact.observed_at,
"source_role": fact.source_role,
"source_turn": fact.source_turn,
"contradiction_group": fact.contradiction_group,
"status": fact.status,
"relation": fact.relation,
}
if fact_store_fn:
try:
fact_store_fn(**payload)
saved += 1
continue
except TypeError:
try:
fact_store_fn(payload["category"], payload["entity"], payload["content"], payload["trust"])
saved += 1
continue
except Exception as exc:
logger.debug("Failed to save fact via callback: %s", exc)
continue
except Exception as exc:
logger.debug("Failed to save fact via callback: %s", exc)
continue
else:
# Try holographic fact store
try:
from fact_store import fact_store as _fs
tags = ",".join(filter(None, [fact.entity, fact.relation, fact.status]))
_fs(
action="add",
content=fact.content,
category=_store_category(fact.category),
tags=tags,
trust_delta=fact.confidence - 0.5,
)
saved += 1
except ImportError:
logger.debug("fact_store not available — facts not persisted")
break
except Exception as exc:
logger.debug("Failed to save fact via fact_store: %s", exc)
return saved
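A callback sketch, reusing `facts` from the extraction example earlier: a legacy four-argument sink first raises `TypeError` on the structured `**payload` call, then succeeds via the positional fallback path:

```python
rows = []

def legacy_sink(category, entity, content, trust):
    # Legacy signature: the structured **payload call fails with
    # TypeError, so save_facts_to_store retries positionally.
    rows.append((category, entity, content, trust))

saved = save_facts_to_store(facts, fact_store_fn=legacy_sink)
assert saved == len(facts) == len(rows)
```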
@@ -553,10 +204,9 @@ def extract_and_save_facts(
Returns (extracted_facts, saved_count).
"""
facts = extract_facts_from_messages(messages)
if facts:
logger.info("Extracted %d normalized facts from conversation", len(facts))
logger.info("Extracted %d facts from conversation", len(facts))
saved = save_facts_to_store(facts, fact_store_fn)
logger.info("Saved %d/%d facts to store", saved, len(facts))
else:
@@ -566,105 +216,16 @@ def extract_and_save_facts(
def format_facts_summary(facts: List[ExtractedFact]) -> str:
"""Format extracted facts as a readable summary."""
if not facts:
return "No facts extracted."
by_category: dict[str, list[ExtractedFact]] = {}
for fact in facts:
by_category.setdefault(fact.category, []).append(fact)
lines = [f"Extracted {len(facts)} facts:", ""]
for category, category_facts in sorted(by_category.items()):
lines.append(f" {category}:")
for fact in category_facts:
suffix = f" [{fact.status}]" if fact.status != "active" else ""
lines.append(f" - {fact.content[:80]}{suffix}")
return "\n".join(lines)
def _store_category(category: str) -> str:
if category.startswith("user_pref"):
return "user_pref"
if category.startswith("project"):
return "project"
if category.startswith("tool"):
return "tool"
return "general"
def _message_time(msg: Dict[str, Any]) -> Tuple[float, str]:
for key in ("created_at", "timestamp", "time"):
value = msg.get(key)
if value is None:
continue
if isinstance(value, (int, float)):
ts = float(value)
return ts, _iso_from_timestamp(ts)
if isinstance(value, str):
parsed = _parse_time_string(value)
if parsed is not None:
return parsed, _iso_from_timestamp(parsed) if "T" not in value else value.replace("+00:00", "Z")
return time.time(), value
now = time.time()
return now, _iso_from_timestamp(now)
def _parse_time_string(value: str) -> float | None:
text = value.strip()
if not text:
return None
try:
return float(text)
except ValueError:
pass
try:
normalized = text[:-1] + "+00:00" if text.endswith("Z") else text
return datetime.fromisoformat(normalized).timestamp()
except ValueError:
return None
def _iso_from_timestamp(value: float) -> str:
return datetime.fromtimestamp(value, tz=timezone.utc).isoformat().replace("+00:00", "Z")
def _normalize_value(value: str) -> str:
normalized = re.sub(r"[^a-z0-9]+", " ", value.lower())
normalized = re.sub(r"\s+", " ", normalized).strip()
return normalized
def _normalize_entity(value: str) -> str:
return _normalize_value(value).replace(" ", "_") or "entity"
def _canonical_key(entity: str, relation: str, normalized_value: str) -> str:
return f"{entity}|{relation}|{normalized_value}"
def _ordered_unique(*groups: List[str]) -> List[str]:
seen: set[str] = set()
ordered: list[str] = []
for group in groups:
for item in group:
if item and item not in seen:
seen.add(item)
ordered.append(item)
return ordered
def _merge_evidence(existing: List[Dict[str, Any]], incoming: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
seen: set[tuple[str, str, str]] = set()
merged: list[dict[str, Any]] = []
for item in list(existing) + list(incoming):
key = (
str(item.get("provenance", "")),
str(item.get("observed_at", "")),
str(item.get("source_text", "")),
)
if key in seen:
continue
seen.add(key)
merged.append(dict(item))
return merged

(File diff suppressed because it is too large.)


@@ -0,0 +1,217 @@
# TensorZero Evaluation Packet
Issue #860: [tensorzero LLMOps platform evaluation](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/860)
## Scope
This packet evaluates TensorZero as a possible replacement for Hermes' custom provider-routing stack.
It is intentionally grounded in the current repo state rather than a speculative cutover plan.
## Issue requirements being evaluated
- Deploy tensorzero gateway (Rust binary)
- Migrate provider routing config
- Test with canary (10% traffic) before full cutover
- Feed session data for prompt optimization
- Evaluation suite for A/B testing models
## Recommendation
Not ready for direct replacement. Recommend a shadow-evaluation phase first: keep Hermes routing live, inventory the migration seams, and export SessionDB/trajectory data into an offline TensorZero experiment loop. Design a canary gateway only once percentage-based rollout controls exist.
## Requirement matrix
| Requirement | Status | Evidence labels | Summary |
| --- | --- | --- | --- |
| Gateway replacement scope | partial | fallback_chain, runtime_provider, gateway_provider_routing, cron_runtime_provider, auxiliary_fallback_chain, delegate_runtime_provider | Hermes already spreads provider routing across core agent, runtime provider, gateway, cron, auxiliary, and delegation seams; TensorZero would need parity across all of them before it can replace the gateway layer. |
| Config migration | partial | provider_routing_config, runtime_provider, smart_model_routing, fallback_chain | Hermes has multiple config concepts to migrate (`provider_routing`, `fallback_providers`, `smart_model_routing`, runtime provider resolution), so TensorZero is not a drop-in config swap. |
| 10% traffic canary | gap | — | The repo shows semantic routing and fallback, but no grounded 10% traffic-split canary mechanism. A TensorZero cutover would need new percentage-based rollout controls and observability hooks. |
| Session data for prompt optimization | partial | session_db, trajectory_export | Hermes already has SessionDB and trajectory export surfaces that can feed offline optimization data, but not a TensorZero-native ingestion path yet. |
| Evaluation suite / A/B testing | partial | benchmark_suite, trajectory_export | Hermes already has benchmark/trajectory machinery that can seed TensorZero A/B evaluation, but no integrated TensorZero experiment runner or live evaluation gateway. |
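
The canary row is the only outright gap. As a hedged illustration of the missing piece, a percentage-based split could deterministically hash a stable session id into buckets so a session never flips between gateways mid-conversation (`canary_route` is a hypothetical helper, not existing Hermes code):

```python
import hashlib

def canary_route(session_id: str, canary_percent: int = 10) -> str:
    """Pin a session to 'canary' or 'stable' deterministically.

    Hypothetical sketch: bucket the session id into 0-99 via SHA-256 and
    compare against the rollout percentage, so roughly canary_percent%
    of sessions take the TensorZero path while the rest stay on Hermes.
    """
    bucket = int(hashlib.sha256(session_id.encode("utf-8")).hexdigest(), 16) % 100
    return "canary" if bucket < canary_percent else "stable"
```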
## Grounded Hermes touchpoints
- `run_agent.py:601` — [fallback_chain] fallback_model: Dict[str, Any] = None,
- `run_agent.py:995` — [fallback_chain] # failure). Supports both legacy single-dict ``fallback_model`` and
- `run_agent.py:996` — [fallback_chain] # new list ``fallback_providers`` format.
- `run_agent.py:997` — [fallback_chain] if isinstance(fallback_model, list):
- `run_agent.py:998` — [fallback_chain] self._fallback_chain = [
- `run_agent.py:999` — [fallback_chain] f for f in fallback_model
- `run_agent.py:1002` — [fallback_chain] elif isinstance(fallback_model, dict) and fallback_model.get("provider") and fallback_model.get("model"):
- `run_agent.py:1003` — [fallback_chain] self._fallback_chain = [fallback_model]
- `run_agent.py:1005` — [fallback_chain] self._fallback_chain = []
- `run_agent.py:1009` — [fallback_chain] self._fallback_model = self._fallback_chain[0] if self._fallback_chain else None
- `run_agent.py:1010` — [fallback_chain] if self._fallback_chain and not self.quiet_mode:
- `run_agent.py:1011` — [fallback_chain] if len(self._fallback_chain) == 1:
- `run_agent.py:1012` — [fallback_chain] fb = self._fallback_chain[0]
- `run_agent.py:1015` — [fallback_chain] print(f"🔄 Fallback chain ({len(self._fallback_chain)} providers): " +
- `run_agent.py:1016` — [fallback_chain] " → ".join(f"{f['model']} ({f['provider']})" for f in self._fallback_chain))
- `run_agent.py:5624` — [fallback_chain] if self._fallback_index >= len(self._fallback_chain):
- `run_agent.py:5627` — [fallback_chain] fb = self._fallback_chain[self._fallback_index]
- `run_agent.py:8559` — [fallback_chain] if self._fallback_index < len(self._fallback_chain):
- `run_agent.py:9355` — [fallback_chain] if is_rate_limited and self._fallback_index < len(self._fallback_chain):
- `run_agent.py:10460` — [fallback_chain] if _truly_empty and self._fallback_chain:
- `run_agent.py:10514` — [fallback_chain] + (" and fallback attempts." if self._fallback_chain else
- `cli.py:241` — [provider_routing_config] "smart_model_routing": {
- `cli.py:370` — [provider_routing_config] # (e.g. platform_toolsets, provider_routing, memory, honcho, etc.)
- `cli.py:1753` — [provider_routing_config] pr = CLI_CONFIG.get("provider_routing", {}) or {}
- `cli.py:1762` — [provider_routing_config] # Supports new list format (fallback_providers) and legacy single-dict (fallback_model).
- `cli.py:1763` — [provider_routing_config] fb = CLI_CONFIG.get("fallback_providers") or CLI_CONFIG.get("fallback_model") or []
- `cli.py:1770` — [provider_routing_config] self._smart_model_routing = CLI_CONFIG.get("smart_model_routing", {}) or {}
- `cli.py:2771` — [provider_routing_config] from agent.smart_model_routing import resolve_turn_route
- `cli.py:2776` — [provider_routing_config] self._smart_model_routing,
- `hermes_cli/runtime_provider.py:209` — [runtime_provider] def resolve_requested_provider(requested: Optional[str] = None) -> str:
- `hermes_cli/runtime_provider.py:649` — [runtime_provider] def resolve_runtime_provider(
- `agent/smart_model_routing.py:62` — [smart_model_routing] def choose_cheap_model_route(user_message: str, routing_config: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
- `agent/smart_model_routing.py:110` — [smart_model_routing] def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any]], primary: Dict[str, Any]) -> Dict[str, Any]:
- `gateway/run.py:1271` — [gateway_provider_routing] def _load_provider_routing() -> dict:
- `gateway/run.py:1285` — [gateway_provider_routing] def _load_fallback_model() -> list | dict | None:
- `gateway/run.py:1306` — [gateway_provider_routing] def _load_smart_model_routing() -> dict:
- `cron/scheduler.py:684` — [cron_runtime_provider] pr = _cfg.get("provider_routing", {})
- `cron/scheduler.py:688` — [cron_runtime_provider] resolve_runtime_provider,
- `cron/scheduler.py:697` — [cron_runtime_provider] runtime = resolve_runtime_provider(**runtime_kwargs)
- `cron/scheduler.py:702` — [cron_runtime_provider] from agent.smart_model_routing import resolve_turn_route
- `cron/scheduler.py:703` — [cron_runtime_provider] turn_route = resolve_turn_route(
- `cron/scheduler.py:717` — [cron_runtime_provider] fallback_model = _cfg.get("fallback_providers") or _cfg.get("fallback_model") or None
- `cron/scheduler.py:746` — [cron_runtime_provider] fallback_model=fallback_model,
- `agent/auxiliary_client.py:1018` — [auxiliary_fallback_chain] def _get_provider_chain() -> List[tuple]:
- `agent/auxiliary_client.py:1107` — [auxiliary_fallback_chain] for label, try_fn in _get_provider_chain():
- `agent/auxiliary_client.py:1189` — [auxiliary_fallback_chain] # ── Step 2: aggregator / fallback chain ──────────────────────────────
- `agent/auxiliary_client.py:1191` — [auxiliary_fallback_chain] for label, try_fn in _get_provider_chain():
- `agent/auxiliary_client.py:2397` — [auxiliary_fallback_chain] # error, fall through to the fallback chain below.
- `agent/auxiliary_client.py:2417` — [auxiliary_fallback_chain] # auto (the default) = best-effort fallback chain. (#7559)
- `agent/auxiliary_client.py:2589` — [auxiliary_fallback_chain] # error, fall through to the fallback chain below.
- `tools/delegate_tool.py:662` — [delegate_runtime_provider] # bundle (base_url, api_key, api_mode) via the same runtime provider system
- `tools/delegate_tool.py:854` — [delegate_runtime_provider] provider) is resolved via the runtime provider system — the same path used
- `tools/delegate_tool.py:909` — [delegate_runtime_provider] from hermes_cli.runtime_provider import resolve_runtime_provider
- `tools/delegate_tool.py:910` — [delegate_runtime_provider] runtime = resolve_runtime_provider(requested=configured_provider)
- `hermes_state.py:115` — [session_db] class SessionDB:
- `batch_runner.py:320` — [trajectory_export] save_trajectories=False, # We handle saving ourselves
- `batch_runner.py:346` — [trajectory_export] trajectory = agent._convert_to_trajectory_format(
- `batch_runner.py:460` — [trajectory_export] trajectory_entry = {
- `batch_runner.py:474` — [trajectory_export] f.write(json.dumps(trajectory_entry, ensure_ascii=False) + "\n")
- `benchmarks/tool_call_benchmark.py:3` — [benchmark_suite] Tool-Calling Benchmark — Gemma 4 vs mimo-v2-pro regression test.
- `benchmarks/tool_call_benchmark.py:9` — [benchmark_suite] python3 benchmarks/tool_call_benchmark.py # full 100-call suite
- `benchmarks/tool_call_benchmark.py:10` — [benchmark_suite] python3 benchmarks/tool_call_benchmark.py --limit 10 # quick smoke test
- `benchmarks/tool_call_benchmark.py:11` — [benchmark_suite] python3 benchmarks/tool_call_benchmark.py --models nous # single model
- `benchmarks/tool_call_benchmark.py:12` — [benchmark_suite] python3 benchmarks/tool_call_benchmark.py --category file # single category
- `benchmarks/tool_call_benchmark.py:37` — [benchmark_suite] class ToolCall:
- `benchmarks/tool_call_benchmark.py:51` — [benchmark_suite] ToolCall("file-01", "file", "Read the file /tmp/test_bench.txt and show me its contents.",
- `benchmarks/tool_call_benchmark.py:53` — [benchmark_suite] ToolCall("file-02", "file", "Write 'hello benchmark' to /tmp/test_bench_out.txt",
- `benchmarks/tool_call_benchmark.py:55` — [benchmark_suite] ToolCall("file-03", "file", "Search for the word 'import' in all Python files in the current directory.",
- `benchmarks/tool_call_benchmark.py:57` — [benchmark_suite] ToolCall("file-04", "file", "Read lines 1-20 of /etc/hosts",
- `benchmarks/tool_call_benchmark.py:59` — [benchmark_suite] ToolCall("file-05", "file", "Patch /tmp/test_bench_out.txt: replace 'hello' with 'goodbye'",
- `benchmarks/tool_call_benchmark.py:61` — [benchmark_suite] ToolCall("file-06", "file", "Search for files matching *.py in the current directory.",
- `benchmarks/tool_call_benchmark.py:63` — [benchmark_suite] ToolCall("file-07", "file", "Read the first 10 lines of /etc/passwd",
- `benchmarks/tool_call_benchmark.py:65` — [benchmark_suite] ToolCall("file-08", "file", "Write a JSON config to /tmp/bench_config.json with key 'debug': true",
- `benchmarks/tool_call_benchmark.py:67` — [benchmark_suite] ToolCall("file-09", "file", "Search for 'def test_' in Python test files.",
- `benchmarks/tool_call_benchmark.py:69` — [benchmark_suite] ToolCall("file-10", "file", "Read /tmp/bench_config.json and tell me what's in it.",
- `benchmarks/tool_call_benchmark.py:71` — [benchmark_suite] ToolCall("file-11", "file", "Create a file /tmp/bench_readme.md with one line: '# Benchmark'",
- `benchmarks/tool_call_benchmark.py:73` — [benchmark_suite] ToolCall("file-12", "file", "Search for 'TODO' comments in all .py files.",
- `benchmarks/tool_call_benchmark.py:75` — [benchmark_suite] ToolCall("file-13", "file", "Read /tmp/bench_readme.md",
- `benchmarks/tool_call_benchmark.py:77` — [benchmark_suite] ToolCall("file-14", "file", "Patch /tmp/bench_readme.md: replace '# Benchmark' with '# Tool Benchmark'",
- `benchmarks/tool_call_benchmark.py:78` — [benchmark_suite] "patch", "Tool Benchmark"),
- `benchmarks/tool_call_benchmark.py:79` — [benchmark_suite] ToolCall("file-15", "file", "Write a Python one-liner to /tmp/bench_hello.py that prints hello.",
- `benchmarks/tool_call_benchmark.py:81` — [benchmark_suite] ToolCall("file-16", "file", "Search for all .json files in /tmp/.",
- `benchmarks/tool_call_benchmark.py:83` — [benchmark_suite] ToolCall("file-17", "file", "Read /tmp/bench_hello.py and verify it has print('hello').",
- `benchmarks/tool_call_benchmark.py:85` — [benchmark_suite] ToolCall("file-18", "file", "Patch /tmp/bench_hello.py to print 'hello world' instead of 'hello'.",
- `benchmarks/tool_call_benchmark.py:87` — [benchmark_suite] ToolCall("file-19", "file", "List files matching 'bench*' in /tmp/.",
- `benchmarks/tool_call_benchmark.py:89` — [benchmark_suite] ToolCall("file-20", "file", "Read /tmp/test_bench.txt again and summarize its contents.",
- `benchmarks/tool_call_benchmark.py:93` — [benchmark_suite] ToolCall("term-01", "terminal", "Run `echo hello world` in the terminal.",
- `benchmarks/tool_call_benchmark.py:95` — [benchmark_suite] ToolCall("term-02", "terminal", "Run `date` to get the current date and time.",
- `benchmarks/tool_call_benchmark.py:97` — [benchmark_suite] ToolCall("term-03", "terminal", "Run `uname -a` to get system information.",
- `benchmarks/tool_call_benchmark.py:99` — [benchmark_suite] ToolCall("term-04", "terminal", "Run `pwd` to show the current directory.",
- `benchmarks/tool_call_benchmark.py:101` — [benchmark_suite] ToolCall("term-05", "terminal", "Run `ls -la /tmp/ | head -20` to list temp files.",
- `benchmarks/tool_call_benchmark.py:103` — [benchmark_suite] ToolCall("term-06", "terminal", "Run `whoami` to show the current user.",
- `benchmarks/tool_call_benchmark.py:105` — [benchmark_suite] ToolCall("term-07", "terminal", "Run `df -h` to show disk usage.",
- `benchmarks/tool_call_benchmark.py:107` — [benchmark_suite] ToolCall("term-08", "terminal", "Run `python3 --version` to check Python version.",
- `benchmarks/tool_call_benchmark.py:109` — [benchmark_suite] ToolCall("term-09", "terminal", "Run `cat /etc/hostname` to get the hostname.",
- `benchmarks/tool_call_benchmark.py:111` — [benchmark_suite] ToolCall("term-10", "terminal", "Run `uptime` to see system uptime.",
- `benchmarks/tool_call_benchmark.py:113` — [benchmark_suite] ToolCall("term-11", "terminal", "Run `env | grep PATH` to show the PATH variable.",
- `benchmarks/tool_call_benchmark.py:115` — [benchmark_suite] ToolCall("term-12", "terminal", "Run `wc -l /etc/passwd` to count lines.",
- `benchmarks/tool_call_benchmark.py:117` — [benchmark_suite] ToolCall("term-13", "terminal", "Run `echo $SHELL` to show the current shell.",
- `benchmarks/tool_call_benchmark.py:119` — [benchmark_suite] ToolCall("term-14", "terminal", "Run `free -h || vm_stat` to check memory usage.",
- `benchmarks/tool_call_benchmark.py:121` — [benchmark_suite] ToolCall("term-15", "terminal", "Run `id` to show user and group IDs.",
- `benchmarks/tool_call_benchmark.py:123` — [benchmark_suite] ToolCall("term-16", "terminal", "Run `hostname` to get the machine hostname.",
- `benchmarks/tool_call_benchmark.py:125` — [benchmark_suite] ToolCall("term-17", "terminal", "Run `echo {1..5}` to test brace expansion.",
- `benchmarks/tool_call_benchmark.py:127` — [benchmark_suite] ToolCall("term-18", "terminal", "Run `seq 1 5` to generate a number sequence.",
- `benchmarks/tool_call_benchmark.py:129` — [benchmark_suite] ToolCall("term-19", "terminal", "Run `python3 -c 'print(2+2)'` to compute 2+2.",
- `benchmarks/tool_call_benchmark.py:131` — [benchmark_suite] ToolCall("term-20", "terminal", "Run `ls -d /tmp/bench* 2>/dev/null | wc -l` to count bench files.",
- `benchmarks/tool_call_benchmark.py:135` — [benchmark_suite] ToolCall("code-01", "code", "Execute a Python script that computes factorial of 10.",
- `benchmarks/tool_call_benchmark.py:137` — [benchmark_suite] ToolCall("code-02", "code", "Run Python to read /tmp/test_bench.txt and count its words.",
- `benchmarks/tool_call_benchmark.py:139` — [benchmark_suite] ToolCall("code-03", "code", "Execute Python to generate the first 20 Fibonacci numbers.",
- `benchmarks/tool_call_benchmark.py:141` — [benchmark_suite] ToolCall("code-04", "code", "Run Python to parse JSON from a string and print keys.",
- `benchmarks/tool_call_benchmark.py:143` — [benchmark_suite] ToolCall("code-05", "code", "Execute Python to list all files in /tmp/ matching 'bench*'.",
- `benchmarks/tool_call_benchmark.py:145` — [benchmark_suite] ToolCall("code-06", "code", "Run Python to compute the sum of squares from 1 to 100.",
- `benchmarks/tool_call_benchmark.py:147` — [benchmark_suite] ToolCall("code-07", "code", "Execute Python to check if 'racecar' is a palindrome.",
- `benchmarks/tool_call_benchmark.py:149` — [benchmark_suite] ToolCall("code-08", "code", "Run Python to create a CSV string with 5 rows of sample data.",
- `benchmarks/tool_call_benchmark.py:151` — [benchmark_suite] ToolCall("code-09", "code", "Execute Python to sort a list [5,2,8,1,9] and print the result.",
- `benchmarks/tool_call_benchmark.py:153` — [benchmark_suite] ToolCall("code-10", "code", "Run Python to count lines in /etc/passwd.",
- `benchmarks/tool_call_benchmark.py:155` — [benchmark_suite] ToolCall("code-11", "code", "Execute Python to hash the string 'benchmark' with SHA256.",
- `benchmarks/tool_call_benchmark.py:157` — [benchmark_suite] ToolCall("code-12", "code", "Run Python to get the current UTC timestamp.",
- `benchmarks/tool_call_benchmark.py:159` — [benchmark_suite] ToolCall("code-13", "code", "Execute Python to convert 'hello world' to uppercase and reverse it.",
- `benchmarks/tool_call_benchmark.py:161` — [benchmark_suite] ToolCall("code-14", "code", "Run Python to create a dictionary of system info (platform, python version).",
- `benchmarks/tool_call_benchmark.py:163` — [benchmark_suite] ToolCall("code-15", "code", "Execute Python to check internet connectivity by resolving google.com.",
- `benchmarks/tool_call_benchmark.py:167` — [benchmark_suite] ToolCall("deleg-01", "delegate", "Use a subagent to find all .log files in /tmp/.",
- `benchmarks/tool_call_benchmark.py:169` — [benchmark_suite] ToolCall("deleg-02", "delegate", "Delegate to a subagent: what is 15 * 37?",
- `benchmarks/tool_call_benchmark.py:171` — [benchmark_suite] ToolCall("deleg-03", "delegate", "Use a subagent to check if Python 3 is installed and its version.",
- `benchmarks/tool_call_benchmark.py:173` — [benchmark_suite] ToolCall("deleg-04", "delegate", "Delegate: read /tmp/test_bench.txt and summarize it in one sentence.",
- `benchmarks/tool_call_benchmark.py:175` — [benchmark_suite] ToolCall("deleg-05", "delegate", "Use a subagent to list the contents of /tmp/ directory.",
- `benchmarks/tool_call_benchmark.py:177` — [benchmark_suite] ToolCall("deleg-06", "delegate", "Delegate: count the number of .py files in the current directory.",
- `benchmarks/tool_call_benchmark.py:179` — [benchmark_suite] ToolCall("deleg-07", "delegate", "Use a subagent to check disk space with df -h.",
- `benchmarks/tool_call_benchmark.py:181` — [benchmark_suite] ToolCall("deleg-08", "delegate", "Delegate: what OS are we running on?",
- `benchmarks/tool_call_benchmark.py:183` — [benchmark_suite] ToolCall("deleg-09", "delegate", "Use a subagent to find the hostname of this machine.",
- `benchmarks/tool_call_benchmark.py:185` — [benchmark_suite] ToolCall("deleg-10", "delegate", "Delegate: create a temp file /tmp/bench_deleg.txt with 'done'.",
- `benchmarks/tool_call_benchmark.py:189` — [benchmark_suite] ToolCall("todo-01", "todo", "Add a todo item: 'Run benchmark suite'",
- `benchmarks/tool_call_benchmark.py:190` — [benchmark_suite] "todo", "benchmark"),
- `benchmarks/tool_call_benchmark.py:191` — [benchmark_suite] ToolCall("todo-02", "todo", "Show me the current todo list.",
- `benchmarks/tool_call_benchmark.py:193` — [benchmark_suite] ToolCall("todo-03", "todo", "Mark the first todo item as completed.",
- `benchmarks/tool_call_benchmark.py:195` — [benchmark_suite] ToolCall("todo-04", "todo", "Add a todo: 'Review benchmark results' with status pending.",
- `benchmarks/tool_call_benchmark.py:197` — [benchmark_suite] ToolCall("todo-05", "todo", "Clear all completed todos.",
- `benchmarks/tool_call_benchmark.py:199` — [benchmark_suite] ToolCall("todo-06", "memory", "Save this to memory: 'benchmark ran on {date}'".format(
- `benchmarks/tool_call_benchmark.py:201` — [benchmark_suite] "memory", "benchmark"),
- `benchmarks/tool_call_benchmark.py:202` — [benchmark_suite] ToolCall("todo-07", "memory", "Search memory for 'benchmark'.",
- `benchmarks/tool_call_benchmark.py:203` — [benchmark_suite] "memory", "benchmark"),
- `benchmarks/tool_call_benchmark.py:204` — [benchmark_suite] ToolCall("todo-08", "memory", "Add a memory note: 'test models are gemma-4 and mimo-v2-pro'.",
- `benchmarks/tool_call_benchmark.py:206` — [benchmark_suite] ToolCall("todo-09", "todo", "Add three todo items: 'analyze', 'report', 'cleanup'.",
- `benchmarks/tool_call_benchmark.py:208` — [benchmark_suite] ToolCall("todo-10", "memory", "Search memory for any notes about models.",
- `benchmarks/tool_call_benchmark.py:212` — [benchmark_suite] ToolCall("skill-01", "skills", "List all available skills.",
- `benchmarks/tool_call_benchmark.py:214` — [benchmark_suite] ToolCall("skill-02", "skills", "View the skill called 'test-driven-development'.",
- `benchmarks/tool_call_benchmark.py:216` — [benchmark_suite] ToolCall("skill-03", "skills", "Search for skills related to 'git'.",
- `benchmarks/tool_call_benchmark.py:218` — [benchmark_suite] ToolCall("skill-04", "skills", "View the 'code-review' skill.",
- `benchmarks/tool_call_benchmark.py:220` — [benchmark_suite] ToolCall("skill-05", "skills", "List all skills in the 'devops' category.",
- `benchmarks/tool_call_benchmark.py:222` — [benchmark_suite] ToolCall("skill-06", "skills", "View the 'systematic-debugging' skill.",
- `benchmarks/tool_call_benchmark.py:224` — [benchmark_suite] ToolCall("skill-07", "skills", "Search for skills about 'testing'.",
- `benchmarks/tool_call_benchmark.py:226` — [benchmark_suite] ToolCall("skill-08", "skills", "View the 'writing-plans' skill.",
- `benchmarks/tool_call_benchmark.py:228` — [benchmark_suite] ToolCall("skill-09", "skills", "List skills in 'software-development' category.",
- `benchmarks/tool_call_benchmark.py:230` — [benchmark_suite] ToolCall("skill-10", "skills", "View the 'pr-review-discipline' skill.",
- `benchmarks/tool_call_benchmark.py:234` — [benchmark_suite] ToolCall("file-21", "file", "Write a Python snippet to /tmp/bench_sort.py that sorts [3,1,2].",
- `benchmarks/tool_call_benchmark.py:236` — [benchmark_suite] ToolCall("file-22", "file", "Read /tmp/bench_sort.py back and confirm it exists.",
- `benchmarks/tool_call_benchmark.py:238` — [benchmark_suite] ToolCall("file-23", "file", "Search for 'class' in all .py files in the benchmarks directory.",
- `benchmarks/tool_call_benchmark.py:240` — [benchmark_suite] ToolCall("term-21", "terminal", "Run `cat /etc/os-release 2>/dev/null || sw_vers 2>/dev/null` for OS info.",
- `benchmarks/tool_call_benchmark.py:242` — [benchmark_suite] ToolCall("term-22", "terminal", "Run `nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null` for CPU count.",
- `benchmarks/tool_call_benchmark.py:244` — [benchmark_suite] ToolCall("code-16", "code", "Execute Python to flatten a nested list [[1,2],[3,4],[5]].",
- `benchmarks/tool_call_benchmark.py:246` — [benchmark_suite] ToolCall("code-17", "code", "Run Python to check if a number 17 is prime.",
- `benchmarks/tool_call_benchmark.py:248` — [benchmark_suite] ToolCall("deleg-11", "delegate", "Delegate: what is the current working directory?",
- `benchmarks/tool_call_benchmark.py:250` — [benchmark_suite] ToolCall("todo-11", "todo", "Add a todo: 'Finalize benchmark report' status pending.",
- `benchmarks/tool_call_benchmark.py:252` — [benchmark_suite] ToolCall("todo-12", "memory", "Store fact: 'benchmark categories: file, terminal, code, delegate, todo, memory, skills'.",
- `benchmarks/tool_call_benchmark.py:254` — [benchmark_suite] ToolCall("skill-11", "skills", "Search for skills about 'deployment'.",
- `benchmarks/tool_call_benchmark.py:256` — [benchmark_suite] ToolCall("skill-12", "skills", "View the 'gitea-burn-cycle' skill.",
- `benchmarks/tool_call_benchmark.py:258` — [benchmark_suite] ToolCall("skill-13", "skills", "List all available skill categories.",
- `benchmarks/tool_call_benchmark.py:260` — [benchmark_suite] ToolCall("skill-14", "skills", "Search for skills related to 'memory'.",
- `benchmarks/tool_call_benchmark.py:262` — [benchmark_suite] ToolCall("skill-15", "skills", "View the 'mimo-swarm' skill.",
- `benchmarks/tool_call_benchmark.py:311` — [benchmark_suite] """Create prerequisite files for the benchmark."""
- `benchmarks/tool_call_benchmark.py:313` — [benchmark_suite] "This is a benchmark test file.\n"
- `benchmarks/tool_call_benchmark.py:349` — [benchmark_suite] "You are a benchmark test runner. Execute the user's request by calling "
- `benchmarks/tool_call_benchmark.py:406` — [benchmark_suite] """Generate markdown benchmark report."""
- `benchmarks/tool_call_benchmark.py:428` — [benchmark_suite] f"# Tool-Calling Benchmark Report",
- `benchmarks/tool_call_benchmark.py:535` — [benchmark_suite] parser = argparse.ArgumentParser(description="Tool-calling benchmark")
- `benchmarks/tool_call_benchmark.py:544` — [benchmark_suite] help="Output report path (default: benchmarks/gemma4-tool-calling-YYYY-MM-DD.md)")
- `benchmarks/tool_call_benchmark.py:565` — [benchmark_suite] output_path = Path(args.output) if args.output else REPO_ROOT / "benchmarks" / f"gemma4-tool-calling-{date_str}.md"
- `benchmarks/tool_call_benchmark.py:575` — [benchmark_suite] print(f"Benchmark: {len(suite)} tests × {len(model_specs)} models = {len(suite) * len(model_specs)} calls")
## Suggested next slice
1. Build an exporter that emits SessionDB + trajectory data into a TensorZero-friendly offline dataset (a minimal sketch follows below).
2. Define percentage-based canary controls before attempting any gateway replacement.
3. Keep Hermes routing authoritative until TensorZero proves parity across CLI, gateway, cron, auxiliary, and delegation surfaces.
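
A minimal sketch of that exporter, assuming nothing about TensorZero's real ingestion schema (the `input`/`output`/`tags` field names and the row shape are placeholders, not confirmed APIs):

```python
import json
from pathlib import Path
from typing import Any, Dict, Iterable

def export_offline_dataset(rows: Iterable[Dict[str, Any]], out_path: Path) -> int:
    """Dump Hermes session/trajectory rows as JSONL for offline experiments.

    Placeholder schema: each record keeps the message list, the final
    response, and a provenance tag so an external experiment loop can
    replay and score it.
    """
    count = 0
    with out_path.open("w", encoding="utf-8") as fh:
        for row in rows:
            record = {
                "input": row.get("messages", []),
                "output": row.get("final_response", ""),
                "tags": {"source": "hermes_sessiondb"},
            }
            fh.write(json.dumps(record, ensure_ascii=False) + "\n")
            count += 1
    return count
```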


@@ -356,57 +356,44 @@ class HolographicMemoryProvider(MemoryProvider):
# -- Auto-extraction (on_session_end) ------------------------------------
def _auto_extract_facts(self, messages: list) -> None:
from agent.session_compactor import evaluate_extraction_quality, extract_facts_from_messages
def _store_category(category: str) -> str:
if category.startswith("user_pref"):
return "user_pref"
if category.startswith("project"):
return "project"
if category.startswith("tool"):
return "tool"
return "general"
facts = extract_facts_from_messages(messages)
if not facts:
return
extracted = 0
for fact in facts:
try:
metadata = dict(fact.metadata)
metadata.setdefault("relation", fact.relation)
metadata.setdefault("value", fact.content)
metadata.setdefault("provenance", [fact.provenance])
metadata.setdefault("evidence", list(fact.evidence))
metadata.setdefault("observation_count", len(fact.evidence))
metadata.setdefault("duplicate_count", max(0, len(fact.evidence) - 1))
self._store.add_fact(
fact.content,
category=_store_category(fact.category),
tags=",".join(filter(None, [fact.entity, fact.relation, fact.status])),
canonical_key=fact.canonical_key,
metadata=metadata,
confidence=fact.confidence,
source_role=fact.source_role,
source_turn=fact.source_turn,
observed_at=fact.observed_at,
contradiction_group=fact.contradiction_group,
status=fact.status,
)
extracted += 1
except Exception as exc:
logger.debug("Structured auto-extract failed for %s: %s", fact.canonical_key, exc)
if extracted:
metrics = evaluate_extraction_quality(messages)
logger.info(
"Auto-extracted %d structured facts from conversation (raw=%d normalized=%d contradictions=%d)",
extracted,
metrics["raw_candidates"],
metrics["normalized_facts"],
metrics["contradiction_groups"],
)
logger.info("Auto-extracted %d facts from conversation", extracted)
# ---------------------------------------------------------------------------


@@ -3,7 +3,6 @@ SQLite-backed fact store with entity resolution and trust scoring.
Single-user Hermes memory store plugin.
"""
import json
import re
import sqlite3
import threading
@@ -16,24 +15,16 @@ except ImportError:
_SCHEMA = """
CREATE TABLE IF NOT EXISTS facts (
fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL UNIQUE,
category TEXT DEFAULT 'general',
tags TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
helpful_count INTEGER DEFAULT 0,
canonical_key TEXT DEFAULT '',
metadata_json TEXT DEFAULT '{}',
confidence REAL DEFAULT 0.5,
source_role TEXT DEFAULT '',
source_turn INTEGER DEFAULT -1,
observed_at TEXT DEFAULT '',
contradiction_group TEXT DEFAULT '',
status TEXT DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
hrr_vector BLOB
);
CREATE TABLE IF NOT EXISTS entities (
@@ -50,11 +41,9 @@ CREATE TABLE IF NOT EXISTS fact_entities (
PRIMARY KEY (fact_id, entity_id)
);
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key);
CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group);
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts
USING fts5(content, tags, content=facts, content_rowid=fact_id);
@@ -140,23 +129,10 @@ class MemoryStore:
"""Create tables, indexes, and triggers if they do not exist. Enable WAL mode."""
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.executescript(_SCHEMA)
# Migrate: add newer columns if missing (safe for existing databases)
columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()}
migrations = {
"hrr_vector": "ALTER TABLE facts ADD COLUMN hrr_vector BLOB",
"canonical_key": "ALTER TABLE facts ADD COLUMN canonical_key TEXT DEFAULT ''",
"metadata_json": "ALTER TABLE facts ADD COLUMN metadata_json TEXT DEFAULT '{}'",
"confidence": "ALTER TABLE facts ADD COLUMN confidence REAL DEFAULT 0.5",
"source_role": "ALTER TABLE facts ADD COLUMN source_role TEXT DEFAULT ''",
"source_turn": "ALTER TABLE facts ADD COLUMN source_turn INTEGER DEFAULT -1",
"observed_at": "ALTER TABLE facts ADD COLUMN observed_at TEXT DEFAULT ''",
"contradiction_group": "ALTER TABLE facts ADD COLUMN contradiction_group TEXT DEFAULT ''",
"status": "ALTER TABLE facts ADD COLUMN status TEXT DEFAULT 'active'",
}
for column, ddl in migrations.items():
if column not in columns:
self._conn.execute(ddl)
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key)")
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group)")
if "hrr_vector" not in columns:
self._conn.execute("ALTER TABLE facts ADD COLUMN hrr_vector BLOB")
self._conn.commit()
# ------------------------------------------------------------------
@@ -168,148 +144,41 @@ class MemoryStore:
content: str,
category: str = "general",
tags: str = "",
*,
canonical_key: str = "",
metadata: dict | None = None,
confidence: float | None = None,
source_role: str = "",
source_turn: int = -1,
observed_at: str = "",
contradiction_group: str = "",
status: str = "active",
) -> int:
"""Insert a fact and return its fact_id.
Exact duplicates are deduplicated by content. Near-duplicates are
normalized by canonical_key, with provenance/evidence merged into the
existing row. Contradictions sharing the same contradiction_group remain
stored as separate rows and are marked inspectably.
"""
with self._lock:
content = content.strip()
if not content:
raise ValueError("content must not be empty")
metadata = dict(metadata or {})
canonical_key = canonical_key.strip()
contradiction_group = contradiction_group.strip()
observed_at = observed_at.strip()
status = status or "active"
trust_score = self.default_trust if confidence is None else _clamp_trust(confidence)
metadata_json = json.dumps(metadata, sort_keys=True)
if canonical_key:
existing = self._conn.execute(
"SELECT fact_id, metadata_json, trust_score, confidence, observed_at FROM facts WHERE canonical_key = ?",
(canonical_key,),
).fetchone()
if existing is not None:
merged_metadata = self._merge_metadata(existing["metadata_json"], metadata)
merged_trust = max(float(existing["trust_score"]), trust_score)
merged_observed_at = existing["observed_at"] or observed_at
if observed_at and merged_observed_at:
merged_observed_at = min(merged_observed_at, observed_at)
elif observed_at:
merged_observed_at = observed_at
self._conn.execute(
"""
UPDATE facts
SET metadata_json = ?,
trust_score = ?,
confidence = ?,
observed_at = ?,
updated_at = CURRENT_TIMESTAMP
WHERE fact_id = ?
""",
(
json.dumps(merged_metadata, sort_keys=True),
merged_trust,
max(float(existing["confidence"] or 0.0), confidence or trust_score),
merged_observed_at,
existing["fact_id"],
),
)
self._conn.commit()
return int(existing["fact_id"])
contradiction_rows = []
if contradiction_group:
contradiction_rows = self._conn.execute(
"""
SELECT fact_id, canonical_key, metadata_json
FROM facts
WHERE contradiction_group = ?
AND canonical_key != ?
""",
(contradiction_group, canonical_key),
).fetchall()
if contradiction_rows:
status = "contradiction"
metadata = dict(metadata)
metadata["status"] = "contradiction"
metadata["contradiction_group"] = contradiction_group
metadata["contradiction_keys"] = sorted(
{
canonical_key,
*[str(row["canonical_key"]) for row in contradiction_rows if row["canonical_key"]],
}
- {""}
)
metadata_json = json.dumps(metadata, sort_keys=True)
try:
cur = self._conn.execute(
"""
INSERT INTO facts (
content,
category,
tags,
trust_score,
canonical_key,
metadata_json,
confidence,
source_role,
source_turn,
observed_at,
contradiction_group,
status
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO facts (content, category, tags, trust_score)
VALUES (?, ?, ?, ?)
""",
(
content,
category,
tags,
trust_score,
canonical_key,
metadata_json,
confidence if confidence is not None else trust_score,
source_role,
source_turn,
observed_at,
contradiction_group,
status,
),
(content, category, tags, self.default_trust),
)
self._conn.commit()
fact_id: int = cur.lastrowid # type: ignore[assignment]
except sqlite3.IntegrityError:
# Duplicate content — return existing id
row = self._conn.execute(
"SELECT fact_id FROM facts WHERE content = ?", (content,)
).fetchone()
return int(row["fact_id"])
if contradiction_rows:
self._mark_contradictions(
contradiction_group=contradiction_group,
new_canonical_key=canonical_key,
existing_rows=contradiction_rows,
)
# Entity extraction and linking
for name in self._extract_entities(content):
entity_id = self._resolve_entity(name)
self._link_fact_entity(fact_id, entity_id)
# Compute HRR vector after entity linking
self._compute_hrr_vector(fact_id, content)
self._rebuild_bank(category)
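# Usage sketch for the structured variant removed above (illustrative call;
# `store` stands in for a MemoryStore instance and the canonical_key value
# is made up). Near-duplicates that share a canonical_key collapse into one
# row: the second call merges provenance/metadata and returns the first
# fact_id.
#
#     fid_a = store.add_fact(
#         "Deploy via Ansible for production changes.",
#         category="project",
#         canonical_key="project|deploy|workflow.deploy_method|ansible",
#         metadata={"provenance": ["conversation:user:0"]},
#     )
#     fid_b = store.add_fact(
#         "We deploy through Ansible on this repo.",
#         category="project",
#         canonical_key="project|deploy|workflow.deploy_method|ansible",
#         metadata={"provenance": ["conversation:user:1"]},
#     )
#     assert fid_a == fid_b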
@@ -342,9 +211,6 @@ class MemoryStore:
sql = f"""
SELECT f.fact_id, f.content, f.category, f.tags,
f.trust_score, f.retrieval_count, f.helpful_count,
f.canonical_key, f.metadata_json, f.confidence,
f.source_role, f.source_turn, f.observed_at,
f.contradiction_group, f.status,
f.created_at, f.updated_at
FROM facts f
JOIN facts_fts fts ON fts.rowid = f.fact_id
@@ -470,11 +336,7 @@ class MemoryStore:
sql = f"""
SELECT fact_id, content, category, tags, trust_score,
retrieval_count, helpful_count,
canonical_key, metadata_json, confidence,
source_role, source_turn, observed_at,
contradiction_group, status,
created_at, updated_at
retrieval_count, helpful_count, created_at, updated_at
FROM facts
WHERE trust_score >= ?
{category_clause}
@@ -525,89 +387,6 @@ class MemoryStore:
"helpful_count": row["helpful_count"] + helpful_increment,
}
# ------------------------------------------------------------------
# Metadata helpers
# ------------------------------------------------------------------
def _load_metadata(self, metadata_json: str | None) -> dict:
if not metadata_json:
return {}
try:
data = json.loads(metadata_json)
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _merge_metadata(self, existing_json: str | None, incoming: dict | None) -> dict:
existing = self._load_metadata(existing_json)
incoming = dict(incoming or {})
merged = dict(existing)
reserved_keys = {"provenance", "evidence", "observation_count", "duplicate_count", "contradiction_keys"}
merged.update({k: v for k, v in incoming.items() if k not in reserved_keys})
provenance = []
seen_provenance: set[str] = set()
for item in list(existing.get("provenance", [])) + list(incoming.get("provenance", [])):
if item and item not in seen_provenance:
seen_provenance.add(item)
provenance.append(item)
evidence = []
seen_evidence: set[tuple[str, str, str]] = set()
for item in list(existing.get("evidence", [])) + list(incoming.get("evidence", [])):
if not isinstance(item, dict):
continue
key = (
str(item.get("provenance", "")),
str(item.get("observed_at", "")),
str(item.get("source_text", "")),
)
if key in seen_evidence:
continue
seen_evidence.add(key)
evidence.append(dict(item))
observation_count = int(existing.get("observation_count", max(1, len(existing.get("evidence", [])) or 1)))
observation_count += int(incoming.get("observation_count", max(1, len(incoming.get("evidence", [])) or 1)))
contradiction_keys = []
seen_keys: set[str] = set()
for item in list(existing.get("contradiction_keys", [])) + list(incoming.get("contradiction_keys", [])):
if item and item not in seen_keys:
seen_keys.add(item)
contradiction_keys.append(item)
merged["provenance"] = provenance
merged["evidence"] = evidence
merged["observation_count"] = observation_count
merged["duplicate_count"] = max(0, observation_count - 1)
if contradiction_keys:
merged["contradiction_keys"] = contradiction_keys
return merged
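# Merge sketch (illustrative values): two single-observation payloads with
# distinct provenance fold into one record:
#   existing: {"provenance": ["conversation:user:0"], "observation_count": 1}
#   incoming: {"provenance": ["conversation:user:1"], "observation_count": 1}
#   merged:   provenance = ["conversation:user:0", "conversation:user:1"],
#             observation_count = 2, duplicate_count = 1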
def _mark_contradictions(self, contradiction_group: str, new_canonical_key: str, existing_rows: list[sqlite3.Row]) -> None:
for row in existing_rows:
metadata = self._load_metadata(row["metadata_json"])
keys = []
seen: set[str] = set()
for item in list(metadata.get("contradiction_keys", [])) + [new_canonical_key]:
if item and item not in seen:
seen.add(item)
keys.append(item)
metadata["status"] = "contradiction"
metadata["contradiction_group"] = contradiction_group
metadata["contradiction_keys"] = keys
self._conn.execute(
"""
UPDATE facts
SET status = 'contradiction',
metadata_json = ?,
updated_at = CURRENT_TIMESTAMP
WHERE fact_id = ?
""",
(json.dumps(metadata, sort_keys=True), row["fact_id"]),
)
self._conn.commit()
# ------------------------------------------------------------------
# Entity helpers
# ------------------------------------------------------------------
@@ -781,14 +560,8 @@ class MemoryStore:
# ------------------------------------------------------------------
def _row_to_dict(self, row: sqlite3.Row) -> dict:
"""Convert a sqlite3.Row to a plain dict with decoded metadata."""
data = dict(row)
metadata = self._load_metadata(data.get("metadata_json"))
if metadata:
data["metadata"] = metadata
data.setdefault("relation", metadata.get("relation"))
data.pop("metadata_json", None)
return data
"""Convert a sqlite3.Row to a plain dict."""
return dict(row)
def close(self) -> None:
"""Close the database connection."""

View File

@@ -0,0 +1,318 @@
#!/usr/bin/env python3
"""Generate a grounded TensorZero evaluation packet for Hermes.
This script inventories the current Hermes routing/evaluation surfaces, then
builds a markdown packet assessing how much of issue #860 can be satisfied by
TensorZero and where the migration risk still lives.
"""
from __future__ import annotations
import argparse
import json
import re
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Iterable
ISSUE_NUMBER = 860
ISSUE_TITLE = "tensorzero LLMOps platform evaluation"
ISSUE_URL = "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/860"
DEFAULT_OUTPUT = Path("docs/evaluations/tensorzero-860-evaluation.md")
DEFAULT_JSON_OUTPUT = Path("docs/evaluations/tensorzero-860-evaluation.json")
@dataclass(frozen=True)
class TouchpointPattern:
label: str
file_path: str
regex: str
description: str
@dataclass(frozen=True)
class Touchpoint:
label: str
file_path: str
line_number: int
matched_text: str
@dataclass(frozen=True)
class RequirementStatus:
key: str
name: str
status: str
evidence_labels: tuple[str, ...]
summary: str
@dataclass(frozen=True)
class EvaluationReport:
issue_number: int
issue_title: str
issue_url: str
recommendation: str
touchpoints: tuple[Touchpoint, ...]
requirements: tuple[RequirementStatus, ...]
PATTERNS: tuple[TouchpointPattern, ...] = (
TouchpointPattern(
label="fallback_chain",
file_path="run_agent.py",
regex=r"_fallback_chain|fallback_providers|fallback_model",
description="Primary agent fallback-provider chain in the core conversation loop.",
),
TouchpointPattern(
label="provider_routing_config",
file_path="cli.py",
regex=r"provider_routing|fallback_providers|smart_model_routing",
description="CLI-owned provider routing and fallback configuration surfaces.",
),
TouchpointPattern(
label="runtime_provider",
file_path="hermes_cli/runtime_provider.py",
regex=r"def resolve_runtime_provider|def resolve_requested_provider",
description="Central runtime provider resolution for CLI, gateway, cron, and helpers.",
),
TouchpointPattern(
label="smart_model_routing",
file_path="agent/smart_model_routing.py",
regex=r"def resolve_turn_route|def choose_cheap_model_route",
description="Cheap-vs-strong turn routing that TensorZero would need to absorb or replace.",
),
TouchpointPattern(
label="gateway_provider_routing",
file_path="gateway/run.py",
regex=r"def _load_provider_routing|def _load_fallback_model|def _load_smart_model_routing",
description="Gateway-specific loading of routing, fallback, and smart-model policies.",
),
TouchpointPattern(
label="cron_runtime_provider",
file_path="cron/scheduler.py",
regex=r"resolve_runtime_provider|resolve_turn_route|provider_routing|fallback_model",
description="Cron execution path that re-resolves providers and routing on every run.",
),
TouchpointPattern(
label="auxiliary_fallback_chain",
file_path="agent/auxiliary_client.py",
regex=r"fallback chain|_get_provider_chain|provider chain",
description="Auxiliary task routing/fallback chain outside the main inference path.",
),
TouchpointPattern(
label="delegate_runtime_provider",
file_path="tools/delegate_tool.py",
regex=r"runtime provider system|resolve the full credential bundle|resolve_runtime_provider",
description="Subagent/delegation routing path that would also need TensorZero parity.",
),
TouchpointPattern(
label="session_db",
file_path="hermes_state.py",
regex=r"class SessionDB",
description="Session persistence surface that could feed TensorZero optimization/eval data.",
),
TouchpointPattern(
label="trajectory_export",
file_path="batch_runner.py",
regex=r"trajectory_entry|save_trajectories|_convert_to_trajectory_format",
description="Trajectory export surface for offline optimization and replay data.",
),
TouchpointPattern(
label="benchmark_suite",
file_path="benchmarks/tool_call_benchmark.py",
regex=r"ToolCall\(|class ToolCall|benchmark",
description="Existing benchmark/evaluation harness that could map to TensorZero experiments.",
),
)
def _iter_matches(pattern: TouchpointPattern, text: str) -> Iterable[Touchpoint]:
regex = re.compile(pattern.regex, re.IGNORECASE)
for line_number, line in enumerate(text.splitlines(), start=1):
if regex.search(line):
yield Touchpoint(
label=pattern.label,
file_path=pattern.file_path,
line_number=line_number,
matched_text=line.strip(),
)
def scan_touchpoints(repo_root: Path) -> list[Touchpoint]:
touchpoints: list[Touchpoint] = []
for pattern in PATTERNS:
path = repo_root / pattern.file_path
if not path.exists():
continue
text = path.read_text(encoding="utf-8")
touchpoints.extend(_iter_matches(pattern, text))
return touchpoints
def build_requirement_matrix(touchpoints: list[Touchpoint]) -> list[RequirementStatus]:
labels = {tp.label for tp in touchpoints}
matrix: list[RequirementStatus] = []
gateway_labels = (
"fallback_chain",
"runtime_provider",
"gateway_provider_routing",
"cron_runtime_provider",
"auxiliary_fallback_chain",
"delegate_runtime_provider",
)
gateway_hits = tuple(label for label in gateway_labels if label in labels)
gateway_status = "partial" if len(gateway_hits) >= 4 else "gap"
gateway_summary = (
"Hermes already spreads provider routing across core agent, runtime provider, gateway, cron, auxiliary, and delegation seams; "
"TensorZero would need parity across all of them before it can replace the gateway layer."
if gateway_hits else
"No grounded routing surfaces were found for a gateway replacement assessment."
)
matrix.append(RequirementStatus("gateway_replacement", "Gateway replacement scope", gateway_status, gateway_hits, gateway_summary))
config_labels = (
"provider_routing_config",
"runtime_provider",
"smart_model_routing",
"fallback_chain",
)
config_hits = tuple(label for label in config_labels if label in labels)
config_status = "partial" if len(config_hits) >= 3 else "gap"
config_summary = (
"Hermes has multiple config concepts to migrate (`provider_routing`, `fallback_providers`, `smart_model_routing`, runtime provider resolution), "
"so TensorZero is not a drop-in config swap."
if config_hits else
"No current config migration surface was found."
)
matrix.append(RequirementStatus("config_migration", "Config migration", config_status, config_hits, config_summary))
canary_hits: tuple[str, ...] = ()
canary_summary = (
"The repo shows semantic routing and fallback, but no grounded 10% traffic-split canary mechanism. "
"A TensorZero cutover would need new percentage-based rollout controls and observability hooks."
)
matrix.append(RequirementStatus("canary_rollout", "10% traffic canary", "gap", canary_hits, canary_summary))
session_labels = ("session_db", "trajectory_export")
session_hits = tuple(label for label in session_labels if label in labels)
session_status = "partial" if len(session_hits) == len(session_labels) else "gap"
session_summary = (
"Hermes already has SessionDB and trajectory export surfaces that can feed offline optimization data, "
"but not a TensorZero-native ingestion path yet."
if session_hits else
"No session-data surface was found for prompt optimization."
)
matrix.append(RequirementStatus("session_feedback", "Session data for prompt optimization", session_status, session_hits, session_summary))
eval_labels = ("benchmark_suite", "trajectory_export")
eval_hits = tuple(label for label in eval_labels if label in labels)
eval_status = "partial" if "benchmark_suite" in eval_hits else "gap"
eval_summary = (
"Hermes already has benchmark/trajectory machinery that can seed TensorZero A/B evaluation, "
"but no integrated TensorZero experiment runner or live evaluation gateway."
if eval_hits else
"No evaluation harness was found to support TensorZero A/B testing."
)
matrix.append(RequirementStatus("evaluation_suite", "Evaluation suite / A/B testing", eval_status, eval_hits, eval_summary))
return matrix
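# Threshold summary (descriptive of the logic above): gateway replacement
# needs at least 4 of its 6 seam labels for "partial", config migration
# needs 3 of 4, and canary_rollout stays "gap" unconditionally until a
# traffic-split mechanism actually exists in the repo.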
def build_report(touchpoints: list[Touchpoint], requirement_matrix: list[RequirementStatus]) -> EvaluationReport:
recommendation = (
"Not ready for direct replacement. Recommend a shadow-evaluation phase first: keep Hermes routing live, "
"inventory the migration seams, export SessionDB/trajectory data into an offline TensorZero experiment loop, "
"and only design a canary gateway once percentage-based rollout controls exist."
)
return EvaluationReport(
issue_number=ISSUE_NUMBER,
issue_title=ISSUE_TITLE,
issue_url=ISSUE_URL,
recommendation=recommendation,
touchpoints=tuple(touchpoints),
requirements=tuple(requirement_matrix),
)
def build_markdown(report: EvaluationReport) -> str:
lines: list[str] = []
lines.append("# TensorZero Evaluation Packet")
lines.append("")
lines.append(f"Issue #{report.issue_number}: [{report.issue_title}]({report.issue_url})")
lines.append("")
lines.append("## Scope")
lines.append("")
lines.append("This packet evaluates TensorZero as a possible replacement for Hermes' custom provider-routing stack.")
lines.append("It is intentionally grounded in the current repo state rather than a speculative cutover plan.")
lines.append("")
lines.append("## Issue requirements being evaluated")
lines.append("")
lines.append("- Deploy tensorzero gateway (Rust binary)")
lines.append("- Migrate provider routing config")
lines.append("- Test with canary (10% traffic) before full cutover")
lines.append("- Feed session data for prompt optimization")
lines.append("- Evaluation suite for A/B testing models")
lines.append("")
lines.append("## Recommendation")
lines.append("")
lines.append(report.recommendation)
lines.append("")
lines.append("## Requirement matrix")
lines.append("")
lines.append("| Requirement | Status | Evidence labels | Summary |")
lines.append("| --- | --- | --- | --- |")
for row in report.requirements:
evidence = ", ".join(row.evidence_labels) if row.evidence_labels else ""
lines.append(f"| {row.name} | {row.status} | {evidence} | {row.summary} |")
lines.append("")
lines.append("## Grounded Hermes touchpoints")
lines.append("")
if report.touchpoints:
for tp in report.touchpoints:
lines.append(f"- `{tp.file_path}:{tp.line_number}` — [{tp.label}] {tp.matched_text}")
else:
lines.append("- No routing/evaluation touchpoints were found.")
lines.append("")
lines.append("## Suggested next slice")
lines.append("")
lines.append("1. Build an exporter that emits SessionDB + trajectory data into a TensorZero-friendly offline dataset.")
lines.append("2. Define percentage-based canary controls before attempting any gateway replacement.")
lines.append("3. Keep Hermes routing authoritative until TensorZero proves parity across CLI, gateway, cron, auxiliary, and delegation surfaces.")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
def write_outputs(report: EvaluationReport, markdown_path: Path, json_path: Path | None = None) -> None:
markdown_path.parent.mkdir(parents=True, exist_ok=True)
markdown_path.write_text(build_markdown(report), encoding="utf-8")
if json_path is not None:
json_path.parent.mkdir(parents=True, exist_ok=True)
json_path.write_text(json.dumps(asdict(report), indent=2), encoding="utf-8")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Generate a grounded TensorZero evaluation packet for Hermes")
parser.add_argument("--repo-root", default=".", help="Hermes repo root to scan")
parser.add_argument("--output", default=str(DEFAULT_OUTPUT), help="Markdown output path")
parser.add_argument("--json-output", default=str(DEFAULT_JSON_OUTPUT), help="Optional JSON output path")
return parser.parse_args()
def main() -> int:
args = parse_args()
repo_root = Path(args.repo_root).resolve()
touchpoints = scan_touchpoints(repo_root)
matrix = build_requirement_matrix(touchpoints)
report = build_report(touchpoints, matrix)
json_output = Path(args.json_output) if args.json_output else None
write_outputs(report, Path(args.output), json_output)
print(f"Wrote {args.output}")
if json_output is not None:
print(f"Wrote {json_output}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
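# Example invocation using the defaults defined above (the scripts/ location
# matches the test harness import path; adjust if the file lives elsewhere):
#
#   python scripts/tensorzero_eval_packet.py \
#       --repo-root . \
#       --output docs/evaluations/tensorzero-860-evaluation.md \
#       --json-output docs/evaluations/tensorzero-860-evaluation.json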

View File

@@ -1,63 +0,0 @@
{
"preferences_and_duplicates": [
{
"role": "user",
"content": "Deploy via Ansible for production changes.",
"created_at": "2026-04-22T10:00:00Z"
},
{
"role": "user",
"content": "We deploy through Ansible on this repo.",
"created_at": "2026-04-22T10:01:00Z"
},
{
"role": "user",
"content": "Gitea-first for repository work.",
"created_at": "2026-04-22T10:02:00Z"
}
],
"operational_and_contradictions": [
{
"role": "user",
"content": "The BURN watchdog caps dispatches per cycle to 6.",
"created_at": "2026-04-22T11:00:00Z"
},
{
"role": "user",
"content": "The provider should stay openai-codex/gpt-5.4.",
"created_at": "2026-04-22T11:01:00Z"
},
{
"role": "user",
"content": "Correction: the provider should stay mimo-v2-pro.",
"created_at": "2026-04-22T11:02:00Z"
}
],
"mixed_transcript": [
{
"role": "user",
"content": "Deploy via Ansible for production changes.",
"created_at": "2026-04-22T10:00:00Z"
},
{
"role": "user",
"content": "We deploy through Ansible on this repo.",
"created_at": "2026-04-22T10:01:00Z"
},
{
"role": "user",
"content": "The BURN watchdog caps dispatches per cycle to 6.",
"created_at": "2026-04-22T11:00:00Z"
},
{
"role": "user",
"content": "The provider should stay openai-codex/gpt-5.4.",
"created_at": "2026-04-22T11:01:00Z"
},
{
"role": "user",
"content": "Correction: the provider should stay mimo-v2-pro.",
"created_at": "2026-04-22T11:02:00Z"
}
]
}

View File

@@ -1,50 +0,0 @@
"""Integration tests for holographic auto-extraction with structured fact persistence."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
from plugins.memory.holographic import HolographicMemoryProvider
_FIXTURE_PATH = Path(__file__).resolve().parents[2] / "fixtures" / "memory_extraction_fragments.json"
def _load_fixture(name: str):
return json.loads(_FIXTURE_PATH.read_text())[name]
class TestHolographicAutoExtract:
def test_auto_extract_persists_structured_metadata_and_normalizes_duplicates(self, tmp_path):
provider = HolographicMemoryProvider(
config={
"db_path": str(tmp_path / "memory_store.db"),
"auto_extract": True,
"default_trust": 0.5,
}
)
provider.initialize("test-session")
messages = _load_fixture("mixed_transcript")
provider.on_session_end(messages)
provider.on_session_end(messages)
facts = provider._store.list_facts(min_trust=0.0, limit=20)
deploy_facts = [f for f in facts if f.get("relation") == "workflow.deploy_method"]
provider_facts = [f for f in facts if f.get("contradiction_group") == "config.provider"]
assert len(deploy_facts) == 1
assert deploy_facts[0]["metadata"]["duplicate_count"] >= 3
assert deploy_facts[0]["observed_at"] == "2026-04-22T10:00:00Z"
assert deploy_facts[0]["metadata"]["provenance"] == [
"conversation:user:0",
"conversation:user:1",
]
assert len(provider_facts) == 2
assert {f["status"] for f in provider_facts} == {"contradiction"}
assert {f["metadata"]["value"] for f in provider_facts} == {
"openai-codex/gpt-5.4",
"mimo-v2-pro",
}

View File

@@ -1,6 +1,6 @@
"""Tests for session compaction with fact extraction."""
import json
import pytest
import sys
from pathlib import Path
@@ -8,19 +8,12 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.session_compactor import (
ExtractedFact,
evaluate_extraction_quality,
extract_and_save_facts,
extract_facts_from_messages,
format_facts_summary,
save_facts_to_store,
extract_and_save_facts,
format_facts_summary,
)
_FIXTURE_PATH = Path(__file__).resolve().parent / "fixtures" / "memory_extraction_fragments.json"
def _load_fixture(name: str):
return json.loads(_FIXTURE_PATH.read_text())[name]
class TestFactExtraction:
def test_extract_preference(self):
@@ -67,48 +60,14 @@ class TestFactExtraction:
{"role": "user", "content": "I prefer Python."},
]
facts = extract_facts_from_messages(messages)
# Should deduplicate
python_facts = [f for f in facts if "Python" in f.content]
assert len(python_facts) == 1
def test_structured_fact_preserves_provenance_and_temporal_metadata(self):
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
deploy_fact = next(f for f in facts if f.relation == "workflow.deploy_method")
assert deploy_fact.source_role == "user"
assert deploy_fact.source_turn == 0
assert deploy_fact.observed_at == "2026-04-22T10:00:00Z"
assert deploy_fact.provenance == "conversation:user:0"
assert deploy_fact.canonical_key
assert deploy_fact.evidence
assert deploy_fact.evidence[0]["source_text"].startswith("Deploy via Ansible")
def test_near_duplicate_facts_are_normalized_into_one_canonical_fact(self):
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
deploy_facts = [f for f in facts if f.relation == "workflow.deploy_method"]
assert len(deploy_facts) == 1
assert len(deploy_facts[0].evidence) == 2
assert deploy_facts[0].metadata["duplicate_count"] == 1
def test_contradictory_facts_are_preserved_for_unique_slots(self):
facts = extract_facts_from_messages(_load_fixture("operational_and_contradictions"))
provider_facts = [f for f in facts if f.contradiction_group == "config.provider"]
assert len(provider_facts) == 2
assert {f.status for f in provider_facts} == {"contradiction"}
assert {f.normalized_content for f in provider_facts} == {
"openai codex gpt 5 4",
"mimo v2 pro",
}
def test_quality_evaluation_reports_noise_reduction(self):
metrics = evaluate_extraction_quality(_load_fixture("mixed_transcript"))
assert metrics["raw_candidates"] > metrics["normalized_facts"]
assert metrics["noise_reduction"] > 0
assert metrics["contradiction_groups"] == 1
class TestSaveFacts:
def test_save_with_callback(self):
saved = []
def mock_save(category, entity, content, trust):
saved.append({"category": category, "content": content})
@@ -117,38 +76,6 @@ class TestSaveFacts:
assert count == 1
assert len(saved) == 1
def test_save_with_extended_callback_metadata(self):
saved = []
def mock_save(category, entity, content, trust, **kwargs):
saved.append({
"category": category,
"entity": entity,
"content": content,
"trust": trust,
**kwargs,
})
fact = ExtractedFact(
"project.operational",
"watchdog",
"BURN watchdog caps dispatches per cycle to 6",
0.9,
2,
source_role="user",
observed_at="2026-04-22T11:00:00Z",
provenance="conversation:user:2",
canonical_key="project.operational|watchdog|dispatch_cap|6",
relation="fleet.dispatch_cap",
contradiction_group="fleet.dispatch_cap",
metadata={"duplicate_count": 0},
)
count = save_facts_to_store([fact], fact_store_fn=mock_save)
assert count == 1
assert saved[0]["canonical_key"] == fact.canonical_key
assert saved[0]["observed_at"] == "2026-04-22T11:00:00Z"
assert saved[0]["metadata"]["duplicate_count"] == 0
class TestFormatSummary:
def test_empty(self):

View File

@@ -0,0 +1,149 @@
from pathlib import Path
import sys
SCRIPT_DIR = Path(__file__).resolve().parents[1] / "scripts"
sys.path.insert(0, str(SCRIPT_DIR))
import tensorzero_eval_packet as tz
def test_scan_touchpoints_finds_expected_matches(tmp_path):
(tmp_path / "run_agent.py").write_text(
"self._fallback_chain = []\n# Provider fallback chain\n"
)
(tmp_path / "hermes_cli").mkdir()
(tmp_path / "hermes_cli" / "runtime_provider.py").write_text(
"def resolve_runtime_provider():\n return {}\n"
)
(tmp_path / "agent").mkdir()
(tmp_path / "agent" / "smart_model_routing.py").write_text(
"def resolve_turn_route(user_message, routing_config, primary):\n return primary\n"
)
(tmp_path / "gateway").mkdir()
(tmp_path / "gateway" / "run.py").write_text(
"def _load_provider_routing():\n return {}\n"
)
(tmp_path / "cron").mkdir()
(tmp_path / "cron" / "scheduler.py").write_text(
"runtime = resolve_runtime_provider()\nturn_route = resolve_turn_route('x', {}, {})\n"
)
(tmp_path / "hermes_state.py").write_text("class SessionDB:\n pass\n")
(tmp_path / "benchmarks").mkdir()
(tmp_path / "benchmarks" / "tool_call_benchmark.py").write_text(
"class ToolCall: ...\n"
)
touchpoints = tz.scan_touchpoints(tmp_path)
labels = {tp.label for tp in touchpoints}
assert "fallback_chain" in labels
assert "runtime_provider" in labels
assert "smart_model_routing" in labels
assert "gateway_provider_routing" in labels
assert "cron_runtime_provider" in labels
assert "session_db" in labels
assert "benchmark_suite" in labels
def test_build_requirement_matrix_marks_canary_as_gap_without_split_support():
touchpoints = [
tz.Touchpoint(
label="runtime_provider",
file_path="hermes_cli/runtime_provider.py",
line_number=10,
matched_text="def resolve_runtime_provider",
),
tz.Touchpoint(
label="provider_routing_config",
file_path="cli.py",
line_number=20,
matched_text='provider_routing',
),
tz.Touchpoint(
label="fallback_chain",
file_path="run_agent.py",
line_number=21,
matched_text='_fallback_chain = []',
),
tz.Touchpoint(
label="smart_model_routing",
file_path="agent/smart_model_routing.py",
line_number=30,
matched_text='resolve_turn_route',
),
tz.Touchpoint(
label="gateway_provider_routing",
file_path="gateway/run.py",
line_number=35,
matched_text='def _load_provider_routing',
),
tz.Touchpoint(
label="cron_runtime_provider",
file_path="cron/scheduler.py",
line_number=36,
matched_text='runtime = resolve_runtime_provider()',
),
tz.Touchpoint(
label="session_db",
file_path="hermes_state.py",
line_number=40,
matched_text='class SessionDB',
),
tz.Touchpoint(
label="trajectory_export",
file_path="batch_runner.py",
line_number=50,
matched_text='trajectory_entry',
),
tz.Touchpoint(
label="benchmark_suite",
file_path="benchmarks/tool_call_benchmark.py",
line_number=60,
matched_text='ToolCall',
),
]
matrix = tz.build_requirement_matrix(touchpoints)
by_key = {row.key: row for row in matrix}
assert by_key["gateway_replacement"].status == "partial"
assert by_key["config_migration"].status == "partial"
assert by_key["canary_rollout"].status == "gap"
assert by_key["session_feedback"].status == "partial"
assert by_key["evaluation_suite"].status == "partial"
def test_build_markdown_renders_recommendation_and_touchpoints():
touchpoints = [
tz.Touchpoint(
label="runtime_provider",
file_path="hermes_cli/runtime_provider.py",
line_number=10,
matched_text="def resolve_runtime_provider",
),
tz.Touchpoint(
label="session_db",
file_path="hermes_state.py",
line_number=40,
matched_text='class SessionDB',
),
]
matrix = tz.build_requirement_matrix(touchpoints)
report = tz.build_report(touchpoints, matrix)
markdown = tz.build_markdown(report)
assert "# TensorZero Evaluation Packet" in markdown
assert "gateway_replacement" not in markdown # human labels, not raw keys
assert "Gateway replacement scope" in markdown
assert "Not ready for direct replacement" in markdown
assert "hermes_cli/runtime_provider.py:10" in markdown
assert "hermes_state.py:40" in markdown
def test_issue_context_is_embedded_in_report():
report = tz.build_report([], [])
markdown = tz.build_markdown(report)
assert "Issue #860" in markdown
assert "tensorzero" in markdown.lower()
assert "10% traffic" in markdown