Compare commits

4 Commits

Author SHA1 Message Date
Alexander Whitestone
e28d16b324 test: cover patch did-you-mean end-to-end (#960)
- add focused QA tests for replace-mode rich hints without legacy generic hints
- verify ambiguous replace failures do not show did-you-mean noise
- verify V4A patch validation surfaces rich hints
- verify skill patching surfaces rich hints on true no-match failures
2026-04-22 10:43:34 -04:00
Alexander Whitestone
bc32047610 wip: fix patch did-you-mean dependencies (#960)
- restore escape-drift guard needed by current fuzzy-match tests
- import missing typing symbols in file_tools after porting patch hint logic
2026-04-22 10:41:29 -04:00
Teknium
3a24420d7d fix(patch): gate 'did you mean?' to no-match + extend to v4a/skill_manage
Follow-ups on top of @teyrebaz33's cherry-picked commit:

1. New shared helper format_no_match_hint() in fuzzy_match.py with a
   startswith('Could not find') gate, so the snippet is appended only to
   genuine no-match errors — not to 'Found N matches' (ambiguous),
   'Escape-drift detected', or 'identical strings' errors, all of which
   would mislead the model.

2. file_tools.patch_tool suppresses the legacy generic '[Hint: old_string
   not found...]' string when the rich 'Did you mean?' snippet is
   already attached — no more double-hint.

3. Wire the same helper into patch_parser.py (V4A patch mode, both
   _validate_operations and _apply_update) and skill_manager_tool.py so
   all three fuzzy callers surface the hint consistently.
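
A rough sketch of that gate, assuming the (error, match_count,
old_string, content) call shape the gating tests use; the real helper
in tools/fuzzy_match.py is authoritative:

    from typing import Optional

    from tools.fuzzy_match import find_closest_lines

    def format_no_match_hint(error: Optional[str], match_count: int,
                             old_string: str, content: str) -> str:
        # Gate: only genuine no-match failures qualify. "Found N matches",
        # "Escape-drift detected", and "identical strings" errors stay
        # silent, as does any call where a match actually succeeded.
        if not error or match_count != 0 or not error.startswith("Could not find"):
            return ""
        closest = find_closest_lines(old_string, content)
        if not closest:  # nothing similar in the file: no hint
            return ""
        return f"\n\nDid you mean one of these sections?\n{closest}"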

Tests: 7 new gating tests in TestFormatNoMatchHint cover every error
class (ambiguous, drift, identical, non-zero match count, None error,
no similar content, happy path). 34/34 test_fuzzy_match, 96/96
test_file_tools + test_patch_parser + test_skill_manager_tool pass.
E2E verified across all four scenarios: no-match-with-similar,
no-match-no-similar, ambiguous, success. V4A mode confirmed
end-to-end with a non-matching hunk.

(cherry picked from commit 5e6427a42c)
2026-04-22 10:30:00 -04:00
teyrebaz33
d14c1c5a56 feat(patch): add 'did you mean?' feedback when patch fails to match
When patch_replace() cannot find old_string in a file, the error message
now includes the closest matching lines from the file with line numbers
and context. This helps the LLM self-correct without a separate read_file
call.
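
A minimal sketch of the helper, assuming the behaviour the tests pin
down (empty string for empty inputs or when nothing is similar,
"N| content" line numbering, a little surrounding context); the
context and threshold parameters here are illustrative, not the real
signature in tools/fuzzy_match.py:

    from difflib import SequenceMatcher

    def find_closest_lines(old_string: str, content: str,
                           context: int = 2, threshold: float = 0.5) -> str:
        if not old_string or not content:
            return ""
        lines = content.splitlines()
        probe = old_string.splitlines()[0]
        # Score every file line against the first line of old_string.
        best_ratio, best_idx = max(
            (SequenceMatcher(None, probe, line).ratio(), i)
            for i, line in enumerate(lines)
        )
        if best_ratio < threshold:
            return ""
        lo = max(0, best_idx - context)
        hi = min(len(lines), best_idx + context + 1)
        return "\n".join(f"{i + 1}| {lines[i]}" for i in range(lo, hi))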

Implements Phase 1 of #536: enhanced patch error feedback with no
architectural changes.

- tools/fuzzy_match.py: new find_closest_lines() using SequenceMatcher
- tools/file_operations.py: attach closest-lines hint to patch errors
- tests/tools/test_fuzzy_match.py: 5 new tests for find_closest_lines

(cherry picked from commit 15abf4ed8f)
2026-04-22 10:28:40 -04:00
13 changed files with 680 additions and 1088 deletions

View File

@@ -1,546 +1,197 @@
"""Session compaction with structured fact extraction.
"""Session compaction with fact extraction.
Before compressing conversation context, extract durable facts with enough
structure to survive retrieval: source/provenance, temporal anchors,
normalized canonical keys, and contradiction groups.
Before compressing conversation context, extracts durable facts
(user preferences, corrections, project details) and saves them
to the fact store so they survive compression.
Usage:
from agent.session_compactor import extract_and_save_facts
facts = extract_and_save_facts(messages)
"""
from __future__ import annotations
import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
_DEPLOY_METHOD_RE = re.compile(r"\bdeploy(?:ing)?\s+(?:via|through|with)\s+([A-Za-z0-9_./+-]+)", re.IGNORECASE)
_WATCHDOG_CAP_RE = re.compile(
r"\b(?:the\s+)?([A-Za-z0-9_-]+(?:\s+watchdog)?)\s+(?:caps|limits)\s+dispatches(?:\s+per\s+cycle)?\s+to\s+([0-9]+)",
re.IGNORECASE,
)
_PROVIDER_RE = re.compile(
r"\bprovider\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._/-]+)",
re.IGNORECASE,
)
_MODEL_RE = re.compile(
r"\bmodel\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._:/-]+)",
re.IGNORECASE,
)
_PORT_RE = re.compile(r"\bport\s+(?:is|should\s+be)\s+([0-9]+)", re.IGNORECASE)
_PROJECT_USES_RE = re.compile(r"\b(?:the\s+)?project\s+(?:uses|needs|requires)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_PREFERENCE_RE = re.compile(r"\bI\s+(?:prefer|like|want|need)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_CONSTRAINT_RE = re.compile(r"\b(?:do\s+not|don't)\s+(?:ever\s+|again\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
_DECISION_RE = re.compile(r"\b(?:we|the\s+team)\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
@dataclass
class ExtractedFact:
"""A durable fact extracted from conversation."""
category: str
entity: str
content: str
confidence: float
source_turn: int
"""A fact extracted from conversation."""
category: str # "user_pref", "correction", "project", "tool_quirk", "general"
entity: str # what the fact is about
content: str # the fact itself
confidence: float # 0.0-1.0
source_turn: int # which message turn it came from
timestamp: float = 0.0
source_role: str = "user"
source_text: str = ""
normalized_content: str = ""
canonical_key: str = ""
relation: str = "general"
contradiction_group: str = ""
status: str = "active"
provenance: str = ""
observed_at: str = ""
evidence: List[Dict[str, Any]] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
if not self.timestamp:
self.timestamp = time.time()
if not self.observed_at:
self.observed_at = _iso_from_timestamp(self.timestamp)
if not self.normalized_content:
self.normalized_content = _normalize_value(self.content)
if not self.provenance:
self.provenance = f"conversation:{self.source_role}:{self.source_turn}"
if not self.canonical_key:
self.canonical_key = _canonical_key(self.entity, self.relation, self.normalized_content)
if not self.evidence:
self.evidence = [
{
"source_role": self.source_role,
"source_turn": self.source_turn,
"source_text": self.source_text or self.content,
"observed_at": self.observed_at,
"provenance": self.provenance,
}
]
self.metadata = dict(self.metadata or {})
self.metadata.setdefault("entity", self.entity)
self.metadata.setdefault("relation", self.relation)
self.metadata.setdefault("value", self.content)
self.metadata.setdefault("normalized_value", self.normalized_content)
self.metadata.setdefault("provenance", [self.provenance])
self.metadata.setdefault("evidence", list(self.evidence))
self.metadata.setdefault("observation_count", len(self.evidence))
self.metadata.setdefault("duplicate_count", max(0, self.metadata["observation_count"] - 1))
if self.contradiction_group:
self.metadata.setdefault("contradiction_group", self.contradiction_group)
self.metadata.setdefault("status", self.status)
# Patterns that indicate user preferences
_PREFERENCE_PATTERNS = [
(r"(?:I|we) (?:prefer|like|want|need) (.+?)(?:\.|$)", "preference"),
(r"(?:always|never) (?:use|do|run|deploy) (.+?)(?:\.|$)", "preference"),
(r"(?:my|our) (?:default|preferred|usual) (.+?) (?:is|are) (.+?)(?:\.|$)", "preference"),
(r"(?:make sure|ensure|remember) (?:to|that) (.+?)(?:\.|$)", "instruction"),
(r"(?:don'?t|do not) (?:ever|ever again) (.+?)(?:\.|$)", "constraint"),
]
# Patterns that indicate corrections
_CORRECTION_PATTERNS = [
(r"(?:actually|no[, ]|wait[, ]|correction[: ]|sorry[, ]) (.+)", "correction"),
(r"(?:I meant|what I meant was|the correct) (.+?)(?:\.|$)", "correction"),
(r"(?:it'?s|its) (?:not|shouldn'?t be|wrong) (.+?)(?:\.|$)", "correction"),
]
# Patterns that indicate project/tool facts
_PROJECT_PATTERNS = [
(r"(?:the |our )?(?:project|repo|codebase|code) (?:is|uses|needs|requires) (.+?)(?:\.|$)", "project"),
(r"(?:deploy|push|commit) (?:to|on) (.+?)(?:\.|$)", "project"),
(r"(?:this|that|the) (?:server|host|machine|VPS) (?:is|runs|has) (.+?)(?:\.|$)", "infrastructure"),
(r"(?:model|provider|engine) (?:is|should be|needs to be) (.+?)(?:\.|$)", "config"),
]
def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[ExtractedFact]:
"""Extract durable facts from conversation messages.
Scans conversation turns for preferences, decisions, corrections, and
operational state. Raw candidates are normalized into canonical facts so
near-duplicates merge and contradictions remain inspectable.
Scans user messages for preferences, corrections, project facts,
and infrastructure details that should survive compression.
"""
facts = []
seen_contents = set()
raw_candidates: list[ExtractedFact] = []
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role not in {"user", "assistant"}:
# Only scan user messages and assistant responses with corrections
if role not in ("user", "assistant"):
continue
if not content or not isinstance(content, str):
continue
if len(content) < 10:
continue
# Skip tool results and system messages
if role == "assistant" and msg.get("tool_calls"):
continue
if not isinstance(content, str) or len(content.strip()) < 10:
continue
timestamp, observed_at = _message_time(msg)
raw_candidates.extend(
_extract_from_text(
content.strip(),
turn_idx=turn_idx,
role=role,
timestamp=timestamp,
observed_at=observed_at,
)
)
extracted = _extract_from_text(content, turn_idx, role)
return _normalize_candidates(raw_candidates)
def evaluate_extraction_quality(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Return before/after metrics for raw vs normalized extraction quality."""
raw_candidates: list[ExtractedFact] = []
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role not in {"user", "assistant"}:
continue
if role == "assistant" and msg.get("tool_calls"):
continue
if not isinstance(content, str) or len(content.strip()) < 10:
continue
timestamp, observed_at = _message_time(msg)
raw_candidates.extend(
_extract_from_text(
content.strip(),
turn_idx=turn_idx,
role=role,
timestamp=timestamp,
observed_at=observed_at,
)
)
normalized = _normalize_candidates(raw_candidates)
raw_count = len(raw_candidates)
normalized_count = len(normalized)
contradiction_groups = {
fact.contradiction_group
for fact in normalized
if fact.status == "contradiction" and fact.contradiction_group
}
duplicate_count = max(0, raw_count - normalized_count)
noise_reduction = (duplicate_count / raw_count) if raw_count else 0.0
return {
"raw_candidates": raw_count,
"normalized_facts": normalized_count,
"duplicates_merged": duplicate_count,
"contradiction_groups": len(contradiction_groups),
"noise_reduction": round(noise_reduction, 3),
}
def _extract_from_text(
text: str,
*,
turn_idx: int,
role: str,
timestamp: float,
observed_at: str,
) -> List[ExtractedFact]:
"""Extract raw fact candidates from a single text block."""
facts: list[ExtractedFact] = []
if role != "user":
return facts
deploy_match = _DEPLOY_METHOD_RE.search(text)
if deploy_match:
method = deploy_match.group(1).strip()
facts.append(
_build_fact(
category="project.decision",
entity="project",
relation="workflow.deploy_method",
value=method,
content=f"Deploy via {method}",
confidence=0.88,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
watchdog_match = _WATCHDOG_CAP_RE.search(text)
if watchdog_match:
watchdog = watchdog_match.group(1).strip()
cap = watchdog_match.group(2).strip()
facts.append(
_build_fact(
category="project.operational",
entity=_normalize_entity(watchdog),
relation="fleet.dispatch_cap",
value=cap,
content=f"{watchdog} caps dispatches per cycle to {cap}",
confidence=0.92,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
provider_match = _PROVIDER_RE.search(text)
if provider_match:
provider = provider_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.provider",
value=provider,
content=f"Provider should stay {provider}",
confidence=0.91,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
model_match = _MODEL_RE.search(text)
if model_match:
model = model_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.model",
value=model,
content=f"Model should stay {model}",
confidence=0.9,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
port_match = _PORT_RE.search(text)
if port_match:
port = port_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.port",
value=port,
content=f"Port is {port}",
confidence=0.9,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
project_match = _PROJECT_USES_RE.search(text)
if project_match:
value = project_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="project.stack",
entity="project",
relation="project.stack",
value=value,
content=f"Project uses {value}",
confidence=0.74,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
preference_match = _PREFERENCE_RE.search(text)
if preference_match:
value = preference_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="user_pref.preference",
entity="user",
relation="user.preference",
value=value,
content=value,
confidence=0.72,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
constraint_match = _CONSTRAINT_RE.search(text)
if constraint_match:
value = constraint_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="user_pref.constraint",
entity="user",
relation="user.constraint",
value=value,
content=f"Do not {value}",
confidence=0.82,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
decision_match = _DECISION_RE.search(text)
if decision_match:
value = decision_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="project.decision",
entity="project",
relation="project.decision",
value=value,
content=f"Decision: {value}",
confidence=0.79,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
# Deduplicate by content
for fact in extracted:
key = f"{fact.category}:{fact.content[:100]}"
if key not in seen_contents:
seen_contents.add(key)
facts.append(fact)
return facts
def _build_fact(
*,
category: str,
entity: str,
relation: str,
value: str,
content: str,
confidence: float,
source_turn: int,
source_role: str,
source_text: str,
timestamp: float,
observed_at: str,
unique_slot: bool,
) -> ExtractedFact:
normalized_value = _normalize_value(value.rstrip(".!?"))
value = value.rstrip(".!?")
content = content.rstrip(".!?")
provenance = f"conversation:{source_role}:{source_turn}"
contradiction_group = relation if unique_slot else ""
evidence = [
{
"source_role": source_role,
"source_turn": source_turn,
"source_text": source_text,
"observed_at": observed_at,
"provenance": provenance,
}
]
metadata = {
"entity": entity,
"relation": relation,
"value": value,
"normalized_value": normalized_value,
"provenance": [provenance],
"evidence": list(evidence),
"observation_count": 1,
"duplicate_count": 0,
"status": "active",
}
if contradiction_group:
metadata["contradiction_group"] = contradiction_group
return ExtractedFact(
category=category,
entity=entity,
content=content,
confidence=confidence,
source_turn=source_turn,
timestamp=timestamp,
source_role=source_role,
source_text=source_text,
normalized_content=normalized_value,
canonical_key=_canonical_key(entity, relation, normalized_value),
relation=relation,
contradiction_group=contradiction_group,
status="active",
provenance=provenance,
observed_at=observed_at,
evidence=evidence,
metadata=metadata,
)
def _extract_from_text(text: str, turn_idx: int, role: str) -> List[ExtractedFact]:
"""Extract facts from a single text block."""
facts = []
timestamp = time.time()
# Clean text for pattern matching
clean = text.strip()
def _normalize_candidates(candidates: List[ExtractedFact]) -> List[ExtractedFact]:
"""Merge duplicates and mark contradictions while preserving evidence."""
# User preference patterns (from user messages)
if role == "user":
for pattern, subcategory in _PREFERENCE_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"user_pref.{subcategory}",
entity="user",
content=content[:200],
confidence=0.7,
source_turn=turn_idx,
timestamp=timestamp,
))
by_key: dict[str, ExtractedFact] = {}
contradiction_groups: dict[str, list[ExtractedFact]] = {}
# Correction patterns (from user messages)
if role == "user":
for pattern, subcategory in _CORRECTION_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"correction.{subcategory}",
entity="user",
content=content[:200],
confidence=0.8,
source_turn=turn_idx,
timestamp=timestamp,
))
for candidate in candidates:
existing = by_key.get(candidate.canonical_key)
if existing is not None:
by_key[candidate.canonical_key] = _merge_fact(existing, candidate)
continue
# Project/infrastructure patterns (from both user and assistant)
for pattern, subcategory in _PROJECT_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"project.{subcategory}",
entity=subcategory,
content=content[:200],
confidence=0.6,
source_turn=turn_idx,
timestamp=timestamp,
))
by_key[candidate.canonical_key] = candidate
if candidate.contradiction_group:
contradiction_groups.setdefault(candidate.contradiction_group, []).append(candidate)
for group, facts in contradiction_groups.items():
canonical_keys = {fact.canonical_key for fact in facts}
if len(canonical_keys) <= 1:
continue
for fact in facts:
fact.status = "contradiction"
fact.metadata["status"] = "contradiction"
fact.metadata["contradiction_group"] = group
fact.metadata["contradiction_keys"] = sorted(canonical_keys - {fact.canonical_key})
return sorted(by_key.values(), key=lambda fact: (fact.source_turn, fact.timestamp, fact.canonical_key))
def _merge_fact(existing: ExtractedFact, incoming: ExtractedFact) -> ExtractedFact:
existing.confidence = max(existing.confidence, incoming.confidence)
existing.timestamp = min(existing.timestamp, incoming.timestamp)
existing.source_turn = min(existing.source_turn, incoming.source_turn)
if not existing.observed_at or (incoming.observed_at and incoming.observed_at < existing.observed_at):
existing.observed_at = incoming.observed_at
existing.provenance = min(existing.provenance, incoming.provenance)
provenance = _ordered_unique(existing.metadata.get("provenance", []), incoming.metadata.get("provenance", []))
evidence = _merge_evidence(existing.metadata.get("evidence", []), incoming.metadata.get("evidence", []))
observation_count = int(existing.metadata.get("observation_count", len(existing.evidence) or 1))
observation_count += int(incoming.metadata.get("observation_count", len(incoming.evidence) or 1))
existing.evidence = evidence
existing.metadata["provenance"] = provenance
existing.metadata["evidence"] = evidence
existing.metadata["observation_count"] = observation_count
existing.metadata["duplicate_count"] = max(0, observation_count - 1)
existing.metadata["status"] = existing.status
return existing
return facts
def save_facts_to_store(facts: List[ExtractedFact], fact_store_fn=None) -> int:
"""Save extracted facts to the fact store.
If a callback is supplied, prefer the structured signature but fall back to
the legacy four-argument callback for compatibility.
Args:
facts: List of extracted facts.
fact_store_fn: Optional callable(category, entity, content, trust).
If None, uses the holographic fact store if available.
Returns:
Number of facts saved.
"""
saved = 0
for fact in facts:
payload = {
"category": _store_category(fact.category),
"entity": fact.entity,
"content": fact.content,
"trust": fact.confidence,
"metadata": dict(fact.metadata),
"canonical_key": fact.canonical_key,
"observed_at": fact.observed_at,
"source_role": fact.source_role,
"source_turn": fact.source_turn,
"contradiction_group": fact.contradiction_group,
"status": fact.status,
"relation": fact.relation,
}
if fact_store_fn:
if fact_store_fn:
for fact in facts:
try:
fact_store_fn(**payload)
fact_store_fn(
category=fact.category,
entity=fact.entity,
content=fact.content,
trust=fact.confidence,
)
saved += 1
continue
except TypeError:
try:
fact_store_fn(payload["category"], payload["entity"], payload["content"], payload["trust"])
saved += 1
continue
except Exception as exc:
logger.debug("Failed to save fact via callback: %s", exc)
continue
except Exception as exc:
logger.debug("Failed to save fact via callback: %s", exc)
continue
except Exception as e:
logger.debug("Failed to save fact: %s", e)
else:
# Try holographic fact store
try:
from fact_store import fact_store as _fs
tags = ",".join(filter(None, [fact.entity, fact.relation, fact.status]))
_fs(
action="add",
content=fact.content,
category=_store_category(fact.category),
tags=tags,
trust_delta=fact.confidence - 0.5,
)
saved += 1
for fact in facts:
try:
_fs(
action="add",
content=fact.content,
category=fact.category,
tags=fact.entity,
trust_delta=fact.confidence - 0.5,
)
saved += 1
except Exception as e:
logger.debug("Failed to save fact via fact_store: %s", e)
except ImportError:
logger.debug("fact_store not available — facts not persisted")
break
except Exception as exc:
logger.debug("Failed to save fact via fact_store: %s", exc)
return saved
@@ -553,10 +204,9 @@ def extract_and_save_facts(
Returns (extracted_facts, saved_count).
"""
facts = extract_facts_from_messages(messages)
if facts:
logger.info("Extracted %d normalized facts from conversation", len(facts))
logger.info("Extracted %d facts from conversation", len(facts))
saved = save_facts_to_store(facts, fact_store_fn)
logger.info("Saved %d/%d facts to store", saved, len(facts))
else:
@@ -566,105 +216,16 @@ def extract_and_save_facts(
def format_facts_summary(facts: List[ExtractedFact]) -> str:
"""Format extracted facts as a readable summary."""
if not facts:
return "No facts extracted."
by_category: dict[str, list[ExtractedFact]] = {}
for fact in facts:
by_category.setdefault(fact.category, []).append(fact)
by_category = {}
for f in facts:
by_category.setdefault(f.category, []).append(f)
lines = [f"Extracted {len(facts)} facts:", ""]
for category, category_facts in sorted(by_category.items()):
lines.append(f" {category}:")
for fact in category_facts:
suffix = f" [{fact.status}]" if fact.status != "active" else ""
lines.append(f" - {fact.content[:80]}{suffix}")
for cat, cat_facts in sorted(by_category.items()):
lines.append(f" {cat}:")
for f in cat_facts:
lines.append(f" - {f.content[:80]}")
return "\n".join(lines)
def _store_category(category: str) -> str:
if category.startswith("user_pref"):
return "user_pref"
if category.startswith("project"):
return "project"
if category.startswith("tool"):
return "tool"
return "general"
def _message_time(msg: Dict[str, Any]) -> Tuple[float, str]:
for key in ("created_at", "timestamp", "time"):
value = msg.get(key)
if value is None:
continue
if isinstance(value, (int, float)):
ts = float(value)
return ts, _iso_from_timestamp(ts)
if isinstance(value, str):
parsed = _parse_time_string(value)
if parsed is not None:
return parsed, _iso_from_timestamp(parsed) if "T" not in value else value.replace("+00:00", "Z")
return time.time(), value
now = time.time()
return now, _iso_from_timestamp(now)
def _parse_time_string(value: str) -> float | None:
text = value.strip()
if not text:
return None
try:
return float(text)
except ValueError:
pass
try:
normalized = text[:-1] + "+00:00" if text.endswith("Z") else text
return datetime.fromisoformat(normalized).timestamp()
except ValueError:
return None
def _iso_from_timestamp(value: float) -> str:
return datetime.fromtimestamp(value, tz=timezone.utc).isoformat().replace("+00:00", "Z")
def _normalize_value(value: str) -> str:
normalized = re.sub(r"[^a-z0-9]+", " ", value.lower())
normalized = re.sub(r"\s+", " ", normalized).strip()
return normalized
def _normalize_entity(value: str) -> str:
return _normalize_value(value).replace(" ", "_") or "entity"
def _canonical_key(entity: str, relation: str, normalized_value: str) -> str:
return f"{entity}|{relation}|{normalized_value}"
def _ordered_unique(*groups: List[str]) -> List[str]:
seen: set[str] = set()
ordered: list[str] = []
for group in groups:
for item in group:
if item and item not in seen:
seen.add(item)
ordered.append(item)
return ordered
def _merge_evidence(existing: List[Dict[str, Any]], incoming: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
seen: set[tuple[str, str, str]] = set()
merged: list[dict[str, Any]] = []
for item in list(existing) + list(incoming):
key = (
str(item.get("provenance", "")),
str(item.get("observed_at", "")),
str(item.get("source_text", "")),
)
if key in seen:
continue
seen.add(key)
merged.append(dict(item))
return merged

View File

@@ -356,57 +356,44 @@ class HolographicMemoryProvider(MemoryProvider):
# -- Auto-extraction (on_session_end) ------------------------------------
def _auto_extract_facts(self, messages: list) -> None:
from agent.session_compactor import evaluate_extraction_quality, extract_facts_from_messages
def _store_category(category: str) -> str:
if category.startswith("user_pref"):
return "user_pref"
if category.startswith("project"):
return "project"
if category.startswith("tool"):
return "tool"
return "general"
facts = extract_facts_from_messages(messages)
if not facts:
return
_PREF_PATTERNS = [
re.compile(r'\bI\s+(?:prefer|like|love|use|want|need)\s+(.+)', re.IGNORECASE),
re.compile(r'\bmy\s+(?:favorite|preferred|default)\s+\w+\s+is\s+(.+)', re.IGNORECASE),
re.compile(r'\bI\s+(?:always|never|usually)\s+(.+)', re.IGNORECASE),
]
_DECISION_PATTERNS = [
re.compile(r'\bwe\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+)', re.IGNORECASE),
re.compile(r'\bthe\s+project\s+(?:uses|needs|requires)\s+(.+)', re.IGNORECASE),
]
extracted = 0
for fact in facts:
try:
metadata = dict(fact.metadata)
metadata.setdefault("relation", fact.relation)
metadata.setdefault("value", fact.content)
metadata.setdefault("provenance", [fact.provenance])
metadata.setdefault("evidence", list(fact.evidence))
metadata.setdefault("observation_count", len(fact.evidence))
metadata.setdefault("duplicate_count", max(0, len(fact.evidence) - 1))
self._store.add_fact(
fact.content,
category=_store_category(fact.category),
tags=",".join(filter(None, [fact.entity, fact.relation, fact.status])),
canonical_key=fact.canonical_key,
metadata=metadata,
confidence=fact.confidence,
source_role=fact.source_role,
source_turn=fact.source_turn,
observed_at=fact.observed_at,
contradiction_group=fact.contradiction_group,
status=fact.status,
)
extracted += 1
except Exception as exc:
logger.debug("Structured auto-extract failed for %s: %s", fact.canonical_key, exc)
for msg in messages:
if msg.get("role") != "user":
continue
content = msg.get("content", "")
if not isinstance(content, str) or len(content) < 10:
continue
for pattern in _PREF_PATTERNS:
if pattern.search(content):
try:
self._store.add_fact(content[:400], category="user_pref")
extracted += 1
except Exception:
pass
break
for pattern in _DECISION_PATTERNS:
if pattern.search(content):
try:
self._store.add_fact(content[:400], category="project")
extracted += 1
except Exception:
pass
break
if extracted:
metrics = evaluate_extraction_quality(messages)
logger.info(
"Auto-extracted %d structured facts from conversation (raw=%d normalized=%d contradictions=%d)",
extracted,
metrics["raw_candidates"],
metrics["normalized_facts"],
metrics["contradiction_groups"],
)
logger.info("Auto-extracted %d facts from conversation", extracted)
# ---------------------------------------------------------------------------

View File

@@ -3,7 +3,6 @@ SQLite-backed fact store with entity resolution and trust scoring.
Single-user Hermes memory store plugin.
"""
import json
import re
import sqlite3
import threading
@@ -16,24 +15,16 @@ except ImportError:
_SCHEMA = """
CREATE TABLE IF NOT EXISTS facts (
fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL UNIQUE,
category TEXT DEFAULT 'general',
tags TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
helpful_count INTEGER DEFAULT 0,
canonical_key TEXT DEFAULT '',
metadata_json TEXT DEFAULT '{}',
confidence REAL DEFAULT 0.5,
source_role TEXT DEFAULT '',
source_turn INTEGER DEFAULT -1,
observed_at TEXT DEFAULT '',
contradiction_group TEXT DEFAULT '',
status TEXT DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
hrr_vector BLOB
fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL UNIQUE,
category TEXT DEFAULT 'general',
tags TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
helpful_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
hrr_vector BLOB
);
CREATE TABLE IF NOT EXISTS entities (
@@ -50,11 +41,9 @@ CREATE TABLE IF NOT EXISTS fact_entities (
PRIMARY KEY (fact_id, entity_id)
);
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key);
CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group);
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts
USING fts5(content, tags, content=facts, content_rowid=fact_id);
@@ -140,23 +129,10 @@ class MemoryStore:
"""Create tables, indexes, and triggers if they do not exist. Enable WAL mode."""
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.executescript(_SCHEMA)
# Migrate: add hrr_vector column if missing (safe for existing databases)
columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()}
migrations = {
"hrr_vector": "ALTER TABLE facts ADD COLUMN hrr_vector BLOB",
"canonical_key": "ALTER TABLE facts ADD COLUMN canonical_key TEXT DEFAULT ''",
"metadata_json": "ALTER TABLE facts ADD COLUMN metadata_json TEXT DEFAULT '{}'",
"confidence": "ALTER TABLE facts ADD COLUMN confidence REAL DEFAULT 0.5",
"source_role": "ALTER TABLE facts ADD COLUMN source_role TEXT DEFAULT ''",
"source_turn": "ALTER TABLE facts ADD COLUMN source_turn INTEGER DEFAULT -1",
"observed_at": "ALTER TABLE facts ADD COLUMN observed_at TEXT DEFAULT ''",
"contradiction_group": "ALTER TABLE facts ADD COLUMN contradiction_group TEXT DEFAULT ''",
"status": "ALTER TABLE facts ADD COLUMN status TEXT DEFAULT 'active'",
}
for column, ddl in migrations.items():
if column not in columns:
self._conn.execute(ddl)
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key)")
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group)")
if "hrr_vector" not in columns:
self._conn.execute("ALTER TABLE facts ADD COLUMN hrr_vector BLOB")
self._conn.commit()
# ------------------------------------------------------------------
@@ -168,148 +144,41 @@ class MemoryStore:
content: str,
category: str = "general",
tags: str = "",
*,
canonical_key: str = "",
metadata: dict | None = None,
confidence: float | None = None,
source_role: str = "",
source_turn: int = -1,
observed_at: str = "",
contradiction_group: str = "",
status: str = "active",
) -> int:
"""Insert a fact and return its fact_id.
Exact duplicates are deduplicated by content. Near-duplicates are
normalized by canonical_key, with provenance/evidence merged into the
existing row. Contradictions sharing the same contradiction_group remain
stored as separate rows and are marked inspectably.
Deduplicates by content (UNIQUE constraint). On duplicate, returns
the existing fact_id without modifying the row. Extracts entities from
the content and links them to the fact.
"""
with self._lock:
content = content.strip()
if not content:
raise ValueError("content must not be empty")
metadata = dict(metadata or {})
canonical_key = canonical_key.strip()
contradiction_group = contradiction_group.strip()
observed_at = observed_at.strip()
status = status or "active"
trust_score = self.default_trust if confidence is None else _clamp_trust(confidence)
metadata_json = json.dumps(metadata, sort_keys=True)
if canonical_key:
existing = self._conn.execute(
"SELECT fact_id, metadata_json, trust_score, confidence, observed_at FROM facts WHERE canonical_key = ?",
(canonical_key,),
).fetchone()
if existing is not None:
merged_metadata = self._merge_metadata(existing["metadata_json"], metadata)
merged_trust = max(float(existing["trust_score"]), trust_score)
merged_observed_at = existing["observed_at"] or observed_at
if observed_at and merged_observed_at:
merged_observed_at = min(merged_observed_at, observed_at)
elif observed_at:
merged_observed_at = observed_at
self._conn.execute(
"""
UPDATE facts
SET metadata_json = ?,
trust_score = ?,
confidence = ?,
observed_at = ?,
updated_at = CURRENT_TIMESTAMP
WHERE fact_id = ?
""",
(
json.dumps(merged_metadata, sort_keys=True),
merged_trust,
max(float(existing["confidence"] or 0.0), confidence or trust_score),
merged_observed_at,
existing["fact_id"],
),
)
self._conn.commit()
return int(existing["fact_id"])
contradiction_rows = []
if contradiction_group:
contradiction_rows = self._conn.execute(
"""
SELECT fact_id, canonical_key, metadata_json
FROM facts
WHERE contradiction_group = ?
AND canonical_key != ?
""",
(contradiction_group, canonical_key),
).fetchall()
if contradiction_rows:
status = "contradiction"
metadata = dict(metadata)
metadata["status"] = "contradiction"
metadata["contradiction_group"] = contradiction_group
metadata["contradiction_keys"] = sorted(
{
canonical_key,
*[str(row["canonical_key"]) for row in contradiction_rows if row["canonical_key"]],
}
- {""}
)
metadata_json = json.dumps(metadata, sort_keys=True)
try:
cur = self._conn.execute(
"""
INSERT INTO facts (
content,
category,
tags,
trust_score,
canonical_key,
metadata_json,
confidence,
source_role,
source_turn,
observed_at,
contradiction_group,
status
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO facts (content, category, tags, trust_score)
VALUES (?, ?, ?, ?)
""",
(
content,
category,
tags,
trust_score,
canonical_key,
metadata_json,
confidence if confidence is not None else trust_score,
source_role,
source_turn,
observed_at,
contradiction_group,
status,
),
(content, category, tags, self.default_trust),
)
self._conn.commit()
fact_id: int = cur.lastrowid # type: ignore[assignment]
except sqlite3.IntegrityError:
# Duplicate content — return existing id
row = self._conn.execute(
"SELECT fact_id FROM facts WHERE content = ?", (content,)
).fetchone()
return int(row["fact_id"])
if contradiction_rows:
self._mark_contradictions(
contradiction_group=contradiction_group,
new_canonical_key=canonical_key,
existing_rows=contradiction_rows,
)
# Entity extraction and linking
for name in self._extract_entities(content):
entity_id = self._resolve_entity(name)
self._link_fact_entity(fact_id, entity_id)
# Compute HRR vector after entity linking
self._compute_hrr_vector(fact_id, content)
self._rebuild_bank(category)
@@ -342,9 +211,6 @@ class MemoryStore:
sql = f"""
SELECT f.fact_id, f.content, f.category, f.tags,
f.trust_score, f.retrieval_count, f.helpful_count,
f.canonical_key, f.metadata_json, f.confidence,
f.source_role, f.source_turn, f.observed_at,
f.contradiction_group, f.status,
f.created_at, f.updated_at
FROM facts f
JOIN facts_fts fts ON fts.rowid = f.fact_id
@@ -470,11 +336,7 @@ class MemoryStore:
sql = f"""
SELECT fact_id, content, category, tags, trust_score,
retrieval_count, helpful_count,
canonical_key, metadata_json, confidence,
source_role, source_turn, observed_at,
contradiction_group, status,
created_at, updated_at
retrieval_count, helpful_count, created_at, updated_at
FROM facts
WHERE trust_score >= ?
{category_clause}
@@ -525,89 +387,6 @@ class MemoryStore:
"helpful_count": row["helpful_count"] + helpful_increment,
}
# ------------------------------------------------------------------
# Metadata helpers
# ------------------------------------------------------------------
def _load_metadata(self, metadata_json: str | None) -> dict:
if not metadata_json:
return {}
try:
data = json.loads(metadata_json)
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _merge_metadata(self, existing_json: str | None, incoming: dict | None) -> dict:
existing = self._load_metadata(existing_json)
incoming = dict(incoming or {})
merged = dict(existing)
merged.update({k: v for k, v in incoming.items() if k not in {"provenance", "evidence", "observation_count", "duplicate_count", "contradiction_keys"}})
provenance = []
seen_provenance: set[str] = set()
for item in list(existing.get("provenance", [])) + list(incoming.get("provenance", [])):
if item and item not in seen_provenance:
seen_provenance.add(item)
provenance.append(item)
evidence = []
seen_evidence: set[tuple[str, str, str]] = set()
for item in list(existing.get("evidence", [])) + list(incoming.get("evidence", [])):
if not isinstance(item, dict):
continue
key = (
str(item.get("provenance", "")),
str(item.get("observed_at", "")),
str(item.get("source_text", "")),
)
if key in seen_evidence:
continue
seen_evidence.add(key)
evidence.append(dict(item))
observation_count = int(existing.get("observation_count", max(1, len(existing.get("evidence", [])) or 1)))
observation_count += int(incoming.get("observation_count", max(1, len(incoming.get("evidence", [])) or 1)))
contradiction_keys = []
seen_keys: set[str] = set()
for item in list(existing.get("contradiction_keys", [])) + list(incoming.get("contradiction_keys", [])):
if item and item not in seen_keys:
seen_keys.add(item)
contradiction_keys.append(item)
merged["provenance"] = provenance
merged["evidence"] = evidence
merged["observation_count"] = observation_count
merged["duplicate_count"] = max(0, observation_count - 1)
if contradiction_keys:
merged["contradiction_keys"] = contradiction_keys
return merged
def _mark_contradictions(self, contradiction_group: str, new_canonical_key: str, existing_rows: list[sqlite3.Row]) -> None:
for row in existing_rows:
metadata = self._load_metadata(row["metadata_json"])
keys = []
seen: set[str] = set()
for item in list(metadata.get("contradiction_keys", [])) + [new_canonical_key]:
if item and item not in seen:
seen.add(item)
keys.append(item)
metadata["status"] = "contradiction"
metadata["contradiction_group"] = contradiction_group
metadata["contradiction_keys"] = keys
self._conn.execute(
"""
UPDATE facts
SET status = 'contradiction',
metadata_json = ?,
updated_at = CURRENT_TIMESTAMP
WHERE fact_id = ?
""",
(json.dumps(metadata, sort_keys=True), row["fact_id"]),
)
self._conn.commit()
# ------------------------------------------------------------------
# Entity helpers
# ------------------------------------------------------------------
@@ -781,14 +560,8 @@ class MemoryStore:
# ------------------------------------------------------------------
def _row_to_dict(self, row: sqlite3.Row) -> dict:
"""Convert a sqlite3.Row to a plain dict with decoded metadata."""
data = dict(row)
metadata = self._load_metadata(data.get("metadata_json"))
if metadata:
data["metadata"] = metadata
data.setdefault("relation", metadata.get("relation"))
data.pop("metadata_json", None)
return data
"""Convert a sqlite3.Row to a plain dict."""
return dict(row)
def close(self) -> None:
"""Close the database connection."""

View File

@@ -1,63 +0,0 @@
{
"preferences_and_duplicates": [
{
"role": "user",
"content": "Deploy via Ansible for production changes.",
"created_at": "2026-04-22T10:00:00Z"
},
{
"role": "user",
"content": "We deploy through Ansible on this repo.",
"created_at": "2026-04-22T10:01:00Z"
},
{
"role": "user",
"content": "Gitea-first for repository work.",
"created_at": "2026-04-22T10:02:00Z"
}
],
"operational_and_contradictions": [
{
"role": "user",
"content": "The BURN watchdog caps dispatches per cycle to 6.",
"created_at": "2026-04-22T11:00:00Z"
},
{
"role": "user",
"content": "The provider should stay openai-codex/gpt-5.4.",
"created_at": "2026-04-22T11:01:00Z"
},
{
"role": "user",
"content": "Correction: the provider should stay mimo-v2-pro.",
"created_at": "2026-04-22T11:02:00Z"
}
],
"mixed_transcript": [
{
"role": "user",
"content": "Deploy via Ansible for production changes.",
"created_at": "2026-04-22T10:00:00Z"
},
{
"role": "user",
"content": "We deploy through Ansible on this repo.",
"created_at": "2026-04-22T10:01:00Z"
},
{
"role": "user",
"content": "The BURN watchdog caps dispatches per cycle to 6.",
"created_at": "2026-04-22T11:00:00Z"
},
{
"role": "user",
"content": "The provider should stay openai-codex/gpt-5.4.",
"created_at": "2026-04-22T11:01:00Z"
},
{
"role": "user",
"content": "Correction: the provider should stay mimo-v2-pro.",
"created_at": "2026-04-22T11:02:00Z"
}
]
}

View File

@@ -1,50 +0,0 @@
"""Integration tests for holographic auto-extraction with structured fact persistence."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
from plugins.memory.holographic import HolographicMemoryProvider
_FIXTURE_PATH = Path(__file__).resolve().parents[2] / "fixtures" / "memory_extraction_fragments.json"
def _load_fixture(name: str):
return json.loads(_FIXTURE_PATH.read_text())[name]
class TestHolographicAutoExtract:
def test_auto_extract_persists_structured_metadata_and_normalizes_duplicates(self, tmp_path):
provider = HolographicMemoryProvider(
config={
"db_path": str(tmp_path / "memory_store.db"),
"auto_extract": True,
"default_trust": 0.5,
}
)
provider.initialize("test-session")
messages = _load_fixture("mixed_transcript")
provider.on_session_end(messages)
provider.on_session_end(messages)
facts = provider._store.list_facts(min_trust=0.0, limit=20)
deploy_facts = [f for f in facts if f.get("relation") == "workflow.deploy_method"]
provider_facts = [f for f in facts if f.get("contradiction_group") == "config.provider"]
assert len(deploy_facts) == 1
assert deploy_facts[0]["metadata"]["duplicate_count"] >= 3
assert deploy_facts[0]["observed_at"] == "2026-04-22T10:00:00Z"
assert deploy_facts[0]["metadata"]["provenance"] == [
"conversation:user:0",
"conversation:user:1",
]
assert len(provider_facts) == 2
assert {f["status"] for f in provider_facts} == {"contradiction"}
assert {f["metadata"]["value"] for f in provider_facts} == {
"openai-codex/gpt-5.4",
"mimo-v2-pro",
}

View File

@@ -1,6 +1,6 @@
"""Tests for session compaction with fact extraction."""
import json
import pytest
import sys
from pathlib import Path
@@ -8,19 +8,12 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.session_compactor import (
ExtractedFact,
evaluate_extraction_quality,
extract_and_save_facts,
extract_facts_from_messages,
format_facts_summary,
save_facts_to_store,
extract_and_save_facts,
format_facts_summary,
)
_FIXTURE_PATH = Path(__file__).resolve().parent / "fixtures" / "memory_extraction_fragments.json"
def _load_fixture(name: str):
return json.loads(_FIXTURE_PATH.read_text())[name]
class TestFactExtraction:
def test_extract_preference(self):
@@ -67,48 +60,14 @@ class TestFactExtraction:
{"role": "user", "content": "I prefer Python."},
]
facts = extract_facts_from_messages(messages)
# Should deduplicate
python_facts = [f for f in facts if "Python" in f.content]
assert len(python_facts) == 1
def test_structured_fact_preserves_provenance_and_temporal_metadata(self):
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
deploy_fact = next(f for f in facts if f.relation == "workflow.deploy_method")
assert deploy_fact.source_role == "user"
assert deploy_fact.source_turn == 0
assert deploy_fact.observed_at == "2026-04-22T10:00:00Z"
assert deploy_fact.provenance == "conversation:user:0"
assert deploy_fact.canonical_key
assert deploy_fact.evidence
assert deploy_fact.evidence[0]["source_text"].startswith("Deploy via Ansible")
def test_near_duplicate_facts_are_normalized_into_one_canonical_fact(self):
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
deploy_facts = [f for f in facts if f.relation == "workflow.deploy_method"]
assert len(deploy_facts) == 1
assert len(deploy_facts[0].evidence) == 2
assert deploy_facts[0].metadata["duplicate_count"] == 1
def test_contradictory_facts_are_preserved_for_unique_slots(self):
facts = extract_facts_from_messages(_load_fixture("operational_and_contradictions"))
provider_facts = [f for f in facts if f.contradiction_group == "config.provider"]
assert len(provider_facts) == 2
assert {f.status for f in provider_facts} == {"contradiction"}
assert {f.normalized_content for f in provider_facts} == {
"openai codex gpt 5 4",
"mimo v2 pro",
}
def test_quality_evaluation_reports_noise_reduction(self):
metrics = evaluate_extraction_quality(_load_fixture("mixed_transcript"))
assert metrics["raw_candidates"] > metrics["normalized_facts"]
assert metrics["noise_reduction"] > 0
assert metrics["contradiction_groups"] == 1
class TestSaveFacts:
def test_save_with_callback(self):
saved = []
def mock_save(category, entity, content, trust):
saved.append({"category": category, "content": content})
@@ -117,38 +76,6 @@ class TestSaveFacts:
assert count == 1
assert len(saved) == 1
def test_save_with_extended_callback_metadata(self):
saved = []
def mock_save(category, entity, content, trust, **kwargs):
saved.append({
"category": category,
"entity": entity,
"content": content,
"trust": trust,
**kwargs,
})
fact = ExtractedFact(
"project.operational",
"watchdog",
"BURN watchdog caps dispatches per cycle to 6",
0.9,
2,
source_role="user",
observed_at="2026-04-22T11:00:00Z",
provenance="conversation:user:2",
canonical_key="project.operational|watchdog|dispatch_cap|6",
relation="fleet.dispatch_cap",
contradiction_group="fleet.dispatch_cap",
metadata={"duplicate_count": 0},
)
count = save_facts_to_store([fact], fact_store_fn=mock_save)
assert count == 1
assert saved[0]["canonical_key"] == fact.canonical_key
assert saved[0]["observed_at"] == "2026-04-22T11:00:00Z"
assert saved[0]["metadata"]["duplicate_count"] == 0
class TestFormatSummary:
def test_empty(self):

View File

@@ -148,3 +148,184 @@ class TestStrategyNameSurfaced:
assert count == 0
assert strategy is None
assert err is not None
class TestEscapeDriftGuard:
"""Tests for the escape-drift guard that catches bash/JSON serialization
artifacts where an apostrophe gets prefixed with a spurious backslash
in tool-call transport.
"""
def test_drift_blocked_apostrophe(self):
"""File has ', old_string and new_string both have \\' — classic
tool-call drift. Guard must block with a helpful error instead of
writing \\' literals into source code."""
content = "x = \"hello there\"\n"
# Simulate transport-corrupted old_string and new_string where an
# apostrophe-like context got prefixed with a backslash. The content
# itself has no apostrophe, but both strings do — matching via
# whitespace/anchor strategies would otherwise succeed.
old_string = "x = \"hello there\" # don\\'t edit\n"
new_string = "x = \"hi there\" # don\\'t edit\n"
# This particular pair won't match anything, so it exits via the
# no-match path. Build a case where a non-exact strategy DOES match.
content = "line\n x = 1\nline"
old_string = "line\n x = \\'a\\'\nline"
new_string = "line\n x = \\'b\\'\nline"
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert count == 0
assert err is not None and "Escape-drift" in err
assert "backslash" in err.lower()
assert new == content # file untouched
def test_drift_blocked_double_quote(self):
"""Same idea but with \\" drift instead of \\'."""
content = 'line\n x = 1\nline'
old_string = 'line\n x = \\"a\\"\nline'
new_string = 'line\n x = \\"b\\"\nline'
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert count == 0
assert err is not None and "Escape-drift" in err
def test_drift_allowed_when_file_genuinely_has_backslash_escapes(self):
"""If the file already contains \\' (e.g. inside an existing escaped
string), the model is legitimately preserving it. Guard must NOT
fire."""
content = "line\n x = \\'a\\'\nline"
old_string = "line\n x = \\'a\\'\nline"
new_string = "line\n x = \\'b\\'\nline"
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert err is None
assert count == 1
assert "\\'b\\'" in new
def test_drift_allowed_on_exact_match(self):
"""Exact matches bypass the drift guard entirely — if the file
really contains the exact bytes old_string specified, it's not
drift."""
content = "hello \\'world\\'"
new, count, strategy, err = fuzzy_find_and_replace(
content, "hello \\'world\\'", "hello \\'there\\'"
)
assert err is None
assert count == 1
assert strategy == "exact"
def test_drift_allowed_when_adding_escaped_strings(self):
"""Model is adding new content with \\' that wasn't in the original.
old_string has no \\', so guard doesn't fire."""
content = "line1\nline2\nline3"
old_string = "line1\nline2\nline3"
new_string = "line1\nprint(\\'added\\')\nline2\nline3"
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert err is None
assert count == 1
assert "\\'added\\'" in new
def test_no_drift_check_when_new_string_lacks_suspect_chars(self):
"""Fast-path: if new_string has no \\' or \\", guard must not
fire even on fuzzy match."""
content = "def foo():\n pass" # extra space ignored by line_trimmed
old_string = "def foo():\n pass"
new_string = "def bar():\n return 1"
new, count, strategy, err = fuzzy_find_and_replace(content, old_string, new_string)
assert err is None
assert count == 1
class TestFindClosestLines:
def setup_method(self):
from tools.fuzzy_match import find_closest_lines
self.find_closest_lines = find_closest_lines
def test_finds_similar_line(self):
content = "def foo():\n pass\ndef bar():\n return 1\n"
result = self.find_closest_lines("def baz():", content)
assert "def foo" in result or "def bar" in result
def test_returns_empty_for_no_match(self):
content = "completely different content here"
result = self.find_closest_lines("xyzzy_no_match_possible_!!!", content)
assert result == ""
def test_returns_empty_for_empty_inputs(self):
assert self.find_closest_lines("", "some content") == ""
assert self.find_closest_lines("old string", "") == ""
def test_includes_context_lines(self):
content = "line1\nline2\ndef target():\n pass\nline5\n"
result = self.find_closest_lines("def target():", content)
assert "target" in result
def test_includes_line_numbers(self):
content = "line1\nline2\ndef foo():\n pass\n"
result = self.find_closest_lines("def foo():", content)
# Should include line numbers in format "N| content"
assert "|" in result
class TestFormatNoMatchHint:
"""Gating tests for format_no_match_hint — the shared helper that decides
whether a 'Did you mean?' snippet should be appended to an error.
"""
def setup_method(self):
from tools.fuzzy_match import format_no_match_hint
self.fmt = format_no_match_hint
def test_fires_on_could_not_find_with_match(self):
"""Classic no-match: similar content exists → hint fires."""
content = "def foo():\n pass\ndef bar():\n pass\n"
result = self.fmt(
"Could not find a match for old_string in the file",
0, "def baz():", content,
)
assert "Did you mean" in result
assert "foo" in result or "bar" in result
def test_silent_on_ambiguous_match_error(self):
"""'Found N matches' is not a missing-match failure — no hint."""
content = "aaa bbb aaa\n"
result = self.fmt(
"Found 2 matches for old_string. Provide more context to make it unique, or use replace_all=True.",
0, "aaa", content,
)
assert result == ""
def test_silent_on_escape_drift_error(self):
"""Escape-drift errors are intentional blocks — hint would mislead."""
content = "x = 1\n"
result = self.fmt(
"Escape-drift detected: old_string and new_string contain the literal sequence '\\\\''...",
0, "x = \\'1\\'", content,
)
assert result == ""
def test_silent_on_identical_strings(self):
"""old_string == new_string — hint irrelevant."""
result = self.fmt(
"old_string and new_string are identical",
0, "foo", "foo bar\n",
)
assert result == ""
def test_silent_when_match_count_nonzero(self):
"""If match succeeded, we shouldn't be in the error path — defense in depth."""
result = self.fmt(
"Could not find a match for old_string in the file",
1, "foo", "foo bar\n",
)
assert result == ""
def test_silent_on_none_error(self):
"""No error at all — no hint."""
result = self.fmt(None, 0, "foo", "bar\n")
assert result == ""
def test_silent_when_no_similar_content(self):
"""Even for a valid no-match error, skip hint when nothing similar exists."""
result = self.fmt(
"Could not find a match for old_string in the file",
0, "totally_unique_xyzzy_qux", "abc\nxyz\n",
)
assert result == ""

View File

@@ -0,0 +1,114 @@
import json
import os
import textwrap
from pathlib import Path

import tools.skill_manager_tool as skill_manager_tool
from tools.file_tools import patch_tool
from tools.skill_manager_tool import _create_skill, _patch_skill


def _disable_patch_tool_guards(monkeypatch):
    monkeypatch.setattr("tools.file_tools._check_sensitive_path", lambda _path: None)
    monkeypatch.setattr("tools.file_tools._check_file_staleness", lambda _path, _task_id: None)
    monkeypatch.setattr("tools.file_tools._log_and_check_conflict", lambda _path, _task_id, _action: None)


def test_patch_tool_replace_no_match_shows_rich_hint_without_legacy_hint(tmp_path, monkeypatch):
    _disable_patch_tool_guards(monkeypatch)
    sample = tmp_path / "sample.py"
    sample.write_text("def foo():\n return 1\n\ndef bar():\n return 2\n", encoding="utf-8")
    raw = patch_tool(
        mode="replace",
        path=str(sample),
        old_string="def barycentric():",
        new_string="def barycentric_new():",
        task_id="qa960-replace-rich-hint",
    )
    result = json.loads(raw)
    assert result["success"] is False
    assert "Could not find a match" in result["error"]
    assert "Did you mean one of these sections?" in result["error"]
    assert "def bar():" in result["error"] or "def foo():" in result["error"]
    assert "[Hint:" not in raw


def test_patch_tool_replace_ambiguous_error_does_not_show_did_you_mean(tmp_path, monkeypatch):
    _disable_patch_tool_guards(monkeypatch)
    sample = tmp_path / "sample.py"
    sample.write_text("aaa\nbbb\naaa\n", encoding="utf-8")
    raw = patch_tool(
        mode="replace",
        path=str(sample),
        old_string="aaa",
        new_string="ccc",
        task_id="qa960-replace-ambiguous",
    )
    result = json.loads(raw)
    assert result["success"] is False
    assert "Found 2 matches" in result["error"]
    assert "Did you mean one of these sections?" not in result["error"]
    assert "[Hint:" not in raw


def test_patch_tool_v4a_no_match_shows_rich_hint(tmp_path, monkeypatch):
    _disable_patch_tool_guards(monkeypatch)
    sample = tmp_path / "sample.py"
    sample.write_text("def foo():\n return 1\n", encoding="utf-8")
    patch = textwrap.dedent(
        f"""\
        *** Begin Patch
        *** Update File: {sample}
        @@
        -def barycentric():
        +def barycentric_new():
        *** End Patch
        """
    )
    raw = patch_tool(mode="patch", patch=patch, task_id="qa960-v4a-rich-hint")
    result = json.loads(raw)
    assert result["success"] is False
    assert "Patch validation failed" in result["error"]
    assert "Did you mean one of these sections?" in result["error"]
    assert "def foo():" in result["error"]


def test_skill_patch_no_match_shows_rich_hint(tmp_path, monkeypatch):
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    skills_dir = tmp_path / "skills"
    skills_dir.mkdir(parents=True, exist_ok=True)
    monkeypatch.setattr(skill_manager_tool, "SKILLS_DIR", skills_dir)
    monkeypatch.setattr(skill_manager_tool, "_security_scan_skill", lambda _skill_dir: None)
    _create_skill(
        "qa-skill",
        textwrap.dedent(
            """\
            ---
            name: qa-skill
            description: test
            ---
            Step 1: Do the thing.
            Step 2: Verify the thing.
            """
        ),
    )
    result = _patch_skill(
        "qa-skill",
        "Step 1: Do the production rollout.",
        "Step 1: Updated.",
    )
    assert result["success"] is False
    assert "Could not find a match" in result["error"]
    assert "Did you mean one of these sections?" in result["error"]
    assert "Step 1: Do the thing." in result["error"]
    assert "file_preview" in result

View File

@@ -757,12 +757,14 @@ class ShellFileOperations(FileOperations):
             content, old_string, new_string, replace_all
         )
-        if error:
-            return PatchResult(error=error)
-        if match_count == 0:
-            return PatchResult(error=f"Could not find match for old_string in {path}")
+        if error or match_count == 0:
+            err_msg = error or f"Could not find match for old_string in {path}"
+            try:
+                from tools.fuzzy_match import format_no_match_hint
+                err_msg += format_no_match_hint(err_msg, match_count, old_string, content)
+            except Exception:
+                pass
+            return PatchResult(error=err_msg)
         # Write back
         write_result = self.write_file(path, new_content)
         if write_result.error:
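
Worth noting: the hint generation is wrapped in try/except so a bug in fuzzy_match can only ever cost the hint, never the error itself. The same best-effort pattern in isolation (hypothetical names, not part of the diff):

    def append_hint_best_effort(message: str, make_hint) -> str:
        # Hints are decoration: if the generator raises, return the original
        # error untouched instead of masking it with a new exception.
        try:
            return message + make_hint()
        except Exception:
            return message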

View File

@@ -8,6 +8,7 @@ import os
 import threading
 import time
 from pathlib import Path
+from typing import Any, Dict, Optional
 from tools.binary_extensions import has_binary_extension
 from tools.file_operations import ShellFileOperations
 from agent.redact import redact_sensitive_text
@@ -690,8 +691,11 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
         result_json = json.dumps(result_dict, ensure_ascii=False)
         # Hint when old_string not found — saves iterations where the agent
         # retries with stale content instead of re-reading the file.
+        # Suppressed when patch_replace already attached a rich "Did you mean?"
+        # snippet (which is strictly more useful than the generic hint).
         if result_dict.get("error") and "Could not find" in str(result_dict["error"]):
-            result_json += "\n\n[Hint: old_string not found. Use read_file to verify the current content, or search_files to locate the text.]"
+            if "Did you mean one of these sections?" not in str(result_dict["error"]):
+                result_json += "\n\n[Hint: old_string not found. Use read_file to verify the current content, or search_files to locate the text.]"
         return result_json
     except Exception as e:
         return tool_error(str(e))
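
One subtlety the suppression also sidesteps: the legacy hint is appended after the serialized JSON rather than inside it, so whenever it fires the overall payload stops being strictly parseable. A small illustration:

    import json

    result_json = json.dumps({"success": False, "error": "Could not find match"})
    result_json += "\n\n[Hint: old_string not found. ...]"
    # json.loads(result_json) now raises json.JSONDecodeError ("Extra data"),
    # consistent with the QA tests asserting "[Hint:" is absent from the raw
    # response before parsing it.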

View File

@@ -93,6 +93,21 @@ def fuzzy_find_and_replace(content: str, old_string: str, new_string: str,
f"Provide more context to make it unique, or use replace_all=True."
)
# Escape-drift guard: when the matched strategy is NOT `exact`,
# we matched via some form of normalization. If new_string
# contains shell/JSON-style escape sequences (\\' or \\") that
# would be written literally into the file but the matched
# region of the file has no such sequences, this is almost
# certainly tool-call serialization drift — the model typed
# an apostrophe/quote and the transport added a stray
# backslash. Writing new_string as-is would corrupt the file.
# Block with a helpful error so the model re-reads and retries
# instead of the caller silently persisting garbage (or not).
if strategy_name != "exact":
drift_err = _detect_escape_drift(content, matches, old_string, new_string)
if drift_err:
return content, 0, None, drift_err
# Perform replacement
new_content = _apply_replacements(content, matches, new_string)
return new_content, len(matches), strategy_name, None
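
The drift scenario is easiest to see with concrete values. A sketch exercising _detect_escape_drift (defined in the next hunk) directly, assuming the repo root is importable:

    from tools.fuzzy_match import _detect_escape_drift

    content = 'greeting = "don\'t panic"\n'    # the file: a plain apostrophe
    old = 'greeting = "don\\\'t panic"\n'      # transport added a stray backslash
    new = 'greeting = "don\\\'t worry"\n'      # ...which the model preserved
    matches = [(0, len(content))]              # region a non-exact strategy matched
    print(_detect_escape_drift(content, matches, old, new))
    # -> "Escape-drift detected: ..." — \' is in old/new but not in the file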
@@ -101,6 +116,46 @@ def fuzzy_find_and_replace(content: str, old_string: str, new_string: str,
     return content, 0, None, "Could not find a match for old_string in the file"
+
+
+def _detect_escape_drift(content: str, matches: List[Tuple[int, int]],
+                         old_string: str, new_string: str) -> Optional[str]:
+    """Detect tool-call escape-drift artifacts in new_string.
+
+    Looks for ``\\'`` or ``\\"`` sequences that are present in both
+    old_string and new_string (i.e. the model copy-pasted them as "context"
+    it intended to preserve) but don't exist in the matched region of the
+    file. That pattern indicates the transport layer inserted spurious
+    shell-style escapes around apostrophes or quotes — writing new_string
+    verbatim would literally insert ``\\'`` into source code.
+
+    Returns an error string if drift is detected, None otherwise.
+    """
+    # Cheap pre-check: bail out unless new_string actually contains a
+    # suspect escape sequence. This keeps the guard free for all the
+    # common, correct cases.
+    if "\\'" not in new_string and '\\"' not in new_string:
+        return None
+
+    # Aggregate matched regions of the file — that's what new_string will
+    # replace. If the suspect escapes are present there already, the
+    # model is genuinely preserving them (valid for some languages /
+    # escaped strings); accept the patch.
+    matched_regions = "".join(content[start:end] for start, end in matches)
+    for suspect in ("\\'", '\\"'):
+        if suspect in new_string and suspect in old_string and suspect not in matched_regions:
+            plain = suspect[1]  # "'" or '"'
+            return (
+                f"Escape-drift detected: old_string and new_string contain "
+                f"the literal sequence {suspect!r} but the matched region of "
+                f"the file does not. This is almost always a tool-call "
+                f"serialization artifact where an apostrophe or quote got "
+                f"prefixed with a spurious backslash. Re-read the file with "
+                f"read_file and pass old_string/new_string without "
+                f"backslash-escaping {plain!r} characters."
+            )
+    return None
+
+
 def _apply_replacements(content: str, matches: List[Tuple[int, int]], new_string: str) -> str:
     """
     Apply replacements at the given positions.
@@ -564,3 +619,86 @@ def _map_normalized_positions(original: str, normalized: str,
         original_matches.append((orig_start, min(orig_end, len(original))))
     return original_matches
+
+
+def find_closest_lines(old_string: str, content: str, context_lines: int = 2, max_results: int = 3) -> str:
+    """Find lines in content most similar to old_string for "did you mean?" feedback.
+
+    Returns a formatted string showing the closest matching lines with context,
+    or an empty string if no useful match is found.
+    """
+    if not old_string or not content:
+        return ""
+    old_lines = old_string.splitlines()
+    content_lines = content.splitlines()
+    if not old_lines or not content_lines:
+        return ""
+
+    # Use the first line of old_string as the anchor for the search.
+    anchor = old_lines[0].strip()
+    if not anchor:
+        # Fall back to the first non-blank line when the first line is blank.
+        candidates = [l.strip() for l in old_lines if l.strip()]
+        if not candidates:
+            return ""
+        anchor = candidates[0]
+
+    # Score each non-blank line in content by similarity to the anchor.
+    scored = []
+    for i, line in enumerate(content_lines):
+        stripped = line.strip()
+        if not stripped:
+            continue
+        ratio = SequenceMatcher(None, anchor, stripped).ratio()
+        if ratio > 0.3:
+            scored.append((ratio, i))
+    if not scored:
+        return ""
+
+    # Take the top matches.
+    scored.sort(key=lambda x: -x[0])
+    top = scored[:max_results]
+    parts = []
+    seen_ranges = set()
+    for _, line_idx in top:
+        start = max(0, line_idx - context_lines)
+        end = min(len(content_lines), line_idx + len(old_lines) + context_lines)
+        key = (start, end)
+        if key in seen_ranges:
+            continue
+        seen_ranges.add(key)
+        snippet = "\n".join(
+            f"{start + j + 1:4d}| {content_lines[start + j]}"
+            for j in range(end - start)
+        )
+        parts.append(snippet)
+    if not parts:
+        return ""
+    return "\n---\n".join(parts)
+
+
+def format_no_match_hint(error: Optional[str], match_count: int,
+                         old_string: str, content: str) -> str:
+    """Return a '\\n\\nDid you mean...' snippet for plain no-match errors.
+
+    Gated so the hint only fires for actual "old_string not found" failures.
+    Ambiguous-match ("Found N matches"), escape-drift, and identical-strings
+    errors all have ``match_count == 0``, but a "did you mean?" snippet would
+    be misleading — those failed for unrelated reasons.
+
+    Returns an empty string when there's nothing useful to append.
+    """
+    if match_count != 0:
+        return ""
+    if not error or not error.startswith("Could not find"):
+        return ""
+    hint = find_closest_lines(old_string, content)
+    if not hint:
+        return ""
+    return "\n\nDid you mean one of these sections?\n" + hint

View File

@@ -290,10 +290,16 @@ def _validate_operations(
                 )
                 if count == 0:
                     label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)"
-                    errors.append(
+                    msg = (
                         f"{op.file_path}: hunk {label} not found"
                         + (f"{match_error}" if match_error else "")
                     )
+                    try:
+                        from tools.fuzzy_match import format_no_match_hint
+                        msg += format_no_match_hint(match_error, count, search_pattern, simulated)
+                    except Exception:
+                        pass
+                    errors.append(msg)
                 else:
                     # Advance simulation so subsequent hunks validate correctly.
                     # Reuse the result from the call above — no second fuzzy run.
@@ -537,7 +543,13 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
                 error = None
             if error:
-                return False, f"Could not apply hunk: {error}"
+                err_msg = f"Could not apply hunk: {error}"
+                try:
+                    from tools.fuzzy_match import format_no_match_hint
+                    err_msg += format_no_match_hint(error, 0, search_pattern, new_content)
+                except Exception:
+                    pass
+                return False, err_msg
         else:
             # Addition-only hunk (no context or removed lines).
             # Insert at the location indicated by the context hint, or at end of file.
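
Passing a hardcoded 0 for match_count here is safe because the prefix gate does the real filtering: any error that is not a literal no-match fails the startswith("Could not find") check and yields no snippet. For instance (assuming the repo root is importable):

    from tools.fuzzy_match import format_no_match_hint

    # An escape-drift failure reaches this path with match_count 0 too,
    # but the prefix gate keeps the misleading snippet out:
    print(repr(format_no_match_hint("Escape-drift detected: ...", 0, "x", "x\n")))  # ''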

View File

@@ -575,9 +575,15 @@ def _patch_skill(
     if match_error:
         # Show a short preview of the file so the model can self-correct
         preview = content[:500] + ("..." if len(content) > 500 else "")
+        err_msg = match_error
+        try:
+            from tools.fuzzy_match import format_no_match_hint
+            err_msg += format_no_match_hint(match_error, match_count, old_string, content)
+        except Exception:
+            pass
         return {
             "success": False,
-            "error": match_error,
+            "error": err_msg,
             "file_preview": preview,
         }
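
With the helper wired in, a true no-match failure from _patch_skill carries the rich hint and the file preview side by side; the returned dict looks roughly like this (illustrative values, not captured output):

    {
        "success": False,
        "error": "Could not find a match for old_string in the file\n\n"
                 "Did you mean one of these sections?\n"
                 "   5| Step 1: Do the thing.\n"
                 "   6| Step 2: Verify the thing.",
        "file_preview": "---\nname: qa-skill\ndescription: test\n---\nStep 1: Do the thing...",
    }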