Compare commits
2 Commits
fix/794-au
...
fix/792-gr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
55c8100b8f | ||
|
|
1f92fb0480 |
155
scripts/grounding.py
Executable file
155
scripts/grounding.py
Executable file
@@ -0,0 +1,155 @@
|
||||
#!/usr/bin/env python3
|
||||
# grounding.py - Grounding before generation.
|
||||
# SOUL.md: "When I have verified sources, I must consult them
|
||||
# before I generate from pattern alone. Retrieval is not a feature.
|
||||
# It is the primary mechanism by which I avoid lying."
|
||||
# Part of #792
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
# Root of the local Hermes state tree; overridable via the HERMES_HOME env var.
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
# Default directory scanned for *.md memory files by GroundingLayer.
MEMORY_DIR = HERMES_HOME / "memory"
|
||||
|
||||
|
||||
@dataclass
class GroundingResult:
    """Outcome of querying local sources for one query before generation."""
    query: str
    # Scored hit dicts with keys: text, source, type, score.
    sources_found: List[Dict[str, Any]] = field(default_factory=list)
    # True when at least one local source matched the query.
    grounded: bool = False
    # Heuristic confidence; 0.0 when ungrounded, capped at 0.95.
    confidence: float = 0.0
    # First 200 chars of the best-scoring hit's text.
    source_text: str = ""
    source_type: str = ""  # memory, file, chain, tool_result

    @property
    def needs_hedging(self) -> bool:
        """True when no source backs the answer, so output should be hedged."""
        return not self.grounded
|
||||
|
||||
|
||||
class GroundingLayer:
    """Looks up local evidence for a query before any text is generated.

    Sources consulted: ``*.md`` files under ``memory_dir`` and, optionally,
    a caller-supplied list of context snippets.
    """

    def __init__(self, memory_dir=None):
        # A truthy argument wins; otherwise fall back to the module default.
        self.memory_dir = Path(memory_dir) if memory_dir else MEMORY_DIR

    def ground(self, query, context=None):
        """Query local sources before generation and summarize the outcome."""
        hits = list(self._search_memory(query))
        if context:
            hits.extend(self._search_context(query, context))

        is_grounded = bool(hits)
        # Confidence grows 0.2 per hit from a 0.3 floor, capped at 0.95.
        confidence = min(0.95, 0.3 + len(hits) * 0.2) if is_grounded else 0.0

        best_text = ""
        best_type = ""
        if hits:
            top = max(hits, key=lambda h: h.get("score", 0))
            best_text = top.get("text", "")[:200]
            best_type = top.get("type", "unknown")

        return GroundingResult(
            query=query,
            sources_found=hits,
            grounded=is_grounded,
            confidence=confidence,
            source_text=best_text,
            source_type=best_type,
        )

    def _search_memory(self, query):
        """Return up to five scored hits from *.md files under memory_dir."""
        found = []
        if not self.memory_dir.exists():
            return found

        words = set(query.lower().split())

        for path in self.memory_dir.rglob("*.md"):
            try:
                body = path.read_text(encoding="utf-8", errors="replace")
            except Exception:
                continue  # unreadable file: skip it rather than fail the search

            lowered = body.lower()
            # Relevance heuristic: count of query words present (substring match).
            hit_count = sum(1 for w in words if w in lowered)
            if not hit_count:
                continue

            # Snippet: first line mentioning any query word, else the file head.
            snippet = next(
                (ln.strip()[:200] for ln in body.split("\n")
                 if any(w in ln.lower() for w in words)),
                "",
            )
            found.append({
                "text": snippet or body[:200],
                "source": str(path.relative_to(self.memory_dir)),
                "type": "memory",
                "score": round(hit_count / max(len(words), 1), 3),
            })

        return sorted(found, key=lambda r: -r["score"])[:5]

    def _search_context(self, query, context):
        """Return up to five scored hits from caller-provided context entries."""
        found = []
        if not context:
            return found

        words = set(query.lower().split())

        for entry in context:
            if isinstance(entry, dict):
                body = entry.get("content", "") or entry.get("text", "")
                origin = entry.get("source", "context")
            else:
                body = str(entry)
                origin = "context"

            lowered = body.lower()
            hit_count = sum(1 for w in words if w in lowered)
            if hit_count > 0:
                found.append({
                    "text": body[:200],
                    "source": origin,
                    "type": "context",
                    "score": round(hit_count / max(len(words), 1), 3),
                })

        return sorted(found, key=lambda r: -r["score"])[:5]

    def format_sources(self, result):
        """Render a GroundingResult as a short human-readable summary."""
        if not result.grounded:
            return "No verified sources found. Proceeding from pattern matching."

        out = ["Based on verified sources:"]
        for src in result.sources_found[:3]:
            out.append(" - [" + src.get("source", "unknown") + "] " + src.get("text", "")[:100])
        return "\n".join(out)
|
||||
|
||||
|
||||
# Convenience: a lazily created, process-wide default layer.
_default_layer = None


