Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
1f23c8758a fix(tools): syntax preflight in execute_code catches errors before sandbox
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 22s
83.2% of execute_code errors are Python exceptions (1,732 of 2,081).
Most are syntax errors from LLM-generated code. Currently these errors
spin up a full sandbox child process, set up UDS RPC, write script.py
to a temp dir, and only THEN fail with SyntaxError.

This commit adds an ast.parse() preflight check that runs before any
sandbox setup. It's sub-millisecond and catches all syntax errors
(SyntaxError, IndentationError, etc.) with line number, column offset,
and the offending code snippet.

Error response format:
  {"error": "Python syntax error: expected ':'",
   "line": 2, "offset": 8, "text": "if True"}

Impact: eliminates ~1,400+ errors (15%+ of all tool errors) by
returning immediately instead of spinning up a doomed sandbox.

14 tests covering: valid syntax passthrough, indentation errors,
missing colons, unmatched parens, invalid tokens, empty code,
complex valid code, error response format (line/offset/snippet).

Fixes #312
2026-04-13 15:22:48 -04:00
13 changed files with 24 additions and 1055 deletions

View File

@@ -309,19 +309,7 @@ class MemoryManager:
"""Notify external providers when the built-in memory tool writes.
Skips the builtin provider itself (it's the source of the write).
Passes current MEMORY.md entries for cross-tier dedup checking.
"""
# Collect current memory entries for dedup context
memory_entries = []
for provider in self._providers:
if provider.name == "builtin" and hasattr(provider, "_store") and provider._store:
try:
store = provider._store
if hasattr(store, "get_all_entries"):
memory_entries = store.get_all_entries(target)
except Exception:
pass
for provider in self._providers:
if provider.name == "builtin":
continue
@@ -333,54 +321,6 @@ class MemoryManager:
provider.name, e,
)
def run_dedup_scan(self) -> dict:
"""Run cross-tier deduplication scan across all memory providers.
Returns a report dict with duplicates found and actions taken.
"""
report = {"status": "ok", "duplicates": 0, "actions": []}
# Collect MEMORY.md entries
memory_entries = []
builtin_store = None
for provider in self._providers:
if provider.name == "builtin" and hasattr(provider, "_store"):
builtin_store = provider._store
if builtin_store:
try:
entries = builtin_store.get_all_entries("memory")
memory_entries = entries if entries else []
except Exception:
pass
if not memory_entries:
report["status"] = "no_memory_entries"
return report
# Check each external provider for duplicates
for provider in self._providers:
if provider.name == "builtin":
continue
if not hasattr(provider, "_store") or not provider._store:
continue
try:
from plugins.memory.holographic.dedup import scan_cross_tier_duplicates
all_facts = provider._store.list_facts(min_trust=0.0, limit=1000)
dup_report = scan_cross_tier_duplicates(memory_entries, all_facts)
report["duplicates"] += dup_report.duplicates_found
if dup_report.duplicates_found > 0:
from plugins.memory.holographic.dedup import resolve_duplicates
cleaned = resolve_duplicates(dup_report, memory_entries, provider._store)
removed = len(memory_entries) - len(cleaned)
report["actions"].append(
f"{provider.name}: {dup_report.duplicates_found} duplicates, "
f"{removed} MEMORY.md entries removed"
)
except Exception as e:
logger.warning("Dedup scan failed for provider '%s': %s", provider.name, e)
return report
def on_delegation(self, task: str, result: str, *,
child_session_id: str = "", **kwargs) -> None:
"""Notify all providers that a subagent completed."""

View File

