Compare commits
1 Commits
fix/1012
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3f4515db38 |
@@ -1,546 +1,197 @@
|
||||
"""Session compaction with structured fact extraction.
|
||||
"""Session compaction with fact extraction.
|
||||
|
||||
Before compressing conversation context, extract durable facts with enough
|
||||
structure to survive retrieval: source/provenance, temporal anchors,
|
||||
normalized canonical keys, and contradiction groups.
|
||||
Before compressing conversation context, extracts durable facts
|
||||
(user preferences, corrections, project details) and saves them
|
||||
to the fact store so they survive compression.
|
||||
|
||||
Usage:
|
||||
from agent.session_compactor import extract_and_save_facts
|
||||
facts = extract_and_save_facts(messages)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, List, Tuple
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_DEPLOY_METHOD_RE = re.compile(r"\bdeploy(?:ing)?\s+(?:via|through|with)\s+([A-Za-z0-9_./+-]+)", re.IGNORECASE)
|
||||
_WATCHDOG_CAP_RE = re.compile(
|
||||
r"\b(?:the\s+)?([A-Za-z0-9_-]+(?:\s+watchdog)?)\s+(?:caps|limits)\s+dispatches(?:\s+per\s+cycle)?\s+to\s+([0-9]+)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_PROVIDER_RE = re.compile(
|
||||
r"\bprovider\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._/-]+)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_MODEL_RE = re.compile(
|
||||
r"\bmodel\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._:/-]+)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_PORT_RE = re.compile(r"\bport\s+(?:is|should\s+be)\s+([0-9]+)", re.IGNORECASE)
|
||||
_PROJECT_USES_RE = re.compile(r"\b(?:the\s+)?project\s+(?:uses|needs|requires)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
|
||||
_PREFERENCE_RE = re.compile(r"\bI\s+(?:prefer|like|want|need)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
|
||||
_CONSTRAINT_RE = re.compile(r"\b(?:do\s+not|don't)\s+(?:ever\s+|again\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
|
||||
_DECISION_RE = re.compile(r"\b(?:we|the\s+team)\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExtractedFact:
|
||||
"""A durable fact extracted from conversation."""
|
||||
|
||||
category: str
|
||||
entity: str
|
||||
content: str
|
||||
confidence: float
|
||||
source_turn: int
|
||||
"""A fact extracted from conversation."""
|
||||
category: str # "user_pref", "correction", "project", "tool_quirk", "general"
|
||||
entity: str # what the fact is about
|
||||
content: str # the fact itself
|
||||
confidence: float # 0.0-1.0
|
||||
source_turn: int # which message turn it came from
|
||||
timestamp: float = 0.0
|
||||
source_role: str = "user"
|
||||
source_text: str = ""
|
||||
normalized_content: str = ""
|
||||
canonical_key: str = ""
|
||||
relation: str = "general"
|
||||
contradiction_group: str = ""
|
||||
status: str = "active"
|
||||
provenance: str = ""
|
||||
observed_at: str = ""
|
||||
evidence: List[Dict[str, Any]] = field(default_factory=list)
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not self.timestamp:
|
||||
self.timestamp = time.time()
|
||||
if not self.observed_at:
|
||||
self.observed_at = _iso_from_timestamp(self.timestamp)
|
||||
if not self.normalized_content:
|
||||
self.normalized_content = _normalize_value(self.content)
|
||||
if not self.provenance:
|
||||
self.provenance = f"conversation:{self.source_role}:{self.source_turn}"
|
||||
if not self.canonical_key:
|
||||
self.canonical_key = _canonical_key(self.entity, self.relation, self.normalized_content)
|
||||
if not self.evidence:
|
||||
self.evidence = [
|
||||
{
|
||||
"source_role": self.source_role,
|
||||
"source_turn": self.source_turn,
|
||||
"source_text": self.source_text or self.content,
|
||||
"observed_at": self.observed_at,
|
||||
"provenance": self.provenance,
|
||||
}
|
||||
]
|
||||
self.metadata = dict(self.metadata or {})
|
||||
self.metadata.setdefault("entity", self.entity)
|
||||
self.metadata.setdefault("relation", self.relation)
|
||||
self.metadata.setdefault("value", self.content)
|
||||
self.metadata.setdefault("normalized_value", self.normalized_content)
|
||||
self.metadata.setdefault("provenance", [self.provenance])
|
||||
self.metadata.setdefault("evidence", list(self.evidence))
|
||||
self.metadata.setdefault("observation_count", len(self.evidence))
|
||||
self.metadata.setdefault("duplicate_count", max(0, self.metadata["observation_count"] - 1))
|
||||
if self.contradiction_group:
|
||||
self.metadata.setdefault("contradiction_group", self.contradiction_group)
|
||||
self.metadata.setdefault("status", self.status)
|
||||
|
||||
# Patterns that indicate user preferences
|
||||
_PREFERENCE_PATTERNS = [
|
||||
(r"(?:I|we) (?:prefer|like|want|need) (.+?)(?:\.|$)", "preference"),
|
||||
(r"(?:always|never) (?:use|do|run|deploy) (.+?)(?:\.|$)", "preference"),
|
||||
(r"(?:my|our) (?:default|preferred|usual) (.+?) (?:is|are) (.+?)(?:\.|$)", "preference"),
|
||||
(r"(?:make sure|ensure|remember) (?:to|that) (.+?)(?:\.|$)", "instruction"),
|
||||
(r"(?:don'?t|do not) (?:ever|ever again) (.+?)(?:\.|$)", "constraint"),
|
||||
]
|
||||
|
||||
# Patterns that indicate corrections
|
||||
_CORRECTION_PATTERNS = [
|
||||
(r"(?:actually|no[, ]|wait[, ]|correction[: ]|sorry[, ]) (.+)", "correction"),
|
||||
(r"(?:I meant|what I meant was|the correct) (.+?)(?:\.|$)", "correction"),
|
||||
(r"(?:it'?s|its) (?:not|shouldn'?t be|wrong) (.+?)(?:\.|$)", "correction"),
|
||||
]
|
||||
|
||||
# Patterns that indicate project/tool facts
|
||||
_PROJECT_PATTERNS = [
|
||||
(r"(?:the |our )?(?:project|repo|codebase|code) (?:is|uses|needs|requires) (.+?)(?:\.|$)", "project"),
|
||||
(r"(?:deploy|push|commit) (?:to|on) (.+?)(?:\.|$)", "project"),
|
||||
(r"(?:this|that|the) (?:server|host|machine|VPS) (?:is|runs|has) (.+?)(?:\.|$)", "infrastructure"),
|
||||
(r"(?:model|provider|engine) (?:is|should be|needs to be) (.+?)(?:\.|$)", "config"),
|
||||
]
|
||||
|
||||
|
||||
def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[ExtractedFact]:
|
||||
"""Extract durable facts from conversation messages.
|
||||
|
||||
Scans conversation turns for preferences, decisions, corrections, and
|
||||
operational state. Raw candidates are normalized into canonical facts so
|
||||
near-duplicates merge and contradictions remain inspectable.
|
||||
Scans user messages for preferences, corrections, project facts,
|
||||
and infrastructure details that should survive compression.
|
||||
"""
|
||||
facts = []
|
||||
seen_contents = set()
|
||||
|
||||
raw_candidates: list[ExtractedFact] = []
|
||||
for turn_idx, msg in enumerate(messages):
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
if role not in {"user", "assistant"}:
|
||||
|
||||
# Only scan user messages and assistant responses with corrections
|
||||
if role not in ("user", "assistant"):
|
||||
continue
|
||||
if not content or not isinstance(content, str):
|
||||
continue
|
||||
if len(content) < 10:
|
||||
continue
|
||||
|
||||
# Skip tool results and system messages
|
||||
if role == "assistant" and msg.get("tool_calls"):
|
||||
continue
|
||||
if not isinstance(content, str) or len(content.strip()) < 10:
|
||||
continue
|
||||
|
||||
timestamp, observed_at = _message_time(msg)
|
||||
raw_candidates.extend(
|
||||
_extract_from_text(
|
||||
content.strip(),
|
||||
turn_idx=turn_idx,
|
||||
role=role,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
)
|
||||
)
|
||||
extracted = _extract_from_text(content, turn_idx, role)
|
||||
|
||||
return _normalize_candidates(raw_candidates)
|
||||
|
||||
|
||||
def evaluate_extraction_quality(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""Return before/after metrics for raw vs normalized extraction quality."""
|
||||
|
||||
raw_candidates: list[ExtractedFact] = []
|
||||
for turn_idx, msg in enumerate(messages):
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
if role not in {"user", "assistant"}:
|
||||
continue
|
||||
if role == "assistant" and msg.get("tool_calls"):
|
||||
continue
|
||||
if not isinstance(content, str) or len(content.strip()) < 10:
|
||||
continue
|
||||
timestamp, observed_at = _message_time(msg)
|
||||
raw_candidates.extend(
|
||||
_extract_from_text(
|
||||
content.strip(),
|
||||
turn_idx=turn_idx,
|
||||
role=role,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
)
|
||||
)
|
||||
|
||||
normalized = _normalize_candidates(raw_candidates)
|
||||
raw_count = len(raw_candidates)
|
||||
normalized_count = len(normalized)
|
||||
contradiction_groups = {
|
||||
fact.contradiction_group
|
||||
for fact in normalized
|
||||
if fact.status == "contradiction" and fact.contradiction_group
|
||||
}
|
||||
duplicate_count = max(0, raw_count - normalized_count)
|
||||
noise_reduction = (duplicate_count / raw_count) if raw_count else 0.0
|
||||
|
||||
return {
|
||||
"raw_candidates": raw_count,
|
||||
"normalized_facts": normalized_count,
|
||||
"duplicates_merged": duplicate_count,
|
||||
"contradiction_groups": len(contradiction_groups),
|
||||
"noise_reduction": round(noise_reduction, 3),
|
||||
}
|
||||
|
||||
|
||||
def _extract_from_text(
|
||||
text: str,
|
||||
*,
|
||||
turn_idx: int,
|
||||
role: str,
|
||||
timestamp: float,
|
||||
observed_at: str,
|
||||
) -> List[ExtractedFact]:
|
||||
"""Extract raw fact candidates from a single text block."""
|
||||
|
||||
facts: list[ExtractedFact] = []
|
||||
if role != "user":
|
||||
return facts
|
||||
|
||||
deploy_match = _DEPLOY_METHOD_RE.search(text)
|
||||
if deploy_match:
|
||||
method = deploy_match.group(1).strip()
|
||||
facts.append(
|
||||
_build_fact(
|
||||
category="project.decision",
|
||||
entity="project",
|
||||
relation="workflow.deploy_method",
|
||||
value=method,
|
||||
content=f"Deploy via {method}",
|
||||
confidence=0.88,
|
||||
source_turn=turn_idx,
|
||||
source_role=role,
|
||||
source_text=text,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
unique_slot=True,
|
||||
)
|
||||
)
|
||||
|
||||
watchdog_match = _WATCHDOG_CAP_RE.search(text)
|
||||
if watchdog_match:
|
||||
watchdog = watchdog_match.group(1).strip()
|
||||
cap = watchdog_match.group(2).strip()
|
||||
facts.append(
|
||||
_build_fact(
|
||||
category="project.operational",
|
||||
entity=_normalize_entity(watchdog),
|
||||
relation="fleet.dispatch_cap",
|
||||
value=cap,
|
||||
content=f"{watchdog} caps dispatches per cycle to {cap}",
|
||||
confidence=0.92,
|
||||
source_turn=turn_idx,
|
||||
source_role=role,
|
||||
source_text=text,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
unique_slot=True,
|
||||
)
|
||||
)
|
||||
|
||||
provider_match = _PROVIDER_RE.search(text)
|
||||
if provider_match:
|
||||
provider = provider_match.group(1).strip()
|
||||
facts.append(
|
||||
_build_fact(
|
||||
category="project.config",
|
||||
entity="project",
|
||||
relation="config.provider",
|
||||
value=provider,
|
||||
content=f"Provider should stay {provider}",
|
||||
confidence=0.91,
|
||||
source_turn=turn_idx,
|
||||
source_role=role,
|
||||
source_text=text,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
unique_slot=True,
|
||||
)
|
||||
)
|
||||
|
||||
model_match = _MODEL_RE.search(text)
|
||||
if model_match:
|
||||
model = model_match.group(1).strip()
|
||||
facts.append(
|
||||
_build_fact(
|
||||
category="project.config",
|
||||
entity="project",
|
||||
relation="config.model",
|
||||
value=model,
|
||||
content=f"Model should stay {model}",
|
||||
confidence=0.9,
|
||||
source_turn=turn_idx,
|
||||
source_role=role,
|
||||
source_text=text,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
unique_slot=True,
|
||||
)
|
||||
)
|
||||
|
||||
port_match = _PORT_RE.search(text)
|
||||
if port_match:
|
||||
port = port_match.group(1).strip()
|
||||
facts.append(
|
||||
_build_fact(
|
||||
category="project.config",
|
||||
entity="project",
|
||||
relation="config.port",
|
||||
value=port,
|
||||
content=f"Port is {port}",
|
||||
confidence=0.9,
|
||||
source_turn=turn_idx,
|
||||
source_role=role,
|
||||
source_text=text,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
unique_slot=True,
|
||||
)
|
||||
)
|
||||
|
||||
project_match = _PROJECT_USES_RE.search(text)
|
||||
if project_match:
|
||||
value = project_match.group(1).strip().rstrip(".")
|
||||
facts.append(
|
||||
_build_fact(
|
||||
category="project.stack",
|
||||
entity="project",
|
||||
relation="project.stack",
|
||||
value=value,
|
||||
content=f"Project uses {value}",
|
||||
confidence=0.74,
|
||||
source_turn=turn_idx,
|
||||
source_role=role,
|
||||
source_text=text,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
unique_slot=False,
|
||||
)
|
||||
)
|
||||
|
||||
preference_match = _PREFERENCE_RE.search(text)
|
||||
if preference_match:
|
||||
value = preference_match.group(1).strip().rstrip(".")
|
||||
facts.append(
|
||||
_build_fact(
|
||||
category="user_pref.preference",
|
||||
entity="user",
|
||||
relation="user.preference",
|
||||
value=value,
|
||||
content=value,
|
||||
confidence=0.72,
|
||||
source_turn=turn_idx,
|
||||
source_role=role,
|
||||
source_text=text,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
unique_slot=False,
|
||||
)
|
||||
)
|
||||
|
||||
constraint_match = _CONSTRAINT_RE.search(text)
|
||||
if constraint_match:
|
||||
value = constraint_match.group(1).strip().rstrip(".")
|
||||
facts.append(
|
||||
_build_fact(
|
||||
category="user_pref.constraint",
|
||||
entity="user",
|
||||
relation="user.constraint",
|
||||
value=value,
|
||||
content=f"Do not {value}",
|
||||
confidence=0.82,
|
||||
source_turn=turn_idx,
|
||||
source_role=role,
|
||||
source_text=text,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
unique_slot=False,
|
||||
)
|
||||
)
|
||||
|
||||
decision_match = _DECISION_RE.search(text)
|
||||
if decision_match:
|
||||
value = decision_match.group(1).strip().rstrip(".")
|
||||
facts.append(
|
||||
_build_fact(
|
||||
category="project.decision",
|
||||
entity="project",
|
||||
relation="project.decision",
|
||||
value=value,
|
||||
content=f"Decision: {value}",
|
||||
confidence=0.79,
|
||||
source_turn=turn_idx,
|
||||
source_role=role,
|
||||
source_text=text,
|
||||
timestamp=timestamp,
|
||||
observed_at=observed_at,
|
||||
unique_slot=False,
|
||||
)
|
||||
)
|
||||
# Deduplicate by content
|
||||
for fact in extracted:
|
||||
key = f"{fact.category}:{fact.content[:100]}"
|
||||
if key not in seen_contents:
|
||||
seen_contents.add(key)
|
||||
facts.append(fact)
|
||||
|
||||
return facts
|
||||
|
||||
|
||||
def _build_fact(
|
||||
*,
|
||||
category: str,
|
||||
entity: str,
|
||||
relation: str,
|
||||
value: str,
|
||||
content: str,
|
||||
confidence: float,
|
||||
source_turn: int,
|
||||
source_role: str,
|
||||
source_text: str,
|
||||
timestamp: float,
|
||||
observed_at: str,
|
||||
unique_slot: bool,
|
||||
) -> ExtractedFact:
|
||||
normalized_value = _normalize_value(value.rstrip(".!?"))
|
||||
value = value.rstrip(".!?")
|
||||
content = content.rstrip(".!?")
|
||||
provenance = f"conversation:{source_role}:{source_turn}"
|
||||
contradiction_group = relation if unique_slot else ""
|
||||
evidence = [
|
||||
{
|
||||
"source_role": source_role,
|
||||
"source_turn": source_turn,
|
||||
"source_text": source_text,
|
||||
"observed_at": observed_at,
|
||||
"provenance": provenance,
|
||||
}
|
||||
]
|
||||
metadata = {
|
||||
"entity": entity,
|
||||
"relation": relation,
|
||||
"value": value,
|
||||
"normalized_value": normalized_value,
|
||||
"provenance": [provenance],
|
||||
"evidence": list(evidence),
|
||||
"observation_count": 1,
|
||||
"duplicate_count": 0,
|
||||
"status": "active",
|
||||
}
|
||||
if contradiction_group:
|
||||
metadata["contradiction_group"] = contradiction_group
|
||||
return ExtractedFact(
|
||||
category=category,
|
||||
entity=entity,
|
||||
content=content,
|
||||
confidence=confidence,
|
||||
source_turn=source_turn,
|
||||
timestamp=timestamp,
|
||||
source_role=source_role,
|
||||
source_text=source_text,
|
||||
normalized_content=normalized_value,
|
||||
canonical_key=_canonical_key(entity, relation, normalized_value),
|
||||
relation=relation,
|
||||
contradiction_group=contradiction_group,
|
||||
status="active",
|
||||
provenance=provenance,
|
||||
observed_at=observed_at,
|
||||
evidence=evidence,
|
||||
metadata=metadata,
|
||||
)
|
||||
def _extract_from_text(text: str, turn_idx: int, role: str) -> List[ExtractedFact]:
|
||||
"""Extract facts from a single text block."""
|
||||
facts = []
|
||||
timestamp = time.time()
|
||||
|
||||
# Clean text for pattern matching
|
||||
clean = text.strip()
|
||||
|
||||
def _normalize_candidates(candidates: List[ExtractedFact]) -> List[ExtractedFact]:
|
||||
"""Merge duplicates and mark contradictions while preserving evidence."""
|
||||
# User preference patterns (from user messages)
|
||||
if role == "user":
|
||||
for pattern, subcategory in _PREFERENCE_PATTERNS:
|
||||
for match in re.finditer(pattern, clean, re.IGNORECASE):
|
||||
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
|
||||
if len(content) > 5:
|
||||
facts.append(ExtractedFact(
|
||||
category=f"user_pref.{subcategory}",
|
||||
entity="user",
|
||||
content=content[:200],
|
||||
confidence=0.7,
|
||||
source_turn=turn_idx,
|
||||
timestamp=timestamp,
|
||||
))
|
||||
|
||||
by_key: dict[str, ExtractedFact] = {}
|
||||
contradiction_groups: dict[str, list[ExtractedFact]] = {}
|
||||
# Correction patterns (from user messages)
|
||||
if role == "user":
|
||||
for pattern, subcategory in _CORRECTION_PATTERNS:
|
||||
for match in re.finditer(pattern, clean, re.IGNORECASE):
|
||||
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
|
||||
if len(content) > 5:
|
||||
facts.append(ExtractedFact(
|
||||
category=f"correction.{subcategory}",
|
||||
entity="user",
|
||||
content=content[:200],
|
||||
confidence=0.8,
|
||||
source_turn=turn_idx,
|
||||
timestamp=timestamp,
|
||||
))
|
||||
|
||||
for candidate in candidates:
|
||||
existing = by_key.get(candidate.canonical_key)
|
||||
if existing is not None:
|
||||
by_key[candidate.canonical_key] = _merge_fact(existing, candidate)
|
||||
continue
|
||||
# Project/infrastructure patterns (from both user and assistant)
|
||||
for pattern, subcategory in _PROJECT_PATTERNS:
|
||||
for match in re.finditer(pattern, clean, re.IGNORECASE):
|
||||
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
|
||||
if len(content) > 5:
|
||||
facts.append(ExtractedFact(
|
||||
category=f"project.{subcategory}",
|
||||
entity=subcategory,
|
||||
content=content[:200],
|
||||
confidence=0.6,
|
||||
source_turn=turn_idx,
|
||||
timestamp=timestamp,
|
||||
))
|
||||
|
||||
by_key[candidate.canonical_key] = candidate
|
||||
if candidate.contradiction_group:
|
||||
contradiction_groups.setdefault(candidate.contradiction_group, []).append(candidate)
|
||||
|
||||
for group, facts in contradiction_groups.items():
|
||||
canonical_keys = {fact.canonical_key for fact in facts}
|
||||
if len(canonical_keys) <= 1:
|
||||
continue
|
||||
for fact in facts:
|
||||
fact.status = "contradiction"
|
||||
fact.metadata["status"] = "contradiction"
|
||||
fact.metadata["contradiction_group"] = group
|
||||
fact.metadata["contradiction_keys"] = sorted(canonical_keys - {fact.canonical_key})
|
||||
|
||||
return sorted(by_key.values(), key=lambda fact: (fact.source_turn, fact.timestamp, fact.canonical_key))
|
||||
|
||||
|
||||
def _merge_fact(existing: ExtractedFact, incoming: ExtractedFact) -> ExtractedFact:
|
||||
existing.confidence = max(existing.confidence, incoming.confidence)
|
||||
existing.timestamp = min(existing.timestamp, incoming.timestamp)
|
||||
existing.source_turn = min(existing.source_turn, incoming.source_turn)
|
||||
if not existing.observed_at or (incoming.observed_at and incoming.observed_at < existing.observed_at):
|
||||
existing.observed_at = incoming.observed_at
|
||||
existing.provenance = min(existing.provenance, incoming.provenance)
|
||||
|
||||
provenance = _ordered_unique(existing.metadata.get("provenance", []), incoming.metadata.get("provenance", []))
|
||||
evidence = _merge_evidence(existing.metadata.get("evidence", []), incoming.metadata.get("evidence", []))
|
||||
observation_count = int(existing.metadata.get("observation_count", len(existing.evidence) or 1))
|
||||
observation_count += int(incoming.metadata.get("observation_count", len(incoming.evidence) or 1))
|
||||
|
||||
existing.evidence = evidence
|
||||
existing.metadata["provenance"] = provenance
|
||||
existing.metadata["evidence"] = evidence
|
||||
existing.metadata["observation_count"] = observation_count
|
||||
existing.metadata["duplicate_count"] = max(0, observation_count - 1)
|
||||
existing.metadata["status"] = existing.status
|
||||
return existing
|
||||
return facts
|
||||
|
||||
|
||||
def save_facts_to_store(facts: List[ExtractedFact], fact_store_fn=None) -> int:
|
||||
"""Save extracted facts to the fact store.
|
||||
|
||||
If a callback is supplied, prefer the structured signature but fall back to
|
||||
the legacy four-argument callback for compatibility.
|
||||
Args:
|
||||
facts: List of extracted facts.
|
||||
fact_store_fn: Optional callable(category, entity, content, trust).
|
||||
If None, uses the holographic fact store if available.
|
||||
|
||||
Returns:
|
||||
Number of facts saved.
|
||||
"""
|
||||
|
||||
saved = 0
|
||||
for fact in facts:
|
||||
payload = {
|
||||
"category": _store_category(fact.category),
|
||||
"entity": fact.entity,
|
||||
"content": fact.content,
|
||||
"trust": fact.confidence,
|
||||
"metadata": dict(fact.metadata),
|
||||
"canonical_key": fact.canonical_key,
|
||||
"observed_at": fact.observed_at,
|
||||
"source_role": fact.source_role,
|
||||
"source_turn": fact.source_turn,
|
||||
"contradiction_group": fact.contradiction_group,
|
||||
"status": fact.status,
|
||||
"relation": fact.relation,
|
||||
}
|
||||
|
||||
if fact_store_fn:
|
||||
if fact_store_fn:
|
||||
for fact in facts:
|
||||
try:
|
||||
fact_store_fn(**payload)
|
||||
fact_store_fn(
|
||||
category=fact.category,
|
||||
entity=fact.entity,
|
||||
content=fact.content,
|
||||
trust=fact.confidence,
|
||||
)
|
||||
saved += 1
|
||||
continue
|
||||
except TypeError:
|
||||
try:
|
||||
fact_store_fn(payload["category"], payload["entity"], payload["content"], payload["trust"])
|
||||
saved += 1
|
||||
continue
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to save fact via callback: %s", exc)
|
||||
continue
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to save fact via callback: %s", exc)
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.debug("Failed to save fact: %s", e)
|
||||
else:
|
||||
# Try holographic fact store
|
||||
try:
|
||||
from fact_store import fact_store as _fs
|
||||
|
||||
tags = ",".join(filter(None, [fact.entity, fact.relation, fact.status]))
|
||||
_fs(
|
||||
action="add",
|
||||
content=fact.content,
|
||||
category=_store_category(fact.category),
|
||||
tags=tags,
|
||||
trust_delta=fact.confidence - 0.5,
|
||||
)
|
||||
saved += 1
|
||||
for fact in facts:
|
||||
try:
|
||||
_fs(
|
||||
action="add",
|
||||
content=fact.content,
|
||||
category=fact.category,
|
||||
tags=fact.entity,
|
||||
trust_delta=fact.confidence - 0.5,
|
||||
)
|
||||
saved += 1
|
||||
except Exception as e:
|
||||
logger.debug("Failed to save fact via fact_store: %s", e)
|
||||
except ImportError:
|
||||
logger.debug("fact_store not available — facts not persisted")
|
||||
break
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to save fact via fact_store: %s", exc)
|
||||
|
||||
return saved
|
||||
|
||||
@@ -553,10 +204,9 @@ def extract_and_save_facts(
|
||||
|
||||
Returns (extracted_facts, saved_count).
|
||||
"""
|
||||
|
||||
facts = extract_facts_from_messages(messages)
|
||||
if facts:
|
||||
logger.info("Extracted %d normalized facts from conversation", len(facts))
|
||||
logger.info("Extracted %d facts from conversation", len(facts))
|
||||
saved = save_facts_to_store(facts, fact_store_fn)
|
||||
logger.info("Saved %d/%d facts to store", saved, len(facts))
|
||||
else:
|
||||
@@ -566,105 +216,16 @@ def extract_and_save_facts(
|
||||
|
||||
def format_facts_summary(facts: List[ExtractedFact]) -> str:
|
||||
"""Format extracted facts as a readable summary."""
|
||||
|
||||
if not facts:
|
||||
return "No facts extracted."
|
||||
|
||||
by_category: dict[str, list[ExtractedFact]] = {}
|
||||
for fact in facts:
|
||||
by_category.setdefault(fact.category, []).append(fact)
|
||||
by_category = {}
|
||||
for f in facts:
|
||||
by_category.setdefault(f.category, []).append(f)
|
||||
|
||||
lines = [f"Extracted {len(facts)} facts:", ""]
|
||||
for category, category_facts in sorted(by_category.items()):
|
||||
lines.append(f" {category}:")
|
||||
for fact in category_facts:
|
||||
suffix = f" [{fact.status}]" if fact.status != "active" else ""
|
||||
lines.append(f" - {fact.content[:80]}{suffix}")
|
||||
for cat, cat_facts in sorted(by_category.items()):
|
||||
lines.append(f" {cat}:")
|
||||
for f in cat_facts:
|
||||
lines.append(f" - {f.content[:80]}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _store_category(category: str) -> str:
|
||||
if category.startswith("user_pref"):
|
||||
return "user_pref"
|
||||
if category.startswith("project"):
|
||||
return "project"
|
||||
if category.startswith("tool"):
|
||||
return "tool"
|
||||
return "general"
|
||||
|
||||
|
||||
def _message_time(msg: Dict[str, Any]) -> Tuple[float, str]:
|
||||
for key in ("created_at", "timestamp", "time"):
|
||||
value = msg.get(key)
|
||||
if value is None:
|
||||
continue
|
||||
if isinstance(value, (int, float)):
|
||||
ts = float(value)
|
||||
return ts, _iso_from_timestamp(ts)
|
||||
if isinstance(value, str):
|
||||
parsed = _parse_time_string(value)
|
||||
if parsed is not None:
|
||||
return parsed, _iso_from_timestamp(parsed) if "T" not in value else value.replace("+00:00", "Z")
|
||||
return time.time(), value
|
||||
now = time.time()
|
||||
return now, _iso_from_timestamp(now)
|
||||
|
||||
|
||||
def _parse_time_string(value: str) -> float | None:
|
||||
text = value.strip()
|
||||
if not text:
|
||||
return None
|
||||
try:
|
||||
return float(text)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
normalized = text[:-1] + "+00:00" if text.endswith("Z") else text
|
||||
return datetime.fromisoformat(normalized).timestamp()
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def _iso_from_timestamp(value: float) -> str:
|
||||
return datetime.fromtimestamp(value, tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
|
||||
def _normalize_value(value: str) -> str:
|
||||
normalized = re.sub(r"[^a-z0-9]+", " ", value.lower())
|
||||
normalized = re.sub(r"\s+", " ", normalized).strip()
|
||||
return normalized
|
||||
|
||||
|
||||
def _normalize_entity(value: str) -> str:
|
||||
return _normalize_value(value).replace(" ", "_") or "entity"
|
||||
|
||||
|
||||
def _canonical_key(entity: str, relation: str, normalized_value: str) -> str:
|
||||
return f"{entity}|{relation}|{normalized_value}"
|
||||
|
||||
|
||||
def _ordered_unique(*groups: List[str]) -> List[str]:
|
||||
seen: set[str] = set()
|
||||
ordered: list[str] = []
|
||||
for group in groups:
|
||||
for item in group:
|
||||
if item and item not in seen:
|
||||
seen.add(item)
|
||||
ordered.append(item)
|
||||
return ordered
|
||||
|
||||
|
||||
def _merge_evidence(existing: List[Dict[str, Any]], incoming: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
seen: set[tuple[str, str, str]] = set()
|
||||
merged: list[dict[str, Any]] = []
|
||||
for item in list(existing) + list(incoming):
|
||||
key = (
|
||||
str(item.get("provenance", "")),
|
||||
str(item.get("observed_at", "")),
|
||||
str(item.get("source_text", "")),
|
||||
)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
merged.append(dict(item))
|
||||
return merged
|
||||
|
||||
@@ -26,6 +26,7 @@ from agent.memory_provider import MemoryProvider
|
||||
from tools.registry import tool_error
|
||||
from .store import MemoryStore
|
||||
from .retrieval import FactRetriever
|
||||
from .observations import ObservationSynthesizer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -37,28 +38,29 @@ logger = logging.getLogger(__name__)
|
||||
FACT_STORE_SCHEMA = {
|
||||
"name": "fact_store",
|
||||
"description": (
|
||||
"Deep structured memory with algebraic reasoning. "
|
||||
"Deep structured memory with algebraic reasoning and grounded observation synthesis. "
|
||||
"Use alongside the memory tool — memory for always-on context, "
|
||||
"fact_store for deep recall and compositional queries.\n\n"
|
||||
"fact_store for deep recall, compositional queries, and higher-order observations.\n\n"
|
||||
"ACTIONS (simple → powerful):\n"
|
||||
"• add — Store a fact the user would expect you to remember.\n"
|
||||
"• search — Keyword lookup ('editor config', 'deploy process').\n"
|
||||
"• probe — Entity recall: ALL facts about a person/thing.\n"
|
||||
"• related — What connects to an entity? Structural adjacency.\n"
|
||||
"• reason — Compositional: facts connected to MULTIPLE entities simultaneously.\n"
|
||||
"• observe — Synthesized higher-order observations backed by supporting facts.\n"
|
||||
"• contradict — Memory hygiene: find facts making conflicting claims.\n"
|
||||
"• update/remove/list — CRUD operations.\n\n"
|
||||
"IMPORTANT: Before answering questions about the user, ALWAYS probe or reason first."
|
||||
"IMPORTANT: Before answering questions about the user, ALWAYS probe/reason/observe first."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"action": {
|
||||
"type": "string",
|
||||
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"],
|
||||
"enum": ["add", "search", "probe", "related", "reason", "observe", "contradict", "update", "remove", "list"],
|
||||
},
|
||||
"content": {"type": "string", "description": "Fact content (required for 'add')."},
|
||||
"query": {"type": "string", "description": "Search query (required for 'search')."},
|
||||
"query": {"type": "string", "description": "Search query (required for 'search'/'observe')."},
|
||||
"entity": {"type": "string", "description": "Entity name for 'probe'/'related'."},
|
||||
"entities": {"type": "array", "items": {"type": "string"}, "description": "Entity names for 'reason'."},
|
||||
"fact_id": {"type": "integer", "description": "Fact ID for 'update'/'remove'."},
|
||||
@@ -66,6 +68,12 @@ FACT_STORE_SCHEMA = {
|
||||
"tags": {"type": "string", "description": "Comma-separated tags."},
|
||||
"trust_delta": {"type": "number", "description": "Trust adjustment for 'update'."},
|
||||
"min_trust": {"type": "number", "description": "Minimum trust filter (default: 0.3)."},
|
||||
"min_confidence": {"type": "number", "description": "Minimum observation confidence (default: 0.6)."},
|
||||
"observation_type": {
|
||||
"type": "string",
|
||||
"enum": ["recurring_preference", "stable_direction", "behavioral_pattern"],
|
||||
"description": "Optional observation type filter for 'observe'.",
|
||||
},
|
||||
"limit": {"type": "integer", "description": "Max results (default: 10)."},
|
||||
},
|
||||
"required": ["action"],
|
||||
@@ -118,7 +126,9 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
self._config = config or _load_plugin_config()
|
||||
self._store = None
|
||||
self._retriever = None
|
||||
self._observation_synth = None
|
||||
self._min_trust = float(self._config.get("min_trust_threshold", 0.3))
|
||||
self._observation_min_confidence = float(self._config.get("observation_min_confidence", 0.6))
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
@@ -177,6 +187,7 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
hrr_weight=hrr_weight,
|
||||
hrr_dim=hrr_dim,
|
||||
)
|
||||
self._observation_synth = ObservationSynthesizer(self._store)
|
||||
self._session_id = session_id
|
||||
|
||||
def system_prompt_block(self) -> str:
|
||||
@@ -193,30 +204,76 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
"# Holographic Memory\n"
|
||||
"Active. Empty fact store — proactively add facts the user would expect you to remember.\n"
|
||||
"Use fact_store(action='add') to store durable structured facts about people, projects, preferences, decisions.\n"
|
||||
"Use fact_store(action='observe') to synthesize higher-order observations with evidence.\n"
|
||||
"Use fact_feedback to rate facts after using them (trains trust scores)."
|
||||
)
|
||||
return (
|
||||
f"# Holographic Memory\n"
|
||||
f"Active. {total} facts stored with entity resolution and trust scoring.\n"
|
||||
f"Use fact_store to search, probe entities, reason across entities, or add facts.\n"
|
||||
f"Use fact_store to search, probe entities, reason across entities, or synthesize observations.\n"
|
||||
f"Use fact_feedback to rate facts after using them (trains trust scores)."
|
||||
)
|
||||
|
||||
def prefetch(self, query: str, *, session_id: str = "") -> str:
|
||||
if not self._retriever or not query:
|
||||
if not query:
|
||||
return ""
|
||||
|
||||
parts = []
|
||||
raw_results = []
|
||||
try:
|
||||
results = self._retriever.search(query, min_trust=self._min_trust, limit=5)
|
||||
if not results:
|
||||
return ""
|
||||
if self._retriever:
|
||||
raw_results = self._retriever.search(query, min_trust=self._min_trust, limit=5)
|
||||
except Exception as e:
|
||||
logger.debug("Holographic prefetch fact search failed: %s", e)
|
||||
raw_results = []
|
||||
|
||||
observations = []
|
||||
try:
|
||||
if self._observation_synth:
|
||||
observations = self._observation_synth.observe(
|
||||
query,
|
||||
min_confidence=self._observation_min_confidence,
|
||||
limit=3,
|
||||
refresh=True,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Holographic prefetch observation search failed: %s", e)
|
||||
observations = []
|
||||
|
||||
if not raw_results and observations:
|
||||
seen_fact_ids = set()
|
||||
evidence_backfill = []
|
||||
for observation in observations:
|
||||
for evidence in observation.get("evidence", []):
|
||||
fact_id = evidence.get("fact_id")
|
||||
if fact_id in seen_fact_ids:
|
||||
continue
|
||||
seen_fact_ids.add(fact_id)
|
||||
evidence_backfill.append(evidence)
|
||||
raw_results = evidence_backfill[:5]
|
||||
|
||||
if raw_results:
|
||||
lines = []
|
||||
for r in results:
|
||||
for r in raw_results:
|
||||
trust = r.get("trust_score", r.get("trust", 0))
|
||||
lines.append(f"- [{trust:.1f}] {r.get('content', '')}")
|
||||
return "## Holographic Memory\n" + "\n".join(lines)
|
||||
except Exception as e:
|
||||
logger.debug("Holographic prefetch failed: %s", e)
|
||||
return ""
|
||||
parts.append("## Holographic Memory\n" + "\n".join(lines))
|
||||
|
||||
if observations:
|
||||
lines = []
|
||||
for observation in observations:
|
||||
evidence_ids = ", ".join(
|
||||
f"#{item['fact_id']}" for item in observation.get("evidence", [])[:3]
|
||||
) or "none"
|
||||
lines.append(
|
||||
f"- [{observation.get('confidence', 0.0):.2f}] "
|
||||
f"{observation.get('observation_type', 'observation')}: "
|
||||
f"{observation.get('summary', '')} "
|
||||
f"(evidence: {evidence_ids})"
|
||||
)
|
||||
parts.append("## Holographic Observations\n" + "\n".join(lines))
|
||||
|
||||
return "\n\n".join(parts)
|
||||
|
||||
def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
|
||||
# Holographic memory stores explicit facts via tools, not auto-sync.
|
||||
@@ -252,6 +309,7 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
def shutdown(self) -> None:
|
||||
self._store = None
|
||||
self._retriever = None
|
||||
self._observation_synth = None
|
||||
|
||||
# -- Tool handlers -------------------------------------------------------
|
||||
|
||||
@@ -305,6 +363,19 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
)
|
||||
return json.dumps({"results": results, "count": len(results)})
|
||||
|
||||
elif action == "observe":
|
||||
synthesizer = self._observation_synth
|
||||
if not synthesizer:
|
||||
return tool_error("Observation synthesizer is not initialized")
|
||||
observations = synthesizer.observe(
|
||||
args.get("query", ""),
|
||||
observation_type=args.get("observation_type"),
|
||||
min_confidence=float(args.get("min_confidence", self._observation_min_confidence)),
|
||||
limit=int(args.get("limit", 10)),
|
||||
refresh=True,
|
||||
)
|
||||
return json.dumps({"observations": observations, "count": len(observations)})
|
||||
|
||||
elif action == "contradict":
|
||||
results = retriever.contradict(
|
||||
category=args.get("category"),
|
||||
@@ -356,57 +427,44 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
# -- Auto-extraction (on_session_end) ------------------------------------
|
||||
|
||||
def _auto_extract_facts(self, messages: list) -> None:
|
||||
from agent.session_compactor import evaluate_extraction_quality, extract_facts_from_messages
|
||||
|
||||
def _store_category(category: str) -> str:
|
||||
if category.startswith("user_pref"):
|
||||
return "user_pref"
|
||||
if category.startswith("project"):
|
||||
return "project"
|
||||
if category.startswith("tool"):
|
||||
return "tool"
|
||||
return "general"
|
||||
|
||||
facts = extract_facts_from_messages(messages)
|
||||
if not facts:
|
||||
return
|
||||
_PREF_PATTERNS = [
|
||||
re.compile(r'\bI\s+(?:prefer|like|love|use|want|need)\s+(.+)', re.IGNORECASE),
|
||||
re.compile(r'\bmy\s+(?:favorite|preferred|default)\s+\w+\s+is\s+(.+)', re.IGNORECASE),
|
||||
re.compile(r'\bI\s+(?:always|never|usually)\s+(.+)', re.IGNORECASE),
|
||||
]
|
||||
_DECISION_PATTERNS = [
|
||||
re.compile(r'\bwe\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+)', re.IGNORECASE),
|
||||
re.compile(r'\bthe\s+project\s+(?:uses|needs|requires)\s+(.+)', re.IGNORECASE),
|
||||
]
|
||||
|
||||
extracted = 0
|
||||
for fact in facts:
|
||||
try:
|
||||
metadata = dict(fact.metadata)
|
||||
metadata.setdefault("relation", fact.relation)
|
||||
metadata.setdefault("value", fact.content)
|
||||
metadata.setdefault("provenance", [fact.provenance])
|
||||
metadata.setdefault("evidence", list(fact.evidence))
|
||||
metadata.setdefault("observation_count", len(fact.evidence))
|
||||
metadata.setdefault("duplicate_count", max(0, len(fact.evidence) - 1))
|
||||
self._store.add_fact(
|
||||
fact.content,
|
||||
category=_store_category(fact.category),
|
||||
tags=",".join(filter(None, [fact.entity, fact.relation, fact.status])),
|
||||
canonical_key=fact.canonical_key,
|
||||
metadata=metadata,
|
||||
confidence=fact.confidence,
|
||||
source_role=fact.source_role,
|
||||
source_turn=fact.source_turn,
|
||||
observed_at=fact.observed_at,
|
||||
contradiction_group=fact.contradiction_group,
|
||||
status=fact.status,
|
||||
)
|
||||
extracted += 1
|
||||
except Exception as exc:
|
||||
logger.debug("Structured auto-extract failed for %s: %s", fact.canonical_key, exc)
|
||||
for msg in messages:
|
||||
if msg.get("role") != "user":
|
||||
continue
|
||||
content = msg.get("content", "")
|
||||
if not isinstance(content, str) or len(content) < 10:
|
||||
continue
|
||||
|
||||
for pattern in _PREF_PATTERNS:
|
||||
if pattern.search(content):
|
||||
try:
|
||||
self._store.add_fact(content[:400], category="user_pref")
|
||||
extracted += 1
|
||||
except Exception:
|
||||
pass
|
||||
break
|
||||
|
||||
for pattern in _DECISION_PATTERNS:
|
||||
if pattern.search(content):
|
||||
try:
|
||||
self._store.add_fact(content[:400], category="project")
|
||||
extracted += 1
|
||||
except Exception:
|
||||
pass
|
||||
break
|
||||
|
||||
if extracted:
|
||||
metrics = evaluate_extraction_quality(messages)
|
||||
logger.info(
|
||||
"Auto-extracted %d structured facts from conversation (raw=%d normalized=%d contradictions=%d)",
|
||||
extracted,
|
||||
metrics["raw_candidates"],
|
||||
metrics["normalized_facts"],
|
||||
metrics["contradiction_groups"],
|
||||
)
|
||||
logger.info("Auto-extracted %d facts from conversation", extracted)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
249
plugins/memory/holographic/observations.py
Normal file
249
plugins/memory/holographic/observations.py
Normal file
@@ -0,0 +1,249 @@
|
||||
"""Higher-order observation synthesis for holographic memory.
|
||||
|
||||
Builds grounded observations from accumulated facts and keeps them in a
|
||||
separate retrieval layer with explicit evidence links back to supporting facts.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from .store import MemoryStore
|
||||
|
||||
_TOKEN_RE = re.compile(r"[a-z0-9_]+")
|
||||
_HIGHER_ORDER_CUES = {
|
||||
"prefer",
|
||||
"preference",
|
||||
"preferences",
|
||||
"style",
|
||||
"pattern",
|
||||
"patterns",
|
||||
"behavior",
|
||||
"behaviour",
|
||||
"habit",
|
||||
"habits",
|
||||
"workflow",
|
||||
"direction",
|
||||
"trajectory",
|
||||
"strategy",
|
||||
"tend",
|
||||
"usually",
|
||||
}
|
||||
|
||||
_OBSERVATION_PATTERNS = [
|
||||
{
|
||||
"observation_type": "recurring_preference",
|
||||
"subject": "communication_style",
|
||||
"categories": {"user_pref", "general"},
|
||||
"labels": {
|
||||
"concise": ["concise", "terse", "brief", "short", "no fluff"],
|
||||
"result_first": ["result-only", "result only", "outcome only", "quick", "quickly"],
|
||||
"silent_ops": ["silent", "no status", "no repetitive status", "no questions"],
|
||||
},
|
||||
"summary_prefix": "Recurring preference",
|
||||
},
|
||||
{
|
||||
"observation_type": "stable_direction",
|
||||
"subject": "project_direction",
|
||||
"categories": {"project", "general", "tool"},
|
||||
"labels": {
|
||||
"local_first": ["local-first", "local first", "local-only", "local only", "ollama", "own hardware"],
|
||||
"gitea_first": ["gitea-first", "gitea first", "forge", "pull request", "pr flow", "issue flow"],
|
||||
"ansible": ["ansible", "playbook", "role", "deploy via ansible"],
|
||||
},
|
||||
"summary_prefix": "Stable direction",
|
||||
},
|
||||
{
|
||||
"observation_type": "behavioral_pattern",
|
||||
"subject": "operator_workflow",
|
||||
"categories": {"general", "project", "tool", "user_pref"},
|
||||
"labels": {
|
||||
"commit_early": ["commit early", "commits early", "commit after", "wip commit"],
|
||||
"pr_first": ["open pr", "push a pr", "pull request", "pr immediately", "create pr"],
|
||||
"dedup_guard": ["no dupes", "no duplicates", "avoid duplicate", "existing pr"],
|
||||
},
|
||||
"summary_prefix": "Behavioral pattern",
|
||||
},
|
||||
]
|
||||
|
||||
_TYPE_QUERY_HINTS = {
|
||||
"recurring_preference": {"prefer", "preference", "style", "communication", "likes", "wants"},
|
||||
"stable_direction": {"direction", "trajectory", "strategy", "project", "roadmap", "moving"},
|
||||
"behavioral_pattern": {"pattern", "behavior", "workflow", "habit", "operator", "agent", "usually"},
|
||||
}
|
||||
|
||||
|
||||
class ObservationSynthesizer:
|
||||
"""Synthesizes grounded observations from facts and retrieves them by query."""
|
||||
|
||||
def __init__(self, store: MemoryStore):
|
||||
self.store = store
|
||||
|
||||
def synthesize(
|
||||
self,
|
||||
*,
|
||||
persist: bool = True,
|
||||
min_confidence: float = 0.6,
|
||||
limit: int = 10,
|
||||
) -> list[dict[str, Any]]:
|
||||
facts = self.store.list_facts(min_trust=0.0, limit=1000)
|
||||
observations: list[dict[str, Any]] = []
|
||||
|
||||
for pattern in _OBSERVATION_PATTERNS:
|
||||
candidate = self._build_candidate(pattern, facts, min_confidence=min_confidence)
|
||||
if not candidate:
|
||||
continue
|
||||
|
||||
if persist:
|
||||
candidate["observation_id"] = self.store.upsert_observation(
|
||||
candidate["observation_type"],
|
||||
candidate["subject"],
|
||||
candidate["summary"],
|
||||
candidate["confidence"],
|
||||
candidate["evidence_fact_ids"],
|
||||
metadata=candidate["metadata"],
|
||||
)
|
||||
|
||||
candidate["evidence"] = self._expand_evidence(candidate["evidence_fact_ids"])
|
||||
candidate["evidence_count"] = len(candidate["evidence"])
|
||||
candidate.pop("evidence_fact_ids", None)
|
||||
observations.append(candidate)
|
||||
|
||||
observations.sort(
|
||||
key=lambda item: (item["confidence"], item.get("evidence_count", 0)),
|
||||
reverse=True,
|
||||
)
|
||||
return observations[:limit]
|
||||
|
||||
def observe(
|
||||
self,
|
||||
query: str = "",
|
||||
*,
|
||||
observation_type: str | None = None,
|
||||
min_confidence: float = 0.6,
|
||||
limit: int = 10,
|
||||
refresh: bool = True,
|
||||
) -> list[dict[str, Any]]:
|
||||
if refresh:
|
||||
self.synthesize(persist=True, min_confidence=min_confidence, limit=limit)
|
||||
|
||||
observations = self.store.list_observations(
|
||||
observation_type=observation_type,
|
||||
min_confidence=min_confidence,
|
||||
limit=max(limit * 4, 20),
|
||||
)
|
||||
if not observations:
|
||||
return []
|
||||
|
||||
if not query:
|
||||
return observations[:limit]
|
||||
|
||||
query_tokens = self._tokenize(query)
|
||||
is_higher_order = bool(query_tokens & _HIGHER_ORDER_CUES)
|
||||
ranked: list[dict[str, Any]] = []
|
||||
|
||||
for item in observations:
|
||||
searchable = " ".join(
|
||||
[
|
||||
item.get("summary", ""),
|
||||
item.get("subject", ""),
|
||||
item.get("observation_type", ""),
|
||||
" ".join(item.get("metadata", {}).get("labels", [])),
|
||||
]
|
||||
)
|
||||
overlap = self._overlap_score(query_tokens, self._tokenize(searchable))
|
||||
type_bonus = self._type_bonus(query_tokens, item.get("observation_type", ""))
|
||||
if overlap <= 0 and type_bonus <= 0 and not is_higher_order:
|
||||
continue
|
||||
ranked_item = dict(item)
|
||||
ranked_item["score"] = round(item.get("confidence", 0.0) + overlap + type_bonus, 3)
|
||||
ranked.append(ranked_item)
|
||||
|
||||
if not ranked and is_higher_order:
|
||||
ranked = [
|
||||
{**item, "score": round(float(item.get("confidence", 0.0)), 3)}
|
||||
for item in observations
|
||||
]
|
||||
|
||||
ranked.sort(
|
||||
key=lambda item: (item.get("score", 0.0), item.get("confidence", 0.0), item.get("evidence_count", 0)),
|
||||
reverse=True,
|
||||
)
|
||||
return ranked[:limit]
|
||||
|
||||
def _build_candidate(
|
||||
self,
|
||||
pattern: dict[str, Any],
|
||||
facts: list[dict[str, Any]],
|
||||
*,
|
||||
min_confidence: float,
|
||||
) -> dict[str, Any] | None:
|
||||
matched_fact_ids: set[int] = set()
|
||||
matched_labels: dict[str, set[int]] = {label: set() for label in pattern["labels"]}
|
||||
|
||||
for fact in facts:
|
||||
if fact.get("category") not in pattern["categories"]:
|
||||
continue
|
||||
haystack = f"{fact.get('content', '')} {fact.get('tags', '')}".lower()
|
||||
local_match = False
|
||||
for label, keywords in pattern["labels"].items():
|
||||
if any(keyword in haystack for keyword in keywords):
|
||||
matched_labels[label].add(int(fact["fact_id"]))
|
||||
local_match = True
|
||||
if local_match:
|
||||
matched_fact_ids.add(int(fact["fact_id"]))
|
||||
|
||||
if len(matched_fact_ids) < 2:
|
||||
return None
|
||||
|
||||
active_labels = sorted(label for label, ids in matched_labels.items() if ids)
|
||||
confidence = min(0.95, 0.35 + 0.12 * len(matched_fact_ids) + 0.08 * len(active_labels))
|
||||
confidence = round(confidence, 3)
|
||||
if confidence < min_confidence:
|
||||
return None
|
||||
|
||||
label_summary = ", ".join(label.replace("_", "-") for label in active_labels)
|
||||
subject_text = pattern["subject"].replace("_", " ")
|
||||
summary = (
|
||||
f"{pattern['summary_prefix']}: {subject_text} trends toward {label_summary} "
|
||||
f"based on {len(matched_fact_ids)} supporting facts."
|
||||
)
|
||||
return {
|
||||
"observation_type": pattern["observation_type"],
|
||||
"subject": pattern["subject"],
|
||||
"summary": summary,
|
||||
"confidence": confidence,
|
||||
"metadata": {
|
||||
"labels": active_labels,
|
||||
"evidence_count": len(matched_fact_ids),
|
||||
},
|
||||
"evidence_fact_ids": sorted(matched_fact_ids),
|
||||
}
|
||||
|
||||
def _expand_evidence(self, fact_ids: list[int]) -> list[dict[str, Any]]:
|
||||
facts_by_id = {
|
||||
fact["fact_id"]: fact
|
||||
for fact in self.store.list_facts(min_trust=0.0, limit=1000)
|
||||
}
|
||||
return [facts_by_id[fact_id] for fact_id in fact_ids if fact_id in facts_by_id]
|
||||
|
||||
@staticmethod
|
||||
def _tokenize(text: str) -> set[str]:
|
||||
return set(_TOKEN_RE.findall(text.lower()))
|
||||
|
||||
@staticmethod
|
||||
def _overlap_score(query_tokens: set[str], text_tokens: set[str]) -> float:
|
||||
if not query_tokens or not text_tokens:
|
||||
return 0.0
|
||||
overlap = query_tokens & text_tokens
|
||||
if not overlap:
|
||||
return 0.0
|
||||
return round(len(overlap) / max(len(query_tokens), 1), 3)
|
||||
|
||||
@staticmethod
|
||||
def _type_bonus(query_tokens: set[str], observation_type: str) -> float:
|
||||
hints = _TYPE_QUERY_HINTS.get(observation_type, set())
|
||||
if not hints:
|
||||
return 0.0
|
||||
return 0.25 if query_tokens & hints else 0.0
|
||||
@@ -16,24 +16,16 @@ except ImportError:
|
||||
|
||||
_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS facts (
|
||||
fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
content TEXT NOT NULL UNIQUE,
|
||||
category TEXT DEFAULT 'general',
|
||||
tags TEXT DEFAULT '',
|
||||
trust_score REAL DEFAULT 0.5,
|
||||
retrieval_count INTEGER DEFAULT 0,
|
||||
helpful_count INTEGER DEFAULT 0,
|
||||
canonical_key TEXT DEFAULT '',
|
||||
metadata_json TEXT DEFAULT '{}',
|
||||
confidence REAL DEFAULT 0.5,
|
||||
source_role TEXT DEFAULT '',
|
||||
source_turn INTEGER DEFAULT -1,
|
||||
observed_at TEXT DEFAULT '',
|
||||
contradiction_group TEXT DEFAULT '',
|
||||
status TEXT DEFAULT 'active',
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
hrr_vector BLOB
|
||||
fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
content TEXT NOT NULL UNIQUE,
|
||||
category TEXT DEFAULT 'general',
|
||||
tags TEXT DEFAULT '',
|
||||
trust_score REAL DEFAULT 0.5,
|
||||
retrieval_count INTEGER DEFAULT 0,
|
||||
helpful_count INTEGER DEFAULT 0,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
hrr_vector BLOB
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS entities (
|
||||
@@ -50,11 +42,9 @@ CREATE TABLE IF NOT EXISTS fact_entities (
|
||||
PRIMARY KEY (fact_id, entity_id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
|
||||
CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key);
|
||||
CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group);
|
||||
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
|
||||
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
|
||||
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
|
||||
|
||||
CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts
|
||||
USING fts5(content, tags, content=facts, content_rowid=fact_id);
|
||||
@@ -84,6 +74,28 @@ CREATE TABLE IF NOT EXISTS memory_banks (
|
||||
fact_count INTEGER DEFAULT 0,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS observations (
|
||||
observation_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
observation_type TEXT NOT NULL,
|
||||
subject TEXT NOT NULL,
|
||||
summary TEXT NOT NULL,
|
||||
confidence REAL DEFAULT 0.0,
|
||||
metadata_json TEXT DEFAULT '{}',
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE(observation_type, subject)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS observation_evidence (
|
||||
observation_id INTEGER REFERENCES observations(observation_id) ON DELETE CASCADE,
|
||||
fact_id INTEGER REFERENCES facts(fact_id) ON DELETE CASCADE,
|
||||
evidence_weight REAL DEFAULT 1.0,
|
||||
PRIMARY KEY (observation_id, fact_id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_observations_type ON observations(observation_type);
|
||||
CREATE INDEX IF NOT EXISTS idx_observations_confidence ON observations(confidence DESC);
|
||||
"""
|
||||
|
||||
# Trust adjustment constants
|
||||
@@ -139,24 +151,12 @@ class MemoryStore:
|
||||
def _init_db(self) -> None:
|
||||
"""Create tables, indexes, and triggers if they do not exist. Enable WAL mode."""
|
||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||
self._conn.execute("PRAGMA foreign_keys=ON")
|
||||
self._conn.executescript(_SCHEMA)
|
||||
# Migrate: add hrr_vector column if missing (safe for existing databases)
|
||||
columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()}
|
||||
migrations = {
|
||||
"hrr_vector": "ALTER TABLE facts ADD COLUMN hrr_vector BLOB",
|
||||
"canonical_key": "ALTER TABLE facts ADD COLUMN canonical_key TEXT DEFAULT ''",
|
||||
"metadata_json": "ALTER TABLE facts ADD COLUMN metadata_json TEXT DEFAULT '{}'",
|
||||
"confidence": "ALTER TABLE facts ADD COLUMN confidence REAL DEFAULT 0.5",
|
||||
"source_role": "ALTER TABLE facts ADD COLUMN source_role TEXT DEFAULT ''",
|
||||
"source_turn": "ALTER TABLE facts ADD COLUMN source_turn INTEGER DEFAULT -1",
|
||||
"observed_at": "ALTER TABLE facts ADD COLUMN observed_at TEXT DEFAULT ''",
|
||||
"contradiction_group": "ALTER TABLE facts ADD COLUMN contradiction_group TEXT DEFAULT ''",
|
||||
"status": "ALTER TABLE facts ADD COLUMN status TEXT DEFAULT 'active'",
|
||||
}
|
||||
for column, ddl in migrations.items():
|
||||
if column not in columns:
|
||||
self._conn.execute(ddl)
|
||||
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key)")
|
||||
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group)")
|
||||
if "hrr_vector" not in columns:
|
||||
self._conn.execute("ALTER TABLE facts ADD COLUMN hrr_vector BLOB")
|
||||
self._conn.commit()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
@@ -168,148 +168,41 @@ class MemoryStore:
|
||||
content: str,
|
||||
category: str = "general",
|
||||
tags: str = "",
|
||||
*,
|
||||
canonical_key: str = "",
|
||||
metadata: dict | None = None,
|
||||
confidence: float | None = None,
|
||||
source_role: str = "",
|
||||
source_turn: int = -1,
|
||||
observed_at: str = "",
|
||||
contradiction_group: str = "",
|
||||
status: str = "active",
|
||||
) -> int:
|
||||
"""Insert a fact and return its fact_id.
|
||||
|
||||
Exact duplicates are deduplicated by content. Near-duplicates are
|
||||
normalized by canonical_key, with provenance/evidence merged into the
|
||||
existing row. Contradictions sharing the same contradiction_group remain
|
||||
stored as separate rows and are marked inspectably.
|
||||
Deduplicates by content (UNIQUE constraint). On duplicate, returns
|
||||
the existing fact_id without modifying the row. Extracts entities from
|
||||
the content and links them to the fact.
|
||||
"""
|
||||
with self._lock:
|
||||
content = content.strip()
|
||||
if not content:
|
||||
raise ValueError("content must not be empty")
|
||||
|
||||
metadata = dict(metadata or {})
|
||||
canonical_key = canonical_key.strip()
|
||||
contradiction_group = contradiction_group.strip()
|
||||
observed_at = observed_at.strip()
|
||||
status = status or "active"
|
||||
trust_score = self.default_trust if confidence is None else _clamp_trust(confidence)
|
||||
metadata_json = json.dumps(metadata, sort_keys=True)
|
||||
|
||||
if canonical_key:
|
||||
existing = self._conn.execute(
|
||||
"SELECT fact_id, metadata_json, trust_score, confidence, observed_at FROM facts WHERE canonical_key = ?",
|
||||
(canonical_key,),
|
||||
).fetchone()
|
||||
if existing is not None:
|
||||
merged_metadata = self._merge_metadata(existing["metadata_json"], metadata)
|
||||
merged_trust = max(float(existing["trust_score"]), trust_score)
|
||||
merged_observed_at = existing["observed_at"] or observed_at
|
||||
if observed_at and merged_observed_at:
|
||||
merged_observed_at = min(merged_observed_at, observed_at)
|
||||
elif observed_at:
|
||||
merged_observed_at = observed_at
|
||||
self._conn.execute(
|
||||
"""
|
||||
UPDATE facts
|
||||
SET metadata_json = ?,
|
||||
trust_score = ?,
|
||||
confidence = ?,
|
||||
observed_at = ?,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
WHERE fact_id = ?
|
||||
""",
|
||||
(
|
||||
json.dumps(merged_metadata, sort_keys=True),
|
||||
merged_trust,
|
||||
max(float(existing["confidence"] or 0.0), confidence or trust_score),
|
||||
merged_observed_at,
|
||||
existing["fact_id"],
|
||||
),
|
||||
)
|
||||
self._conn.commit()
|
||||
return int(existing["fact_id"])
|
||||
|
||||
contradiction_rows = []
|
||||
if contradiction_group:
|
||||
contradiction_rows = self._conn.execute(
|
||||
"""
|
||||
SELECT fact_id, canonical_key, metadata_json
|
||||
FROM facts
|
||||
WHERE contradiction_group = ?
|
||||
AND canonical_key != ?
|
||||
""",
|
||||
(contradiction_group, canonical_key),
|
||||
).fetchall()
|
||||
if contradiction_rows:
|
||||
status = "contradiction"
|
||||
metadata = dict(metadata)
|
||||
metadata["status"] = "contradiction"
|
||||
metadata["contradiction_group"] = contradiction_group
|
||||
metadata["contradiction_keys"] = sorted(
|
||||
{
|
||||
canonical_key,
|
||||
*[str(row["canonical_key"]) for row in contradiction_rows if row["canonical_key"]],
|
||||
}
|
||||
- {""}
|
||||
)
|
||||
metadata_json = json.dumps(metadata, sort_keys=True)
|
||||
|
||||
try:
|
||||
cur = self._conn.execute(
|
||||
"""
|
||||
INSERT INTO facts (
|
||||
content,
|
||||
category,
|
||||
tags,
|
||||
trust_score,
|
||||
canonical_key,
|
||||
metadata_json,
|
||||
confidence,
|
||||
source_role,
|
||||
source_turn,
|
||||
observed_at,
|
||||
contradiction_group,
|
||||
status
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
INSERT INTO facts (content, category, tags, trust_score)
|
||||
VALUES (?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
content,
|
||||
category,
|
||||
tags,
|
||||
trust_score,
|
||||
canonical_key,
|
||||
metadata_json,
|
||||
confidence if confidence is not None else trust_score,
|
||||
source_role,
|
||||
source_turn,
|
||||
observed_at,
|
||||
contradiction_group,
|
||||
status,
|
||||
),
|
||||
(content, category, tags, self.default_trust),
|
||||
)
|
||||
self._conn.commit()
|
||||
fact_id: int = cur.lastrowid # type: ignore[assignment]
|
||||
except sqlite3.IntegrityError:
|
||||
# Duplicate content — return existing id
|
||||
row = self._conn.execute(
|
||||
"SELECT fact_id FROM facts WHERE content = ?", (content,)
|
||||
).fetchone()
|
||||
return int(row["fact_id"])
|
||||
|
||||
if contradiction_rows:
|
||||
self._mark_contradictions(
|
||||
contradiction_group=contradiction_group,
|
||||
new_canonical_key=canonical_key,
|
||||
existing_rows=contradiction_rows,
|
||||
)
|
||||
|
||||
# Entity extraction and linking
|
||||
for name in self._extract_entities(content):
|
||||
entity_id = self._resolve_entity(name)
|
||||
self._link_fact_entity(fact_id, entity_id)
|
||||
|
||||
# Compute HRR vector after entity linking
|
||||
self._compute_hrr_vector(fact_id, content)
|
||||
self._rebuild_bank(category)
|
||||
|
||||
@@ -342,9 +235,6 @@ class MemoryStore:
|
||||
sql = f"""
|
||||
SELECT f.fact_id, f.content, f.category, f.tags,
|
||||
f.trust_score, f.retrieval_count, f.helpful_count,
|
||||
f.canonical_key, f.metadata_json, f.confidence,
|
||||
f.source_role, f.source_turn, f.observed_at,
|
||||
f.contradiction_group, f.status,
|
||||
f.created_at, f.updated_at
|
||||
FROM facts f
|
||||
JOIN facts_fts fts ON fts.rowid = f.fact_id
|
||||
@@ -470,11 +360,7 @@ class MemoryStore:
|
||||
|
||||
sql = f"""
|
||||
SELECT fact_id, content, category, tags, trust_score,
|
||||
retrieval_count, helpful_count,
|
||||
canonical_key, metadata_json, confidence,
|
||||
source_role, source_turn, observed_at,
|
||||
contradiction_group, status,
|
||||
created_at, updated_at
|
||||
retrieval_count, helpful_count, created_at, updated_at
|
||||
FROM facts
|
||||
WHERE trust_score >= ?
|
||||
{category_clause}
|
||||
@@ -484,6 +370,115 @@ class MemoryStore:
|
||||
rows = self._conn.execute(sql, params).fetchall()
|
||||
return [self._row_to_dict(r) for r in rows]
|
||||
|
||||
def upsert_observation(
|
||||
self,
|
||||
observation_type: str,
|
||||
subject: str,
|
||||
summary: str,
|
||||
confidence: float,
|
||||
evidence_fact_ids: list[int],
|
||||
metadata: dict | None = None,
|
||||
) -> int:
|
||||
"""Create or update a synthesized observation and its evidence links."""
|
||||
with self._lock:
|
||||
metadata_json = json.dumps(metadata or {}, sort_keys=True)
|
||||
self._conn.execute(
|
||||
"""
|
||||
INSERT INTO observations (
|
||||
observation_type, subject, summary, confidence, metadata_json
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(observation_type, subject) DO UPDATE SET
|
||||
summary = excluded.summary,
|
||||
confidence = excluded.confidence,
|
||||
metadata_json = excluded.metadata_json,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
""",
|
||||
(observation_type, subject, summary, confidence, metadata_json),
|
||||
)
|
||||
row = self._conn.execute(
|
||||
"""
|
||||
SELECT observation_id
|
||||
FROM observations
|
||||
WHERE observation_type = ? AND subject = ?
|
||||
""",
|
||||
(observation_type, subject),
|
||||
).fetchone()
|
||||
observation_id = int(row["observation_id"])
|
||||
|
||||
self._conn.execute(
|
||||
"DELETE FROM observation_evidence WHERE observation_id = ?",
|
||||
(observation_id,),
|
||||
)
|
||||
unique_fact_ids = sorted({int(fid) for fid in evidence_fact_ids})
|
||||
if unique_fact_ids:
|
||||
self._conn.executemany(
|
||||
"""
|
||||
INSERT OR IGNORE INTO observation_evidence (observation_id, fact_id)
|
||||
VALUES (?, ?)
|
||||
""",
|
||||
[(observation_id, fact_id) for fact_id in unique_fact_ids],
|
||||
)
|
||||
self._conn.commit()
|
||||
return observation_id
|
||||
|
||||
def list_observations(
|
||||
self,
|
||||
observation_type: str | None = None,
|
||||
min_confidence: float = 0.0,
|
||||
limit: int = 50,
|
||||
) -> list[dict]:
|
||||
"""List synthesized observations with expanded supporting evidence."""
|
||||
with self._lock:
|
||||
params: list = [min_confidence]
|
||||
observation_clause = ""
|
||||
if observation_type is not None:
|
||||
observation_clause = "AND observation_type = ?"
|
||||
params.append(observation_type)
|
||||
params.append(limit)
|
||||
rows = self._conn.execute(
|
||||
f"""
|
||||
SELECT observation_id, observation_type, subject, summary, confidence,
|
||||
metadata_json, created_at, updated_at,
|
||||
(
|
||||
SELECT COUNT(*)
|
||||
FROM observation_evidence oe
|
||||
WHERE oe.observation_id = observations.observation_id
|
||||
) AS evidence_count
|
||||
FROM observations
|
||||
WHERE confidence >= ?
|
||||
{observation_clause}
|
||||
ORDER BY confidence DESC, updated_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
params,
|
||||
).fetchall()
|
||||
|
||||
results = []
|
||||
for row in rows:
|
||||
item = dict(row)
|
||||
try:
|
||||
item["metadata"] = json.loads(item.pop("metadata_json") or "{}")
|
||||
except json.JSONDecodeError:
|
||||
item["metadata"] = {}
|
||||
item["evidence"] = self._get_observation_evidence(int(item["observation_id"]))
|
||||
results.append(item)
|
||||
return results
|
||||
|
||||
def _get_observation_evidence(self, observation_id: int) -> list[dict]:
|
||||
rows = self._conn.execute(
|
||||
"""
|
||||
SELECT f.fact_id, f.content, f.category, f.tags, f.trust_score,
|
||||
f.retrieval_count, f.helpful_count, f.created_at, f.updated_at
|
||||
FROM observation_evidence oe
|
||||
JOIN facts f ON f.fact_id = oe.fact_id
|
||||
WHERE oe.observation_id = ?
|
||||
ORDER BY f.trust_score DESC, f.updated_at DESC
|
||||
""",
|
||||
(observation_id,),
|
||||
).fetchall()
|
||||
return [self._row_to_dict(row) for row in rows]
|
||||
|
||||
def record_feedback(self, fact_id: int, helpful: bool) -> dict:
|
||||
"""Record user feedback and adjust trust asymmetrically.
|
||||
|
||||
@@ -525,89 +520,6 @@ class MemoryStore:
|
||||
"helpful_count": row["helpful_count"] + helpful_increment,
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Metadata helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _load_metadata(self, metadata_json: str | None) -> dict:
|
||||
if not metadata_json:
|
||||
return {}
|
||||
try:
|
||||
data = json.loads(metadata_json)
|
||||
return data if isinstance(data, dict) else {}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
def _merge_metadata(self, existing_json: str | None, incoming: dict | None) -> dict:
|
||||
existing = self._load_metadata(existing_json)
|
||||
incoming = dict(incoming or {})
|
||||
merged = dict(existing)
|
||||
merged.update({k: v for k, v in incoming.items() if k not in {"provenance", "evidence", "observation_count", "duplicate_count", "contradiction_keys"}})
|
||||
|
||||
provenance = []
|
||||
seen_provenance: set[str] = set()
|
||||
for item in list(existing.get("provenance", [])) + list(incoming.get("provenance", [])):
|
||||
if item and item not in seen_provenance:
|
||||
seen_provenance.add(item)
|
||||
provenance.append(item)
|
||||
|
||||
evidence = []
|
||||
seen_evidence: set[tuple[str, str, str]] = set()
|
||||
for item in list(existing.get("evidence", [])) + list(incoming.get("evidence", [])):
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
key = (
|
||||
str(item.get("provenance", "")),
|
||||
str(item.get("observed_at", "")),
|
||||
str(item.get("source_text", "")),
|
||||
)
|
||||
if key in seen_evidence:
|
||||
continue
|
||||
seen_evidence.add(key)
|
||||
evidence.append(dict(item))
|
||||
|
||||
observation_count = int(existing.get("observation_count", max(1, len(existing.get("evidence", [])) or 1)))
|
||||
observation_count += int(incoming.get("observation_count", max(1, len(incoming.get("evidence", [])) or 1)))
|
||||
|
||||
contradiction_keys = []
|
||||
seen_keys: set[str] = set()
|
||||
for item in list(existing.get("contradiction_keys", [])) + list(incoming.get("contradiction_keys", [])):
|
||||
if item and item not in seen_keys:
|
||||
seen_keys.add(item)
|
||||
contradiction_keys.append(item)
|
||||
|
||||
merged["provenance"] = provenance
|
||||
merged["evidence"] = evidence
|
||||
merged["observation_count"] = observation_count
|
||||
merged["duplicate_count"] = max(0, observation_count - 1)
|
||||
if contradiction_keys:
|
||||
merged["contradiction_keys"] = contradiction_keys
|
||||
return merged
|
||||
|
||||
def _mark_contradictions(self, contradiction_group: str, new_canonical_key: str, existing_rows: list[sqlite3.Row]) -> None:
|
||||
for row in existing_rows:
|
||||
metadata = self._load_metadata(row["metadata_json"])
|
||||
keys = []
|
||||
seen: set[str] = set()
|
||||
for item in list(metadata.get("contradiction_keys", [])) + [new_canonical_key]:
|
||||
if item and item not in seen:
|
||||
seen.add(item)
|
||||
keys.append(item)
|
||||
metadata["status"] = "contradiction"
|
||||
metadata["contradiction_group"] = contradiction_group
|
||||
metadata["contradiction_keys"] = keys
|
||||
self._conn.execute(
|
||||
"""
|
||||
UPDATE facts
|
||||
SET status = 'contradiction',
|
||||
metadata_json = ?,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
WHERE fact_id = ?
|
||||
""",
|
||||
(json.dumps(metadata, sort_keys=True), row["fact_id"]),
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Entity helpers
|
||||
# ------------------------------------------------------------------
|
||||
@@ -781,14 +693,8 @@ class MemoryStore:
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _row_to_dict(self, row: sqlite3.Row) -> dict:
|
||||
"""Convert a sqlite3.Row to a plain dict with decoded metadata."""
|
||||
data = dict(row)
|
||||
metadata = self._load_metadata(data.get("metadata_json"))
|
||||
if metadata:
|
||||
data["metadata"] = metadata
|
||||
data.setdefault("relation", metadata.get("relation"))
|
||||
data.pop("metadata_json", None)
|
||||
return data
|
||||
"""Convert a sqlite3.Row to a plain dict."""
|
||||
return dict(row)
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the database connection."""
|
||||
|
||||
63
tests/fixtures/memory_extraction_fragments.json
vendored
63
tests/fixtures/memory_extraction_fragments.json
vendored
@@ -1,63 +0,0 @@
|
||||
{
|
||||
"preferences_and_duplicates": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Deploy via Ansible for production changes.",
|
||||
"created_at": "2026-04-22T10:00:00Z"
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": "We deploy through Ansible on this repo.",
|
||||
"created_at": "2026-04-22T10:01:00Z"
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Gitea-first for repository work.",
|
||||
"created_at": "2026-04-22T10:02:00Z"
|
||||
}
|
||||
],
|
||||
"operational_and_contradictions": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "The BURN watchdog caps dispatches per cycle to 6.",
|
||||
"created_at": "2026-04-22T11:00:00Z"
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": "The provider should stay openai-codex/gpt-5.4.",
|
||||
"created_at": "2026-04-22T11:01:00Z"
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Correction: the provider should stay mimo-v2-pro.",
|
||||
"created_at": "2026-04-22T11:02:00Z"
|
||||
}
|
||||
],
|
||||
"mixed_transcript": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Deploy via Ansible for production changes.",
|
||||
"created_at": "2026-04-22T10:00:00Z"
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": "We deploy through Ansible on this repo.",
|
||||
"created_at": "2026-04-22T10:01:00Z"
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": "The BURN watchdog caps dispatches per cycle to 6.",
|
||||
"created_at": "2026-04-22T11:00:00Z"
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": "The provider should stay openai-codex/gpt-5.4.",
|
||||
"created_at": "2026-04-22T11:01:00Z"
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Correction: the provider should stay mimo-v2-pro.",
|
||||
"created_at": "2026-04-22T11:02:00Z"
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -1,50 +0,0 @@
|
||||
"""Integration tests for holographic auto-extraction with structured fact persistence."""
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
|
||||
|
||||
from plugins.memory.holographic import HolographicMemoryProvider
|
||||
|
||||
_FIXTURE_PATH = Path(__file__).resolve().parents[2] / "fixtures" / "memory_extraction_fragments.json"
|
||||
|
||||
|
||||
def _load_fixture(name: str):
|
||||
return json.loads(_FIXTURE_PATH.read_text())[name]
|
||||
|
||||
|
||||
class TestHolographicAutoExtract:
|
||||
def test_auto_extract_persists_structured_metadata_and_normalizes_duplicates(self, tmp_path):
|
||||
provider = HolographicMemoryProvider(
|
||||
config={
|
||||
"db_path": str(tmp_path / "memory_store.db"),
|
||||
"auto_extract": True,
|
||||
"default_trust": 0.5,
|
||||
}
|
||||
)
|
||||
provider.initialize("test-session")
|
||||
|
||||
messages = _load_fixture("mixed_transcript")
|
||||
provider.on_session_end(messages)
|
||||
provider.on_session_end(messages)
|
||||
|
||||
facts = provider._store.list_facts(min_trust=0.0, limit=20)
|
||||
deploy_facts = [f for f in facts if f.get("relation") == "workflow.deploy_method"]
|
||||
provider_facts = [f for f in facts if f.get("contradiction_group") == "config.provider"]
|
||||
|
||||
assert len(deploy_facts) == 1
|
||||
assert deploy_facts[0]["metadata"]["duplicate_count"] >= 3
|
||||
assert deploy_facts[0]["observed_at"] == "2026-04-22T10:00:00Z"
|
||||
assert deploy_facts[0]["metadata"]["provenance"] == [
|
||||
"conversation:user:0",
|
||||
"conversation:user:1",
|
||||
]
|
||||
|
||||
assert len(provider_facts) == 2
|
||||
assert {f["status"] for f in provider_facts} == {"contradiction"}
|
||||
assert {f["metadata"]["value"] for f in provider_facts} == {
|
||||
"openai-codex/gpt-5.4",
|
||||
"mimo-v2-pro",
|
||||
}
|
||||
96
tests/plugins/memory/test_holographic_observations.py
Normal file
96
tests/plugins/memory/test_holographic_observations.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from plugins.memory.holographic import HolographicMemoryProvider
|
||||
from plugins.memory.holographic.store import MemoryStore
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def store(tmp_path):
|
||||
db_path = tmp_path / "memory.db"
|
||||
s = MemoryStore(db_path=str(db_path), default_trust=0.5)
|
||||
yield s
|
||||
s.close()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def provider(tmp_path):
|
||||
p = HolographicMemoryProvider(
|
||||
config={
|
||||
"db_path": str(tmp_path / "memory.db"),
|
||||
"default_trust": 0.5,
|
||||
}
|
||||
)
|
||||
p.initialize(session_id="test-session")
|
||||
yield p
|
||||
if p._store:
|
||||
p._store.close()
|
||||
|
||||
|
||||
class TestObservationSynthesis:
|
||||
def test_observe_action_persists_observation_with_evidence_links(self, provider):
|
||||
fact_ids = [
|
||||
provider._store.add_fact('User prefers concise status updates', category='user_pref'),
|
||||
provider._store.add_fact('User wants result-only replies with no fluff', category='user_pref'),
|
||||
]
|
||||
|
||||
result = json.loads(
|
||||
provider.handle_tool_call(
|
||||
'fact_store',
|
||||
{
|
||||
'action': 'observe',
|
||||
'query': 'What communication style does the user prefer?',
|
||||
'limit': 5,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
assert result['count'] == 1
|
||||
observation = result['observations'][0]
|
||||
assert observation['observation_type'] == 'recurring_preference'
|
||||
assert observation['confidence'] >= 0.6
|
||||
assert sorted(item['fact_id'] for item in observation['evidence']) == sorted(fact_ids)
|
||||
|
||||
stored = provider._store.list_observations(limit=10)
|
||||
assert len(stored) == 1
|
||||
assert stored[0]['observation_type'] == 'recurring_preference'
|
||||
assert stored[0]['evidence_count'] == 2
|
||||
assert len(provider._store.list_facts(limit=10)) == 2
|
||||
|
||||
def test_observe_action_synthesizes_three_observation_types(self, provider):
|
||||
provider._store.add_fact('User prefers concise updates', category='user_pref')
|
||||
provider._store.add_fact('User wants result-only communication', category='user_pref')
|
||||
provider._store.add_fact('Project is moving to a local-first deployment model', category='project')
|
||||
provider._store.add_fact('Project direction stays Gitea-first for issue and PR flow', category='project')
|
||||
provider._store.add_fact('Operator always commits early before moving on', category='general')
|
||||
provider._store.add_fact('Operator pushes a PR immediately after each meaningful fix', category='general')
|
||||
|
||||
result = json.loads(provider.handle_tool_call('fact_store', {'action': 'observe', 'limit': 10}))
|
||||
types = {item['observation_type'] for item in result['observations']}
|
||||
|
||||
assert {'recurring_preference', 'stable_direction', 'behavioral_pattern'} <= types
|
||||
|
||||
def test_single_fact_does_not_create_overconfident_observation(self, provider):
|
||||
provider._store.add_fact('User prefers concise updates', category='user_pref')
|
||||
|
||||
result = json.loads(
|
||||
provider.handle_tool_call(
|
||||
'fact_store',
|
||||
{'action': 'observe', 'query': 'What does the user prefer?', 'limit': 5},
|
||||
)
|
||||
)
|
||||
|
||||
assert result['count'] == 0
|
||||
assert provider._store.list_observations(limit=10) == []
|
||||
|
||||
def test_prefetch_surfaces_observations_as_separate_layer(self, provider):
|
||||
provider._store.add_fact('User prefers concise updates', category='user_pref')
|
||||
provider._store.add_fact('User wants result-only communication', category='user_pref')
|
||||
|
||||
prefetch = provider.prefetch('What communication style does the user prefer?')
|
||||
|
||||
assert '## Holographic Observations' in prefetch
|
||||
assert '## Holographic Memory' in prefetch
|
||||
assert 'recurring_preference' in prefetch
|
||||
assert 'evidence' in prefetch.lower()
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Tests for session compaction with fact extraction."""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
@@ -8,19 +8,12 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from agent.session_compactor import (
|
||||
ExtractedFact,
|
||||
evaluate_extraction_quality,
|
||||
extract_and_save_facts,
|
||||
extract_facts_from_messages,
|
||||
format_facts_summary,
|
||||
save_facts_to_store,
|
||||
extract_and_save_facts,
|
||||
format_facts_summary,
|
||||
)
|
||||
|
||||
_FIXTURE_PATH = Path(__file__).resolve().parent / "fixtures" / "memory_extraction_fragments.json"
|
||||
|
||||
|
||||
def _load_fixture(name: str):
|
||||
return json.loads(_FIXTURE_PATH.read_text())[name]
|
||||
|
||||
|
||||
class TestFactExtraction:
|
||||
def test_extract_preference(self):
|
||||
@@ -67,48 +60,14 @@ class TestFactExtraction:
|
||||
{"role": "user", "content": "I prefer Python."},
|
||||
]
|
||||
facts = extract_facts_from_messages(messages)
|
||||
# Should deduplicate
|
||||
python_facts = [f for f in facts if "Python" in f.content]
|
||||
assert len(python_facts) == 1
|
||||
|
||||
def test_structured_fact_preserves_provenance_and_temporal_metadata(self):
|
||||
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
|
||||
deploy_fact = next(f for f in facts if f.relation == "workflow.deploy_method")
|
||||
assert deploy_fact.source_role == "user"
|
||||
assert deploy_fact.source_turn == 0
|
||||
assert deploy_fact.observed_at == "2026-04-22T10:00:00Z"
|
||||
assert deploy_fact.provenance == "conversation:user:0"
|
||||
assert deploy_fact.canonical_key
|
||||
assert deploy_fact.evidence
|
||||
assert deploy_fact.evidence[0]["source_text"].startswith("Deploy via Ansible")
|
||||
|
||||
def test_near_duplicate_facts_are_normalized_into_one_canonical_fact(self):
|
||||
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
|
||||
deploy_facts = [f for f in facts if f.relation == "workflow.deploy_method"]
|
||||
assert len(deploy_facts) == 1
|
||||
assert len(deploy_facts[0].evidence) == 2
|
||||
assert deploy_facts[0].metadata["duplicate_count"] == 1
|
||||
|
||||
def test_contradictory_facts_are_preserved_for_unique_slots(self):
|
||||
facts = extract_facts_from_messages(_load_fixture("operational_and_contradictions"))
|
||||
provider_facts = [f for f in facts if f.contradiction_group == "config.provider"]
|
||||
assert len(provider_facts) == 2
|
||||
assert {f.status for f in provider_facts} == {"contradiction"}
|
||||
assert {f.normalized_content for f in provider_facts} == {
|
||||
"openai codex gpt 5 4",
|
||||
"mimo v2 pro",
|
||||
}
|
||||
|
||||
def test_quality_evaluation_reports_noise_reduction(self):
|
||||
metrics = evaluate_extraction_quality(_load_fixture("mixed_transcript"))
|
||||
assert metrics["raw_candidates"] > metrics["normalized_facts"]
|
||||
assert metrics["noise_reduction"] > 0
|
||||
assert metrics["contradiction_groups"] == 1
|
||||
|
||||
|
||||
class TestSaveFacts:
|
||||
def test_save_with_callback(self):
|
||||
saved = []
|
||||
|
||||
def mock_save(category, entity, content, trust):
|
||||
saved.append({"category": category, "content": content})
|
||||
|
||||
@@ -117,38 +76,6 @@ class TestSaveFacts:
|
||||
assert count == 1
|
||||
assert len(saved) == 1
|
||||
|
||||
def test_save_with_extended_callback_metadata(self):
|
||||
saved = []
|
||||
|
||||
def mock_save(category, entity, content, trust, **kwargs):
|
||||
saved.append({
|
||||
"category": category,
|
||||
"entity": entity,
|
||||
"content": content,
|
||||
"trust": trust,
|
||||
**kwargs,
|
||||
})
|
||||
|
||||
fact = ExtractedFact(
|
||||
"project.operational",
|
||||
"watchdog",
|
||||
"BURN watchdog caps dispatches per cycle to 6",
|
||||
0.9,
|
||||
2,
|
||||
source_role="user",
|
||||
observed_at="2026-04-22T11:00:00Z",
|
||||
provenance="conversation:user:2",
|
||||
canonical_key="project.operational|watchdog|dispatch_cap|6",
|
||||
relation="fleet.dispatch_cap",
|
||||
contradiction_group="fleet.dispatch_cap",
|
||||
metadata={"duplicate_count": 0},
|
||||
)
|
||||
count = save_facts_to_store([fact], fact_store_fn=mock_save)
|
||||
assert count == 1
|
||||
assert saved[0]["canonical_key"] == fact.canonical_key
|
||||
assert saved[0]["observed_at"] == "2026-04-22T11:00:00Z"
|
||||
assert saved[0]["metadata"]["duplicate_count"] == 0
|
||||
|
||||
|
||||
class TestFormatSummary:
|
||||
def test_empty(self):
|
||||
|
||||
Reference in New Issue
Block a user