Compare commits

1 commit

| Author | SHA1 | Date |
|---|---|---|
|  | a9316121a4 |  |
@@ -1,70 +1,43 @@
from __future__ import annotations

"""
A2A agent card generation for fleet discovery.
Agent Card — A2A-compliant agent discovery.
Part of #843: fix: implement A2A agent card for fleet discovery (#819)

Refs #801.
Closes #802.
Provides metadata about the agent's identity, capabilities, and installed skills
for discovery by other agents in the fleet.
"""

import argparse
import json
import logging
import os
import socket
import sys
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Sequence
from urllib.parse import urlparse, urlunparse
from pathlib import Path
from typing import Any, Dict, List, Optional

from hermes_cli import __version__
from hermes_cli.config import load_config

from hermes_cli.config import load_config, get_hermes_home
from agent.skill_utils import (
    get_all_skills_dirs,
    get_disabled_skill_names,
    iter_skill_index_files,
    parse_frontmatter,
    skill_matches_platform,
    get_all_skills_dirs,
    get_disabled_skill_names,
    skill_matches_platform
)

logger = logging.getLogger(__name__)

DEFAULT_DESCRIPTION = "Sovereign AI agent — orchestration, code, research"
DEFAULT_INPUT_MODES = ["text/plain", "application/json"]
DEFAULT_OUTPUT_MODES = ["text/plain", "application/json"]
_REQUIRED_CAPABILITY_FLAGS = (
    "streaming",
    "pushNotifications",
    "stateTransitionHistory",
)

@dataclass
class AgentSkill:
    id: str
    name: str
    description: str = ""
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        data: Dict[str, Any] = {"id": self.id, "name": self.name}
        if self.description:
            data["description"] = self.description
        if self.tags:
            data["tags"] = self.tags
        return data

    version: str = "1.0.0"

@dataclass
class AgentCapabilities:
    streaming: bool = True
    pushNotifications: bool = False
    stateTransitionHistory: bool = True

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

    tools: bool = True
    vision: bool = False
    reasoning: bool = False

@dataclass
class AgentCard:
@@ -74,81 +47,14 @@ class AgentCard:
    version: str = __version__
    capabilities: AgentCapabilities = field(default_factory=AgentCapabilities)
    skills: List[AgentSkill] = field(default_factory=list)
    defaultInputModes: List[str] = field(default_factory=lambda: list(DEFAULT_INPUT_MODES))
    defaultOutputModes: List[str] = field(default_factory=lambda: list(DEFAULT_OUTPUT_MODES))
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        data: Dict[str, Any] = {
            "name": self.name,
            "description": self.description,
            "url": self.url,
            "version": self.version,
            "capabilities": self.capabilities.to_dict(),
            "skills": [skill.to_dict() for skill in self.skills],
            "defaultInputModes": list(self.defaultInputModes),
            "defaultOutputModes": list(self.defaultOutputModes),
        }
        if self.metadata:
            data["metadata"] = dict(self.metadata)
        return data

    def to_json(self, indent: int = 2) -> str:
        return json.dumps(self.to_dict(), indent=indent)


def _env_or_empty(key: str) -> str:
    return os.environ.get(key, "").strip()


def _as_agent_config(config: Mapping[str, Any] | None) -> Dict[str, Any]:
    if not isinstance(config, Mapping):
        return {}
    agent_cfg = config.get("agent")
    return dict(agent_cfg) if isinstance(agent_cfg, Mapping) else {}


def _as_a2a_config(config: Mapping[str, Any] | None) -> Dict[str, Any]:
    if not isinstance(config, Mapping):
        return {}
    a2a_cfg = config.get("a2a")
    return dict(a2a_cfg) if isinstance(a2a_cfg, Mapping) else {}


def _normalize_string_list(value: Any) -> List[str]:
    if value is None:
        return []
    if isinstance(value, str):
        parts = value.split(",")
    elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)):
        parts = list(value)
    else:
        parts = [value]
    out: List[str] = []
    seen = set()
    for item in parts:
        text = str(item).strip()
        if not text or text in seen:
            continue
        seen.add(text)
        out.append(text)
    return out


def _normalize_skill_tags(frontmatter: Mapping[str, Any]) -> List[str]:
    tags = _normalize_string_list(frontmatter.get("tags"))
    category = str(frontmatter.get("category") or "").strip()
    if category and category not in tags:
        tags.append(category)
    return tags
    defaultInputModes: List[str] = field(default_factory=lambda: ["text/plain"])
    defaultOutputModes: List[str] = field(default_factory=lambda: ["text/plain"])

def _load_skills() -> List[AgentSkill]:
    """Scan enabled skills and return A2A skill metadata."""
    skills: List[AgentSkill] = []
    """Scan all enabled skills and return metadata."""
    skills = []
    disabled = get_disabled_skill_names()
    seen_ids = set()

    for skills_dir in get_all_skills_dirs():
        if not skills_dir.is_dir():
            continue
@@ -159,262 +65,71 @@ def _load_skills() -> List[AgentSkill]:
        except Exception:
            continue

        skill_name = frontmatter.get("name") or skill_file.parent.name
        if str(skill_name) in disabled:
            continue
        if not skill_matches_platform(frontmatter):
            continue

        skill_id = str(frontmatter.get("name") or skill_file.parent.name).strip().lower().replace(" ", "-")
        if skill_id in disabled or skill_id in seen_ids:
            continue
        seen_ids.add(skill_id)
        skills.append(AgentSkill(
            id=str(skill_name),
            name=str(frontmatter.get("name", skill_name)),
            description=str(frontmatter.get("description", "")),
            version=str(frontmatter.get("version", "1.0.0"))
        ))
    return skills

        display_name = str(frontmatter.get("title") or frontmatter.get("name") or skill_file.parent.name).strip()
        description = str(frontmatter.get("description") or "").strip()
        tags = _normalize_skill_tags(frontmatter)
        skills.append(
            AgentSkill(
                id=skill_id,
                name=display_name,
                description=description,
                tags=tags,
            )
        )
def build_agent_card() -> AgentCard:
    """Build the agent card from current configuration and environment."""
    config = load_config()

    # Identity
    name = os.environ.get("HERMES_AGENT_NAME") or config.get("agent", {}).get("name") or "hermes"
    description = os.environ.get("HERMES_AGENT_DESCRIPTION") or config.get("agent", {}).get("description") or "Sovereign AI agent"

    # URL - try to determine from environment or config
    port = os.environ.get("HERMES_WEB_PORT") or "9119"
    host = os.environ.get("HERMES_WEB_HOST") or "localhost"
    url = f"http://{host}:{port}"

    # Capabilities
    # In a real scenario, we'd check model metadata for vision/reasoning
    capabilities = AgentCapabilities(
        streaming=True,
        tools=True,
        vision=False,  # Default to false unless we can confirm
        reasoning=False
    )

    # Skills
    skills = _load_skills()

    return AgentCard(
        name=name,
        description=description,
        url=url,
        version=__version__,
        capabilities=capabilities,
        skills=skills
    )

    return sorted(skills, key=lambda skill: skill.id)


def _get_agent_name(config: Mapping[str, Any] | None, override: str | None = None) -> str:
    if override:
        return override
    env_name = _env_or_empty("HERMES_AGENT_NAME") or _env_or_empty("AGENT_NAME")
    if env_name:
        return env_name
    agent_cfg = _as_agent_config(config)
    if agent_cfg.get("name"):
        return str(agent_cfg["name"]).strip()
def get_agent_card_json() -> str:
    """Return the agent card as a JSON string."""
    try:
        hostname = socket.gethostname().split(".", 1)[0].strip()
        if hostname:
            return hostname
    except Exception:
        pass
    return "hermes"


def _get_description(config: Mapping[str, Any] | None, override: str | None = None) -> str:
    if override:
        return override
    env_description = _env_or_empty("HERMES_AGENT_DESCRIPTION") or _env_or_empty("AGENT_DESCRIPTION")
    if env_description:
        return env_description
    agent_cfg = _as_agent_config(config)
    if agent_cfg.get("description"):
        return str(agent_cfg["description"]).strip()
    return DEFAULT_DESCRIPTION


def _normalize_a2a_url(url: str) -> str:
    raw = (url or "").strip()
    if not raw:
        return ""
    parsed = urlparse(raw if "://" in raw else f"https://{raw}")
    scheme = parsed.scheme or "https"
    netloc = parsed.netloc or parsed.path
    path = parsed.path if parsed.netloc else ""
    normalized_path = path.rstrip("/") if path not in ("", "/") else ""
    if not normalized_path.endswith("/a2a"):
        normalized_path = f"{normalized_path}/a2a" if normalized_path else "/a2a"
    return urlunparse((scheme, netloc, normalized_path, "", "", ""))
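
For reference, a minimal sketch of what the normalizer guarantees, based on the branches above (illustrative assertions written for this review, not part of the diff):

    # a bare host gains an https scheme and the /a2a path
    assert _normalize_a2a_url("fleet.example.com") == "https://fleet.example.com/a2a"
    # a trailing slash collapses before /a2a is appended
    assert _normalize_a2a_url("http://localhost:9119/") == "http://localhost:9119/a2a"
    # an already-normalized URL passes through unchanged
    assert _normalize_a2a_url("https://host/a2a") == "https://host/a2a"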


def _get_agent_url(config: Mapping[str, Any] | None, override: str | None = None) -> str:
    if override:
        return _normalize_a2a_url(override)

    agent_cfg = _as_agent_config(config)
    a2a_cfg = _as_a2a_config(config)

    explicit = (
        _env_or_empty("HERMES_A2A_PUBLIC_URL")
        or str(a2a_cfg.get("public_url") or "").strip()
        or str(agent_cfg.get("a2a_public_url") or "").strip()
    )
    if explicit:
        return _normalize_a2a_url(explicit)

    host = (
        _env_or_empty("HERMES_A2A_HOST")
        or str(a2a_cfg.get("host") or "").strip()
        or _env_or_empty("HERMES_WEB_HOST")
        or str(agent_cfg.get("host") or "").strip()
        or "localhost"
    )
    port = (
        _env_or_empty("HERMES_A2A_PORT")
        or str(a2a_cfg.get("port") or "").strip()
        or _env_or_empty("HERMES_WEB_PORT")
        or str(agent_cfg.get("port") or "").strip()
        or "9119"
    )
    scheme = (
        _env_or_empty("HERMES_A2A_SCHEME")
        or str(a2a_cfg.get("scheme") or "").strip()
        or ("https" if (_env_or_empty("HERMES_MTLS_CERT") or str(port) == "9443") else "http")
    )
    return _normalize_a2a_url(f"{scheme}://{host}:{port}")


def _merge_skills(base_skills: Iterable[AgentSkill], extra_skills: Iterable[AgentSkill] | None = None) -> List[AgentSkill]:
    merged: Dict[str, AgentSkill] = {}
    for skill in list(base_skills) + list(extra_skills or []):
        if skill.id not in merged:
            merged[skill.id] = skill
    return [merged[key] for key in sorted(merged)]


def build_agent_card(
    *,
    name: str | None = None,
    description: str | None = None,
    url: str | None = None,
    extra_skills: Iterable[AgentSkill] | None = None,
    metadata: Mapping[str, Any] | None = None,
) -> AgentCard:
    """Build an A2A-compliant agent card from config, env, and installed skills."""
    try:
        config = load_config()
    except Exception as exc:
        logger.debug("Falling back to empty config while building agent card: %s", exc)
        config = {}

    card = AgentCard(
        name=_get_agent_name(config, override=name),
        description=_get_description(config, override=description),
        url=_get_agent_url(config, override=url),
        skills=_merge_skills(_load_skills(), extra_skills),
        metadata=dict(metadata or {}),
    )
    return card

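As a usage sketch (names and URL invented for illustration), the rebuilt builder is driven entirely by keyword overrides, with config and environment as fallbacks:

    # hypothetical invocation; "hermes-01" and the URL are examples only
    card = build_agent_card(name="hermes-01", url="https://fleet.example.com")
    print(card.to_json())  # A2A card with skills merged from the installed skill index
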
def validate_agent_card(card: AgentCard | Dict[str, Any]) -> List[str]:
    """Return a list of schema-validation errors for an agent card."""
    data = card.to_dict() if isinstance(card, AgentCard) else dict(card)
    errors: List[str] = []

    for field_name in ("name", "description", "url", "version"):
        value = data.get(field_name)
        if not isinstance(value, str) or not value.strip():
            errors.append(f"{field_name} must be a non-empty string")

    url_value = str(data.get("url") or "")
    parsed = urlparse(url_value)
    if not parsed.scheme or not parsed.netloc:
        errors.append("url must be an absolute http/https URL")
    elif parsed.scheme not in {"http", "https"}:
        errors.append("url must use http or https")
    elif not parsed.path.rstrip("/").endswith("/a2a"):
        errors.append("url must point to the /a2a endpoint")

    capabilities = data.get("capabilities")
    if not isinstance(capabilities, Mapping):
        errors.append("capabilities must be an object")
    else:
        for capability_name in _REQUIRED_CAPABILITY_FLAGS:
            if not isinstance(capabilities.get(capability_name), bool):
                errors.append(f"capabilities.{capability_name} must be a boolean")

    for field_name, required_modes in (
        ("defaultInputModes", DEFAULT_INPUT_MODES),
        ("defaultOutputModes", DEFAULT_OUTPUT_MODES),
    ):
        modes = data.get(field_name)
        if not isinstance(modes, list) or not modes:
            errors.append(f"{field_name} must be a non-empty list of MIME types")
            continue
        for mode in modes:
            if not isinstance(mode, str) or "/" not in mode:
                errors.append(f"{field_name} entries must be MIME types")
        for required_mode in required_modes:
            if required_mode not in modes:
                errors.append(f"{field_name} must include {required_mode}")

    skills = data.get("skills")
    if not isinstance(skills, list):
        errors.append("skills must be a list")
    else:
        for index, skill in enumerate(skills):
            if not isinstance(skill, Mapping):
                errors.append(f"skills[{index}] must be an object")
                continue
            if not str(skill.get("id") or "").strip():
                errors.append(f"skills[{index}] missing id")
            if not str(skill.get("name") or "").strip():
                errors.append(f"skills[{index}] missing name")
            tags = skill.get("tags", [])
            if tags is None:
                tags = []
            if not isinstance(tags, list):
                errors.append(f"skills[{index}].tags must be a list")
            else:
                for tag in tags:
                    if not isinstance(tag, str) or not tag.strip():
                        errors.append(f"skills[{index}].tags entries must be non-empty strings")

    metadata = data.get("metadata")
    if metadata is not None and not isinstance(metadata, Mapping):
        errors.append("metadata must be an object when present")

    return errors

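A quick sketch of the validator on a deliberately incomplete card (expected errors shown as comments, abbreviated):

    errors = validate_agent_card({"name": "hermes", "url": "not-a-url"})
    # ["description must be a non-empty string",
    #  "version must be a non-empty string",
    #  "url must be an absolute http/https URL",
    #  "capabilities must be an object", ...]
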
def get_agent_card_json(
    *,
    name: str | None = None,
    description: str | None = None,
    url: str | None = None,
    metadata: Mapping[str, Any] | None = None,
    indent: int = 2,
) -> str:
    """Return the local agent card as JSON, falling back to an error card on failure."""
    try:
        card = build_agent_card(name=name, description=description, url=url, metadata=metadata)
        errors = validate_agent_card(card)
        if errors:
            raise ValueError("; ".join(errors))
        return card.to_json(indent=indent)
    except Exception as exc:
        logger.error("Failed to build agent card: %s", exc)
        card = build_agent_card()
        return json.dumps(asdict(card), indent=2)
    except Exception as e:
        logger.error(f"Failed to build agent card: {e}")
        # Minimal fallback card
        fallback = {
            "name": name or _env_or_empty("HERMES_AGENT_NAME") or "hermes",
            "description": "Sovereign AI agent (agent card fallback)",
            "url": url or "http://localhost:9119/a2a",
            "name": "hermes",
            "description": "Sovereign AI agent (fallback)",
            "version": __version__,
            "capabilities": AgentCapabilities().to_dict(),
            "skills": [],
            "defaultInputModes": list(DEFAULT_INPUT_MODES),
            "defaultOutputModes": list(DEFAULT_OUTPUT_MODES),
            "error": str(exc),
            "error": str(e)
        }
        return json.dumps(fallback, indent=indent)
        return json.dumps(fallback, indent=2)


def main(argv: Sequence[str] | None = None) -> int:
    parser = argparse.ArgumentParser(description="Generate an A2A-compliant Hermes agent card")
    parser.add_argument("--name", help="Override the agent name")
    parser.add_argument("--description", help="Override the agent description")
    parser.add_argument("--url", help="Override the public A2A URL")
    parser.add_argument("--validate", action="store_true", help="Validate before printing; exit 1 on schema errors")
    args = parser.parse_args(list(argv) if argv is not None else None)

    card = build_agent_card(name=args.name, description=args.description, url=args.url)
    errors = validate_agent_card(card)
    if args.validate and errors:
        for error in errors:
            print(error, file=sys.stderr)
        return 1
    print(card.to_json(indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
def validate_agent_card(card_data: Dict[str, Any]) -> bool:
    """Check if the card data complies with the A2A schema."""
    required = ["name", "description", "url", "version"]
    return all(k in card_data for k in required)
@@ -1,197 +1,546 @@
"""Session compaction with fact extraction.
"""Session compaction with structured fact extraction.

Before compressing conversation context, extracts durable facts
(user preferences, corrections, project details) and saves them
to the fact store so they survive compression.

Usage:
    from agent.session_compactor import extract_and_save_facts
    facts = extract_and_save_facts(messages)
Before compressing conversation context, extract durable facts with enough
structure to survive retrieval: source/provenance, temporal anchors,
normalized canonical keys, and contradiction groups.
"""

from __future__ import annotations

import json
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

logger = logging.getLogger(__name__)


_DEPLOY_METHOD_RE = re.compile(r"\bdeploy(?:ing)?\s+(?:via|through|with)\s+([A-Za-z0-9_./+-]+)", re.IGNORECASE)
_WATCHDOG_CAP_RE = re.compile(
    r"\b(?:the\s+)?([A-Za-z0-9_-]+(?:\s+watchdog)?)\s+(?:caps|limits)\s+dispatches(?:\s+per\s+cycle)?\s+to\s+([0-9]+)",
    re.IGNORECASE,
)
_PROVIDER_RE = re.compile(
    r"\bprovider\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._/-]+)",
    re.IGNORECASE,
)
_MODEL_RE = re.compile(
    r"\bmodel\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._:/-]+)",
    re.IGNORECASE,
)
_PORT_RE = re.compile(r"\bport\s+(?:is|should\s+be)\s+([0-9]+)", re.IGNORECASE)
_PROJECT_USES_RE = re.compile(r"\b(?:the\s+)?project\s+(?:uses|needs|requires)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_PREFERENCE_RE = re.compile(r"\bI\s+(?:prefer|like|want|need)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_CONSTRAINT_RE = re.compile(r"\b(?:do\s+not|don't)\s+(?:ever\s+|again\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
_DECISION_RE = re.compile(r"\b(?:we|the\s+team)\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)

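To make the anchors concrete, a small check against the watchdog pattern (illustrative, not part of the diff):

    m = _WATCHDOG_CAP_RE.search("The fleet watchdog caps dispatches per cycle to 5")
    assert m and m.group(1) == "fleet watchdog" and m.group(2) == "5"
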
@dataclass
class ExtractedFact:
    """A fact extracted from conversation."""
    category: str  # "user_pref", "correction", "project", "tool_quirk", "general"
    entity: str  # what the fact is about
    content: str  # the fact itself
    confidence: float  # 0.0-1.0
    source_turn: int  # which message turn it came from
    """A durable fact extracted from conversation."""

    category: str
    entity: str
    content: str
    confidence: float
    source_turn: int
    timestamp: float = 0.0
    source_role: str = "user"
    source_text: str = ""
    normalized_content: str = ""
    canonical_key: str = ""
    relation: str = "general"
    contradiction_group: str = ""
    status: str = "active"
    provenance: str = ""
    observed_at: str = ""
    evidence: List[Dict[str, Any]] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


# Patterns that indicate user preferences
_PREFERENCE_PATTERNS = [
    (r"(?:I|we) (?:prefer|like|want|need) (.+?)(?:\.|$)", "preference"),
    (r"(?:always|never) (?:use|do|run|deploy) (.+?)(?:\.|$)", "preference"),
    (r"(?:my|our) (?:default|preferred|usual) (.+?) (?:is|are) (.+?)(?:\.|$)", "preference"),
    (r"(?:make sure|ensure|remember) (?:to|that) (.+?)(?:\.|$)", "instruction"),
    (r"(?:don'?t|do not) (?:ever|ever again) (.+?)(?:\.|$)", "constraint"),
]

# Patterns that indicate corrections
_CORRECTION_PATTERNS = [
    (r"(?:actually|no[, ]|wait[, ]|correction[: ]|sorry[, ]) (.+)", "correction"),
    (r"(?:I meant|what I meant was|the correct) (.+?)(?:\.|$)", "correction"),
    (r"(?:it'?s|its) (?:not|shouldn'?t be|wrong) (.+?)(?:\.|$)", "correction"),
]

# Patterns that indicate project/tool facts
_PROJECT_PATTERNS = [
    (r"(?:the |our )?(?:project|repo|codebase|code) (?:is|uses|needs|requires) (.+?)(?:\.|$)", "project"),
    (r"(?:deploy|push|commit) (?:to|on) (.+?)(?:\.|$)", "project"),
    (r"(?:this|that|the) (?:server|host|machine|VPS) (?:is|runs|has) (.+?)(?:\.|$)", "infrastructure"),
    (r"(?:model|provider|engine) (?:is|should be|needs to be) (.+?)(?:\.|$)", "config"),
]
    def __post_init__(self) -> None:
        if not self.timestamp:
            self.timestamp = time.time()
        if not self.observed_at:
            self.observed_at = _iso_from_timestamp(self.timestamp)
        if not self.normalized_content:
            self.normalized_content = _normalize_value(self.content)
        if not self.provenance:
            self.provenance = f"conversation:{self.source_role}:{self.source_turn}"
        if not self.canonical_key:
            self.canonical_key = _canonical_key(self.entity, self.relation, self.normalized_content)
        if not self.evidence:
            self.evidence = [
                {
                    "source_role": self.source_role,
                    "source_turn": self.source_turn,
                    "source_text": self.source_text or self.content,
                    "observed_at": self.observed_at,
                    "provenance": self.provenance,
                }
            ]
        self.metadata = dict(self.metadata or {})
        self.metadata.setdefault("entity", self.entity)
        self.metadata.setdefault("relation", self.relation)
        self.metadata.setdefault("value", self.content)
        self.metadata.setdefault("normalized_value", self.normalized_content)
        self.metadata.setdefault("provenance", [self.provenance])
        self.metadata.setdefault("evidence", list(self.evidence))
        self.metadata.setdefault("observation_count", len(self.evidence))
        self.metadata.setdefault("duplicate_count", max(0, self.metadata["observation_count"] - 1))
        if self.contradiction_group:
            self.metadata.setdefault("contradiction_group", self.contradiction_group)
        self.metadata.setdefault("status", self.status)

def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[ExtractedFact]:
    """Extract durable facts from conversation messages.

    Scans user messages for preferences, corrections, project facts,
    and infrastructure details that should survive compression.
    Scans conversation turns for preferences, decisions, corrections, and
    operational state. Raw candidates are normalized into canonical facts so
    near-duplicates merge and contradictions remain inspectable.
    """
    facts = []
    seen_contents = set()

    raw_candidates: list[ExtractedFact] = []
    for turn_idx, msg in enumerate(messages):
        role = msg.get("role", "")
        content = msg.get("content", "")

        # Only scan user messages and assistant responses with corrections
        if role not in ("user", "assistant"):
        if role not in {"user", "assistant"}:
            continue
        if not content or not isinstance(content, str):
            continue
        if len(content) < 10:
            continue

        # Skip tool results and system messages
        if role == "assistant" and msg.get("tool_calls"):
            continue
        if not isinstance(content, str) or len(content.strip()) < 10:
            continue

        extracted = _extract_from_text(content, turn_idx, role)
        timestamp, observed_at = _message_time(msg)
        raw_candidates.extend(
            _extract_from_text(
                content.strip(),
                turn_idx=turn_idx,
                role=role,
                timestamp=timestamp,
                observed_at=observed_at,
            )
        )

        # Deduplicate by content
        for fact in extracted:
            key = f"{fact.category}:{fact.content[:100]}"
            if key not in seen_contents:
                seen_contents.add(key)
                facts.append(fact)
    return _normalize_candidates(raw_candidates)

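A usage sketch of the normalized extractor (message content invented for illustration):

    messages = [
        {"role": "user", "content": "We deploy via docker-compose. Provider should stay openrouter."},
        {"role": "user", "content": "Reminder: provider should stay openrouter."},
    ]
    facts = extract_facts_from_messages(messages)
    # the two provider mentions share a canonical key and merge into a single
    # fact whose metadata carries observation_count == 2
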
def evaluate_extraction_quality(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return before/after metrics for raw vs normalized extraction quality."""

    raw_candidates: list[ExtractedFact] = []
    for turn_idx, msg in enumerate(messages):
        role = msg.get("role", "")
        content = msg.get("content", "")
        if role not in {"user", "assistant"}:
            continue
        if role == "assistant" and msg.get("tool_calls"):
            continue
        if not isinstance(content, str) or len(content.strip()) < 10:
            continue
        timestamp, observed_at = _message_time(msg)
        raw_candidates.extend(
            _extract_from_text(
                content.strip(),
                turn_idx=turn_idx,
                role=role,
                timestamp=timestamp,
                observed_at=observed_at,
            )
        )

    normalized = _normalize_candidates(raw_candidates)
    raw_count = len(raw_candidates)
    normalized_count = len(normalized)
    contradiction_groups = {
        fact.contradiction_group
        for fact in normalized
        if fact.status == "contradiction" and fact.contradiction_group
    }
    duplicate_count = max(0, raw_count - normalized_count)
    noise_reduction = (duplicate_count / raw_count) if raw_count else 0.0

    return {
        "raw_candidates": raw_count,
        "normalized_facts": normalized_count,
        "duplicates_merged": duplicate_count,
        "contradiction_groups": len(contradiction_groups),
        "noise_reduction": round(noise_reduction, 3),
    }

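Run against the two-message sketch above, the metrics would come out roughly as follows (indicative values):

    evaluate_extraction_quality(messages)
    # {"raw_candidates": 3, "normalized_facts": 2, "duplicates_merged": 1,
    #  "contradiction_groups": 0, "noise_reduction": 0.333}
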
def _extract_from_text(
    text: str,
    *,
    turn_idx: int,
    role: str,
    timestamp: float,
    observed_at: str,
) -> List[ExtractedFact]:
    """Extract raw fact candidates from a single text block."""

    facts: list[ExtractedFact] = []
    if role != "user":
        return facts

    deploy_match = _DEPLOY_METHOD_RE.search(text)
    if deploy_match:
        method = deploy_match.group(1).strip()
        facts.append(
            _build_fact(
                category="project.decision",
                entity="project",
                relation="workflow.deploy_method",
                value=method,
                content=f"Deploy via {method}",
                confidence=0.88,
                source_turn=turn_idx,
                source_role=role,
                source_text=text,
                timestamp=timestamp,
                observed_at=observed_at,
                unique_slot=True,
            )
        )

    watchdog_match = _WATCHDOG_CAP_RE.search(text)
    if watchdog_match:
        watchdog = watchdog_match.group(1).strip()
        cap = watchdog_match.group(2).strip()
        facts.append(
            _build_fact(
                category="project.operational",
                entity=_normalize_entity(watchdog),
                relation="fleet.dispatch_cap",
                value=cap,
                content=f"{watchdog} caps dispatches per cycle to {cap}",
                confidence=0.92,
                source_turn=turn_idx,
                source_role=role,
                source_text=text,
                timestamp=timestamp,
                observed_at=observed_at,
                unique_slot=True,
            )
        )

    provider_match = _PROVIDER_RE.search(text)
    if provider_match:
        provider = provider_match.group(1).strip()
        facts.append(
            _build_fact(
                category="project.config",
                entity="project",
                relation="config.provider",
                value=provider,
                content=f"Provider should stay {provider}",
                confidence=0.91,
                source_turn=turn_idx,
                source_role=role,
                source_text=text,
                timestamp=timestamp,
                observed_at=observed_at,
                unique_slot=True,
            )
        )

    model_match = _MODEL_RE.search(text)
    if model_match:
        model = model_match.group(1).strip()
        facts.append(
            _build_fact(
                category="project.config",
                entity="project",
                relation="config.model",
                value=model,
                content=f"Model should stay {model}",
                confidence=0.9,
                source_turn=turn_idx,
                source_role=role,
                source_text=text,
                timestamp=timestamp,
                observed_at=observed_at,
                unique_slot=True,
            )
        )

    port_match = _PORT_RE.search(text)
    if port_match:
        port = port_match.group(1).strip()
        facts.append(
            _build_fact(
                category="project.config",
                entity="project",
                relation="config.port",
                value=port,
                content=f"Port is {port}",
                confidence=0.9,
                source_turn=turn_idx,
                source_role=role,
                source_text=text,
                timestamp=timestamp,
                observed_at=observed_at,
                unique_slot=True,
            )
        )

    project_match = _PROJECT_USES_RE.search(text)
    if project_match:
        value = project_match.group(1).strip().rstrip(".")
        facts.append(
            _build_fact(
                category="project.stack",
                entity="project",
                relation="project.stack",
                value=value,
                content=f"Project uses {value}",
                confidence=0.74,
                source_turn=turn_idx,
                source_role=role,
                source_text=text,
                timestamp=timestamp,
                observed_at=observed_at,
                unique_slot=False,
            )
        )

    preference_match = _PREFERENCE_RE.search(text)
    if preference_match:
        value = preference_match.group(1).strip().rstrip(".")
        facts.append(
            _build_fact(
                category="user_pref.preference",
                entity="user",
                relation="user.preference",
                value=value,
                content=value,
                confidence=0.72,
                source_turn=turn_idx,
                source_role=role,
                source_text=text,
                timestamp=timestamp,
                observed_at=observed_at,
                unique_slot=False,
            )
        )

    constraint_match = _CONSTRAINT_RE.search(text)
    if constraint_match:
        value = constraint_match.group(1).strip().rstrip(".")
        facts.append(
            _build_fact(
                category="user_pref.constraint",
                entity="user",
                relation="user.constraint",
                value=value,
                content=f"Do not {value}",
                confidence=0.82,
                source_turn=turn_idx,
                source_role=role,
                source_text=text,
                timestamp=timestamp,
                observed_at=observed_at,
                unique_slot=False,
            )
        )

    decision_match = _DECISION_RE.search(text)
    if decision_match:
        value = decision_match.group(1).strip().rstrip(".")
        facts.append(
            _build_fact(
                category="project.decision",
                entity="project",
                relation="project.decision",
                value=value,
                content=f"Decision: {value}",
                confidence=0.79,
                source_turn=turn_idx,
                source_role=role,
                source_text=text,
                timestamp=timestamp,
                observed_at=observed_at,
                unique_slot=False,
            )
        )

    return facts

def _extract_from_text(text: str, turn_idx: int, role: str) -> List[ExtractedFact]:
    """Extract facts from a single text block."""
    facts = []
    timestamp = time.time()
def _build_fact(
    *,
    category: str,
    entity: str,
    relation: str,
    value: str,
    content: str,
    confidence: float,
    source_turn: int,
    source_role: str,
    source_text: str,
    timestamp: float,
    observed_at: str,
    unique_slot: bool,
) -> ExtractedFact:
    normalized_value = _normalize_value(value.rstrip(".!?"))
    value = value.rstrip(".!?")
    content = content.rstrip(".!?")
    provenance = f"conversation:{source_role}:{source_turn}"
    contradiction_group = relation if unique_slot else ""
    evidence = [
        {
            "source_role": source_role,
            "source_turn": source_turn,
            "source_text": source_text,
            "observed_at": observed_at,
            "provenance": provenance,
        }
    ]
    metadata = {
        "entity": entity,
        "relation": relation,
        "value": value,
        "normalized_value": normalized_value,
        "provenance": [provenance],
        "evidence": list(evidence),
        "observation_count": 1,
        "duplicate_count": 0,
        "status": "active",
    }
    if contradiction_group:
        metadata["contradiction_group"] = contradiction_group
    return ExtractedFact(
        category=category,
        entity=entity,
        content=content,
        confidence=confidence,
        source_turn=source_turn,
        timestamp=timestamp,
        source_role=source_role,
        source_text=source_text,
        normalized_content=normalized_value,
        canonical_key=_canonical_key(entity, relation, normalized_value),
        relation=relation,
        contradiction_group=contradiction_group,
        status="active",
        provenance=provenance,
        observed_at=observed_at,
        evidence=evidence,
        metadata=metadata,
    )

    # Clean text for pattern matching
    clean = text.strip()

    # User preference patterns (from user messages)
    if role == "user":
        for pattern, subcategory in _PREFERENCE_PATTERNS:
            for match in re.finditer(pattern, clean, re.IGNORECASE):
                content = match.group(1).strip() if match.lastindex else match.group(0).strip()
                if len(content) > 5:
                    facts.append(ExtractedFact(
                        category=f"user_pref.{subcategory}",
                        entity="user",
                        content=content[:200],
                        confidence=0.7,
                        source_turn=turn_idx,
                        timestamp=timestamp,
                    ))
def _normalize_candidates(candidates: List[ExtractedFact]) -> List[ExtractedFact]:
    """Merge duplicates and mark contradictions while preserving evidence."""

    # Correction patterns (from user messages)
    if role == "user":
        for pattern, subcategory in _CORRECTION_PATTERNS:
            for match in re.finditer(pattern, clean, re.IGNORECASE):
                content = match.group(1).strip() if match.lastindex else match.group(0).strip()
                if len(content) > 5:
                    facts.append(ExtractedFact(
                        category=f"correction.{subcategory}",
                        entity="user",
                        content=content[:200],
                        confidence=0.8,
                        source_turn=turn_idx,
                        timestamp=timestamp,
                    ))
    by_key: dict[str, ExtractedFact] = {}
    contradiction_groups: dict[str, list[ExtractedFact]] = {}

    # Project/infrastructure patterns (from both user and assistant)
    for pattern, subcategory in _PROJECT_PATTERNS:
        for match in re.finditer(pattern, clean, re.IGNORECASE):
            content = match.group(1).strip() if match.lastindex else match.group(0).strip()
            if len(content) > 5:
                facts.append(ExtractedFact(
                    category=f"project.{subcategory}",
                    entity=subcategory,
                    content=content[:200],
                    confidence=0.6,
                    source_turn=turn_idx,
                    timestamp=timestamp,
                ))
    for candidate in candidates:
        existing = by_key.get(candidate.canonical_key)
        if existing is not None:
            by_key[candidate.canonical_key] = _merge_fact(existing, candidate)
            continue

    return facts
        by_key[candidate.canonical_key] = candidate
        if candidate.contradiction_group:
            contradiction_groups.setdefault(candidate.contradiction_group, []).append(candidate)

    for group, facts in contradiction_groups.items():
        canonical_keys = {fact.canonical_key for fact in facts}
        if len(canonical_keys) <= 1:
            continue
        for fact in facts:
            fact.status = "contradiction"
            fact.metadata["status"] = "contradiction"
            fact.metadata["contradiction_group"] = group
            fact.metadata["contradiction_keys"] = sorted(canonical_keys - {fact.canonical_key})

    return sorted(by_key.values(), key=lambda fact: (fact.source_turn, fact.timestamp, fact.canonical_key))

def _merge_fact(existing: ExtractedFact, incoming: ExtractedFact) -> ExtractedFact:
    existing.confidence = max(existing.confidence, incoming.confidence)
    existing.timestamp = min(existing.timestamp, incoming.timestamp)
    existing.source_turn = min(existing.source_turn, incoming.source_turn)
    if not existing.observed_at or (incoming.observed_at and incoming.observed_at < existing.observed_at):
        existing.observed_at = incoming.observed_at
    existing.provenance = min(existing.provenance, incoming.provenance)

    provenance = _ordered_unique(existing.metadata.get("provenance", []), incoming.metadata.get("provenance", []))
    evidence = _merge_evidence(existing.metadata.get("evidence", []), incoming.metadata.get("evidence", []))
    observation_count = int(existing.metadata.get("observation_count", len(existing.evidence) or 1))
    observation_count += int(incoming.metadata.get("observation_count", len(incoming.evidence) or 1))

    existing.evidence = evidence
    existing.metadata["provenance"] = provenance
    existing.metadata["evidence"] = evidence
    existing.metadata["observation_count"] = observation_count
    existing.metadata["duplicate_count"] = max(0, observation_count - 1)
    existing.metadata["status"] = existing.status
    return existing

def save_facts_to_store(facts: List[ExtractedFact], fact_store_fn=None) -> int:
    """Save extracted facts to the fact store.

    Args:
        facts: List of extracted facts.
        fact_store_fn: Optional callable(category, entity, content, trust).
            If None, uses the holographic fact store if available.

    Returns:
        Number of facts saved.
    If a callback is supplied, prefer the structured signature but fall back to
    the legacy four-argument callback for compatibility.
    """
    saved = 0

    if fact_store_fn:
        for fact in facts:
    saved = 0
    for fact in facts:
        payload = {
            "category": _store_category(fact.category),
            "entity": fact.entity,
            "content": fact.content,
            "trust": fact.confidence,
            "metadata": dict(fact.metadata),
            "canonical_key": fact.canonical_key,
            "observed_at": fact.observed_at,
            "source_role": fact.source_role,
            "source_turn": fact.source_turn,
            "contradiction_group": fact.contradiction_group,
            "status": fact.status,
            "relation": fact.relation,
        }

        if fact_store_fn:
            try:
                fact_store_fn(
                    category=fact.category,
                    entity=fact.entity,
                    content=fact.content,
                    trust=fact.confidence,
                )
                fact_store_fn(**payload)
                saved += 1
            except Exception as e:
                logger.debug("Failed to save fact: %s", e)
    else:
        # Try holographic fact store
                continue
            except TypeError:
                try:
                    fact_store_fn(payload["category"], payload["entity"], payload["content"], payload["trust"])
                    saved += 1
                    continue
                except Exception as exc:
                    logger.debug("Failed to save fact via callback: %s", exc)
                    continue
            except Exception as exc:
                logger.debug("Failed to save fact via callback: %s", exc)
                continue

        try:
            from fact_store import fact_store as _fs
        for fact in facts:
            try:
                _fs(
                    action="add",
                    content=fact.content,
                    category=fact.category,
                    tags=fact.entity,
                    trust_delta=fact.confidence - 0.5,
                )
                saved += 1
            except Exception as e:
                logger.debug("Failed to save fact via fact_store: %s", e)

            tags = ",".join(filter(None, [fact.entity, fact.relation, fact.status]))
            _fs(
                action="add",
                content=fact.content,
                category=_store_category(fact.category),
                tags=tags,
                trust_delta=fact.confidence - 0.5,
            )
            saved += 1
        except ImportError:
            logger.debug("fact_store not available — facts not persisted")
            break
        except Exception as exc:
            logger.debug("Failed to save fact via fact_store: %s", exc)

    return saved

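A hedged sketch of the callback contract: a keyword-accepting sink receives the full structured payload, while an older positional callback is still served via the TypeError fallback (both sinks are invented for illustration):

    def sink(**payload):  # structured path
        print(payload["canonical_key"], payload["status"])

    def legacy_sink(category, entity, content, trust):  # legacy four-argument path
        ...

    save_facts_to_store(facts, fact_store_fn=sink)
    save_facts_to_store(facts, fact_store_fn=legacy_sink)
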
@@ -204,9 +553,10 @@ def extract_and_save_facts(

    Returns (extracted_facts, saved_count).
    """

    facts = extract_facts_from_messages(messages)
    if facts:
        logger.info("Extracted %d facts from conversation", len(facts))
        logger.info("Extracted %d normalized facts from conversation", len(facts))
        saved = save_facts_to_store(facts, fact_store_fn)
        logger.info("Saved %d/%d facts to store", saved, len(facts))
    else:
@@ -216,16 +566,105 @@ def extract_and_save_facts(

def format_facts_summary(facts: List[ExtractedFact]) -> str:
    """Format extracted facts as a readable summary."""

    if not facts:
        return "No facts extracted."

    by_category = {}
    for f in facts:
        by_category.setdefault(f.category, []).append(f)
    by_category: dict[str, list[ExtractedFact]] = {}
    for fact in facts:
        by_category.setdefault(fact.category, []).append(fact)

    lines = [f"Extracted {len(facts)} facts:", ""]
    for cat, cat_facts in sorted(by_category.items()):
        lines.append(f"  {cat}:")
        for f in cat_facts:
            lines.append(f"    - {f.content[:80]}")
    for category, category_facts in sorted(by_category.items()):
        lines.append(f"  {category}:")
        for fact in category_facts:
            suffix = f" [{fact.status}]" if fact.status != "active" else ""
            lines.append(f"    - {fact.content[:80]}{suffix}")
    return "\n".join(lines)


def _store_category(category: str) -> str:
    if category.startswith("user_pref"):
        return "user_pref"
    if category.startswith("project"):
        return "project"
    if category.startswith("tool"):
        return "tool"
    return "general"


def _message_time(msg: Dict[str, Any]) -> Tuple[float, str]:
    for key in ("created_at", "timestamp", "time"):
        value = msg.get(key)
        if value is None:
            continue
        if isinstance(value, (int, float)):
            ts = float(value)
            return ts, _iso_from_timestamp(ts)
        if isinstance(value, str):
            parsed = _parse_time_string(value)
            if parsed is not None:
                return parsed, _iso_from_timestamp(parsed) if "T" not in value else value.replace("+00:00", "Z")
            return time.time(), value
    now = time.time()
    return now, _iso_from_timestamp(now)


def _parse_time_string(value: str) -> float | None:
    text = value.strip()
    if not text:
        return None
    try:
        return float(text)
    except ValueError:
        pass
    try:
        normalized = text[:-1] + "+00:00" if text.endswith("Z") else text
        return datetime.fromisoformat(normalized).timestamp()
    except ValueError:
        return None


def _iso_from_timestamp(value: float) -> str:
    return datetime.fromtimestamp(value, tz=timezone.utc).isoformat().replace("+00:00", "Z")


def _normalize_value(value: str) -> str:
    normalized = re.sub(r"[^a-z0-9]+", " ", value.lower())
    normalized = re.sub(r"\s+", " ", normalized).strip()
    return normalized


def _normalize_entity(value: str) -> str:
    return _normalize_value(value).replace(" ", "_") or "entity"


def _canonical_key(entity: str, relation: str, normalized_value: str) -> str:
    return f"{entity}|{relation}|{normalized_value}"

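Worked example of the key scheme (values invented):

    _canonical_key("project", "config.provider", _normalize_value("OpenRouter!"))
    # -> "project|config.provider|openrouter"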

def _ordered_unique(*groups: List[str]) -> List[str]:
    seen: set[str] = set()
    ordered: list[str] = []
    for group in groups:
        for item in group:
            if item and item not in seen:
                seen.add(item)
                ordered.append(item)
    return ordered


def _merge_evidence(existing: List[Dict[str, Any]], incoming: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    seen: set[tuple[str, str, str]] = set()
    merged: list[dict[str, Any]] = []
    for item in list(existing) + list(incoming):
        key = (
            str(item.get("provenance", "")),
            str(item.get("observed_at", "")),
            str(item.get("source_text", "")),
        )
        if key in seen:
            continue
        seen.add(key)
        merged.append(dict(item))
    return merged

@@ -356,44 +356,57 @@ class HolographicMemoryProvider(MemoryProvider):
    # -- Auto-extraction (on_session_end) ------------------------------------

    def _auto_extract_facts(self, messages: list) -> None:
        _PREF_PATTERNS = [
            re.compile(r'\bI\s+(?:prefer|like|love|use|want|need)\s+(.+)', re.IGNORECASE),
            re.compile(r'\bmy\s+(?:favorite|preferred|default)\s+\w+\s+is\s+(.+)', re.IGNORECASE),
            re.compile(r'\bI\s+(?:always|never|usually)\s+(.+)', re.IGNORECASE),
        ]
        _DECISION_PATTERNS = [
            re.compile(r'\bwe\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+)', re.IGNORECASE),
            re.compile(r'\bthe\s+project\s+(?:uses|needs|requires)\s+(.+)', re.IGNORECASE),
        ]
        from agent.session_compactor import evaluate_extraction_quality, extract_facts_from_messages

        def _store_category(category: str) -> str:
            if category.startswith("user_pref"):
                return "user_pref"
            if category.startswith("project"):
                return "project"
            if category.startswith("tool"):
                return "tool"
            return "general"

        facts = extract_facts_from_messages(messages)
        if not facts:
            return

        extracted = 0
        for msg in messages:
            if msg.get("role") != "user":
                continue
            content = msg.get("content", "")
            if not isinstance(content, str) or len(content) < 10:
                continue

            for pattern in _PREF_PATTERNS:
                if pattern.search(content):
                    try:
                        self._store.add_fact(content[:400], category="user_pref")
                        extracted += 1
                    except Exception:
                        pass
                    break

            for pattern in _DECISION_PATTERNS:
                if pattern.search(content):
                    try:
                        self._store.add_fact(content[:400], category="project")
                        extracted += 1
                    except Exception:
                        pass
                    break
        for fact in facts:
            try:
                metadata = dict(fact.metadata)
                metadata.setdefault("relation", fact.relation)
                metadata.setdefault("value", fact.content)
                metadata.setdefault("provenance", [fact.provenance])
                metadata.setdefault("evidence", list(fact.evidence))
                metadata.setdefault("observation_count", len(fact.evidence))
                metadata.setdefault("duplicate_count", max(0, len(fact.evidence) - 1))
                self._store.add_fact(
                    fact.content,
                    category=_store_category(fact.category),
                    tags=",".join(filter(None, [fact.entity, fact.relation, fact.status])),
                    canonical_key=fact.canonical_key,
                    metadata=metadata,
                    confidence=fact.confidence,
                    source_role=fact.source_role,
                    source_turn=fact.source_turn,
                    observed_at=fact.observed_at,
                    contradiction_group=fact.contradiction_group,
                    status=fact.status,
                )
                extracted += 1
            except Exception as exc:
                logger.debug("Structured auto-extract failed for %s: %s", fact.canonical_key, exc)

        if extracted:
            logger.info("Auto-extracted %d facts from conversation", extracted)
            metrics = evaluate_extraction_quality(messages)
            logger.info(
                "Auto-extracted %d structured facts from conversation (raw=%d normalized=%d contradictions=%d)",
                extracted,
                metrics["raw_candidates"],
                metrics["normalized_facts"],
                metrics["contradiction_groups"],
            )


# ---------------------------------------------------------------------------
@@ -3,6 +3,7 @@ SQLite-backed fact store with entity resolution and trust scoring.
Single-user Hermes memory store plugin.
"""

import json
import re
import sqlite3
import threading
@@ -15,16 +16,24 @@ except ImportError:

_SCHEMA = """
CREATE TABLE IF NOT EXISTS facts (
    fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
    content TEXT NOT NULL UNIQUE,
    category TEXT DEFAULT 'general',
    tags TEXT DEFAULT '',
    trust_score REAL DEFAULT 0.5,
    retrieval_count INTEGER DEFAULT 0,
    helpful_count INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    hrr_vector BLOB
    fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
    content TEXT NOT NULL UNIQUE,
    category TEXT DEFAULT 'general',
    tags TEXT DEFAULT '',
    trust_score REAL DEFAULT 0.5,
    retrieval_count INTEGER DEFAULT 0,
    helpful_count INTEGER DEFAULT 0,
    canonical_key TEXT DEFAULT '',
    metadata_json TEXT DEFAULT '{}',
    confidence REAL DEFAULT 0.5,
    source_role TEXT DEFAULT '',
    source_turn INTEGER DEFAULT -1,
    observed_at TEXT DEFAULT '',
    contradiction_group TEXT DEFAULT '',
    status TEXT DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    hrr_vector BLOB
);

CREATE TABLE IF NOT EXISTS entities (
@@ -41,9 +50,11 @@ CREATE TABLE IF NOT EXISTS fact_entities (
    PRIMARY KEY (fact_id, entity_id)
);

CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key);
CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group);
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);

CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts
    USING fts5(content, tags, content=facts, content_rowid=fact_id);
@@ -129,10 +140,23 @@ class MemoryStore:
        """Create tables, indexes, and triggers if they do not exist. Enable WAL mode."""
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.executescript(_SCHEMA)
        # Migrate: add hrr_vector column if missing (safe for existing databases)
        columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()}
        if "hrr_vector" not in columns:
            self._conn.execute("ALTER TABLE facts ADD COLUMN hrr_vector BLOB")
        migrations = {
            "hrr_vector": "ALTER TABLE facts ADD COLUMN hrr_vector BLOB",
            "canonical_key": "ALTER TABLE facts ADD COLUMN canonical_key TEXT DEFAULT ''",
            "metadata_json": "ALTER TABLE facts ADD COLUMN metadata_json TEXT DEFAULT '{}'",
            "confidence": "ALTER TABLE facts ADD COLUMN confidence REAL DEFAULT 0.5",
            "source_role": "ALTER TABLE facts ADD COLUMN source_role TEXT DEFAULT ''",
            "source_turn": "ALTER TABLE facts ADD COLUMN source_turn INTEGER DEFAULT -1",
            "observed_at": "ALTER TABLE facts ADD COLUMN observed_at TEXT DEFAULT ''",
            "contradiction_group": "ALTER TABLE facts ADD COLUMN contradiction_group TEXT DEFAULT ''",
            "status": "ALTER TABLE facts ADD COLUMN status TEXT DEFAULT 'active'",
        }
        for column, ddl in migrations.items():
            if column not in columns:
                self._conn.execute(ddl)
        self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key)")
        self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group)")
        self._conn.commit()

    # ------------------------------------------------------------------
@@ -144,41 +168,148 @@ class MemoryStore:
|
||||
content: str,
|
||||
category: str = "general",
|
||||
tags: str = "",
|
||||
*,
|
||||
canonical_key: str = "",
|
||||
metadata: dict | None = None,
|
||||
confidence: float | None = None,
|
||||
source_role: str = "",
|
||||
source_turn: int = -1,
|
||||
observed_at: str = "",
|
||||
contradiction_group: str = "",
|
||||
status: str = "active",
|
||||
) -> int:
|
||||
"""Insert a fact and return its fact_id.
|
||||
|
||||
Deduplicates by content (UNIQUE constraint). On duplicate, returns
|
||||
the existing fact_id without modifying the row. Extracts entities from
|
||||
the content and links them to the fact.
|
||||
Exact duplicates are deduplicated by content. Near-duplicates are
|
||||
normalized by canonical_key, with provenance/evidence merged into the
|
||||
existing row. Contradictions sharing the same contradiction_group remain
|
||||
stored as separate rows and are marked inspectably.
|
||||
"""
        with self._lock:
            content = content.strip()
            if not content:
                raise ValueError("content must not be empty")

            metadata = dict(metadata or {})
            canonical_key = canonical_key.strip()
            contradiction_group = contradiction_group.strip()
            observed_at = observed_at.strip()
            status = status or "active"
            trust_score = self.default_trust if confidence is None else _clamp_trust(confidence)
            metadata_json = json.dumps(metadata, sort_keys=True)

            if canonical_key:
                existing = self._conn.execute(
                    "SELECT fact_id, metadata_json, trust_score, confidence, observed_at FROM facts WHERE canonical_key = ?",
                    (canonical_key,),
                ).fetchone()
                if existing is not None:
                    merged_metadata = self._merge_metadata(existing["metadata_json"], metadata)
                    merged_trust = max(float(existing["trust_score"]), trust_score)
                    merged_observed_at = existing["observed_at"] or observed_at
                    if observed_at and merged_observed_at:
                        merged_observed_at = min(merged_observed_at, observed_at)
                    elif observed_at:
                        merged_observed_at = observed_at
                    self._conn.execute(
                        """
                        UPDATE facts
                        SET metadata_json = ?,
                            trust_score = ?,
                            confidence = ?,
                            observed_at = ?,
                            updated_at = CURRENT_TIMESTAMP
                        WHERE fact_id = ?
                        """,
                        (
                            json.dumps(merged_metadata, sort_keys=True),
                            merged_trust,
                            max(float(existing["confidence"] or 0.0), confidence or trust_score),
                            merged_observed_at,
                            existing["fact_id"],
                        ),
                    )
                    self._conn.commit()
                    return int(existing["fact_id"])

            contradiction_rows = []
            if contradiction_group:
                contradiction_rows = self._conn.execute(
                    """
                    SELECT fact_id, canonical_key, metadata_json
                    FROM facts
                    WHERE contradiction_group = ?
                      AND canonical_key != ?
                    """,
                    (contradiction_group, canonical_key),
                ).fetchall()
                if contradiction_rows:
                    status = "contradiction"
                    metadata = dict(metadata)
                    metadata["status"] = "contradiction"
                    metadata["contradiction_group"] = contradiction_group
                    metadata["contradiction_keys"] = sorted(
                        {
                            canonical_key,
                            *[str(row["canonical_key"]) for row in contradiction_rows if row["canonical_key"]],
                        }
                        - {""}
                    )
                    metadata_json = json.dumps(metadata, sort_keys=True)

            try:
                cur = self._conn.execute(
                    """
                    INSERT INTO facts (
                        content,
                        category,
                        tags,
                        trust_score,
                        canonical_key,
                        metadata_json,
                        confidence,
                        source_role,
                        source_turn,
                        observed_at,
                        contradiction_group,
                        status
                    )
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        content,
                        category,
                        tags,
                        trust_score,
                        canonical_key,
                        metadata_json,
                        confidence if confidence is not None else trust_score,
                        source_role,
                        source_turn,
                        observed_at,
                        contradiction_group,
                        status,
                    ),
                )
                self._conn.commit()
                fact_id: int = cur.lastrowid  # type: ignore[assignment]
            except sqlite3.IntegrityError:
                # Duplicate content — return existing id
                row = self._conn.execute(
                    "SELECT fact_id FROM facts WHERE content = ?", (content,)
                ).fetchone()
                return int(row["fact_id"])

            if contradiction_rows:
                self._mark_contradictions(
                    contradiction_group=contradiction_group,
                    new_canonical_key=canonical_key,
                    existing_rows=contradiction_rows,
                )

            # Entity extraction and linking
            for name in self._extract_entities(content):
                entity_id = self._resolve_entity(name)
                self._link_fact_entity(fact_id, entity_id)

            # Compute HRR vector after entity linking
            self._compute_hrr_vector(fact_id, content)
            self._rebuild_bank(category)
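# Editorial sketch (not part of the diff): when canonical_key matches an
# existing row, add_fact keeps the highest trust score and the earliest
# non-empty observed_at. A hypothetical pure-function restatement of that
# merge policy, for illustration only:
def merge_observed_at(existing: str, incoming: str) -> str:
    merged = existing or incoming
    if incoming and merged:
        merged = min(merged, incoming)  # ISO-8601 strings sort chronologically
    return merged

assert merge_observed_at("", "2026-04-22T10:01:00Z") == "2026-04-22T10:01:00Z"
assert merge_observed_at("2026-04-22T10:01:00Z", "2026-04-22T10:00:00Z") == "2026-04-22T10:00:00Z"
assert max(0.5, 0.9) == 0.9  # merged_trust takes the larger of the two scores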
@@ -211,6 +342,9 @@ class MemoryStore:
        sql = f"""
            SELECT f.fact_id, f.content, f.category, f.tags,
                   f.trust_score, f.retrieval_count, f.helpful_count,
                   f.canonical_key, f.metadata_json, f.confidence,
                   f.source_role, f.source_turn, f.observed_at,
                   f.contradiction_group, f.status,
                   f.created_at, f.updated_at
            FROM facts f
            JOIN facts_fts fts ON fts.rowid = f.fact_id
@@ -336,7 +470,11 @@ class MemoryStore:
        sql = f"""
            SELECT fact_id, content, category, tags, trust_score,
                   retrieval_count, helpful_count,
                   canonical_key, metadata_json, confidence,
                   source_role, source_turn, observed_at,
                   contradiction_group, status,
                   created_at, updated_at
            FROM facts
            WHERE trust_score >= ?
            {category_clause}
@@ -387,6 +525,89 @@ class MemoryStore:
            "helpful_count": row["helpful_count"] + helpful_increment,
        }

    # ------------------------------------------------------------------
    # Metadata helpers
    # ------------------------------------------------------------------

    def _load_metadata(self, metadata_json: str | None) -> dict:
        if not metadata_json:
            return {}
        try:
            data = json.loads(metadata_json)
            return data if isinstance(data, dict) else {}
        except Exception:
            return {}

    def _merge_metadata(self, existing_json: str | None, incoming: dict | None) -> dict:
        existing = self._load_metadata(existing_json)
        incoming = dict(incoming or {})
        merged = dict(existing)
        merged.update(
            {
                k: v
                for k, v in incoming.items()
                if k not in {"provenance", "evidence", "observation_count", "duplicate_count", "contradiction_keys"}
            }
        )

        provenance = []
        seen_provenance: set[str] = set()
        for item in list(existing.get("provenance", [])) + list(incoming.get("provenance", [])):
            if item and item not in seen_provenance:
                seen_provenance.add(item)
                provenance.append(item)

        evidence = []
        seen_evidence: set[tuple[str, str, str]] = set()
        for item in list(existing.get("evidence", [])) + list(incoming.get("evidence", [])):
            if not isinstance(item, dict):
                continue
            key = (
                str(item.get("provenance", "")),
                str(item.get("observed_at", "")),
                str(item.get("source_text", "")),
            )
            if key in seen_evidence:
                continue
            seen_evidence.add(key)
            evidence.append(dict(item))

        observation_count = int(existing.get("observation_count", max(1, len(existing.get("evidence", [])) or 1)))
        observation_count += int(incoming.get("observation_count", max(1, len(incoming.get("evidence", [])) or 1)))

        contradiction_keys = []
        seen_keys: set[str] = set()
        for item in list(existing.get("contradiction_keys", [])) + list(incoming.get("contradiction_keys", [])):
            if item and item not in seen_keys:
                seen_keys.add(item)
                contradiction_keys.append(item)

        merged["provenance"] = provenance
        merged["evidence"] = evidence
        merged["observation_count"] = observation_count
        merged["duplicate_count"] = max(0, observation_count - 1)
        if contradiction_keys:
            merged["contradiction_keys"] = contradiction_keys
        return merged

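# Editorial sketch (not part of the diff): _merge_metadata dedupes provenance,
# evidence, and contradiction_keys with the same seen-set idiom, which keeps
# first-occurrence order and, via tuple keys, also works for the unhashable
# evidence dicts. A minimal illustration of the idiom:
def dedupe_preserving_order(items: list[str]) -> list[str]:
    seen: set[str] = set()
    out: list[str] = []
    for item in items:
        if item and item not in seen:  # drop empties and repeats
            seen.add(item)
            out.append(item)
    return out

assert dedupe_preserving_order(
    ["conversation:user:0", "", "conversation:user:1", "conversation:user:0"]
) == ["conversation:user:0", "conversation:user:1"]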
    def _mark_contradictions(
        self,
        contradiction_group: str,
        new_canonical_key: str,
        existing_rows: list[sqlite3.Row],
    ) -> None:
        for row in existing_rows:
            metadata = self._load_metadata(row["metadata_json"])
            keys = []
            seen: set[str] = set()
            for item in list(metadata.get("contradiction_keys", [])) + [new_canonical_key]:
                if item and item not in seen:
                    seen.add(item)
                    keys.append(item)
            metadata["status"] = "contradiction"
            metadata["contradiction_group"] = contradiction_group
            metadata["contradiction_keys"] = keys
            self._conn.execute(
                """
                UPDATE facts
                SET status = 'contradiction',
                    metadata_json = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE fact_id = ?
                """,
                (json.dumps(metadata, sort_keys=True), row["fact_id"]),
            )
        self._conn.commit()

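# Editorial sketch (not part of the diff): once _mark_contradictions has run,
# conflicting facts stay queryable as a group. A hypothetical inspection query
# against a toy table shaped like the migrated schema above:
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE facts (fact_id INTEGER PRIMARY KEY, content TEXT, "
    "contradiction_group TEXT DEFAULT '', status TEXT DEFAULT 'active')"
)
conn.executemany(
    "INSERT INTO facts (content, contradiction_group, status) VALUES (?, ?, ?)",
    [
        ("provider should stay openai-codex/gpt-5.4", "config.provider", "contradiction"),
        ("provider should stay mimo-v2-pro", "config.provider", "contradiction"),
    ],
)
rows = conn.execute(
    "SELECT content FROM facts WHERE status = 'contradiction' AND contradiction_group = ?",
    ("config.provider",),
).fetchall()
assert len(rows) == 2  # both sides of the conflict remain inspectable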
    # ------------------------------------------------------------------
    # Entity helpers
    # ------------------------------------------------------------------
@@ -560,8 +781,14 @@ class MemoryStore:
    # ------------------------------------------------------------------

    def _row_to_dict(self, row: sqlite3.Row) -> dict:
        """Convert a sqlite3.Row to a plain dict with decoded metadata."""
        data = dict(row)
        metadata = self._load_metadata(data.get("metadata_json"))
        if metadata:
            data["metadata"] = metadata
            data.setdefault("relation", metadata.get("relation"))
        data.pop("metadata_json", None)
        return data

    def close(self) -> None:
        """Close the database connection."""
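# Editorial sketch (not part of the diff): _row_to_dict folds metadata_json back
# into a "metadata" dict and promotes "relation" to the top level, so callers can
# filter on f.get("relation") without touching JSON. An illustration of the
# resulting shape, using a hand-built row dict rather than a real sqlite3.Row:
import json

data = {"fact_id": 1, "metadata_json": json.dumps({"relation": "workflow.deploy_method", "duplicate_count": 1})}
metadata = json.loads(data.pop("metadata_json"))
data["metadata"] = metadata
data.setdefault("relation", metadata.get("relation"))
assert data["relation"] == "workflow.deploy_method"
assert "metadata_json" not in data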
@@ -1,150 +0,0 @@
from __future__ import annotations

import json
from pathlib import Path

import pytest

from agent import agent_card as mod


DEFAULT_DESCRIPTION = "Sovereign AI agent — orchestration, code, research"


def _set_base_context(monkeypatch, *, name: str = "Timmy", description: str = DEFAULT_DESCRIPTION, url: str = "https://timmy.local:9443/a2a", skills=None):
    monkeypatch.setattr(mod, "load_config", lambda: {"agent": {"name": name, "description": description}})
    monkeypatch.setattr(
        mod,
        "_load_skills",
        lambda: list(
            skills
            if skills is not None
            else [
                mod.AgentSkill(
                    id="code",
                    name="Code Implementation",
                    description="Implement and patch code",
                    tags=["python", "gitea"],
                )
            ]
        ),
    )
    monkeypatch.setenv("HERMES_A2A_PUBLIC_URL", url)
    monkeypatch.delenv("HERMES_AGENT_NAME", raising=False)
    monkeypatch.delenv("AGENT_NAME", raising=False)
    monkeypatch.delenv("HERMES_AGENT_DESCRIPTION", raising=False)
    monkeypatch.delenv("AGENT_DESCRIPTION", raising=False)


def test_build_agent_card_matches_issue_802_schema(monkeypatch):
    _set_base_context(monkeypatch)

    card = mod.build_agent_card()
    payload = card.to_dict()

    assert payload["name"] == "Timmy"
    assert payload["description"] == DEFAULT_DESCRIPTION
    assert payload["url"] == "https://timmy.local:9443/a2a"
    assert payload["capabilities"] == {
        "streaming": True,
        "pushNotifications": False,
        "stateTransitionHistory": True,
    }
    assert payload["defaultInputModes"] == ["text/plain", "application/json"]
    assert payload["defaultOutputModes"] == ["text/plain", "application/json"]
    assert payload["skills"][0]["tags"] == ["python", "gitea"]
    assert mod.validate_agent_card(payload) == []


@pytest.mark.parametrize(
    ("name", "url"),
    [
        ("Timmy", "https://timmy.local:9443/a2a"),
        ("Allegro", "https://allegro.local:9443/a2a"),
        ("Ezra", "https://ezra.local:9443/a2a"),
    ],
)
def test_build_agent_card_supports_fleet_members(monkeypatch, name, url):
    _set_base_context(monkeypatch, name=name, url=url, skills=[])

    payload = mod.build_agent_card().to_dict()

    assert payload["name"] == name
    assert payload["url"] == url
    assert mod.validate_agent_card(payload) == []


def test_load_skills_collects_tags_and_category(monkeypatch, tmp_path):
    skill_root = tmp_path / "skills"
    skill_dir = skill_root / "code-implementation"
    skill_dir.mkdir(parents=True)
    (skill_dir / "SKILL.md").write_text(
        """---
name: Code Implementation
description: Implement and patch code
tags: [python, gitea]
category: discovery
---

# Code Implementation
""",
        encoding="utf-8",
    )

    monkeypatch.setattr(mod, "get_all_skills_dirs", lambda: [skill_root])
    monkeypatch.setattr(mod, "get_disabled_skill_names", lambda: set())
    monkeypatch.setattr(mod, "skill_matches_platform", lambda _frontmatter: True)

    skills = mod._load_skills()

    assert len(skills) == 1
    assert skills[0].id == "code-implementation"
    assert skills[0].name == "Code Implementation"
    assert skills[0].description == "Implement and patch code"
    assert skills[0].tags == ["python", "gitea", "discovery"]


def test_validate_agent_card_reports_schema_errors():
    errors = mod.validate_agent_card(
        {
            "name": "",
            "description": "",
            "url": "timmy.local",
            "version": "",
            "capabilities": {"streaming": True},
            "skills": [{"id": "", "name": "", "tags": "python"}],
            "defaultInputModes": ["text/plain"],
            "defaultOutputModes": ["plain"],
            "metadata": [],
        }
    )

    assert any("name must be a non-empty string" in error for error in errors)
    assert any("url must be an absolute http/https URL" in error for error in errors)
    assert any("capabilities.pushNotifications" in error for error in errors)
    assert any("skills[0] missing id" in error for error in errors)
    assert any("skills[0].tags must be a list" in error for error in errors)
    assert any("defaultInputModes must include application/json" in error for error in errors)
    assert any("defaultOutputModes entries must be MIME types" in error for error in errors)
    assert any("metadata must be an object" in error for error in errors)


def test_get_agent_card_json_emits_valid_json(monkeypatch):
    _set_base_context(monkeypatch)

    payload = json.loads(mod.get_agent_card_json())

    assert payload["name"] == "Timmy"
    assert mod.validate_agent_card(payload) == []


def test_main_validate_prints_card(monkeypatch, capsys):
    _set_base_context(monkeypatch)

    exit_code = mod.main(["--validate"])
    captured = capsys.readouterr()

    assert exit_code == 0
    payload = json.loads(captured.out)
    assert payload["url"] == "https://timmy.local:9443/a2a"
    assert captured.err == ""
tests/fixtures/memory_extraction_fragments.json (new file, 63 lines, vendored)
@@ -0,0 +1,63 @@
{
  "preferences_and_duplicates": [
    {
      "role": "user",
      "content": "Deploy via Ansible for production changes.",
      "created_at": "2026-04-22T10:00:00Z"
    },
    {
      "role": "user",
      "content": "We deploy through Ansible on this repo.",
      "created_at": "2026-04-22T10:01:00Z"
    },
    {
      "role": "user",
      "content": "Gitea-first for repository work.",
      "created_at": "2026-04-22T10:02:00Z"
    }
  ],
  "operational_and_contradictions": [
    {
      "role": "user",
      "content": "The BURN watchdog caps dispatches per cycle to 6.",
      "created_at": "2026-04-22T11:00:00Z"
    },
    {
      "role": "user",
      "content": "The provider should stay openai-codex/gpt-5.4.",
      "created_at": "2026-04-22T11:01:00Z"
    },
    {
      "role": "user",
      "content": "Correction: the provider should stay mimo-v2-pro.",
      "created_at": "2026-04-22T11:02:00Z"
    }
  ],
  "mixed_transcript": [
    {
      "role": "user",
      "content": "Deploy via Ansible for production changes.",
      "created_at": "2026-04-22T10:00:00Z"
    },
    {
      "role": "user",
      "content": "We deploy through Ansible on this repo.",
      "created_at": "2026-04-22T10:01:00Z"
    },
    {
      "role": "user",
      "content": "The BURN watchdog caps dispatches per cycle to 6.",
      "created_at": "2026-04-22T11:00:00Z"
    },
    {
      "role": "user",
      "content": "The provider should stay openai-codex/gpt-5.4.",
      "created_at": "2026-04-22T11:01:00Z"
    },
    {
      "role": "user",
      "content": "Correction: the provider should stay mimo-v2-pro.",
      "created_at": "2026-04-22T11:02:00Z"
    }
  ]
}
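# Editorial sketch (not part of the diff): each fixture key holds an ordered list
# of chat messages with ISO timestamps, so tests can replay them as transcripts.
# A hypothetical shape check over the file above (path assumes the repo root as
# the working directory):
import json
from pathlib import Path

fragments = json.loads(Path("tests/fixtures/memory_extraction_fragments.json").read_text())
for name, messages in fragments.items():
    assert isinstance(messages, list), name
    for message in messages:
        assert set(message) == {"role", "content", "created_at"}  # uniform schema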
tests/plugins/memory/test_holographic_auto_extract.py (new file, 50 lines)
@@ -0,0 +1,50 @@
"""Integration tests for holographic auto-extraction with structured fact persistence."""

import json
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parents[3]))

from plugins.memory.holographic import HolographicMemoryProvider

_FIXTURE_PATH = Path(__file__).resolve().parents[2] / "fixtures" / "memory_extraction_fragments.json"


def _load_fixture(name: str):
    return json.loads(_FIXTURE_PATH.read_text())[name]


class TestHolographicAutoExtract:
    def test_auto_extract_persists_structured_metadata_and_normalizes_duplicates(self, tmp_path):
        provider = HolographicMemoryProvider(
            config={
                "db_path": str(tmp_path / "memory_store.db"),
                "auto_extract": True,
                "default_trust": 0.5,
            }
        )
        provider.initialize("test-session")

        messages = _load_fixture("mixed_transcript")
        provider.on_session_end(messages)
        provider.on_session_end(messages)

        facts = provider._store.list_facts(min_trust=0.0, limit=20)
        deploy_facts = [f for f in facts if f.get("relation") == "workflow.deploy_method"]
        provider_facts = [f for f in facts if f.get("contradiction_group") == "config.provider"]

        assert len(deploy_facts) == 1
        assert deploy_facts[0]["metadata"]["duplicate_count"] >= 3
        assert deploy_facts[0]["observed_at"] == "2026-04-22T10:00:00Z"
        assert deploy_facts[0]["metadata"]["provenance"] == [
            "conversation:user:0",
            "conversation:user:1",
        ]

        assert len(provider_facts) == 2
        assert {f["status"] for f in provider_facts} == {"contradiction"}
        assert {f["metadata"]["value"] for f in provider_facts} == {
            "openai-codex/gpt-5.4",
            "mimo-v2-pro",
        }
@@ -1,6 +1,6 @@
"""Tests for session compaction with fact extraction."""

import json
import sys
from pathlib import Path
@@ -8,12 +8,19 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.session_compactor import (
    ExtractedFact,
    evaluate_extraction_quality,
    extract_and_save_facts,
    extract_facts_from_messages,
    format_facts_summary,
    save_facts_to_store,
)

_FIXTURE_PATH = Path(__file__).resolve().parent / "fixtures" / "memory_extraction_fragments.json"


def _load_fixture(name: str):
    return json.loads(_FIXTURE_PATH.read_text())[name]


class TestFactExtraction:
    def test_extract_preference(self):
@@ -60,14 +67,48 @@ class TestFactExtraction:
            {"role": "user", "content": "I prefer Python."},
        ]
        facts = extract_facts_from_messages(messages)
        # Should deduplicate
        python_facts = [f for f in facts if "Python" in f.content]
        assert len(python_facts) == 1

    def test_structured_fact_preserves_provenance_and_temporal_metadata(self):
        facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
        deploy_fact = next(f for f in facts if f.relation == "workflow.deploy_method")
        assert deploy_fact.source_role == "user"
        assert deploy_fact.source_turn == 0
        assert deploy_fact.observed_at == "2026-04-22T10:00:00Z"
        assert deploy_fact.provenance == "conversation:user:0"
        assert deploy_fact.canonical_key
        assert deploy_fact.evidence
        assert deploy_fact.evidence[0]["source_text"].startswith("Deploy via Ansible")

    def test_near_duplicate_facts_are_normalized_into_one_canonical_fact(self):
        facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
        deploy_facts = [f for f in facts if f.relation == "workflow.deploy_method"]
        assert len(deploy_facts) == 1
        assert len(deploy_facts[0].evidence) == 2
        assert deploy_facts[0].metadata["duplicate_count"] == 1

    def test_contradictory_facts_are_preserved_for_unique_slots(self):
        facts = extract_facts_from_messages(_load_fixture("operational_and_contradictions"))
        provider_facts = [f for f in facts if f.contradiction_group == "config.provider"]
        assert len(provider_facts) == 2
        assert {f.status for f in provider_facts} == {"contradiction"}
        assert {f.normalized_content for f in provider_facts} == {
            "openai codex gpt 5 4",
            "mimo v2 pro",
        }

    def test_quality_evaluation_reports_noise_reduction(self):
        metrics = evaluate_extraction_quality(_load_fixture("mixed_transcript"))
        assert metrics["raw_candidates"] > metrics["normalized_facts"]
        assert metrics["noise_reduction"] > 0
        assert metrics["contradiction_groups"] == 1

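# Editorial sketch (not part of the diff): the expected values above imply that
# normalized_content lowercases text and squashes runs of non-alphanumerics to
# single spaces. A hypothetical normalizer matching those expectations (not
# necessarily the real implementation):
import re

def normalize_content(text: str) -> str:
    return re.sub(r"[^a-z0-9]+", " ", text.lower()).strip()

assert normalize_content("openai-codex/gpt-5.4") == "openai codex gpt 5 4"
assert normalize_content("mimo-v2-pro") == "mimo v2 pro"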
class TestSaveFacts:
    def test_save_with_callback(self):
        saved = []

        def mock_save(category, entity, content, trust):
            saved.append({"category": category, "content": content})

@@ -76,6 +117,38 @@ class TestSaveFacts:
        assert count == 1
        assert len(saved) == 1

    def test_save_with_extended_callback_metadata(self):
        saved = []

        def mock_save(category, entity, content, trust, **kwargs):
            saved.append({
                "category": category,
                "entity": entity,
                "content": content,
                "trust": trust,
                **kwargs,
            })

        fact = ExtractedFact(
            "project.operational",
            "watchdog",
            "BURN watchdog caps dispatches per cycle to 6",
            0.9,
            2,
            source_role="user",
            observed_at="2026-04-22T11:00:00Z",
            provenance="conversation:user:2",
            canonical_key="project.operational|watchdog|dispatch_cap|6",
            relation="fleet.dispatch_cap",
            contradiction_group="fleet.dispatch_cap",
            metadata={"duplicate_count": 0},
        )
        count = save_facts_to_store([fact], fact_store_fn=mock_save)
        assert count == 1
        assert saved[0]["canonical_key"] == fact.canonical_key
        assert saved[0]["observed_at"] == "2026-04-22T11:00:00Z"
        assert saved[0]["metadata"]["duplicate_count"] == 0

|
||||
def test_empty(self):
|
||||
|
||||