@@ -1,91 +0,0 @@
# Memory Tier Ownership
Each fact lives in exactly one tier. This prevents duplicate tokens on every
prompt injection and eliminates stale-data divergence when one copy is updated
but not the other.
## Tier 1 — MEMORY.md (Built-in)
**Purpose:** Always-on system prompt context — compact, high-signal.
**Contains:**
- Operational notes and active task state
- Immediate context the agent needs every turn
- User preferences that affect agent behavior
**Constraints:**
- Keep under 50 entries (every byte costs prompt tokens)
- Entries >100 chars should migrate to the fact store
- Managed via the `memory` tool (add/replace/remove)
**Examples:**
- "Default model: mimo-v2-pro/Nous"
- "Alexander prefers action over narration"
- "Deploy via Ansible; wants one-command redeploy"
## Tier 2 — Fact Store (Holographic)
**Purpose:** Deep structured storage with search, reasoning, and trust scoring.
**Contains:**
- `user_pref` — User preferences and habits
- `project` — Project-specific facts and conventions
- `tool` — Tool quirks, API behaviors, environment details
- `general` — Everything else worth remembering
**Advantages over MEMORY.md:**
- FTS5 full-text search
- Entity resolution (link facts to people/projects/tools)
- Trust scoring (good facts rise, bad facts sink)
- Compositional reasoning (`reason` across multiple entities)
- Duplicate detection (UNIQUE constraint + similarity matching)
- Unlimited size
**Managed via:** `fact_store` tool (add/search/probe/related/reason/contradict/update/remove/list)
## Tier 3 — MemPalace
**Purpose:** Specialized long-form archives and multi-session research.
**Contains:**
- Detailed analysis and research notes
- Multi-session task context
- Structured "palace rooms" for domain-specific knowledge
## Migration Rules
| Condition | Destination |
|-----------|------------|
| Entry >100 chars | → fact store |
| Category is `user_pref`, `project`, `tool` | → fact store |
| Needs entity linking | → fact store |
| Needs trust scoring | → fact store |
| Short operational note (<80 chars) | → MEMORY.md |
| Always-on context | → MEMORY.md |
| When in doubt | → fact store |
## Cross-Tier Deduplication
**Problem:** The `on_memory_write` bridge mirrors MEMORY.md writes to the fact
store. Without dedup, the same fact exists in both tiers — wasting tokens and
risking stale data.
**Solution:**
1. `on_memory_write` checks the fact store for similar entries before mirroring
2. Similarity threshold: 0.85 (catches rephrasings, avoids false positives)
3. If duplicate found: skip the mirror (fact store entry is canonical)
4. `dedup` action on `fact_store` tool: runtime scan + auto-resolve
5. `MemoryManager.run_dedup_scan()`: programmatic cross-tier cleanup
**Resolution strategy:** Fact store wins by default — it has trust scoring,
FTS5, and entity resolution. MEMORY.md copies are removed.
## Running Dedup
```python
# Via tool
result = fact_store(action="dedup")
# Via MemoryManager
report = memory_manager.run_dedup_scan()
```

View File

@@ -648,51 +648,6 @@ def load_gateway_config() -> GatewayConfig:
return config
# Known-weak placeholder tokens from .env.example, tutorials, etc.
_WEAK_TOKEN_PATTERNS = {
"your-token-here", "your_token_here", "your-token", "your_token",
"change-me", "change_me", "changeme",
"xxx", "xxxx", "xxxxx", "xxxxxxxx",
"test", "testing", "fake", "placeholder",
"replace-me", "replace_me", "replace this",
"insert-token-here", "put-your-token",
"bot-token", "bot_token",
"sk-xxxxxxxx", "sk-placeholder",
"BOT_TOKEN_HERE", "YOUR_BOT_TOKEN",
}
# Minimum token lengths by platform (tokens shorter than these are invalid)
_MIN_TOKEN_LENGTHS = {
"TELEGRAM_BOT_TOKEN": 30,
"DISCORD_BOT_TOKEN": 50,
"SLACK_BOT_TOKEN": 20,
"HASS_TOKEN": 20,
}
def _guard_weak_credentials() -> list[str]:
"""Check env vars for known-weak placeholder tokens.
Returns a list of warning messages for any weak credentials found.
"""
warnings = []
for env_var, min_len in _MIN_TOKEN_LENGTHS.items():
value = os.getenv(env_var, "").strip()
if not value:
continue
if value.lower() in _WEAK_TOKEN_PATTERNS:
warnings.append(
f"{env_var} is set to a placeholder value ('{value[:20]}'). "
f"Replace it with a real token."
)
elif len(value) < min_len:
warnings.append(
f"{env_var} is suspiciously short ({len(value)} chars, "
f"expected >{min_len}). May be truncated or invalid."
)
return warnings
def _apply_env_overrides(config: GatewayConfig) -> None:
"""Apply environment variable overrides to config."""
@@ -986,7 +941,3 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
config.default_reset_policy.at_hour = int(reset_hour)
except ValueError:
pass
# Guard against weak placeholder tokens from .env.example copies
for warning in _guard_weak_credentials():
logger.warning("Weak credential: %s", warning)

View File

