Compare commits

..

1 Commits

Author SHA1 Message Date
55797c8a3e feat: add sampler.py — session value scorer (#17) 2026-04-15 03:02:12 +00:00
3 changed files with 353 additions and 416 deletions

View File

@@ -1,275 +0,0 @@
"""
Knowledge Gap Identifier — Pipeline 10.7
Cross-references code, docs, and tests to find gaps:
- Undocumented functions/classes
- Untested code paths
- Documented but missing implementations
- Test files without corresponding source
Produces a gap report with severity and suggestions.
"""
from __future__ import annotations
import ast
import os
import re
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Set
class GapSeverity(Enum):
INFO = "info"
WARNING = "warning"
ERROR = "error"
class GapType(Enum):
UNDOCUMENTED = "undocumented"
UNTESTED = "untested"
MISSING_IMPLEMENTATION = "missing_implementation"
ORPHAN_TEST = "orphan_test"
STALE_DOC = "stale_doc"
@dataclass
class Gap:
"""A single knowledge gap."""
gap_type: GapType
severity: GapSeverity
file: str
line: Optional[int]
name: str
description: str
suggestion: str
@dataclass
class GapReport:
"""Full gap analysis report."""
repo_path: str
gaps: List[Gap] = field(default_factory=list)
stats: Dict[str, int] = field(default_factory=dict)
def summary(self) -> str:
lines = [f"Gap Report for {self.repo_path}", "=" * 40]
by_type = {}
for g in self.gaps:
by_type.setdefault(g.gap_type.value, []).append(g)
for gtype, items in sorted(by_type.items()):
lines.append(f"\n{gtype.upper()} ({len(items)}):")
for g in items:
loc = f"{g.file}:{g.line}" if g.line else g.file
lines.append(f" [{g.severity.value}] {g.name} @ {loc}")
lines.append(f" {g.description}")
lines.append(f"\nTotal gaps: {len(self.gaps)}")
self.stats = {k: len(v) for k, v in by_type.items()}
return "\n".join(lines)
def to_dict(self) -> dict:
return {
"repo_path": self.repo_path,
"total_gaps": len(self.gaps),
"stats": {k: len(v) for k, v in
{gt: [g for g in self.gaps if g.gap_type == gt]
for gt in GapType}.items() if v},
"gaps": [
{
"type": g.gap_type.value,
"severity": g.severity.value,
"file": g.file,
"line": g.line,
"name": g.name,
"description": g.description,
"suggestion": g.suggestion,
}
for g in self.gaps
],
}
def _collect_python_files(root: Path) -> List[Path]:
"""Collect .py files, excluding venv/node_modules/.git."""
skip = {".git", "venv", "env", ".venv", "node_modules", "__pycache__", ".tox", ".mypy_cache"}
files = []
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [d for d in dirnames if d not in skip]
for f in filenames:
if f.endswith(".py"):
files.append(Path(dirpath) / f)
return files
def _extract_python_symbols(filepath: Path) -> Set[str]:
"""Extract top-level function and class names from a Python file."""
symbols = set()
try:
source = filepath.read_text(encoding="utf-8", errors="replace")
tree = ast.parse(source, filename=str(filepath))
except (SyntaxError, UnicodeDecodeError):
return symbols
for node in ast.iter_child_nodes(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
symbols.add(node.name)
return symbols
def _extract_doc_symbols(filepath: Path) -> Set[str]:
"""Extract function/class names mentioned in markdown docs."""
symbols = set()
try:
text = filepath.read_text(encoding="utf-8", errors="replace")
except (UnicodeDecodeError, OSError):
return symbols
# Match backtick-quoted identifiers: `ClassName`, `func_name`, `func()`
for m in re.finditer(r"`([A-Za-z_]\w+)(?:\(\))?`", text):
symbols.add(m.group(1))
# Match ## ClassName or ### func_name headings
for m in re.finditer(r"^#{1,4}\s+(\w+)", text, re.MULTILINE):
symbols.add(m.group(1))
return symbols
def _collect_test_files(root: Path) -> Dict[str, Path]:
"""Map test module names to their file paths."""
test_map = {}
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [d for d in dirnames if d not in {".git", "venv", "node_modules"}]
for f in filenames:
if f.startswith("test_") and f.endswith(".py"):
# test_foo.py -> foo
module_name = f[5:-3]
test_map[module_name] = Path(dirpath) / f
return test_map
class KnowledgeGapIdentifier:
"""Analyzes a repo for knowledge gaps between code, docs, and tests."""
def analyze(self, repo_path: str) -> GapReport:
root = Path(repo_path).resolve()
report = GapReport(repo_path=str(root))
if not root.is_dir():
report.gaps.append(Gap(
gap_type=GapType.UNDOCUMENTED,
severity=GapSeverity.ERROR,
file=str(root),
line=None,
name="repo",
description="Path is not a directory",
suggestion="Provide a valid repo directory",
))
return report
# Collect artifacts
py_files = _collect_python_files(root)
doc_files = list(root.glob("docs/**/*.md")) + list(root.glob("*.md"))
test_map = _collect_test_files(root / "tests") if (root / "tests").is_dir() else {}
# Extract symbols from each source file
source_symbols: Dict[str, Set[str]] = {} # relative_path -> symbols
all_source_symbols: Set[str] = set()
for pf in py_files:
rel = str(pf.relative_to(root))
# Skip test files and setup/config
if "/tests/" in rel or rel.startswith("tests/") or rel.startswith("test_"):
continue
if pf.name in ("setup.py", "conftest.py", "conf.py"):
continue
syms = _extract_python_symbols(pf)
if syms:
source_symbols[rel] = syms
all_source_symbols.update(syms)
# Extract documented symbols
doc_symbols: Set[str] = set()
for df in doc_files:
doc_symbols.update(_extract_doc_symbols(df))
# Extract test-covered symbols
tested_modules: Set[str] = set(test_map.keys())
# --- Find gaps ---
# 1. Undocumented: source symbols not in any doc
for rel_path, syms in source_symbols.items():
for sym in sorted(syms):
if sym.startswith("_") and not sym.startswith("__"):
continue # Skip private
if sym not in doc_symbols:
report.gaps.append(Gap(
gap_type=GapType.UNDOCUMENTED,
severity=GapSeverity.WARNING,
file=rel_path,
line=None,
name=sym,
description=f"{sym} defined in {rel_path} but not referenced in any docs",
suggestion=f"Add documentation for {sym} in a .md file",
))
# 2. Untested: source modules without a corresponding test file
for rel_path in source_symbols:
module_name = Path(rel_path).stem
if module_name not in tested_modules and module_name not in ("__init__", "main", "config"):
report.gaps.append(Gap(
gap_type=GapType.UNTESTED,
severity=GapSeverity.ERROR,
file=rel_path,
line=None,
name=module_name,
description=f"No test file found for {rel_path}",
suggestion=f"Create tests/test_{module_name}.py",
))
# 3. Missing implementation: doc references symbol not in any source
referenced_but_missing = doc_symbols - all_source_symbols
for sym in sorted(referenced_but_missing):
# Filter out common non-code terms
if sym.lower() in {"todo", "fixme", "note", "example", "usage", "api",
"install", "setup", "config", "license", "contributing",
"changelog", "readme", "python", "bash", "json", "yaml",
"http", "url", "cli", "gui", "ui", "api", "rest"}:
continue
if len(sym) < 3:
continue
report.gaps.append(Gap(
gap_type=GapType.MISSING_IMPLEMENTATION,
severity=GapSeverity.INFO,
file="(docs)",
line=None,
name=sym,
description=f"{sym} referenced in docs but not found in source code",
suggestion=f"Verify if {sym} should be implemented or update docs",
))
# 4. Orphan tests: test files without matching source
for test_mod, test_path in test_map.items():
if test_mod not in tested_modules and not any(
test_mod in Path(f).stem for f in source_symbols
):
# Check if any source file partially matches
matches_source = any(test_mod.replace("_", "-") in f or test_mod.replace("_", "") in Path(f).stem
for f in source_symbols)
if not matches_source:
rel = str(test_path.relative_to(root))
report.gaps.append(Gap(
gap_type=GapType.ORPHAN_TEST,
severity=GapSeverity.WARNING,
file=rel,
line=None,
name=test_mod,
description=f"Test file {rel} exists but no matching source module found",
suggestion=f"Verify if the source was renamed or removed",
))
return report

353
scripts/sampler.py Normal file
View File

@@ -0,0 +1,353 @@
#!/usr/bin/env python3
"""
sampler.py — Score and rank sessions by harvest value.
With 20k+ sessions on disk, we can't harvest all at once. This script
scores each session by how likely it is to contain valuable knowledge,
so the harvester processes the best ones first.
Scoring strategy:
- Recency: last 7d=3pts, last 30d=2pts, older=1pt
- Length: >50 messages=3pts, >20=2pts, <20=1pt
- Repo uniqueness: first session for a repo=5pts, otherwise=1pt
- Outcome: failure=3pts (most to learn), success=2pts, unknown=1pt
- Tool calls: >10 tool invocations=2pts (complex sessions)
Usage:
python3 sampler.py --count 100 # Top 100 sessions
python3 sampler.py --repo the-nexus --count 20 # Top 20 for a repo
python3 sampler.py --since 2026-04-01 # All sessions since date
python3 sampler.py --count 50 --min-score 8 # Only high-value sessions
python3 sampler.py --count 100 --output sample.json # Save to file
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
# --- Fast session scanning (no full parse) ---
def scan_session_fast(path: str) -> dict:
"""Extract scoring metadata from a session without parsing the full JSONL.
Reads only: first line, last ~20 lines, and line count. This processes
20k sessions in seconds instead of minutes.
"""
meta = {
'path': path,
'message_count': 0,
'has_tool_calls': False,
'tool_call_count': 0,
'first_timestamp': '',
'last_timestamp': '',
'is_failure': False,
'repos_mentioned': set(),
'first_role': '',
'last_content_preview': '',
}
try:
file_size = os.path.getsize(path)
if file_size == 0:
return meta
with open(path, 'r', encoding='utf-8', errors='replace') as f:
# Read first line for timestamp + role
first_line = f.readline().strip()
if first_line:
try:
first_msg = json.loads(first_line)
meta['first_timestamp'] = first_msg.get('timestamp', '')
meta['first_role'] = first_msg.get('role', '')
except json.JSONDecodeError:
pass
# Fast line count + collect tail lines
# For the tail, seek to near end of file
tail_lines = []
line_count = 1 # already read first
if file_size > 8192:
# Seek to last 8KB for tail sampling
f.seek(max(0, file_size - 8192))
f.readline() # skip partial line
for line in f:
line = line.strip()
if line:
tail_lines.append(line)
line_count += 1
# We lost the exact count for big files — estimate from file size
# Average JSONL line is ~500 bytes
if line_count < 100:
line_count = max(line_count, file_size // 500)
else:
# Small file — read all
for line in f:
line = line.strip()
if line:
tail_lines.append(line)
line_count += 1
meta['message_count'] = line_count
# Parse tail lines for outcome, tool calls, repos
for line in tail_lines[-30:]: # last 30 non-empty lines
try:
msg = json.loads(line)
# Track last timestamp
ts = msg.get('timestamp', '')
if ts:
meta['last_timestamp'] = ts
# Count tool calls
if msg.get('tool_calls'):
meta['has_tool_calls'] = True
meta['tool_call_count'] += len(msg['tool_calls'])
# Detect failure signals in content
content = ''
if isinstance(msg.get('content'), str):
content = msg['content'].lower()
elif isinstance(msg.get('content'), list):
for part in msg['content']:
if isinstance(part, dict) and part.get('type') == 'text':
content += part.get('text', '').lower()
if content:
meta['last_content_preview'] = content[:200]
failure_signals = ['error', 'failed', 'cannot', 'unable',
'exception', 'traceback', 'rejected', 'denied']
if any(sig in content for sig in failure_signals):
meta['is_failure'] = True
# Extract repo references from tool call arguments
if msg.get('tool_calls'):
for tc in msg['tool_calls']:
args = tc.get('function', {}).get('arguments', '')
if isinstance(args, str):
# Look for repo patterns
for pattern in ['Timmy_Foundation/', 'Rockachopa/', 'compounding-intelligence', 'the-nexus', 'timmy-home', 'hermes-agent', 'the-beacon', 'the-door']:
if pattern in args:
repo = pattern.rstrip('/')
meta['repos_mentioned'].add(repo)
except json.JSONDecodeError:
continue
except (IOError, OSError):
pass
meta['repos_mentioned'] = list(meta['repos_mentioned'])
return meta
# --- Filename timestamp parsing ---
def parse_session_timestamp(filename: str) -> Optional[datetime]:
"""Parse timestamp from session filename.
Common formats:
session_20260413_123456_hash.jsonl
20260413_123456_hash.jsonl
"""
stem = Path(filename).stem
parts = stem.split('_')
# Try session_YYYYMMDD_HHMMSS format
for i, part in enumerate(parts):
if len(part) == 8 and part.isdigit():
date_part = part
time_part = parts[i + 1] if i + 1 < len(parts) and len(parts[i + 1]) == 6 else '000000'
try:
return datetime.strptime(f"{date_part}_{time_part}", '%Y%m%d_%H%M%S').replace(tzinfo=timezone.utc)
except ValueError:
continue
# Fallback: use file modification time
return None
# --- Scoring ---
def score_session(meta: dict, now: datetime, seen_repos: set) -> tuple[int, dict]:
"""Score a session for harvest value. Returns (score, breakdown)."""
score = 0
breakdown = {}
# 1. Recency
ts = parse_session_timestamp(os.path.basename(meta['path']))
if ts is None:
# Fallback to mtime
try:
ts = datetime.fromtimestamp(os.path.getmtime(meta['path']), tz=timezone.utc)
except OSError:
ts = now - timedelta(days=365)
age_days = (now - ts).days
if age_days <= 7:
recency = 3
elif age_days <= 30:
recency = 2
else:
recency = 1
score += recency
breakdown['recency'] = recency
# 2. Length
count = meta['message_count']
if count > 50:
length = 3
elif count > 20:
length = 2
else:
length = 1
score += length
breakdown['length'] = length
# 3. Repo uniqueness (first session mentioning a repo gets bonus)
repo_score = 0
for repo in meta.get('repos_mentioned', []):
if repo not in seen_repos:
seen_repos.add(repo)
repo_score = max(repo_score, 5)
else:
repo_score = max(repo_score, 1)
score += repo_score
breakdown['repo_unique'] = repo_score
# 4. Outcome
if meta.get('is_failure'):
outcome = 3
elif meta.get('last_content_preview', '').strip():
outcome = 2 # has some content = likely completed
else:
outcome = 1
score += outcome
breakdown['outcome'] = outcome
# 5. Tool calls
if meta.get('tool_call_count', 0) > 10:
tool = 2
else:
tool = 0
score += tool
breakdown['tool_calls'] = tool
return score, breakdown
# --- Main ---
def main():
parser = argparse.ArgumentParser(description="Score and rank sessions for harvesting")
parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
help='Directory containing session files')
parser.add_argument('--count', type=int, default=100, help='Number of top sessions to return')
parser.add_argument('--repo', default='', help='Filter to sessions mentioning this repo')
parser.add_argument('--since', default='', help='Only score sessions after this date (YYYY-MM-DD)')
parser.add_argument('--min-score', type=int, default=0, help='Minimum score threshold')
parser.add_argument('--output', default='', help='Output file (JSON). Default: stdout')
parser.add_argument('--format', choices=['json', 'paths', 'table'], default='table',
help='Output format: json (full), paths (one per line), table (human)')
parser.add_argument('--top-percent', type=float, default=0, help='Return top N%% instead of --count')
args = parser.parse_args()
sessions_dir = Path(args.sessions_dir)
if not sessions_dir.is_dir():
print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
sys.exit(1)
# Find all JSONL files
print(f"Scanning {sessions_dir}...", file=sys.stderr)
t0 = time.time()
session_files = list(sessions_dir.glob('*.jsonl'))
total = len(session_files)
print(f"Found {total} session files", file=sys.stderr)
# Parse since date
since_dt = None
if args.since:
since_dt = datetime.strptime(args.since, '%Y-%m-%d').replace(tzinfo=timezone.utc)
# Score all sessions
now = datetime.now(timezone.utc)
seen_repos = set() # Track repos for uniqueness scoring
scored = []
for i, sf in enumerate(session_files):
# Date filter (fast path: check filename first)
if since_dt:
ts = parse_session_timestamp(sf.name)
if ts and ts < since_dt:
continue
meta = scan_session_fast(str(sf))
# Repo filter
if args.repo:
repos = meta.get('repos_mentioned', [])
if args.repo.lower() not in [r.lower() for r in repos]:
# Also check filename
if args.repo.lower() not in sf.name.lower():
continue
score, breakdown = score_session(meta, now, seen_repos)
if score >= args.min_score:
scored.append({
'path': str(sf),
'filename': sf.name,
'score': score,
'breakdown': breakdown,
'message_count': meta['message_count'],
'repos': meta['repos_mentioned'],
'is_failure': meta['is_failure'],
})
if (i + 1) % 5000 == 0:
elapsed = time.time() - t0
print(f" Scanned {i + 1}/{total} ({elapsed:.1f}s)", file=sys.stderr)
elapsed = time.time() - t0
print(f"Scored {len(scored)} sessions in {elapsed:.1f}s", file=sys.stderr)
# Sort by score descending
scored.sort(key=lambda x: x['score'], reverse=True)
# Apply count or percent
if args.top_percent > 0:
count = max(1, int(len(scored) * args.top_percent / 100))
else:
count = args.count
scored = scored[:count]
# Output
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(scored, f, indent=2)
print(f"Wrote {len(scored)} sessions to {args.output}", file=sys.stderr)
elif args.format == 'json':
json.dump(scored, sys.stdout, indent=2)
elif args.format == 'paths':
for s in scored:
print(s['path'])
else: # table
print(f"{'SCORE':>5} {'MSGS':>5} {'REPOS':<25} {'FILE'}")
print(f"{'-'*5} {'-'*5} {'-'*25} {'-'*40}")
for s in scored:
repos = ', '.join(s['repos'][:2]) if s['repos'] else '-'
fail = ' FAIL' if s['is_failure'] else ''
print(f"{s['score']:>5} {s['message_count']:>5} {repos:<25} {s['filename'][:40]}{fail}")
if __name__ == '__main__':
main()

View File

@@ -1,141 +0,0 @@
"""Tests for knowledge_gap_identifier module."""
import sys
import os
import tempfile
import shutil
from pathlib import Path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
from knowledge_gap_identifier import KnowledgeGapIdentifier, GapType, GapSeverity
def _make_repo(tmpdir, structure):
"""Create a test repo from a dict of {path: content}."""
for rel_path, content in structure.items():
p = Path(tmpdir) / rel_path
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content)
def test_undocumented_symbol():
with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, {
"src/calculator.py": "def add(a, b):\n return a + b\n",
"README.md": "# Calculator\n",
})
report = KnowledgeGapIdentifier().analyze(tmpdir)
undocumented = [g for g in report.gaps if g.gap_type == GapType.UNDOCUMENTED]
assert any(g.name == "add" for g in undocumented), "add should be undocumented"
def test_documented_symbol_no_gap():
with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, {
"src/calculator.py": "def add(a, b):\n return a + b\n",
"README.md": "# Calculator\nUse `add()` to add numbers.\n",
})
report = KnowledgeGapIdentifier().analyze(tmpdir)
undocumented = [g for g in report.gaps
if g.gap_type == GapType.UNDOCUMENTED and g.name == "add"]
assert len(undocumented) == 0, "add is documented, should not be flagged"
def test_untested_module():
with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, {
"src/calculator.py": "def add(a, b):\n return a + b\n",
"src/helper.py": "def format(x):\n return str(x)\n",
"tests/test_calculator.py": "from src.calculator import add\nassert add(1,2) == 3\n",
})
report = KnowledgeGapIdentifier().analyze(tmpdir)
untested = [g for g in report.gaps if g.gap_type == GapType.UNTESTED]
assert any("helper" in g.name for g in untested), "helper should be untested"
def test_tested_module_no_gap():
with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, {
"src/calculator.py": "def add(a, b):\n return a + b\n",
"tests/test_calculator.py": "def test_add():\n assert True\n",
})
report = KnowledgeGapIdentifier().analyze(tmpdir)
untested = [g for g in report.gaps
if g.gap_type == GapType.UNTESTED and "calculator" in g.name]
assert len(untested) == 0, "calculator has tests, should not be flagged"
def test_missing_implementation():
with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, {
"src/app.py": "def run():\n pass\n",
"docs/api.md": "# API\nUse `NonExistentClass` to do things.\n",
})
report = KnowledgeGapIdentifier().analyze(tmpdir)
missing = [g for g in report.gaps if g.gap_type == GapType.MISSING_IMPLEMENTATION]
assert any(g.name == "NonExistentClass" for g in missing)
def test_private_symbols_skipped():
with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, {
"src/app.py": "def _internal():\n pass\ndef public():\n pass\n",
"README.md": "# App\n",
})
report = KnowledgeGapIdentifier().analyze(tmpdir)
undocumented_names = [g.name for g in report.gaps if g.gap_type == GapType.UNDOCUMENTED]
assert "_internal" not in undocumented_names, "Private symbols should be skipped"
assert "public" in undocumented_names
def test_empty_repo():
with tempfile.TemporaryDirectory() as tmpdir:
report = KnowledgeGapIdentifier().analyze(tmpdir)
assert len(report.gaps) == 0
def test_invalid_path():
report = KnowledgeGapIdentifier().analyze("/nonexistent/path/xyz")
assert len(report.gaps) == 1
assert report.gaps[0].severity == GapSeverity.ERROR
def test_report_summary():
with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, {
"src/app.py": "class MyService:\n def handle(self):\n pass\n",
"README.md": "# App\n",
})
report = KnowledgeGapIdentifier().analyze(tmpdir)
summary = report.summary()
assert "UNDOCUMENTED" in summary
assert "MyService" in summary
def test_report_to_dict():
with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, {
"src/app.py": "def hello():\n pass\n",
"README.md": "# App\n",
})
report = KnowledgeGapIdentifier().analyze(tmpdir)
d = report.to_dict()
assert "total_gaps" in d
assert "gaps" in d
assert isinstance(d["gaps"], list)
assert d["total_gaps"] > 0
if __name__ == "__main__":
test_undocumented_symbol()
test_documented_symbol_no_gap()
test_untested_module()
test_tested_module_no_gap()
test_missing_implementation()
test_private_symbols_skipped()
test_empty_repo()
test_invalid_path()
test_report_summary()
test_report_to_dict()
print("All 10 tests passed.")