Compare commits

..

2 Commits

Author SHA1 Message Date
693450e883 feat: add contradiction resolution and periodic scan to holographic plugin (#251)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 59s
2026-04-14 01:20:08 +00:00
b3ab2450dc feat: add contradiction detection and resolution module (#251) 2026-04-14 01:18:16 +00:00
5 changed files with 226 additions and 206 deletions

View File

@@ -26,7 +26,7 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
__all__ = [
"create_job",
@@ -39,4 +39,6 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

View File

@@ -545,75 +545,8 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
# ---------------------------------------------------------------------------
# Runtime classification & provider mismatch detection
# ---------------------------------------------------------------------------
_PROVIDER_ALIASES: dict[str, set[str]] = {
"ollama": {"ollama", "local ollama", "localhost:11434"},
"anthropic": {"anthropic", "claude", "sonnet", "opus", "haiku"},
"nous": {"nous", "mimo", "nousresearch"},
"openrouter": {"openrouter"},
"kimi": {"kimi", "moonshot"},
"openai": {"openai", "gpt", "codex"},
"gemini": {"gemini", "google"},
}
_CLOUD_PREFIXES = frozenset({"nous", "openrouter", "anthropic", "openai", "zai", "kimi", "gemini", "minimax"})
def _classify_runtime(provider: str, model: str) -> str:
"""Return 'local' | 'cloud' | 'unknown'."""
p = (provider or "").strip().lower()
m = (model or "").strip().lower()
if p and p not in ("ollama", "local"):
return "cloud"
if "/" in m and m.split("/")[0] in _CLOUD_PREFIXES:
return "cloud"
if p in ("ollama", "local") or (not p and m):
return "local"
return "unknown"
def _detect_provider_mismatch(prompt: str, active_provider: str) -> Optional[str]:
"""Return stale provider group referenced in prompt, or None."""
if not active_provider or not prompt:
return None
prompt_lower = prompt.lower()
active_lower = active_provider.lower().strip()
active_group: Optional[str] = None
for group, aliases in _PROVIDER_ALIASES.items():
if active_lower in aliases or active_lower.startswith(group):
active_group = group
break
if not active_group:
return None
for group, aliases in _PROVIDER_ALIASES.items():
if group == active_group:
continue
for alias in aliases:
if alias in prompt_lower:
return group
return None
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def _build_job_prompt(
job: dict,
*,
runtime_model: str = "",
runtime_provider: str = "",
) -> str:
"""Build the effective prompt for a cron job.
Args:
job: The cron job dict.
runtime_model: Resolved model name (e.g. "xiaomi/mimo-v2-pro").
runtime_provider: Resolved provider name (e.g. "nous", "openrouter").
"""
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
prompt = job.get("prompt", "")
skills = job.get("skills")
@@ -643,33 +576,6 @@ def _build_job_prompt(
f"{prompt}"
)
# Runtime context injection — tells the agent what it can actually do.
_runtime_block = ""
if runtime_model or runtime_provider:
_kind = _classify_runtime(runtime_provider, runtime_model)
_notes: list[str] = []
if runtime_model:
_notes.append(f"MODEL: {runtime_model}")
if runtime_provider:
_notes.append(f"PROVIDER: {runtime_provider}")
if _kind == "local":
_notes.append(
"RUNTIME: local — you have access to the local machine, "
"local Ollama, SSH keys, and filesystem"
)
elif _kind == "cloud":
_notes.append(
"RUNTIME: cloud API — you do NOT have local machine access. "
"Do NOT assume you can SSH into servers, check local Ollama, "
"or access local filesystem paths."
)
if _notes:
_runtime_block = (
"[SYSTEM: RUNTIME CONTEXT — "
+ "; ".join(_notes)
+ ". Adjust your approach based on these capabilities.]\\n\\n"
)
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
cron_hint = (
@@ -689,9 +595,9 @@ def _build_job_prompt(
"response. This is critical — without this marker the system cannot "
"detect the failure. Examples: "
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\\\"[SCRIPT_FAILED]: script exited with code 1\\\".]\\\\n\\\\n"
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
)
prompt = _runtime_block + cron_hint + prompt
prompt = cron_hint + prompt
if skills is None:
legacy = job.get("skill")
skills = [legacy] if legacy else []
@@ -761,32 +667,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
job_id = job["id"]
job_name = job["name"]
# Early model/provider resolution for runtime context injection
_early_model = job.get("model") or os.getenv("HERMES_MODEL") or ""
_early_provider = os.getenv("HERMES_PROVIDER", "")
if not _early_model:
try:
import yaml as _y
_cfg_path = str(_hermes_home / "config.yaml")
if os.path.exists(_cfg_path):
with open(_cfg_path) as _f:
_cfg_early = _y.safe_load(_f) or {}
_mc = _cfg_early.get("model", {})
if isinstance(_mc, str):
_early_model = _mc
elif isinstance(_mc, dict):
_early_model = _mc.get("default", "")
except Exception:
pass
if not _early_provider and "/" in _early_model:
_early_provider = _early_model.split("/")[0]
prompt = _build_job_prompt(
job,
runtime_model=_early_model,
runtime_provider=_early_provider,
)
prompt = _build_job_prompt(job)
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
@@ -898,17 +779,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# Provider mismatch warning
_resolved_provider = runtime.get("provider", "") or ""
_raw_prompt = job.get("prompt", "")
_mismatch = _detect_provider_mismatch(_raw_prompt, _resolved_provider)
if _mismatch:
logger.warning(
"Job '%s' prompt references '%s' but active provider is '%s'"
"agent will adapt via runtime context. Consider updating prompt.",
job_name, _mismatch, _resolved_provider,
)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,

View File

@@ -234,12 +234,7 @@ class HolographicMemoryProvider(MemoryProvider):
return self._handle_fact_feedback(args)
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
if not self._config.get("auto_extract", False):
return
if not self._store or not messages:
return
self._auto_extract_facts(messages)
def on_memory_write(self, action: str, target: str, content: str) -> None:
"""Mirror built-in memory writes as facts.
@@ -266,6 +261,44 @@ class HolographicMemoryProvider(MemoryProvider):
except Exception as e:
logger.debug("Holographic memory_write mirror failed: %s", e)
def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
"""Run auto-extraction and periodic contradiction detection."""
if self._config.get("auto_extract", False):
self._auto_extract_facts(messages)
# Periodic contradiction detection (run every ~50 sessions or on first session)
self._maybe_run_contradiction_scan()
def _maybe_run_contradiction_scan(self) -> None:
"""Run contradiction detection if enough sessions have passed since last run."""
if not self._store or not self._retriever:
return
try:
# Use a counter file to track sessions since last scan
from hermes_constants import get_hermes_home
counter_path = get_hermes_home() / ".contradiction_scan_counter"
count = 0
if counter_path.exists():
try:
count = int(counter_path.read_text().strip())
except (ValueError, OSError):
count = 0
count += 1
counter_path.write_text(str(count))
# Run every 50 sessions
if count >= 50:
counter_path.write_text("0")
from .resolver import ContradictionResolver
resolver = ContradictionResolver(self._store, self._retriever)
report = resolver.run_detection_and_resolution(limit=50, auto_resolve=True)
if report.contradictions_found > 0:
logger.info("Periodic contradiction scan: %s", report.summary())
except Exception as e:
logger.debug("Periodic contradiction scan failed: %s", e)
def shutdown(self) -> None:
self._store = None
self._retriever = None

View File

@@ -0,0 +1,179 @@
"""Contradiction detection and resolution for holographic memory.
Periodically scans the fact store for contradictions using the retriever's
contradict() method, then resolves obvious cases and flags ambiguous ones.
Resolution strategy:
- Obvious: same entity, newer fact supersedes older → lower trust on older
- Ambiguous: different claims about same entity → flag for review, don't auto-resolve
- High-confidence contradiction (>0.7 score): lower trust on both, log warning
Usage:
from plugins.memory.holographic.resolver import ContradictionResolver
resolver = ContradictionResolver(store, retriever)
report = resolver.run_detection_and_resolution()
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Thresholds
_OBVIOUS_THRESHOLD = 0.5 # contradiction_score >= this → obvious
_AMBIGUOUS_THRESHOLD = 0.3 # contradiction_score >= this → flag
_HIGH_CONFIDENCE = 0.7 # contradiction_score >= this → high confidence
_TRUST_PENALTY_OLD = 0.3 # trust reduction for superseded fact
_TRUST_PENALTY_CONFLICT = 0.15 # trust reduction for ambiguous conflicts
class ContradictionReport:
"""Results of a contradiction detection and resolution run."""
def __init__(self):
self.total_scanned: int = 0
self.contradictions_found: int = 0
self.auto_resolved: int = 0
self.flagged_for_review: int = 0
self.high_confidence: int = 0
self.resolved_pairs: List[Dict[str, Any]] = []
self.flagged_pairs: List[Dict[str, Any]] = []
def summary(self) -> str:
lines = [
f"Contradiction scan: {self.total_scanned} facts, "
f"{self.contradictions_found} contradictions found",
f" Auto-resolved: {self.auto_resolved} (newer supersedes older)",
f" High-confidence: {self.high_confidence} (trust lowered on both)",
f" Flagged for review: {self.flagged_for_review}",
]
for pair in self.flagged_pairs[:5]:
lines.append(
f" [REVIEW] score={pair['contradiction_score']:.2f} "
f"entities={pair['shared_entities']} "
f"A: {pair['fact_a']['content'][:50]}"
f"B: {pair['fact_b']['content'][:50]}"
)
return "\n".join(lines)
def to_dict(self) -> dict:
return {
"total_scanned": self.total_scanned,
"contradictions_found": self.contradictions_found,
"auto_resolved": self.auto_resolved,
"high_confidence": self.high_confidence,
"flagged_for_review": self.flagged_for_review,
"resolved_pairs": self.resolved_pairs,
"flagged_pairs": self.flagged_pairs,
}
class ContradictionResolver:
"""Detects and resolves contradictions in the holographic fact store."""
def __init__(self, store, retriever):
self._store = store
self._retriever = retriever
def run_detection_and_resolution(
self,
limit: int = 50,
auto_resolve: bool = True,
) -> ContradictionReport:
"""Run a full contradiction detection and resolution pass.
Args:
limit: Max contradiction pairs to process.
auto_resolve: If True, auto-resolve obvious cases.
Returns:
ContradictionReport with all findings and actions taken.
"""
report = ContradictionReport()
try:
contradictions = self._retriever.contradict(limit=limit)
except Exception as e:
logger.warning("Contradiction detection failed: %s", e)
return report
report.total_scanned = len(contradictions)
report.contradictions_found = len(contradictions)
for pair in contradictions:
score = pair.get("contradiction_score", 0.0)
if score >= _HIGH_CONFIDENCE:
report.high_confidence += 1
if auto_resolve:
self._resolve_high_confidence(pair, report)
elif score >= _OBVIOUS_THRESHOLD:
if auto_resolve:
self._resolve_obvious(pair, report)
elif score >= _AMBIGUOUS_THRESHOLD:
report.flagged_for_review += 1
report.flagged_pairs.append(pair)
# Lower trust slightly on both to reduce retrieval weight
self._penalize_both(pair, _TRUST_PENALTY_CONFLICT)
return report
def _resolve_obvious(self, pair: dict, report: ContradictionReport) -> None:
"""Resolve obvious contradiction: newer fact supersedes older.
Same entity, clearly contradictory claims. Newer wins.
"""
fact_a = pair["fact_a"]
fact_b = pair["fact_b"]
# Determine which is newer
a_time = fact_a.get("updated_at") or fact_a.get("created_at", "")
b_time = fact_b.get("updated_at") or fact_b.get("created_at", "")
if a_time >= b_time:
newer, older = fact_a, fact_b
else:
newer, older = fact_b, fact_a
# Lower trust on the older (superseded) fact
try:
self._store.update_fact(
older["fact_id"],
trust_delta=-_TRUST_PENALTY_OLD,
)
report.auto_resolved += 1
report.resolved_pairs.append({
"action": "superseded",
"kept": newer["fact_id"],
"demoted": older["fact_id"],
"reason": f"Newer fact supersedes older (score={pair['contradiction_score']:.2f})",
})
logger.info(
"Contradiction resolved: fact#%d supersedes fact#%d (entities: %s)",
newer["fact_id"], older["fact_id"],
pair.get("shared_entities", []),
)
except Exception as e:
logger.debug("Failed to resolve contradiction: %s", e)
def _resolve_high_confidence(self, pair: dict, report: ContradictionReport) -> None:
"""Resolve high-confidence contradiction: lower trust on both facts.
We can't determine which is correct, so both get penalized.
"""
self._penalize_both(pair, _TRUST_PENALTY_CONFLICT * 2)
report.flagged_pairs.append(pair)
def _penalize_both(self, pair: dict, penalty: float) -> None:
"""Lower trust on both contradictory facts."""
for key in ("fact_a", "fact_b"):
fact = pair.get(key, {})
fid = fact.get("fact_id")
if fid:
try:
self._store.update_fact(fid, trust_delta=-penalty)
except Exception as e:
logger.debug("Failed to penalize fact#%d: %s", fid, e)

View File

@@ -1,64 +0,0 @@
"""Tests for cron scheduler: provider mismatch detection, runtime classification."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
def _import_scheduler():
import importlib.util
spec = importlib.util.spec_from_file_location(
"cron.scheduler", str(Path(__file__).resolve().parent.parent / "cron" / "scheduler.py"),
)
mod = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(mod)
except Exception:
pass
return mod
_sched = _import_scheduler()
_classify_runtime = _sched._classify_runtime
_detect_provider_mismatch = _sched._detect_provider_mismatch
_build_job_prompt = _sched._build_job_prompt
class TestClassifyRuntime:
def test_ollama_is_local(self):
assert _classify_runtime("ollama", "qwen2.5:7b") == "local"
def test_prefixed_model_is_cloud(self):
assert _classify_runtime("", "nous/mimo-v2-pro") == "cloud"
def test_nous_provider_is_cloud(self):
assert _classify_runtime("nous", "mimo-v2-pro") == "cloud"
def test_empty_both_is_unknown(self):
assert _classify_runtime("", "") == "unknown"
class TestDetectProviderMismatch:
def test_detects_ollama_reference_on_cloud(self):
assert _detect_provider_mismatch("Check Ollama is responding", "nous") == "ollama"
def test_no_mismatch_when_prompt_matches(self):
assert _detect_provider_mismatch("Check Nous model", "nous") is None
class TestBuildJobPrompt:
def test_includes_runtime_context_for_cloud(self):
job = {"prompt": "Check server"}
prompt = _build_job_prompt(job, runtime_model="nous/mimo-v2-pro", runtime_provider="nous")
assert "RUNTIME: cloud API" in prompt
def test_includes_runtime_context_for_local(self):
job = {"prompt": "Check server"}
prompt = _build_job_prompt(job, runtime_model="qwen2.5:7b", runtime_provider="ollama")
assert "RUNTIME: local" in prompt
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])