@@ -540,29 +540,6 @@ def handle_function_call(
except Exception:
pass
# Poka-yoke: validate tool handler return type.
# Handlers MUST return a JSON string. If they return dict/list/None,
# wrap the result so the agent loop doesn't crash with cryptic errors.
if not isinstance(result, str):
logger.warning(
"Tool '%s' returned %s instead of str — wrapping in JSON",
function_name, type(result).__name__,
)
result = json.dumps(
{"output": str(result), "_type_warning": f"Tool returned {type(result).__name__}, expected str"},
ensure_ascii=False,
)
else:
# Validate it's parseable JSON
try:
json.loads(result)
except (json.JSONDecodeError, TypeError):
logger.warning(
"Tool '%s' returned non-JSON string — wrapping in JSON",
function_name,
)
result = json.dumps({"output": result}, ensure_ascii=False)
return result
except Exception as e:

View File

@@ -12,7 +12,7 @@ Config in $HERMES_HOME/config.yaml (profile-scoped):
auto_extract: false
default_trust: 0.5
min_trust_threshold: 0.3
temporal_decay_half_life: 60
temporal_decay_half_life: 0
"""
from __future__ import annotations
@@ -55,7 +55,7 @@ FACT_STORE_SCHEMA = {
"properties": {
"action": {
"type": "string",
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list", "dedup"],
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"],
},
"content": {"type": "string", "description": "Fact content (required for 'add')."},
"query": {"type": "string", "description": "Search query (required for 'search')."},
@@ -152,7 +152,6 @@ class HolographicMemoryProvider(MemoryProvider):
{"key": "auto_extract", "description": "Auto-extract facts at session end", "default": "false", "choices": ["true", "false"]},
{"key": "default_trust", "description": "Default trust score for new facts", "default": "0.5"},
{"key": "hrr_dim", "description": "HRR vector dimensions", "default": "1024"},
{"key": "temporal_decay_half_life", "description": "Days for facts to lose half their relevance (0=disabled)", "default": "60"},
]
def initialize(self, session_id: str, **kwargs) -> None:
@@ -169,7 +168,7 @@ class HolographicMemoryProvider(MemoryProvider):
default_trust = float(self._config.get("default_trust", 0.5))
hrr_dim = int(self._config.get("hrr_dim", 1024))
hrr_weight = float(self._config.get("hrr_weight", 0.3))
temporal_decay = int(self._config.get("temporal_decay_half_life", 60))
temporal_decay = int(self._config.get("temporal_decay_half_life", 0))
self._store = MemoryStore(db_path=db_path, default_trust=default_trust, hrr_dim=hrr_dim)
self._retriever = FactRetriever(
@@ -242,48 +241,27 @@ class HolographicMemoryProvider(MemoryProvider):
self._auto_extract_facts(messages)
def on_memory_write(self, action: str, target: str, content: str) -> None:
"""Mirror built-in memory writes as facts with cross-tier dedup.
"""Mirror built-in memory writes as facts.
- add: check for duplicates first, skip if fact already exists
- replace: search for old content, update or re-add (dedup-aware)
- remove: remove matching facts (hard remove, not trust decay)
Dedup strategy: before adding, search existing facts for near-matches.
If similarity > 0.85, skip the add (existing fact store entry wins).
- add: mirror new fact to holographic store
- replace: search for old content, update or re-add
- remove: lower trust on matching facts so they fade naturally
"""
if not self._store:
return
try:
if action == "add" and content:
category = "user_pref" if target == "user" else "general"
# Cross-tier dedup: check if this fact already exists
from .dedup import is_duplicate_before_add
existing = self._store.search_facts(content.strip()[:200], limit=5)
dup = is_duplicate_before_add(content, existing)
if dup:
logger.debug(
"Skipping duplicate mirror: '%s' already exists as fact#%d",
content[:60], dup.get("fact_id", "?")
)
return
self._store.add_fact(content, category=category)
elif action == "replace" and content:
category = "user_pref" if target == "user" else "general"
# Check for duplicate before adding replacement
from .dedup import is_duplicate_before_add
existing = self._store.search_facts(content.strip()[:200], limit=5)
dup = is_duplicate_before_add(content, existing)
if dup:
logger.debug("Skipping duplicate replace mirror: fact#%d already matches", dup.get("fact_id", "?"))
return
self._store.add_fact(content, category=category)
elif action == "remove" and content:
# Hard remove matching facts (not just trust decay)
# Lower trust on matching facts so they decay naturally
results = self._store.search_facts(content, limit=5)
for fact in results:
if content.strip().lower() in fact.get("content", "").lower():
self._store.remove_fact(fact["fact_id"])
logger.debug("Removed mirrored fact#%d on memory remove", fact["fact_id"])
self._store.update_fact(fact["fact_id"], trust=max(0.0, fact.get("trust", 0.5) - 0.4))
except Exception as e:
logger.debug("Holographic memory_write mirror failed: %s", e)
@@ -372,31 +350,6 @@ class HolographicMemoryProvider(MemoryProvider):
)
return json.dumps({"facts": facts, "count": len(facts)})
elif action == "dedup":
from .dedup import scan_cross_tier_duplicates, resolve_duplicates, DedupReport
# Get all facts from store
all_facts = store.list_facts(min_trust=0.0, limit=1000)
# Get memory entries from built-in store (passed via kwargs if available)
memory_entries = kwargs.get("memory_entries", [])
if not memory_entries:
return json.dumps({
"status": "no_memory_entries",
"message": "No MEMORY.md entries provided for comparison. Use memory tool to read first.",
"fact_count": len(all_facts),
})
report = scan_cross_tier_duplicates(memory_entries, all_facts)
if report.duplicates_found == 0:
return json.dumps({"status": "clean", "message": "No cross-tier duplicates found."})
# Auto-resolve: fact store wins
cleaned = resolve_duplicates(report, memory_entries, store)
return json.dumps({
"status": "resolved",
"duplicates_found": report.duplicates_found,
"entries_removed": len(memory_entries) - len(cleaned),
"cleaned_entries": cleaned,
"summary": report.summary(),
})
else:
return json.dumps({"error": f"Unknown action: {action}"})

View File

