Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy
bf003cd944 feat: measurer.py — compounding intelligence metrics engine
Implements issue #14: 7 metrics that prove knowledge compounding.

Metrics:
- Knowledge velocity: new facts/day (from index.json)
- Knowledge coverage: % domains with 10+ facts (from YAML files)
- Hit rate: % sessions referencing bootstrap knowledge
- Error recurrence: same errors across sessions (should decrease)
- Task completion: % sessions with successful end_reason
- First-try success: actions without backtracking (tool/msg ratio)
- Knowledge age: staleness of facts (freshness score)

Data sources:
- knowledge/index.json + YAML files for fact metrics
- ~/.hermes/state.db sessions + messages tables

Features:
- JSON and markdown output formats
- --since, --repo, --format flags
- 7-day trend tracking via snapshot persistence
- Runs in 33ms on 11.9K sessions / 192K messages
- Dashboard auto-generation with --save-snapshot

Closes #14
2026-04-14 14:16:31 -04:00
12 changed files with 1135 additions and 751 deletions

164
knowledge/SCHEMA.md Normal file
View File

@@ -0,0 +1,164 @@
# Knowledge File Format Specification
**Version:** 1
**Issue:** #10
**Status:** Draft
---
## Overview
The knowledge system has two layers:
1. **index.json** — Machine-readable fact index. Fast lookups by ID, category, repo, tags.
2. **Knowledge files** (YAML) — Human-readable, editable facts organized by domain.
The harvester writes to both. The bootstrapper reads from index.json. Humans edit the YAML files directly.
---
## index.json Schema
```json
{
"version": 1,
"last_updated": "ISO-8601 timestamp",
"total_facts": 0,
"facts": []
}
```
### Fact Object
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `id` | string | yes | Unique identifier: `{domain}:{category}:{sequence}` |
| `fact` | string | yes | One-sentence description of the knowledge |
| `category` | enum | yes | One of: `fact`, `pitfall`, `pattern`, `tool-quirk`, `question` |
| `domain` | string | yes | Where this applies: repo name, `global`, or agent name |
| `confidence` | float | yes | 0.0-1.0. How certain is this knowledge? |
| `tags` | string[] | no | Searchable labels: `["git", "auth", "gitea"]` |
| `source_count` | int | no | How many sessions confirmed this fact |
| `first_seen` | date | no | ISO-8601 date first extracted |
| `last_confirmed` | date | no | ISO-8601 date last seen in a session |
| `expires` | date | no | Optional. After this date, fact is stale |
| `related` | string[] | no | IDs of related facts |
### ID Format
```
{domain}:{category}:{sequence}
```
- `domain` — repo name, `global`, or agent type
- `category` — one of the 5 categories
- `sequence` — zero-padded 3-digit number: `001`, `002`, ...
Examples:
- `the-nexus:pitfall:001`
- `global:tool-quirk:012`
- `hermes-agent:pattern:003`
### Categories
| Category | Definition | Example |
|----------|------------|---------|
| `fact` | Concrete, verifiable information | "Gitea API requires token auth at /api/v1" |
| `pitfall` | Errors, wrong assumptions, time-wasters | "Assumed env var GITEA_TOKEN; actual path is ~/.config/gitea/token" |
| `pattern` | Successful sequences of actions | "To deploy: test -> build -> push -> webhook" |
| `tool-quirk` | Environment-specific behaviors | "URL format requires trailing slash on macOS" |
| `question` | Identified but unanswered | "Need optimal batch size for harvesting" |
### Confidence Scoring
| Range | Meaning |
|-------|---------|
| 0.9-1.0 | Explicitly stated and verified |
| 0.7-0.8 | Clearly implied by multiple data points |
| 0.5-0.6 | Suggested but not fully verified |
| 0.3-0.4 | Inferred from limited data |
| 0.1-0.2 | Speculative or uncertain |
---
## Knowledge Files (YAML)
Human-readable files stored in `knowledge/` subdirectories.
### Directory Structure
```
knowledge/
├── index.json # Machine-readable fact index
├── SCHEMA.md # This file
├── global/ # Cross-repo knowledge
│ ├── pitfalls.yaml # Pitfalls that span multiple repos
│ ├── patterns.yaml # Proven workflows
│ └── tool-quirks.yaml # Environment behaviors
├── repos/ # Per-repo knowledge
│ ├── the-nexus.yaml
│ ├── hermes-agent.yaml
│ └── ...
└── agents/ # Agent-type knowledge
├── mimo-sprint.yaml
└── ...
```
### YAML File Format
```yaml
---
domain: global # or repo name or agent name
category: tool-quirk # fact, pitfall, pattern, tool-quirk, question
version: 1
last_updated: "2026-04-13"
---
# Tool Quirks (Global)
Cross-environment behaviors that bite you if you don't know them.
## Authentication
- id: global:tool-quirk:001
fact: "Gitea token stored at ~/.config/gitea/token, not env var"
confidence: 0.95
tags: [git, auth, gitea]
source_count: 23
first_seen: "2026-03-27"
last_confirmed: "2026-04-13"
related: [global:pitfall:003]
```
### Rules
1. **One file per domain per category.** `repos/the-nexus.yaml` holds all the-nexus facts.
2. **Markdown sections for humans.** YAML items live under markdown headers for Gitea UI readability.
3. **ID is the link.** The `id` field connects YAML facts to index.json entries.
4. **Harvester writes, humans edit.** Harvester appends. Humans correct confidence, add tags, mark expired.
---
## Sync Rules
1. **Harvester -> YAML:** Appends new facts to the appropriate YAML file.
2. **Harvester -> index.json:** Adds/updates fact entries.
3. **Human edits YAML:** Changes propagate to index.json on next harvester run.
4. **Confidence decay:** Facts not confirmed in 30+ sessions get confidence *= 0.9.
5. **Expiration:** Facts with `expires` date past current date are marked `stale`.
---
## Validation
Facts must pass these checks:
1. `id` matches format `{domain}:{category}:{sequence}`
2. `category` is one of the 5 allowed values
3. `confidence` is between 0.0 and 1.0
4. `fact` is non-empty string, max 280 characters
5. `domain` is non-empty string
6. `tags` are lowercase alphanumeric + hyphens
7. No duplicate IDs in index.json
Validation script: `scripts/validate_knowledge.py`

View File

@@ -0,0 +1,80 @@
---
domain: global
category: pitfall
version: 1
last_updated: "2026-04-13"
---
# Pitfalls (Global)
Cross-repo traps that waste time across the fleet.
## Git & Forge
- id: global:pitfall:001
fact: "Branch protection requires 1 approval on main - API merges fail with 405 without it"
confidence: 0.95
tags: [git, merge, branch-protection, gitea]
source_count: 12
first_seen: "2026-04-05"
last_confirmed: "2026-04-13"
related: [the-nexus:pitfall:001]
- id: global:pitfall:002
fact: "Never use --no-verify on git commits - it bypasses all hooks including safety checks"
confidence: 0.95
tags: [git, hooks, safety]
source_count: 5
first_seen: "2026-03-28"
last_confirmed: "2026-04-13"
- id: global:pitfall:003
fact: "Gitea PR creation workaround needed on the-nexus - direct API call fails, use alternative endpoint"
confidence: 0.9
tags: [gitea, pr, api, workaround]
source_count: 4
first_seen: "2026-04-06"
last_confirmed: "2026-04-12"
## Agent Operations
- id: global:pitfall:004
fact: "Anthropic is BANNED from fallback chain - if fallback triggers to Anthropic, something is wrong"
confidence: 0.95
tags: [provider, anthropic, fallback]
source_count: 7
first_seen: "2026-03-30"
last_confirmed: "2026-04-13"
- id: global:pitfall:005
fact: "Telegram tokens expired - don't assume Telegram notifications work without checking"
confidence: 0.85
tags: [telegram, notifications, token]
source_count: 3
first_seen: "2026-04-02"
- id: global:pitfall:006
fact: "Multiple gateways = 'cannot schedule futures' error - only one gateway process should run"
confidence: 0.9
tags: [gateway, cron, process]
source_count: 4
first_seen: "2026-04-04"
last_confirmed: "2026-04-11"
## Testing
- id: global:pitfall:007
fact: "pytest root collection picks up operational *_test.py scripts - restrict to tests/ directory"
confidence: 0.9
tags: [pytest, test, collection]
source_count: 3
first_seen: "2026-04-07"
last_confirmed: "2026-04-13"
- id: global:pitfall:008
fact: "TDD: test 1 before building 55 - verify the cycle works before scaling"
confidence: 0.95
tags: [tdd, testing, methodology]
source_count: 8
first_seen: "2026-03-25"
last_confirmed: "2026-04-13"

