Compare commits

..

3 Commits

Author SHA1 Message Date
Alexander Whitestone
fc1db11f9b fix: preserve explicit KittenTTS output format outside Telegram
All checks were successful
Lint / lint (pull_request) Successful in 8s
Refs #955
2026-04-22 10:57:02 -04:00
Alexander Whitestone
4b075f5055 feat: add KittenTTS local provider support for #955
Refs #955
2026-04-22 10:51:32 -04:00
Alexander Whitestone
7eace4ead9 wip: add failing KittenTTS QA coverage for #955
Refs #955
2026-04-22 10:41:18 -04:00
12 changed files with 649 additions and 1088 deletions

View File

@@ -1,546 +1,197 @@
"""Session compaction with structured fact extraction.
"""Session compaction with fact extraction.
Before compressing conversation context, extract durable facts with enough
structure to survive retrieval: source/provenance, temporal anchors,
normalized canonical keys, and contradiction groups.
Before compressing conversation context, extracts durable facts
(user preferences, corrections, project details) and saves them
to the fact store so they survive compression.
Usage:
from agent.session_compactor import extract_and_save_facts
facts = extract_and_save_facts(messages)
"""
from __future__ import annotations
import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
_DEPLOY_METHOD_RE = re.compile(r"\bdeploy(?:ing)?\s+(?:via|through|with)\s+([A-Za-z0-9_./+-]+)", re.IGNORECASE)
_WATCHDOG_CAP_RE = re.compile(
r"\b(?:the\s+)?([A-Za-z0-9_-]+(?:\s+watchdog)?)\s+(?:caps|limits)\s+dispatches(?:\s+per\s+cycle)?\s+to\s+([0-9]+)",
re.IGNORECASE,
)
_PROVIDER_RE = re.compile(
r"\bprovider\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._/-]+)",
re.IGNORECASE,
)
_MODEL_RE = re.compile(
r"\bmodel\s+(?:is|should\s+stay|should\s+be|needs\s+to\s+be)\s+([A-Za-z0-9._:/-]+)",
re.IGNORECASE,
)
_PORT_RE = re.compile(r"\bport\s+(?:is|should\s+be)\s+([0-9]+)", re.IGNORECASE)
_PROJECT_USES_RE = re.compile(r"\b(?:the\s+)?project\s+(?:uses|needs|requires)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_PREFERENCE_RE = re.compile(r"\bI\s+(?:prefer|like|want|need)\s+(.+?)(?:[.!?]|$)", re.IGNORECASE)
_CONSTRAINT_RE = re.compile(r"\b(?:do\s+not|don't)\s+(?:ever\s+|again\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
_DECISION_RE = re.compile(r"\b(?:we|the\s+team)\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+?)(?:[.!?]|$)", re.IGNORECASE)
@dataclass
class ExtractedFact:
"""A durable fact extracted from conversation."""
category: str
entity: str
content: str
confidence: float
source_turn: int
"""A fact extracted from conversation."""
category: str # "user_pref", "correction", "project", "tool_quirk", "general"
entity: str # what the fact is about
content: str # the fact itself
confidence: float # 0.0-1.0
source_turn: int # which message turn it came from
timestamp: float = 0.0
source_role: str = "user"
source_text: str = ""
normalized_content: str = ""
canonical_key: str = ""
relation: str = "general"
contradiction_group: str = ""
status: str = "active"
provenance: str = ""
observed_at: str = ""
evidence: List[Dict[str, Any]] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
if not self.timestamp:
self.timestamp = time.time()
if not self.observed_at:
self.observed_at = _iso_from_timestamp(self.timestamp)
if not self.normalized_content:
self.normalized_content = _normalize_value(self.content)
if not self.provenance:
self.provenance = f"conversation:{self.source_role}:{self.source_turn}"
if not self.canonical_key:
self.canonical_key = _canonical_key(self.entity, self.relation, self.normalized_content)
if not self.evidence:
self.evidence = [
{
"source_role": self.source_role,
"source_turn": self.source_turn,
"source_text": self.source_text or self.content,
"observed_at": self.observed_at,
"provenance": self.provenance,
}
]
self.metadata = dict(self.metadata or {})
self.metadata.setdefault("entity", self.entity)
self.metadata.setdefault("relation", self.relation)
self.metadata.setdefault("value", self.content)
self.metadata.setdefault("normalized_value", self.normalized_content)
self.metadata.setdefault("provenance", [self.provenance])
self.metadata.setdefault("evidence", list(self.evidence))
self.metadata.setdefault("observation_count", len(self.evidence))
self.metadata.setdefault("duplicate_count", max(0, self.metadata["observation_count"] - 1))
if self.contradiction_group:
self.metadata.setdefault("contradiction_group", self.contradiction_group)
self.metadata.setdefault("status", self.status)
# Patterns that indicate user preferences
_PREFERENCE_PATTERNS = [
(r"(?:I|we) (?:prefer|like|want|need) (.+?)(?:\.|$)", "preference"),
(r"(?:always|never) (?:use|do|run|deploy) (.+?)(?:\.|$)", "preference"),
(r"(?:my|our) (?:default|preferred|usual) (.+?) (?:is|are) (.+?)(?:\.|$)", "preference"),
(r"(?:make sure|ensure|remember) (?:to|that) (.+?)(?:\.|$)", "instruction"),
(r"(?:don'?t|do not) (?:ever|ever again) (.+?)(?:\.|$)", "constraint"),
]
# Patterns that indicate corrections
_CORRECTION_PATTERNS = [
(r"(?:actually|no[, ]|wait[, ]|correction[: ]|sorry[, ]) (.+)", "correction"),
(r"(?:I meant|what I meant was|the correct) (.+?)(?:\.|$)", "correction"),
(r"(?:it'?s|its) (?:not|shouldn'?t be|wrong) (.+?)(?:\.|$)", "correction"),
]
# Patterns that indicate project/tool facts
_PROJECT_PATTERNS = [
(r"(?:the |our )?(?:project|repo|codebase|code) (?:is|uses|needs|requires) (.+?)(?:\.|$)", "project"),
(r"(?:deploy|push|commit) (?:to|on) (.+?)(?:\.|$)", "project"),
(r"(?:this|that|the) (?:server|host|machine|VPS) (?:is|runs|has) (.+?)(?:\.|$)", "infrastructure"),
(r"(?:model|provider|engine) (?:is|should be|needs to be) (.+?)(?:\.|$)", "config"),
]
def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[ExtractedFact]:
"""Extract durable facts from conversation messages.
Scans conversation turns for preferences, decisions, corrections, and
operational state. Raw candidates are normalized into canonical facts so
near-duplicates merge and contradictions remain inspectable.
Scans user messages for preferences, corrections, project facts,
and infrastructure details that should survive compression.
"""
facts = []
seen_contents = set()
raw_candidates: list[ExtractedFact] = []
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role not in {"user", "assistant"}:
# Only scan user messages and assistant responses with corrections
if role not in ("user", "assistant"):
continue
if not content or not isinstance(content, str):
continue
if len(content) < 10:
continue
# Skip tool results and system messages
if role == "assistant" and msg.get("tool_calls"):
continue
if not isinstance(content, str) or len(content.strip()) < 10:
continue
timestamp, observed_at = _message_time(msg)
raw_candidates.extend(
_extract_from_text(
content.strip(),
turn_idx=turn_idx,
role=role,
timestamp=timestamp,
observed_at=observed_at,
)
)
extracted = _extract_from_text(content, turn_idx, role)
return _normalize_candidates(raw_candidates)
def evaluate_extraction_quality(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Return before/after metrics for raw vs normalized extraction quality."""
raw_candidates: list[ExtractedFact] = []
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role not in {"user", "assistant"}:
continue
if role == "assistant" and msg.get("tool_calls"):
continue
if not isinstance(content, str) or len(content.strip()) < 10:
continue
timestamp, observed_at = _message_time(msg)
raw_candidates.extend(
_extract_from_text(
content.strip(),
turn_idx=turn_idx,
role=role,
timestamp=timestamp,
observed_at=observed_at,
)
)
normalized = _normalize_candidates(raw_candidates)
raw_count = len(raw_candidates)
normalized_count = len(normalized)
contradiction_groups = {
fact.contradiction_group
for fact in normalized
if fact.status == "contradiction" and fact.contradiction_group
}
duplicate_count = max(0, raw_count - normalized_count)
noise_reduction = (duplicate_count / raw_count) if raw_count else 0.0
return {
"raw_candidates": raw_count,
"normalized_facts": normalized_count,
"duplicates_merged": duplicate_count,
"contradiction_groups": len(contradiction_groups),
"noise_reduction": round(noise_reduction, 3),
}
def _extract_from_text(
text: str,
*,
turn_idx: int,
role: str,
timestamp: float,
observed_at: str,
) -> List[ExtractedFact]:
"""Extract raw fact candidates from a single text block."""
facts: list[ExtractedFact] = []
if role != "user":
return facts
deploy_match = _DEPLOY_METHOD_RE.search(text)
if deploy_match:
method = deploy_match.group(1).strip()
facts.append(
_build_fact(
category="project.decision",
entity="project",
relation="workflow.deploy_method",
value=method,
content=f"Deploy via {method}",
confidence=0.88,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
watchdog_match = _WATCHDOG_CAP_RE.search(text)
if watchdog_match:
watchdog = watchdog_match.group(1).strip()
cap = watchdog_match.group(2).strip()
facts.append(
_build_fact(
category="project.operational",
entity=_normalize_entity(watchdog),
relation="fleet.dispatch_cap",
value=cap,
content=f"{watchdog} caps dispatches per cycle to {cap}",
confidence=0.92,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
provider_match = _PROVIDER_RE.search(text)
if provider_match:
provider = provider_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.provider",
value=provider,
content=f"Provider should stay {provider}",
confidence=0.91,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
model_match = _MODEL_RE.search(text)
if model_match:
model = model_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.model",
value=model,
content=f"Model should stay {model}",
confidence=0.9,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
port_match = _PORT_RE.search(text)
if port_match:
port = port_match.group(1).strip()
facts.append(
_build_fact(
category="project.config",
entity="project",
relation="config.port",
value=port,
content=f"Port is {port}",
confidence=0.9,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=True,
)
)
project_match = _PROJECT_USES_RE.search(text)
if project_match:
value = project_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="project.stack",
entity="project",
relation="project.stack",
value=value,
content=f"Project uses {value}",
confidence=0.74,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
preference_match = _PREFERENCE_RE.search(text)
if preference_match:
value = preference_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="user_pref.preference",
entity="user",
relation="user.preference",
value=value,
content=value,
confidence=0.72,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
constraint_match = _CONSTRAINT_RE.search(text)
if constraint_match:
value = constraint_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="user_pref.constraint",
entity="user",
relation="user.constraint",
value=value,
content=f"Do not {value}",
confidence=0.82,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
decision_match = _DECISION_RE.search(text)
if decision_match:
value = decision_match.group(1).strip().rstrip(".")
facts.append(
_build_fact(
category="project.decision",
entity="project",
relation="project.decision",
value=value,
content=f"Decision: {value}",
confidence=0.79,
source_turn=turn_idx,
source_role=role,
source_text=text,
timestamp=timestamp,
observed_at=observed_at,
unique_slot=False,
)
)
# Deduplicate by content
for fact in extracted:
key = f"{fact.category}:{fact.content[:100]}"
if key not in seen_contents:
seen_contents.add(key)
facts.append(fact)
return facts
def _build_fact(
*,
category: str,
entity: str,
relation: str,
value: str,
content: str,
confidence: float,
source_turn: int,
source_role: str,
source_text: str,
timestamp: float,
observed_at: str,
unique_slot: bool,
) -> ExtractedFact:
normalized_value = _normalize_value(value.rstrip(".!?"))
value = value.rstrip(".!?")
content = content.rstrip(".!?")
provenance = f"conversation:{source_role}:{source_turn}"
contradiction_group = relation if unique_slot else ""
evidence = [
{
"source_role": source_role,
"source_turn": source_turn,
"source_text": source_text,
"observed_at": observed_at,
"provenance": provenance,
}
]
metadata = {
"entity": entity,
"relation": relation,
"value": value,
"normalized_value": normalized_value,
"provenance": [provenance],
"evidence": list(evidence),
"observation_count": 1,
"duplicate_count": 0,
"status": "active",
}
if contradiction_group:
metadata["contradiction_group"] = contradiction_group
return ExtractedFact(
category=category,
entity=entity,
content=content,
confidence=confidence,
source_turn=source_turn,
timestamp=timestamp,
source_role=source_role,
source_text=source_text,
normalized_content=normalized_value,
canonical_key=_canonical_key(entity, relation, normalized_value),
relation=relation,
contradiction_group=contradiction_group,
status="active",
provenance=provenance,
observed_at=observed_at,
evidence=evidence,
metadata=metadata,
)
def _extract_from_text(text: str, turn_idx: int, role: str) -> List[ExtractedFact]:
"""Extract facts from a single text block."""
facts = []
timestamp = time.time()
# Clean text for pattern matching
clean = text.strip()
def _normalize_candidates(candidates: List[ExtractedFact]) -> List[ExtractedFact]:
"""Merge duplicates and mark contradictions while preserving evidence."""
# User preference patterns (from user messages)
if role == "user":
for pattern, subcategory in _PREFERENCE_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"user_pref.{subcategory}",
entity="user",
content=content[:200],
confidence=0.7,
source_turn=turn_idx,
timestamp=timestamp,
))
by_key: dict[str, ExtractedFact] = {}
contradiction_groups: dict[str, list[ExtractedFact]] = {}
# Correction patterns (from user messages)
if role == "user":
for pattern, subcategory in _CORRECTION_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"correction.{subcategory}",
entity="user",
content=content[:200],
confidence=0.8,
source_turn=turn_idx,
timestamp=timestamp,
))
for candidate in candidates:
existing = by_key.get(candidate.canonical_key)
if existing is not None:
by_key[candidate.canonical_key] = _merge_fact(existing, candidate)
continue
# Project/infrastructure patterns (from both user and assistant)
for pattern, subcategory in _PROJECT_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"project.{subcategory}",
entity=subcategory,
content=content[:200],
confidence=0.6,
source_turn=turn_idx,
timestamp=timestamp,
))
by_key[candidate.canonical_key] = candidate
if candidate.contradiction_group:
contradiction_groups.setdefault(candidate.contradiction_group, []).append(candidate)
for group, facts in contradiction_groups.items():
canonical_keys = {fact.canonical_key for fact in facts}
if len(canonical_keys) <= 1:
continue
for fact in facts:
fact.status = "contradiction"
fact.metadata["status"] = "contradiction"
fact.metadata["contradiction_group"] = group
fact.metadata["contradiction_keys"] = sorted(canonical_keys - {fact.canonical_key})
return sorted(by_key.values(), key=lambda fact: (fact.source_turn, fact.timestamp, fact.canonical_key))
def _merge_fact(existing: ExtractedFact, incoming: ExtractedFact) -> ExtractedFact:
existing.confidence = max(existing.confidence, incoming.confidence)
existing.timestamp = min(existing.timestamp, incoming.timestamp)
existing.source_turn = min(existing.source_turn, incoming.source_turn)
if not existing.observed_at or (incoming.observed_at and incoming.observed_at < existing.observed_at):
existing.observed_at = incoming.observed_at
existing.provenance = min(existing.provenance, incoming.provenance)
provenance = _ordered_unique(existing.metadata.get("provenance", []), incoming.metadata.get("provenance", []))
evidence = _merge_evidence(existing.metadata.get("evidence", []), incoming.metadata.get("evidence", []))
observation_count = int(existing.metadata.get("observation_count", len(existing.evidence) or 1))
observation_count += int(incoming.metadata.get("observation_count", len(incoming.evidence) or 1))
existing.evidence = evidence
existing.metadata["provenance"] = provenance
existing.metadata["evidence"] = evidence
existing.metadata["observation_count"] = observation_count
existing.metadata["duplicate_count"] = max(0, observation_count - 1)
existing.metadata["status"] = existing.status
return existing
return facts
def save_facts_to_store(facts: List[ExtractedFact], fact_store_fn=None) -> int:
"""Save extracted facts to the fact store.
If a callback is supplied, prefer the structured signature but fall back to
the legacy four-argument callback for compatibility.
Args:
facts: List of extracted facts.
fact_store_fn: Optional callable(category, entity, content, trust).
If None, uses the holographic fact store if available.
Returns:
Number of facts saved.
"""
saved = 0
for fact in facts:
payload = {
"category": _store_category(fact.category),
"entity": fact.entity,
"content": fact.content,
"trust": fact.confidence,
"metadata": dict(fact.metadata),
"canonical_key": fact.canonical_key,
"observed_at": fact.observed_at,
"source_role": fact.source_role,
"source_turn": fact.source_turn,
"contradiction_group": fact.contradiction_group,
"status": fact.status,
"relation": fact.relation,
}
if fact_store_fn:
if fact_store_fn:
for fact in facts:
try:
fact_store_fn(**payload)
fact_store_fn(
category=fact.category,
entity=fact.entity,
content=fact.content,
trust=fact.confidence,
)
saved += 1
continue
except TypeError:
try:
fact_store_fn(payload["category"], payload["entity"], payload["content"], payload["trust"])
saved += 1
continue
except Exception as exc:
logger.debug("Failed to save fact via callback: %s", exc)
continue
except Exception as exc:
logger.debug("Failed to save fact via callback: %s", exc)
continue
except Exception as e:
logger.debug("Failed to save fact: %s", e)
else:
# Try holographic fact store
try:
from fact_store import fact_store as _fs
tags = ",".join(filter(None, [fact.entity, fact.relation, fact.status]))
_fs(
action="add",
content=fact.content,
category=_store_category(fact.category),
tags=tags,
trust_delta=fact.confidence - 0.5,
)
saved += 1
for fact in facts:
try:
_fs(
action="add",
content=fact.content,
category=fact.category,
tags=fact.entity,
trust_delta=fact.confidence - 0.5,
)
saved += 1
except Exception as e:
logger.debug("Failed to save fact via fact_store: %s", e)
except ImportError:
logger.debug("fact_store not available — facts not persisted")
break
except Exception as exc:
logger.debug("Failed to save fact via fact_store: %s", exc)
return saved
@@ -553,10 +204,9 @@ def extract_and_save_facts(
Returns (extracted_facts, saved_count).
"""
facts = extract_facts_from_messages(messages)
if facts:
logger.info("Extracted %d normalized facts from conversation", len(facts))
logger.info("Extracted %d facts from conversation", len(facts))
saved = save_facts_to_store(facts, fact_store_fn)
logger.info("Saved %d/%d facts to store", saved, len(facts))
else:
@@ -566,105 +216,16 @@ def extract_and_save_facts(
def format_facts_summary(facts: List[ExtractedFact]) -> str:
"""Format extracted facts as a readable summary."""
if not facts:
return "No facts extracted."
by_category: dict[str, list[ExtractedFact]] = {}
for fact in facts:
by_category.setdefault(fact.category, []).append(fact)
by_category = {}
for f in facts:
by_category.setdefault(f.category, []).append(f)
lines = [f"Extracted {len(facts)} facts:", ""]
for category, category_facts in sorted(by_category.items()):
lines.append(f" {category}:")
for fact in category_facts:
suffix = f" [{fact.status}]" if fact.status != "active" else ""
lines.append(f" - {fact.content[:80]}{suffix}")
for cat, cat_facts in sorted(by_category.items()):
lines.append(f" {cat}:")
for f in cat_facts:
lines.append(f" - {f.content[:80]}")
return "\n".join(lines)
def _store_category(category: str) -> str:
if category.startswith("user_pref"):
return "user_pref"
if category.startswith("project"):
return "project"
if category.startswith("tool"):
return "tool"
return "general"
def _message_time(msg: Dict[str, Any]) -> Tuple[float, str]:
for key in ("created_at", "timestamp", "time"):
value = msg.get(key)
if value is None:
continue
if isinstance(value, (int, float)):
ts = float(value)
return ts, _iso_from_timestamp(ts)
if isinstance(value, str):
parsed = _parse_time_string(value)
if parsed is not None:
return parsed, _iso_from_timestamp(parsed) if "T" not in value else value.replace("+00:00", "Z")
return time.time(), value
now = time.time()
return now, _iso_from_timestamp(now)
def _parse_time_string(value: str) -> float | None:
text = value.strip()
if not text:
return None
try:
return float(text)
except ValueError:
pass
try:
normalized = text[:-1] + "+00:00" if text.endswith("Z") else text
return datetime.fromisoformat(normalized).timestamp()
except ValueError:
return None
def _iso_from_timestamp(value: float) -> str:
return datetime.fromtimestamp(value, tz=timezone.utc).isoformat().replace("+00:00", "Z")
def _normalize_value(value: str) -> str:
normalized = re.sub(r"[^a-z0-9]+", " ", value.lower())
normalized = re.sub(r"\s+", " ", normalized).strip()
return normalized
def _normalize_entity(value: str) -> str:
return _normalize_value(value).replace(" ", "_") or "entity"
def _canonical_key(entity: str, relation: str, normalized_value: str) -> str:
return f"{entity}|{relation}|{normalized_value}"
def _ordered_unique(*groups: List[str]) -> List[str]:
seen: set[str] = set()
ordered: list[str] = []
for group in groups:
for item in group:
if item and item not in seen:
seen.add(item)
ordered.append(item)
return ordered
def _merge_evidence(existing: List[Dict[str, Any]], incoming: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
seen: set[tuple[str, str, str]] = set()
merged: list[dict[str, Any]] = []
for item in list(existing) + list(incoming):
key = (
str(item.get("provenance", "")),
str(item.get("observed_at", "")),
str(item.get("source_text", "")),
)
if key in seen:
continue
seen.add(key)
merged.append(dict(item))
return merged

View File

@@ -523,7 +523,7 @@ DEFAULT_CONFIG = {
# Text-to-speech configuration
"tts": {
"provider": "edge", # "edge" (free) | "elevenlabs" (premium) | "openai" | "minimax" | "mistral" | "neutts" (local)
"provider": "edge", # "edge" (free) | "elevenlabs" (premium) | "openai" | "minimax" | "mistral" | "neutts" (local) | "kittentts" (local)
"edge": {
"voice": "en-US-AriaNeural",
# Popular: AriaNeural, JennyNeural, AndrewNeural, BrianNeural, SoniaNeural
@@ -547,6 +547,12 @@ DEFAULT_CONFIG = {
"model": "neuphonic/neutts-air-q4-gguf", # HuggingFace model repo
"device": "cpu", # cpu, cuda, or mps
},
"kittentts": {
"model": "KittenML/kitten-tts-nano-0.8-int8", # 25MB int8 default
"voice": "Jasper", # Jasper, Bella, Luna, Bruno, Rosie, Hugo, Kiki, Leo
"speed": 1.0,
"clean_text": True,
},
},
"stt": {

View File

@@ -443,6 +443,16 @@ def _print_setup_summary(config: dict, hermes_home):
tool_status.append(("Text-to-Speech (NeuTTS local)", True, None))
else:
tool_status.append(("Text-to-Speech (NeuTTS — not installed)", False, "run 'hermes setup tts'"))
elif tts_provider == "kittentts":
try:
import importlib.util
kittentts_ok = importlib.util.find_spec("kittentts") is not None
except Exception:
kittentts_ok = False
if kittentts_ok:
tool_status.append(("Text-to-Speech (KittenTTS local)", True, None))
else:
tool_status.append(("Text-to-Speech (KittenTTS — not installed)", False, "run 'hermes setup tts'"))
else:
tool_status.append(("Text-to-Speech (Edge TTS)", True, None))
@@ -891,6 +901,7 @@ def _install_neutts_deps() -> bool:
return False
else:
print_warning("espeak-ng is required for NeuTTS. Install it manually before using NeuTTS.")
return False
# Install neutts Python package
print()
@@ -910,8 +921,34 @@ def _install_neutts_deps() -> bool:
return False
def _install_kittentts_deps() -> bool:
"""Install KittenTTS dependencies with user approval. Returns True on success."""
import subprocess
import sys
wheel_url = (
"https://github.com/KittenML/KittenTTS/releases/download/"
"0.8.1/kittentts-0.8.1-py3-none-any.whl"
)
print()
print_info("Installing kittentts Python package (~25-80MB model downloaded on first use)...")
print()
try:
subprocess.run(
[sys.executable, "-m", "pip", "install", "-U", wheel_url, "soundfile", "--quiet"],
check=True, timeout=300,
)
print_success("kittentts installed successfully")
return True
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
print_error(f"Failed to install kittentts: {e}")
print_info(f"Try manually: python -m pip install -U '{wheel_url}' soundfile")
return False
def _setup_tts_provider(config: dict):
"""Interactive TTS provider selection with install flow for NeuTTS."""
"""Interactive TTS provider selection with install flow for local providers."""
tts_config = config.get("tts", {})
current_provider = tts_config.get("provider", "edge")
subscription_features = get_nous_subscription_features(config)
@@ -923,6 +960,7 @@ def _setup_tts_provider(config: dict):
"minimax": "MiniMax TTS",
"mistral": "Mistral Voxtral TTS",
"neutts": "NeuTTS",
"kittentts": "KittenTTS",
}
current_label = provider_labels.get(current_provider, current_provider)
@@ -944,9 +982,10 @@ def _setup_tts_provider(config: dict):
"MiniMax TTS (high quality with voice cloning, needs API key)",
"Mistral Voxtral TTS (multilingual, native Opus, needs API key)",
"NeuTTS (local on-device, free, ~300MB model download)",
"KittenTTS (local on-device, free, lightweight ~25-80MB ONNX)",
]
)
providers.extend(["edge", "elevenlabs", "openai", "minimax", "mistral", "neutts"])
providers.extend(["edge", "elevenlabs", "openai", "minimax", "mistral", "neutts", "kittentts"])
choices.append(f"Keep current ({current_label})")
keep_current_idx = len(choices) - 1
idx = prompt_choice("Select TTS provider:", choices, keep_current_idx)
@@ -988,6 +1027,28 @@ def _setup_tts_provider(config: dict):
print_info("Skipping install. Set tts.provider to 'neutts' after installing manually.")
selected = "edge"
elif selected == "kittentts":
try:
import importlib.util
already_installed = importlib.util.find_spec("kittentts") is not None
except Exception:
already_installed = False
if already_installed:
print_success("KittenTTS is already installed")
else:
print()
print_info("KittenTTS is lightweight (~25-80MB, CPU-only, no API key required).")
print_info("Voices: Jasper, Bella, Luna, Bruno, Rosie, Hugo, Kiki, Leo")
print()
if prompt_yes_no("Install KittenTTS now?", True):
if not _install_kittentts_deps():
print_warning("KittenTTS installation incomplete. Falling back to Edge TTS.")
selected = "edge"
else:
print_info("Skipping install. Set tts.provider to 'kittentts' after installing manually.")
selected = "edge"
elif selected == "elevenlabs":
existing = get_env_value("ELEVENLABS_API_KEY")
if not existing:

View File

@@ -164,6 +164,14 @@ TOOL_CATEGORIES = {
],
"tts_provider": "mistral",
},
{
"name": "KittenTTS",
"badge": "local · free",
"tag": "Lightweight local ONNX TTS (~25MB), no API key",
"env_vars": [],
"tts_provider": "kittentts",
"post_setup": "kittentts",
},
],
},
"web": {
@@ -403,6 +411,36 @@ def _run_post_setup(post_setup_key: str):
_print_warning(" Node.js not found. Install Camofox via Docker:")
_print_info(" docker run -p 9377:9377 -e CAMOFOX_PORT=9377 jo-inc/camofox-browser")
elif post_setup_key == "kittentts":
try:
__import__("kittentts")
_print_success(" kittentts is already installed")
return
except ImportError:
pass
import subprocess
_print_info(" Installing kittentts (~25-80MB model, CPU-only)...")
wheel_url = (
"https://github.com/KittenML/KittenTTS/releases/download/"
"0.8.1/kittentts-0.8.1-py3-none-any.whl"
)
try:
result = subprocess.run(
[sys.executable, "-m", "pip", "install", "-U", wheel_url, "soundfile", "--quiet"],
capture_output=True, text=True, timeout=300,
)
if result.returncode == 0:
_print_success(" kittentts installed")
_print_info(" Voices: Jasper, Bella, Luna, Bruno, Rosie, Hugo, Kiki, Leo")
_print_info(" Models: KittenML/kitten-tts-nano-0.8-int8 (25MB), micro (41MB), mini (80MB)")
else:
_print_warning(" kittentts install failed:")
_print_info(f" {result.stderr.strip()[:300]}")
_print_info(f" Run manually: python -m pip install -U '{wheel_url}' soundfile")
except subprocess.TimeoutExpired:
_print_warning(" kittentts install timed out (>5min)")
_print_info(f" Run manually: python -m pip install -U '{wheel_url}' soundfile")
elif post_setup_key == "rl_training":
try:
__import__("tinker_atropos")

View File

@@ -356,57 +356,44 @@ class HolographicMemoryProvider(MemoryProvider):
# -- Auto-extraction (on_session_end) ------------------------------------
def _auto_extract_facts(self, messages: list) -> None:
from agent.session_compactor import evaluate_extraction_quality, extract_facts_from_messages
def _store_category(category: str) -> str:
if category.startswith("user_pref"):
return "user_pref"
if category.startswith("project"):
return "project"
if category.startswith("tool"):
return "tool"
return "general"
facts = extract_facts_from_messages(messages)
if not facts:
return
_PREF_PATTERNS = [
re.compile(r'\bI\s+(?:prefer|like|love|use|want|need)\s+(.+)', re.IGNORECASE),
re.compile(r'\bmy\s+(?:favorite|preferred|default)\s+\w+\s+is\s+(.+)', re.IGNORECASE),
re.compile(r'\bI\s+(?:always|never|usually)\s+(.+)', re.IGNORECASE),
]
_DECISION_PATTERNS = [
re.compile(r'\bwe\s+(?:decided|agreed|chose)\s+(?:to\s+)?(.+)', re.IGNORECASE),
re.compile(r'\bthe\s+project\s+(?:uses|needs|requires)\s+(.+)', re.IGNORECASE),
]
extracted = 0
for fact in facts:
try:
metadata = dict(fact.metadata)
metadata.setdefault("relation", fact.relation)
metadata.setdefault("value", fact.content)
metadata.setdefault("provenance", [fact.provenance])
metadata.setdefault("evidence", list(fact.evidence))
metadata.setdefault("observation_count", len(fact.evidence))
metadata.setdefault("duplicate_count", max(0, len(fact.evidence) - 1))
self._store.add_fact(
fact.content,
category=_store_category(fact.category),
tags=",".join(filter(None, [fact.entity, fact.relation, fact.status])),
canonical_key=fact.canonical_key,
metadata=metadata,
confidence=fact.confidence,
source_role=fact.source_role,
source_turn=fact.source_turn,
observed_at=fact.observed_at,
contradiction_group=fact.contradiction_group,
status=fact.status,
)
extracted += 1
except Exception as exc:
logger.debug("Structured auto-extract failed for %s: %s", fact.canonical_key, exc)
for msg in messages:
if msg.get("role") != "user":
continue
content = msg.get("content", "")
if not isinstance(content, str) or len(content) < 10:
continue
for pattern in _PREF_PATTERNS:
if pattern.search(content):
try:
self._store.add_fact(content[:400], category="user_pref")
extracted += 1
except Exception:
pass
break
for pattern in _DECISION_PATTERNS:
if pattern.search(content):
try:
self._store.add_fact(content[:400], category="project")
extracted += 1
except Exception:
pass
break
if extracted:
metrics = evaluate_extraction_quality(messages)
logger.info(
"Auto-extracted %d structured facts from conversation (raw=%d normalized=%d contradictions=%d)",
extracted,
metrics["raw_candidates"],
metrics["normalized_facts"],
metrics["contradiction_groups"],
)
logger.info("Auto-extracted %d facts from conversation", extracted)
# ---------------------------------------------------------------------------

View File

@@ -3,7 +3,6 @@ SQLite-backed fact store with entity resolution and trust scoring.
Single-user Hermes memory store plugin.
"""
import json
import re
import sqlite3
import threading
@@ -16,24 +15,16 @@ except ImportError:
_SCHEMA = """
CREATE TABLE IF NOT EXISTS facts (
fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL UNIQUE,
category TEXT DEFAULT 'general',
tags TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
helpful_count INTEGER DEFAULT 0,
canonical_key TEXT DEFAULT '',
metadata_json TEXT DEFAULT '{}',
confidence REAL DEFAULT 0.5,
source_role TEXT DEFAULT '',
source_turn INTEGER DEFAULT -1,
observed_at TEXT DEFAULT '',
contradiction_group TEXT DEFAULT '',
status TEXT DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
hrr_vector BLOB
fact_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL UNIQUE,
category TEXT DEFAULT 'general',
tags TEXT DEFAULT '',
trust_score REAL DEFAULT 0.5,
retrieval_count INTEGER DEFAULT 0,
helpful_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
hrr_vector BLOB
);
CREATE TABLE IF NOT EXISTS entities (
@@ -50,11 +41,9 @@ CREATE TABLE IF NOT EXISTS fact_entities (
PRIMARY KEY (fact_id, entity_id)
);
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key);
CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group);
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
CREATE INDEX IF NOT EXISTS idx_facts_trust ON facts(trust_score DESC);
CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name);
CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts
USING fts5(content, tags, content=facts, content_rowid=fact_id);
@@ -140,23 +129,10 @@ class MemoryStore:
"""Create tables, indexes, and triggers if they do not exist. Enable WAL mode."""
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.executescript(_SCHEMA)
# Migrate: add hrr_vector column if missing (safe for existing databases)
columns = {row[1] for row in self._conn.execute("PRAGMA table_info(facts)").fetchall()}
migrations = {
"hrr_vector": "ALTER TABLE facts ADD COLUMN hrr_vector BLOB",
"canonical_key": "ALTER TABLE facts ADD COLUMN canonical_key TEXT DEFAULT ''",
"metadata_json": "ALTER TABLE facts ADD COLUMN metadata_json TEXT DEFAULT '{}'",
"confidence": "ALTER TABLE facts ADD COLUMN confidence REAL DEFAULT 0.5",
"source_role": "ALTER TABLE facts ADD COLUMN source_role TEXT DEFAULT ''",
"source_turn": "ALTER TABLE facts ADD COLUMN source_turn INTEGER DEFAULT -1",
"observed_at": "ALTER TABLE facts ADD COLUMN observed_at TEXT DEFAULT ''",
"contradiction_group": "ALTER TABLE facts ADD COLUMN contradiction_group TEXT DEFAULT ''",
"status": "ALTER TABLE facts ADD COLUMN status TEXT DEFAULT 'active'",
}
for column, ddl in migrations.items():
if column not in columns:
self._conn.execute(ddl)
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_canonical_key ON facts(canonical_key)")
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_facts_contradiction_group ON facts(contradiction_group)")
if "hrr_vector" not in columns:
self._conn.execute("ALTER TABLE facts ADD COLUMN hrr_vector BLOB")
self._conn.commit()
# ------------------------------------------------------------------
@@ -168,148 +144,41 @@ class MemoryStore:
content: str,
category: str = "general",
tags: str = "",
*,
canonical_key: str = "",
metadata: dict | None = None,
confidence: float | None = None,
source_role: str = "",
source_turn: int = -1,
observed_at: str = "",
contradiction_group: str = "",
status: str = "active",
) -> int:
"""Insert a fact and return its fact_id.
Exact duplicates are deduplicated by content. Near-duplicates are
normalized by canonical_key, with provenance/evidence merged into the
existing row. Contradictions sharing the same contradiction_group remain
stored as separate rows and are marked inspectably.
Deduplicates by content (UNIQUE constraint). On duplicate, returns
the existing fact_id without modifying the row. Extracts entities from
the content and links them to the fact.
"""
with self._lock:
content = content.strip()
if not content:
raise ValueError("content must not be empty")
metadata = dict(metadata or {})
canonical_key = canonical_key.strip()
contradiction_group = contradiction_group.strip()
observed_at = observed_at.strip()
status = status or "active"
trust_score = self.default_trust if confidence is None else _clamp_trust(confidence)
metadata_json = json.dumps(metadata, sort_keys=True)
if canonical_key:
existing = self._conn.execute(
"SELECT fact_id, metadata_json, trust_score, confidence, observed_at FROM facts WHERE canonical_key = ?",
(canonical_key,),
).fetchone()
if existing is not None:
merged_metadata = self._merge_metadata(existing["metadata_json"], metadata)
merged_trust = max(float(existing["trust_score"]), trust_score)
merged_observed_at = existing["observed_at"] or observed_at
if observed_at and merged_observed_at:
merged_observed_at = min(merged_observed_at, observed_at)
elif observed_at:
merged_observed_at = observed_at
self._conn.execute(
"""
UPDATE facts
SET metadata_json = ?,
trust_score = ?,
confidence = ?,
observed_at = ?,
updated_at = CURRENT_TIMESTAMP
WHERE fact_id = ?
""",
(
json.dumps(merged_metadata, sort_keys=True),
merged_trust,
max(float(existing["confidence"] or 0.0), confidence or trust_score),
merged_observed_at,
existing["fact_id"],
),
)
self._conn.commit()
return int(existing["fact_id"])
contradiction_rows = []
if contradiction_group:
contradiction_rows = self._conn.execute(
"""
SELECT fact_id, canonical_key, metadata_json
FROM facts
WHERE contradiction_group = ?
AND canonical_key != ?
""",
(contradiction_group, canonical_key),
).fetchall()
if contradiction_rows:
status = "contradiction"
metadata = dict(metadata)
metadata["status"] = "contradiction"
metadata["contradiction_group"] = contradiction_group
metadata["contradiction_keys"] = sorted(
{
canonical_key,
*[str(row["canonical_key"]) for row in contradiction_rows if row["canonical_key"]],
}
- {""}
)
metadata_json = json.dumps(metadata, sort_keys=True)
try:
cur = self._conn.execute(
"""
INSERT INTO facts (
content,
category,
tags,
trust_score,
canonical_key,
metadata_json,
confidence,
source_role,
source_turn,
observed_at,
contradiction_group,
status
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO facts (content, category, tags, trust_score)
VALUES (?, ?, ?, ?)
""",
(
content,
category,
tags,
trust_score,
canonical_key,
metadata_json,
confidence if confidence is not None else trust_score,
source_role,
source_turn,
observed_at,
contradiction_group,
status,
),
(content, category, tags, self.default_trust),
)
self._conn.commit()
fact_id: int = cur.lastrowid # type: ignore[assignment]
except sqlite3.IntegrityError:
# Duplicate content — return existing id
row = self._conn.execute(
"SELECT fact_id FROM facts WHERE content = ?", (content,)
).fetchone()
return int(row["fact_id"])
if contradiction_rows:
self._mark_contradictions(
contradiction_group=contradiction_group,
new_canonical_key=canonical_key,
existing_rows=contradiction_rows,
)
# Entity extraction and linking
for name in self._extract_entities(content):
entity_id = self._resolve_entity(name)
self._link_fact_entity(fact_id, entity_id)
# Compute HRR vector after entity linking
self._compute_hrr_vector(fact_id, content)
self._rebuild_bank(category)
@@ -342,9 +211,6 @@ class MemoryStore:
sql = f"""
SELECT f.fact_id, f.content, f.category, f.tags,
f.trust_score, f.retrieval_count, f.helpful_count,
f.canonical_key, f.metadata_json, f.confidence,
f.source_role, f.source_turn, f.observed_at,
f.contradiction_group, f.status,
f.created_at, f.updated_at
FROM facts f
JOIN facts_fts fts ON fts.rowid = f.fact_id
@@ -470,11 +336,7 @@ class MemoryStore:
sql = f"""
SELECT fact_id, content, category, tags, trust_score,
retrieval_count, helpful_count,
canonical_key, metadata_json, confidence,
source_role, source_turn, observed_at,
contradiction_group, status,
created_at, updated_at
retrieval_count, helpful_count, created_at, updated_at
FROM facts
WHERE trust_score >= ?
{category_clause}
@@ -525,89 +387,6 @@ class MemoryStore:
"helpful_count": row["helpful_count"] + helpful_increment,
}
# ------------------------------------------------------------------
# Metadata helpers
# ------------------------------------------------------------------
def _load_metadata(self, metadata_json: str | None) -> dict:
if not metadata_json:
return {}
try:
data = json.loads(metadata_json)
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _merge_metadata(self, existing_json: str | None, incoming: dict | None) -> dict:
existing = self._load_metadata(existing_json)
incoming = dict(incoming or {})
merged = dict(existing)
merged.update({k: v for k, v in incoming.items() if k not in {"provenance", "evidence", "observation_count", "duplicate_count", "contradiction_keys"}})
provenance = []
seen_provenance: set[str] = set()
for item in list(existing.get("provenance", [])) + list(incoming.get("provenance", [])):
if item and item not in seen_provenance:
seen_provenance.add(item)
provenance.append(item)
evidence = []
seen_evidence: set[tuple[str, str, str]] = set()
for item in list(existing.get("evidence", [])) + list(incoming.get("evidence", [])):
if not isinstance(item, dict):
continue
key = (
str(item.get("provenance", "")),
str(item.get("observed_at", "")),
str(item.get("source_text", "")),
)
if key in seen_evidence:
continue
seen_evidence.add(key)
evidence.append(dict(item))
observation_count = int(existing.get("observation_count", max(1, len(existing.get("evidence", [])) or 1)))
observation_count += int(incoming.get("observation_count", max(1, len(incoming.get("evidence", [])) or 1)))
contradiction_keys = []
seen_keys: set[str] = set()
for item in list(existing.get("contradiction_keys", [])) + list(incoming.get("contradiction_keys", [])):
if item and item not in seen_keys:
seen_keys.add(item)
contradiction_keys.append(item)
merged["provenance"] = provenance
merged["evidence"] = evidence
merged["observation_count"] = observation_count
merged["duplicate_count"] = max(0, observation_count - 1)
if contradiction_keys:
merged["contradiction_keys"] = contradiction_keys
return merged
def _mark_contradictions(self, contradiction_group: str, new_canonical_key: str, existing_rows: list[sqlite3.Row]) -> None:
for row in existing_rows:
metadata = self._load_metadata(row["metadata_json"])
keys = []
seen: set[str] = set()
for item in list(metadata.get("contradiction_keys", [])) + [new_canonical_key]:
if item and item not in seen:
seen.add(item)
keys.append(item)
metadata["status"] = "contradiction"
metadata["contradiction_group"] = contradiction_group
metadata["contradiction_keys"] = keys
self._conn.execute(
"""
UPDATE facts
SET status = 'contradiction',
metadata_json = ?,
updated_at = CURRENT_TIMESTAMP
WHERE fact_id = ?
""",
(json.dumps(metadata, sort_keys=True), row["fact_id"]),
)
self._conn.commit()
# ------------------------------------------------------------------
# Entity helpers
# ------------------------------------------------------------------
@@ -781,14 +560,8 @@ class MemoryStore:
# ------------------------------------------------------------------
def _row_to_dict(self, row: sqlite3.Row) -> dict:
"""Convert a sqlite3.Row to a plain dict with decoded metadata."""
data = dict(row)
metadata = self._load_metadata(data.get("metadata_json"))
if metadata:
data["metadata"] = metadata
data.setdefault("relation", metadata.get("relation"))
data.pop("metadata_json", None)
return data
"""Convert a sqlite3.Row to a plain dict."""
return dict(row)
def close(self) -> None:
"""Close the database connection."""

View File

@@ -1,63 +0,0 @@
{
"preferences_and_duplicates": [
{
"role": "user",
"content": "Deploy via Ansible for production changes.",
"created_at": "2026-04-22T10:00:00Z"
},
{
"role": "user",
"content": "We deploy through Ansible on this repo.",
"created_at": "2026-04-22T10:01:00Z"
},
{
"role": "user",
"content": "Gitea-first for repository work.",
"created_at": "2026-04-22T10:02:00Z"
}
],
"operational_and_contradictions": [
{
"role": "user",
"content": "The BURN watchdog caps dispatches per cycle to 6.",
"created_at": "2026-04-22T11:00:00Z"
},
{
"role": "user",
"content": "The provider should stay openai-codex/gpt-5.4.",
"created_at": "2026-04-22T11:01:00Z"
},
{
"role": "user",
"content": "Correction: the provider should stay mimo-v2-pro.",
"created_at": "2026-04-22T11:02:00Z"
}
],
"mixed_transcript": [
{
"role": "user",
"content": "Deploy via Ansible for production changes.",
"created_at": "2026-04-22T10:00:00Z"
},
{
"role": "user",
"content": "We deploy through Ansible on this repo.",
"created_at": "2026-04-22T10:01:00Z"
},
{
"role": "user",
"content": "The BURN watchdog caps dispatches per cycle to 6.",
"created_at": "2026-04-22T11:00:00Z"
},
{
"role": "user",
"content": "The provider should stay openai-codex/gpt-5.4.",
"created_at": "2026-04-22T11:01:00Z"
},
{
"role": "user",
"content": "Correction: the provider should stay mimo-v2-pro.",
"created_at": "2026-04-22T11:02:00Z"
}
]
}

View File

@@ -1,50 +0,0 @@
"""Integration tests for holographic auto-extraction with structured fact persistence."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
from plugins.memory.holographic import HolographicMemoryProvider
_FIXTURE_PATH = Path(__file__).resolve().parents[2] / "fixtures" / "memory_extraction_fragments.json"
def _load_fixture(name: str):
return json.loads(_FIXTURE_PATH.read_text())[name]
class TestHolographicAutoExtract:
def test_auto_extract_persists_structured_metadata_and_normalizes_duplicates(self, tmp_path):
provider = HolographicMemoryProvider(
config={
"db_path": str(tmp_path / "memory_store.db"),
"auto_extract": True,
"default_trust": 0.5,
}
)
provider.initialize("test-session")
messages = _load_fixture("mixed_transcript")
provider.on_session_end(messages)
provider.on_session_end(messages)
facts = provider._store.list_facts(min_trust=0.0, limit=20)
deploy_facts = [f for f in facts if f.get("relation") == "workflow.deploy_method"]
provider_facts = [f for f in facts if f.get("contradiction_group") == "config.provider"]
assert len(deploy_facts) == 1
assert deploy_facts[0]["metadata"]["duplicate_count"] >= 3
assert deploy_facts[0]["observed_at"] == "2026-04-22T10:00:00Z"
assert deploy_facts[0]["metadata"]["provenance"] == [
"conversation:user:0",
"conversation:user:1",
]
assert len(provider_facts) == 2
assert {f["status"] for f in provider_facts} == {"contradiction"}
assert {f["metadata"]["value"] for f in provider_facts} == {
"openai-codex/gpt-5.4",
"mimo-v2-pro",
}

View File

@@ -1,6 +1,6 @@
"""Tests for session compaction with fact extraction."""
import json
import pytest
import sys
from pathlib import Path
@@ -8,19 +8,12 @@ sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.session_compactor import (
ExtractedFact,
evaluate_extraction_quality,
extract_and_save_facts,
extract_facts_from_messages,
format_facts_summary,
save_facts_to_store,
extract_and_save_facts,
format_facts_summary,
)
_FIXTURE_PATH = Path(__file__).resolve().parent / "fixtures" / "memory_extraction_fragments.json"
def _load_fixture(name: str):
return json.loads(_FIXTURE_PATH.read_text())[name]
class TestFactExtraction:
def test_extract_preference(self):
@@ -67,48 +60,14 @@ class TestFactExtraction:
{"role": "user", "content": "I prefer Python."},
]
facts = extract_facts_from_messages(messages)
# Should deduplicate
python_facts = [f for f in facts if "Python" in f.content]
assert len(python_facts) == 1
def test_structured_fact_preserves_provenance_and_temporal_metadata(self):
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
deploy_fact = next(f for f in facts if f.relation == "workflow.deploy_method")
assert deploy_fact.source_role == "user"
assert deploy_fact.source_turn == 0
assert deploy_fact.observed_at == "2026-04-22T10:00:00Z"
assert deploy_fact.provenance == "conversation:user:0"
assert deploy_fact.canonical_key
assert deploy_fact.evidence
assert deploy_fact.evidence[0]["source_text"].startswith("Deploy via Ansible")
def test_near_duplicate_facts_are_normalized_into_one_canonical_fact(self):
facts = extract_facts_from_messages(_load_fixture("preferences_and_duplicates"))
deploy_facts = [f for f in facts if f.relation == "workflow.deploy_method"]
assert len(deploy_facts) == 1
assert len(deploy_facts[0].evidence) == 2
assert deploy_facts[0].metadata["duplicate_count"] == 1
def test_contradictory_facts_are_preserved_for_unique_slots(self):
facts = extract_facts_from_messages(_load_fixture("operational_and_contradictions"))
provider_facts = [f for f in facts if f.contradiction_group == "config.provider"]
assert len(provider_facts) == 2
assert {f.status for f in provider_facts} == {"contradiction"}
assert {f.normalized_content for f in provider_facts} == {
"openai codex gpt 5 4",
"mimo v2 pro",
}
def test_quality_evaluation_reports_noise_reduction(self):
metrics = evaluate_extraction_quality(_load_fixture("mixed_transcript"))
assert metrics["raw_candidates"] > metrics["normalized_facts"]
assert metrics["noise_reduction"] > 0
assert metrics["contradiction_groups"] == 1
class TestSaveFacts:
def test_save_with_callback(self):
saved = []
def mock_save(category, entity, content, trust):
saved.append({"category": category, "content": content})
@@ -117,38 +76,6 @@ class TestSaveFacts:
assert count == 1
assert len(saved) == 1
def test_save_with_extended_callback_metadata(self):
saved = []
def mock_save(category, entity, content, trust, **kwargs):
saved.append({
"category": category,
"entity": entity,
"content": content,
"trust": trust,
**kwargs,
})
fact = ExtractedFact(
"project.operational",
"watchdog",
"BURN watchdog caps dispatches per cycle to 6",
0.9,
2,
source_role="user",
observed_at="2026-04-22T11:00:00Z",
provenance="conversation:user:2",
canonical_key="project.operational|watchdog|dispatch_cap|6",
relation="fleet.dispatch_cap",
contradiction_group="fleet.dispatch_cap",
metadata={"duplicate_count": 0},
)
count = save_facts_to_store([fact], fact_store_fn=mock_save)
assert count == 1
assert saved[0]["canonical_key"] == fact.canonical_key
assert saved[0]["observed_at"] == "2026-04-22T11:00:00Z"
assert saved[0]["metadata"]["duplicate_count"] == 0
class TestFormatSummary:
def test_empty(self):

View File

@@ -0,0 +1,236 @@
"""Tests for the KittenTTS local provider in tools/tts_tool.py."""
import json
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
@pytest.fixture(autouse=True)
def clean_env(monkeypatch):
for key in ("HERMES_SESSION_PLATFORM",):
monkeypatch.delenv(key, raising=False)
@pytest.fixture(autouse=True)
def clear_kittentts_cache():
"""Reset the module-level model cache between tests."""
from tools import tts_tool as _tt
_tt._kittentts_model_cache.clear()
yield
_tt._kittentts_model_cache.clear()
@pytest.fixture
def mock_kittentts_module():
"""Inject a fake kittentts + soundfile module that return stub objects."""
fake_model = MagicMock()
# 24kHz float32 PCM at ~2s of silence
fake_model.generate.return_value = np.zeros(48000, dtype=np.float32)
fake_cls = MagicMock(return_value=fake_model)
fake_kittentts = MagicMock()
fake_kittentts.KittenTTS = fake_cls
# Stub soundfile — the real package isn't installed in CI venv, and
# _generate_kittentts does `import soundfile as sf` at runtime.
fake_sf = MagicMock()
def _fake_write(path, audio, samplerate):
# Emulate writing a real file so downstream path checks succeed.
import pathlib
pathlib.Path(path).write_bytes(b"RIFF\x00\x00\x00\x00WAVEfmt fake")
fake_sf.write = _fake_write
with patch.dict(
"sys.modules",
{"kittentts": fake_kittentts, "soundfile": fake_sf},
):
yield fake_model, fake_cls
class TestGenerateKittenTts:
def test_successful_wav_generation(self, tmp_path, mock_kittentts_module):
from tools.tts_tool import _generate_kittentts
fake_model, fake_cls = mock_kittentts_module
output_path = str(tmp_path / "test.wav")
result = _generate_kittentts("Hello world", output_path, {})
assert result == output_path
assert (tmp_path / "test.wav").exists()
fake_cls.assert_called_once()
fake_model.generate.assert_called_once()
def test_config_passes_voice_speed_cleantext(self, tmp_path, mock_kittentts_module):
from tools.tts_tool import _generate_kittentts
fake_model, _ = mock_kittentts_module
config = {
"kittentts": {
"model": "KittenML/kitten-tts-mini-0.8",
"voice": "Luna",
"speed": 1.25,
"clean_text": False,
}
}
_generate_kittentts("Hi there", str(tmp_path / "out.wav"), config)
call_kwargs = fake_model.generate.call_args.kwargs
assert call_kwargs["voice"] == "Luna"
assert call_kwargs["speed"] == 1.25
assert call_kwargs["clean_text"] is False
def test_default_model_and_voice(self, tmp_path, mock_kittentts_module):
from tools.tts_tool import (
DEFAULT_KITTENTTS_MODEL,
DEFAULT_KITTENTTS_VOICE,
_generate_kittentts,
)
fake_model, fake_cls = mock_kittentts_module
_generate_kittentts("Hi", str(tmp_path / "out.wav"), {})
fake_cls.assert_called_once_with(DEFAULT_KITTENTTS_MODEL)
assert fake_model.generate.call_args.kwargs["voice"] == DEFAULT_KITTENTTS_VOICE
def test_model_is_cached_across_calls(self, tmp_path, mock_kittentts_module):
from tools.tts_tool import _generate_kittentts
_, fake_cls = mock_kittentts_module
_generate_kittentts("One", str(tmp_path / "a.wav"), {})
_generate_kittentts("Two", str(tmp_path / "b.wav"), {})
# Same model name → class instantiated exactly once
assert fake_cls.call_count == 1
def test_different_models_are_cached_separately(self, tmp_path, mock_kittentts_module):
from tools.tts_tool import _generate_kittentts
_, fake_cls = mock_kittentts_module
_generate_kittentts(
"A",
str(tmp_path / "a.wav"),
{"kittentts": {"model": "KittenML/kitten-tts-nano-0.8-int8"}},
)
_generate_kittentts(
"B",
str(tmp_path / "b.wav"),
{"kittentts": {"model": "KittenML/kitten-tts-mini-0.8"}},
)
assert fake_cls.call_count == 2
def test_non_wav_extension_triggers_ffmpeg_conversion(
self, tmp_path, mock_kittentts_module, monkeypatch
):
"""Non-.wav output path causes WAV → target ffmpeg conversion."""
from tools import tts_tool as _tt
calls = []
def fake_shutil_which(cmd):
return "/usr/bin/ffmpeg" if cmd == "ffmpeg" else None
def fake_run(cmd, check=False, timeout=None, **kw):
calls.append(cmd)
# Emulate ffmpeg writing the output file
import pathlib
out_path = cmd[-1]
pathlib.Path(out_path).write_bytes(b"fake-mp3-data")
return MagicMock(returncode=0)
monkeypatch.setattr(_tt.shutil, "which", fake_shutil_which)
monkeypatch.setattr(_tt.subprocess, "run", fake_run)
output_path = str(tmp_path / "test.mp3")
result = _tt._generate_kittentts("Hi", output_path, {})
assert result == output_path
assert len(calls) == 1
assert calls[0][0] == "/usr/bin/ffmpeg"
def test_missing_kittentts_raises_import_error(self, tmp_path, monkeypatch):
"""When kittentts package is not installed, _import_kittentts raises."""
import sys
monkeypatch.setitem(sys.modules, "kittentts", None)
from tools.tts_tool import _generate_kittentts
with pytest.raises((ImportError, TypeError)):
_generate_kittentts("Hi", str(tmp_path / "out.wav"), {})
class TestCheckKittenttsAvailable:
def test_reports_available_when_package_present(self, monkeypatch):
import importlib.util
from tools.tts_tool import _check_kittentts_available
fake_spec = MagicMock()
monkeypatch.setattr(
importlib.util,
"find_spec",
lambda name: fake_spec if name == "kittentts" else None,
)
assert _check_kittentts_available() is True
def test_reports_unavailable_when_package_missing(self, monkeypatch):
import importlib.util
from tools.tts_tool import _check_kittentts_available
monkeypatch.setattr(importlib.util, "find_spec", lambda name: None)
assert _check_kittentts_available() is False
class TestDispatcherBranch:
def test_kittentts_not_installed_returns_helpful_error(self, monkeypatch, tmp_path):
"""When provider=kittentts but package missing, return JSON error with setup hint."""
import sys
monkeypatch.setitem(sys.modules, "kittentts", None)
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from tools.tts_tool import text_to_speech_tool
# Write a config telling it to use kittentts
import yaml
(tmp_path / "config.yaml").write_text(
yaml.safe_dump({"tts": {"provider": "kittentts"}})
)
result = json.loads(text_to_speech_tool(text="Hello"))
assert result["success"] is False
assert "kittentts" in result["error"].lower()
assert "hermes setup tts" in result["error"].lower()
def test_non_telegram_explicit_wav_path_is_preserved(
self, monkeypatch, tmp_path, mock_kittentts_module
):
"""Explicit WAV outputs should stay WAV outside Telegram sessions."""
import yaml
from tools import tts_tool as _tt
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "config.yaml").write_text(
yaml.safe_dump({"tts": {"provider": "kittentts"}})
)
def fail_convert(_path):
raise AssertionError("_convert_to_opus should not run outside Telegram")
monkeypatch.setattr(_tt, "_convert_to_opus", fail_convert)
result = json.loads(
_tt.text_to_speech_tool(
text="Hello from KittenTTS",
output_path=str(tmp_path / "out.wav"),
)
)
assert result["success"] is True
assert result["file_path"] == str(tmp_path / "out.wav")
assert (tmp_path / "out.wav").exists()

