Compare commits
1 Commits
fix/813
...
fix/748-se
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
13ef670c05 |
@@ -1,302 +0,0 @@
|
||||
"""Self-Modifying Prompt Engine — agent learns from its own failures.
|
||||
|
||||
Analyzes session transcripts, identifies failure patterns, and generates
|
||||
prompt patches to prevent future failures.
|
||||
|
||||
The loop: fail → analyze → rewrite → retry → verify improvement.
|
||||
|
||||
Usage:
|
||||
from agent.self_modify import PromptLearner
|
||||
learner = PromptLearner()
|
||||
patches = learner.analyze_session(session_id)
|
||||
learner.apply_patches(patches)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
PATCHES_DIR = HERMES_HOME / "prompt_patches"
|
||||
ROLLBACK_DIR = HERMES_HOME / "prompt_rollback"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FailurePattern:
|
||||
"""A detected failure pattern in session transcripts."""
|
||||
pattern_type: str # retry_loop, timeout, error_hallucination, context_loss
|
||||
description: str
|
||||
frequency: int
|
||||
example_messages: List[str] = field(default_factory=list)
|
||||
suggested_fix: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class PromptPatch:
|
||||
"""A modification to the system prompt based on failure analysis."""
|
||||
id: str
|
||||
failure_type: str
|
||||
original_rule: str
|
||||
new_rule: str
|
||||
confidence: float
|
||||
applied_at: Optional[float] = None
|
||||
reverted: bool = False
|
||||
|
||||
|
||||
# Failure detection patterns
|
||||
FAILURE_SIGNALS = {
|
||||
"retry_loop": {
|
||||
"patterns": [
|
||||
r"(?i)retry(?:ing)?\s*(?:attempt|again)",
|
||||
r"(?i)failed.*retrying",
|
||||
r"(?i)error.*again",
|
||||
r"(?i)attempt\s+\d+\s*(?:of|/)\s*\d+",
|
||||
],
|
||||
"description": "Agent stuck in retry loop",
|
||||
},
|
||||
"timeout": {
|
||||
"patterns": [
|
||||
r"(?i)timed?\s*out",
|
||||
r"(?i)deadline\s+exceeded",
|
||||
r"(?i)took\s+(?:too\s+)?long",
|
||||
],
|
||||
"description": "Operation timed out",
|
||||
},
|
||||
"hallucination": {
|
||||
"patterns": [
|
||||
r"(?i)i\s+(?:don't|do\s+not)\s+(?:have|see|find)\s+(?:any|that|this)\s+(?:information|data|file)",
|
||||
r"(?i)the\s+file\s+doesn't\s+exist",
|
||||
r"(?i)i\s+(?:made|invented|fabricated)\s+(?:that\s+up|this)",
|
||||
],
|
||||
"description": "Agent hallucinated or fabricated information",
|
||||
},
|
||||
"context_loss": {
|
||||
"patterns": [
|
||||
r"(?i)i\s+(?:don't|do\s+not)\s+(?:remember|recall|know)\s+(?:what|where|when|how)",
|
||||
r"(?i)could\s+you\s+remind\s+me",
|
||||
r"(?i)what\s+were\s+we\s+(?:doing|working|talking)\s+(?:on|about)",
|
||||
],
|
||||
"description": "Agent lost context from earlier in conversation",
|
||||
},
|
||||
"tool_failure": {
|
||||
"patterns": [
|
||||
r"(?i)tool\s+(?:call|execution)\s+failed",
|
||||
r"(?i)command\s+not\s+found",
|
||||
r"(?i)permission\s+denied",
|
||||
r"(?i)no\s+such\s+file",
|
||||
],
|
||||
"description": "Tool execution failed",
|
||||
},
|
||||
}
|
||||
|
||||
# Prompt improvement templates
|
||||
PROMPT_FIXES = {
|
||||
"retry_loop": (
|
||||
"If an operation fails more than twice, stop retrying. "
|
||||
"Report the failure and ask the user for guidance. "
|
||||
"Do not enter retry loops — they waste tokens."
|
||||
),
|
||||
"timeout": (
|
||||
"For operations that may take long, set a timeout and report "
|
||||
"progress. If an operation takes more than 30 seconds, report "
|
||||
"what you've done so far and ask if you should continue."
|
||||
),
|
||||
"hallucination": (
|
||||
"If you cannot find information, say 'I don't know' or "
|
||||
"'I couldn't find that.' Never fabricate information. "
|
||||
"If a file doesn't exist, say so — don't guess its contents."
|
||||
),
|
||||
"context_loss": (
|
||||
"When you need context from earlier in the conversation, "
|
||||
"use session_search to find it. Don't ask the user to repeat themselves."
|
||||
),
|
||||
"tool_failure": (
|
||||
"If a tool fails, check the error message and try a different approach. "
|
||||
"Don't retry the exact same command — diagnose first."
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class PromptLearner:
|
||||
"""Analyze session transcripts and generate prompt improvements."""
|
||||
|
||||
def __init__(self):
|
||||
PATCHES_DIR.mkdir(parents=True, exist_ok=True)
|
||||
ROLLBACK_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def analyze_session(self, session_data: dict) -> List[FailurePattern]:
|
||||
"""Analyze a session for failure patterns.
|
||||
|
||||
Args:
|
||||
session_data: Session dict with 'messages' list.
|
||||
|
||||
Returns:
|
||||
List of detected failure patterns.
|
||||
"""
|
||||
messages = session_data.get("messages", [])
|
||||
patterns_found: Dict[str, FailurePattern] = {}
|
||||
|
||||
for msg in messages:
|
||||
content = str(msg.get("content", ""))
|
||||
role = msg.get("role", "")
|
||||
|
||||
# Only analyze assistant messages and tool results
|
||||
if role not in ("assistant", "tool"):
|
||||
continue
|
||||
|
||||
for failure_type, config in FAILURE_SIGNALS.items():
|
||||
for pattern in config["patterns"]:
|
||||
if re.search(pattern, content):
|
||||
if failure_type not in patterns_found:
|
||||
patterns_found[failure_type] = FailurePattern(
|
||||
pattern_type=failure_type,
|
||||
description=config["description"],
|
||||
frequency=0,
|
||||
suggested_fix=PROMPT_FIXES.get(failure_type, ""),
|
||||
)
|
||||
patterns_found[failure_type].frequency += 1
|
||||
if len(patterns_found[failure_type].example_messages) < 3:
|
||||
patterns_found[failure_type].example_messages.append(
|
||||
content[:200]
|
||||
)
|
||||
break # One match per message per type is enough
|
||||
|
||||
return list(patterns_found.values())
|
||||
|
||||
def generate_patches(self, patterns: List[FailurePattern],
|
||||
min_confidence: float = 0.7) -> List[PromptPatch]:
|
||||
"""Generate prompt patches from failure patterns.
|
||||
|
||||
Args:
|
||||
patterns: Detected failure patterns.
|
||||
min_confidence: Minimum confidence to generate a patch.
|
||||
|
||||
Returns:
|
||||
List of prompt patches.
|
||||
"""
|
||||
patches = []
|
||||
for pattern in patterns:
|
||||
# Confidence based on frequency
|
||||
if pattern.frequency >= 3:
|
||||
confidence = 0.9
|
||||
elif pattern.frequency >= 2:
|
||||
confidence = 0.75
|
||||
else:
|
||||
confidence = 0.5
|
||||
|
||||
if confidence < min_confidence:
|
||||
continue
|
||||
|
||||
if not pattern.suggested_fix:
|
||||
continue
|
||||
|
||||
patch = PromptPatch(
|
||||
id=f"{pattern.pattern_type}-{int(time.time())}",
|
||||
failure_type=pattern.pattern_type,
|
||||
original_rule="(missing — no existing rule for this pattern)",
|
||||
new_rule=pattern.suggested_fix,
|
||||
confidence=confidence,
|
||||
)
|
||||
patches.append(patch)
|
||||
|
||||
return patches
|
||||
|
||||
def apply_patches(self, patches: List[PromptPatch],
|
||||
prompt_path: Optional[str] = None) -> int:
|
||||
"""Apply patches to the system prompt.
|
||||
|
||||
Args:
|
||||
patches: Patches to apply.
|
||||
prompt_path: Path to prompt file (default: ~/.hermes/system_prompt.md)
|
||||
|
||||
Returns:
|
||||
Number of patches applied.
|
||||
"""
|
||||
if prompt_path is None:
|
||||
prompt_path = str(HERMES_HOME / "system_prompt.md")
|
||||
|
||||
prompt_file = Path(prompt_path)
|
||||
|
||||
# Backup current prompt
|
||||
if prompt_file.exists():
|
||||
backup = ROLLBACK_DIR / f"{prompt_file.name}.{int(time.time())}.bak"
|
||||
backup.write_text(prompt_file.read_text())
|
||||
|
||||
# Read current prompt
|
||||
current = prompt_file.read_text() if prompt_file.exists() else ""
|
||||
|
||||
# Apply patches
|
||||
applied = 0
|
||||
additions = []
|
||||
for patch in patches:
|
||||
if patch.new_rule not in current:
|
||||
additions.append(f"\n## Auto-learned: {patch.failure_type}\n{patch.new_rule}")
|
||||
patch.applied_at = time.time()
|
||||
applied += 1
|
||||
|
||||
if additions:
|
||||
new_content = current + "\n".join(additions)
|
||||
prompt_file.write_text(new_content)
|
||||
|
||||
# Log patches
|
||||
patches_file = PATCHES_DIR / f"patches-{int(time.time())}.json"
|
||||
with open(patches_file, "w") as f:
|
||||
json.dump([p.__dict__ for p in patches], f, indent=2, default=str)
|
||||
|
||||
logger.info("Applied %d prompt patches", applied)
|
||||
return applied
|
||||
|
||||
def rollback_last(self, prompt_path: Optional[str] = None) -> bool:
|
||||
"""Rollback to the most recent backup.
|
||||
|
||||
Args:
|
||||
prompt_path: Path to prompt file.
|
||||
|
||||
Returns:
|
||||
True if rollback succeeded.
|
||||
"""
|
||||
if prompt_path is None:
|
||||
prompt_path = str(HERMES_HOME / "system_prompt.md")
|
||||
|
||||
backups = sorted(ROLLBACK_DIR.glob("*.bak"), reverse=True)
|
||||
if not backups:
|
||||
logger.warning("No backups to rollback to")
|
||||
return False
|
||||
|
||||
latest = backups[0]
|
||||
Path(prompt_path).write_text(latest.read_text())
|
||||
logger.info("Rolled back to %s", latest.name)
|
||||
return True
|
||||
|
||||
def learn_from_session(self, session_data: dict) -> Dict[str, Any]:
|
||||
"""Full learning cycle: analyze → patch → apply.
|
||||
|
||||
Args:
|
||||
session_data: Session dict.
|
||||
|
||||
Returns:
|
||||
Summary of what was learned and applied.
|
||||
"""
|
||||
patterns = self.analyze_session(session_data)
|
||||
patches = self.generate_patches(patterns)
|
||||
applied = self.apply_patches(patches)
|
||||
|
||||
return {
|
||||
"patterns_detected": len(patterns),
|
||||
"patches_generated": len(patches),
|
||||
"patches_applied": applied,
|
||||
"patterns": [
|
||||
{"type": p.pattern_type, "frequency": p.frequency, "description": p.description}
|
||||
for p in patterns
|
||||
],
|
||||
}
|
||||
231
agent/session_compactor.py
Normal file
231
agent/session_compactor.py
Normal file
@@ -0,0 +1,231 @@
|
||||
"""Session compaction with fact extraction.
|
||||
|
||||
Before compressing conversation context, extracts durable facts
|
||||
(user preferences, corrections, project details) and saves them
|
||||
to the fact store so they survive compression.
|
||||
|
||||
Usage:
|
||||
from agent.session_compactor import extract_and_save_facts
|
||||
facts = extract_and_save_facts(messages)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExtractedFact:
|
||||
"""A fact extracted from conversation."""
|
||||
category: str # "user_pref", "correction", "project", "tool_quirk", "general"
|
||||
entity: str # what the fact is about
|
||||
content: str # the fact itself
|
||||
confidence: float # 0.0-1.0
|
||||
source_turn: int # which message turn it came from
|
||||
timestamp: float = 0.0
|
||||
|
||||
|
||||
# Patterns that indicate user preferences
|
||||
_PREFERENCE_PATTERNS = [
|
||||
(r"(?:I|we) (?:prefer|like|want|need) (.+?)(?:\.|$)", "preference"),
|
||||
(r"(?:always|never) (?:use|do|run|deploy) (.+?)(?:\.|$)", "preference"),
|
||||
(r"(?:my|our) (?:default|preferred|usual) (.+?) (?:is|are) (.+?)(?:\.|$)", "preference"),
|
||||
(r"(?:make sure|ensure|remember) (?:to|that) (.+?)(?:\.|$)", "instruction"),
|
||||
(r"(?:don'?t|do not) (?:ever|ever again) (.+?)(?:\.|$)", "constraint"),
|
||||
]
|
||||
|
||||
# Patterns that indicate corrections
|
||||
_CORRECTION_PATTERNS = [
|
||||
(r"(?:actually|no[, ]|wait[, ]|correction[: ]|sorry[, ]) (.+)", "correction"),
|
||||
(r"(?:I meant|what I meant was|the correct) (.+?)(?:\.|$)", "correction"),
|
||||
(r"(?:it'?s|its) (?:not|shouldn'?t be|wrong) (.+?)(?:\.|$)", "correction"),
|
||||
]
|
||||
|
||||
# Patterns that indicate project/tool facts
|
||||
_PROJECT_PATTERNS = [
|
||||
(r"(?:the |our )?(?:project|repo|codebase|code) (?:is|uses|needs|requires) (.+?)(?:\.|$)", "project"),
|
||||
(r"(?:deploy|push|commit) (?:to|on) (.+?)(?:\.|$)", "project"),
|
||||
(r"(?:this|that|the) (?:server|host|machine|VPS) (?:is|runs|has) (.+?)(?:\.|$)", "infrastructure"),
|
||||
(r"(?:model|provider|engine) (?:is|should be|needs to be) (.+?)(?:\.|$)", "config"),
|
||||
]
|
||||
|
||||
|
||||
def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[ExtractedFact]:
|
||||
"""Extract durable facts from conversation messages.
|
||||
|
||||
Scans user messages for preferences, corrections, project facts,
|
||||
and infrastructure details that should survive compression.
|
||||
"""
|
||||
facts = []
|
||||
seen_contents = set()
|
||||
|
||||
for turn_idx, msg in enumerate(messages):
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
|
||||
# Only scan user messages and assistant responses with corrections
|
||||
if role not in ("user", "assistant"):
|
||||
continue
|
||||
if not content or not isinstance(content, str):
|
||||
continue
|
||||
if len(content) < 10:
|
||||
continue
|
||||
|
||||
# Skip tool results and system messages
|
||||
if role == "assistant" and msg.get("tool_calls"):
|
||||
continue
|
||||
|
||||
extracted = _extract_from_text(content, turn_idx, role)
|
||||
|
||||
# Deduplicate by content
|
||||
for fact in extracted:
|
||||
key = f"{fact.category}:{fact.content[:100]}"
|
||||
if key not in seen_contents:
|
||||
seen_contents.add(key)
|
||||
facts.append(fact)
|
||||
|
||||
return facts
|
||||
|
||||
|
||||
def _extract_from_text(text: str, turn_idx: int, role: str) -> List[ExtractedFact]:
|
||||
"""Extract facts from a single text block."""
|
||||
facts = []
|
||||
timestamp = time.time()
|
||||
|
||||
# Clean text for pattern matching
|
||||
clean = text.strip()
|
||||
|
||||
# User preference patterns (from user messages)
|
||||
if role == "user":
|
||||
for pattern, subcategory in _PREFERENCE_PATTERNS:
|
||||
for match in re.finditer(pattern, clean, re.IGNORECASE):
|
||||
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
|
||||
if len(content) > 5:
|
||||
facts.append(ExtractedFact(
|
||||
category=f"user_pref.{subcategory}",
|
||||
entity="user",
|
||||
content=content[:200],
|
||||
confidence=0.7,
|
||||
source_turn=turn_idx,
|
||||
timestamp=timestamp,
|
||||
))
|
||||
|
||||
# Correction patterns (from user messages)
|
||||
if role == "user":
|
||||
for pattern, subcategory in _CORRECTION_PATTERNS:
|
||||
for match in re.finditer(pattern, clean, re.IGNORECASE):
|
||||
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
|
||||
if len(content) > 5:
|
||||
facts.append(ExtractedFact(
|
||||
category=f"correction.{subcategory}",
|
||||
entity="user",
|
||||
content=content[:200],
|
||||
confidence=0.8,
|
||||
source_turn=turn_idx,
|
||||
timestamp=timestamp,
|
||||
))
|
||||
|
||||
# Project/infrastructure patterns (from both user and assistant)
|
||||
for pattern, subcategory in _PROJECT_PATTERNS:
|
||||
for match in re.finditer(pattern, clean, re.IGNORECASE):
|
||||
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
|
||||
if len(content) > 5:
|
||||
facts.append(ExtractedFact(
|
||||
category=f"project.{subcategory}",
|
||||
entity=subcategory,
|
||||
content=content[:200],
|
||||
confidence=0.6,
|
||||
source_turn=turn_idx,
|
||||
timestamp=timestamp,
|
||||
))
|
||||
|
||||
return facts
|
||||
|
||||
|
||||
def save_facts_to_store(facts: List[ExtractedFact], fact_store_fn=None) -> int:
|
||||
"""Save extracted facts to the fact store.
|
||||
|
||||
Args:
|
||||
facts: List of extracted facts.
|
||||
fact_store_fn: Optional callable(category, entity, content, trust).
|
||||
If None, uses the holographic fact store if available.
|
||||
|
||||
Returns:
|
||||
Number of facts saved.
|
||||
"""
|
||||
saved = 0
|
||||
|
||||
if fact_store_fn:
|
||||
for fact in facts:
|
||||
try:
|
||||
fact_store_fn(
|
||||
category=fact.category,
|
||||
entity=fact.entity,
|
||||
content=fact.content,
|
||||
trust=fact.confidence,
|
||||
)
|
||||
saved += 1
|
||||
except Exception as e:
|
||||
logger.debug("Failed to save fact: %s", e)
|
||||
else:
|
||||
# Try holographic fact store
|
||||
try:
|
||||
from fact_store import fact_store as _fs
|
||||
for fact in facts:
|
||||
try:
|
||||
_fs(
|
||||
action="add",
|
||||
content=fact.content,
|
||||
category=fact.category,
|
||||
tags=fact.entity,
|
||||
trust_delta=fact.confidence - 0.5,
|
||||
)
|
||||
saved += 1
|
||||
except Exception as e:
|
||||
logger.debug("Failed to save fact via fact_store: %s", e)
|
||||
except ImportError:
|
||||
logger.debug("fact_store not available — facts not persisted")
|
||||
|
||||
return saved
|
||||
|
||||
|
||||
def extract_and_save_facts(
|
||||
messages: List[Dict[str, Any]],
|
||||
fact_store_fn=None,
|
||||
) -> Tuple[List[ExtractedFact], int]:
|
||||
"""Extract facts from messages and save them.
|
||||
|
||||
Returns (extracted_facts, saved_count).
|
||||
"""
|
||||
facts = extract_facts_from_messages(messages)
|
||||
if facts:
|
||||
logger.info("Extracted %d facts from conversation", len(facts))
|
||||
saved = save_facts_to_store(facts, fact_store_fn)
|
||||
logger.info("Saved %d/%d facts to store", saved, len(facts))
|
||||
else:
|
||||
saved = 0
|
||||
return facts, saved
|
||||
|
||||
|
||||
def format_facts_summary(facts: List[ExtractedFact]) -> str:
|
||||
"""Format extracted facts as a readable summary."""
|
||||
if not facts:
|
||||
return "No facts extracted."
|
||||
|
||||
by_category = {}
|
||||
for f in facts:
|
||||
by_category.setdefault(f.category, []).append(f)
|
||||
|
||||
lines = [f"Extracted {len(facts)} facts:", ""]
|
||||
for cat, cat_facts in sorted(by_category.items()):
|
||||
lines.append(f" {cat}:")
|
||||
for f in cat_facts:
|
||||
lines.append(f" - {f.content[:80]}")
|
||||
return "\n".join(lines)
|
||||
91
tests/test_session_compactor.py
Normal file
91
tests/test_session_compactor.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""Tests for session compaction with fact extraction."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from agent.session_compactor import (
|
||||
ExtractedFact,
|
||||
extract_facts_from_messages,
|
||||
save_facts_to_store,
|
||||
extract_and_save_facts,
|
||||
format_facts_summary,
|
||||
)
|
||||
|
||||
|
||||
class TestFactExtraction:
|
||||
def test_extract_preference(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "I prefer Python over JavaScript for backend work."},
|
||||
]
|
||||
facts = extract_facts_from_messages(messages)
|
||||
assert len(facts) >= 1
|
||||
assert any("Python" in f.content for f in facts)
|
||||
|
||||
def test_extract_correction(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "Actually the port is 8081 not 8080."},
|
||||
]
|
||||
facts = extract_facts_from_messages(messages)
|
||||
assert len(facts) >= 1
|
||||
assert any("8081" in f.content for f in facts)
|
||||
|
||||
def test_extract_project_fact(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "The project uses Gitea for source control."},
|
||||
]
|
||||
facts = extract_facts_from_messages(messages)
|
||||
assert len(facts) >= 1
|
||||
|
||||
def test_skip_tool_results(self):
|
||||
messages = [
|
||||
{"role": "assistant", "content": "Running command...", "tool_calls": [{"id": "1"}]},
|
||||
{"role": "tool", "content": "output here"},
|
||||
]
|
||||
facts = extract_facts_from_messages(messages)
|
||||
assert len(facts) == 0
|
||||
|
||||
def test_skip_short_messages(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "ok"},
|
||||
]
|
||||
facts = extract_facts_from_messages(messages)
|
||||
assert len(facts) == 0
|
||||
|
||||
def test_deduplication(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "I prefer Python."},
|
||||
{"role": "user", "content": "I prefer Python."},
|
||||
]
|
||||
facts = extract_facts_from_messages(messages)
|
||||
# Should deduplicate
|
||||
python_facts = [f for f in facts if "Python" in f.content]
|
||||
assert len(python_facts) == 1
|
||||
|
||||
|
||||
class TestSaveFacts:
|
||||
def test_save_with_callback(self):
|
||||
saved = []
|
||||
def mock_save(category, entity, content, trust):
|
||||
saved.append({"category": category, "content": content})
|
||||
|
||||
facts = [ExtractedFact("user_pref", "user", "likes dark mode", 0.8, 0)]
|
||||
count = save_facts_to_store(facts, fact_store_fn=mock_save)
|
||||
assert count == 1
|
||||
assert len(saved) == 1
|
||||
|
||||
|
||||
class TestFormatSummary:
|
||||
def test_empty(self):
|
||||
assert "No facts" in format_facts_summary([])
|
||||
|
||||
def test_with_facts(self):
|
||||
facts = [
|
||||
ExtractedFact("user_pref", "user", "likes dark mode", 0.8, 0),
|
||||
ExtractedFact("correction", "user", "port is 8081", 0.9, 1),
|
||||
]
|
||||
summary = format_facts_summary(facts)
|
||||
assert "2 facts" in summary
|
||||
assert "user_pref" in summary
|
||||
Reference in New Issue
Block a user