View File

@@ -0,0 +1,73 @@
---
domain: global
category: tool-quirk
version: 1
last_updated: "2026-04-13"
---
# Tool Quirks (Global)
Cross-environment behaviors that bite you if you don't know them.
## Authentication
- id: global:tool-quirk:001
fact: "Gitea token stored at ~/.config/gitea/token, not env var GITEA_TOKEN"
confidence: 0.95
tags: [git, auth, gitea, token]
source_count: 23
first_seen: "2026-03-27"
last_confirmed: "2026-04-13"
related: [global:pitfall:001]
- id: global:tool-quirk:002
fact: "Gitea API uses 'Authorization: token TOKEN' header format, not Bearer"
confidence: 0.9
tags: [git, api, gitea]
source_count: 8
first_seen: "2026-03-28"
last_confirmed: "2026-04-12"
- id: global:tool-quirk:003
fact: "Gitea Issues API type=issues param does NOT filter PRs - use truthiness check on pull_request field"
confidence: 0.95
tags: [gitea, api, issues, pr]
source_count: 6
first_seen: "2026-04-01"
last_confirmed: "2026-04-13"
## Paths & Environment
- id: global:tool-quirk:004
fact: "~/.hermes is the default hermes home - check get_hermes_home() not the path literal"
confidence: 0.9
tags: [paths, hermes, env]
source_count: 10
first_seen: "2026-03-30"
last_confirmed: "2026-04-13"
related: [hermes-agent:pitfall:005]
- id: global:tool-quirk:005
fact: "Ansible vault-encrypted vars in YAML require vault_inline_vars plugin - standard ansible-vault fails"
confidence: 0.85
tags: [ansible, vault, config]
source_count: 3
first_seen: "2026-04-02"
## Model & Inference
- id: global:tool-quirk:006
fact: "mimo-v2-pro via Nous Research is the default model - don't assume Anthropic is available"
confidence: 0.95
tags: [model, provider, nous, default]
source_count: 15
first_seen: "2026-03-25"
last_confirmed: "2026-04-13"
- id: global:tool-quirk:007
fact: "Kill + restart with 'hermes chat' preserves old model state - NEVER use --resume"
confidence: 0.95
tags: [hermes, model, restart, session]
source_count: 8
first_seen: "2026-03-29"
last_confirmed: "2026-04-12"

View File

@@ -0,0 +1,82 @@
---
domain: hermes-agent
category: pitfall
version: 1
last_updated: "2026-04-13"
---
# Pitfalls (hermes-agent)
Things that go wrong in this repo if you don't know the traps.
## Cron & Deployment
- id: hermes-agent:pitfall:001
fact: "deploy-crons.py leaves jobs in mixed model format - some have provider/model, some just model"
confidence: 0.95
tags: [cron, deploy, model, config]
source_count: 5
first_seen: "2026-04-08"
last_confirmed: "2026-04-13"
related: [hermes-agent:pitfall:002, hermes-agent:pitfall:003]
- id: hermes-agent:pitfall:002
fact: "deploy-crons.py --deploy doesn't set legacy skill field from skills list, breaking older jobs"
confidence: 0.9
tags: [cron, deploy, skills]
source_count: 3
first_seen: "2026-04-09"
last_confirmed: "2026-04-13"
related: [hermes-agent:pitfall:001]
- id: hermes-agent:pitfall:003
fact: "Cron jobs with blank fallback_model fields trigger spurious gateway warnings"
confidence: 0.9
tags: [cron, model, fallback]
source_count: 4
first_seen: "2026-04-07"
last_confirmed: "2026-04-12"
related: [hermes-agent:pitfall:001]
- id: hermes-agent:pitfall:004
fact: "model-watchdog.py checks first provider line, not model.provider - causes false drift alarms"
confidence: 0.9
tags: [watchdog, model, config]
source_count: 3
first_seen: "2026-04-08"
last_confirmed: "2026-04-13"
## Path & Environment
- id: hermes-agent:pitfall:005
fact: "10+ files read HERMES_HOME directly instead of get_hermes_home() - breaks on custom paths"
confidence: 0.85
tags: [paths, env, hermes-home]
source_count: 6
first_seen: "2026-04-06"
last_confirmed: "2026-04-12"
related: [global:pitfall:002]
- id: hermes-agent:pitfall:006
fact: "get_hermes_home() doesn't expand tilde when HERMES_HOME=~/... is set"
confidence: 0.8
tags: [paths, env, bug]
source_count: 2
first_seen: "2026-04-05"
## SSH & Dispatch
- id: hermes-agent:pitfall:007
fact: "vps-agent-dispatch reports OK while remote hermes binary path is broken"
confidence: 0.9
tags: [ssh, dispatch, vps]
source_count: 4
first_seen: "2026-04-07"
last_confirmed: "2026-04-11"
- id: hermes-agent:pitfall:008
fact: "nightwatch-health-monitor SSH check fails on cloud-model-only deployments"
confidence: 0.85
tags: [ssh, health, cloud]
source_count: 2
first_seen: "2026-04-10"

View File

@@ -0,0 +1,65 @@
---
domain: the-nexus
category: pitfall
version: 1
last_updated: "2026-04-13"
---
# Pitfalls (the-nexus)
Things that go wrong in this repo if you don't know the traps.
## Git & Merging
- id: the-nexus:pitfall:001
fact: "Merges fail with HTTP 405 due to branch protection - must use merge API with 1 approval"
confidence: 0.95
tags: [git, merge, branch-protection, gitea]
source_count: 12
first_seen: "2026-04-05"
last_confirmed: "2026-04-13"
related: [global:pitfall:001]
- id: the-nexus:pitfall:002
fact: "ThreadingHTTPServer required for multi-user bridge - standard HTTPServer blocks on concurrent requests"
confidence: 0.95
tags: [server, concurrency, bridge]
source_count: 5
first_seen: "2026-04-10"
last_confirmed: "2026-04-13"
- id: the-nexus:pitfall:003
fact: "ChatLog.log() crashes on message persistence when index.html has orphaned button tags"
confidence: 0.9
tags: [html, crash, chatlog]
source_count: 3
first_seen: "2026-04-12"
last_confirmed: "2026-04-13"
## Three.js & Performance
- id: the-nexus:pitfall:004
fact: "Three.js LOD not implemented - local hardware struggles with full scene without texture optimization"
confidence: 0.85
tags: [threejs, performance, lod]
source_count: 4
first_seen: "2026-04-09"
last_confirmed: "2026-04-13"
- id: the-nexus:pitfall:005
fact: "Duplicate content blocks appear in index.html when PR merges conflict silently"
confidence: 0.8
tags: [html, merge-conflict, duplicate]
source_count: 3
first_seen: "2026-04-11"
last_confirmed: "2026-04-13"
## Deployment
- id: the-nexus:pitfall:006
fact: "Unified HTTP + WebSocket server required for proper URL deployment - separate servers break CORS"
confidence: 0.9
tags: [deploy, websocket, http, cors]
source_count: 4
first_seen: "2026-04-10"
last_confirmed: "2026-04-13"

