Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Payne
646588a5c8 [BURN #197] feat: provenance chain — add source_session, source_model, source_provider, timestamp to facts
Some checks failed
Test / pytest (pull_request) Failing after 30s
- Adds schemas/provenance.json defining provenance object schema
- Updates harvester.py write_knowledge() to attach provenance to every fact
- Adds extract_provider() helper to infer provider from API base URL
- Updates SCHEMA.md documenting provenance field and object
- Provenance fields: source_session, source_model, source_provider,
  timestamp (harvested_at), extraction_method, confidence, verified
- Does NOT retroactively modify existing facts in index.json
Closes #197
2026-04-25 20:56:13 -04:00
5 changed files with 195 additions and 161 deletions

View File

@@ -43,9 +43,26 @@ The harvester writes to both. The bootstrapper reads from index.json. Humans edi
| `last_confirmed` | date | no | ISO-8601 date last seen in a session |
| `expires` | date | no | Optional. After this date, fact is stale |
| `related` | string[] | no | IDs of related facts |
| `provenance` | object | no | Provenance metadata — see Provenance Object section below |
### ID Format: `{domain}:{category}:{sequence}`
### Provenance Object
Every fact may include a [`provenance`](#fact-object) field that tracks its origin.
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `source_session` | string | yes | Session ID / file path where this fact was extracted |
| `source_model` | string | yes | Model name used for extraction (e.g., `xiaomi/mimo-v2-pro`) |
| `source_provider` | string | yes | Provider name (`nous`, `openrouter`, `anthropic`, `openai`, etc.) |
| `timestamp` | date-time | yes | Extraction timestamp (ISO-8601 UTC) |
| `extraction_method` | enum | yes | `llm_extraction`, `manual`, or `retroactive_harvest` |
| `confidence` | float | yes | Confidence at extraction time (0.01.0) |
| `verified` | boolean | yes | `true` if fact has been manually reviewed, else `false` |
### Categories
| Category | Definition |
@@ -85,6 +102,35 @@ knowledge/
└── {agent-type}.yaml
```
### Provenance Object (added via `write_knowledge()` and harvester)
```json
{
"source_session": "string — session ID or file path",
"source_model": "string — model used for extraction",
"source_provider": "string — provider name (nous, openrouter, etc.)",
"timestamp": "string — ISO-8601 UTC extraction time",
"extraction_method": "string — llm_extraction|manual|retroactive_harvest",
"confidence": "float — 0.01.0 confidence from extraction",
"verified": "boolean — whether fact has been manually verified"
}
```
The `provenance` field is attached to every fact harvested via `write_knowledge()`. It provides traceability: which session produced this fact, which model/provider extracted it, when, and with what confidence.
| Provenance Field | Type | Required | Description |
|------------------|------|----------|-------------|
| `source_session` | string | yes | Session ID / file path where extracted |
| `source_model` | string | yes | Model name (e.g., `xiaomi/mimo-v2-pro`) |
| `source_provider` | string | yes | Provider (`nous`, `openrouter`, `anthropic`, `openai`) |
| `timestamp` | date-time | yes | Extraction timestamp (ISO-8601) |
| `extraction_method` | enum | yes | `llm_extraction`, `manual`, or `retroactive_harvest` |
| `confidence` | float | yes | Confidence score (0.01.0) at extraction time |
| `verified` | boolean | yes | `true` if manually reviewed, else `false` |
## YAML File Format
YAML files use frontmatter for metadata, then markdown sections with fact entries:

52
schemas/provenance.json Normal file
View File

@@ -0,0 +1,52 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Knowledge Provenance",
"description": "Provenance metadata attached to every knowledge fact",
"type": "object",
"required": [
"source_session",
"source_model",
"source_provider",
"timestamp"
],
"properties": {
"source_session": {
"type": "string",
"description": "Session ID or file path where this fact was extracted"
},
"source_model": {
"type": "string",
"description": "Model used for extraction (e.g., 'xiaomi/mimo-v2-pro')"
},
"source_provider": {
"type": "string",
"description": "Provider name (nous, openrouter, anthropic, etc.)"
},
"timestamp": {
"type": "string",
"format": "date-time",
"description": "UTC ISO-8601 timestamp when this fact was extracted"
},
"extraction_method": {
"type": "string",
"description": "How the fact was extracted (llm_extraction, manual, retroactive_harvest)",
"enum": [
"llm_extraction",
"manual",
"retroactive_harvest"
],
"default": "llm_extraction"
},
"confidence": {
"type": "number",
"minimum": 0,
"maximum": 1,
"description": "Confidence assigned during extraction (copied from top-level fact)"
},
"verified": {
"type": "boolean",
"description": "Whether this fact has been manually verified",
"default": false
}
}
}

View File

@@ -27,6 +27,22 @@ sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
def extract_provider(api_base: str) -> str:
"""Infer provider name from API base URL."""
url = api_base.lower()
if 'nousresearch' in url or 'nous' in url:
return 'nous'
if 'openrouter' in url:
return 'openrouter'
if 'anthropic' in url:
return 'anthropic'
if 'openai' in url:
return 'openai'
# Fallback: try to extract hostname
from urllib.parse import urlparse
host = urlparse(api_base).netloc
return host.split('.')[0] if host else 'unknown'
# --- Configuration ---
DEFAULT_API_BASE = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
@@ -229,15 +245,34 @@ def validate_fact(fact: dict) -> bool:
return True
def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: str, source_session: str = ""):
"""Write new facts to the knowledge store."""
def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: str, source_session: str = "", model: str = "", provider: str = ""):
"""Write new facts to the knowledge store.
Adds provenance metadata to each fact. If model/provider are empty, tries to
infer from environment or defaults.
"""
kdir = Path(knowledge_dir)
kdir.mkdir(parents=True, exist_ok=True)
# Add source tracking to each fact
# Determine model/provider defaults if not provided
model = model or os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
provider = provider or os.environ.get("HARVESTER_PROVIDER", "nous")
timestamp = datetime.now(timezone.utc).isoformat()
# Add provenance to each fact
for fact in new_facts:
fact['source_session'] = source_session
fact['harvested_at'] = datetime.now(timezone.utc).isoformat()
provenance = {
'source_session': source_session,
'source_model': model,
'source_provider': provider,
'timestamp': timestamp,
'extraction_method': 'llm_extraction',
'confidence': fact.get('confidence', 0.5),
'verified': False
}
fact['provenance'] = provenance
fact['harvested_at'] = timestamp
# Update index
index['facts'].extend(new_facts)
@@ -330,7 +365,7 @@ def harvest_session(session_path: str, knowledge_dir: str, api_base: str, api_ke
# 8. Write (unless dry run)
if new_facts and not dry_run:
write_knowledge(existing_index, new_facts, knowledge_dir, source_session=session_path)
write_knowledge(existing_index, new_facts, knowledge_dir, source_session=session_path, model=model, provider=extract_provider(api_base))
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
return stats

View File

@@ -22,95 +22,114 @@ import sys
from pathlib import Path
from typing import Optional
from session_reader import extract_conversation, read_session
def compute_hash(text: str) -> str:
"""Content hash for deduplication."""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def extract_pairs_from_conversation(conversation: list, session_id: str, model: str,
min_ratio: float = 1.5,
def extract_pairs_from_session(session_data: dict, min_ratio: float = 1.5,
min_response_words: int = 20) -> list:
"""Extract terse→rich pairs from a normalized conversation."""
"""Extract terse→rich pairs from a single session object."""
pairs = []
conversations = session_data.get("conversations", [])
session_id = session_data.get("id", "unknown")
model = session_data.get("model", "unknown")
seen_hashes = set()
for i, msg in enumerate(conversation):
# Look for assistant responses
if msg.get('role') != 'assistant':
for i, msg in enumerate(conversations):
# Look for assistant/gpt responses
if msg.get("from") not in ("gpt", "assistant"):
continue
response_text = msg.get('content', '')
response_text = msg.get("value", "")
if not response_text or len(response_text.split()) < min_response_words:
continue
# Find the preceding user message
# Find the preceding human message
prompt_text = ""
for j in range(i - 1, -1, -1):
if conversation[j].get('role') == 'user':
prompt_text = conversation[j].get('content', '')
if conversations[j].get("from") == "human":
prompt_text = conversations[j].get("value", "")
break
if not prompt_text:
continue
# Filter: skip tool results, system messages embedded as human
if prompt_text.startswith('{') and 'output' in prompt_text[:100]:
continue
if prompt_text.startswith('# SOUL.md') or prompt_text.startswith('You are'):
continue
if prompt_text.startswith("{") and "output" in prompt_text[:100]:
continue # likely a tool result
if prompt_text.startswith("# SOUL.md") or prompt_text.startswith("You are"):
continue # system prompt leak
# Quality filters
prompt_words = len(prompt_text.split())
response_words = len(response_text.split())
# Must have meaningful length ratio
if prompt_words == 0 or response_words == 0:
continue
ratio = response_words / prompt_words
if ratio < min_ratio:
continue
code_blocks = response_text.count('```')
if code_blocks >= 4 and len(response_text.replace('```', '').strip()) < 50:
# Skip responses that are mostly code
code_blocks = response_text.count("```")
if code_blocks >= 4 and len(response_text.replace("```", "").strip()) < 50:
continue
if 'tool_call' in response_text[:100] or 'function_call' in response_text[:100]:
# Skip responses with tool call artifacts
if "tool_call" in response_text[:100] or "function_call" in response_text[:100]:
continue
# Deduplicate by content hash
content_hash = compute_hash(prompt_text + response_text[:200])
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Clean up response: remove markdown headers if too many
clean_response = response_text
pairs.append({
'terse': prompt_text.strip(),
'rich': clean_response.strip(),
'source': session_id,
'model': model,
'prompt_words': prompt_words,
'response_words': response_words,
'ratio': round(ratio, 2),
"terse": prompt_text.strip(),
"rich": clean_response.strip(),
"source": session_id,
"model": model,
"prompt_words": prompt_words,
"response_words": response_words,
"ratio": round(ratio, 2),
})
return pairs
def extract_from_jsonl_file(filepath: str, **kwargs) -> list:
"""Extract pairs from a session JSONL file."""
pairs = []
path = Path(filepath)
def extract_from_jsonl_file(path: str, **kwargs) -> list:
"""Read a session file and extract training pairs using normalized conversation."""
session_messages = read_session(path)
if not session_messages:
return []
conversation = extract_conversation(session_messages)
# Derive session_id and model from first real message metadata
first_msg = next((m for m in session_messages if m.get('role') or m.get('from')), {})
session_id = first_msg.get('meta_session_id', Path(path).name)
model = first_msg.get('model', 'unknown')
return extract_pairs_from_conversation(conversation, session_id, model, **kwargs)
if not path.exists():
print(f"Warning: {filepath} not found", file=sys.stderr)
return pairs
content = path.read_text()
lines = content.strip().split("\n")
for line in lines:
line = line.strip()
if not line:
continue
try:
session = json.loads(line)
except json.JSONDecodeError:
continue
session_pairs = extract_pairs_from_session(session, **kwargs)
pairs.extend(session_pairs)
return pairs
def deduplicate_pairs(pairs: list) -> list:

View File

@@ -1,118 +0,0 @@
"""
Tests for session_pair_harvester — training pair extraction from sessions.
"""
import json
import tempfile
import unittest
from pathlib import Path
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from session_pair_harvester import (
extract_pairs_from_conversation,
extract_from_jsonl_file,
deduplicate_pairs,
compute_hash,
)
class TestSessionPairHarvester(unittest.TestCase):
def test_compute_hash_consistent(self):
h1 = compute_hash("hello world")
h2 = compute_hash("hello world")
self.assertEqual(h1, h2)
self.assertEqual(len(h1), 16)
def test_extract_simple_qa_pair(self):
"""A simple user→assistant exchange produces one pair."""
conversation = [
{"role": "user", "content": "What is the capital of France?"},
{"role": "assistant", "content": "The capital of France is Paris. It is a major European city renowned for its art, fashion, gastronomy, cultural heritage, and historical significance. The city attracts millions of tourists annually."},
]
pairs = extract_pairs_from_conversation(conversation, "test_session", "test-model")
self.assertEqual(len(pairs), 1)
self.assertEqual(pairs[0]["terse"], "What is the capital of France?")
self.assertIn("Paris", pairs[0]["rich"])
self.assertEqual(pairs[0]["source"], "test_session")
def test_min_ratio_filter(self):
"""Very short responses are filtered out."""
conversation = [
{"role": "user", "content": "Yes"},
{"role": "assistant", "content": "No."},
]
# Default min_ratio = 1.5, min_words = 20 for response
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
self.assertEqual(len(pairs), 0)
def test_min_words_filter(self):
"""Assistant responses below min word count are skipped."""
conversation = [
{"role": "user", "content": "Explain the project architecture in detail"},
{"role": "assistant", "content": "OK."},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=5)
self.assertEqual(len(pairs), 0)
def test_skip_non_assistant_messages(self):
"""System and tool messages are ignored."""
conversation = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there! How can I help you today?"},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
self.assertEqual(len(pairs), 1)
self.assertEqual(pairs[0]["terse"], "Hello")
def test_multiple_pairs_from_one_session(self):
"""A conversation with several Q&A turns yields multiple pairs."""
conversation = [
{"role": "user", "content": "First question?"},
{"role": "assistant", "content": "Here is a detailed and comprehensive answer that thoroughly explores multiple aspects of the subject. It provides background context and practical implications for the reader."},
{"role": "user", "content": "Second?"},
{"role": "assistant", "content": "Another comprehensive response with detailed examples. This includes practical code blocks and thorough explanations to ensure deep understanding of the topic at hand."},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_ratio=1.0)
self.assertEqual(len(pairs), 2)
def test_deduplication_removes_duplicates(self):
"""Identical pairs across sessions are deduplicated."""
pairs = [
{"terse": "q1", "rich": "a1", "source": "s1", "model": "m"},
{"terse": "q1", "rich": "a1", "source": "s2", "model": "m"},
{"terse": "q2", "rich": "a2", "source": "s1", "model": "m"},
]
unique = deduplicate_pairs(pairs)
self.assertEqual(len(unique), 2)
sources = {p["source"] for p in unique}
# First unique pair can be from either s1 or s2
self.assertIn("s1", sources)
def test_integration_with_test_sessions(self):
"""Harvester finds pairs in real test session files."""
repo_root = Path(__file__).parent.parent
test_sessions_dir = repo_root / "test_sessions"
if not test_sessions_dir.exists():
self.skipTest("test_sessions not found")
pairs = []
for jsonl_file in sorted(test_sessions_dir.glob("*.jsonl")):
pairs.extend(extract_from_jsonl_file(str(jsonl_file)))
self.assertGreater(len(pairs), 0, "Should extract at least one pair from test_sessions")
for p in pairs:
self.assertIn("terse", p)
self.assertIn("rich", p)
self.assertIn("source", p)
self.assertIn("model", p)
# Verify content exists
self.assertGreater(len(p["terse"]), 0)
self.assertGreater(len(p["rich"]), 0)
if __name__ == "__main__":
unittest.main()