def get_grounding_layer():
    """Return the shared GroundingLayer, creating it on first use."""
    global _default_layer
    if _default_layer is not None:
        return _default_layer
    _default_layer = GroundingLayer()
    return _default_layer


def ground(query, **kwargs):
    """Module-level shortcut: ground *query* via the shared layer."""
    return get_grounding_layer().ground(query, **kwargs)
|
||||
101
scripts/source_distinction.py
Executable file
101
scripts/source_distinction.py
Executable file
@@ -0,0 +1,101 @@
|
||||
#!/usr/bin/env python3
|
||||
# source_distinction.py - I think vs I know annotation system.
|
||||
# SOUL.md: "Every claim I make comes from one of two places: a verified source
|
||||
# I can point to, or my own pattern-matching."
|
||||
# Part of #793
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
class SourceType(Enum):
    """Provenance categories for a claim."""
    VERIFIED = "verified"  # backed by a source that can be pointed to
    INFERRED = "inferred"  # produced by pattern-matching alone
    STATED = "stated"      # asserted by the user in conversation
    UNKNOWN = "unknown"    # provenance could not be determined
|
||||
|
||||
|
||||
@dataclass
class Claim:
    """One assertion plus provenance metadata."""
    text: str                # the claim itself
    source_type: SourceType  # provenance category (see SourceType)
    source_ref: str = ""     # pointer to the verifying source, if any
    confidence: float = 0.0  # caller-supplied confidence; factories use 0.6-1.0
    hedging: str = ""        # hedging phrase used when rendering inferred claims
|
||||
|
||||
|
||||
@dataclass
class AnnotatedResponse:
    """A response broken into claims, each tagged with its source type."""

    raw_text: str
    claims: List[Claim] = field(default_factory=list)

    def render(self):
        """Render all claims as one annotated string (raw_text if no claims)."""
        if not self.claims:
            return self.raw_text
        return " ".join(self._render_claim(c) for c in self.claims)

    @staticmethod
    def _render_claim(claim):
        # Format a single claim according to its provenance.
        if claim.source_type == SourceType.VERIFIED:
            tag = "[verified: " + claim.source_ref + "]" if claim.source_ref else "[verified]"
            return claim.text + " " + tag
        if claim.source_type == SourceType.INFERRED:
            return (claim.hedging or "I think") + " " + claim.text
        if claim.source_type == SourceType.STATED:
            return claim.text + " [you told me]"
        return "I am not certain, but " + claim.text

    def _count(self, kind):
        # Number of claims whose source_type equals *kind*.
        return sum(1 for c in self.claims if c.source_type == kind)

    @property
    def verified_count(self):
        """Number of claims backed by a verified source."""
        return self._count(SourceType.VERIFIED)

    @property
    def inferred_count(self):
        """Number of claims produced by inference alone."""
        return self._count(SourceType.INFERRED)
|
||||
|
||||
|
||||
def verified(text, source, confidence=0.95):
    """Build a Claim that is backed by a verifiable source reference."""
    return Claim(
        text=text,
        source_type=SourceType.VERIFIED,
        source_ref=source,
        confidence=confidence,
    )
|
||||
|
||||
def inferred(text, hedging="I think", confidence=0.6):
    """Build a Claim produced by inference, carrying its hedging phrase."""
    return Claim(
        text=text,
        source_type=SourceType.INFERRED,
        confidence=confidence,
        hedging=hedging,
    )
|
||||
|
||||
def stated(text):
    """Build a Claim for something the user asserted (full confidence)."""
    claim = Claim(text=text, source_type=SourceType.STATED, confidence=1.0)
    return claim
|
||||
|
||||
|
||||
def annotate_response(raw_text, claims):
    """Bundle *raw_text* and its tagged claims into an AnnotatedResponse."""
    response = AnnotatedResponse(raw_text=raw_text, claims=claims)
    return response
|
||||
|
||||
|
||||
def format_for_display(response):
    """Render an AnnotatedResponse as marker-prefixed lines plus a summary.

    Markers: "=" verified, "~" inferred, ">" stated, "?" unknown.
    """
    out = []
    for claim in response.claims:
        kind = claim.source_type
        if kind == SourceType.VERIFIED:
            suffix = " (" + claim.source_ref + ")" if claim.source_ref else ""
            out.append(" = " + claim.text + suffix)
        elif kind == SourceType.INFERRED:
            out.append(" ~ " + claim.hedging + " " + claim.text)
        elif kind == SourceType.STATED:
            out.append(" > " + claim.text)
        else:
            out.append(" ? " + claim.text)

    if response.claims:
        out.append("")
        out.append(
            " [%d verified, %d inferred, %d total]"
            % (response.verified_count, response.inferred_count, len(response.claims))
        )
    return "\n".join(out)
|
||||
|
||||
|
||||
def source_distinction_check(text):
    """Heuristically detect hedging language that signals inferred claims.

    Returns a dict with has_hedging (any hedge phrase present),
    hedging_count (number of distinct phrases found), and likely_inferred
    (more than two phrases, i.e. the text reads as speculation).
    """
    hedges = ("i think", "i believe", "probably", "likely", "might",
              "it seems", "perhaps", "i am not sure", "i guess",
              "my understanding is", "i suspect")
    lowered = text.lower()
    count = sum(h in lowered for h in hedges)
    return {
        "has_hedging": count > 0,
        "hedging_count": count,
        "likely_inferred": count > 2,
    }