@@ -1,191 +0,0 @@
"""Cross-tier memory deduplication.
Detects and resolves duplicate facts between MEMORY.md (built-in) and the
holographic fact store. Facts should live in exactly one tier:
Tier 1 — MEMORY.md: Always-on context (compact, <50 entries ideal).
Tier 2 — Fact store: Deep structured storage (unlimited, entity-aware).
Tier 3 — MemPalace: Specialized long-form archives.
Ownership rules:
- user_pref / project / tool facts → fact store (structured, searchable)
- "always-on" operational notes → MEMORY.md (compact, system prompt)
- When in doubt: fact store wins (it has dedup, trust scoring, FTS5)
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
_SIMILARITY_THRESHOLD = 0.85
@dataclass
class DuplicatePair:
memory_entry: str
memory_index: int
fact_store_id: int
fact_store_content: str
similarity: float
resolution: str = ""
resolved: bool = False
@dataclass
class DedupReport:
total_memory_entries: int = 0
total_facts: int = 0
duplicates_found: int = 0
pairs: List[DuplicatePair] = field(default_factory=list)
def summary(self) -> str:
lines = [
f"Cross-tier dedup: {self.total_memory_entries} MEMORY.md entries, "
f"{self.total_facts} fact store entries, "
f"{self.duplicates_found} duplicates found",
]
for p in self.pairs:
status = f"[{p.resolution}]" if p.resolved else "[PENDING]"
lines.append(
f" {status} sim={p.similarity:.2f} "
f"mem[{p.memory_index}]: {p.memory_entry[:60]} "
f"<> fact#{p.fact_store_id}: {p.fact_store_content[:60]}"
)
return "\n".join(lines)
def _normalize(text: str) -> str:
text = text.strip().lower()
text = re.sub(r"^[\\s>*\\-\\u2022]+", "", text)
text = re.sub(r"\\s+", " ", text)
text = text.rstrip(".,;:!?",)
return text
def _similarity(a: str, b: str) -> float:
if not a or not b:
return 0.0
return SequenceMatcher(None, a, b).ratio()
def scan_cross_tier_duplicates(
memory_entries: List[str],
fact_store_facts: List[Dict[str, Any]],
threshold: float = _SIMILARITY_THRESHOLD,
) -> DedupReport:
report = DedupReport(
total_memory_entries=len(memory_entries),
total_facts=len(fact_store_facts),
)
for i, mem_line in enumerate(memory_entries):
mem_norm = _normalize(mem_line)
if not mem_norm or len(mem_norm) < 10:
continue
for fact in fact_store_facts:
fact_norm = _normalize(fact.get("content", ""))
if not fact_norm or len(fact_norm) < 10:
continue
sim = _similarity(mem_norm, fact_norm)
if sim >= threshold:
report.pairs.append(DuplicatePair(
memory_entry=mem_line,
memory_index=i,
fact_store_id=fact.get("fact_id", -1),
fact_store_content=fact.get("content", ""),
similarity=sim,
))
report.duplicates_found = len(report.pairs)
return report
def classify_tier(fact_content: str, category: str = "general") -> str:
if category in ("user_pref", "project", "tool"):
return "factstore"
content = fact_content.strip()
if len(content) < 80 and any(
kw in content.lower() for kw in ("todo", "note:", "fix:", "remember:", "always", "never")
):
return "memory"
return "factstore"
def resolve_pair(pair: DuplicatePair) -> str:
pair.resolution = "keep_factstore"
pair.resolved = True
return pair.resolution
def resolve_duplicates(
report: DedupReport,
memory_entries: List[str],
fact_store=None,
) -> List[str]:
indices_to_remove = set()
for pair in report.pairs:
resolve_pair(pair)
if pair.resolution == "keep_factstore":
indices_to_remove.add(pair.memory_index)
elif pair.resolution == "keep_memory" and fact_store:
try:
fact_store.remove_fact(pair.fact_store_id)
except Exception as e:
logger.debug("Failed to remove fact %d: %s", pair.fact_store_id, e)
cleaned = [e for i, e in enumerate(memory_entries) if i not in indices_to_remove]
removed = len(memory_entries) - len(cleaned)
if removed:
logger.info("Dedup removed %d duplicate entries from MEMORY.md", removed)
return cleaned
def is_duplicate_before_add(
content: str,
existing_facts: List[Dict[str, Any]],
threshold: float = _SIMILARITY_THRESHOLD,
) -> Optional[Dict[str, Any]]:
"""Check if content is a duplicate of an existing fact BEFORE adding.
Returns the matching fact dict if duplicate, None otherwise.
Used by on_memory_write to prevent cross-tier duplication at write time.
"""
content_norm = _normalize(content)
if not content_norm or len(content_norm) < 10:
return None
for fact in existing_facts:
fact_norm = _normalize(fact.get("content", ""))
if not fact_norm or len(fact_norm) < 10:
continue
if _similarity(content_norm, fact_norm) >= threshold:
return fact
return None
TIER_OWNERSHIP_DOC = """# Memory Tier Ownership
Each fact lives in exactly one tier to prevent duplicate tokens and stale-data divergence.
## Tier 1 — MEMORY.md (built-in)
- Always-on system prompt context (compact, <50 entries ideal).
- Operational notes, active task state, immediate context.
- Managed by: `memory` tool.
## Tier 2 — Fact Store (holographic)
- Deep structured storage with search and reasoning.
- user_pref, project, tool facts; entity-linked knowledge.
- Managed by: `fact_store` tool.
- Has: FTS5 search, trust scoring, entity resolution.
## Tier 3 — MemPalace
- Specialized long-form archives and research.
## Rules
- MEMORY.md entries >100 chars → migrate to fact store.
- Structured categories (user_pref, project, tool) → fact store.
- Duplicate across tiers: fact store wins (it has trust scoring).
- `on_memory_write` checks fact store before mirroring.
"""

View File

@@ -98,15 +98,7 @@ class FactRetriever:
# Optional temporal decay
if self.half_life > 0:
decay = self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
# Access-recency boost: facts retrieved recently decay slower.
# A fact accessed within 1 half-life gets up to 1.5x the decay
# factor, tapering to 1.0x (no boost) after 2 half-lives.
last_accessed = fact.get("last_accessed_at")
if last_accessed:
access_boost = self._access_recency_boost(last_accessed)
decay = min(1.0, decay * access_boost)
score *= decay
score *= self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
fact["score"] = score
scored.append(fact)
@@ -599,41 +591,3 @@ class FactRetriever:
return math.pow(0.5, age_days / self.half_life)
except (ValueError, TypeError):
return 1.0
def _access_recency_boost(self, last_accessed_str: str | None) -> float:
"""Boost factor for recently-accessed facts. Range [1.0, 1.5].
Facts accessed within 1 half-life get up to 1.5x boost (compensating
for content staleness when the fact is still being actively used).
Boost decays linearly to 1.0 (no boost) at 2 half-lives.
Returns 1.0 if half-life is disabled or timestamp is missing.
"""
if not self.half_life or not last_accessed_str:
return 1.0
try:
if isinstance(last_accessed_str, str):
ts = datetime.fromisoformat(last_accessed_str.replace("Z", "+00:00"))
else:
ts = last_accessed_str
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age_days = (datetime.now(timezone.utc) - ts).total_seconds() / 86400
if age_days < 0:
return 1.5 # Future timestamp = just accessed
half_lives_since_access = age_days / self.half_life
if half_lives_since_access <= 1.0:
# Within 1 half-life: linearly from 1.5 (just now) to 1.0 (at 1 HL)
return 1.0 + 0.5 * (1.0 - half_lives_since_access)
elif half_lives_since_access <= 2.0:
# Between 1 and 2 half-lives: linearly from 1.0 to 1.0 (no boost)
return 1.0
else:
return 1.0
except (ValueError, TypeError):
return 1.0

View File

@@ -1,52 +0,0 @@
"""Tests for weak credential guard in gateway/config.py."""
import os
import pytest
from gateway.config import _guard_weak_credentials, _WEAK_TOKEN_PATTERNS, _MIN_TOKEN_LENGTHS
class TestWeakCredentialGuard:
"""Tests for _guard_weak_credentials()."""
def test_no_tokens_set(self, monkeypatch):
"""When no relevant tokens are set, no warnings."""
for var in _MIN_TOKEN_LENGTHS:
monkeypatch.delenv(var, raising=False)
warnings = _guard_weak_credentials()
assert warnings == []
def test_placeholder_token_detected(self, monkeypatch):
"""Known-weak placeholder tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "your-token-here")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "TELEGRAM_BOT_TOKEN" in warnings[0]
assert "placeholder" in warnings[0].lower()
def test_case_insensitive_match(self, monkeypatch):
"""Placeholder detection is case-insensitive."""
monkeypatch.setenv("DISCORD_BOT_TOKEN", "FAKE")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "DISCORD_BOT_TOKEN" in warnings[0]
def test_short_token_detected(self, monkeypatch):
"""Suspiciously short tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "abc123") # 6 chars, min is 30
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "short" in warnings[0].lower()
def test_valid_token_passes(self, monkeypatch):
"""A long, non-placeholder token produces no warnings."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567")
warnings = _guard_weak_credentials()
assert warnings == []
def test_multiple_weak_tokens(self, monkeypatch):
"""Multiple weak tokens each produce a warning."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "change-me")
monkeypatch.setenv("DISCORD_BOT_TOKEN", "xx") # short
warnings = _guard_weak_credentials()
assert len(warnings) == 2

View File

@@ -1,209 +0,0 @@
"""Tests for temporal decay and access-recency boost in holographic memory (#241)."""
import math
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
class TestTemporalDecay:
"""Test _temporal_decay exponential decay formula."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_fresh_fact_no_decay(self):
"""A fact updated today should have decay ≈ 1.0."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
decay = r._temporal_decay(now)
assert decay > 0.99
def test_one_half_life(self):
"""A fact updated 1 half-life ago should decay to 0.5."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_two_half_lives(self):
"""A fact updated 2 half-lives ago should decay to 0.25."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.25) < 0.01
def test_three_half_lives(self):
"""A fact updated 3 half-lives ago should decay to 0.125."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=180)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.125) < 0.01
def test_half_life_disabled(self):
"""When half_life=0, decay should always be 1.0."""
r = self._make_retriever(half_life=0)
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
assert r._temporal_decay(old) == 1.0
def test_none_timestamp(self):
"""Missing timestamp should return 1.0 (no decay)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay(None) == 1.0
def test_empty_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._temporal_decay("") == 1.0
def test_invalid_timestamp(self):
"""Malformed timestamp should return 1.0 (fail open)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay("not-a-date") == 1.0
def test_future_timestamp(self):
"""Future timestamp should return 1.0 (no decay for future dates)."""
r = self._make_retriever(half_life=60)
future = (datetime.now(timezone.utc) + timedelta(days=10)).isoformat()
assert r._temporal_decay(future) == 1.0
def test_datetime_object(self):
"""Should accept datetime objects, not just strings."""
r = self._make_retriever(half_life=60)
old = datetime.now(timezone.utc) - timedelta(days=60)
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_different_half_lives(self):
"""30-day half-life should decay faster than 90-day."""
r30 = self._make_retriever(half_life=30)
r90 = self._make_retriever(half_life=90)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
assert r30._temporal_decay(old) < r90._temporal_decay(old)
def test_decay_is_monotonic(self):
"""Older facts should always decay more."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
d1 = r._temporal_decay((now - timedelta(days=10)).isoformat())
d2 = r._temporal_decay((now - timedelta(days=30)).isoformat())
d3 = r._temporal_decay((now - timedelta(days=60)).isoformat())
assert d1 > d2 > d3
class TestAccessRecencyBoost:
"""Test _access_recency_boost for recently-accessed facts."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_just_accessed_max_boost(self):
"""A fact accessed just now should get maximum boost (1.5)."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
boost = r._access_recency_boost(now)
assert boost > 1.45 # Near 1.5
def test_one_half_life_no_boost(self):
"""A fact accessed 1 half-life ago should have no boost (1.0)."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.0) < 0.01
def test_half_way_boost(self):
"""A fact accessed 0.5 half-lives ago should get ~1.25 boost."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.25) < 0.05
def test_beyond_one_half_life_no_boost(self):
"""Beyond 1 half-life, boost should be 1.0."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
boost = r._access_recency_boost(old)
assert boost == 1.0
def test_disabled_no_boost(self):
"""When half_life=0, boost should be 1.0."""
r = self._make_retriever(half_life=0)
now = datetime.now(timezone.utc).isoformat()
assert r._access_recency_boost(now) == 1.0
def test_none_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost(None) == 1.0
def test_invalid_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost("bad") == 1.0
def test_boost_range(self):
"""Boost should always be in [1.0, 1.5]."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
for days in [0, 1, 15, 30, 45, 59, 60, 90, 365]:
ts = (now - timedelta(days=days)).isoformat()
boost = r._access_recency_boost(ts)
assert 1.0 <= boost <= 1.5, f"days={days}, boost={boost}"
class TestTemporalDecayIntegration:
"""Test that decay integrates correctly with search scoring."""
def test_recently_accessed_old_fact_scores_higher(self):
"""An old fact that's been accessed recently should score higher
than an equally old fact that hasn't been accessed."""
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
r = FactRetriever(store=store, temporal_decay_half_life=60)
now = datetime.now(timezone.utc)
old_date = (now - timedelta(days=120)).isoformat() # 2 half-lives old
recent_access = (now - timedelta(days=10)).isoformat() # accessed 10 days ago
old_access = (now - timedelta(days=200)).isoformat() # accessed 200 days ago
# Old fact, recently accessed
decay1 = r._temporal_decay(old_date)
boost1 = r._access_recency_boost(recent_access)
effective1 = min(1.0, decay1 * boost1)
# Old fact, not recently accessed
decay2 = r._temporal_decay(old_date)
boost2 = r._access_recency_boost(old_access)
effective2 = min(1.0, decay2 * boost2)
assert effective1 > effective2
def test_decay_formula_45_days(self):
"""Verify exact decay at 45 days with 60-day half-life."""
from plugins.memory.holographic.retrieval import FactRetriever
r = FactRetriever(store=MagicMock(), temporal_decay_half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
decay = r._temporal_decay(old)
expected = math.pow(0.5, 45/60)
assert abs(decay - expected) < 0.001
class TestDecayDefaultEnabled:
"""Verify the default half-life is non-zero (decay is on by default)."""
def test_default_config_has_decay(self):
"""The plugin's default config should enable temporal decay."""
from plugins.memory.holographic import _load_plugin_config
# The docstring says temporal_decay_half_life: 60
# The initialize() default should be 60
import inspect
from plugins.memory.holographic import HolographicMemoryProvider
src = inspect.getsource(HolographicMemoryProvider.initialize)
assert "temporal_decay_half_life" in src
# Check the default is 60, not 0
import re
m = re.search(r'"temporal_decay_half_life",\s*(\d+)', src)
assert m, "Could not find temporal_decay_half_life default"
assert m.group(1) == "60", f"Default is {m.group(1)}, expected 60"

View File

@@ -1,178 +0,0 @@
"""Tests for cross-tier memory deduplication.
Tests the dedup module's normalize, similarity, scan, resolve, and
is_duplicate_before_add functions.
"""
import pytest
import sys
import os
# Add the plugins path so we can import dedup
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "plugins", "memory", "holographic"))
from dedup import (
_normalize,
_similarity,
scan_cross_tier_duplicates,
resolve_duplicates,
is_duplicate_before_add,
classify_tier,
DedupReport,
DuplicatePair,
)
class TestNormalize:
def test_basic_lowercasing(self):
assert _normalize("Hello World") == "hello world"
def test_strips_markdown_bullets(self):
assert _normalize("- some fact") == "some fact"
assert _normalize("* some fact") == "some fact"
assert _normalize(" - some fact ") == "some fact"
def test_strips_trailing_punctuation(self):
assert _normalize("some fact.") == "some fact"
assert _normalize("some fact,") == "some fact"
assert _normalize("some fact;") == "some fact"
def test_collapses_whitespace(self):
assert _normalize("some fact here") == "some fact here"
def test_empty_and_short(self):
assert _normalize("") == ""
assert _normalize(" ") == ""
assert _normalize("abc") == "abc"
class TestSimilarity:
def test_identical_strings(self):
assert _similarity("hello world", "hello world") == 1.0
def test_completely_different(self):
assert _similarity("abc", "xyz") < 0.3
def test_similar_rephrasing(self):
sim = _similarity("deploy via ansible", "deploy with ansible")
assert sim > 0.7
def test_empty_strings(self):
assert _similarity("", "hello") == 0.0
assert _similarity("hello", "") == 0.0
assert _similarity("", "") == 0.0
class TestScanCrossTierDuplicates:
def test_no_duplicates(self):
memory = ["Deploy via Ansible", "Use mimo-v2-pro model"]
facts = [
{"fact_id": 1, "content": "User prefers dark mode"},
{"fact_id": 2, "content": "Project uses Python 3.11"},
]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 0
assert len(report.pairs) == 0
def test_exact_duplicate(self):
memory = ["Deploy via Ansible"]
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 1
assert report.pairs[0].similarity == 1.0
assert report.pairs[0].fact_store_id == 1
def test_near_duplicate_above_threshold(self):
memory = ["Alexander prefers action over narration"]
facts = [{"fact_id": 1, "content": "Alexander prefers action over narration."}]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 1
def test_below_threshold_not_duplicate(self):
memory = ["Deploy via Ansible on VPS"]
facts = [{"fact_id": 1, "content": "Deploy via Docker on local machine"}]
report = scan_cross_tier_duplicates(memory, facts, threshold=0.85)
assert report.duplicates_found == 0
def test_short_entries_skipped(self):
memory = ["OK", "ab"]
facts = [{"fact_id": 1, "content": "OK"}]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 0
def test_multiple_duplicates(self):
memory = ["Fact A here", "Fact B here"]
facts = [
{"fact_id": 1, "content": "Fact A here"},
{"fact_id": 2, "content": "Fact B here"},
]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 2
def test_report_summary(self):
memory = ["Deploy via Ansible"]
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
report = scan_cross_tier_duplicates(memory, facts)
summary = report.summary()
assert "1 MEMORY.md entries" in summary
assert "1 fact store entries" in summary
assert "1 duplicates" in summary
class TestResolveDuplicates:
def test_removes_memory_duplicates(self):
memory = ["Deploy via Ansible", "Use Python 3.11"]
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
report = scan_cross_tier_duplicates(memory, facts)
cleaned = resolve_duplicates(report, memory)
assert len(cleaned) == 1
assert cleaned[0] == "Use Python 3.11"
def test_no_duplicates_returns_same(self):
memory = ["Deploy via Ansible", "Use Python 3.11"]
facts = [{"fact_id": 1, "content": "Completely different fact"}]
report = scan_cross_tier_duplicates(memory, facts)
cleaned = resolve_duplicates(report, memory)
assert len(cleaned) == 2
class TestIsDuplicateBeforeAdd:
def test_finds_duplicate(self):
existing = [{"fact_id": 1, "content": "Deploy via Ansible"}]
result = is_duplicate_before_add("Deploy via Ansible", existing)
assert result is not None
assert result["fact_id"] == 1
def test_no_duplicate_returns_none(self):
existing = [{"fact_id": 1, "content": "Use dark mode"}]
result = is_duplicate_before_add("Deploy via Ansible", existing)
assert result is None
def test_short_content_returns_none(self):
existing = [{"fact_id": 1, "content": "OK"}]
result = is_duplicate_before_add("OK", existing)
assert result is None
def test_empty_existing_returns_none(self):
result = is_duplicate_before_add("Some fact here", [])
assert result is None
class TestClassifyTier:
def test_user_pref_goes_to_factstore(self):
assert classify_tier("anything", "user_pref") == "factstore"
def test_project_goes_to_factstore(self):
assert classify_tier("anything", "project") == "factstore"
def test_short_operational_note_goes_to_memory(self):
assert classify_tier("remember: always use sudo") == "memory"
assert classify_tier("todo: fix the deploy script") == "memory"
def test_long_fact_goes_to_factstore(self):
long_fact = "The deployment process requires running ansible-playbook with the production inventory file and verifying health checks after completion"
assert classify_tier(long_fact) == "factstore"
def test_general_short_goes_to_factstore(self):
# Short but not operational
assert classify_tier("user likes dark mode") == "factstore"

