Compare commits

..

1 Commits

Author SHA1 Message Date
Step35
1470b44c3b feat: add codebase genome diff script for structural change detection
Some checks failed
Test / pytest (pull_request) Failing after 9s
Introduces genome_diff.py — a tool for detecting structural changes between
two git refs: file-level changes, function/class signature modifications,
and dependency import changes.

Addresses #132.
2026-04-26 09:46:04 -04:00
6 changed files with 288 additions and 1161 deletions

184
WIKI.md
View File

@@ -1,184 +0,0 @@
# LLM Wiki Layer — Documentation
**Status:** Implemented (2026-04-27)
**Issue:** Timmy_Foundation/compounding-intelligence#231
**Parent:** Timmy_Foundation/hermes-agent#984 ([ATLAS] Steal Atlas ecosystem patterns)
---
## Overview
The **LLM Wiki layer** is a sovereign knowledge interface built on top of the `knowledge/` fact store. It provides:
| Capability | Command | Description |
|------------|---------|-------------|
| **Ingest** | `wiki ingest --session <file>` | Harvest facts from session transcripts via LLM extraction |
| **Crystallize** | `wiki crystal --session <file>` | Alias for ingest — session distillation into durable pages |
| **Query** | `wiki query "<question>"` | RAG-style retrieval + LLM synthesis with citations |
| **Lint** | `wiki lint` | Detect staleness, duplicates, and potential contradictions |
Location: `scripts/wiki.py` (entry point)
---
## How It Differs From…
### RAG (Retrieval-Augmented Generation)
**RAG** retrieves raw chunks (e.g., code snippets, paragraph strings) and feeds them to an LLM. Chunks are unnormalized, unscored, and carry no provenance beyond the source file path.
**LLM Wiki** retrieves *normalized facts* from `knowledge/index.json` — each fact has:
- A unique ID (`domain:category:seq`)
- A confidence score (0.0–1.0)
- Provenance (`source_session`, `source_count`, `first_seen`, `last_confirmed`)
- Explicit category (`fact` | `pitfall` | `pattern` | `tool-quirk` | `question`)
- Tags for cross-domain linking
The query path formats facts with their IDs and asks the LLM to cite `[N]` indices, preserving traceability.
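As a concrete illustration of the schema above, here is a sketch of what one normalized fact entry in `index.json` might look like (the field names come from this document; the values are illustrative, borrowing the pitfall shown in the sample query output below):

```python
# Hypothetical example of a single normalized fact entry in knowledge/index.json.
# Field names follow the schema described above; values are invented for illustration.
example_fact = {
    "id": "hermes-agent:pitfall:001",            # unique ID: domain:category:seq
    "fact": "deploy-crons.py leaves jobs in mixed model format",
    "category": "pitfall",                        # fact | pitfall | pattern | tool-quirk | question
    "confidence": 0.9,                            # score in 0.0-1.0
    "source_session": "session_20260427.jsonl",   # provenance fields
    "source_count": 2,
    "first_seen": "2026-04-20",
    "last_confirmed": "2026-04-27",
    "tags": ["cron", "deploy"],                   # tags for cross-domain linking
}
```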
### Transcript Search
**Transcript search** is keyword grep over raw session JSONL files. It shows you exactly what was said and when, but you must extract the insight manually.
**LLM Wiki** is *distilled insight* — the harvester already extracted durable knowledge from sessions (via LLM extraction prompt). The wiki layer queries that distilled store, not the noisy raw transcripts.
---
## Architecture
```
┌─────────────────┐
│ Session JSONL │ ← raw session transcripts
└────────┬────────┘
│ harvester.py (ingest)
┌─────────────────┐
│ knowledge/index.json ← canonical fact index (machine-readable)
│ knowledge/*.md ← human-editable pages (durable wiki pages)
└────────┬────────┘
│ wiki.py (query)
retrieve_facts() format_facts_as_context()
│ │
└────────────┬────────────────┘
LLM synthesis with citations
answer string
```
- **Ingest path:** `harvester.py` → `write_knowledge()` updates `index.json` and appends to `knowledge/{global,repos}/*.md`
- **Query path:** `wiki query` → `retrieve_facts()` (BM25-ish keyword + tag + confidence + recency) → `call_llm_synthesize()` → cited answer
- **Lint path:** `wiki lint` → `freshness.py` (source-hash staleness) + duplicate detection + contradiction heuristic
---
## Usage Examples
### Query the wiki
```bash
# Ask a question (uses HARVESTER_API_KEY / OPENROUTER_API_KEY)
python3 scripts/wiki.py query "How do I fix deploy-crons mixed model format?"
# Retrieve-only (dry-run) to inspect context
python3 scripts/wiki.py query "gitea token location" --dry-run --top 5
# With custom search depth
python3 scripts/wiki.py query "cron job pitfalls" --top 20
```
Sample output:
```
→ Retrieved 3 facts:
[1] hermes-agent:pitfall:001: deploy-crons.py leaves jobs in mixed model format
[2] hermes-agent:pitfall:002: deploy-crons.py --deploy doesn't set legacy skill field
[3] hermes-agent:pitfall:003: Cron jobs with blank fallback_model trigger warnings
← Answer: The mixed model format bug in deploy-crons.py (pitfall #001) leaves jobs unparsed;
ensure all cron jobs specify a single model provider. (#002) Verify fallback_model is never blank (#003). [1][2][3]
```
### Ingest from a session
```bash
# Harvest knowledge from a finished session
python3 scripts/wiki.py ingest --session ~/.hermes/sessions/session_20260427.jsonl
# Dry-run preview (no writes)
python3 scripts/wiki.py ingest --session session.jsonl --dry-run
```
This invokes `harvester.py` under the hood, which:
1. Reads the transcript via `session_reader.py`
2. Calls the LLM extraction prompt (templates/harvest-prompt.md)
3. Validates + deduplicates + writes to `knowledge/`
### Lint the knowledge base
```bash
# Run all checks: staleness (freshness.py), duplicates, contradictions
python3 scripts/wiki.py lint
```
Output:
```
WARNINGS (6):
⚠ Potential contradiction in hermes-agent/pitfall: hermes-agent:pitfall:001 vs hermes-agent:pitfall:002
⚠ Duplicate fact text: 'Token is at ~/.config/gitea/token'... IDs: global:tool-quirk:001, global:tool-quirk:005
```
> **Note:** Contradiction detection is heuristic (word-overlap based). Human review required.
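A minimal sketch of the word-overlap heuristic, assuming the lint path compares facts within the same domain and category (the helper names here are hypothetical, not the actual `wiki.py` API):

```python
def word_overlap(a, b):
    """Jaccard word overlap between two fact strings."""
    wa, wb = set(a.lower().split()), set(b.lower().split())
    return len(wa & wb) / len(wa | wb) if wa | wb else 0.0

def flag_potential_contradictions(facts, threshold=0.5):
    """Pair up same-domain, same-category facts whose wording overlaps heavily.
    High overlap with differing text MAY signal a contradiction, or merely a
    duplicate, which is why every flag needs human review."""
    flags = []
    for i, a in enumerate(facts):
        for b in facts[i + 1:]:
            if (a.get("domain"), a.get("category")) != (b.get("domain"), b.get("category")):
                continue
            if a["fact"] != b["fact"] and word_overlap(a["fact"], b["fact"]) >= threshold:
                flags.append((a["id"], b["id"]))
    return flags
```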
### Crystallize a session
```bash
# Alias for ingest — explicit "session distillation" terminology
python3 scripts/wiki.py crystal --session ~/.hermes/sessions/recent.jsonl
```
---
## Configuration
| Env Var | Default | Purpose |
|----------|---------|---------|
| `HARVESTER_API_KEY` | — | LLM API key (Nous/OpenRouter) |
| `OPENROUTER_API_KEY` | — | Alternative key location |
| `HARVESTER_API_BASE` | `https://api.nousresearch.com/v1` | LLM base URL |
| `HARVESTER_MODEL` | `xiaomi/mimo-v2-pro` | Model for synthesis |
API keys are also read from `~/.config/nous/key`, `~/.hermes/keymaxxing/active/minimax.key`, or `~/.config/openrouter/key` if env vars are unset.
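The fallback behavior could be resolved along these lines. This is a sketch assuming env vars take precedence over key files in the order listed above; the exact precedence lives in the harvester/wiki code:

```python
import os
from pathlib import Path

def resolve_api_key():
    """Return the first available API key: env vars first, then well-known
    key files (paths taken from this document). Precedence order is an
    assumption, not verified against the actual implementation."""
    for env in ("HARVESTER_API_KEY", "OPENROUTER_API_KEY"):
        if os.environ.get(env):
            return os.environ[env]
    for path in ("~/.config/nous/key",
                 "~/.hermes/keymaxxing/active/minimax.key",
                 "~/.config/openrouter/key"):
        p = Path(path).expanduser()
        if p.exists():
            return p.read_text().strip()
    return None
```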
---
## Acceptance Criteria (for #231)
| Criterion | Status | Evidence |
|-----------|--------|----------|
| Concrete wiki path & schema exist | ✓ | `knowledge/` directory, `SCHEMA.md`, `index.json` |
| Ingest updates durable wiki pages | ✓ | `wiki ingest` + `harvester.py` writes markdown to `knowledge/repos/*.md` |
| Queries answer with citations | ✓ | `wiki query` retrieves facts, calls LLM with `[N]` citation format |
| Lint surfaces contradictions/staleness/broken links | ✓ (partial) | Staleness via `freshness.py`; contradiction heuristic; broken links TBD |
| Session crystallization flow | ✓ | `wiki crystal` / `ingest` runs the harvester, which distills sessions into `knowledge/` |
| Documented as distinct from RAG/transcript search | ✓ | This document explicitly distinguishes them |
---
## Implementation Notes
- **Retrieval:** Simple BM25-ish keyword + tag + confidence + recency scoring. No embedding DB needed; the fact store is small (hundreds to thousands of entries). Works locally without vector databases.
- **Synthesis:** Single LLM call with structured prompt. Temperature=0.1 for determinism.
- **Idempotency:** Harvester deduplicates by content hash before writing — repeated ingestion of the same session is safe.
- **Extensibility:** Add new retrieval strategies (embedding similarity) by replacing `retrieve_facts()`.
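The idempotency note above can be sketched as a content-hash dedup step. This is illustrative; the harvester's actual hashing and merge logic may differ:

```python
import hashlib

def content_hash(fact_text):
    """Stable hash of a fact's text, used as a dedup key."""
    return hashlib.sha256(fact_text.strip().lower().encode()).hexdigest()

def merge_facts(existing, incoming):
    """Append only incoming facts whose content hash is unseen.
    Re-ingesting the same session is therefore a no-op (idempotent)."""
    seen = {content_hash(f["fact"]) for f in existing}
    merged = list(existing)
    for fact in incoming:
        h = content_hash(fact["fact"])
        if h not in seen:
            seen.add(h)
            merged.append(fact)
    return merged
```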
---
## Future Work
- [ ] Embedding-based retrieval (cosine similarity over fact embeddings)
- [ ] Broken link detection (scan markdown files in `knowledge/` for dead URLs)
- [ ] Tag drift detection (growth of orphan/unused tags)
- [ ] Quality-gated auto-pruning of low-confidence stale facts
- [ ] Web UI for interactive wiki browsing
- [ ] Knowledge graph linking (via `related` field in index)

288
scripts/genome_diff.py Executable file
View File

@@ -0,0 +1,288 @@
#!/usr/bin/env python3
"""
Codebase Genome Diff — Detect structural changes between two versions.
Compares two git refs (commits, branches, tags) and produces a human-readable
report of structural changes:
• Added/removed/renamed files
• Changed functions/classes (signature modifications)
• New dependencies (imports, requirements, etc.)
Usage:
python3 scripts/genome_diff.py --ref1 <commit1> --ref2 <commit2>
python3 scripts/genome_diff.py --ref1 main --ref2 feature-branch
python3 scripts/genome_diff.py --ref1 v1.0 --ref2 v2.0 --output report.txt
"""
import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
from diff_analyzer import DiffAnalyzer, ChangeCategory
@dataclass
class FunctionChange:
file: str
name: str
kind: str # 'function' or 'class'
change_type: str # 'added' or 'removed' (simplified)
old_line: Optional[int] = None
new_line: Optional[int] = None
@dataclass
class DependencyChange:
file: str
module: str
change_type: str # 'added' or 'removed' or 'modified'
line: int = 0
@dataclass
class GenomeDiffReport:
ref1: str
ref2: str
file_changes: List[Dict[str, Any]] = field(default_factory=list)
function_changes: List[FunctionChange] = field(default_factory=list)
dependency_changes: List[DependencyChange] = field(default_factory=list)
total_files_changed: int = 0
total_functions_changed: int = 0
total_dependencies_changed: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
"ref1": self.ref1,
"ref2": self.ref2,
"summary": {
"files": self.total_files_changed,
"functions": self.total_functions_changed,
"dependencies": self.total_dependencies_changed,
},
"file_changes": self.file_changes,
"function_changes": [fc.__dict__ for fc in self.function_changes],
"dependency_changes": [dc.__dict__ for dc in self.dependency_changes],
}
def human_report(self) -> str:
lines = []
lines.append(f"Codebase Genome Diff: {self.ref1} → {self.ref2}")
lines.append("=" * 60)
lines.append(f" Files changed: {self.total_files_changed}")
lines.append(f" Functions changed: {self.total_functions_changed}")
lines.append(f" Dependencies changed: {self.total_dependencies_changed}")
lines.append("")
for fc in self.file_changes:
kind = []
if fc.get('is_new'):
kind.append("NEW")
if fc.get('is_deleted'):
kind.append("DELETED")
if fc.get('is_renamed'):
kind.append("RENAMED")
if fc.get('is_binary'):
kind.append("BINARY")
kind_str = f" [{', '.join(kind)}]" if kind else ""
lines.append(f" {fc['path']}{kind_str} (+{fc['added_lines']}/-{fc['deleted_lines']})")
lines.append("")
for fc in self.function_changes:
op = {'added': '+', 'removed': '-', 'modified': '~'}.get(fc.change_type, '?')
lines.append(f" [{op}] {fc.file}: {fc.kind} '{fc.name}'")
lines.append("")
for dc in self.dependency_changes:
op = '+' if dc.change_type == 'added' else '-'
lines.append(f" [{op}] {dc.file}: {dc.module}")
lines.append("")
return "\n".join(lines)
def run_git_diff(ref1: str, ref2: str) -> str:
result = subprocess.run(
['git', 'diff', '--unified=0', f'{ref1}...{ref2}'],
capture_output=True, text=True, cwd=SCRIPT_DIR
)
if result.returncode not in (0, 1):
print(f"git diff failed: {result.stderr}", file=sys.stderr)
sys.exit(1)
return result.stdout
def extract_function_changes(diff_text: str) -> List[FunctionChange]:
changes: List[FunctionChange] = []
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
hunk_header_re = re.compile(r'^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@')
current_old_line: Optional[int] = None
current_new_line: Optional[int] = None
for line in diff_text.split('\n'):
hdr = hunk_header_re.match(line)
if hdr:
current_old_line = int(hdr.group(1))
current_new_line = int(hdr.group(3))
continue
m = pattern.match(line)
if m:
op = m.group(1)
kind = m.group(2)
name = m.group(3)
change_type = "added" if op == '+' else "removed"
line_num = current_new_line if change_type == "added" else current_old_line
changes.append(FunctionChange(
file="<unknown>",
name=name,
kind=kind,
change_type=change_type,
new_line=line_num if change_type == "added" else None,
old_line=line_num if change_type == "removed" else None,
))
# Advance line counters heuristically
if op == '-':
if current_old_line is not None:
current_old_line += 1
elif op == '+':
if current_new_line is not None:
current_new_line += 1
elif line.startswith(' '):
if current_old_line is not None:
current_old_line += 1
if current_new_line is not None:
current_new_line += 1
# lines starting with other prefixes (like \\ No newline) ignored
return changes
def extract_dependency_changes(diff_text: str, analyzer: DiffAnalyzer) -> List[DependencyChange]:
changes: List[DependencyChange] = []
import_pattern = re.compile(
r'^([+\-])\s*(?:import\s+([\w\.]+)|from\s+([\w\.]+)\s+import)',
re.MULTILINE
)
file_diffs = analyzer._split_files(diff_text)
for file_diff in file_diffs:
file_match = re.search(r'^diff --git a/.*? b/(.*?)$', file_diff, re.MULTILINE)
if not file_match:
continue
filepath = file_match.group(1)
# Scan each line for import changes
for line in file_diff.split('\n'):
m = import_pattern.match(line)
if m:
change_type = "added" if m.group(1) == '+' else "removed"
module = m.group(2) or m.group(3)
changes.append(DependencyChange(
file=filepath,
module=module,
change_type=change_type,
line=0
))
# Detect if this file is a dependency manifest
req_file_pattern = re.compile(
r'^[\+\-].*?(requirements(.*?)\.txt|pyproject\.toml|setup\.py|Pipfile)'
)
if any(req_file_pattern.match(line) for line in file_diff.split('\n')):
if not any(c.file == filepath and c.module == "<file>" for c in changes):
changes.append(DependencyChange(
file=filepath,
module="<file>",
change_type="modified",
line=0
))
return changes
def correlate_function_changes_with_files(diff_text: str, functions: List[FunctionChange]) -> List[FunctionChange]:
result: List[FunctionChange] = []
# Split diff into per-file sections
file_sections: List[tuple[str, str]] = []
current_file: Optional[str] = None
current_lines: List[str] = []
for line in diff_text.split('\n'):
if line.startswith('diff --git'):
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
m = re.match(r'^diff --git a/.*? b/(.*?)$', line)
current_file = m.group(1) if m else "unknown"
current_lines = [line]
else:
current_lines.append(line)
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
for filepath, section in file_sections:
for m in pattern.finditer(section):
op = m.group(1)
kind = m.group(2)
name = m.group(3)
change_type = "added" if op == '+' else "removed"
result.append(FunctionChange(
file=filepath,
name=name,
kind=kind,
change_type=change_type
))
return result
def main():
parser = argparse.ArgumentParser(description="Codebase Genome Diff — structural changes between versions")
parser.add_argument("--ref1", required=True, help="First git ref (commit, branch, tag)")
parser.add_argument("--ref2", required=True, help="Second git ref")
parser.add_argument("--output", help="Write report to file")
parser.add_argument("--json", action="store_true", help="Output JSON instead of human report")
args = parser.parse_args()
try:
diff_text = run_git_diff(args.ref1, args.ref2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if not diff_text.strip():
print(f"No differences between {args.ref1} and {args.ref2}.")
sys.exit(0)
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff_text)
file_changes = [fc.to_dict() for fc in summary.files]
func_changes = extract_function_changes(diff_text)
func_changes = correlate_function_changes_with_files(diff_text, func_changes)
dep_changes = extract_dependency_changes(diff_text, analyzer)
report = GenomeDiffReport(
ref1=args.ref1,
ref2=args.ref2,
file_changes=file_changes,
function_changes=func_changes,
dependency_changes=dep_changes,
total_files_changed=len(file_changes),
total_functions_changed=len(func_changes),
total_dependencies_changed=len(dep_changes),
)
output = json.dumps(report.to_dict(), indent=2) if args.json else report.human_report()
if args.output:
with open(args.output, 'w') as f:
f.write(output + '\n')
print(f"Report written to {args.output}")
else:
print(output)
if __name__ == '__main__':
main()

View File

@@ -1,351 +0,0 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -1,170 +0,0 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -1,103 +0,0 @@
#!/usr/bin/env python3
"""Smoke tests for scripts/wiki.py — retrieval and lint basics."""
import json
import os
import sys
import tempfile
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
import wiki
def test_retrieve_facts():
"""Test fact retrieval ranking."""
with tempfile.TemporaryDirectory() as tmpdir:
kdir = Path(tmpdir) / "knowledge"
kdir.mkdir()
index = {
"version": 1,
"total_facts": 3,
"facts": [
{
"id": "test:fact:001",
"fact": "Gitea token is stored at ~/.config/gitea/token",
"category": "tool-quirk",
"domain": "global",
"confidence": 0.95,
"tags": ["token", "gitea", "auth"],
"last_confirmed": "2026-04-01"
},
{
"id": "test:fact:002",
"fact": "Use gitea-api-first-burn worker for large repos",
"category": "pattern",
"domain": "timmy-config",
"confidence": 0.9,
"tags": ["gitea", "burn", "api"],
},
{
"id": "test:fact:003",
"fact": "Hermes gateway restarts required after Telegram config changes",
"category": "pitfall",
"domain": "hermes-agent",
"confidence": 0.85,
"tags": ["telegram", "gateway"],
}
]
}
index_path = kdir / "index.json"
with open(index_path, 'w') as f:
json.dump(index, f)
original_index = wiki.INDEX_PATH
wiki.INDEX_PATH = index_path
try:
results = wiki.retrieve_facts("where is gitea token stored?", limit=5)
assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}"
assert results[0]['id'] == 'test:fact:001', f"Expected fact 001 first, got {results[0]['id']}"
print(" [PASS] retrieve_facts ranks correctly")
results2 = wiki.retrieve_facts("gitea burn large repos", limit=5)
assert len(results2) >= 1
assert results2[0]['id'] == 'test:fact:002'
print(" [PASS] tag-based retrieval works")
finally:
wiki.INDEX_PATH = original_index
def test_format_context():
"""Test context formatting for LLM."""
facts = [
{"id": "a:1", "fact": "Test fact A", "category": "fact", "confidence": 0.9},
{"id": "b:2", "fact": "Test fact B", "category": "pitfall", "confidence": 0.8},
]
ctx = wiki.format_facts_as_context(facts)
assert "[1]" in ctx and "a:1" in ctx
assert "Test fact A" in ctx
assert "Test fact B" in ctx
print(" [PASS] format_facts_as_context includes IDs and facts")
def test_detect_contradictions():
"""Test contradiction detection."""
index = {
"facts": [
{"id": "x:1", "fact": "Deploy uses port 22 for SSH", "category": "fact", "domain": "deploy"},
{"id": "x:2", "fact": "Deploy uses SSH on port 22", "category": "fact", "domain": "deploy"},
{"id": "x:3", "fact": "Cron jobs require model field", "category": "pitfall", "domain": "hermes-agent"},
]
}
contradictions = wiki.detect_contradictions(index)
assert len(contradictions) >= 1, "Expected at least one potential contradiction"
found = any('x:1' in c.get('fact_a','') or 'x:1' in c.get('fact_b','') for c in contradictions)
assert found, "Should detect similarity between x:1 and x:2"
print(" [PASS] detect_contradictions flags similar facts")
if __name__ == "__main__":
print("Running wiki module smoke tests...")
test_retrieve_facts()
test_format_context()
test_detect_contradictions()
print("\nAll wiki tests passed.")
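The test above patches `wiki.INDEX_PATH` with a manual try/finally; the same swap-and-restore pattern can be wrapped in a reusable context manager. A minimal sketch (the `patched_attr` helper and `Cfg` class are illustrative, not part of wiki.py):

```python
import contextlib

@contextlib.contextmanager
def patched_attr(obj, name, value):
    """Temporarily replace obj.name, restoring it even if the body raises."""
    original = getattr(obj, name)
    setattr(obj, name, value)
    try:
        yield
    finally:
        setattr(obj, name, original)

class Cfg:
    path = "/real/index.json"

with patched_attr(Cfg, "path", "/tmp/index.json"):
    assert Cfg.path == "/tmp/index.json"
# restored after the with-block, even on exceptions
assert Cfg.path == "/real/index.json"
```

The stdlib's `unittest.mock.patch.object` provides the same behavior and is the usual choice in larger test suites.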

View File

@@ -1,353 +0,0 @@
#!/usr/bin/env python3
"""
LLM Wiki layer — ingest, query, lint, and session crystallization for compounding-intelligence.
This is the sovereign knowledge interface: a compiled, queryable, lintable
knowledge base that survives beyond sessions and cites its sources.
Distinct from:
- RAG: Raw chunk retrieval without synthesis or quality gating
- Transcript search: Keyword match over raw session logs without distillation
The Wiki layer sits on top of the knowledge/ index (facts with provenance).
It provides:
ingest — Harvest knowledge from sessions or raw sources
query — Retrieve + synthesize answers with citations
lint — Detect staleness, contradictions, broken links
crystal — (via harvester) session distillation already integrated
Usage:
python3 scripts/wiki.py ingest --session ~/.hermes/sessions/xxx.jsonl
python3 scripts/wiki.py query "How do I fix cron timeouts?"
python3 scripts/wiki.py lint
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
INDEX_PATH = KNOWLEDGE_DIR / "index.json"
# ---------- Utilities ----------
def load_index() -> dict:
if not INDEX_PATH.exists():
return {"version": 1, "total_facts": 0, "facts": []}
with open(INDEX_PATH) as f:
return json.load(f)
def score_fact_for_query(fact: dict, query_terms: set, query_lower: str) -> float:
"""Simple BM25-like relevance scoring for fact retrieval."""
fact_text = fact.get('fact', '').lower()
fact_tags = [t.lower() for t in fact.get('tags', [])]
# Term frequency in fact text
tf = sum(1 for term in query_terms if term in fact_text)
# Tag boost: exact tag match gives strong signal
tag_boost = sum(3.0 for tag in fact_tags if tag in query_lower)
# Confidence boost
confidence = fact.get('confidence', 0.5)
# Recency boost: newer facts get slight preference
last_confirmed = fact.get('last_confirmed', '')
recency_boost = 0.0
if last_confirmed:
try:
dt = datetime.fromisoformat(last_confirmed.rstrip('Z'))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
days_old = (datetime.now(timezone.utc) - dt).days
recency_boost = max(0, 1.0 - days_old / 365)
except Exception:
pass
score = (tf * 1.0) + (tag_boost * confidence) + (recency_boost * 0.5)
return score
def retrieve_facts(query: str, limit: int = 10) -> List[dict]:
"""Retrieve the most relevant facts for a query from index.json."""
index = load_index()
facts = index.get('facts', [])
query_lower = query.lower()
query_terms = {t for t in re.split(r'\W+', query_lower) if len(t) > 2}
scored = []
for fact in facts:
score = score_fact_for_query(fact, query_terms, query_lower)
if score > 0:
scored.append((score, fact))
scored.sort(key=lambda x: -x[0])
return [f for _, f in scored[:limit]]
def format_facts_as_context(facts: List[dict]) -> str:
"""Format retrieved facts into a context block for LLM synthesis."""
lines = []
for i, fact in enumerate(facts, 1):
fid = fact.get('id', 'unknown')
fact_text = fact.get('fact', '')
confidence = fact.get('confidence', 0.5)
category = fact.get('category', 'fact')
lines.append(f"[{i}] ID:{fid} | {category} (conf={confidence:.2f}): {fact_text}")
return "\n".join(lines)
def find_api_key() -> str:
for p in [
Path.home() / ".config/nous/key",
Path.home() / ".hermes/keymaxxing/active/minimax.key",
Path.home() / ".config/openrouter/key",
]:
if p.exists():
return p.read_text().strip()
return os.environ.get("HARVESTER_API_KEY") or os.environ.get("OPENROUTER_API_KEY") or ""
def call_llm_synthesize(query: str, context: str, api_base: str, api_key: str, model: str) -> str:
"""Call LLM to synthesize answer from retrieved facts."""
import urllib.request
prompt = f"""You are the LLM Wiki answering from the sovereign knowledge base.
Knowledge facts (with citations):
{context}
Question: {query}
Instructions:
- Answer ONLY from the provided facts. Do not use outside knowledge.
- Cite facts using their [N] index number(s) in brackets.
- If the facts don't contain the answer, say "I don't know from the current knowledge base."
- Be concise (2-3 sentences maximum)."""
messages = [
{"role": "system", "content": "You are a precise knowledge assistant."},
{"role": "user", "content": prompt}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.1,
"max_tokens": 512
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
result = json.loads(resp.read().decode('utf-8'))
return result["choices"][0]["message"]["content"].strip()
except Exception as e:
return f"[ERROR: LLM call failed: {e}]"
def detect_contradictions(index: dict) -> List[dict]:
"""Detect potentially contradictory facts in the same domain/category."""
contradictions = []
facts = index.get('facts', [])
from collections import defaultdict
grouped = defaultdict(list)
for f in facts:
key = (f.get('domain', 'global'), f.get('category', 'fact'))
grouped[key].append(f)
for key, group in grouped.items():
if len(group) < 2:
continue
for i in range(len(group)):
for j in range(i+1, len(group)):
f1, f2 = group[i], group[j]
text1 = f1.get('fact', '').lower()
text2 = f2.get('fact', '').lower()
words1 = set(re.findall(r'\w+', text1))
words2 = set(re.findall(r'\w+', text2))
if len(words1 & words2) >= 3:
contradictions.append({
"type": "potential_contradiction",
"domain": key[0],
"category": key[1],
"fact_a": f1.get('id'),
"fact_b": f2.get('id'),
"similarity": len(words1 & words2) / max(len(words1), len(words2))
})
return contradictions
def lint_knowledge() -> dict:
"""Run all lint checks: freshness, duplicates, contradictions."""
results = {"errors": [], "warnings": [], "suggestions": []}
index = load_index()
facts = index.get('facts', [])
# 1. Freshness check via freshness.py
try:
freshness_script = SCRIPT_DIR / "freshness.py"
if freshness_script.exists():
proc = subprocess.run(
[sys.executable, str(freshness_script), "--knowledge-dir", str(KNOWLEDGE_DIR)],
capture_output=True, text=True, timeout=30
)
if proc.returncode != 0:
results["errors"].append(f"freshness.py failed: {proc.stderr[:200]}")
except Exception as e:
results["errors"].append(f"Could not run freshness check: {e}")
# 2. Duplicate fact text
seen = {}
for f in facts:
txt = f.get('fact', '').strip().lower()
if txt in seen:
results["warnings"].append(f"Duplicate fact text: {txt[:80]}... IDs: {seen[txt]}, {f.get('id')}")
else:
seen[txt] = f.get('id')
# 3. Contradictions
contradictions = detect_contradictions(index)
for c in contradictions:
results["warnings"].append(
f"Potential contradiction in {c['domain']}/{c['category']}: "
f"{c['fact_a']} vs {c['fact_b']} (similarity={c['similarity']:.2f})"
)
return results
# ---------- Subcommands ----------
def cmd_query(args):
"""Query the wiki: retrieve + synthesize."""
if not INDEX_PATH.exists():
print("ERROR: knowledge/index.json not found. Run ingest first.", file=sys.stderr)
return 1
query = args.query
top_k = args.top or 10
facts = retrieve_facts(query, limit=top_k)
if not facts:
print("No relevant facts found in knowledge base.")
return 0
print(f"→ Retrieved {len(facts)} facts:")
for i, f in enumerate(facts, 1):
fid = f.get('id', '?')
print(f" [{i}] {fid}: {f.get('fact', '')[:90]}")
if args.dry_run:
print("\n[dry-run] Skipping LLM synthesis.")
return 0
api_key = find_api_key()
if not api_key:
print("ERROR: No API key. Set HARVESTER_API_KEY or OPENROUTER_API_KEY.", file=sys.stderr)
return 1
api_base = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
model = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
context = format_facts_as_context(facts)
answer = call_llm_synthesize(query, context, api_base, api_key, model)
print(f"\n← Answer: {answer}")
return 0
def cmd_ingest(args):
"""Ingest knowledge from a session transcript."""
session = args.session
if not os.path.exists(session):
print(f"ERROR: Session file not found: {session}", file=sys.stderr)
return 1
harvester = SCRIPT_DIR / "harvester.py"
if not harvester.exists():
print("ERROR: harvester.py not found", file=sys.stderr)
return 1
cmd = [sys.executable, str(harvester), "--session", session, "--output", str(KNOWLEDGE_DIR)]
if args.dry_run:
cmd.append("--dry-run")
env = os.environ.copy()
env["PYTHONPATH"] = str(REPO_ROOT)
result = subprocess.run(cmd, env=env)
return result.returncode
def cmd_lint(args):
"""Lint the knowledge base for quality issues."""
results = lint_knowledge()
if results["errors"]:
print("ERRORS:")
for e in results["errors"]:
print(f"  - {e}")
return 1
if results["warnings"]:
print(f"WARNINGS ({len(results['warnings'])}):")
for w in results["warnings"]:
print(f"  - {w}")
else:
print("✓ No lint issues found. Knowledge base is clean.")
return 0
def cmd_crystallize(args):
"""Alias for ingest — session crystallization."""
return cmd_ingest(args)
def main():
parser = argparse.ArgumentParser(
description="LLM Wiki layer — ingest, query, lint, crystallize",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python3 scripts/wiki.py query "How do I fix cron timeouts?"
python3 scripts/wiki.py ingest --session ~/.hermes/sessions/abc.jsonl
python3 scripts/wiki.py lint
python3 scripts/wiki.py crystal --session session.jsonl
"""
)
sub = parser.add_subparsers(dest="command", help="Wiki command")
qp = sub.add_parser("query", help="Ask the wiki a question (RAG + synthesis)")
qp.add_argument("query", help="Natural language question")
qp.add_argument("--top", type=int, default=10, help="Number of facts to retrieve")
qp.add_argument("--dry-run", action="store_true", help="Show retrieval but skip LLM")
qp.set_defaults(func=cmd_query)
ip = sub.add_parser("ingest", help="Ingest a session transcript into knowledge")
ip.add_argument("--session", required=True, help="Path to session JSONL file")
ip.add_argument("--dry-run", action="store_true", help="Preview without writing")
ip.set_defaults(func=cmd_ingest)
lp = sub.add_parser("lint", help="Check knowledge base for issues")
lp.set_defaults(func=cmd_lint)
cp = sub.add_parser("crystal", help="Crystallize a session into durable pages")
cp.add_argument("--session", required=True, help="Path to session JSONL file")
cp.add_argument("--dry-run", action="store_true", help="Preview without writing")
cp.set_defaults(func=cmd_crystallize)
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
return args.func(args)
if __name__ == "__main__":
sys.exit(main())
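For reference, the ranking arithmetic in `score_fact_for_query` (term frequency plus confidence-weighted tag boosts; the recency term is omitted here) can be exercised standalone. A minimal sketch with made-up sample facts:

```python
import re

def score(fact, query):
    """Term-frequency + confidence-weighted tag boost, as in score_fact_for_query."""
    query_lower = query.lower()
    terms = {t for t in re.split(r"\W+", query_lower) if len(t) > 2}
    text = fact["fact"].lower()
    tf = sum(1 for t in terms if t in text)  # query terms appearing in the fact text
    tag_boost = sum(3.0 for tag in fact.get("tags", []) if tag in query_lower)
    return tf * 1.0 + tag_boost * fact.get("confidence", 0.5)

facts = [
    {"fact": "Gitea token lives in ~/.config/gitea/token",
     "tags": ["gitea", "token"], "confidence": 0.95},
    {"fact": "Hermes gateway needs a restart after config changes",
     "tags": ["gateway"], "confidence": 0.85},
]
ranked = sorted(facts, key=lambda f: -score(f, "where is the gitea token?"))
print(ranked[0]["fact"])  # the gitea fact ranks first
```

Exact tag matches dominate the score (3.0 per tag, scaled by confidence), which is why the smoke test expects `test:fact:002` first for a tag-heavy query like "gitea burn large repos".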