354 lines
12 KiB
Python
354 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
LLM Wiki layer — ingest, query, lint, and session crystallization for compounding-intelligence.
|
|
|
|
This is the sovereign knowledge interface: a compiled, queryable, lintable
|
|
knowledge base that survivies beyond sessions and cites its sources.
|
|
|
|
Distinct from:
|
|
- RAG: Raw chunk retrieval without synthesis or quality gating
|
|
- Transcript search: Keyword match over raw session logs without distillation
|
|
|
|
The Wiki layer sits on top of the knowledge/ index (facts with provenance).
|
|
It provides:
|
|
ingest — Harvest knowledge from sessions or raw sources
|
|
query — Retrieve + synthesize answers with citations
|
|
lint — Detect staleness, contradictions, broken links
|
|
crystal — (via harvester) session distillation already integrated
|
|
|
|
Usage:
|
|
python3 scripts/wiki.py ingest --session ~/.hermes/sessions/xxx.jsonl
|
|
python3 scripts/wiki.py query "How do I fix cron timeouts?"
|
|
python3 scripts/wiki.py lint
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
SCRIPT_DIR = Path(__file__).resolve().parent
|
|
REPO_ROOT = SCRIPT_DIR.parent
|
|
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
|
|
INDEX_PATH = KNOWLEDGE_DIR / "index.json"
|
|
|
|
# ---------- Utilities ----------
|
|
|
|
def load_index() -> dict:
|
|
if not INDEX_PATH.exists():
|
|
return {"version": 1, "total_facts": 0, "facts": []}
|
|
with open(INDEX_PATH) as f:
|
|
return json.load(f)
|
|
|
|
def score_fact_for_query(fact: dict, query_terms: set, query_lower: str) -> float:
|
|
"""Simple BM25-like relevance scoring for fact retrieval."""
|
|
fact_text = fact.get('fact', '').lower()
|
|
fact_tags = [t.lower() for t in fact.get('tags', [])]
|
|
|
|
# Term frequency in fact text
|
|
tf = sum(1 for term in query_terms if term in fact_text)
|
|
|
|
# Tag boost: exact tag match gives strong signal
|
|
tag_boost = sum(3.0 for tag in fact_tags if tag in query_lower)
|
|
|
|
# Confidence boost
|
|
confidence = fact.get('confidence', 0.5)
|
|
|
|
# Recency boost: newer facts get slight preference
|
|
last_confirmed = fact.get('last_confirmed', '')
|
|
recency_boost = 0.0
|
|
if last_confirmed:
|
|
try:
|
|
dt = datetime.fromisoformat(last_confirmed.rstrip('Z'))
|
|
days_old = (datetime.now(timezone.utc) - dt).days
|
|
recency_boost = max(0, 1.0 - days_old / 365)
|
|
except Exception:
|
|
pass
|
|
|
|
score = (tf * 1.0) + (tag_boost * confidence) + (recency_boost * 0.5)
|
|
return score
|
|
|
|
def retrieve_facts(query: str, limit: int = 10) -> List[dict]:
|
|
"""Retrieve the most relevant facts for a query from index.json."""
|
|
index = load_index()
|
|
facts = index.get('facts', [])
|
|
|
|
query_lower = query.lower()
|
|
query_terms = {t for t in re.split(r'\W+', query_lower) if len(t) > 2}
|
|
|
|
scored = []
|
|
for fact in facts:
|
|
score = score_fact_for_query(fact, query_terms, query_lower)
|
|
if score > 0:
|
|
scored.append((score, fact))
|
|
|
|
scored.sort(key=lambda x: -x[0])
|
|
return [f for _, f in scored[:limit]]
|
|
|
|
def format_facts_as_context(facts: List[dict]) -> str:
|
|
"""Format retrieved facts into a context block for LLM synthesis."""
|
|
lines = []
|
|
for i, fact in enumerate(facts, 1):
|
|
fid = fact.get('id', 'unknown')
|
|
fact_text = fact.get('fact', '')
|
|
confidence = fact.get('confidence', 0.5)
|
|
category = fact.get('category', 'fact')
|
|
lines.append(f"[{i}] ID:{fid} | {category} (conf={confidence:.2f}): {fact_text}")
|
|
return "\n".join(lines)
|
|
|
|
def find_api_key() -> str:
|
|
for p in [
|
|
Path.home() / ".config/nous/key",
|
|
Path.home() / ".hermes/keymaxxing/active/minimax.key",
|
|
Path.home() / ".config/openrouter/key",
|
|
]:
|
|
if p.exists():
|
|
return p.read_text().strip()
|
|
return os.environ.get("HARVESTER_API_KEY") or os.environ.get("OPENROUTER_API_KEY") or ""
|
|
|
|
def call_llm_synthesize(query: str, context: str, api_base: str, api_key: str, model: str) -> str:
|
|
"""Call LLM to synthesize answer from retrieved facts."""
|
|
import urllib.request
|
|
|
|
prompt = f"""You are the LLM Wiki answering from the sovereign knowledge base.
|
|
|
|
Knowledge facts (with citations):
|
|
{context}
|
|
|
|
Question: {query}
|
|
|
|
Instructions:
|
|
- Answer ONLY from the provided facts. Do not use outside knowledge.
|
|
- Cite facts using their [N] index number(s) in brackets.
|
|
- If the facts don't contain the answer, say "I don't know from the current knowledge base."
|
|
- Be concise (2-3 sentences maximum)."""
|
|
|
|
messages = [
|
|
{"role": "system", "content": "You are a precise knowledge assistant."},
|
|
{"role": "user", "content": prompt}
|
|
]
|
|
|
|
payload = json.dumps({
|
|
"model": model,
|
|
"messages": messages,
|
|
"temperature": 0.1,
|
|
"max_tokens": 512
|
|
}).encode('utf-8')
|
|
|
|
req = urllib.request.Request(
|
|
f"{api_base}/chat/completions",
|
|
data=payload,
|
|
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
|
|
method="POST"
|
|
)
|
|
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
result = json.loads(resp.read().decode('utf-8'))
|
|
return result["choices"][0]["message"]["content"].strip()
|
|
except Exception as e:
|
|
return f"[ERROR: LLM call failed: {e}]"
|
|
|
|
def detect_contradictions(index: dict) -> List[dict]:
|
|
"""Detect potentially contradictory facts in the same domain/category."""
|
|
contradictions = []
|
|
facts = index.get('facts', [])
|
|
|
|
from collections import defaultdict
|
|
grouped = defaultdict(list)
|
|
for f in facts:
|
|
key = (f.get('domain', 'global'), f.get('category', 'fact'))
|
|
grouped[key].append(f)
|
|
|
|
for key, group in grouped.items():
|
|
if len(group) < 2:
|
|
continue
|
|
for i in range(len(group)):
|
|
for j in range(i+1, len(group)):
|
|
f1, f2 = group[i], group[j]
|
|
text1 = f1.get('fact', '').lower()
|
|
text2 = f2.get('fact', '').lower()
|
|
words1 = set(re.findall(r'\w+', text1))
|
|
words2 = set(re.findall(r'\w+', text2))
|
|
if len(words1 & words2) >= 3:
|
|
contradictions.append({
|
|
"type": "potential_contradiction",
|
|
"domain": key[0],
|
|
"category": key[1],
|
|
"fact_a": f1.get('id'),
|
|
"fact_b": f2.get('id'),
|
|
"similarity": len(words1 & words2) / max(len(words1), len(words2))
|
|
})
|
|
return contradictions
|
|
|
|
def lint_knowledge() -> dict:
|
|
"""Run all lint checks: freshness, duplicates, contradictions."""
|
|
results = {"errors": [], "warnings": [], "suggestions": []}
|
|
|
|
index = load_index()
|
|
facts = index.get('facts', [])
|
|
|
|
# 1. Freshness check via freshness.py
|
|
try:
|
|
freshness_script = SCRIPT_DIR / "freshness.py"
|
|
if freshness_script.exists():
|
|
proc = subprocess.run(
|
|
[sys.executable, str(freshness_script), "--knowledge-dir", str(KNOWLEDGE_DIR)],
|
|
capture_output=True, text=True, timeout=30
|
|
)
|
|
if proc.returncode != 0:
|
|
results["errors"].append(f"freshness.py failed: {proc.stderr[:200]}")
|
|
except Exception as e:
|
|
results["errors"].append(f"Could not run freshness check: {e}")
|
|
|
|
# 2. Duplicate fact text
|
|
seen = {}
|
|
for f in facts:
|
|
txt = f.get('fact', '').strip().lower()
|
|
if txt in seen:
|
|
results["warnings"].append(f"Duplicate fact text: {txt[:80]}... IDs: {seen[txt]}, {f.get('id')}")
|
|
else:
|
|
seen[txt] = f.get('id')
|
|
|
|
# 3. Contradictions
|
|
contradictions = detect_contradictions(index)
|
|
for c in contradictions:
|
|
results["warnings"].append(
|
|
f"Potential contradiction in {c['domain']}/{c['category']}: "
|
|
f"{c['fact_a']} vs {c['fact_b']} (similarity={c['similarity']:.2f})"
|
|
)
|
|
|
|
return results
|
|
|
|
# ---------- Subcommands ----------
|
|
|
|
def cmd_query(args):
|
|
"""Query the wiki: retrieve + synthesize."""
|
|
if not INDEX_PATH.exists():
|
|
print("ERROR: knowledge/index.json not found. Run ingest first.", file=sys.stderr)
|
|
return 1
|
|
|
|
query = args.query
|
|
top_k = args.top or 10
|
|
|
|
facts = retrieve_facts(query, limit=top_k)
|
|
if not facts:
|
|
print("No relevant facts found in knowledge base.")
|
|
return 0
|
|
|
|
print(f"→ Retrieved {len(facts)} facts:")
|
|
for i, f in enumerate(facts, 1):
|
|
fid = f.get('id', '?')
|
|
print(f" [{i}] {fid}: {f.get('fact', '')[:90]}")
|
|
|
|
if args.dry_run:
|
|
print("\n[dry-run] Skipping LLM synthesis.")
|
|
return 0
|
|
|
|
api_key = find_api_key()
|
|
if not api_key:
|
|
print("ERROR: No API key. Set HARVESTER_API_KEY or OPENROUTER_API_KEY.", file=sys.stderr)
|
|
return 1
|
|
|
|
api_base = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
|
|
model = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
|
|
|
|
context = format_facts_as_context(facts)
|
|
answer = call_llm_synthesize(query, context, api_base, api_key, model)
|
|
|
|
print(f"\n← Answer: {answer}")
|
|
return 0
|
|
|
|
def cmd_ingest(args):
|
|
"""Ingest knowledge from a session transcript."""
|
|
session = args.session
|
|
if not os.path.exists(session):
|
|
print(f"ERROR: Session file not found: {session}", file=sys.stderr)
|
|
return 1
|
|
|
|
harvester = SCRIPT_DIR / "harvester.py"
|
|
if not harvester.exists():
|
|
print("ERROR: harvester.py not found", file=sys.stderr)
|
|
return 1
|
|
|
|
cmd = [sys.executable, str(harvester), "--session", session, "--output", str(KNOWLEDGE_DIR)]
|
|
if args.dry_run:
|
|
cmd.append("--dry-run")
|
|
|
|
env = os.environ.copy()
|
|
env["PYTHONPATH"] = str(REPO_ROOT)
|
|
|
|
result = subprocess.run(cmd, env=env)
|
|
return result.returncode
|
|
|
|
def cmd_lint(args):
|
|
"""Lint the knowledge base for quality issues."""
|
|
results = lint_knowledge()
|
|
|
|
if results["errors"]:
|
|
print("ERRORS:")
|
|
for e in results["errors"]:
|
|
print(f" ✗ {e}")
|
|
return 1
|
|
|
|
if results["warnings"]:
|
|
print(f"WARNINGS ({len(results['warnings'])}):")
|
|
for w in results["warnings"]:
|
|
print(f" ⚠ {w}")
|
|
else:
|
|
print("✓ No lint issues found. Knowledge base is clean.")
|
|
|
|
return 0 if not results["errors"] else 1
|
|
|
|
def cmd_crystallize(args):
|
|
"""Alias for ingest — session crystallization."""
|
|
return cmd_ingest(args)
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="LLM Wiki layer — ingest, query, lint, crystallize",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
python3 scripts/wiki.py query "How do I fix cron timeouts?"
|
|
python3 scripts/wiki.py ingest --session ~/.hermes/sessions/abc.jsonl
|
|
python3 scripts/wiki.py lint
|
|
python3 scripts/wiki.py crystal --session session.jsonl
|
|
"""
|
|
)
|
|
sub = parser.add_subparsers(dest="command", help="Wiki command")
|
|
|
|
qp = sub.add_parser("query", help="Ask the wiki a question (RAG + synthesis)")
|
|
qp.add_argument("query", help="Natural language question")
|
|
qp.add_argument("--top", type=int, default=10, help="Number of facts to retrieve")
|
|
qp.add_argument("--dry-run", action="store_true", help="Show retrieval but skip LLM")
|
|
qp.set_defaults(func=cmd_query)
|
|
|
|
ip = sub.add_parser("ingest", help="Ingest a session transcript into knowledge")
|
|
ip.add_argument("--session", required=True, help="Path to session JSONL file")
|
|
ip.add_argument("--dry-run", action="store_true", help="Preview without writing")
|
|
ip.set_defaults(func=cmd_ingest)
|
|
|
|
lp = sub.add_parser("lint", help="Check knowledge base for issues")
|
|
lp.set_defaults(func=cmd_lint)
|
|
|
|
cp = sub.add_parser("crystal", help="Crystallize a session into durable pages")
|
|
cp.add_argument("--session", required=True, help="Path to session JSONL file")
|
|
cp.add_argument("--dry-run", action="store_true", help="Preview without writing")
|
|
cp.set_defaults(func=cmd_crystallize)
|
|
|
|
args = parser.parse_args()
|
|
if not args.command:
|
|
parser.print_help()
|
|
return 1
|
|
|
|
return args.func(args)
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|