Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
1bb48c91e7 feat: add morning review packet generator (#966)
All checks were successful
Lint / lint (pull_request) Successful in 10s
Implements one concrete Phase 2 slice for the morning review packet epic:
- add a script that fetches an epic and child QA issues from Gitea
- parse structured QA issue sections into a reusable packet model
- render a review-ready markdown packet
- add a generated 2026-04-21 Hermes harness review packet artifact
- cover parsing and rendering with targeted tests

Refs #966
2026-04-22 10:59:01 -04:00
9 changed files with 1063 additions and 1078 deletions

View File

@@ -1,546 +1,197 @@
"""Session compaction with structured fact extraction.
"""Session compaction with fact extraction.
Before compressing conversation context, extract durable facts with enough
structure to survive retrieval: source/provenance, temporal anchors,
normalized canonical keys, and contradiction groups.
Before compressing conversation context, extracts durable facts
(user preferences, corrections, project details) and saves them
to the fact store so they survive compression.
Usage:
from agent.session_compactor import extract_and_save_facts
facts = extract_and_save_facts(messages)
"""
from __future__ import annotations
import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
_DEPLOY_METHOD_RE = re.compile(r"\bdeploy(?:ing)?\s+(?:via|through|with)\s+([A-Za-z0-9_./+-]+)", re.IGNORECASE)
_WATCHDOG_CAP_RE = re.compile(
r"\b(?:the\s+)?([A-Za-z0-9_-]+(?:\s+watchdog)?)\s+(?:caps|limits)\s+dispatches(?:\s+per\s+cycle)?\s+to\s+([0-9]+)",
re.IGNORECASE,
)
_PROVIDER_RE = re.compile(
r"\bprovider\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._/-]+)",
re.IGNORECASE,
)
_MODEL_RE = re.compile(
r"\bmodel\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._:/-]+)",
re.IGNORECASE,
)
_PORT_RE = re.compile(r"\bport\s+(?:is|should\s+be)\s+([0-9]+)", re.IGNORECASE)
_PROJECT_USES_RE = re.compile(r"\b(?:the\s+)?project\s+(?:uses|needs|requires)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_PREFERENCE_RE = re.compile(r"\bI\s+(?:prefer|like|want|need)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_CONSTRAINT_RE = re.compile(r"\b(?:do\s+not|don't)\s+(?:ever\s+|again\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
_DECISION_RE = re.compile(r"\b(?:we|the\s+team)\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
@dataclass
class ExtractedFact:
"""A durable fact extracted from conversation."""
category: str
entity: str
content: str
confidence: float
source_turn: int
"""A fact extracted from conversation."""
category: str # "user_pref", "correction", "project", "tool_quirk", "general"
entity: str # what the fact is about
content: str # the fact itself
confidence: float # 0.0-1.0
source_turn: int # which message turn it came from
timestamp: float = 0.0
source_role: str = "user"
source_text: str = ""
normalized_content: str = ""
canonical_key: str = ""
relation: str = "general"
contradiction_group: str = ""
status: str = "active"
provenance: str = ""
observed_at: str = ""
evidence: List[Dict[str, Any]] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
if not self.timestamp:
self.timestamp = time.time()
if not self.observed_at:
self.observed_at = _iso_from_timestamp(self.timestamp)
if not self.normalized_content:
self.normalized_content = _normalize_value(self.content)
if not self.provenance:
self.provenance = f"conversation:{self.source_role}:{self.source_turn}"
if not self.canonical_key:
self.canonical_key = _canonical_key(self.entity, self.relation, self.normalized_content)
if not self.evidence:
self.evidence = [
{
"source_role": self.source_role,
"source_turn": self.source_turn,
"source_text": self.source_text or self.content,
"observed_at": self.observed_at,
"provenance": self.provenance,
}
]
self.metadata = dict(self.metadata or {})
self.metadata.setdefault("entity", self.entity)
self.metadata.setdefault("relation", self.relation)
self.metadata.setdefault("value", self.content)
self.metadata.setdefault("normalized_value", self.normalized_content)
self.metadata.setdefault("provenance", [self.provenance])
self.metadata.setdefault("evidence", list(self.evidence))
self.metadata.setdefault("observation_count", len(self.evidence))
self.metadata.setdefault("duplicate_count", max(0, self.metadata["observation_count"] - 1))
if self.contradiction_group:
self.metadata.setdefault("contradiction_group", self.contradiction_group)
self.metadata.setdefault("status", self.status)
# Patterns that indicate user preferences
_PREFERENCE_PATTERNS = [
(r"(?:I|we) (?:prefer|like|want|need) (.+?)(?:\.|$)", "preference"),
(r"(?:always|never) (?:use|do|run|deploy) (.+?)(?:\.|$)", "preference"),
(r"(?:my|our) (?:default|preferred|usual) (.+?) (?:is|are) (.+?)(?:\.|$)", "preference"),
(r"(?:make sure|ensure|remember) (?:to|that) (.+?)(?:\.|$)", "instruction"),
(r"(?:don'?t|do not) (?:ever|ever again) (.+?)(?:\.|$)", "constraint"),
]
# Patterns that indicate corrections
_CORRECTION_PATTERNS = [
(r"(?:actually|no[, ]|wait[, ]|correction[: ]|sorry[, ]) (.+)", "correction"),
(r"(?:I meant|what I meant was|the correct) (.+?)(?:\.|$)", "correction"),
(r"(?:it'?s|its) (?:not|shouldn'?t be|wrong) (.+?)(?:\.|$)", "correction"),
]
# Patterns that indicate project/tool facts
_PROJECT_PATTERNS = [
(r"(?:the |our )?(?:project|repo|codebase|code) (?:is|uses|needs|requires) (.+?)(?:\.|$)", "project"),
(r"(?:deploy|push|commit) (?:to|on) (.+?)(?:\.|$)", "project"),
(r"(?:this|that|the) (?:server|host|machine|VPS) (?:is|runs|has) (.+?)(?:\.|$)", "infrastructure"),
(r"(?:model|provider|engine) (?:is|should be|needs to be) (.+?)(?:\.|$)", "config"),
]
def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[ExtractedFact]:
"""Extract durable facts from conversation messages.
Scans conversation turns for preferences, decisions, corrections, and
operational state. Raw candidates are normalized into canonical facts so
near-duplicates merge and contradictions remain inspectable.
Scans user messages for preferences, corrections, project facts,
and infrastructure details that should survive compression.
"""
facts = []
seen_contents = set()
raw_candidates: list[ExtractedFact] = []
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role not in {"user", "assistant"}:
# Only scan user messages and assistant responses with corrections
if role not in ("user", "assistant"):
continue
if not content or not isinstance(content, str):
continue
if len(content) < 10:
continue
# Skip tool results and system messages
if role == "assistant" and msg.get("tool_calls"):
continue
if not isinstance(content, str) or len(content.strip()) < 10:
continue
timestamp, observed_at = _message_time(msg)
raw_candidates.extend(
_extract_from_text(
content.strip(),
turn_idx=turn_idx,
role=role,
timestamp=timestamp,
observed_at=observed_at,
)
)
extracted = _extract_from_text(content, turn_idx, role)
return _normalize_candidates(raw_candidates)
def evaluate_extraction_quality(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Return before/after metrics for raw vs normalized extraction quality."""
raw_candidates: list[ExtractedFact] = []
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role not in {"user", "assistant"}:
continue
if role == "assistant" and msg.get("tool_calls"):
continue
if not isinstance(content, str) or len(content.strip()) < 10:
continue
timestamp, observed_at = _message_time(msg)
raw_candidates.extend(
_extract_from_text(
content.strip(),
turn_idx=turn_idx,
role=role,
timestamp=timestamp,
observed_at=observed_at,
)
)
normalized = _normalize_candidates(raw_candidates)
raw_count = len(raw_candidates)
normalized_count = len(normalized)
contradiction_groups = {
fact.contradiction_group
for fact in normalized
if fact.status == "contradiction" and fact.contradiction_group
}
duplicate_count = max(0, raw_count - normalized_count)
noise_reduction = (duplicate_count / raw_count) if raw_count else 0.0
return {
"raw_candidates": raw_count,
"normalized_facts": normalized_count,
"duplicates_merged": duplicate_count,
"contradiction_groups": len(contradiction_groups),
"noise_reduction": round(noise_reduction, 3),
}
def _extract_from_text(
text: str,
*,
turn_idx: int,
role: str,
timestamp: float,
observed_at: str,
) -> List[ExtractedFact]:
"""Extract raw fact candidates from a single text block."""
facts: list[ExtractedFact] = []
if role != "user":
return facts
deploy_match = _DEPLOY_METHOD_RE.search(text)
if deploy_match:
method = deploy_match.group(1).strip()
facts.append(
_build_fact(
category="project.decision",
entity="project",
relation="workflow.deploy_method",
value=method,
content=f"Deploy via {method}",
confidence=0.88,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
watchdog_match = _WATCHDOG_CAP_RE.search(text)
if watchdog_match:
watchdog = watchdog_match.group(1).strip()
cap = watchdog_match.group(2).strip()
facts.append(
_build_fact(
category="project.operational",
entity=_normalize_entity(watchdog),
relation="fleet.dispatch_cap",
value=cap,
content=f"{watchdog} caps dispatches per cycle to {cap}",
confidence=0.92,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
provider_match = _PROVIDER_RE.search(text)
if provider_match:
provider = provider_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.provider",
value=provider,
content=f"Provider should stay {provider}",
confidence=0.91,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
model_match = _MODEL_RE.search(text)
if model_match:
model = model_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.model",
value=model,
content=f"Model should stay {model}",
confidence=0.9,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
port_match = _PORT_RE.search(text)
if port_match:
port = port_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.port",
value=port,
content=f"Port is {port}",
confidence=0.9,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
project_match = _PROJECT_USES_RE.search(text)
if project_match:
value = project_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="project.stack",
entity="project",
relation="project.stack",
value=value,
content=f"Project uses {value}",
confidence=0.74,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
preference_match = _PREFERENCE_RE.search(text)
if preference_match:
value = preference_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="user_pref.preference",
entity="user",
relation="user.preference",
value=value,
content=value,
confidence=0.72,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
constraint_match = _CONSTRAINT_RE.search(text)
if constraint_match:
value = constraint_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="user_pref.constraint",
entity="user",
relation="user.constraint",
value=value,
content=f"Do not {value}",
confidence=0.82,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
decision_match = _DECISION_RE.search(text)
if decision_match:
value = decision_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="project.decision",
entity="project",
relation="project.decision",
value=value,
content=f"Decision: {value}",
confidence=0.79,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
# Deduplicate by content
for fact in extracted:
key = f"{fact.category}:{fact.content[:100]}"
if key not in seen_contents:
seen_contents.add(key)
facts.append(fact)
return facts
def _build_fact(
*,
category: str,
entity: str,
relation: str,
value: str,
content: str,
confidence: float,
source_turn: int,
source_role: str,
source_text: str,
timestamp: float,
observed_at: str,
unique_slot: bool,
) -> ExtractedFact:
normalized_value = _normalize_value(value.rstrip(".!?"))
value = value.rstrip(".!?")
content = content.rstrip(".!?")
provenance = f"conversation:{source_role}:{source_turn}"
contradiction_group = relation if unique_slot else ""
evidence = [
{
"source_role": source_role,
"source_turn": source_turn,
"source_text": source_text,
"observed_at": observed_at,
"provenance": provenance,
}
]
metadata = {
"entity": entity,
"relation": relation,
"value": value,
"normalized_value": normalized_value,
"provenance": [provenance],
"evidence": list(evidence),
"observation_count": 1,
"duplicate_count": 0,
"status": "active",
}
if contradiction_group:
metadata["contradiction_group"] = contradiction_group
return ExtractedFact(
category=category,
entity=entity,
content=content,
confidence=confidence,
source_turn=source_turn,
timestamp=timestamp,
source_role=source_role,
source_text=source_text,
normalized_content=normalized_value,
canonical_key=_canonical_key(entity, relation, normalized_value),
relation=relation,
contradiction_group=contradiction_group,
status="active",
provenance=provenance,
observed_at=observed_at,
evidence=evidence,
metadata=metadata,
)
def _extract_from_text(text: str, turn_idx: int, role: str) -> List[ExtractedFact]:
"""Extract facts from a single text block."""
facts = []
timestamp = time.time()
# Clean text for pattern matching
clean = text.strip()
def _normalize_candidates(candidates: List[ExtractedFact]) -> List[ExtractedFact]:
"""Merge duplicates and mark contradictions while preserving evidence."""
# User preference patterns (from user messages)
if role == "user":
for pattern, subcategory in _PREFERENCE_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"user_pref.{subcategory}",
entity="user",
content=content[:200],
confidence=0.7,
source_turn=turn_idx,
timestamp=timestamp,
))
by_key: dict[str, ExtractedFact] = {}
contradiction_groups: dict[str, list[ExtractedFact]] = {}
# Correction patterns (from user messages)
if role == "user":
for pattern, subcategory in _CORRECTION_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"correction.{subcategory}",
entity="user",
content=content[:200],
confidence=0.8,
source_turn=turn_idx,
timestamp=timestamp,
))
for candidate in candidates:
existing = by_key.get(candidate.canonical_key)
if existing is not None:
by_key[candidate.canonical_key] = _merge_fact(existing, candidate)
continue
# Project/infrastructure patterns (from both user and assistant)
for pattern, subcategory in _PROJECT_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"project.{subcategory}",
entity=subcategory,
content=content[:200],
confidence=0.6,
source_turn=turn_idx,
timestamp=timestamp,
))
by_key[candidate.canonical_key] = candidate
if candidate.contradiction_group:
contradiction_groups.setdefault(candidate.contradiction_group, []).append(candidate)
for group, facts in contradiction_groups.items():
canonical_keys = {fact.canonical_key for fact in facts}
if len(canonical_keys) <= 1:
continue
for fact in facts:
fact.status = "contradiction"
fact.metadata["status"] = "contradiction"
fact.metadata["contradiction_group"] = group
fact.metadata["contradiction_keys"] = sorted(canonical_keys - {fact.canonical_key})
return sorted(by_key.values(), key=lambda fact: (fact.source_turn, fact.timestamp, fact.canonical_key))
def _merge_fact(existing: ExtractedFact, incoming: ExtractedFact) -> ExtractedFact:
existing.confidence = max(existing.confidence, incoming.confidence)
existing.timestamp = min(existing.timestamp, incoming.timestamp)
existing.source_turn = min(existing.source_turn, incoming.source_turn)
if not existing.observed_at or (incoming.observed_at and incoming.observed_at < existing.observed_at):
existing.observed_at = incoming.observed_at
existing.provenance = min(existing.provenance, incoming.provenance)
provenance = _ordered_unique(existing.metadata.get("provenance", []), incoming.metadata.get("provenance", []))
evidence = _merge_evidence(existing.metadata.get("evidence", []), incoming.metadata.get("evidence", []))
observation_count = int(existing.metadata.get("observation_count", len(existing.evidence) or 1))
observation_count += int(incoming.metadata.get("observation_count", len(incoming.evidence) or 1))
existing.evidence = evidence
existing.metadata["provenance"] = provenance
existing.metadata["evidence"] = evidence
existing.metadata["observation_count"] = observation_count
existing.metadata["duplicate_count"] = max(0, observation_count - 1)
existing.metadata["status"] = existing.status
return existing
return facts
def save_facts_to_store(facts: List[ExtractedFact], fact_store_fn=None) -> int:
"""Save extracted facts to the fact store.
If a callback is supplied, prefer the structured signature but fall back to
the legacy four-argument callback for compatibility.
Args:
facts: List of extracted facts.
fact_store_fn: Optional callable(category, entity, content, trust).
If None, uses the holographic fact store if available.
Returns:
Number of facts saved.
"""
saved = 0
for fact in facts:
payload = {
"category": _store_category(fact.category),
"entity": fact.entity,
"content": fact.content,
"trust": fact.confidence,
"metadata": dict(fact.metadata),
"canonical_key": fact.canonical_key,
"observed_at": fact.observed_at,
"source_role": fact.source_role,
"source_turn": fact.source_turn,
"contradiction_group": fact.contradiction_group,
"status": fact.status,
"relation": fact.relation,
}
if fact_store_fn:
if fact_store_fn:
for fact in facts:
try:
fact_store_fn(**payload)
fact_store_fn(
category=fact.category,
entity=fact.entity,
content=fact.content,
trust=fact.confidence,
)
saved += 1
continue
except TypeError:
try:
fact_store_fn(payload["category"], payload["entity"], payload["content"], payload["trust"])
saved += 1
continue
except Exception as exc:
logger.debug("Failed to save fact via callback: %s", exc)
continue
except Exception as exc:
logger.debug("Failed to save fact via callback: %s", exc)
continue
except Exception as e:
logger.debug("Failed to save fact: %s", e)
else:
# Try holographic fact store
try:
from fact_store import fact_store as _fs
tags = ",".join(filter(None, [fact.entity, fact.relation, fact.status]))
_fs(
action="add",
content=fact.content,
category=_store_category(fact.category),
tags=tags,
trust_delta=fact.confidence - 0.5,
)
saved += 1
for fact in facts:
try:
_fs(
action="add",
content=fact.content,
category=fact.category,
tags=fact.entity,
trust_delta=fact.confidence - 0.5,
)
saved += 1
except Exception as e:
logger.debug("Failed to save fact via fact_store: %s", e)
except ImportError:
logger.debug("fact_store not available — facts not persisted")
break
except Exception as exc:
logger.debug("Failed to save fact via fact_store: %s", exc)
return saved
@@ -553,10 +204,9 @@ def extract_and_save_facts(
Returns (extracted_facts, saved_count).
"""
facts = extract_facts_from_messages(messages)
if facts:
logger.info("Extracted %d normalized facts from conversation", len(facts))
logger.info("Extracted %d facts from conversation", len(facts))
saved = save_facts_to_store(facts, fact_store_fn)
logger.info("Saved %d/%d facts to store", saved, len(facts))
else:
@@ -566,105 +216,16 @@ def extract_and_save_facts(
def format_facts_summary(facts: List[ExtractedFact]) -> str:
"""Format extracted facts as a readable summary."""
if not facts:
return "No facts extracted."
by_category: dict[str, list[ExtractedFact]] = {}
for fact in facts:
by_category.setdefault(fact.category, []).append(fact)
by_category = {}
for f in facts:
by_category.setdefault(f.category, []).append(f)
lines = [f"Extracted {len(facts)} facts:", ""]
for category, category_facts in sorted(by_category.items()):
lines.append(f" {category}:")
for fact in category_facts:
suffix = f" [{fact.status}]" if fact.status != "active" else ""
lines.append(f" - {fact.content[:80]}{suffix}")
for cat, cat_facts in sorted(by_category.items()):
lines.append(f" {cat}:")
for f in cat_facts:
lines.append(f" - {f.content[:80]}")
return "\n".join(lines)
def _store_category(category: str) -> str:
if category.startswith("user_pref"):
return "user_pref"
if category.startswith("project"):
return "project"
if category.startswith("tool"):
return "tool"
return "general"
def _message_time(msg: Dict[str, Any]) -> Tuple[float, str]:
for key in ("created_at", "timestamp", "time"):
value = msg.get(key)
if value is None:
continue
if isinstance(value, (int, float)):
ts = float(value)
return ts, _iso_from_timestamp(ts)
if isinstance(value, str):
parsed = _parse_time_string(value)
if parsed is not None:
return parsed, _iso_from_timestamp(parsed) if "T" not in value else value.replace("+00:00", "Z")
return time.time(), value
now = time.time()
return now, _iso_from_timestamp(now)
def _parse_time_string(value: str) -> float | None:
text = value.strip()
if not text:
return None
try:
return float(text)
except ValueError:
pass
try:
normalized = text[:-1] + "+00:00" if text.endswith("Z") else text
return datetime.fromisoformat(normalized).timestamp()
except ValueError:
return None
def _iso_from_timestamp(value: float) -> str:
return datetime.fromtimestamp(value, tz=timezone.utc).isoformat().replace("+00:00", "Z")
def _normalize_value(value: str) -> str:
normalized = re.sub(r"[^a-z0-9]+", " ", value.lower())
normalized = re.sub(r"\s+", " ", normalized).strip()
return normalized
def _normalize_entity(value: str) -> str:
return _normalize_value(value).replace(" ", "_") or "entity"
def _canonical_key(entity: str, relation: str, normalized_value: str) -> str:
return f"{entity}|{relation}|{normalized_value}"
def _ordered_unique(*groups: List[str]) -> List[str]:
seen: set[str] = set()
ordered: list[str] = []
for group in groups:
for item in group:
if item and item not in seen:
seen.add(item)
ordered.append(item)
return ordered
def _merge_evidence(existing: List[Dict[str, Any]], incoming: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
seen: set[tuple[str, str, str]] = set()
merged: list[dict[str, Any]] = []
for item in list(existing) + list(incoming):
key = (
str(item.get("provenance", "")),
str(item.get("observed_at", "")),
str(item.get("source_text", "")),
)
if key in seen:
continue
seen.add(key)
merged.append(dict(item))
return merged

View File

@@ -0,0 +1,387 @@
# Morning Review Packet
Source epic: [EPIC: Morning review packet — Hermes harness features landed 2026-04-21](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/949)
## Epic context
EPIC: Morning review packet — Hermes harness features landed 2026-04-21
Source: git log on upstream/main since 2026-04-21 00:00 EDT, plus the current local branch `burn/921-poka-yoke-hardcoded-paths` for the branch-only path-guard work.
Important review note:
- Validate upstream-landed features on `upstream/main` or a synced branch.
- Validate the path-guard work on `burn/921-poka-yoke-hardcoded-paths`.
This epic is a morning-review packet: one QA issue per feature cluster, each with concrete acceptance criteria and targeted tests or manual checks.
## Success criteria
- [ ] Every issue has a clear PASS / FAIL outcome.
- [ ] Test output or manual evidence is attached to each issue.
- [ ] Any drift between upstream/main and forge/main is called out explicitly.
## Sub-issues
### Upstream/main features landed 2026-04-21
- [ ] #950 [QA] Verify AI Gateway provider UX + attribution headers
- [ ] #951 [QA] Verify transport abstraction + AnthropicTransport wiring
- [ ] #952 [QA] Verify CLI voice beep toggle
- [ ] #953 [QA] Verify bundled skill scripts run out of the box
- [ ] #954 [QA] Verify maps skill guest_house / camp_site / bakery expansion
- [ ] #955 [QA] Verify KittenTTS local provider end-to-end
- [ ] #956 [QA] Verify numbered keyboard shortcuts for approval + clarify prompts
- [ ] #957 [QA] Verify optional adversarial-ux-test skill catalog flow
- [ ] #958 [QA] Verify /usage account limits in CLI + gateway
- [ ] #959 [QA] Verify OpenCode-Go curated catalog additions
- [ ] #960 [QA] Verify patch 'did you mean?' suggestions
- [ ] #961 [QA] Verify web dashboard update/restart action buttons
### Local branch-only work
- [ ] #962 [QA] Verify hardcoded-home path guard on burn/921 branch
## Summary
| Issue | State | Commits | Tests |
| --- | --- | --- | --- |
| #950 | open | 5 | 2 |
| #951 | open | 2 | 2 |
| #952 | open | 1 | 1 |
| #953 | open | 1 | 2 |
| #954 | open | 1 | 0 |
| #955 | open | 2 | 1 |
| #956 | open | 1 | 0 |
| #957 | open | 1 | 0 |
| #958 | open | 2 | 2 |
| #959 | open | 1 | 1 |
| #960 | open | 2 | 1 |
| #961 | closed | 1 | 0 |
| #962 | closed | 1 | 1 |
## #950 — [QA] Verify AI Gateway provider UX + attribution headers
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/950
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `b11753879` — attribution default_headers for ai-gateway provider
- `700437440` — curated picker with live pricing
- `ac26a460f` — promote ai-gateway in provider picker ordering
- `5bb2d11b0` — auto-promote free Moonshot models
- `29f57ec95` — Vercel deep-link for API key creation
### Targeted tests
- `tests/hermes_cli/test_ai_gateway_models.py`
- `tests/run_agent/test_provider_attribution_headers.py`
### Tasks
- [ ] Open `hermes model` and verify `ai-gateway` appears near the top.
- [ ] Verify live pricing appears in the picker.
- [ ] Verify free Moonshot models are promoted.
- [ ] Trigger API-key setup flow and verify the Vercel deep link.
- [ ] Send one ai-gateway request and verify attribution headers are attached.
### Acceptance criteria
- [ ] UI ordering and pricing match the landed behavior.
- [ ] Attribution headers are present on ai-gateway requests.
- [ ] Targeted tests pass.
## #951 — [QA] Verify transport abstraction + AnthropicTransport wiring
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/951
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `7ab5eebd0` — transport types + Anthropic normalize migration
- `731f4fbae` — transport ABC + AnthropicTransport wired to all paths
### Targeted tests
- `tests/agent/transports/test_types.py`
- `tests/agent/test_anthropic_normalize_v2.py`
### Tasks
- [ ] Verify plain-text Anthropic responses normalize correctly.
- [ ] Verify tool-call responses preserve IDs, names, and arguments.
- [ ] Verify reasoning/thinking is preserved separately from visible content.
- [ ] Verify finish_reason mapping remains correct across paths.
### Acceptance criteria
- [ ] Normalized response shape is stable.
- [ ] Tool-call and reasoning payloads survive normalization.
- [ ] Targeted tests pass.
## #952 — [QA] Verify CLI voice beep toggle
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/952
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `b48ea41d2` — voice: add CLI beep toggle
### Targeted tests
- `tests/tools/test_voice_cli_integration.py`
### Tasks
- [ ] Enable the beep option in config and confirm voice mode emits the beep.
- [ ] Disable the option and confirm the same path is silent.
- [ ] Verify voice mode still strips markdown before speech output.
- [ ] Verify voice mode does not pollute conversation history with TTS-only text.
### Acceptance criteria
- [ ] Beep behavior is actually toggled by config.
- [ ] Existing voice/TTS integration behavior is not regressed.
- [ ] Targeted tests pass.
## #953 — [QA] Verify bundled skill scripts run out of the box
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/953
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `328223576` — make bundled skill scripts runnable out of the box
### Targeted tests
- `tests/agent/test_skill_commands.py`
- `tests/tools/test_local_shell_init.py`
### Tasks
- [ ] Pick a bundled skill that ships a script and run it without manual chmod/PATH surgery.
- [ ] Verify local terminal execution resolves the installed skill script correctly.
- [ ] Verify local shell init still behaves correctly.
### Acceptance criteria
- [ ] Bundled skill scripts execute from the installed skill location with no manual prep.
- [ ] Local shell init remains healthy.
- [ ] Targeted tests pass.
## #954 — [QA] Verify maps skill guest_house / camp_site / bakery expansion
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/954
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `c5a814b23` — maps: add guest_house, camp_site, and dual-key bakery lookup
### Tasks
- [ ] Use the maps skill to search for a guest house in a known populated area.
- [ ] Use the maps skill to search for a camp site in a known populated area.
- [ ] Use the maps skill to search for a bakery and verify both supported keys resolve correctly.
- [ ] Confirm results are sensible and non-empty.
### Acceptance criteria
- [ ] All three place types resolve correctly.
- [ ] Bakery lookup works through both supported keys.
- [ ] Manual evidence is attached in the issue.
## #955 — [QA] Verify KittenTTS local provider end-to-end
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/955
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `1830ebfc5` — add KittenTTS provider
- `2d7ff9c5b` — complete KittenTTS integration across tools/setup/docs/tests
### Targeted tests
- `tests/tools/test_tts_kittentts.py`
### Tasks
- [ ] Configure TTS to use `kittentts`.
- [ ] Generate speech to `.wav` and verify playable output.
- [ ] Verify voice / speed / cleaned text are passed correctly.
- [ ] Generate repeated requests and verify model caching behavior.
- [ ] Generate a non-wav output and verify ffmpeg conversion path.
- [ ] Verify missing-package behavior returns a helpful error.
### Acceptance criteria
- [ ] KittenTTS works end-to-end when installed.
- [ ] Failure mode is operator-friendly when not installed.
- [ ] Targeted tests pass.
## #956 — [QA] Verify numbered keyboard shortcuts for approval + clarify prompts
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/956
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `d1ed6f4fb` — CLI: add numbered keyboard shortcuts to approval and clarify prompts
### Tasks
- [ ] Trigger an approval prompt and choose an option with number keys.
- [ ] Trigger a clarify prompt and choose an option with number keys.
- [ ] Verify the correct option is submitted both times.
- [ ] Verify normal keyboard navigation still works.
### Acceptance criteria
- [ ] Number-key selection works for both prompt types.
- [ ] Legacy keyboard navigation is not broken.
- [ ] Manual evidence is attached in the issue.
## #957 — [QA] Verify optional adversarial-ux-test skill catalog flow
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/957
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `e50e7f11b` — skills: add adversarial-ux-test optional skill
### Tasks
- [ ] Verify the optional skill appears in the optional skill catalog.
- [ ] Install or enable the skill.
- [ ] Load it successfully through Hermes.
- [ ] Disable or remove it and verify catalog state updates cleanly.
### Acceptance criteria
- [ ] Catalog listing is correct.
- [ ] Install / load / disable lifecycle works cleanly.
- [ ] Manual evidence is attached in the issue.
## #958 — [QA] Verify /usage account limits in CLI + gateway
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/958
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `8a11b0a20` — per-provider account limits module
- `bcc5d7b67` — append account limits section in CLI and gateway
### Targeted tests
- `tests/test_account_usage.py`
- `tests/gateway/test_usage_command.py`
### Tasks
- [ ] Run `/usage` in CLI for a provider with account limits.
- [ ] Verify provider, remaining quota, total limit, and reset window render correctly.
- [ ] Run `/usage` through the gateway and verify the same section appears.
- [ ] Verify zero-value cache read/write sections stay hidden when appropriate.
### Acceptance criteria
- [ ] CLI and gateway both show the landed account-limits section correctly.
- [ ] Targeted tests pass.
## #959 — [QA] Verify OpenCode-Go curated catalog additions
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/959
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `4fea1769d` — opencode-go: add Kimi K2.6 and Qwen3.5/3.6 Plus to curated catalog
### Targeted tests
- `tests/hermes_cli/test_opencode_go_in_model_list.py`
### Tasks
- [ ] With valid OpenCode-Go credentials, open `hermes model`.
- [ ] Verify Kimi K2.6 appears.
- [ ] Verify Qwen 3.5 Plus and 3.6 Plus appear.
- [ ] Unset credentials and verify the provider/catalog hides correctly.
### Acceptance criteria
- [ ] New curated models are present when credentials exist.
- [ ] Catalog visibility still respects credential gating.
- [ ] Targeted tests pass.
## #960 — [QA] Verify patch 'did you mean?' suggestions
State: open
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/960
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `15abf4ed8` — add `did you mean?` feedback when patch fails to match
- `5e6427a42` — gate it to true no-match cases and extend to v4a / skill_manage
### Targeted tests
- `tests/tools/test_fuzzy_match.py`
### Tasks
- [ ] Intentionally run a replace/patch with a near-miss `old_string`.
- [ ] Verify the tool suggests a useful nearby line/context.
- [ ] Verify suggestions only appear on true no-match failures.
- [ ] Verify the behavior also works via file tools, v4a patching, and skill_manage.
### Acceptance criteria
- [ ] Suggestion quality is helpful, not noisy.
- [ ] Suggestions are correctly gated to no-match cases.
- [ ] Targeted tests pass.
## #961 — [QA] Verify web dashboard update/restart action buttons
State: closed
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/961
### Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
### Commits
- `fc21c1420` — add buttons to update Hermes and restart gateway
### Files touched
- `web/src/pages/StatusPage.tsx`
- `web/src/lib/api.ts`
- `web/src/i18n/en.ts`
### Tasks
- [ ] Open the Web UI status page and verify both buttons are present.
- [ ] Click Restart Gateway in a safe environment and verify running/output/success-or-failure states render.
- [ ] Click Update Hermes and verify the same action lifecycle.
- [ ] Verify the page remains responsive while actions are running.
### Acceptance criteria
- [ ] Both action buttons are present and wired.
- [ ] Action status polling and result rendering work end-to-end.
- [ ] Manual evidence is attached in the issue.
## #962 — [QA] Verify hardcoded-home path guard on burn/921 branch
State: closed
URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/962
### Branch / checkout
- Validate specifically on `burn/921-poka-yoke-hardcoded-paths` (not upstream/main).
### Commits
- `5dcb90531` — Poka-yoke: prevent hardcoded home-directory paths
### Targeted tests
- `tests/test_path_guard.py`
### Tasks
- [ ] Verify hardcoded `/Users/...` paths are rejected.
- [ ] Verify hardcoded `~/.hermes/...` paths are rejected in guarded contexts.
- [ ] Verify valid relative paths still pass.
- [ ] Verify appropriate absolute paths still pass where intended.
- [ ] Verify linting catches violations in non-test files.
### Acceptance criteria
- [ ] Guard blocks the dangerous patterns and preserves allowed ones.
- [ ] Targeted tests pass.

View File

@@ -356,57 +356,44 @@ class HolographicMemoryProvider(MemoryProvider):
# -- Auto-extraction (on_session_end) ------------------------------------
def _auto_extract_facts(self, messages: list) -> None:
from agent.session_compactor import evaluate_extraction_quality, extract_facts_from_messages
def _store_category(category: str) -> str:
if category.startswith("user_pref"):
return "user_pref"
if category.startswith("project"):
return "project"
if category.startswith("tool"):
return "tool"
return "general"
facts = extract_facts_from_messages(messages)
if not facts:
return
_PREF_PATTERNS = [
re.compile(r'\bI\s+(?:prefer|like|love|use|want|need)\s+(.+)', re.IGNORECASE),
re.compile(r'\bmy\s+(?:favorite|preferred|default)\s+\w+\s+is\s+(.+)', re.IGNORECASE),
re.compile(r'\bI\s+(?:always|never|usually)\s+(.+)', re.IGNORECASE),
]
_DECISION_PATTERNS = [
re.compile(r'\bwe\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+)', re.IGNORECASE),
re.compile(r'\bthe\s+project\s+(?:uses|needs|requires)\s+(.+)', re.IGNORECASE),
]
extracted = 0
for fact in facts:
try:
metadata = dict(fact.metadata)
metadata.setdefault("relation", fact.relation)
metadata.setdefault("value", fact.content)
metadata.setdefault("provenance", [fact.provenance])
metadata.setdefault("evidence", list(fact.evidence))
metadata.setdefault("observation_count", len(fact.evidence))
metadata.setdefault("duplicate_count", max(0, len(fact.evidence) - 1))
self._store.add_fact(
fact.content,
category=_store_category(fact.category),
tags=",".join(filter(None, [fact.entity, fact.relation, fact.status])),
canonical_key=fact.canonical_key,
metadata=metadata,
confidence=fact.confidence,
source_role=fact.source_role,
source_turn=fact.source_turn,
observed_at=fact.observed_at,
contradiction_group=fact.contradiction_group,
status=fact.status,
)
extracted += 1
except Exception as exc:
logger.debug("Structured auto-extract failed for %s: %s", fact.canonical_key, exc)
for msg in messages:
if msg.get("role") != "user":
continue
content = msg.get("content", "")
if not isinstance(content, str) or len(content) < 10:
continue
for pattern in _PREF_PATTERNS:
if pattern.search(content):
try:
self._store.add_fact(content[:400], category="user_pref")
extracted += 1
except Exception:
pass
break
for pattern in _DECISION_PATTERNS:
if pattern.search(content):
try:
self._store.add_fact(content[:400], category="project")
extracted += 1
except Exception:
pass
break
if extracted:
metrics = evaluate_extraction_quality(messages)
logger.info(
"Auto-extracted %d structured facts from conversation (raw=%d normalized=%d contradictions=%d)",
extracted,
metrics["raw_candidates"],
metrics["normalized_facts"],
metrics["contradiction_groups"],
)
logger.info("Auto-extracted %d facts from conversation", extracted)
# ---------------------------------------------------------------------------

View File

@@ -3,7 +3,6 @@ SQLite-backed fact store with entity resolution and trust scoring.
Single-user Hermes memory store plugin.
"""
import json
import re
import sqlite3
import threading
@@ -16,24 +15,16 @@ except ImportError:
_SCHEMA = """
CREATE TABLE IF NOT EXISTS facts (
fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL UNIQUE,
category TEXT DEFAULT 'general',
tags TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
helpful_count INTEGER DEFAULT 0,
canonical_key TEXT DEFAULT '',
metadata_json TEXT DEFAULT '{}',
confidence REAL DEFAULT 0.5,
source_role TEXT DEFAULT '',
source_turn INTEGER DEFAULT -1,
observed_at TEXT DEFAULT '',
contradiction_group TEXT DEFAULT '',
status TEXT DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
hrr_vector BLOB
fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL UNIQUE,
category TEXT DEFAULT 'general',
tags TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
helpful_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
hrr_vector BLOB
);
CREATE TABLE IF NOT EXISTS entities (
@@ -50,11 +41,9 @@ CREATE TABLE IF NOT EXISTS fact_entities (
PRIMARY KEY (fact_id, entity_id)
);
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key);
CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group);
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts
USING fts5(content, tags, content=facts, content_rowid=fact_id);
@@ -140,23 +129,10 @@ class MemoryStore:
"""Create tables, indexes, and triggers if they do not exist. Enable WAL mode."""
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.executescript(_SCHEMA)
# Migrate: add hrr_vector column if missing (safe for existing databases)
columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()}
migrations = {
"hrr_vector": "ALTER TABLE facts ADD COLUMN hrr_vector BLOB",
"canonical_key": "ALTER TABLE facts ADD COLUMN canonical_key TEXT DEFAULT ''",
"metadata_json": "ALTER TABLE facts ADD COLUMN metadata_json TEXT DEFAULT '{}'",
"confidence": "ALTER TABLE facts ADD COLUMN confidence REAL DEFAULT 0.5",
"source_role": "ALTER TABLE facts ADD COLUMN source_role TEXT DEFAULT ''",
"source_turn": "ALTER TABLE facts ADD COLUMN source_turn INTEGER DEFAULT -1",
"observed_at": "ALTER TABLE facts ADD COLUMN observed_at TEXT DEFAULT ''",
"contradiction_group": "ALTER TABLE facts ADD COLUMN contradiction_group TEXT DEFAULT ''",
"status": "ALTER TABLE facts ADD COLUMN status TEXT DEFAULT 'active'",
}
for column, ddl in migrations.items():
if column not in columns:
self._conn.execute(ddl)
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key)")
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group)")
if "hrr_vector" not in columns:
self._conn.execute("ALTER TABLE facts ADD COLUMN hrr_vector BLOB")
self._conn.commit()
# ------------------------------------------------------------------
@@ -168,148 +144,41 @@ class MemoryStore:
content: str,
category: str = "general",
tags: str = "",
*,
canonical_key: str = "",
metadata: dict | None = None,
confidence: float | None = None,
source_role: str = "",
source_turn: int = -1,
observed_at: str = "",
contradiction_group: str = "",
status: str = "active",
) -> int:
"""Insert a fact and return its fact_id.
Exact duplicates are deduplicated by content. Near-duplicates are
normalized by canonical_key, with provenance/evidence merged into the
existing row. Contradictions sharing the same contradiction_group remain
stored as separate rows and are marked inspectably.
Deduplicates by content (UNIQUE constraint). On duplicate, returns
the existing fact_id without modifying the row. Extracts entities from
the content and links them to the fact.
"""
with self._lock:
content = content.strip()
if not content:
raise ValueError("content must not be empty")
metadata = dict(metadata or {})
canonical_key = canonical_key.strip()
contradiction_group = contradiction_group.strip()
observed_at = observed_at.strip()
status = status or "active"
trust_score = self.default_trust if confidence is None else _clamp_trust(confidence)
metadata_json = json.dumps(metadata, sort_keys=True)
if canonical_key:
existing = self._conn.execute(
"SELECT fact_id, metadata_json, trust_score, confidence, observed_at FROM facts WHERE canonical_key = ?",
(canonical_key,),
).fetchone()
if existing is not None:
merged_metadata = self._merge_metadata(existing["metadata_json"], metadata)
merged_trust = max(float(existing["trust_score"]), trust_score)
merged_observed_at = existing["observed_at"] or observed_at
if observed_at and merged_observed_at:
merged_observed_at = min(merged_observed_at, observed_at)
elif observed_at:
merged_observed_at = observed_at
self._conn.execute(
"""
UPDATE facts
SET metadata_json = ?,
trust_score = ?,
confidence = ?,
observed_at = ?,
updated_at = CURRENT_TIMESTAMP
WHERE fact_id = ?
""",
(
json.dumps(merged_metadata, sort_keys=True),
merged_trust,
max(float(existing["confidence"] or 0.0), confidence or trust_score),
merged_observed_at,
existing["fact_id"],
),
)
self._conn.commit()
return int(existing["fact_id"])
contradiction_rows = []
if contradiction_group:
contradiction_rows = self._conn.execute(
"""
SELECT fact_id, canonical_key, metadata_json
FROM facts
WHERE contradiction_group = ?
AND canonical_key != ?
""",
(contradiction_group, canonical_key),
).fetchall()
if contradiction_rows:
status = "contradiction"
metadata = dict(metadata)
metadata["status"] = "contradiction"
metadata["contradiction_group"] = contradiction_group
metadata["contradiction_keys"] = sorted(
{
canonical_key,
*[str(row["canonical_key"]) for row in contradiction_rows if row["canonical_key"]],
}
- {""}
)
metadata_json = json.dumps(metadata, sort_keys=True)
try:
cur = self._conn.execute(
"""
INSERT INTO facts (
content,
category,
tags,
trust_score,
canonical_key,
metadata_json,
confidence,
source_role,
source_turn,
observed_at,
contradiction_group,
status
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO facts (content, category, tags, trust_score)
VALUES (?, ?, ?, ?)
""",
(
content,
category,
tags,
trust_score,
canonical_key,
metadata_json,
confidence if confidence is not None else trust_score,
source_role,
source_turn,
observed_at,
contradiction_group,
status,
),
(content, category, tags, self.default_trust),
)
self._conn.commit()
fact_id: int = cur.lastrowid # type: ignore[assignment]
except sqlite3.IntegrityError:
# Duplicate content — return existing id
row = self._conn.execute(
"SELECT fact_id FROM facts WHERE content = ?", (content,)
).fetchone()
return int(row["fact_id"])
if contradiction_rows:
self._mark_contradictions(
contradiction_group=contradiction_group,
new_canonical_key=canonical_key,
existing_rows=contradiction_rows,
)
# Entity extraction and linking
for name in self._extract_entities(content):
entity_id = self._resolve_entity(name)
self._link_fact_entity(fact_id, entity_id)
# Compute HRR vector after entity linking
self._compute_hrr_vector(fact_id, content)
self._rebuild_bank(category)
@@ -342,9 +211,6 @@ class MemoryStore:
sql = f"""
SELECT f.fact_id, f.content, f.category, f.tags,
f.trust_score, f.retrieval_count, f.helpful_count,
f.canonical_key, f.metadata_json, f.confidence,
f.source_role, f.source_turn, f.observed_at,
f.contradiction_group, f.status,
f.created_at, f.updated_at
FROM facts f
JOIN facts_fts fts ON fts.rowid = f.fact_id
@@ -470,11 +336,7 @@ class MemoryStore:
sql = f"""
SELECT fact_id, content, category, tags, trust_score,
retrieval_count, helpful_count,
canonical_key, metadata_json, confidence,
source_role, source_turn, observed_at,
contradiction_group, status,
created_at, updated_at
retrieval_count, helpful_count, created_at, updated_at
FROM facts
WHERE trust_score >= ?
{category_clause}
@@ -525,89 +387,6 @@ class MemoryStore:
"helpful_count": row["helpful_count"] + helpful_increment,
}
# ------------------------------------------------------------------
# Metadata helpers
# ------------------------------------------------------------------
def _load_metadata(self, metadata_json: str | None) -> dict:
if not metadata_json:
return {}
try:
data = json.loads(metadata_json)
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _merge_metadata(self, existing_json: str | None, incoming: dict | None) -> dict:
existing = self._load_metadata(existing_json)
incoming = dict(incoming or {})
merged = dict(existing)
merged.update({k: v for k, v in incoming.items() if k not in {"provenance", "evidence", "observation_count", "duplicate_count", "contradiction_keys"}})
provenance = []
seen_provenance: set[str] = set()
for item in list(existing.get("provenance", [])) + list(incoming.get("provenance", [])):
if item and item not in seen_provenance:
seen_provenance.add(item)
provenance.append(item)
evidence = []
seen_evidence: set[tuple[str, str, str]] = set()
for item in list(existing.get("evidence", [])) + list(incoming.get("evidence", [])):
if not isinstance(item, dict):
continue
key = (
str(item.get("provenance", "")),
str(item.get("observed_at", "")),
str(item.get("source_text", "")),
)
if key in seen_evidence:
continue
seen_evidence.add(key)
evidence.append(dict(item))
observation_count = int(existing.get("observation_count", max(1, len(existing.get("evidence", [])) or 1)))
observation_count += int(incoming.get("observation_count", max(1, len(incoming.get("evidence", [])) or 1)))
contradiction_keys = []
seen_keys: set[str] = set()
for item in list(existing.get("contradiction_keys", [])) + list(incoming.get("contradiction_keys", [])):
if item and item not in seen_keys:
seen_keys.add(item)
contradiction_keys.append(item)
merged["provenance"] = provenance
merged["evidence"] = evidence
merged["observation_count"] = observation_count
merged["duplicate_count"] = max(0, observation_count - 1)
if contradiction_keys:
merged["contradiction_keys"] = contradiction_keys
return merged
def _mark_contradictions(self, contradiction_group: str, new_canonical_key: str, existing_rows: list[sqlite3.Row]) -> None:
for row in existing_rows:
metadata = self._load_metadata(row["metadata_json"])
keys = []
seen: set[str] = set()
for item in list(metadata.get("contradiction_keys", [])) + [new_canonical_key]:
if item and item not in seen:
seen.add(item)
keys.append(item)
metadata["status"] = "contradiction"
metadata["contradiction_group"] = contradiction_group
metadata["contradiction_keys"] = keys
self._conn.execute(
"""
UPDATE facts
SET status = 'contradiction',
metadata_json = ?,
updated_at = CURRENT_TIMESTAMP
WHERE fact_id = ?
""",
(json.dumps(metadata, sort_keys=True), row["fact_id"]),
)
self._conn.commit()
# ------------------------------------------------------------------
# Entity helpers
# ------------------------------------------------------------------
@@ -781,14 +560,8 @@ class MemoryStore:
# ------------------------------------------------------------------
def _row_to_dict(self, row: sqlite3.Row) -> dict:
"""Convert a sqlite3.Row to a plain dict with decoded metadata."""
data = dict(row)
metadata = self._load_metadata(data.get("metadata_json"))
if metadata:
data["metadata"] = metadata
data.setdefault("relation", metadata.get("relation"))
data.pop("metadata_json", None)
return data
"""Convert a sqlite3.Row to a plain dict."""
return dict(row)
def close(self) -> None:
"""Close the database connection."""

View File

@@ -0,0 +1,301 @@
#!/usr/bin/env python3
"""Build a morning review packet from a Gitea epic and its child QA issues.
This script fetches a parent epic plus its sub-issues, extracts the structured
sections from each QA issue body, and renders a single markdown packet suitable
for morning review.
Usage:
python scripts/morning_review_packet.py --epic-number 949
python scripts/morning_review_packet.py --epic-number 949 --children 950-962
python scripts/morning_review_packet.py --epic-number 949 --output docs/review_packets/hermes-harness-2026-04-21.md
"""
from __future__ import annotations
import argparse
import json
import os
import re
import urllib.request
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com"
DEFAULT_OWNER = "Timmy_Foundation"
DEFAULT_REPO = "hermes-agent"
DEFAULT_TOKEN_PATH = Path.home() / ".config" / "gitea" / "token"
@dataclass(frozen=True)
class CommitEvidence:
sha: str
summary: str
@dataclass
class ReviewIssue:
number: int
title: str
state: str
url: str
comments: int = 0
parent_issue: int | None = None
checkout_notes: list[str] = field(default_factory=list)
commits: list[CommitEvidence] = field(default_factory=list)
targeted_tests: list[str] = field(default_factory=list)
files_touched: list[str] = field(default_factory=list)
tasks: list[str] = field(default_factory=list)
acceptance_criteria: list[str] = field(default_factory=list)
def parse_issue_number_spec(spec: str) -> list[int]:
"""Parse a comma-separated issue list like ``950-952,955,962``."""
numbers: list[int] = []
seen: set[int] = set()
for chunk in (part.strip() for part in spec.split(",")):
if not chunk:
continue
if "-" in chunk:
start_str, end_str = (part.strip() for part in chunk.split("-", 1))
start = int(start_str)
end = int(end_str)
if end < start:
raise ValueError(f"Invalid descending issue range: {chunk}")
for number in range(start, end + 1):
if number not in seen:
numbers.append(number)
seen.add(number)
else:
number = int(chunk)
if number not in seen:
numbers.append(number)
seen.add(number)
return numbers
def _parse_sections(body: str) -> dict[str, list[str]]:
sections: dict[str, list[str]] = {}
current: str | None = None
for raw_line in body.splitlines():
line = raw_line.rstrip()
if line.startswith("## "):
current = line[3:].strip()
sections[current] = []
continue
if current is not None:
sections[current].append(line)
return sections
def _clean_bullet(line: str) -> str | None:
stripped = line.strip()
if not stripped:
return None
stripped = re.sub(r"^-\s*\[(?: |x|X)\]\s*", "", stripped)
stripped = re.sub(r"^-\s*", "", stripped)
return stripped.strip() or None
def _extract_bullets(lines: Iterable[str]) -> list[str]:
items: list[str] = []
for line in lines:
cleaned = _clean_bullet(line)
if cleaned:
items.append(cleaned)
return items
def _extract_parent_issue(body: str, sections: dict[str, list[str]]) -> int | None:
parent_lines = sections.get("Parent", [])
for line in parent_lines:
match = re.search(r"#(\d+)", line)
if match:
return int(match.group(1))
match = re.search(r"Linked to Epic\s+#(\d+)", body, flags=re.IGNORECASE)
if match:
return int(match.group(1))
return None
def _extract_commits(lines: Iterable[str]) -> list[CommitEvidence]:
commits: list[CommitEvidence] = []
for item in _extract_bullets(lines):
match = re.match(r"`([^`]+)`\s*(.*)", item)
if match:
commits.append(CommitEvidence(sha=match.group(1).strip(), summary=match.group(2).strip()))
else:
commits.append(CommitEvidence(sha="", summary=item))
return commits
def _strip_backticks(items: Iterable[str]) -> list[str]:
cleaned: list[str] = []
for item in items:
cleaned.append(item.replace("`", "").strip())
return cleaned
def discover_child_issue_numbers(epic_body: str) -> list[int]:
"""Discover sub-issue numbers from an epic body."""
sections = _parse_sections(epic_body)
sub_lines = sections.get("Sub-issues")
if not sub_lines:
return []
numbers: list[int] = []
seen: set[int] = set()
for line in sub_lines:
for match in re.finditer(r"#(\d+)", line):
number = int(match.group(1))
if number not in seen:
numbers.append(number)
seen.add(number)
return numbers
def parse_child_issue(issue: dict) -> ReviewIssue:
body = issue.get("body") or ""
sections = _parse_sections(body)
commit_lines = sections.get("Commits landed today", []) or sections.get("Commit landed today", [])
return ReviewIssue(
number=int(issue["number"]),
title=issue.get("title") or "",
state=(issue.get("state") or "unknown").lower(),
url=issue.get("html_url") or issue.get("url") or "",
comments=int(issue.get("comments") or 0),
parent_issue=_extract_parent_issue(body, sections),
checkout_notes=_extract_bullets(sections.get("Branch / checkout", [])),
commits=_extract_commits(commit_lines),
targeted_tests=_strip_backticks(_extract_bullets(sections.get("Targeted tests", []))),
files_touched=_strip_backticks(_extract_bullets(sections.get("Files touched", []))),
tasks=_extract_bullets(sections.get("Tasks", [])),
acceptance_criteria=_extract_bullets(sections.get("Acceptance Criteria", [])),
)
def build_packet_markdown(epic_issue: dict, child_issues: list[ReviewIssue]) -> str:
title = epic_issue.get("title") or f"Epic #{epic_issue.get('number')}"
url = epic_issue.get("html_url") or epic_issue.get("url") or ""
body = epic_issue.get("body") or ""
children = sorted(child_issues, key=lambda item: item.number)
lines: list[str] = []
lines.append("# Morning Review Packet")
lines.append("")
lines.append(f"Source epic: [{title}]({url})")
lines.append("")
lines.append("## Epic context")
lines.append("")
lines.append(title)
lines.append("")
for line in body.splitlines():
if line.strip():
lines.append(line)
else:
lines.append("")
lines.append("")
lines.append("## Summary")
lines.append("")
lines.append("| Issue | State | Commits | Tests |")
lines.append("| --- | --- | --- | --- |")
for child in children:
lines.append(
f"| #{child.number} | {child.state} | {len(child.commits)} | {len(child.targeted_tests)} |"
)
lines.append("")
for child in children:
lines.append(f"## #{child.number}{child.title}")
lines.append("")
lines.append(f"State: {child.state}")
lines.append(f"URL: {child.url}")
lines.append("")
if child.checkout_notes:
lines.append("### Branch / checkout")
for note in child.checkout_notes:
lines.append(f"- {note}")
lines.append("")
if child.commits:
lines.append("### Commits")
for commit in child.commits:
if commit.sha:
lines.append(f"- `{commit.sha}` — {commit.summary}")
else:
lines.append(f"- {commit.summary}")
lines.append("")
if child.targeted_tests:
lines.append("### Targeted tests")
for test_path in child.targeted_tests:
lines.append(f"- `{test_path}`")
lines.append("")
if child.files_touched:
lines.append("### Files touched")
for file_path in child.files_touched:
lines.append(f"- `{file_path}`")
lines.append("")
if child.tasks:
lines.append("### Tasks")
for task in child.tasks:
lines.append(f"- [ ] {task}")
lines.append("")
if child.acceptance_criteria:
lines.append("### Acceptance criteria")
for item in child.acceptance_criteria:
lines.append(f"- [ ] {item}")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
def _resolve_token(explicit_token: str | None = None) -> str:
if explicit_token:
return explicit_token.strip()
env_token = os.getenv("GITEA_TOKEN")
if env_token:
return env_token.strip()
if DEFAULT_TOKEN_PATH.exists():
return DEFAULT_TOKEN_PATH.read_text().strip()
raise FileNotFoundError(f"No Gitea token found. Set GITEA_TOKEN or create {DEFAULT_TOKEN_PATH}")
def fetch_issue(base_url: str, owner: str, repo: str, number: int, token: str) -> dict:
url = f"{base_url.rstrip('/')}/api/v1/repos/{owner}/{repo}/issues/{number}"
request = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
with urllib.request.urlopen(request, timeout=30) as response:
return json.loads(response.read().decode())
def collect_child_issues(base_url: str, owner: str, repo: str, epic_issue: dict, token: str, children_spec: str | None = None) -> list[dict]:
numbers = parse_issue_number_spec(children_spec) if children_spec else discover_child_issue_numbers(epic_issue.get("body") or "")
return [fetch_issue(base_url, owner, repo, number, token) for number in numbers]
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Build a markdown morning review packet from a Gitea epic")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--owner", default=DEFAULT_OWNER)
parser.add_argument("--repo", default=DEFAULT_REPO)
parser.add_argument("--epic-number", type=int, required=True)
parser.add_argument("--children", help="Explicit issue list/ranges, e.g. 950-962")
parser.add_argument("--token", help="Gitea token (defaults to GITEA_TOKEN or ~/.config/gitea/token)")
parser.add_argument("--output", help="Write markdown packet to this path instead of stdout")
args = parser.parse_args(argv)
token = _resolve_token(args.token)
epic_issue = fetch_issue(args.base_url, args.owner, args.repo, args.epic_number, token)
child_issue_dicts = collect_child_issues(args.base_url, args.owner, args.repo, epic_issue, token, args.children)
packet = build_packet_markdown(epic_issue, [parse_child_issue(issue) for issue in child_issue_dicts])
if args.output:
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(packet)
else:
print(packet, end="")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,63 +0,0 @@
{
"preferences_and_duplicates": [
{
"role": "user",
"content": "Deploy via Ansible for production changes.",
"created_at": "2026-04-22T10:00:00Z"
},
{
"role": "user",
"content": "We deploy through Ansible on this repo.",
"created_at": "2026-04-22T10:01:00Z"
},
{
"role": "user",
"content": "Gitea-first for repository work.",
"created_at": "2026-04-22T10:02:00Z"
}
],
"operational_and_contradictions": [
{
"role": "user",
"content": "The BURN watchdog caps dispatches per cycle to 6.",
"created_at": "2026-04-22T11:00:00Z"
},
{
"role": "user",
"content": "The provider should stay openai-codex/gpt-5.4.",
"created_at": "2026-04-22T11:01:00Z"
},
{
"role": "user",
"content": "Correction: the provider should stay mimo-v2-pro.",
"created_at": "2026-04-22T11:02:00Z"
}
],
"mixed_transcript": [
{
"role": "user",
"content": "Deploy via Ansible for production changes.",
"created_at": "2026-04-22T10:00:00Z"
},
{
"role": "user",
"content": "We deploy through Ansible on this repo.",
"created_at": "2026-04-22T10:01:00Z"
},
{
"role": "user",
"content": "The BURN watchdog caps dispatches per cycle to 6.",
"created_at": "2026-04-22T11:00:00Z"
},
{
"role": "user",
"content": "The provider should stay openai-codex/gpt-5.4.",
"created_at": "2026-04-22T11:01:00Z"
},
{
"role": "user",
"content": "Correction: the provider should stay mimo-v2-pro.",
"created_at": "2026-04-22T11:02:00Z"
}
]
}

View File

@@ -1,50 +0,0 @@
"""Integration tests for holographic auto-extraction with structured fact persistence."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
from plugins.memory.holographic import HolographicMemoryProvider
_FIXTURE_PATH = Path(__file__).resolve().parents[2] / "fixtures" / "memory_extraction_fragments.json"
def _load_fixture(name: str):
return json.loads(_FIXTURE_PATH.read_text())[name]
class TestHolographicAutoExtract:
def test_auto_extract_persists_structured_metadata_and_normalizes_duplicates(self, tmp_path):
provider = HolographicMemoryProvider(
config={
"db_path": str(tmp_path / "memory_store.db"),
"auto_extract": True,
"default_trust": 0.5,
}
)
provider.initialize("test-session")
messages = _load_fixture("mixed_transcript")
provider.on_session_end(messages)
provider.on_session_end(messages)
facts = provider._store.list_facts(min_trust=0.0, limit=20)
deploy_facts = [f for f in facts if f.get("relation") == "workflow.deploy_method"]
provider_facts = [f for f in facts if f.get("contradiction_group") == "config.provider"]
assert len(deploy_facts) == 1
assert deploy_facts[0]["metadata"]["duplicate_count"] >= 3
assert deploy_facts[0]["observed_at"] == "2026-04-22T10:00:00Z"
assert deploy_facts[0]["metadata"]["provenance"] == [
"conversation:user:0",
"conversation:user:1",
]
assert len(provider_facts) == 2
assert {f["status"] for f in provider_facts} == {"contradiction"}
assert {f["metadata"]["value"] for f in provider_facts} == {
"openai-codex/gpt-5.4",
"mimo-v2-pro",
}

View File

@@ -0,0 +1,162 @@
from pathlib import Path
import sys
SCRIPT_DIR = Path(__file__).resolve().parents[1] / "scripts"
sys.path.insert(0, str(SCRIPT_DIR))
import morning_review_packet as mrp
EPIC_BODY = """Source: git log on upstream/main since 2026-04-21 00:00 EDT.
## Success criteria
- [ ] Every issue has a clear PASS / FAIL outcome.
## Sub-issues
- [ ] #950 [QA] Verify AI Gateway provider UX + attribution headers
- [ ] #951 [QA] Verify transport abstraction + AnthropicTransport wiring
- [x] #962 [QA] Verify hardcoded-home path guard on burn/921 branch
"""
CHILD_BODY_PLURAL = """## Parent
#949
## Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
## Commits landed today
- `b11753879` attribution default_headers for ai-gateway provider
- `700437440` curated picker with live pricing
## Targeted tests
- `tests/hermes_cli/test_ai_gateway_models.py`
- `tests/run_agent/test_provider_attribution_headers.py`
## Tasks
- [ ] Verify the picker ordering.
- [ ] Verify attribution headers.
## Acceptance Criteria
- [ ] Picker shows AI Gateway prominently.
- [ ] Headers appear on OpenRouter calls.
"""
CHILD_BODY_SINGULAR = """## Parent
#949
## Branch / checkout
- Validate on `upstream/main` or an equivalent synced checkout.
## Commit landed today
- `fc21c1420` add buttons to update Hermes and restart gateway
## Files touched
- `web/src/pages/StatusPage.tsx`
- `web/src/lib/api.ts`
- `web/src/i18n/en.ts`
## Tasks
- [ ] Open the Web UI status page and verify both buttons are present.
- [ ] Click Restart Gateway in a safe environment.
"""
def test_discover_child_issue_numbers_from_epic_body():
assert mrp.discover_child_issue_numbers(EPIC_BODY) == [950, 951, 962]
def test_parse_issue_number_spec_supports_ranges_and_lists():
assert mrp.parse_issue_number_spec("950-952,955,962") == [950, 951, 952, 955, 962]
def test_parse_child_issue_extracts_structured_sections():
issue = {
"number": 950,
"title": "[QA] Verify AI Gateway provider UX + attribution headers",
"state": "open",
"html_url": "https://forge.example/950",
"comments": 0,
"body": CHILD_BODY_PLURAL,
}
parsed = mrp.parse_child_issue(issue)
assert parsed.number == 950
assert parsed.parent_issue == 949
assert parsed.checkout_notes == ["Validate on `upstream/main` or an equivalent synced checkout."]
assert [c.sha for c in parsed.commits] == ["b11753879", "700437440"]
assert parsed.targeted_tests == [
"tests/hermes_cli/test_ai_gateway_models.py",
"tests/run_agent/test_provider_attribution_headers.py",
]
assert parsed.tasks == [
"Verify the picker ordering.",
"Verify attribution headers.",
]
assert parsed.acceptance_criteria == [
"Picker shows AI Gateway prominently.",
"Headers appear on OpenRouter calls.",
]
def test_parse_child_issue_handles_singular_commit_heading_and_files_touched():
issue = {
"number": 961,
"title": "[QA] Verify web dashboard update/restart action buttons",
"state": "closed",
"html_url": "https://forge.example/961",
"comments": 16,
"body": CHILD_BODY_SINGULAR,
}
parsed = mrp.parse_child_issue(issue)
assert [c.sha for c in parsed.commits] == ["fc21c1420"]
assert parsed.files_touched == [
"web/src/pages/StatusPage.tsx",
"web/src/lib/api.ts",
"web/src/i18n/en.ts",
]
assert parsed.tasks == [
"Open the Web UI status page and verify both buttons are present.",
"Click Restart Gateway in a safe environment.",
]
def test_build_packet_markdown_renders_summary_and_details():
epic_issue = {
"number": 949,
"title": "EPIC: Morning review packet — Hermes harness features landed 2026-04-21",
"state": "open",
"html_url": "https://forge.example/949",
"body": EPIC_BODY,
}
child_a = mrp.parse_child_issue({
"number": 950,
"title": "[QA] Verify AI Gateway provider UX + attribution headers",
"state": "open",
"html_url": "https://forge.example/950",
"comments": 0,
"body": CHILD_BODY_PLURAL,
})
child_b = mrp.parse_child_issue({
"number": 961,
"title": "[QA] Verify web dashboard update/restart action buttons",
"state": "closed",
"html_url": "https://forge.example/961",
"comments": 16,
"body": CHILD_BODY_SINGULAR,
})
markdown = mrp.build_packet_markdown(epic_issue, [child_a, child_b])
assert "# Morning Review Packet" in markdown
assert "EPIC: Morning review packet — Hermes harness features landed 2026-04-21" in markdown
assert "| #950 | open | 2 | 2 |" in markdown
assert "| #961 | closed | 1 | 0 |" in markdown
assert "## #950 — [QA] Verify AI Gateway provider UX + attribution headers" in markdown
assert "## #961 — [QA] Verify web dashboard update/restart action buttons" in markdown
assert "`b11753879` — attribution default_headers for ai-gateway provider" in markdown
assert "`web/src/pages/StatusPage.tsx`" in markdown

View File

@@ -1,6 +1,6 @@
"""Tests for session compaction with fact extraction."""
import json
import pytest
import sys
from pathlib import Path
@@ -8,19 +8,12 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.session_compactor import (
ExtractedFact,
evaluate_extraction_quality,
extract_and_save_facts,
extract_facts_from_messages,
format_facts_summary,
save_facts_to_store,
extract_and_save_facts,
format_facts_summary,
)
_FIXTURE_PATH = Path(__file__).resolve().parent / "fixtures" / "memory_extraction_fragments.json"
def _load_fixture(name: str):
return json.loads(_FIXTURE_PATH.read_text())[name]
class TestFactExtraction:
def test_extract_preference(self):
@@ -67,48 +60,14 @@ class TestFactExtraction:
{"role": "user", "content": "I prefer Python."},
]
facts = extract_facts_from_messages(messages)
# Should deduplicate
python_facts = [f for f in facts if "Python" in f.content]
assert len(python_facts) == 1
def test_structured_fact_preserves_provenance_and_temporal_metadata(self):
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
deploy_fact = next(f for f in facts if f.relation == "workflow.deploy_method")
assert deploy_fact.source_role == "user"
assert deploy_fact.source_turn == 0
assert deploy_fact.observed_at == "2026-04-22T10:00:00Z"
assert deploy_fact.provenance == "conversation:user:0"
assert deploy_fact.canonical_key
assert deploy_fact.evidence
assert deploy_fact.evidence[0]["source_text"].startswith("Deploy via Ansible")
def test_near_duplicate_facts_are_normalized_into_one_canonical_fact(self):
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
deploy_facts = [f for f in facts if f.relation == "workflow.deploy_method"]
assert len(deploy_facts) == 1
assert len(deploy_facts[0].evidence) == 2
assert deploy_facts[0].metadata["duplicate_count"] == 1
def test_contradictory_facts_are_preserved_for_unique_slots(self):
facts = extract_facts_from_messages(_load_fixture("operational_and_contradictions"))
provider_facts = [f for f in facts if f.contradiction_group == "config.provider"]
assert len(provider_facts) == 2
assert {f.status for f in provider_facts} == {"contradiction"}
assert {f.normalized_content for f in provider_facts} == {
"openai codex gpt 5 4",
"mimo v2 pro",
}
def test_quality_evaluation_reports_noise_reduction(self):
metrics = evaluate_extraction_quality(_load_fixture("mixed_transcript"))
assert metrics["raw_candidates"] > metrics["normalized_facts"]
assert metrics["noise_reduction"] > 0
assert metrics["contradiction_groups"] == 1
class TestSaveFacts:
def test_save_with_callback(self):
saved = []
def mock_save(category, entity, content, trust):
saved.append({"category": category, "content": content})
@@ -117,38 +76,6 @@ class TestSaveFacts:
assert count == 1
assert len(saved) == 1
def test_save_with_extended_callback_metadata(self):
saved = []
def mock_save(category, entity, content, trust, **kwargs):
saved.append({
"category": category,
"entity": entity,
"content": content,
"trust": trust,
**kwargs,
})
fact = ExtractedFact(
"project.operational",
"watchdog",
"BURN watchdog caps dispatches per cycle to 6",
0.9,
2,
source_role="user",
observed_at="2026-04-22T11:00:00Z",
provenance="conversation:user:2",
canonical_key="project.operational|watchdog|dispatch_cap|6",
relation="fleet.dispatch_cap",
contradiction_group="fleet.dispatch_cap",
metadata={"duplicate_count": 0},
)
count = save_facts_to_store([fact], fact_store_fn=mock_save)
assert count == 1
assert saved[0]["canonical_key"] == fact.canonical_key
assert saved[0]["observed_at"] == "2026-04-22T11:00:00Z"
assert saved[0]["metadata"]["duplicate_count"] == 0
class TestFormatSummary:
def test_empty(self):