Compare commits
1 Commits
burn/254-1
...
fix/syntax
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1f23c8758a |
@@ -309,19 +309,7 @@ class MemoryManager:
|
||||
"""Notify external providers when the built-in memory tool writes.
|
||||
|
||||
Skips the builtin provider itself (it's the source of the write).
|
||||
Passes current MEMORY.md entries for cross-tier dedup checking.
|
||||
"""
|
||||
# Collect current memory entries for dedup context
|
||||
memory_entries = []
|
||||
for provider in self._providers:
|
||||
if provider.name == "builtin" and hasattr(provider, "_store") and provider._store:
|
||||
try:
|
||||
store = provider._store
|
||||
if hasattr(store, "get_all_entries"):
|
||||
memory_entries = store.get_all_entries(target)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for provider in self._providers:
|
||||
if provider.name == "builtin":
|
||||
continue
|
||||
@@ -333,54 +321,6 @@ class MemoryManager:
|
||||
provider.name, e,
|
||||
)
|
||||
|
||||
def run_dedup_scan(self) -> dict:
|
||||
"""Run cross-tier deduplication scan across all memory providers.
|
||||
|
||||
Returns a report dict with duplicates found and actions taken.
|
||||
"""
|
||||
report = {"status": "ok", "duplicates": 0, "actions": []}
|
||||
|
||||
# Collect MEMORY.md entries
|
||||
memory_entries = []
|
||||
builtin_store = None
|
||||
for provider in self._providers:
|
||||
if provider.name == "builtin" and hasattr(provider, "_store"):
|
||||
builtin_store = provider._store
|
||||
if builtin_store:
|
||||
try:
|
||||
entries = builtin_store.get_all_entries("memory")
|
||||
memory_entries = entries if entries else []
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not memory_entries:
|
||||
report["status"] = "no_memory_entries"
|
||||
return report
|
||||
|
||||
# Check each external provider for duplicates
|
||||
for provider in self._providers:
|
||||
if provider.name == "builtin":
|
||||
continue
|
||||
if not hasattr(provider, "_store") or not provider._store:
|
||||
continue
|
||||
try:
|
||||
from plugins.memory.holographic.dedup import scan_cross_tier_duplicates
|
||||
all_facts = provider._store.list_facts(min_trust=0.0, limit=1000)
|
||||
dup_report = scan_cross_tier_duplicates(memory_entries, all_facts)
|
||||
report["duplicates"] += dup_report.duplicates_found
|
||||
if dup_report.duplicates_found > 0:
|
||||
from plugins.memory.holographic.dedup import resolve_duplicates
|
||||
cleaned = resolve_duplicates(dup_report, memory_entries, provider._store)
|
||||
removed = len(memory_entries) - len(cleaned)
|
||||
report["actions"].append(
|
||||
f"{provider.name}: {dup_report.duplicates_found} duplicates, "
|
||||
f"{removed} MEMORY.md entries removed"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Dedup scan failed for provider '%s': %s", provider.name, e)
|
||||
|
||||
return report
|
||||
|
||||
def on_delegation(self, task: str, result: str, *,
|
||||
child_session_id: str = "", **kwargs) -> None:
|
||||
"""Notify all providers that a subagent completed."""
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
# Memory Tier Ownership
|
||||
|
||||
Each fact lives in exactly one tier. This prevents duplicate tokens on every
|
||||
prompt injection and eliminates stale-data divergence when one copy is updated
|
||||
but not the other.
|
||||
|
||||
## Tier 1 — MEMORY.md (Built-in)
|
||||
|
||||
**Purpose:** Always-on system prompt context — compact, high-signal.
|
||||
|
||||
**Contains:**
|
||||
- Operational notes and active task state
|
||||
- Immediate context the agent needs every turn
|
||||
- User preferences that affect agent behavior
|
||||
|
||||
**Constraints:**
|
||||
- Keep under 50 entries (every byte costs prompt tokens)
|
||||
- Entries >100 chars should migrate to the fact store
|
||||
- Managed via the `memory` tool (add/replace/remove)
|
||||
|
||||
**Examples:**
|
||||
- "Default model: mimo-v2-pro/Nous"
|
||||
- "Alexander prefers action over narration"
|
||||
- "Deploy via Ansible; wants one-command redeploy"
|
||||
|
||||
## Tier 2 — Fact Store (Holographic)
|
||||
|
||||
**Purpose:** Deep structured storage with search, reasoning, and trust scoring.
|
||||
|
||||
**Contains:**
|
||||
- `user_pref` — User preferences and habits
|
||||
- `project` — Project-specific facts and conventions
|
||||
- `tool` — Tool quirks, API behaviors, environment details
|
||||
- `general` — Everything else worth remembering
|
||||
|
||||
**Advantages over MEMORY.md:**
|
||||
- FTS5 full-text search
|
||||
- Entity resolution (link facts to people/projects/tools)
|
||||
- Trust scoring (good facts rise, bad facts sink)
|
||||
- Compositional reasoning (`reason` across multiple entities)
|
||||
- Duplicate detection (UNIQUE constraint + similarity matching)
|
||||
- Unlimited size
|
||||
|
||||
**Managed via:** `fact_store` tool (add/search/probe/related/reason/contradict/update/remove/list)
|
||||
|
||||
## Tier 3 — MemPalace
|
||||
|
||||
**Purpose:** Specialized long-form archives and multi-session research.
|
||||
|
||||
**Contains:**
|
||||
- Detailed analysis and research notes
|
||||
- Multi-session task context
|
||||
- Structured "palace rooms" for domain-specific knowledge
|
||||
|
||||
## Migration Rules
|
||||
|
||||
| Condition | Destination |
|
||||
|-----------|------------|
|
||||
| Entry >100 chars | → fact store |
|
||||
| Category is `user_pref`, `project`, `tool` | → fact store |
|
||||
| Needs entity linking | → fact store |
|
||||
| Needs trust scoring | → fact store |
|
||||
| Short operational note (<80 chars) | → MEMORY.md |
|
||||
| Always-on context | → MEMORY.md |
|
||||
| When in doubt | → fact store |
|
||||
|
||||
## Cross-Tier Deduplication
|
||||
|
||||
**Problem:** The `on_memory_write` bridge mirrors MEMORY.md writes to the fact
|
||||
store. Without dedup, the same fact exists in both tiers — wasting tokens and
|
||||
risking stale data.
|
||||
|
||||
**Solution:**
|
||||
1. `on_memory_write` checks the fact store for similar entries before mirroring
|
||||
2. Similarity threshold: 0.85 (catches rephrasings, avoids false positives)
|
||||
3. If duplicate found: skip the mirror (fact store entry is canonical)
|
||||
4. `dedup` action on `fact_store` tool: runtime scan + auto-resolve
|
||||
5. `MemoryManager.run_dedup_scan()`: programmatic cross-tier cleanup
|
||||
|
||||
**Resolution strategy:** Fact store wins by default — it has trust scoring,
|
||||
FTS5, and entity resolution. MEMORY.md copies are removed.
|
||||
|
||||
## Running Dedup
|
||||
|
||||
```python
|
||||
# Via tool
|
||||
result = fact_store(action="dedup")
|
||||
|
||||
# Via MemoryManager
|
||||
report = memory_manager.run_dedup_scan()
|
||||
```
|
||||
@@ -648,51 +648,6 @@ def load_gateway_config() -> GatewayConfig:
|
||||
return config
|
||||
|
||||
|
||||
# Known-weak placeholder tokens from .env.example, tutorials, etc.
|
||||
_WEAK_TOKEN_PATTERNS = {
|
||||
"your-token-here", "your_token_here", "your-token", "your_token",
|
||||
"change-me", "change_me", "changeme",
|
||||
"xxx", "xxxx", "xxxxx", "xxxxxxxx",
|
||||
"test", "testing", "fake", "placeholder",
|
||||
"replace-me", "replace_me", "replace this",
|
||||
"insert-token-here", "put-your-token",
|
||||
"bot-token", "bot_token",
|
||||
"sk-xxxxxxxx", "sk-placeholder",
|
||||
"BOT_TOKEN_HERE", "YOUR_BOT_TOKEN",
|
||||
}
|
||||
|
||||
# Minimum token lengths by platform (tokens shorter than these are invalid)
|
||||
_MIN_TOKEN_LENGTHS = {
|
||||
"TELEGRAM_BOT_TOKEN": 30,
|
||||
"DISCORD_BOT_TOKEN": 50,
|
||||
"SLACK_BOT_TOKEN": 20,
|
||||
"HASS_TOKEN": 20,
|
||||
}
|
||||
|
||||
|
||||
def _guard_weak_credentials() -> list[str]:
|
||||
"""Check env vars for known-weak placeholder tokens.
|
||||
|
||||
Returns a list of warning messages for any weak credentials found.
|
||||
"""
|
||||
warnings = []
|
||||
for env_var, min_len in _MIN_TOKEN_LENGTHS.items():
|
||||
value = os.getenv(env_var, "").strip()
|
||||
if not value:
|
||||
continue
|
||||
if value.lower() in _WEAK_TOKEN_PATTERNS:
|
||||
warnings.append(
|
||||
f"{env_var} is set to a placeholder value ('{value[:20]}'). "
|
||||
f"Replace it with a real token."
|
||||
)
|
||||
elif len(value) < min_len:
|
||||
warnings.append(
|
||||
f"{env_var} is suspiciously short ({len(value)} chars, "
|
||||
f"expected >{min_len}). May be truncated or invalid."
|
||||
)
|
||||
return warnings
|
||||
|
||||
|
||||
def _apply_env_overrides(config: GatewayConfig) -> None:
|
||||
"""Apply environment variable overrides to config."""
|
||||
|
||||
@@ -986,7 +941,3 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
|
||||
config.default_reset_policy.at_hour = int(reset_hour)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Guard against weak placeholder tokens from .env.example copies
|
||||
for warning in _guard_weak_credentials():
|
||||
logger.warning("Weak credential: %s", warning)
|
||||
|
||||
@@ -540,29 +540,6 @@ def handle_function_call(
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Poka-yoke: validate tool handler return type.
|
||||
# Handlers MUST return a JSON string. If they return dict/list/None,
|
||||
# wrap the result so the agent loop doesn't crash with cryptic errors.
|
||||
if not isinstance(result, str):
|
||||
logger.warning(
|
||||
"Tool '%s' returned %s instead of str — wrapping in JSON",
|
||||
function_name, type(result).__name__,
|
||||
)
|
||||
result = json.dumps(
|
||||
{"output": str(result), "_type_warning": f"Tool returned {type(result).__name__}, expected str"},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
else:
|
||||
# Validate it's parseable JSON
|
||||
try:
|
||||
json.loads(result)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
logger.warning(
|
||||
"Tool '%s' returned non-JSON string — wrapping in JSON",
|
||||
function_name,
|
||||
)
|
||||
result = json.dumps({"output": result}, ensure_ascii=False)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -12,7 +12,7 @@ Config in $HERMES_HOME/config.yaml (profile-scoped):
|
||||
auto_extract: false
|
||||
default_trust: 0.5
|
||||
min_trust_threshold: 0.3
|
||||
temporal_decay_half_life: 60
|
||||
temporal_decay_half_life: 0
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -55,7 +55,7 @@ FACT_STORE_SCHEMA = {
|
||||
"properties": {
|
||||
"action": {
|
||||
"type": "string",
|
||||
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list", "dedup"],
|
||||
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"],
|
||||
},
|
||||
"content": {"type": "string", "description": "Fact content (required for 'add')."},
|
||||
"query": {"type": "string", "description": "Search query (required for 'search')."},
|
||||
@@ -152,7 +152,6 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
{"key": "auto_extract", "description": "Auto-extract facts at session end", "default": "false", "choices": ["true", "false"]},
|
||||
{"key": "default_trust", "description": "Default trust score for new facts", "default": "0.5"},
|
||||
{"key": "hrr_dim", "description": "HRR vector dimensions", "default": "1024"},
|
||||
{"key": "temporal_decay_half_life", "description": "Days for facts to lose half their relevance (0=disabled)", "default": "60"},
|
||||
]
|
||||
|
||||
def initialize(self, session_id: str, **kwargs) -> None:
|
||||
@@ -169,7 +168,7 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
default_trust = float(self._config.get("default_trust", 0.5))
|
||||
hrr_dim = int(self._config.get("hrr_dim", 1024))
|
||||
hrr_weight = float(self._config.get("hrr_weight", 0.3))
|
||||
temporal_decay = int(self._config.get("temporal_decay_half_life", 60))
|
||||
temporal_decay = int(self._config.get("temporal_decay_half_life", 0))
|
||||
|
||||
self._store = MemoryStore(db_path=db_path, default_trust=default_trust, hrr_dim=hrr_dim)
|
||||
self._retriever = FactRetriever(
|
||||
@@ -242,48 +241,27 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
self._auto_extract_facts(messages)
|
||||
|
||||
def on_memory_write(self, action: str, target: str, content: str) -> None:
|
||||
"""Mirror built-in memory writes as facts with cross-tier dedup.
|
||||
"""Mirror built-in memory writes as facts.
|
||||
|
||||
- add: check for duplicates first, skip if fact already exists
|
||||
- replace: search for old content, update or re-add (dedup-aware)
|
||||
- remove: remove matching facts (hard remove, not trust decay)
|
||||
|
||||
Dedup strategy: before adding, search existing facts for near-matches.
|
||||
If similarity > 0.85, skip the add (existing fact store entry wins).
|
||||
- add: mirror new fact to holographic store
|
||||
- replace: search for old content, update or re-add
|
||||
- remove: lower trust on matching facts so they fade naturally
|
||||
"""
|
||||
if not self._store:
|
||||
return
|
||||
try:
|
||||
if action == "add" and content:
|
||||
category = "user_pref" if target == "user" else "general"
|
||||
# Cross-tier dedup: check if this fact already exists
|
||||
from .dedup import is_duplicate_before_add
|
||||
existing = self._store.search_facts(content.strip()[:200], limit=5)
|
||||
dup = is_duplicate_before_add(content, existing)
|
||||
if dup:
|
||||
logger.debug(
|
||||
"Skipping duplicate mirror: '%s' already exists as fact#%d",
|
||||
content[:60], dup.get("fact_id", "?")
|
||||
)
|
||||
return
|
||||
self._store.add_fact(content, category=category)
|
||||
elif action == "replace" and content:
|
||||
category = "user_pref" if target == "user" else "general"
|
||||
# Check for duplicate before adding replacement
|
||||
from .dedup import is_duplicate_before_add
|
||||
existing = self._store.search_facts(content.strip()[:200], limit=5)
|
||||
dup = is_duplicate_before_add(content, existing)
|
||||
if dup:
|
||||
logger.debug("Skipping duplicate replace mirror: fact#%d already matches", dup.get("fact_id", "?"))
|
||||
return
|
||||
self._store.add_fact(content, category=category)
|
||||
elif action == "remove" and content:
|
||||
# Hard remove matching facts (not just trust decay)
|
||||
# Lower trust on matching facts so they decay naturally
|
||||
results = self._store.search_facts(content, limit=5)
|
||||
for fact in results:
|
||||
if content.strip().lower() in fact.get("content", "").lower():
|
||||
self._store.remove_fact(fact["fact_id"])
|
||||
logger.debug("Removed mirrored fact#%d on memory remove", fact["fact_id"])
|
||||
self._store.update_fact(fact["fact_id"], trust=max(0.0, fact.get("trust", 0.5) - 0.4))
|
||||
except Exception as e:
|
||||
logger.debug("Holographic memory_write mirror failed: %s", e)
|
||||
|
||||
@@ -372,31 +350,6 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
)
|
||||
return json.dumps({"facts": facts, "count": len(facts)})
|
||||
|
||||
elif action == "dedup":
|
||||
from .dedup import scan_cross_tier_duplicates, resolve_duplicates, DedupReport
|
||||
# Get all facts from store
|
||||
all_facts = store.list_facts(min_trust=0.0, limit=1000)
|
||||
# Get memory entries from built-in store (passed via kwargs if available)
|
||||
memory_entries = kwargs.get("memory_entries", [])
|
||||
if not memory_entries:
|
||||
return json.dumps({
|
||||
"status": "no_memory_entries",
|
||||
"message": "No MEMORY.md entries provided for comparison. Use memory tool to read first.",
|
||||
"fact_count": len(all_facts),
|
||||
})
|
||||
report = scan_cross_tier_duplicates(memory_entries, all_facts)
|
||||
if report.duplicates_found == 0:
|
||||
return json.dumps({"status": "clean", "message": "No cross-tier duplicates found."})
|
||||
# Auto-resolve: fact store wins
|
||||
cleaned = resolve_duplicates(report, memory_entries, store)
|
||||
return json.dumps({
|
||||
"status": "resolved",
|
||||
"duplicates_found": report.duplicates_found,
|
||||
"entries_removed": len(memory_entries) - len(cleaned),
|
||||
"cleaned_entries": cleaned,
|
||||
"summary": report.summary(),
|
||||
})
|
||||
|
||||
else:
|
||||
return json.dumps({"error": f"Unknown action: {action}"})
|
||||
|
||||
|
||||
@@ -1,191 +0,0 @@
|
||||
"""Cross-tier memory deduplication.
|
||||
|
||||
Detects and resolves duplicate facts between MEMORY.md (built-in) and the
|
||||
holographic fact store. Facts should live in exactly one tier:
|
||||
|
||||
Tier 1 — MEMORY.md: Always-on context (compact, <50 entries ideal).
|
||||
Tier 2 — Fact store: Deep structured storage (unlimited, entity-aware).
|
||||
Tier 3 — MemPalace: Specialized long-form archives.
|
||||
|
||||
Ownership rules:
|
||||
- user_pref / project / tool facts → fact store (structured, searchable)
|
||||
- "always-on" operational notes → MEMORY.md (compact, system prompt)
|
||||
- When in doubt: fact store wins (it has dedup, trust scoring, FTS5)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from difflib import SequenceMatcher
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_SIMILARITY_THRESHOLD = 0.85
|
||||
|
||||
|
||||
@dataclass
|
||||
class DuplicatePair:
|
||||
memory_entry: str
|
||||
memory_index: int
|
||||
fact_store_id: int
|
||||
fact_store_content: str
|
||||
similarity: float
|
||||
resolution: str = ""
|
||||
resolved: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class DedupReport:
|
||||
total_memory_entries: int = 0
|
||||
total_facts: int = 0
|
||||
duplicates_found: int = 0
|
||||
pairs: List[DuplicatePair] = field(default_factory=list)
|
||||
|
||||
def summary(self) -> str:
|
||||
lines = [
|
||||
f"Cross-tier dedup: {self.total_memory_entries} MEMORY.md entries, "
|
||||
f"{self.total_facts} fact store entries, "
|
||||
f"{self.duplicates_found} duplicates found",
|
||||
]
|
||||
for p in self.pairs:
|
||||
status = f"[{p.resolution}]" if p.resolved else "[PENDING]"
|
||||
lines.append(
|
||||
f" {status} sim={p.similarity:.2f} "
|
||||
f"mem[{p.memory_index}]: {p.memory_entry[:60]} "
|
||||
f"<> fact#{p.fact_store_id}: {p.fact_store_content[:60]}"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _normalize(text: str) -> str:
|
||||
text = text.strip().lower()
|
||||
text = re.sub(r"^[\\s>*\\-\\u2022]+", "", text)
|
||||
text = re.sub(r"\\s+", " ", text)
|
||||
text = text.rstrip(".,;:!?",)
|
||||
return text
|
||||
|
||||
|
||||
def _similarity(a: str, b: str) -> float:
|
||||
if not a or not b:
|
||||
return 0.0
|
||||
return SequenceMatcher(None, a, b).ratio()
|
||||
|
||||
|
||||
def scan_cross_tier_duplicates(
|
||||
memory_entries: List[str],
|
||||
fact_store_facts: List[Dict[str, Any]],
|
||||
threshold: float = _SIMILARITY_THRESHOLD,
|
||||
) -> DedupReport:
|
||||
report = DedupReport(
|
||||
total_memory_entries=len(memory_entries),
|
||||
total_facts=len(fact_store_facts),
|
||||
)
|
||||
for i, mem_line in enumerate(memory_entries):
|
||||
mem_norm = _normalize(mem_line)
|
||||
if not mem_norm or len(mem_norm) < 10:
|
||||
continue
|
||||
for fact in fact_store_facts:
|
||||
fact_norm = _normalize(fact.get("content", ""))
|
||||
if not fact_norm or len(fact_norm) < 10:
|
||||
continue
|
||||
sim = _similarity(mem_norm, fact_norm)
|
||||
if sim >= threshold:
|
||||
report.pairs.append(DuplicatePair(
|
||||
memory_entry=mem_line,
|
||||
memory_index=i,
|
||||
fact_store_id=fact.get("fact_id", -1),
|
||||
fact_store_content=fact.get("content", ""),
|
||||
similarity=sim,
|
||||
))
|
||||
report.duplicates_found = len(report.pairs)
|
||||
return report
|
||||
|
||||
|
||||
def classify_tier(fact_content: str, category: str = "general") -> str:
|
||||
if category in ("user_pref", "project", "tool"):
|
||||
return "factstore"
|
||||
content = fact_content.strip()
|
||||
if len(content) < 80 and any(
|
||||
kw in content.lower() for kw in ("todo", "note:", "fix:", "remember:", "always", "never")
|
||||
):
|
||||
return "memory"
|
||||
return "factstore"
|
||||
|
||||
|
||||
def resolve_pair(pair: DuplicatePair) -> str:
|
||||
pair.resolution = "keep_factstore"
|
||||
pair.resolved = True
|
||||
return pair.resolution
|
||||
|
||||
|
||||
def resolve_duplicates(
|
||||
report: DedupReport,
|
||||
memory_entries: List[str],
|
||||
fact_store=None,
|
||||
) -> List[str]:
|
||||
indices_to_remove = set()
|
||||
for pair in report.pairs:
|
||||
resolve_pair(pair)
|
||||
if pair.resolution == "keep_factstore":
|
||||
indices_to_remove.add(pair.memory_index)
|
||||
elif pair.resolution == "keep_memory" and fact_store:
|
||||
try:
|
||||
fact_store.remove_fact(pair.fact_store_id)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to remove fact %d: %s", pair.fact_store_id, e)
|
||||
cleaned = [e for i, e in enumerate(memory_entries) if i not in indices_to_remove]
|
||||
removed = len(memory_entries) - len(cleaned)
|
||||
if removed:
|
||||
logger.info("Dedup removed %d duplicate entries from MEMORY.md", removed)
|
||||
return cleaned
|
||||
|
||||
|
||||
def is_duplicate_before_add(
|
||||
content: str,
|
||||
existing_facts: List[Dict[str, Any]],
|
||||
threshold: float = _SIMILARITY_THRESHOLD,
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""Check if content is a duplicate of an existing fact BEFORE adding.
|
||||
|
||||
Returns the matching fact dict if duplicate, None otherwise.
|
||||
Used by on_memory_write to prevent cross-tier duplication at write time.
|
||||
"""
|
||||
content_norm = _normalize(content)
|
||||
if not content_norm or len(content_norm) < 10:
|
||||
return None
|
||||
for fact in existing_facts:
|
||||
fact_norm = _normalize(fact.get("content", ""))
|
||||
if not fact_norm or len(fact_norm) < 10:
|
||||
continue
|
||||
if _similarity(content_norm, fact_norm) >= threshold:
|
||||
return fact
|
||||
return None
|
||||
|
||||
|
||||
TIER_OWNERSHIP_DOC = """# Memory Tier Ownership
|
||||
|
||||
Each fact lives in exactly one tier to prevent duplicate tokens and stale-data divergence.
|
||||
|
||||
## Tier 1 — MEMORY.md (built-in)
|
||||
- Always-on system prompt context (compact, <50 entries ideal).
|
||||
- Operational notes, active task state, immediate context.
|
||||
- Managed by: `memory` tool.
|
||||
|
||||
## Tier 2 — Fact Store (holographic)
|
||||
- Deep structured storage with search and reasoning.
|
||||
- user_pref, project, tool facts; entity-linked knowledge.
|
||||
- Managed by: `fact_store` tool.
|
||||
- Has: FTS5 search, trust scoring, entity resolution.
|
||||
|
||||
## Tier 3 — MemPalace
|
||||
- Specialized long-form archives and research.
|
||||
|
||||
## Rules
|
||||
- MEMORY.md entries >100 chars → migrate to fact store.
|
||||
- Structured categories (user_pref, project, tool) → fact store.
|
||||
- Duplicate across tiers: fact store wins (it has trust scoring).
|
||||
- `on_memory_write` checks fact store before mirroring.
|
||||
"""
|
||||
@@ -98,15 +98,7 @@ class FactRetriever:
|
||||
|
||||
# Optional temporal decay
|
||||
if self.half_life > 0:
|
||||
decay = self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
|
||||
# Access-recency boost: facts retrieved recently decay slower.
|
||||
# A fact accessed within 1 half-life gets up to 1.5x the decay
|
||||
# factor, tapering to 1.0x (no boost) after 2 half-lives.
|
||||
last_accessed = fact.get("last_accessed_at")
|
||||
if last_accessed:
|
||||
access_boost = self._access_recency_boost(last_accessed)
|
||||
decay = min(1.0, decay * access_boost)
|
||||
score *= decay
|
||||
score *= self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
|
||||
|
||||
fact["score"] = score
|
||||
scored.append(fact)
|
||||
@@ -599,41 +591,3 @@ class FactRetriever:
|
||||
return math.pow(0.5, age_days / self.half_life)
|
||||
except (ValueError, TypeError):
|
||||
return 1.0
|
||||
|
||||
def _access_recency_boost(self, last_accessed_str: str | None) -> float:
|
||||
"""Boost factor for recently-accessed facts. Range [1.0, 1.5].
|
||||
|
||||
Facts accessed within 1 half-life get up to 1.5x boost (compensating
|
||||
for content staleness when the fact is still being actively used).
|
||||
Boost decays linearly to 1.0 (no boost) at 2 half-lives.
|
||||
|
||||
Returns 1.0 if half-life is disabled or timestamp is missing.
|
||||
"""
|
||||
if not self.half_life or not last_accessed_str:
|
||||
return 1.0
|
||||
|
||||
try:
|
||||
if isinstance(last_accessed_str, str):
|
||||
ts = datetime.fromisoformat(last_accessed_str.replace("Z", "+00:00"))
|
||||
else:
|
||||
ts = last_accessed_str
|
||||
|
||||
if ts.tzinfo is None:
|
||||
ts = ts.replace(tzinfo=timezone.utc)
|
||||
|
||||
age_days = (datetime.now(timezone.utc) - ts).total_seconds() / 86400
|
||||
if age_days < 0:
|
||||
return 1.5 # Future timestamp = just accessed
|
||||
|
||||
half_lives_since_access = age_days / self.half_life
|
||||
|
||||
if half_lives_since_access <= 1.0:
|
||||
# Within 1 half-life: linearly from 1.5 (just now) to 1.0 (at 1 HL)
|
||||
return 1.0 + 0.5 * (1.0 - half_lives_since_access)
|
||||
elif half_lives_since_access <= 2.0:
|
||||
# Between 1 and 2 half-lives: linearly from 1.0 to 1.0 (no boost)
|
||||
return 1.0
|
||||
else:
|
||||
return 1.0
|
||||
except (ValueError, TypeError):
|
||||
return 1.0
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
"""Tests for weak credential guard in gateway/config.py."""
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
from gateway.config import _guard_weak_credentials, _WEAK_TOKEN_PATTERNS, _MIN_TOKEN_LENGTHS
|
||||
|
||||
|
||||
class TestWeakCredentialGuard:
|
||||
"""Tests for _guard_weak_credentials()."""
|
||||
|
||||
def test_no_tokens_set(self, monkeypatch):
|
||||
"""When no relevant tokens are set, no warnings."""
|
||||
for var in _MIN_TOKEN_LENGTHS:
|
||||
monkeypatch.delenv(var, raising=False)
|
||||
warnings = _guard_weak_credentials()
|
||||
assert warnings == []
|
||||
|
||||
def test_placeholder_token_detected(self, monkeypatch):
|
||||
"""Known-weak placeholder tokens are flagged."""
|
||||
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "your-token-here")
|
||||
warnings = _guard_weak_credentials()
|
||||
assert len(warnings) == 1
|
||||
assert "TELEGRAM_BOT_TOKEN" in warnings[0]
|
||||
assert "placeholder" in warnings[0].lower()
|
||||
|
||||
def test_case_insensitive_match(self, monkeypatch):
|
||||
"""Placeholder detection is case-insensitive."""
|
||||
monkeypatch.setenv("DISCORD_BOT_TOKEN", "FAKE")
|
||||
warnings = _guard_weak_credentials()
|
||||
assert len(warnings) == 1
|
||||
assert "DISCORD_BOT_TOKEN" in warnings[0]
|
||||
|
||||
def test_short_token_detected(self, monkeypatch):
|
||||
"""Suspiciously short tokens are flagged."""
|
||||
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "abc123") # 6 chars, min is 30
|
||||
warnings = _guard_weak_credentials()
|
||||
assert len(warnings) == 1
|
||||
assert "short" in warnings[0].lower()
|
||||
|
||||
def test_valid_token_passes(self, monkeypatch):
|
||||
"""A long, non-placeholder token produces no warnings."""
|
||||
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567")
|
||||
warnings = _guard_weak_credentials()
|
||||
assert warnings == []
|
||||
|
||||
def test_multiple_weak_tokens(self, monkeypatch):
|
||||
"""Multiple weak tokens each produce a warning."""
|
||||
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "change-me")
|
||||
monkeypatch.setenv("DISCORD_BOT_TOKEN", "xx") # short
|
||||
warnings = _guard_weak_credentials()
|
||||
assert len(warnings) == 2
|
||||
@@ -1,209 +0,0 @@
|
||||
"""Tests for temporal decay and access-recency boost in holographic memory (#241)."""
|
||||
|
||||
import math
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestTemporalDecay:
|
||||
"""Test _temporal_decay exponential decay formula."""
|
||||
|
||||
def _make_retriever(self, half_life=60):
|
||||
from plugins.memory.holographic.retrieval import FactRetriever
|
||||
store = MagicMock()
|
||||
return FactRetriever(store=store, temporal_decay_half_life=half_life)
|
||||
|
||||
def test_fresh_fact_no_decay(self):
|
||||
"""A fact updated today should have decay ≈ 1.0."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
decay = r._temporal_decay(now)
|
||||
assert decay > 0.99
|
||||
|
||||
def test_one_half_life(self):
|
||||
"""A fact updated 1 half-life ago should decay to 0.5."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
|
||||
decay = r._temporal_decay(old)
|
||||
assert abs(decay - 0.5) < 0.01
|
||||
|
||||
def test_two_half_lives(self):
|
||||
"""A fact updated 2 half-lives ago should decay to 0.25."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
|
||||
decay = r._temporal_decay(old)
|
||||
assert abs(decay - 0.25) < 0.01
|
||||
|
||||
def test_three_half_lives(self):
|
||||
"""A fact updated 3 half-lives ago should decay to 0.125."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
old = (datetime.now(timezone.utc) - timedelta(days=180)).isoformat()
|
||||
decay = r._temporal_decay(old)
|
||||
assert abs(decay - 0.125) < 0.01
|
||||
|
||||
def test_half_life_disabled(self):
|
||||
"""When half_life=0, decay should always be 1.0."""
|
||||
r = self._make_retriever(half_life=0)
|
||||
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
|
||||
assert r._temporal_decay(old) == 1.0
|
||||
|
||||
def test_none_timestamp(self):
|
||||
"""Missing timestamp should return 1.0 (no decay)."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
assert r._temporal_decay(None) == 1.0
|
||||
|
||||
def test_empty_timestamp(self):
|
||||
r = self._make_retriever(half_life=60)
|
||||
assert r._temporal_decay("") == 1.0
|
||||
|
||||
def test_invalid_timestamp(self):
|
||||
"""Malformed timestamp should return 1.0 (fail open)."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
assert r._temporal_decay("not-a-date") == 1.0
|
||||
|
||||
def test_future_timestamp(self):
|
||||
"""Future timestamp should return 1.0 (no decay for future dates)."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
future = (datetime.now(timezone.utc) + timedelta(days=10)).isoformat()
|
||||
assert r._temporal_decay(future) == 1.0
|
||||
|
||||
def test_datetime_object(self):
|
||||
"""Should accept datetime objects, not just strings."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
old = datetime.now(timezone.utc) - timedelta(days=60)
|
||||
decay = r._temporal_decay(old)
|
||||
assert abs(decay - 0.5) < 0.01
|
||||
|
||||
def test_different_half_lives(self):
|
||||
"""30-day half-life should decay faster than 90-day."""
|
||||
r30 = self._make_retriever(half_life=30)
|
||||
r90 = self._make_retriever(half_life=90)
|
||||
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
|
||||
assert r30._temporal_decay(old) < r90._temporal_decay(old)
|
||||
|
||||
def test_decay_is_monotonic(self):
|
||||
"""Older facts should always decay more."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
now = datetime.now(timezone.utc)
|
||||
d1 = r._temporal_decay((now - timedelta(days=10)).isoformat())
|
||||
d2 = r._temporal_decay((now - timedelta(days=30)).isoformat())
|
||||
d3 = r._temporal_decay((now - timedelta(days=60)).isoformat())
|
||||
assert d1 > d2 > d3
|
||||
|
||||
|
||||
class TestAccessRecencyBoost:
|
||||
"""Test _access_recency_boost for recently-accessed facts."""
|
||||
|
||||
def _make_retriever(self, half_life=60):
|
||||
from plugins.memory.holographic.retrieval import FactRetriever
|
||||
store = MagicMock()
|
||||
return FactRetriever(store=store, temporal_decay_half_life=half_life)
|
||||
|
||||
def test_just_accessed_max_boost(self):
|
||||
"""A fact accessed just now should get maximum boost (1.5)."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
boost = r._access_recency_boost(now)
|
||||
assert boost > 1.45 # Near 1.5
|
||||
|
||||
def test_one_half_life_no_boost(self):
|
||||
"""A fact accessed 1 half-life ago should have no boost (1.0)."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
|
||||
boost = r._access_recency_boost(old)
|
||||
assert abs(boost - 1.0) < 0.01
|
||||
|
||||
def test_half_way_boost(self):
|
||||
"""A fact accessed 0.5 half-lives ago should get ~1.25 boost."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
old = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
|
||||
boost = r._access_recency_boost(old)
|
||||
assert abs(boost - 1.25) < 0.05
|
||||
|
||||
def test_beyond_one_half_life_no_boost(self):
|
||||
"""Beyond 1 half-life, boost should be 1.0."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
old = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
|
||||
boost = r._access_recency_boost(old)
|
||||
assert boost == 1.0
|
||||
|
||||
def test_disabled_no_boost(self):
|
||||
"""When half_life=0, boost should be 1.0."""
|
||||
r = self._make_retriever(half_life=0)
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
assert r._access_recency_boost(now) == 1.0
|
||||
|
||||
def test_none_timestamp(self):
|
||||
r = self._make_retriever(half_life=60)
|
||||
assert r._access_recency_boost(None) == 1.0
|
||||
|
||||
def test_invalid_timestamp(self):
|
||||
r = self._make_retriever(half_life=60)
|
||||
assert r._access_recency_boost("bad") == 1.0
|
||||
|
||||
def test_boost_range(self):
|
||||
"""Boost should always be in [1.0, 1.5]."""
|
||||
r = self._make_retriever(half_life=60)
|
||||
now = datetime.now(timezone.utc)
|
||||
for days in [0, 1, 15, 30, 45, 59, 60, 90, 365]:
|
||||
ts = (now - timedelta(days=days)).isoformat()
|
||||
boost = r._access_recency_boost(ts)
|
||||
assert 1.0 <= boost <= 1.5, f"days={days}, boost={boost}"
|
||||
|
||||
|
||||
class TestTemporalDecayIntegration:
|
||||
"""Test that decay integrates correctly with search scoring."""
|
||||
|
||||
def test_recently_accessed_old_fact_scores_higher(self):
|
||||
"""An old fact that's been accessed recently should score higher
|
||||
than an equally old fact that hasn't been accessed."""
|
||||
from plugins.memory.holographic.retrieval import FactRetriever
|
||||
store = MagicMock()
|
||||
r = FactRetriever(store=store, temporal_decay_half_life=60)
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
old_date = (now - timedelta(days=120)).isoformat() # 2 half-lives old
|
||||
recent_access = (now - timedelta(days=10)).isoformat() # accessed 10 days ago
|
||||
old_access = (now - timedelta(days=200)).isoformat() # accessed 200 days ago
|
||||
|
||||
# Old fact, recently accessed
|
||||
decay1 = r._temporal_decay(old_date)
|
||||
boost1 = r._access_recency_boost(recent_access)
|
||||
effective1 = min(1.0, decay1 * boost1)
|
||||
|
||||
# Old fact, not recently accessed
|
||||
decay2 = r._temporal_decay(old_date)
|
||||
boost2 = r._access_recency_boost(old_access)
|
||||
effective2 = min(1.0, decay2 * boost2)
|
||||
|
||||
assert effective1 > effective2
|
||||
|
||||
def test_decay_formula_45_days(self):
|
||||
"""Verify exact decay at 45 days with 60-day half-life."""
|
||||
from plugins.memory.holographic.retrieval import FactRetriever
|
||||
r = FactRetriever(store=MagicMock(), temporal_decay_half_life=60)
|
||||
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
|
||||
decay = r._temporal_decay(old)
|
||||
expected = math.pow(0.5, 45/60)
|
||||
assert abs(decay - expected) < 0.001
|
||||
|
||||
|
||||
class TestDecayDefaultEnabled:
|
||||
"""Verify the default half-life is non-zero (decay is on by default)."""
|
||||
|
||||
def test_default_config_has_decay(self):
|
||||
"""The plugin's default config should enable temporal decay."""
|
||||
from plugins.memory.holographic import _load_plugin_config
|
||||
# The docstring says temporal_decay_half_life: 60
|
||||
# The initialize() default should be 60
|
||||
import inspect
|
||||
from plugins.memory.holographic import HolographicMemoryProvider
|
||||
src = inspect.getsource(HolographicMemoryProvider.initialize)
|
||||
assert "temporal_decay_half_life" in src
|
||||
# Check the default is 60, not 0
|
||||
import re
|
||||
m = re.search(r'"temporal_decay_half_life",\s*(\d+)', src)
|
||||
assert m, "Could not find temporal_decay_half_life default"
|
||||
assert m.group(1) == "60", f"Default is {m.group(1)}, expected 60"
|
||||
@@ -1,178 +0,0 @@
|
||||
"""Tests for cross-tier memory deduplication.
|
||||
|
||||
Tests the dedup module's normalize, similarity, scan, resolve, and
|
||||
is_duplicate_before_add functions.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add the plugins path so we can import dedup
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "plugins", "memory", "holographic"))
|
||||
|
||||
from dedup import (
|
||||
_normalize,
|
||||
_similarity,
|
||||
scan_cross_tier_duplicates,
|
||||
resolve_duplicates,
|
||||
is_duplicate_before_add,
|
||||
classify_tier,
|
||||
DedupReport,
|
||||
DuplicatePair,
|
||||
)
|
||||
|
||||
|
||||
class TestNormalize:
|
||||
def test_basic_lowercasing(self):
|
||||
assert _normalize("Hello World") == "hello world"
|
||||
|
||||
def test_strips_markdown_bullets(self):
|
||||
assert _normalize("- some fact") == "some fact"
|
||||
assert _normalize("* some fact") == "some fact"
|
||||
assert _normalize(" - some fact ") == "some fact"
|
||||
|
||||
def test_strips_trailing_punctuation(self):
|
||||
assert _normalize("some fact.") == "some fact"
|
||||
assert _normalize("some fact,") == "some fact"
|
||||
assert _normalize("some fact;") == "some fact"
|
||||
|
||||
def test_collapses_whitespace(self):
|
||||
assert _normalize("some fact here") == "some fact here"
|
||||
|
||||
def test_empty_and_short(self):
|
||||
assert _normalize("") == ""
|
||||
assert _normalize(" ") == ""
|
||||
assert _normalize("abc") == "abc"
|
||||
|
||||
|
||||
class TestSimilarity:
|
||||
def test_identical_strings(self):
|
||||
assert _similarity("hello world", "hello world") == 1.0
|
||||
|
||||
def test_completely_different(self):
|
||||
assert _similarity("abc", "xyz") < 0.3
|
||||
|
||||
def test_similar_rephrasing(self):
|
||||
sim = _similarity("deploy via ansible", "deploy with ansible")
|
||||
assert sim > 0.7
|
||||
|
||||
def test_empty_strings(self):
|
||||
assert _similarity("", "hello") == 0.0
|
||||
assert _similarity("hello", "") == 0.0
|
||||
assert _similarity("", "") == 0.0
|
||||
|
||||
|
||||
class TestScanCrossTierDuplicates:
|
||||
def test_no_duplicates(self):
|
||||
memory = ["Deploy via Ansible", "Use mimo-v2-pro model"]
|
||||
facts = [
|
||||
{"fact_id": 1, "content": "User prefers dark mode"},
|
||||
{"fact_id": 2, "content": "Project uses Python 3.11"},
|
||||
]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 0
|
||||
assert len(report.pairs) == 0
|
||||
|
||||
def test_exact_duplicate(self):
|
||||
memory = ["Deploy via Ansible"]
|
||||
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 1
|
||||
assert report.pairs[0].similarity == 1.0
|
||||
assert report.pairs[0].fact_store_id == 1
|
||||
|
||||
def test_near_duplicate_above_threshold(self):
|
||||
memory = ["Alexander prefers action over narration"]
|
||||
facts = [{"fact_id": 1, "content": "Alexander prefers action over narration."}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 1
|
||||
|
||||
def test_below_threshold_not_duplicate(self):
|
||||
memory = ["Deploy via Ansible on VPS"]
|
||||
facts = [{"fact_id": 1, "content": "Deploy via Docker on local machine"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts, threshold=0.85)
|
||||
assert report.duplicates_found == 0
|
||||
|
||||
def test_short_entries_skipped(self):
|
||||
memory = ["OK", "ab"]
|
||||
facts = [{"fact_id": 1, "content": "OK"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 0
|
||||
|
||||
def test_multiple_duplicates(self):
|
||||
memory = ["Fact A here", "Fact B here"]
|
||||
facts = [
|
||||
{"fact_id": 1, "content": "Fact A here"},
|
||||
{"fact_id": 2, "content": "Fact B here"},
|
||||
]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 2
|
||||
|
||||
def test_report_summary(self):
|
||||
memory = ["Deploy via Ansible"]
|
||||
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
summary = report.summary()
|
||||
assert "1 MEMORY.md entries" in summary
|
||||
assert "1 fact store entries" in summary
|
||||
assert "1 duplicates" in summary
|
||||
|
||||
|
||||
class TestResolveDuplicates:
|
||||
def test_removes_memory_duplicates(self):
|
||||
memory = ["Deploy via Ansible", "Use Python 3.11"]
|
||||
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
cleaned = resolve_duplicates(report, memory)
|
||||
assert len(cleaned) == 1
|
||||
assert cleaned[0] == "Use Python 3.11"
|
||||
|
||||
def test_no_duplicates_returns_same(self):
|
||||
memory = ["Deploy via Ansible", "Use Python 3.11"]
|
||||
facts = [{"fact_id": 1, "content": "Completely different fact"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
cleaned = resolve_duplicates(report, memory)
|
||||
assert len(cleaned) == 2
|
||||
|
||||
|
||||
class TestIsDuplicateBeforeAdd:
|
||||
def test_finds_duplicate(self):
|
||||
existing = [{"fact_id": 1, "content": "Deploy via Ansible"}]
|
||||
result = is_duplicate_before_add("Deploy via Ansible", existing)
|
||||
assert result is not None
|
||||
assert result["fact_id"] == 1
|
||||
|
||||
def test_no_duplicate_returns_none(self):
|
||||
existing = [{"fact_id": 1, "content": "Use dark mode"}]
|
||||
result = is_duplicate_before_add("Deploy via Ansible", existing)
|
||||
assert result is None
|
||||
|
||||
def test_short_content_returns_none(self):
|
||||
existing = [{"fact_id": 1, "content": "OK"}]
|
||||
result = is_duplicate_before_add("OK", existing)
|
||||
assert result is None
|
||||
|
||||
def test_empty_existing_returns_none(self):
|
||||
result = is_duplicate_before_add("Some fact here", [])
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestClassifyTier:
|
||||
def test_user_pref_goes_to_factstore(self):
|
||||
assert classify_tier("anything", "user_pref") == "factstore"
|
||||
|
||||
def test_project_goes_to_factstore(self):
|
||||
assert classify_tier("anything", "project") == "factstore"
|
||||
|
||||
def test_short_operational_note_goes_to_memory(self):
|
||||
assert classify_tier("remember: always use sudo") == "memory"
|
||||
assert classify_tier("todo: fix the deploy script") == "memory"
|
||||
|
||||
def test_long_fact_goes_to_factstore(self):
|
||||
long_fact = "The deployment process requires running ansible-playbook with the production inventory file and verifying health checks after completion"
|
||||
assert classify_tier(long_fact) == "factstore"
|
||||
|
||||
def test_general_short_goes_to_factstore(self):
|
||||
# Short but not operational
|
||||
assert classify_tier("user likes dark mode") == "factstore"
|
||||
@@ -137,78 +137,3 @@ class TestBackwardCompat:
|
||||
def test_tool_to_toolset_map(self):
|
||||
assert isinstance(TOOL_TO_TOOLSET_MAP, dict)
|
||||
assert len(TOOL_TO_TOOLSET_MAP) > 0
|
||||
|
||||
|
||||
class TestToolReturnTypeValidation:
|
||||
"""Poka-yoke: tool handlers must return JSON strings."""
|
||||
|
||||
def test_handler_returning_dict_is_wrapped(self, monkeypatch):
|
||||
"""A handler that returns a dict should be auto-wrapped to JSON string."""
|
||||
from tools.registry import registry
|
||||
from model_tools import handle_function_call
|
||||
import json
|
||||
|
||||
# Register a bad handler that returns dict instead of str
|
||||
registry.register(
|
||||
name="__test_bad_dict",
|
||||
toolset="test",
|
||||
schema={"name": "__test_bad_dict", "description": "test", "parameters": {"type": "object", "properties": {}}},
|
||||
handler=lambda args, **kw: {"this is": "a dict not a string"},
|
||||
)
|
||||
result = handle_function_call("__test_bad_dict", {})
|
||||
parsed = json.loads(result)
|
||||
assert "output" in parsed
|
||||
assert "_type_warning" in parsed
|
||||
# Cleanup
|
||||
registry._tools.pop("__test_bad_dict", None)
|
||||
|
||||
def test_handler_returning_none_is_wrapped(self, monkeypatch):
|
||||
"""A handler that returns None should be auto-wrapped."""
|
||||
from tools.registry import registry
|
||||
from model_tools import handle_function_call
|
||||
import json
|
||||
|
||||
registry.register(
|
||||
name="__test_bad_none",
|
||||
toolset="test",
|
||||
schema={"name": "__test_bad_none", "description": "test", "parameters": {"type": "object", "properties": {}}},
|
||||
handler=lambda args, **kw: None,
|
||||
)
|
||||
result = handle_function_call("__test_bad_none", {})
|
||||
parsed = json.loads(result)
|
||||
assert "_type_warning" in parsed
|
||||
registry._tools.pop("__test_bad_none", None)
|
||||
|
||||
def test_handler_returning_non_json_string_is_wrapped(self):
|
||||
"""A handler returning a plain string (not JSON) should be wrapped."""
|
||||
from tools.registry import registry
|
||||
from model_tools import handle_function_call
|
||||
import json
|
||||
|
||||
registry.register(
|
||||
name="__test_bad_plain",
|
||||
toolset="test",
|
||||
schema={"name": "__test_bad_plain", "description": "test", "parameters": {"type": "object", "properties": {}}},
|
||||
handler=lambda args, **kw: "just a plain string, not json",
|
||||
)
|
||||
result = handle_function_call("__test_bad_plain", {})
|
||||
parsed = json.loads(result)
|
||||
assert "output" in parsed
|
||||
registry._tools.pop("__test_bad_plain", None)
|
||||
|
||||
def test_handler_returning_valid_json_passes_through(self):
|
||||
"""A handler returning valid JSON string passes through unchanged."""
|
||||
from tools.registry import registry
|
||||
from model_tools import handle_function_call
|
||||
import json
|
||||
|
||||
registry.register(
|
||||
name="__test_good",
|
||||
toolset="test",
|
||||
schema={"name": "__test_good", "description": "test", "parameters": {"type": "object", "properties": {}}},
|
||||
handler=lambda args, **kw: json.dumps({"status": "ok", "data": [1, 2, 3]}),
|
||||
)
|
||||
result = handle_function_call("__test_good", {})
|
||||
parsed = json.loads(result)
|
||||
assert parsed == {"status": "ok", "data": [1, 2, 3]}
|
||||
registry._tools.pop("__test_good", None)
|
||||
|
||||
@@ -144,8 +144,7 @@ class TestMemoryStoreReplace:
|
||||
def test_replace_no_match(self, store):
|
||||
store.add("memory", "fact A")
|
||||
result = store.replace("memory", "nonexistent", "new")
|
||||
assert result["success"] is True
|
||||
assert result["result"] == "no_match"
|
||||
assert result["success"] is False
|
||||
|
||||
def test_replace_ambiguous_match(self, store):
|
||||
store.add("memory", "server A runs nginx")
|
||||
@@ -178,8 +177,7 @@ class TestMemoryStoreRemove:
|
||||
|
||||
def test_remove_no_match(self, store):
|
||||
result = store.remove("memory", "nonexistent")
|
||||
assert result["success"] is True
|
||||
assert result["result"] == "no_match"
|
||||
assert result["success"] is False
|
||||
|
||||
def test_remove_empty_old_text(self, store):
|
||||
result = store.remove("memory", " ")
|
||||
|
||||
@@ -260,12 +260,8 @@ class MemoryStore:
|
||||
entries = self._entries_for(target)
|
||||
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
|
||||
|
||||
if not matches:
|
||||
return {
|
||||
"success": True,
|
||||
"result": "no_match",
|
||||
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
|
||||
}
|
||||
if len(matches) == 0:
|
||||
return {"success": False, "error": f"No entry matched '{old_text}'."}
|
||||
|
||||
if len(matches) > 1:
|
||||
# If all matches are identical (exact duplicates), operate on the first one
|
||||
@@ -314,12 +310,8 @@ class MemoryStore:
|
||||
entries = self._entries_for(target)
|
||||
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
|
||||
|
||||
if not matches:
|
||||
return {
|
||||
"success": True,
|
||||
"result": "no_match",
|
||||
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
|
||||
}
|
||||
if len(matches) == 0:
|
||||
return {"success": False, "error": f"No entry matched '{old_text}'."}
|
||||
|
||||
if len(matches) > 1:
|
||||
# If all matches are identical (exact duplicates), remove the first one
|
||||
@@ -457,30 +449,30 @@ def memory_tool(
|
||||
Returns JSON string with results.
|
||||
"""
|
||||
if store is None:
|
||||
return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False)
|
||||
return json.dumps({"success": False, "error": "Memory is not available. It may be disabled in config or this environment."}, ensure_ascii=False)
|
||||
|
||||
if target not in ("memory", "user"):
|
||||
return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
|
||||
return json.dumps({"success": False, "error": f"Invalid target '{target}'. Use 'memory' or 'user'."}, ensure_ascii=False)
|
||||
|
||||
if action == "add":
|
||||
if not content:
|
||||
return tool_error("Content is required for 'add' action.", success=False)
|
||||
return json.dumps({"success": False, "error": "Content is required for 'add' action."}, ensure_ascii=False)
|
||||
result = store.add(target, content)
|
||||
|
||||
elif action == "replace":
|
||||
if not old_text:
|
||||
return tool_error("old_text is required for 'replace' action.", success=False)
|
||||
return json.dumps({"success": False, "error": "old_text is required for 'replace' action."}, ensure_ascii=False)
|
||||
if not content:
|
||||
return tool_error("content is required for 'replace' action.", success=False)
|
||||
return json.dumps({"success": False, "error": "content is required for 'replace' action."}, ensure_ascii=False)
|
||||
result = store.replace(target, old_text, content)
|
||||
|
||||
elif action == "remove":
|
||||
if not old_text:
|
||||
return tool_error("old_text is required for 'remove' action.", success=False)
|
||||
return json.dumps({"success": False, "error": "old_text is required for 'remove' action."}, ensure_ascii=False)
|
||||
result = store.remove(target, old_text)
|
||||
|
||||
else:
|
||||
return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False)
|
||||
return json.dumps({"success": False, "error": f"Unknown action '{action}'. Use: add, replace, remove"}, ensure_ascii=False)
|
||||
|
||||
return json.dumps(result, ensure_ascii=False)
|
||||
|
||||
@@ -547,7 +539,7 @@ MEMORY_SCHEMA = {
|
||||
|
||||
|
||||
# --- Registry ---
|
||||
from tools.registry import registry, tool_error
|
||||
from tools.registry import registry
|
||||
|
||||
registry.register(
|
||||
name="memory",
|
||||
|
||||
Reference in New Issue
Block a user