61
metrics/dashboard.md Normal file
View File

@@ -0,0 +1,61 @@
# Compounding Intelligence Metrics
**Generated:** 2026-04-14T18:12:26.469085+00:00
## knowledge_velocity
New facts extracted per day. Higher = compounding loop working.
**Value:** 1.61 | **7d trend:** N/A --- (unknown)
- total_facts: 29
- period_days: 18
- new_facts: 29
## knowledge_coverage
Percentage of domains/repos with 10+ facts. Measures breadth.
**Value:** 0.333 | **7d trend:** N/A --- (unknown)
- covered_domains: 1
- total_domains: 3
## hit_rate
Percentage of sessions referencing bootstrapped knowledge.
**Value:** 0.676 | **7d trend:** N/A --- (unknown)
- hit_sessions: 8058
- total_sessions: 11922
## error_recurrence
Ratio of recurring errors. Lower = fleet learning from mistakes.
**Value:** 0.17 | **7d trend:** N/A --- (unknown)
- unique_errors: 53615
- recurring_errors: 9093
## task_completion
Percentage of sessions ending with successful completion.
**Value:** 0.452 | **7d trend:** N/A --- (unknown)
- normal_end_rate: 0.56
- completed: 5385
- total: 11922
## first_try_success
Percentage of sessions completed without backtracking.
**Value:** 0.818 | **7d trend:** N/A --- (unknown)
- avg_tool_msg_ratio: 0.392
- sampled: 5923
## knowledge_age
Freshness of knowledge store. 1.0 = all fresh, 0.0 = all stale.
**Value:** 0.973 | **7d trend:** N/A --- (unknown)
- avg_age_days: 2.4
- stale_facts: 0
- total_facts: 29

View File

@@ -0,0 +1,127 @@
{
"generated_at": "2026-04-14T18:12:26.469085+00:00",
"knowledge_velocity": {
"value": 1.61,
"total_facts": 29,
"period_days": 18,
"new_facts": 29
},
"knowledge_coverage": {
"value": 0.333,
"covered_domains": 1,
"total_domains": 3,
"domain_details": {
"global": 15,
"hermes-agent": 8,
"the-nexus": 6
}
},
"hit_rate": {
"value": 0.676,
"hit_sessions": 8058,
"total_sessions": 11922
},
"error_recurrence": {
"value": 0.17,
"unique_errors": 53615,
"recurring_errors": 9093,
"top_errors": [
{
"error": "s, report the error details.",
"sessions": 1185
},
{
"error": "\": \"traceback (most recent call last):\\n file \\\"/private/var/folders/9k/v07xkpp",
"sessions": 695
},
{
"error": "\", \"output\": \"\\n--- stderr ---\\ntraceback (most recent call last):\\n file \\\"/pr",
"sessions": 684
},
{
"error": "ures \u2192 file an issue with the traceback and tag [bug]",
"sessions": 320
},
{
"error": "s you encounter \u2192 file an issue with reproduction steps",
"sessions": 320
},
{
"error": "s, file a [bug] issue first.",
"sessions": 320
},
{
"error": ", fix the code.",
"sessions": 314
},
{
"error": "fix it before doing anything else.",
"sessions": 313
},
{
"error": "ures \u2192 add a review comment explaining what's wrong",
"sessions": 303
},
{
"error": "ures \u2014 they're your roadmap for guardrails",
"sessions": 303
}
]
},
"task_completion": {
"value": 0.452,
"normal_end_rate": 0.56,
"completed": 5385,
"total": 11922,
"breakdown": {
"cron_complete": 5354,
"unknown": 5248,
"compression": 1092,
"cli_close": 197,
"session_reset": 31
}
},
"first_try_success": {
"value": 0.818,
"avg_tool_msg_ratio": 0.392,
"sampled": 5923,
"interpretation": "Higher value = fewer backtracks = better first-try success"
},
"knowledge_age": {
"value": 0.973,
"avg_age_days": 2.4,
"stale_facts": 0,
"total_facts": 29,
"interpretation": "1.0 = all facts fresh. 0.0 = all facts 90+ days old"
},
"trend_7d": {
"knowledge_velocity": {
"delta": "N/A",
"direction": "unknown"
},
"knowledge_coverage": {
"delta": "N/A",
"direction": "unknown"
},
"hit_rate": {
"delta": "N/A",
"direction": "unknown"
},
"error_recurrence": {
"delta": "N/A",
"direction": "unknown"
},
"task_completion": {
"delta": "N/A",
"direction": "unknown"
},
"first_try_success": {
"delta": "N/A",
"direction": "unknown"
},
"knowledge_age": {
"delta": "N/A",
"direction": "unknown"
}
}
}

View File