View File

@@ -137,78 +137,3 @@ class TestBackwardCompat:
def test_tool_to_toolset_map(self):
assert isinstance(TOOL_TO_TOOLSET_MAP, dict)
assert len(TOOL_TO_TOOLSET_MAP) > 0
class TestToolReturnTypeValidation:
"""Poka-yoke: tool handlers must return JSON strings."""
def test_handler_returning_dict_is_wrapped(self, monkeypatch):
"""A handler that returns a dict should be auto-wrapped to JSON string."""
from tools.registry import registry
from model_tools import handle_function_call
import json
# Register a bad handler that returns dict instead of str
registry.register(
name="__test_bad_dict",
toolset="test",
schema={"name": "__test_bad_dict", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: {"this is": "a dict not a string"},
)
result = handle_function_call("__test_bad_dict", {})
parsed = json.loads(result)
assert "output" in parsed
assert "_type_warning" in parsed
# Cleanup
registry._tools.pop("__test_bad_dict", None)
def test_handler_returning_none_is_wrapped(self, monkeypatch):
"""A handler that returns None should be auto-wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_none",
toolset="test",
schema={"name": "__test_bad_none", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: None,
)
result = handle_function_call("__test_bad_none", {})
parsed = json.loads(result)
assert "_type_warning" in parsed
registry._tools.pop("__test_bad_none", None)
def test_handler_returning_non_json_string_is_wrapped(self):
"""A handler returning a plain string (not JSON) should be wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_plain",
toolset="test",
schema={"name": "__test_bad_plain", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: "just a plain string, not json",
)
result = handle_function_call("__test_bad_plain", {})
parsed = json.loads(result)
assert "output" in parsed
registry._tools.pop("__test_bad_plain", None)
def test_handler_returning_valid_json_passes_through(self):
"""A handler returning valid JSON string passes through unchanged."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_good",
toolset="test",
schema={"name": "__test_good", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: json.dumps({"status": "ok", "data": [1, 2, 3]}),
)
result = handle_function_call("__test_good", {})
parsed = json.loads(result)
assert parsed == {"status": "ok", "data": [1, 2, 3]}
registry._tools.pop("__test_good", None)

View File

@@ -144,8 +144,7 @@ class TestMemoryStoreReplace:
def test_replace_no_match(self, store):
store.add("memory", "fact A")
result = store.replace("memory", "nonexistent", "new")
assert result["success"] is True
assert result["result"] == "no_match"
assert result["success"] is False
def test_replace_ambiguous_match(self, store):
store.add("memory", "server A runs nginx")
@@ -178,8 +177,7 @@ class TestMemoryStoreRemove:
def test_remove_no_match(self, store):
result = store.remove("memory", "nonexistent")
assert result["success"] is True
assert result["result"] == "no_match"
assert result["success"] is False
def test_remove_empty_old_text(self, store):
result = store.remove("memory", " ")

View File

@@ -260,12 +260,8 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
@@ -314,12 +310,8 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
@@ -457,30 +449,30 @@ def memory_tool(
Returns JSON string with results.
"""
if store is None:
return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False)
return json.dumps({"success": False, "error": "Memory is not available. It may be disabled in config or this environment."}, ensure_ascii=False)
if target not in ("memory", "user"):
return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
return json.dumps({"success": False, "error": f"Invalid target '{target}'. Use 'memory' or 'user'."}, ensure_ascii=False)
if action == "add":
if not content:
return tool_error("Content is required for 'add' action.", success=False)
return json.dumps({"success": False, "error": "Content is required for 'add' action."}, ensure_ascii=False)
result = store.add(target, content)
elif action == "replace":
if not old_text:
return tool_error("old_text is required for 'replace' action.", success=False)
return json.dumps({"success": False, "error": "old_text is required for 'replace' action."}, ensure_ascii=False)
if not content:
return tool_error("content is required for 'replace' action.", success=False)
return json.dumps({"success": False, "error": "content is required for 'replace' action."}, ensure_ascii=False)
result = store.replace(target, old_text, content)
elif action == "remove":
if not old_text:
return tool_error("old_text is required for 'remove' action.", success=False)
return json.dumps({"success": False, "error": "old_text is required for 'remove' action."}, ensure_ascii=False)
result = store.remove(target, old_text)
else:
return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False)
return json.dumps({"success": False, "error": f"Unknown action '{action}'. Use: add, replace, remove"}, ensure_ascii=False)
return json.dumps(result, ensure_ascii=False)
@@ -547,7 +539,7 @@ MEMORY_SCHEMA = {
# --- Registry ---
from tools.registry import registry, tool_error
from tools.registry import registry
registry.register(
name="memory",