Compare commits

..

1 Commits

Author SHA1 Message Date
628487f7bd fix(cron): rewrite cloud-incompatible prompt instructions (#378)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m9s
Health Monitor prompts say 'Check Ollama is responding' but run
on cloud models that cannot reach localhost. Instead of just
warning the agent, rewrite the instructions to cloud-compatible
equivalents the agent can actually execute.

Changes:
- Add import re
- Add _CLOUD_INCOMPATIBLE_PATTERNS: regex pairs (pattern, replacement)
- Add _rewrite_cloud_incompatible_prompt(): rewrites localhost/Ollama
  references to 'use available tools to check service health'
- Wire into run_job() after resolve_turn_route()

Closes #378
2026-04-14 01:47:00 +00:00
3 changed files with 57 additions and 219 deletions

View File

@@ -13,6 +13,7 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -643,7 +644,56 @@ def _build_job_prompt(job: dict) -> str:
return "\n".join(parts)
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Regex patterns for local service references that fail on cloud endpoints
_CLOUD_INCOMPATIBLE_PATTERNS = [
(re.compile(r"\b[Cc]heck\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up|available)", re.IGNORECASE),
"Verify system services are healthy using available tools"),
(re.compile(r"\b[Vv]erify\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up)", re.IGNORECASE),
"Verify system services are healthy using available tools"),
(re.compile(r"\bcurl\s+localhost:\d+", re.IGNORECASE),
"use available tools to check service health"),
(re.compile(r"\bcurl\s+127\.0\.0\.1:\d+", re.IGNORECASE),
"use available tools to check service health"),
(re.compile(r"\bpoll\s+localhost", re.IGNORECASE),
"check service health via available tools"),
]
def _rewrite_cloud_incompatible_prompt(prompt: str, base_url: str) -> str:
"""Rewrite prompt instructions that assume local service access when running on cloud.
When a cron job runs on a cloud inference endpoint (Nous, OpenRouter, Anthropic),
instructions to "Check Ollama" or "curl localhost:11434" are impossible.
Instead of just warning, this rewrites the instruction to a cloud-compatible
equivalent that the agent can actually execute.
Returns the (possibly rewritten) prompt.
"""
try:
from agent.model_metadata import is_local_endpoint
except ImportError:
return prompt
if is_local_endpoint(base_url or ""):
return prompt # Local — no rewrite needed
rewritten = prompt
for pattern, replacement in _CLOUD_INCOMPATIBLE_PATTERNS:
rewritten = pattern.sub(replacement, rewritten)
if rewritten != prompt:
rewritten = (
"[NOTE: Some instructions were adjusted for cloud execution. "
"Local service checks were rewritten to use available tools.]
"
+ rewritten
)
return rewritten
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a single cron job.

View File

@@ -234,7 +234,12 @@ class HolographicMemoryProvider(MemoryProvider):
return self._handle_fact_feedback(args)
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
if not self._config.get("auto_extract", False):
return
if not self._store or not messages:
return
self._auto_extract_facts(messages)
def on_memory_write(self, action: str, target: str, content: str) -> None:
"""Mirror built-in memory writes as facts.
@@ -261,44 +266,6 @@ class HolographicMemoryProvider(MemoryProvider):
except Exception as e:
logger.debug("Holographic memory_write mirror failed: %s", e)
def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
"""Run auto-extraction and periodic contradiction detection."""
if self._config.get("auto_extract", False):
self._auto_extract_facts(messages)
# Periodic contradiction detection (run every ~50 sessions or on first session)
self._maybe_run_contradiction_scan()
def _maybe_run_contradiction_scan(self) -> None:
"""Run contradiction detection if enough sessions have passed since last run."""
if not self._store or not self._retriever:
return
try:
# Use a counter file to track sessions since last scan
from hermes_constants import get_hermes_home
counter_path = get_hermes_home() / ".contradiction_scan_counter"
count = 0
if counter_path.exists():
try:
count = int(counter_path.read_text().strip())
except (ValueError, OSError):
count = 0
count += 1
counter_path.write_text(str(count))
# Run every 50 sessions
if count >= 50:
counter_path.write_text("0")
from .resolver import ContradictionResolver
resolver = ContradictionResolver(self._store, self._retriever)
report = resolver.run_detection_and_resolution(limit=50, auto_resolve=True)
if report.contradictions_found > 0:
logger.info("Periodic contradiction scan: %s", report.summary())
except Exception as e:
logger.debug("Periodic contradiction scan failed: %s", e)
def shutdown(self) -> None:
self._store = None
self._retriever = None

View File

@@ -1,179 +0,0 @@
"""Contradiction detection and resolution for holographic memory.
Periodically scans the fact store for contradictions using the retriever's
contradict() method, then resolves obvious cases and flags ambiguous ones.
Resolution strategy:
- Obvious: same entity, newer fact supersedes older → lower trust on older
- Ambiguous: different claims about same entity → flag for review, don't auto-resolve
- High-confidence contradiction (>0.7 score): lower trust on both, log warning
Usage:
from plugins.memory.holographic.resolver import ContradictionResolver
resolver = ContradictionResolver(store, retriever)
report = resolver.run_detection_and_resolution()
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Thresholds
_OBVIOUS_THRESHOLD = 0.5 # contradiction_score >= this → obvious
_AMBIGUOUS_THRESHOLD = 0.3 # contradiction_score >= this → flag
_HIGH_CONFIDENCE = 0.7 # contradiction_score >= this → high confidence
_TRUST_PENALTY_OLD = 0.3 # trust reduction for superseded fact
_TRUST_PENALTY_CONFLICT = 0.15 # trust reduction for ambiguous conflicts
class ContradictionReport:
"""Results of a contradiction detection and resolution run."""
def __init__(self):
self.total_scanned: int = 0
self.contradictions_found: int = 0
self.auto_resolved: int = 0
self.flagged_for_review: int = 0
self.high_confidence: int = 0
self.resolved_pairs: List[Dict[str, Any]] = []
self.flagged_pairs: List[Dict[str, Any]] = []
def summary(self) -> str:
lines = [
f"Contradiction scan: {self.total_scanned} facts, "
f"{self.contradictions_found} contradictions found",
f" Auto-resolved: {self.auto_resolved} (newer supersedes older)",
f" High-confidence: {self.high_confidence} (trust lowered on both)",
f" Flagged for review: {self.flagged_for_review}",
]
for pair in self.flagged_pairs[:5]:
lines.append(
f" [REVIEW] score={pair['contradiction_score']:.2f} "
f"entities={pair['shared_entities']} "
f"A: {pair['fact_a']['content'][:50]}"
f"B: {pair['fact_b']['content'][:50]}"
)
return "\n".join(lines)
def to_dict(self) -> dict:
return {
"total_scanned": self.total_scanned,
"contradictions_found": self.contradictions_found,
"auto_resolved": self.auto_resolved,
"high_confidence": self.high_confidence,
"flagged_for_review": self.flagged_for_review,
"resolved_pairs": self.resolved_pairs,
"flagged_pairs": self.flagged_pairs,
}
class ContradictionResolver:
"""Detects and resolves contradictions in the holographic fact store."""
def __init__(self, store, retriever):
self._store = store
self._retriever = retriever
def run_detection_and_resolution(
self,
limit: int = 50,
auto_resolve: bool = True,
) -> ContradictionReport:
"""Run a full contradiction detection and resolution pass.
Args:
limit: Max contradiction pairs to process.
auto_resolve: If True, auto-resolve obvious cases.
Returns:
ContradictionReport with all findings and actions taken.
"""
report = ContradictionReport()
try:
contradictions = self._retriever.contradict(limit=limit)
except Exception as e:
logger.warning("Contradiction detection failed: %s", e)
return report
report.total_scanned = len(contradictions)
report.contradictions_found = len(contradictions)
for pair in contradictions:
score = pair.get("contradiction_score", 0.0)
if score >= _HIGH_CONFIDENCE:
report.high_confidence += 1
if auto_resolve:
self._resolve_high_confidence(pair, report)
elif score >= _OBVIOUS_THRESHOLD:
if auto_resolve:
self._resolve_obvious(pair, report)
elif score >= _AMBIGUOUS_THRESHOLD:
report.flagged_for_review += 1
report.flagged_pairs.append(pair)
# Lower trust slightly on both to reduce retrieval weight
self._penalize_both(pair, _TRUST_PENALTY_CONFLICT)
return report
def _resolve_obvious(self, pair: dict, report: ContradictionReport) -> None:
"""Resolve obvious contradiction: newer fact supersedes older.
Same entity, clearly contradictory claims. Newer wins.
"""
fact_a = pair["fact_a"]
fact_b = pair["fact_b"]
# Determine which is newer
a_time = fact_a.get("updated_at") or fact_a.get("created_at", "")
b_time = fact_b.get("updated_at") or fact_b.get("created_at", "")
if a_time >= b_time:
newer, older = fact_a, fact_b
else:
newer, older = fact_b, fact_a
# Lower trust on the older (superseded) fact
try:
self._store.update_fact(
older["fact_id"],
trust_delta=-_TRUST_PENALTY_OLD,
)
report.auto_resolved += 1
report.resolved_pairs.append({
"action": "superseded",
"kept": newer["fact_id"],
"demoted": older["fact_id"],
"reason": f"Newer fact supersedes older (score={pair['contradiction_score']:.2f})",
})
logger.info(
"Contradiction resolved: fact#%d supersedes fact#%d (entities: %s)",
newer["fact_id"], older["fact_id"],
pair.get("shared_entities", []),
)
except Exception as e:
logger.debug("Failed to resolve contradiction: %s", e)
def _resolve_high_confidence(self, pair: dict, report: ContradictionReport) -> None:
"""Resolve high-confidence contradiction: lower trust on both facts.
We can't determine which is correct, so both get penalized.
"""
self._penalize_both(pair, _TRUST_PENALTY_CONFLICT * 2)
report.flagged_pairs.append(pair)
def _penalize_both(self, pair: dict, penalty: float) -> None:
"""Lower trust on both contradictory facts."""
for key in ("fact_a", "fact_b"):
fact = pair.get(key, {})
fid = fact.get("fact_id")
if fid:
try:
self._store.update_fact(fid, trust_delta=-penalty)
except Exception as e:
logger.debug("Failed to penalize fact#%d: %s", fid, e)