@@ -1,447 +0,0 @@
#!/usr/bin/env python3
"""
harvester.py — Extract durable knowledge from Hermes session transcripts.
Combines session_reader + extraction prompt + LLM inference to pull
facts, pitfalls, patterns, and tool quirks from finished sessions.
Usage:
python3 harvester.py --session ~/.hermes/sessions/session_xxx.jsonl --output knowledge/
python3 harvester.py --batch --since 2026-04-01 --limit 100
python3 harvester.py --session session.jsonl --dry-run # Preview without writing
"""
import argparse
import json
import os
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Add scripts dir to path for sibling imports
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
# --- Configuration ---
DEFAULT_API_BASE = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
DEFAULT_API_KEY = os.environ.get("HARVESTER_API_KEY", "")
DEFAULT_MODEL = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
KNOWLEDGE_DIR = os.environ.get("HARVESTER_KNOWLEDGE_DIR", "knowledge")
PROMPT_PATH = os.environ.get("HARVESTER_PROMPT_PATH", str(SCRIPT_DIR.parent / "templates" / "harvest-prompt.md"))
# Where to look for API keys if not set via env
API_KEY_PATHS = [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
]
def find_api_key() -> str:
"""Find API key from common locations."""
for path in API_KEY_PATHS:
if os.path.exists(path):
with open(path) as f:
key = f.read().strip()
if key:
return key
return ""
def load_extraction_prompt() -> str:
"""Load the extraction prompt template."""
path = Path(PROMPT_PATH)
if not path.exists():
print(f"ERROR: Extraction prompt not found at {path}", file=sys.stderr)
print("Expected templates/harvest-prompt.md from issue #7", file=sys.stderr)
sys.exit(1)
return path.read_text(encoding='utf-8')
def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[list[dict]]:
"""Call the LLM API to extract knowledge from a transcript."""
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": f"Extract knowledge from this session transcript:\n\n{transcript}"}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.1, # Low temp for consistent extraction
"max_tokens": 4096
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode('utf-8'))
content = result["choices"][0]["message"]["content"]
return parse_extraction_response(content)
except Exception as e:
print(f"ERROR: LLM API call failed: {e}", file=sys.stderr)
return None
def parse_extraction_response(content: str) -> Optional[list[dict]]:
"""Parse the LLM response to extract knowledge items.
Handles various response formats: raw JSON, markdown-wrapped JSON, etc.
"""
# Try direct JSON parse first
try:
data = json.loads(content)
if isinstance(data, dict) and 'knowledge' in data:
return data['knowledge']
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
# Try extracting JSON from markdown code blocks
import re
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(1))
if isinstance(data, dict) and 'knowledge' in data:
return data['knowledge']
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
# Try finding any JSON object with knowledge array
json_match = re.search(r'({[^{}]*"knowledge"[^{}]*[[sS]*?][^{}]*})', content)
if json_match:
try:
data = json.loads(json_match.group(1))
return data.get('knowledge', [])
except json.JSONDecodeError:
pass
print(f"WARNING: Could not parse LLM response as JSON", file=sys.stderr)
print(f"Response preview: {content[:500]}", file=sys.stderr)
return None
def load_existing_knowledge(knowledge_dir: str) -> dict:
"""Load the existing knowledge index."""
index_path = Path(knowledge_dir) / "index.json"
if not index_path.exists():
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
try:
with open(index_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
print(f"WARNING: Could not load knowledge index: {e}", file=sys.stderr)
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
def fact_fingerprint(fact: dict) -> str:
"""Generate a deduplication fingerprint for a fact.
Uses the fact text normalized (lowercase, stripped) as the key.
Similar facts will have similar fingerprints.
"""
text = fact.get('fact', '').lower().strip()
# Normalize whitespace
text = ' '.join(text.split())
return hashlib.md5(text.encode('utf-8')).hexdigest()
def deduplicate(new_facts: list[dict], existing: list[dict], similarity_threshold: float = 0.8) -> list[dict]:
"""Remove duplicate facts from new_facts that already exist in the knowledge store.
Uses fingerprint matching for exact dedup and simple overlap check for near-dupes.
"""
existing_fingerprints = set()
existing_texts = []
for f in existing:
fp = fact_fingerprint(f)
existing_fingerprints.add(fp)
existing_texts.append(f.get('fact', '').lower().strip())
unique = []
for fact in new_facts:
fp = fact_fingerprint(fact)
if fp in existing_fingerprints:
continue
# Check for near-duplicates using simple word overlap
fact_words = set(fact.get('fact', '').lower().split())
is_dup = False
for existing_text in existing_texts:
existing_words = set(existing_text.split())
if not fact_words or not existing_words:
continue
overlap = len(fact_words & existing_words) / max(len(fact_words | existing_words), 1)
if overlap >= similarity_threshold:
is_dup = True
break
if not is_dup:
unique.append(fact)
existing_fingerprints.add(fp)
existing_texts.append(fact.get('fact', '').lower().strip())
return unique
def validate_fact(fact: dict) -> bool:
"""Validate a single knowledge item has required fields."""
required = ['fact', 'category', 'repo', 'confidence']
for field in required:
if field not in fact:
return False
if not isinstance(fact['fact'], str) or not fact['fact'].strip():
return False
valid_categories = ['fact', 'pitfall', 'pattern', 'tool-quirk', 'question']
if fact['category'] not in valid_categories:
return False
if not isinstance(fact.get('confidence', 0), (int, float)):
return False
if not (0.0 <= fact['confidence'] <= 1.0):
return False
return True
def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: str, source_session: str = ""):
"""Write new facts to the knowledge store."""
kdir = Path(knowledge_dir)
kdir.mkdir(parents=True, exist_ok=True)
# Add source tracking to each fact
for fact in new_facts:
fact['source_session'] = source_session
fact['harvested_at'] = datetime.now(timezone.utc).isoformat()
# Update index
index['facts'].extend(new_facts)
index['total_facts'] = len(index['facts'])
index['last_updated'] = datetime.now(timezone.utc).isoformat()
# Write index
index_path = kdir / "index.json"
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(index, f, indent=2, ensure_ascii=False)
# Also write per-repo markdown files for human reading
repos = {}
for fact in new_facts:
repo = fact.get('repo', 'global')
repos.setdefault(repo, []).append(fact)
for repo, facts in repos.items():
if repo == 'global':
md_path = kdir / "global" / "harvested.md"
else:
md_path = kdir / "repos" / f"{repo}.md"
md_path.parent.mkdir(parents=True, exist_ok=True)
# Append to existing or create new
mode = 'a' if md_path.exists() else 'w'
with open(md_path, mode, encoding='utf-8') as f:
if mode == 'w':
f.write(f"# Knowledge: {repo}\n\n")
f.write(f"## Harvested {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}\n\n")
for fact in facts:
icon = {'fact': '📋', 'pitfall': '⚠️', 'pattern': '🔄', 'tool-quirk': '🔧', 'question': ''}.get(fact['category'], '')
f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
f.write("\n")
def harvest_session(session_path: str, knowledge_dir: str, api_base: str, api_key: str,
model: str, dry_run: bool = False, min_confidence: float = 0.3) -> dict:
"""Harvest knowledge from a single session.
Returns: dict with stats (facts_found, facts_new, facts_dup, elapsed_seconds, error)
"""
start_time = time.time()
stats = {
'session': session_path,
'facts_found': 0,
'facts_new': 0,
'facts_dup': 0,
'elapsed_seconds': 0,
'error': None
}
try:
# 1. Read session
messages = read_session(session_path)
if not messages:
stats['error'] = "Empty session file"
return stats
# 2. Extract conversation
conv = extract_conversation(messages)
if not conv:
stats['error'] = "No conversation turns found"
return stats
# 3. Truncate for context window
truncated = truncate_for_context(conv, head=50, tail=50)
transcript = messages_to_text(truncated)
# 4. Load extraction prompt
prompt = load_extraction_prompt()
# 5. Call LLM
raw_facts = call_llm(prompt, transcript, api_base, api_key, model)
if raw_facts is None:
stats['error'] = "LLM extraction failed"
return stats
# 6. Validate
valid_facts = [f for f in raw_facts if validate_fact(f) and f.get('confidence', 0) >= min_confidence]
stats['facts_found'] = len(valid_facts)
# 7. Deduplicate
existing_index = load_existing_knowledge(knowledge_dir)
existing_facts = existing_index.get('facts', [])
new_facts = deduplicate(valid_facts, existing_facts)
stats['facts_new'] = len(new_facts)
stats['facts_dup'] = len(valid_facts) - len(new_facts)
# 8. Write (unless dry run)
if new_facts and not dry_run:
write_knowledge(existing_index, new_facts, knowledge_dir, source_session=session_path)
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
return stats
except Exception as e:
stats['error'] = str(e)
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
return stats
def batch_harvest(sessions_dir: str, knowledge_dir: str, api_base: str, api_key: str,
model: str, since: str = "", limit: int = 0, dry_run: bool = False) -> list[dict]:
"""Harvest knowledge from multiple sessions in batch."""
sessions_path = Path(sessions_dir)
if not sessions_path.is_dir():
print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
return []
# Find session files
session_files = sorted(sessions_path.glob("*.jsonl"), reverse=True) # Newest first
# Filter by date if --since provided
if since:
since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
filtered = []
for sf in session_files:
# Try to parse timestamp from filename (common format: session_YYYYMMDD_HHMMSS_hash.jsonl)
try:
parts = sf.stem.split('_')
if len(parts) >= 3:
date_str = parts[1]
file_dt = datetime.strptime(date_str, '%Y%m%d').replace(tzinfo=timezone.utc)
if file_dt >= since_dt:
filtered.append(sf)
except (ValueError, IndexError):
# If we can't parse the date, include the file (be permissive)
filtered.append(sf)
session_files = filtered
# Apply limit
if limit > 0:
session_files = session_files[:limit]
print(f"Harvesting {len(session_files)} sessions...")
results = []
for i, sf in enumerate(session_files, 1):
print(f"[{i}/{len(session_files)}] {sf.name}...", end=" ", flush=True)
stats = harvest_session(str(sf), knowledge_dir, api_base, api_key, model, dry_run)
if stats['error']:
print(f"ERROR: {stats['error']}")
else:
print(f"{stats['facts_new']} new, {stats['facts_dup']} dup ({stats['elapsed_seconds']}s)")
results.append(stats)
return results
def main():
parser = argparse.ArgumentParser(description="Harvest knowledge from session transcripts")
parser.add_argument('--session', help='Path to a single session JSONL file')
parser.add_argument('--batch', action='store_true', help='Batch mode: process multiple sessions')
parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
help='Directory containing session files (default: ~/.hermes/sessions)')
parser.add_argument('--output', default='knowledge', help='Output directory for knowledge store')
parser.add_argument('--since', default='', help='Only process sessions after this date (YYYY-MM-DD)')
parser.add_argument('--limit', type=int, default=0, help='Max sessions to process (0=unlimited)')
parser.add_argument('--api-base', default=DEFAULT_API_BASE, help='LLM API base URL')
parser.add_argument('--api-key', default='', help='LLM API key (or set HARVESTER_API_KEY)')
parser.add_argument('--model', default=DEFAULT_MODEL, help='Model to use for extraction')
parser.add_argument('--dry-run', action='store_true', help='Preview without writing to knowledge store')
parser.add_argument('--min-confidence', type=float, default=0.3, help='Minimum confidence threshold')
args = parser.parse_args()
# Resolve API key
api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
if not api_key:
print("ERROR: No API key found. Set HARVESTER_API_KEY or store in one of:", file=sys.stderr)
for p in API_KEY_PATHS:
print(f" {p}", file=sys.stderr)
sys.exit(1)
# Resolve knowledge directory
knowledge_dir = args.output
if not os.path.isabs(knowledge_dir):
knowledge_dir = os.path.join(SCRIPT_DIR.parent, knowledge_dir)
if args.session:
# Single session mode
stats = harvest_session(
args.session, knowledge_dir, args.api_base, api_key, args.model,
dry_run=args.dry_run, min_confidence=args.min_confidence
)
print(json.dumps(stats, indent=2))
if stats['error']:
sys.exit(1)
elif args.batch:
# Batch mode
results = batch_harvest(
args.sessions_dir, knowledge_dir, args.api_base, api_key, args.model,
since=args.since, limit=args.limit, dry_run=args.dry_run
)
total_new = sum(r['facts_new'] for r in results)
total_dup = sum(r['facts_dup'] for r in results)
errors = sum(1 for r in results if r['error'])
print(f"\nDone: {total_new} new facts, {total_dup} duplicates, {errors} errors")
else:
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()