|
||||
67
tests/test_grounding.py
Normal file
67
tests/test_grounding.py
Normal file
@@ -0,0 +1,67 @@
|
||||
"""Tests for grounding-before-generation - SOUL.md compliance."""
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
|
||||
|
||||
class TestGrounding:
    """SOUL.md compliance: grounding must consult sources before generation."""

    def test_ground_with_memory(self, tmp_path):
        from scripts.grounding import GroundingLayer
        memory = tmp_path / "memory"
        memory.mkdir()
        (memory / "test.md").write_text("Python is a programming language created by Guido.")

        outcome = GroundingLayer(memory_dir=memory).ground("What is Python?")

        assert outcome.grounded
        assert outcome.confidence > 0
        assert outcome.sources_found

    def test_ground_no_sources(self, tmp_path):
        from scripts.grounding import GroundingLayer
        memory = tmp_path / "memory"
        memory.mkdir()

        outcome = GroundingLayer(memory_dir=memory).ground("What is quantum physics?")

        assert not outcome.grounded
        assert outcome.needs_hedging
        assert outcome.confidence == 0.0

    def test_ground_with_context(self):
        from scripts.grounding import GroundingLayer
        docs = [{"content": "The fleet uses tmux for agent management", "source": "fleet-ops"}]

        outcome = GroundingLayer(memory_dir=Path("/nonexistent")).ground(
            "How does the fleet work?", context=docs)

        assert outcome.grounded
        assert outcome.source_type == "context"

    def test_format_sources_grounded(self):
        from scripts.grounding import GroundingLayer, GroundingResult
        hit = {"text": "test info", "source": "test.md", "type": "memory", "score": 0.8}
        rendered = GroundingLayer().format_sources(
            GroundingResult(query="test", grounded=True, sources_found=[hit]))
        assert "verified sources" in rendered
        assert "test.md" in rendered

    def test_format_sources_ungrounded(self):
        from scripts.grounding import GroundingLayer, GroundingResult
        rendered = GroundingLayer().format_sources(
            GroundingResult(query="test", grounded=False))
        assert "pattern matching" in rendered

    def test_empty_memory_dir(self, tmp_path):
        from scripts.grounding import GroundingLayer
        empty = tmp_path / "empty"
        empty.mkdir()
        assert not GroundingLayer(memory_dir=empty).ground("anything").grounded
|
||||
61
tests/test_source_distinction.py
Normal file
61
tests/test_source_distinction.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Tests for source distinction - SOUL.md compliance."""
|
||||
import pytest
|
||||
|
||||
|
||||
class TestSourceDistinction:
    """SOUL.md compliance: every claim declares where it came from."""

    def test_verified_claim(self):
        from scripts.source_distinction import verified, SourceType
        claim = verified("Paris is the capital", "web_search:Paris")
        assert claim.source_type is SourceType.VERIFIED
        assert claim.source_ref == "web_search:Paris"
        assert claim.confidence == 0.95

    def test_inferred_claim(self):
        from scripts.source_distinction import inferred, SourceType
        claim = inferred("this approach is better")
        assert claim.source_type is SourceType.INFERRED
        assert claim.hedging == "I think"

    def test_stated_claim(self):
        from scripts.source_distinction import stated, SourceType
        claim = stated("my name is Alexander")
        assert claim.source_type is SourceType.STATED
        assert claim.confidence == 1.0

    def test_render_verified(self):
        from scripts.source_distinction import annotate_response, verified
        rendered = annotate_response("test", [verified("Paris is capital", "web")]).render()
        assert "[verified: web]" in rendered

    def test_render_inferred(self):
        from scripts.source_distinction import annotate_response, inferred
        rendered = annotate_response("test", [inferred("this is better")]).render()
        assert "I think" in rendered

    def test_counts(self):
        from scripts.source_distinction import annotate_response, verified, inferred
        claims = [verified("a", "src"), verified("b", "src"), inferred("c")]
        resp = annotate_response("test", claims)
        assert (resp.verified_count, resp.inferred_count) == (2, 1)

    def test_hedging_detection(self):
        from scripts.source_distinction import source_distinction_check
        report = source_distinction_check(
            "I think this is probably right, but I believe it could be different")
        assert report["has_hedging"]
        assert report["hedging_count"] >= 3

    def test_no_hedging(self):
        from scripts.source_distinction import source_distinction_check
        report = source_distinction_check("The capital of France is Paris.")
        assert not report["has_hedging"]

    def test_format_for_display(self):
        from scripts.source_distinction import format_for_display, annotate_response, verified, inferred
        output = format_for_display(annotate_response("test", [verified("a", "src"), inferred("b")]))
        assert "=" in output  # verified marker
        assert "~" in output  # inferred marker
|
||||
Reference in New Issue
Block a user