View File

@@ -2,13 +2,14 @@
"""
Text-to-Speech Tool Module
Supports six TTS providers:
Supports seven TTS providers:
- Edge TTS (default, free, no API key): Microsoft Edge neural voices
- ElevenLabs (premium): High-quality voices, needs ELEVENLABS_API_KEY
- OpenAI TTS: Good quality, needs OPENAI_API_KEY
- MiniMax TTS: High-quality with voice cloning, needs MINIMAX_API_KEY
- Mistral (Voxtral TTS): Multilingual, native Opus, needs MISTRAL_API_KEY
- NeuTTS (local, free, no API key): On-device TTS via neutts_cli, needs neutts installed
- KittenTTS (local, free, no API key): Lightweight on-device ONNX TTS via kittentts
Output formats:
- Opus (.ogg) for Telegram voice bubbles (requires ffmpeg for Edge TTS)
@@ -77,6 +78,12 @@ def _import_sounddevice():
return sd
def _import_kittentts():
"""Lazy import KittenTTS. Returns the class or raises ImportError."""
from kittentts import KittenTTS
return KittenTTS
# ===========================================================================
# Defaults
# ===========================================================================
@@ -86,6 +93,8 @@ DEFAULT_ELEVENLABS_VOICE_ID = "pNInz6obpgDQGcFmaJgB" # Adam
DEFAULT_ELEVENLABS_MODEL_ID = "eleven_multilingual_v2"
DEFAULT_ELEVENLABS_STREAMING_MODEL_ID = "eleven_flash_v2_5"
DEFAULT_OPENAI_MODEL = "gpt-4o-mini-tts"
DEFAULT_KITTENTTS_MODEL = "KittenML/kitten-tts-nano-0.8-int8" # 25MB
DEFAULT_KITTENTTS_VOICE = "Jasper"
DEFAULT_OPENAI_VOICE = "alloy"
DEFAULT_OPENAI_BASE_URL = "https://api.openai.com/v1"
DEFAULT_MINIMAX_MODEL = "speech-2.8-hd"
@@ -448,6 +457,15 @@ def _check_neutts_available() -> bool:
return False
def _check_kittentts_available() -> bool:
"""Check if the kittentts engine is importable (installed locally)."""
try:
import importlib.util
return importlib.util.find_spec("kittentts") is not None
except Exception:
return False
def _default_neutts_ref_audio() -> str:
"""Return path to the bundled default voice reference audio."""
return str(Path(__file__).parent / "neutts_samples" / "jo.wav")
@@ -511,6 +529,51 @@ def _generate_neutts(text: str, output_path: str, tts_config: Dict[str, Any]) ->
return output_path
# ===========================================================================
# Provider: KittenTTS (local, lightweight)
# ===========================================================================
# Module-level cache for KittenTTS model instances
_kittentts_model_cache: Dict[str, Any] = {}
def _generate_kittentts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
"""Generate speech using the local KittenTTS ONNX model."""
KittenTTS = _import_kittentts()
kt_config = tts_config.get("kittentts", {})
model_name = kt_config.get("model", DEFAULT_KITTENTTS_MODEL)
voice = kt_config.get("voice", DEFAULT_KITTENTTS_VOICE)
speed = kt_config.get("speed", 1.0)
clean_text = kt_config.get("clean_text", True)
global _kittentts_model_cache
if model_name not in _kittentts_model_cache:
logger.info("[KittenTTS] Loading model: %s", model_name)
_kittentts_model_cache[model_name] = KittenTTS(model_name)
model = _kittentts_model_cache[model_name]
audio = model.generate(text, voice=voice, speed=speed, clean_text=clean_text)
import soundfile as sf
wav_path = output_path
if not output_path.endswith(".wav"):
wav_path = output_path.rsplit(".", 1)[0] + ".wav"
sf.write(wav_path, audio, 24000)
if wav_path != output_path:
ffmpeg = shutil.which("ffmpeg")
if ffmpeg:
conv_cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path]
subprocess.run(conv_cmd, check=True, timeout=30)
os.remove(wav_path)
else:
os.rename(wav_path, output_path)
return output_path
# ===========================================================================
# Main tool function
# ===========================================================================
@@ -622,6 +685,19 @@ def text_to_speech_tool(
logger.info("Generating speech with NeuTTS (local)...")
_generate_neutts(text, file_str, tts_config)
elif provider == "kittentts":
try:
_import_kittentts()
except ImportError:
return json.dumps({
"success": False,
"error": "KittenTTS provider selected but 'kittentts' package not installed. "
"Run 'hermes setup tts' and choose KittenTTS, or install manually: "
"pip install https://github.com/KittenML/KittenTTS/releases/download/0.8.1/kittentts-0.8.1-py3-none-any.whl"
}, ensure_ascii=False)
logger.info("Generating speech with KittenTTS (local, lightweight)...")
_generate_kittentts(text, file_str, tts_config)
else:
# Default: Edge TTS (free), with NeuTTS as local fallback
edge_available = True
@@ -658,10 +734,10 @@ def text_to_speech_tool(
"error": f"TTS generation produced no output (provider: {provider})"
}, ensure_ascii=False)
# Try Opus conversion for Telegram compatibility
# Edge TTS outputs MP3, NeuTTS outputs WAV — both need ffmpeg conversion
# Try Opus conversion for Telegram compatibility only.
# Outside Telegram, preserve the caller's explicit output format.
voice_compatible = False
if provider in ("edge", "neutts", "minimax") and not file_str.endswith(".ogg"):
if want_opus and provider in ("edge", "neutts", "minimax", "kittentts") and not file_str.endswith(".ogg"):
opus_path = _convert_to_opus(file_str)
if opus_path:
file_str = opus_path
@@ -742,6 +818,8 @@ def check_tts_requirements() -> bool:
pass
if _check_neutts_available():
return True
if _check_kittentts_available():
return True
return False

View File

@@ -10,7 +10,7 @@ Hermes Agent supports both text-to-speech output and voice message transcription
## Text-to-Speech
Convert text to speech with six providers:
Convert text to speech with seven providers:
| Provider | Quality | Cost | API Key |
|----------|---------|------|---------|
@@ -20,6 +20,7 @@ Convert text to speech with six providers:
| **MiniMax TTS** | Excellent | Paid | `MINIMAX_API_KEY` |
| **Mistral (Voxtral TTS)** | Excellent | Paid | `MISTRAL_API_KEY` |
| **NeuTTS** | Good | Free | None needed |
| **KittenTTS** | Good | Free (local) | None needed |
### Platform Delivery
@@ -35,7 +36,7 @@ Convert text to speech with six providers:
```yaml
# In ~/.hermes/config.yaml
tts:
provider: "edge" # "edge" | "elevenlabs" | "openai" | "minimax" | "mistral" | "neutts"
provider: "edge" # "edge" | "elevenlabs" | "openai" | "minimax" | "mistral" | "neutts" | "kittentts"
speed: 1.0 # Global speed multiplier (provider-specific settings override this)
edge:
voice: "en-US-AriaNeural" # 322 voices, 74 languages
@@ -62,6 +63,11 @@ tts:
ref_text: ''
model: neuphonic/neutts-air-q4-gguf
device: cpu
kittentts:
model: KittenML/kitten-tts-nano-0.8-int8 # 25MB int8 default; also micro and mini variants
voice: Jasper # Jasper, Bella, Luna, Bruno, Rosie, Hugo, Kiki, Leo
speed: 1.0
clean_text: true
```
**Speed control**: The global `tts.speed` value applies to all providers by default. Each provider can override it with its own `speed` setting (e.g., `tts.openai.speed: 1.5`). Provider-specific speed takes precedence over the global value. Default is `1.0` (normal speed).
@@ -74,6 +80,7 @@ Telegram voice bubbles require Opus/OGG audio format:
- **Edge TTS** (default) outputs MP3 and needs **ffmpeg** to convert:
- **MiniMax TTS** outputs MP3 and needs **ffmpeg** to convert for Telegram voice bubbles
- **NeuTTS** outputs WAV and also needs **ffmpeg** to convert for Telegram voice bubbles
- **KittenTTS** outputs WAV and also needs **ffmpeg** to convert for Telegram voice bubbles
```bash
# Ubuntu/Debian
@@ -86,7 +93,7 @@ brew install ffmpeg
sudo dnf install ffmpeg
```
Without ffmpeg, Edge TTS, MiniMax TTS, and NeuTTS audio are sent as regular audio files (playable, but shown as a rectangular player instead of a voice bubble).
Without ffmpeg, Edge TTS, MiniMax TTS, NeuTTS, and KittenTTS audio are sent as regular audio files (playable, but shown as a rectangular player instead of a voice bubble).
:::tip
If you want voice bubbles without installing ffmpeg, switch to the OpenAI, ElevenLabs, or Mistral provider.