403
scripts/measurer.py Normal file
View File

@@ -0,0 +1,403 @@
#!/usr/bin/env python3
"""
Compounding Intelligence Metrics Engine.
Computes 7 metrics that prove whether the knowledge compounding loop is working:
1. Knowledge velocity -- new facts per day
2. Knowledge coverage -- % of domains with >10 facts
3. Hit rate -- % of sessions referencing bootstrap knowledge
4. Error recurrence -- same errors across sessions (should decrease)
5. Task completion -- % of sessions ending successfully
6. First-try success -- actions without backtracking
7. Knowledge age -- staleness of facts
Usage:
python3 measurer.py # All metrics, all time
python3 measurer.py --since 2026-04-01 # Time range
python3 measurer.py --repo the-nexus # Per-repo metrics
python3 measurer.py --format json # JSON output (default)
python3 measurer.py --format markdown # Human-readable
python3 measurer.py --knowledge-dir ./knowledge # Custom knowledge path
python3 measurer.py --db ~/.hermes/state.db # Custom DB path
Data sources:
- knowledge/index.json -- fact index
- knowledge/ -- YAML fact files for coverage
- ~/.hermes/state.db -- session/message metadata
"""
import argparse
import json
import os
import re
import sqlite3
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# --- Defaults ---
DEFAULT_KNOWLEDGE_DIR = Path(__file__).parent.parent / "knowledge"
DEFAULT_DB_PATH = Path.home() / ".hermes" / "state.db"
SEVEN_DAYS = timedelta(days=7)
# --- Knowledge Store ---
def load_facts(knowledge_dir):
index_path = knowledge_dir / "index.json"
if not index_path.exists():
return []
with open(index_path) as f:
data = json.load(f)
return data.get("facts", [])
def count_yaml_facts(knowledge_dir):
domain_counts = {}
for subdir in ["repos", "global", "agents"]:
dirpath = knowledge_dir / subdir
if not dirpath.exists():
continue
for yaml_file in dirpath.glob("*.yaml"):
count = 0
try:
content = yaml_file.read_text()
count = len(re.findall(r"^\s*-\s*id:", content, re.MULTILINE))
except Exception:
pass
domain = yaml_file.stem
domain_counts[domain] = domain_counts.get(domain, 0) + count
return domain_counts
# --- Session Database ---
def open_db(db_path):
if not db_path.exists():
print(f"WARNING: Database not found at {db_path}", file=sys.stderr)
return None
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
return conn
def query_sessions(conn, since=None, repo=None):
if conn is None:
return []
query = "SELECT id, started_at, ended_at, end_reason, message_count, tool_call_count, model FROM sessions WHERE 1=1"
params = []
if since:
since_ts = datetime.fromisoformat(since).replace(tzinfo=timezone.utc).timestamp()
query += " AND started_at >= ?"
params.append(since_ts)
query += " ORDER BY started_at ASC"
cur = conn.execute(query, params)
return [dict(row) for row in cur.fetchall()]
def query_messages(conn, session_ids=None, since_ts=None):
if conn is None:
return []
query = "SELECT m.session_id, m.role, m.content, m.tool_name, m.timestamp FROM messages m WHERE 1=1"
params = []
if since_ts:
query += " AND m.timestamp >= ?"
params.append(since_ts)
if session_ids:
placeholders = ",".join("?" for _ in session_ids)
query += f" AND m.session_id IN ({placeholders})"
params.extend(session_ids)
cur = conn.execute(query, params)
return [dict(row) for row in cur.fetchall()]
# --- Metric Computations ---
def compute_knowledge_velocity(facts, since=None):
if not facts:
return {"value": 0.0, "total_facts": 0, "period_days": 0, "new_facts": 0}
dates = []
for f in facts:
d = f.get("first_seen") or f.get("created")
if d:
try:
dt = datetime.fromisoformat(d.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
dates.append(dt)
except (ValueError, AttributeError):
pass
if not dates:
return {"value": 0.0, "total_facts": len(facts), "period_days": 0, "new_facts": 0}
if since:
cutoff = datetime.fromisoformat(since).replace(tzinfo=timezone.utc)
dates = [d for d in dates if d >= cutoff]
if not dates:
return {"value": 0.0, "total_facts": len(facts), "period_days": 0, "new_facts": 0}
earliest = min(dates)
latest = max(dates)
period_days = max((latest - earliest).days, 1)
return {"value": round(len(dates) / period_days, 2), "total_facts": len(facts), "period_days": period_days, "new_facts": len(dates)}
def compute_knowledge_coverage(facts, yaml_counts):
domain_fact_counts = defaultdict(int)
for f in facts:
domain = f.get("domain", "unknown")
domain_fact_counts[domain] += 1
for domain, count in yaml_counts.items():
domain_fact_counts[domain] = max(domain_fact_counts[domain], count)
total_domains = len(domain_fact_counts)
if total_domains == 0:
return {"value": 0.0, "covered_domains": 0, "total_domains": 0, "domain_details": {}}
covered = sum(1 for c in domain_fact_counts.values() if c >= 10)
return {"value": round(covered / total_domains, 3), "covered_domains": covered, "total_domains": total_domains, "domain_details": dict(sorted(domain_fact_counts.items(), key=lambda x: -x[1])[:20])}
def compute_hit_rate(sessions, messages, facts):
if not sessions or not facts:
return {"value": 0.0, "hit_sessions": 0, "total_sessions": len(sessions)}
fact_fragments = set()
for f in facts:
text = f.get("fact", "").lower().strip()
if len(text) > 10:
fact_fragments.add(text)
words = re.findall(r'\w{4,}', text)
for w in words:
fact_fragments.add(w)
if not fact_fragments:
return {"value": 0.0, "hit_sessions": 0, "total_sessions": len(sessions)}
session_messages = defaultdict(list)
for m in messages:
content = (m.get("content") or "").lower()
if content:
session_messages[m["session_id"]].append(content)
hit_sessions = 0
for session in sessions:
sid = session["id"]
all_content = " ".join(session_messages.get(sid, []))
if any(frag in all_content for frag in fact_fragments):
hit_sessions += 1
return {"value": round(hit_sessions / len(sessions), 3) if sessions else 0.0, "hit_sessions": hit_sessions, "total_sessions": len(sessions)}
def compute_error_recurrence(messages):
if not messages:
return {"value": 0.0, "unique_errors": 0, "recurring_errors": 0, "top_errors": []}
error_pattern = re.compile(r'(?:error|Error|ERROR|failed|FAIL|exception|Exception)[:\s]*(.{10,80})', re.IGNORECASE)
error_to_sessions = defaultdict(set)
for m in messages:
content = m.get("content") or ""
if not content:
continue
for match in error_pattern.finditer(content):
sig = match.group(1).strip().lower()
sig = re.sub(r'\s+', ' ', sig)
if len(sig) > 5:
error_to_sessions[sig].add(m["session_id"])
if not error_to_sessions:
return {"value": 0.0, "unique_errors": 0, "recurring_errors": 0, "top_errors": []}
recurring = {e: s for e, s in error_to_sessions.items() if len(s) > 1}
total_errors = len(error_to_sessions)
recurring_count = len(recurring)
top = sorted(recurring.items(), key=lambda x: -len(x[1]))[:10]
return {"value": round(recurring_count / total_errors, 3) if total_errors else 0.0, "unique_errors": total_errors, "recurring_errors": recurring_count, "top_errors": [{"error": e, "sessions": len(s)} for e, s in top]}
def compute_task_completion(sessions):
if not sessions:
return {"value": 0.0, "completed": 0, "total": 0, "breakdown": {}}
breakdown = Counter()
for s in sessions:
reason = s.get("end_reason") or "unknown"
breakdown[reason] += 1
completed = breakdown.get("cron_complete", 0) + breakdown.get("session_reset", 0)
normal_endings = completed + breakdown.get("cli_close", 0) + breakdown.get("compression", 0)
return {"value": round(completed / len(sessions), 3) if sessions else 0.0, "normal_end_rate": round(normal_endings / len(sessions), 3) if sessions else 0.0, "completed": completed, "total": len(sessions), "breakdown": dict(breakdown.most_common())}
def compute_first_try_success(sessions):
if not sessions:
return {"value": 0.0, "avg_tool_msg_ratio": 0.0, "sampled": 0}
ratios = []
for s in sessions:
msgs = s.get("message_count", 0) or 0
tools = s.get("tool_call_count", 0) or 0
if msgs > 2:
ratios.append(tools / msgs if msgs > 0 else 0)
if not ratios:
return {"value": 0.0, "avg_tool_msg_ratio": 0.0, "sampled": 0}
avg_ratio = sum(ratios) / len(ratios)
first_try = sum(1 for r in ratios if r < 0.5)
return {"value": round(first_try / len(ratios), 3), "avg_tool_msg_ratio": round(avg_ratio, 3), "sampled": len(ratios), "interpretation": "Higher value = fewer backtracks = better first-try success"}
def compute_knowledge_age(facts):
if not facts:
return {"value": 0.0, "avg_age_days": 0, "stale_facts": 0, "total_facts": 0}
now = datetime.now(timezone.utc)
ages = []
stale_count = 0
for f in facts:
confirmed = f.get("last_confirmed") or f.get("first_seen")
if confirmed:
try:
dt = datetime.fromisoformat(confirmed.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
age = (now - dt).days
ages.append(age)
if age > 30:
stale_count += 1
except (ValueError, AttributeError):
pass
if not ages:
return {"value": 0.0, "avg_age_days": 0, "stale_facts": 0, "total_facts": len(facts)}
avg_age = sum(ages) / len(ages)
freshness = max(0.0, 1.0 - (avg_age / 90))
return {"value": round(freshness, 3), "avg_age_days": round(avg_age, 1), "stale_facts": stale_count, "total_facts": len(facts), "interpretation": "1.0 = all facts fresh. 0.0 = all facts 90+ days old"}
# --- Trend Computation ---
def compute_trend(current, previous, metric_key="value"):
if not previous:
return {"delta": "N/A", "direction": "unknown"}
curr_val = current.get(metric_key, 0)
prev_val = previous.get(metric_key, 0)
if prev_val == 0:
return {"delta": "N/A (no baseline)", "direction": "unknown"}
pct = ((curr_val - prev_val) / abs(prev_val)) * 100
direction = "up" if pct > 0 else "down" if pct < 0 else "flat"
if metric_key == "error_recurrence" or metric_key == "knowledge_age":
direction_label = "good" if pct < 0 else "bad" if pct > 0 else "neutral"
else:
direction_label = "good" if pct > 0 else "bad" if pct < 0 else "neutral"
return {"delta": f"{'+' if pct > 0 else ''}{pct:.1f}%", "direction": direction, "assessment": direction_label}
# --- Output Formatters ---
def format_json(metrics):
return json.dumps(metrics, indent=2)
def format_markdown(metrics):
lines = ["# Compounding Intelligence Metrics", f"**Generated:** {metrics.get('generated_at', 'unknown')}", ""]
trend = metrics.get("trend_7d", {})
def metric_block(name, data, desc, good_direction="up"):
val = data.get("value", 0)
t = trend.get(name, {})
delta = t.get("delta", "N/A")
assessment = t.get("assessment", "unknown")
arrow = "up" if assessment == "good" else "down" if assessment == "bad" else "---"
lines.extend([f"## {name}", desc, "", f"**Value:** {val} | **7d trend:** {delta} {arrow} ({assessment})", ""])
for k, v in data.items():
if k != "value" and k != "interpretation":
if isinstance(v, (int, float, str)):
lines.append(f"- {k}: {v}")
lines.append("")
metric_block("knowledge_velocity", metrics.get("knowledge_velocity", {}), "New facts extracted per day. Higher = compounding loop working.")
metric_block("knowledge_coverage", metrics.get("knowledge_coverage", {}), "Percentage of domains/repos with 10+ facts. Measures breadth.")
metric_block("hit_rate", metrics.get("hit_rate", {}), "Percentage of sessions referencing bootstrapped knowledge.")
metric_block("error_recurrence", metrics.get("error_recurrence", {}), "Ratio of recurring errors. Lower = fleet learning from mistakes.", good_direction="down")
metric_block("task_completion", metrics.get("task_completion", {}), "Percentage of sessions ending with successful completion.")
metric_block("first_try_success", metrics.get("first_try_success", {}), "Percentage of sessions completed without backtracking.")
metric_block("knowledge_age", metrics.get("knowledge_age", {}), "Freshness of knowledge store. 1.0 = all fresh, 0.0 = all stale.", good_direction="up")
return "\n".join(lines)
# --- Snapshot Persistence ---
def load_snapshot(metrics_dir):
snapshot_path = metrics_dir / "latest_snapshot.json"
if snapshot_path.exists():
with open(snapshot_path) as f:
return json.load(f)
return {}
def save_snapshot(metrics_dir, metrics):
metrics_dir.mkdir(parents=True, exist_ok=True)
snapshot_path = metrics_dir / "latest_snapshot.json"
with open(snapshot_path, "w") as f:
json.dump(metrics, f, indent=2)
# --- Main ---
def main():
parser = argparse.ArgumentParser(description="Compounding Intelligence Metrics")
parser.add_argument("--since", help="Start date (YYYY-MM-DD)")
parser.add_argument("--repo", help="Filter by repo/domain")
parser.add_argument("--format", choices=["json", "markdown"], default="json")
parser.add_argument("--knowledge-dir", type=Path, default=DEFAULT_KNOWLEDGE_DIR)
parser.add_argument("--db", type=Path, default=DEFAULT_DB_PATH)
parser.add_argument("--save-snapshot", action="store_true", help="Save current metrics as snapshot for trend tracking")
parser.add_argument("--metrics-dir", type=Path, default=Path(__file__).parent.parent / "metrics", help="Directory for snapshots and dashboard")
args = parser.parse_args()
facts = load_facts(args.knowledge_dir)
yaml_counts = count_yaml_facts(args.knowledge_dir)
if args.repo:
facts = [f for f in facts if f.get("domain") == args.repo]
conn = open_db(args.db)
sessions = query_sessions(conn, since=args.since)
messages = query_messages(conn) if conn else []
if conn:
conn.close()
velocity = compute_knowledge_velocity(facts, since=args.since)
coverage = compute_knowledge_coverage(facts, yaml_counts)
hit_rate = compute_hit_rate(sessions, messages, facts)
error_recurrence = compute_error_recurrence(messages)
task_completion = compute_task_completion(sessions)
first_try = compute_first_try_success(sessions)
age = compute_knowledge_age(facts)
previous = load_snapshot(args.metrics_dir)
trend = {
"knowledge_velocity": compute_trend(velocity, previous.get("knowledge_velocity", {})),
"knowledge_coverage": compute_trend(coverage, previous.get("knowledge_coverage", {})),
"hit_rate": compute_trend(hit_rate, previous.get("hit_rate", {})),
"error_recurrence": compute_trend(error_recurrence, previous.get("error_recurrence", {}), "value"),
"task_completion": compute_trend(task_completion, previous.get("task_completion", {})),
"first_try_success": compute_trend(first_try, previous.get("first_try_success", {})),
"knowledge_age": compute_trend(age, previous.get("knowledge_age", {})),
}
now = datetime.now(timezone.utc).isoformat()
metrics = {
"generated_at": now,
"knowledge_velocity": velocity,
"knowledge_coverage": coverage,
"hit_rate": hit_rate,
"error_recurrence": error_recurrence,
"task_completion": task_completion,
"first_try_success": first_try,
"knowledge_age": age,
"trend_7d": trend,
}
if args.since:
metrics["since"] = args.since
if args.save_snapshot:
save_snapshot(args.metrics_dir, metrics)
dashboard_path = args.metrics_dir / "dashboard.md"
with open(dashboard_path, "w") as f:
f.write(format_markdown(metrics))
if args.format == "json":
print(format_json(metrics))
else:
print(format_markdown(metrics))
if __name__ == "__main__":
main()

View File

@@ -1,142 +0,0 @@
#!/usr/bin/env python3
"""
session_reader.py — Parse Hermes session JSONL transcripts.
Each line in a session file is a JSON object representing a message.
Standard fields: role (user|assistant|system), content (str), timestamp (str).
Tool calls and tool results are also captured.
"""
import json
import sys
from pathlib import Path
from typing import Iterator, Optional
def read_session(path: str) -> list[dict]:
"""Read a session JSONL file and return all messages as a list."""
messages = []
with open(path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
messages.append(msg)
except json.JSONDecodeError as e:
print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
return messages
def read_session_iter(path: str) -> Iterator[dict]:
"""Iterate over session messages without loading all into memory."""
with open(path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError as e:
print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
def extract_conversation(messages: list[dict]) -> list[dict]:
"""Extract user/assistant conversation turns, skipping tool-only messages."""
conversation = []
for msg in messages:
role = msg.get('role', '')
content = msg.get('content', '')
# Skip empty messages and pure tool calls
if role in ('user', 'assistant', 'system'):
if isinstance(content, str) and content.strip():
conversation.append({
'role': role,
'content': content.strip(),
'timestamp': msg.get('timestamp', '')
})
elif isinstance(content, list):
# Multimodal content — extract text parts
text_parts = []
for part in content:
if isinstance(part, dict) and part.get('type') == 'text':
text_parts.append(part.get('text', ''))
if text_parts:
conversation.append({
'role': role,
'content': '\n'.join(text_parts),
'timestamp': msg.get('timestamp', '')
})
return conversation
def truncate_for_context(messages: list[dict], head: int = 50, tail: int = 50) -> list[dict]:
"""Truncate long sessions: keep first N + last N messages.
This preserves session start (initial context) and end (final results),
skipping the messy middle of long debugging sessions.
"""
if len(messages) <= head + tail:
return messages
truncated = messages[:head]
truncated.append({
'role': 'system',
'content': f'[{len(messages) - head - tail} messages truncated]',
'timestamp': ''
})
truncated.extend(messages[-tail:])
return truncated
def messages_to_text(messages: list[dict]) -> str:
"""Convert message list to plain text for LLM consumption."""
lines = []
for msg in messages:
role = msg.get('role', 'unknown').upper()
content = msg.get('content', '')
if msg.get('role') == 'system' and 'truncated' in content:
lines.append(f'--- {content} ---')
else:
lines.append(f'{role}: {content}')
return '\n\n'.join(lines)
def get_session_metadata(path: str) -> dict:
"""Extract metadata from a session file (first message often has config info)."""
messages = read_session(path)
if not messages:
return {'path': path, 'message_count': 0}
first = messages[0]
last = messages[-1]
return {
'path': path,
'message_count': len(messages),
'first_timestamp': first.get('timestamp', ''),
'last_timestamp': last.get('timestamp', ''),
'first_role': first.get('role', ''),
'has_tool_calls': any(m.get('tool_calls') for m in messages),
}
if __name__ == '__main__':
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <session.jsonl>")
sys.exit(1)
path = sys.argv[1]
meta = get_session_metadata(path)
print(json.dumps(meta, indent=2))
messages = read_session(path)
conv = extract_conversation(messages)
print(f"\nConversation: {len(conv)} turns")
truncated = truncate_for_context(conv)
print(f"After truncation: {len(truncated)} turns")
print(f"\nPreview (first 500 chars):")
print(messages_to_text(truncated[:5])[:500])

View File

@@ -1,162 +0,0 @@
#!/usr/bin/env python3
"""
Smoke test for harvester pipeline — verifies the full chain:
session_reader -> prompt -> LLM (mocked) -> validate -> deduplicate -> store
Does NOT call the real LLM. Tests plumbing only.
"""
import json
import sys
import tempfile
import os
from pathlib import Path
# Setup path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
from harvester import validate_fact, deduplicate, load_existing_knowledge, fact_fingerprint
def test_session_reader():
"""Test that session_reader parses JSONL correctly."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
f.write('{"role": "user", "content": "Hello", "timestamp": "2026-04-13T10:00:00Z"}\n')
f.write('{"role": "assistant", "content": "Hi there", "timestamp": "2026-04-13T10:00:01Z"}\n')
f.write('{"role": "user", "content": "Clone the repo", "timestamp": "2026-04-13T10:00:02Z"}\n')
f.write('{"role": "assistant", "content": "Cloned successfully", "timestamp": "2026-04-13T10:00:05Z"}\n')
path = f.name
messages = read_session(path)
assert len(messages) == 4, f"Expected 4 messages, got {len(messages)}"
conv = extract_conversation(messages)
assert len(conv) == 4, f"Expected 4 conversation turns, got {len(conv)}"
text = messages_to_text(conv)
assert "USER: Hello" in text
assert "ASSISTANT: Hi there" in text
truncated = truncate_for_context(conv, head=2, tail=2)
assert len(truncated) == 4 # 4 <= head+tail, so no truncation
os.unlink(path)
print(" [PASS] session_reader pipeline works")
def test_validate_fact():
"""Test fact validation."""
good = {"fact": "Gitea token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9}
assert validate_fact(good), "Valid fact should pass"
bad_missing = {"fact": "Something", "category": "fact"}
assert not validate_fact(bad_missing), "Missing fields should fail"
bad_category = {"fact": "Something", "category": "nonsense", "repo": "x", "confidence": 0.5}
assert not validate_fact(bad_category), "Bad category should fail"
bad_conf = {"fact": "Something", "category": "fact", "repo": "x", "confidence": 1.5}
assert not validate_fact(bad_conf), "Confidence > 1.0 should fail"
print(" [PASS] fact validation works")
def test_deduplicate():
"""Test deduplication."""
existing = [
{"fact": "Token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9}
]
new = [
{"fact": "Token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9}, # exact dup
{"fact": "Deploy uses Ansible on port 22", "category": "pattern", "repo": "fleet", "confidence": 0.8}, # unique
]
result = deduplicate(new, existing)
assert len(result) == 1, f"Expected 1 unique, got {len(result)}"
assert result[0]["fact"] == "Deploy uses Ansible on port 22"
print(" [PASS] deduplication works")
def test_knowledge_store_roundtrip():
"""Test loading and writing knowledge index."""
with tempfile.TemporaryDirectory() as tmpdir:
# Load empty index
index = load_existing_knowledge(tmpdir)
assert index["total_facts"] == 0
# Write a fact
new_facts = [{"fact": "Test fact", "category": "fact", "repo": "test", "confidence": 0.9}]
# Use harvester's write function
from harvester import write_knowledge
write_knowledge(index, new_facts, tmpdir, source_session="test.jsonl")
# Reload and verify
index2 = load_existing_knowledge(tmpdir)
assert index2["total_facts"] == 1
assert index2["facts"][0]["fact"] == "Test fact"
assert index2["facts"][0]["source_session"] == "test.jsonl"
# Check markdown was written
md_path = Path(tmpdir) / "repos" / "test.md"
assert md_path.exists(), "Markdown file should be created"
print(" [PASS] knowledge store roundtrip works")
def test_full_chain_no_llm():
"""Test the full pipeline minus the LLM call."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
f.write('{"role": "user", "content": "Clone compounding-intelligence", "timestamp": "2026-04-13T10:00:00Z"}\n')
f.write('{"role": "assistant", "content": "Cloned successfully", "timestamp": "2026-04-13T10:00:05Z"}\n')
session_path = f.name
with tempfile.TemporaryDirectory() as knowledge_dir:
# Step 1: Read
messages = read_session(session_path)
assert len(messages) == 2
# Step 2: Extract conversation
conv = extract_conversation(messages)
assert len(conv) == 2
# Step 3: Truncate
truncated = truncate_for_context(conv, head=50, tail=50)
# Step 4: Convert to text (this goes to the LLM)
transcript = messages_to_text(truncated)
assert "Clone compounding-intelligence" in transcript
# Step 5-7: Would be LLM call, validate, deduplicate
# We simulate LLM output here
mock_facts = [
{"fact": "compounding-intelligence repo was cloned", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9}
]
valid = [f for f in mock_facts if validate_fact(f)]
# Step 6: Deduplicate
index = load_existing_knowledge(knowledge_dir)
new_facts = deduplicate(valid, index.get("facts", []))
assert len(new_facts) == 1
# Step 7: Store
from harvester import write_knowledge
write_knowledge(index, new_facts, knowledge_dir, source_session=session_path)
# Verify
index2 = load_existing_knowledge(knowledge_dir)
assert index2["total_facts"] == 1
os.unlink(session_path)
print(" [PASS] full chain (reader -> validate -> dedup -> store) works")
if __name__ == "__main__":
print("Running harvester pipeline smoke tests...")
test_session_reader()
test_validate_fact()
test_deduplicate()
test_knowledge_store_roundtrip()
test_full_chain_no_llm()
print("\nAll tests passed.")

View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
Validate knowledge files and index.json against the schema.
Usage:
python scripts/validate_knowledge.py
"""
import json
import sys
from pathlib import Path
VALID_CATEGORIES = {"fact", "pitfall", "pattern", "tool-quirk", "question"}
REQUIRED_FACT_FIELDS = {"id", "fact", "category", "domain", "confidence"}
MAX_FACT_LENGTH = 280
def validate_fact(fact, source=""):
errors = []
for field in REQUIRED_FACT_FIELDS:
if field not in fact:
errors.append(f"{source}: missing required field '{field}'")
if "fact" in fact:
if not isinstance(fact["fact"], str) or len(fact["fact"].strip()) == 0:
errors.append(f"{source}: 'fact' must be non-empty string")
elif len(fact["fact"]) > MAX_FACT_LENGTH:
errors.append(f"{source}: 'fact' exceeds {MAX_FACT_LENGTH} chars")
if "category" in fact and fact["category"] not in VALID_CATEGORIES:
errors.append(f"{source}: invalid category '{fact['category']}'")
if "confidence" in fact:
if not isinstance(fact["confidence"], (int, float)):
errors.append(f"{source}: 'confidence' must be a number")
elif not (0.0 <= fact["confidence"] <= 1.0):
errors.append(f"{source}: 'confidence' must be 0.0-1.0")
if "id" in fact:
parts = fact["id"].split(":")
if len(parts) != 3:
errors.append(f"{source}: 'id' must be domain:category:sequence")
elif parts[1] not in VALID_CATEGORIES:
errors.append(f"{source}: id category '{parts[1]}' invalid")
return errors
def main():
repo_root = Path(__file__).parent.parent
index_path = repo_root / "knowledge" / "index.json"
all_errors = []
if not index_path.exists():
print(f"VALIDATION FAILED: index.json not found at {index_path}")
sys.exit(1)
with open(index_path) as f:
data = json.load(f)
if "version" not in data:
all_errors.append("index.json: missing 'version'")
if "facts" not in data or not isinstance(data["facts"], list):
all_errors.append("index.json: missing or invalid 'facts'")
seen_ids = set()
for i, fact in enumerate(data.get("facts", [])):
all_errors.extend(validate_fact(fact, f"facts[{i}]"))
if "id" in fact:
if fact["id"] in seen_ids:
all_errors.append(f"index.json: duplicate id '{fact['id']}'")
seen_ids.add(fact["id"])
if all_errors:
print(f"VALIDATION FAILED - {len(all_errors)} error(s):\n")
for e in all_errors:
print(f" x {e}")
sys.exit(1)
else:
print(f"VALIDATION PASSED - {len(data.get('facts', []))} facts, schema v{data.get('version', '?')}")
sys.exit(0)
if __name__ == "__main__":
main()