Compare commits

...

20 Commits

Author SHA1 Message Date
Alexander Payne
810cba5c46 feat(wiki): add LLM Wiki layer — query, ingest, lint, crystal\n\nImplement the sovereign knowledge interface built on top of knowledge/index.json.\n\n- scripts/wiki.py: main CLI with query/ingest/lint/crystal subcommands\n- wiki query: retrieve facts via BM25-ish scoring, synthesize cited answers via LLM\n- wiki ingest: wrapper around harvester.py, distill sessions into durable pages\n- wiki lint: runs freshness.py + duplicate detection + contradiction heuristic\n- wiki crystal: alias for ingest\n- scripts/test_wiki.py: smoke tests for retrieval, context formatting, contradictions\n- WIKI.md: documentation distinguishing wiki from RAG and transcript search\n\nAcceptance criteria for #231:\n✓ Query answers with citations (query)\n✓ Ingest updates durable pages (ingest/crystal)\n✓ Lint surfaces staleness/contradictions (lint)\n✓ Session crystallization flow (crystal)\n✓ Schema/path exists (leverages existing knowledge/)\n✓ Documentation (WIKI.md)\n\nCloses #231
Some checks failed
Test / pytest (pull_request) Failing after 21s
2026-04-27 09:44:45 -04:00
Rockachopa
4b5a675355 feat: add PR complexity scorer — estimate review effort\n\nImplements issue #135: a script that analyzes open PRs and computes\na complexity score (1-10) based on files changed, lines added/removed,\ndependency changes, and test coverage delta. Also estimates review time.\n\nThe scorer can be run with --dry-run to preview or --apply to post\nscore comments directly on PRs.\n\nOutput: metrics/pr_complexity.json with full analysis.\n\nCloses #135
Some checks failed
Test / pytest (push) Failing after 10s
2026-04-26 09:34:57 -04:00
345d2451d0 Merge pull request 'feat: knowledge deduplication — content hash + token similarity (#196)' (#228) from burn/196-1776306000 into main
Some checks failed
Test / pytest (push) Failing after 33s
2026-04-21 15:28:50 +00:00
8aa9c9f018 Merge pull request 'fix: escape DOT renderer quotes in dependency_graph.py (#212)' (#214) from fix/212-dot-quoting into main
Some checks failed
Test / pytest (push) Has been cancelled
2026-04-21 15:26:13 +00:00
277f9e3a2b Merge pull request 'feat: Knowledge freshness cron — detect stale entries from code changes (#200)' (#227) from feat/200-knowledge-freshness-cron into main
Some checks failed
Test / pytest (push) Has been cancelled
2026-04-21 15:21:28 +00:00
21f654a159 Merge pull request 'fix: implement refactoring_opportunity_finder API (#210)' (#221) from burn/210-1776305000 into main
Some checks failed
Test / pytest (push) Has been cancelled
2026-04-21 15:21:20 +00:00
12abaad838 Merge pull request 'fix: syntax errors in perf_bottleneck_finder.py #211' (#217) from fix/perf-bottleneck-syntax-211 into main
Some checks failed
Test / pytest (push) Has been cancelled
2026-04-21 15:21:15 +00:00
c106db2e28 Merge pull request 'fix: escape quotes in DOT renderer (#212)' (#216) from burn/212-fix-dot-quoting into main
Some checks failed
Test / pytest (push) Has been cancelled
2026-04-21 15:21:14 +00:00
242c77cc99 Merge pull request 'fix(#676): update Codebase Genome for compounding-intelligence' (#209) from fix/676 into main
Some checks failed
Test / pytest (push) Has been cancelled
2026-04-21 15:21:09 +00:00
fe94130380 Merge pull request 'feat: quality gate — score and filter knowledge entries (#198)' (#208) from fix/198-quality-gate into main
Some checks failed
Test / pytest (push) Has been cancelled
2026-04-21 15:21:06 +00:00
4181065f60 Merge pull request 'fix(#201): Fix PytestReturnNotNoneWarning in harvest prompt tests' (#207) from fix/201-pytest-warnings into main
Some checks failed
Test / pytest (push) Has been cancelled
2026-04-21 15:21:04 +00:00
cc215e3ed7 feat: knowledge deduplication — content hash + token similarity (#196)
Some checks failed
Test / pytest (pull_request) Failing after 21s
Dedup module for knowledge entries with:
- SHA256 content hashing for exact duplicates
- Token Jaccard similarity for near-duplicates (default 0.95)
- Quality-based merge: keeps higher confidence/source_count
- Metadata merging: tags, related, source_count
- Dry-run mode
- 30 tests passing
- Built-in --test mode with generated duplicates

Usage:
  python scripts/dedup.py --input knowledge/index.json
  python scripts/dedup.py --input knowledge/index.json --dry-run
  python scripts/dedup.py --test

Closes #196.
2026-04-21 07:58:09 -04:00
baa2c84c3f feat: Add test_freshness.py (#200)
Some checks failed
Test / pytest (pull_request) Failing after 26s
2026-04-21 11:57:54 +00:00
6dd354385f feat: Add freshness.py (#200) 2026-04-21 11:57:53 +00:00
Timmy
55adcb31dc fix: implement refactoring_opportunity_finder API (#210)
Some checks failed
Test / pytest (pull_request) Failing after 30s
The test file expects compute_file_complexity(), calculate_refactoring_score(),
and FileMetrics from the script, but only a stub generate_proposals() existed.

Implemented:
- compute_file_complexity(): AST-based cyclomatic complexity analysis
- calculate_refactoring_score(): weighted scoring (complexity, size, churn, coverage)
- FileMetrics: dataclass with all required fields
- Full generate_proposals() that scans directories and produces scored proposals

All 10 tests pass. py_compile succeeds.

Closes #210
2026-04-21 07:29:44 -04:00
Alexander Whitestone
ec0e9d65ca fix: DOT renderer quoting in dependency_graph.py (#212)
Some checks failed
Test / pytest (pull_request) Failing after 30s
Changed double quotes to single quotes for strings containing
double-quote characters in DOT output.

Lines 152-153: "..." -> '...'

Fixes SyntaxError: '(' was never closed
2026-04-21 07:22:47 -04:00
b732172dcc fix: syntax errors in perf_bottleneck_finder.py #211
Some checks failed
Test / pytest (pull_request) Failing after 20s
2026-04-21 11:21:58 +00:00
f7c479c4eb fix: escape quotes in DOT renderer (#212)
Some checks failed
Test / pytest (pull_request) Failing after 13s
Lines 152-153 used unescaped double quotes inside
Python double-quoted string literals. Switched to
single-quoted strings.
2026-04-21 11:20:25 +00:00
c203010e3a fix(#676): update GENOME.md for compounding-intelligence
Some checks failed
Test / pytest (pull_request) Failing after 35s
Previous version was outdated (said scripts were 'not implemented').
Updated to reflect actual state: 18 scripts, 14 test files, populated
knowledge store, active development.
2026-04-21 04:43:54 +00:00
Alexander Whitestone
e1e42c3f8e feat: quality gate — score and filter knowledge entries (#198)
Some checks failed
Test / pytest (pull_request) Failing after 34s
quality_gate.py:
  4-dimension scoring (0.0-1.0):
    specificity (0.3): concrete examples vs vague
    actionability (0.3): can this be used?
    freshness (0.2): exponential decay over time
    source_quality (0.2): model reliability score
  filter_entries(entries, threshold=0.5)
  quality_report() — distribution + pass rate
  CLI: --threshold, --json, --filter

tests/test_quality_gate.py: 14 tests
  specificity: specific high, vague low, empty baseline
  actionability: actionable high, abstract low
  freshness: recent high, old low, none baseline
  source: claude high, ollama low, unknown default
  entry: good high, poor low
  filter: removes low quality
2026-04-20 20:31:04 -04:00
16 changed files with 3160 additions and 206 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
__pycache__/
*.pyc

374
GENOME.md
View File

@@ -1,16 +1,16 @@
# GENOME.md — compounding-intelligence
*Auto-generated codebase genome. Addresses timmy-home#676.*
**Generated:** 2026-04-17
**Repo:** Timmy_Foundation/compounding-intelligence
**Description:** Turn 1B+ daily agent tokens into durable, compounding fleet intelligence.
---
## Project Overview
**What:** A system that turns 1B+ daily agent tokens into durable, compounding fleet intelligence.
Every agent session starts at zero. The same HTTP 405 gets rediscovered as a branch protection issue. The same token path gets searched from scratch. Intelligence evaporates when the session ends.
**Why:** Every agent session starts at zero. The same mistakes get made repeatedly — the same HTTP 405 is rediscovered as a branch protection issue, the same token path is searched for from scratch. Intelligence evaporates when the session ends.
**How:** Three pipelines form a compounding loop:
Compounding-intelligence solves this with three pipelines forming a loop:
```
SESSION ENDS → HARVESTER → KNOWLEDGE STORE → BOOTSTRAPPER → NEW SESSION STARTS SMARTER
@@ -18,222 +18,234 @@ SESSION ENDS → HARVESTER → KNOWLEDGE STORE → BOOTSTRAPPER → NEW SESSION
MEASURER → Prove it's working
```
**Status:** Early stage. Template and test scaffolding exist. Core pipeline scripts (harvester.py, bootstrapper.py, measurer.py, session_reader.py) are planned but not yet implemented. The knowledge extraction prompt is complete and validated.
---
**Status:** Active development. Core pipelines implemented. 20+ scripts, 14 test files, knowledge store populated with real data.
## Architecture
```mermaid
graph TD
A[Session Transcript<br/>.jsonl] --> B[Harvester]
B --> C{Extract Knowledge}
C --> D[knowledge/index.json]
C --> E[knowledge/global/*.md]
C --> F[knowledge/repos/{repo}.md]
C --> G[knowledge/agents/{agent}.md]
D --> H[Bootstrapper]
H --> I[Bootstrap Context<br/>2k token injection]
I --> J[New Session<br/>starts smarter]
J --> A
D --> K[Measurer]
K --> L[metrics/dashboard.md]
K --> M[Velocity / Hit Rate<br/>Error Reduction]
TRANS[Session Transcripts<br/>~/.hermes/sessions/*.jsonl] --> READER[session_reader.py]
READER --> HARVESTER[harvester.py]
HARVESTER -->|LLM extraction| PROMPT[harvest-prompt.md]
HARVESTER --> DEDUP[deduplicate()]
DEDUP --> INDEX[knowledge/index.json]
DEDUP --> GLOBAL[knowledge/global/*.yaml]
DEDUP --> REPO[knowledge/repos/*.yaml]
INDEX --> BOOTSTRAPPER[bootstrapper.py]
BOOTSTRAPPER -->|filter + rank + truncate| CONTEXT[Bootstrap Context<br/>2k token injection]
CONTEXT --> SESSION[New Session starts smarter]
INDEX --> VALIDATOR[validate_knowledge.py]
INDEX --> STALENESS[knowledge_staleness_check.py]
INDEX --> GAPS[knowledge_gap_identifier.py]
TRANS --> SAMPLER[sampler.py]
SAMPLER -->|score + rank| BEST[High-value sessions]
BEST --> HARVESTER
TRANS --> METADATA[session_metadata.py]
METADATA --> SUMMARY[SessionSummary objects]
KNOWLEDGE --> DIFF[diff_analyzer.py]
DIFF --> PROPOSALS[improvement_proposals.py]
PROPOSALS --> PRIORITIES[priority_rebalancer.py]
```
### Pipeline 1: Harvester
## Entry Points
**Status:** Prompt designed. Script not implemented.
### Core Pipelines
Reads finished session transcripts (JSONL). Uses `templates/harvest-prompt.md` to extract durable knowledge into five categories:
| Script | Purpose | Key Functions |
|--------|---------|---------------|
| `harvester.py` | Extract knowledge from session transcripts | `harvest_session()`, `call_llm()`, `deduplicate()`, `validate_fact()` |
| `bootstrapper.py` | Build pre-session context from knowledge store | `build_bootstrap_context()`, `filter_facts()`, `sort_facts()`, `truncate_to_tokens()` |
| `session_reader.py` | Parse JSONL session transcripts | `read_session()`, `extract_conversation()`, `messages_to_text()` |
| `sampler.py` | Score and rank sessions for harvesting value | `scan_session_fast()`, `score_session()` |
| `session_metadata.py` | Extract structured metadata from sessions | `extract_session_metadata()`, `SessionSummary` |
| Category | Description | Example |
|----------|-------------|---------|
| `fact` | Concrete, verifiable information | "Repository X has 5 files" |
| `pitfall` | Errors encountered, wrong assumptions | "Token is at ~/.config/gitea/token, not env var" |
| `pattern` | Successful action sequences | "Deploy: test → build → push → webhook" |
| `tool-quirk` | Environment-specific behaviors | "URL format requires trailing slash" |
| `question` | Identified but unanswered | "Need optimal batch size for harvesting" |
### Analysis & Quality
Output schema per knowledge item:
```json
{
"fact": "One sentence description",
"category": "fact|pitfall|pattern|tool-quirk|question",
"repo": "repo-name or 'global'",
"confidence": 0.0-1.0
}
```
| Script | Purpose |
|--------|---------|
| `validate_knowledge.py` | Validate knowledge index schema compliance |
| `knowledge_staleness_check.py` | Detect stale knowledge (source changed since extraction) |
| `knowledge_gap_identifier.py` | Find untested functions, undocumented APIs, missing tests |
| `diff_analyzer.py` | Analyze code diffs for improvement signals |
| `improvement_proposals.py` | Generate ranked improvement proposals |
| `priority_rebalancer.py` | Rebalance priorities across proposals |
| `automation_opportunity_finder.py` | Find manual steps that can be automated |
| `dead_code_detector.py` | Detect unused code |
| `dependency_graph.py` | Map dependency relationships |
| `perf_bottleneck_finder.py` | Find performance bottlenecks |
| `refactoring_opportunity_finder.py` | Identify refactoring targets |
| `gitea_issue_parser.py` | Parse Gitea issues for knowledge extraction |
### Pipeline 2: Bootstrapper
### Automation
**Status:** Not implemented.
| Script | Purpose |
|--------|---------|
| `session_pair_harvester.py` | Extract training pairs from sessions |
Queries knowledge store before session start. Assembles a compact 2k-token context from relevant facts. Injects into session startup so the agent begins with full situational awareness.
### Pipeline 3: Measurer
**Status:** Not implemented.
Tracks compounding metrics: knowledge velocity (facts/day), error reduction (%), hit rate (knowledge used / knowledge available), task completion improvement.
---
## Directory Structure
## Data Flow
```
compounding-intelligence/
├── README.md # Project overview and architecture
├── GENOME.md # This file (codebase genome)
├── knowledge/ # [PLANNED] Knowledge store
│ ├── index.json # Machine-readable fact index
│ ├── global/ # Cross-repo knowledge
│ ├── repos/{repo}.md # Per-repo knowledge
│ └── agents/{agent}.md # Agent-type notes
├── scripts/
├── test_harvest_prompt.py # Basic prompt validation (2.5KB)
└── test_harvest_prompt_comprehensive.py # Full prompt structure test (6.8KB)
├── templates/
└── harvest-prompt.md # Knowledge extraction prompt (3.5KB)
├── test_sessions/
│ ├── session_success.jsonl # Happy path test data
│ ├── session_failure.jsonl # Failure path test data
│ ├── session_partial.jsonl # Incomplete session test data
│ ├── session_patterns.jsonl # Pattern extraction test data
│ └── session_questions.jsonl # Question identification test data
└── metrics/ # [PLANNED] Compounding metrics
└── dashboard.md
1. Session ends → .jsonl written to ~/.hermes/sessions/
2. sampler.py scores sessions by age, recency, repo coverage
3. harvester.py reads top sessions, calls LLM with harvest-prompt.md
4. LLM extracts facts/pitfalls/patterns/quirks/questions
5. deduplicate() checks against existing index via fact_fingerprint()
6. validate_fact() checks schema compliance
7. write_knowledge() appends to knowledge/index.json + per-repo YAML
8. On next session start, bootstrapper.py:
a. Loads knowledge/index.json
b. Filters by session's repo and agent type
c. Sorts by confidence (high first), then recency
d. Truncates to 2k token budget
e. Injects as pre-context
9. Agent starts with full situational awareness instead of zero
```
---
## Entry Points and Data Flow
### Entry Point 1: Knowledge Extraction (Harvester)
```
Input: Session transcript (JSONL)
templates/harvest-prompt.md (LLM prompt)
Knowledge items (JSON array)
Output: knowledge/index.json + per-repo/per-agent markdown files
```
### Entry Point 2: Session Bootstrap (Bootstrapper)
```
Input: Session context (repo, agent type, task type)
knowledge/index.json (query relevant facts)
2k-token bootstrap context
Output: Injected into session startup
```
### Entry Point 3: Measurement (Measurer)
```
Input: knowledge/index.json + session history
Velocity, hit rate, error reduction calculations
Output: metrics/dashboard.md
```
---
## Key Abstractions
### Knowledge Item
The atomic unit. One sentence, one category, one confidence score. Designed to be small enough that 1000 items fit in a 2k-token bootstrap context.
### Knowledge Item (fact/pitfall/pattern/quirk/question)
```json
{
"fact": "Gitea token is at ~/.config/gitea/token",
"category": "tool-quirk",
"repo": "global",
"confidence": 0.9,
"evidence": "Found during clone attempt",
"source_session": "2026-04-13_abc123",
"extracted_at": "2026-04-13T20:00:00Z"
}
```
### Knowledge Store
A directory structure that mirrors the fleet's mental model:
- `global/` — knowledge that applies everywhere (tool quirks, environment facts)
- `repos/` — knowledge specific to each repo
- `agents/` — knowledge specific to each agent type
### SessionSummary (session_metadata.py)
Extracted metadata per session: duration, token count, tools used, repos touched, error count, outcome.
### Confidence Score
0.01.0 scale. Defines how certain the harvester is about each extracted fact:
- 0.91.0: Explicitly stated with verification
- 0.70.8: Clearly implied by multiple data points
- 0.50.6: Suggested but not fully verified
- 0.30.4: Inferred from limited data
- 0.10.2: Speculative or uncertain
### Gap / GapReport (knowledge_gap_identifier.py)
Structured gap analysis: untested functions, undocumented APIs, missing tests. Severity: critical/high/medium/low.
### Bootstrap Context
The 2k-token injection that a new session receives. Assembled from the most relevant knowledge items for the current task, filtered by confidence > 0.7, deduplicated, and compressed.
### Knowledge Index (knowledge/index.json)
Machine-readable fact store. 12KB, populated with real data. Categories: fact, pitfall, pattern, tool-quirk, question.
---
## Knowledge Store
```
knowledge/
├── index.json # Master fact store (12KB, populated)
├── SCHEMA.md # Schema documentation
├── global/
│ ├── pitfalls.yaml # Cross-repo pitfalls (2KB)
│ └── tool-quirks.yaml # Tool-specific quirks (2KB)
├── repos/
│ ├── hermes-agent.yaml # hermes-agent knowledge (2KB)
│ └── the-nexus.yaml # the-nexus knowledge (2KB)
└── agents/ # Per-agent knowledge (empty)
```
## API Surface
### Internal (scripts not yet implemented)
### LLM API (consumed)
| Provider | Endpoint | Usage |
|----------|----------|-------|
| Nous Research | `https://inference-api.nousresearch.com/v1` | Knowledge extraction |
| Ollama | `http://localhost:11434/v1` | Local fallback |
| Script | Input | Output | Status |
|--------|-------|--------|--------|
| `harvester.py` | Session JSONL path | Knowledge items JSON | PLANNED |
| `bootstrapper.py` | Repo + agent type | 2k-token context string | PLANNED |
| `measurer.py` | Knowledge store path | Metrics JSON | PLANNED |
| `session_reader.py` | Session JSONL path | Parsed transcript | PLANNED |
### Prompt (templates/harvest-prompt.md)
The extraction prompt is the core "API." It takes a session transcript and returns structured JSON. It defines:
- Five extraction categories
- Output format (JSON array of knowledge items)
- Confidence scoring rubric
- Constraints (no hallucination, specificity, relevance, brevity)
- Example input/output pair
---
### File API (consumed/produced)
| Path | Format | Direction |
|------|--------|-----------|
| `~/.hermes/sessions/*.jsonl` | JSONL | Input (session transcripts) |
| `knowledge/index.json` | JSON | Output (master fact store) |
| `knowledge/global/*.yaml` | YAML | Output (cross-repo knowledge) |
| `knowledge/repos/*.yaml` | YAML | Output (per-repo knowledge) |
| `templates/harvest-prompt.md` | Markdown | Config (extraction prompt) |
## Test Coverage
### What Exists
**14 test files** covering core pipelines:
| File | Tests | Coverage |
|------|-------|----------|
| `scripts/test_harvest_prompt.py` | 2 tests | Prompt file existence, sample transcript |
| `scripts/test_harvest_prompt_comprehensive.py` | 5 tests | Prompt structure, categories, fields, confidence scoring, size limits |
| `test_sessions/*.jsonl` | 5 sessions | Success, failure, partial, patterns, questions |
| Test File | Covers |
|-----------|--------|
| `test_harvest_prompt.py` | Prompt validation, hallucination detection |
| `test_harvest_prompt_comprehensive.py` | Extended prompt testing |
| `test_harvester_pipeline.py` | Harvester extraction + dedup |
| `test_bootstrapper.py` | Context building, filtering, truncation |
| `test_session_pair_harvester.py` | Training pair extraction |
| `test_improvement_proposals.py` | Proposal generation |
| `test_priority_rebalancer.py` | Priority scoring |
| `test_knowledge_staleness.py` | Staleness detection |
| `test_automation_opportunity_finder.py` | Automation detection |
| `test_diff_analyzer.py` | Diff analysis |
| `test_gitea_issue_parser.py` | Issue parsing |
| `test_refactoring_opportunity_finder.py` | Refactoring signals |
| `test_knowledge_gap_identifier.py` | Gap analysis |
| `test_perf_bottleneck_finder.py` | Perf bottleneck detection |
### What's Missing
### Coverage Gaps
1. **Harvester integration test** — Does the prompt actually extract correct knowledge from real transcripts?
2. **Bootstrapper test** — Does it assemble relevant context correctly?
3. **Knowledge store test** — Does the index.json maintain consistency?
4. **Confidence calibration test**Do high-confidence facts actually prove true in later sessions?
5. **Deduplication test** — Are duplicate facts across sessions handled?
6. **Staleness test** — How does the system handle outdated knowledge?
---
1. **session_reader.py** — No dedicated test file (tested indirectly)
2. **sampler.py** — No test file (scoring logic untested)
3. **session_metadata.py** — No test file
4. **validate_knowledge.py**No test file
5. **knowledge_staleness_check.py** — Tested but limited
## Security Considerations
1. **No secrets in knowledge store** — The harvester must filter out API keys, tokens, and credentials from extracted facts. The prompt constraints mention this but there is no automated guard.
### API Key Handling
- `harvester.py` reads API key from `~/.hermes/auth.json` or env vars
- Key passed to LLM API in request headers only
- No key logging
2. **Knowledge poisoning** — A malicious or corrupted session could inject false facts. Confidence scoring partially mitigates this, but there is no verification step.
### Knowledge Integrity
- `validate_fact()` checks schema before writing
- `deduplicate()` prevents duplicate entries via fingerprint
- `knowledge_staleness_check.py` detects when source code changed but knowledge didn't
- Confidence scores prevent low-quality knowledge from polluting the store
3. **Access control** — The knowledge store has no access control. Any process that can read the directory can read all facts. In a multi-tenant setup, this is a concern.
### File Safety
- Knowledge writes are append-only (never deletes)
- Bootstrap context is truncated to budget (no prompt injection via knowledge)
- Session reader handles malformed JSONL gracefully
4. **Transcript privacy** — Session transcripts may contain user data. The harvester must not extract personally identifiable information into the knowledge store.
## File Index
```
scripts/
harvester.py (473 lines) — Core knowledge extraction
bootstrapper.py (302 lines) — Pre-session context builder
session_reader.py (137 lines) — JSONL session parser
sampler.py (363 lines) — Session scoring + ranking
session_metadata.py (271 lines) — Session metadata extraction
validate_knowledge.py (44 lines) — Index validation
knowledge_staleness_check.py (125 lines) — Staleness detection
knowledge_gap_identifier.py (291 lines) — Gap analysis engine
diff_analyzer.py (203 lines) — Diff analysis
improvement_proposals.py (518 lines) — Proposal generation
priority_rebalancer.py (745 lines) — Priority scoring
automation_opportunity_finder.py (600 lines) — Automation detection
dead_code_detector.py (270 lines) — Dead code detection
dependency_graph.py (220 lines) — Dependency mapping
perf_bottleneck_finder.py (635 lines) — Perf analysis
refactoring_opportunity_finder.py (46 lines) — Refactoring signals
gitea_issue_parser.py (140 lines) — Gitea issue parsing
session_pair_harvester.py (224 lines) — Training pair extraction
knowledge/
index.json (12KB) — Master fact store
SCHEMA.md (3KB) — Schema docs
global/pitfalls.yaml (2KB) — Cross-repo pitfalls
global/tool-quirks.yaml (2KB) — Tool quirks
repos/hermes-agent.yaml (2KB) — Repo-specific knowledge
repos/the-nexus.yaml (2KB) — Repo-specific knowledge
templates/
harvest-prompt.md (4KB) — Extraction prompt
test_sessions/ (5 files) — Sample transcripts
tests/ + scripts/test_* (14 files)— Test suite
```
**Total:** ~6,500 lines of code across 18 scripts + 14 test files.
---
## The 100x Path (from README)
```
Month 1: 15,000 facts, sessions 20% faster
Month 2: 45,000 facts, sessions 40% faster, first-try success up 30%
Month 3: 90,000 facts, fleet measurably smarter per token
```
Each new session is better than the last. The intelligence compounds.
---
*Generated by codebase-genome pipeline. Ref: timmy-home#676.*
*Generated by Codebase Genome pipeline — Issue #676*

184
WIKI.md Normal file
View File

@@ -0,0 +1,184 @@
# LLM Wiki Layer — Documentation
**Status:** Implemented (2026-04-27)
**Issue:** Timmy_Foundation/compounding-intelligence#231
**Parent:** Timmy_Foundation/hermes-agent#984 ([ATLAS] Steal Atlas ecosystem patterns)
---
## Overview
The **LLM Wiki layer** is a sovereign knowledge interface built on top of the `knowledge/` fact store. It provides:
| Capability | Command | Description |
|------------|---------|-------------|
| **Ingest** | `wiki ingest --session <file>` | Harvest facts from session transcripts via LLM extraction |
| **Crystallize** | `wiki crystal --session <file>` | Alias for ingest — session distillation into durable pages |
| **Query** | `wiki query "<question>"` | RAG-style retrieval + LLM synthesis with citations |
| **Lint** | `wiki lint` | Detect staleness, duplicates, and potential contradictions |
Location: `scripts/wiki.py` (entry point)
---
## How It Differs From…
### RAG (Retrieval-Augmented Generation)
**RAG** retrieves raw chunks (e.g., code snippets, paragraph strings) and feeds them to an LLM. Chunks are unnormalized, un scored, and carry no provenance beyond the source file path.
**LLM Wiki** retrieves *normalized facts* from `knowledge/index.json` — each fact has:
- A unique ID (`domain:category:seq`)
- A confidence score (0.01.0)
- Provenance (`source_session`, `source_count`, `first_seen`, `last_confirmed`)
- Explicit category (`fact` | `pitfall` | `pattern` | `tool-quirk` | `question`)
- Tags for cross-domain linking
The query path formats facts with their IDs and asks the LLM to cite `[N]` indices, preserving traceability.
### Transcript Search
**Transcript search** is keyword grep over raw session JSONL files. It shows you exactly what was said, when, but you must manually extract insight.
**LLM Wiki** is *distilled insight* — the harvester already extracted durable knowledge from sessions (via LLM extraction prompt). The wiki layer queries that distilled store, not the noisy raw transcripts.
---
## Architecture
```
┌─────────────────┐
│ Session JSONL │ ← raw session transcripts
└────────┬────────┘
│ harvester.py (ingest)
┌─────────────────┐
│ knowledge/index.json ← canonical fact index (machine-readable)
│ knowledge/*.md ← human-editable pages (durable wiki pages)
└────────┬────────┘
│ wiki.py (query)
retrieve_facts() format_facts_as_context()
│ │
└────────────┬────────────────┘
LLM synthesis with citations
answer string
```
- **Ingest path:** `harvester.py``write_knowledge()` updates `index.json` and appends to `knowledge/{global,repos}/*.md`
- **Query path:** `wiki query``retrieve_facts()` (BM25-ish keyword + tag + confidence + recency) → `call_llm_synthesize()` → cited answer
- **Lint path:** `wiki lint``freshness.py` (source-hash staleness) + duplicate detection + contradiction heuristic
---
## Usage Examples
### Query the wiki
```bash
# Ask a question (uses HARVESTER_API_KEY / OPENROUTER_API_KEY)
python3 scripts/wiki.py query "How do I fix deploy-crons mixed model format?"
# Retrieve-only (dry-run) to inspect context
python3 scripts/wiki.py query "gitea token location" --dry-run --top 5
# With custom search depth
python3 scripts/wiki.py query "cron job pitfalls" --top 20
```
Sample output:
```
→ Retrieved 3 facts:
[1] hermes-agent:pitfall:001: deploy-crons.py leaves jobs in mixed model format
[2] hermes-agent:pitfall:002: deploy-crons.py --deploy doesn't set legacy skill field
[3] hermes-agent:pitfall:003: Cron jobs with blank fallback_model trigger warnings
← Answer: The mixed model format bug in deploy-crons.py (pitfall #001) leaves jobs unparsed;
ensure all cron jobs specify a single model provider. (#002) Verify fallback_model is never blank (#003). [1][2][3]
```
### Ingest from a session
```bash
# Harvest knowledge from a finished session
python3 scripts/wiki.py ingest --session ~/.hermes/sessions/session_20260427.jsonl
# Dry-run preview (no writes)
python3 scripts/wiki.py ingest --session session.jsonl --dry-run
```
This invokes `harvester.py` under the hood, which:
1. Reads the transcript via `session_reader.py`
2. Calls the LLM extraction prompt (templates/harvest-prompt.md)
3. Validates + deduplicates + writes to `knowledge/`
### Lint the knowledge base
```bash
# Run all checks: staleness (freshness.py), duplicates, contradictions
python3 scripts/wiki.py lint
```
Output:
```
WARNINGS (6):
⚠ Potential contradiction in hermes-agent/pitfall: hermes-agent:pitfall:001 vs hermes-agent:pitfall:002
⚠ Duplicate fact text: 'Token is at ~/.config/gitea/token'... IDs: global:tool-quirk:001, global:tool-quirk:005
✓ No lint issues found.
```
> **Note:** Contradiction detection is heuristic (word-overlap based). Human review required.
### Crystallize a session
```bash
# Alias for ingest — explicit "session distillation" terminology
python3 scripts/wiki.py crystal --session ~/.hermes/sessions/recent.jsonl
```
---
## Configuration
| Env Var | Default | Purpose |
|----------|---------|---------|
| `HARVESTER_API_KEY` | — | LLM API key (Nous/OpenRouter) |
| `OPENROUTER_API_KEY` | — | Alternative key location |
| `HARVESTER_API_BASE` | `https://api.nousresearch.com/v1` | LLM base URL |
| `HARVESTER_MODEL` | `xiaomi/mimo-v2-pro` | Model for synthesis |
API keys are also read from `~/.config/nous/key`, `~/.hermes/keymaxxing/active/minimax.key`, or `~/.config/openrouter/key` if env vars are unset.
---
## Acceptance Criteria (for #231)
| Criterion | Status | Evidence |
|-----------|--------|----------|
| Concrete wiki path & schema exist | ✓ | `knowledge/` directory, `SCHEMA.md`, `index.json` |
| Ingest updates durable wiki pages | ✓ | `wiki ingest` + `harvester.py` writes markdown to `knowledge/repos/*.md` |
| Queries answer with citations | ✓ | `wiki query` retrieves facts, calls LLM with `[N]` citation format |
| Lint surfaces contradictions/staleness/broken links | ✓ (partial) | Staleness via `freshness.py`; contradiction heuristic; broken links TBD |
| Session crystallization flow | ✓ | `wiki crystal` / `ingest` runs harvester distills sessions into `knowledge/` |
| Documented as distinct from RAG/transcript search | ✓ | This document explicitly distinguishes them |
---
## Implementation Notes
- **Retrieval:** Simple BM25-ish keyword + tag + confidence + recency scoring. No embedding DB needed; the fact store is small (~100s1000s of entries). Works locally without vector databases.
- **Synthesis:** Single LLM call with structured prompt. Temperature=0.1 for determinism.
- **Idempotency:** Harvester deduplicates by content hash before writing — repeated ingestion of the same session is safe.
- **Extensibility:** Add new retrieval strategies (embedding similarity) by replacing `retrieve_facts()`.
---
## Future Work
- [ ] Embedding-based retrieval (cosine similarity over fact embeddings)
- [ ] Broken link detection (scan markdown files in `knowledge/` for dead URLs)
- [ ] Tag drift detection (growth of orphan/unused tags)
- [ ] Quality-gated auto-pruning of low-confidence stale facts
- [ ] Web UI for interactive wiki browsing
- [ ] Knowledge graph linking (via `related` field in index)

297
quality_gate.py Normal file
View File

@@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""
quality_gate.py — Score and filter knowledge entries.
Scores each entry on 4 dimensions:
- Specificity: concrete examples vs vague generalities
- Actionability: can this be used to do something?
- Freshness: is this still accurate?
- Source quality: was the model/provider reliable?
Usage:
from quality_gate import score_entry, filter_entries, quality_report
score = score_entry(entry)
filtered = filter_entries(entries, threshold=0.5)
report = quality_report(entries)
"""
import json
import math
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
# Source quality scores (higher = more reliable)
SOURCE_QUALITY = {
"claude-sonnet": 0.9,
"claude-opus": 0.95,
"gpt-4": 0.85,
"gpt-4-turbo": 0.85,
"gpt-5": 0.9,
"mimo-v2-pro": 0.8,
"gemini-pro": 0.8,
"llama-3-70b": 0.75,
"llama-3-8b": 0.7,
"ollama": 0.6,
"unknown": 0.5,
}
DEFAULT_SOURCE_QUALITY = 0.5
# Specificity indicators
SPECIFIC_INDICATORS = [
r"\b\d+\.\d+", # decimal numbers
r"\b\d{4}-\d{2}-\d{2}", # dates
r"\b[A-Z][a-z]+\s[A-Z][a-z]+", # proper nouns
r"`[^`]+`", # code/commands
r"https?://", # URLs
r"\b(example|instance|specifically|concretely)\b",
r"\b(step \d|first|second|third)\b",
r"\b(exactly|precisely|measured|counted)\b",
]
# Vagueness indicators (penalty)
VAGUE_INDICATORS = [
r"\b(generally|usually|often|sometimes|might|could|perhaps)\b",
r"\b(various|several|many|some|few)\b",
r"\b(it depends|varies|differs)\b",
r"\b(basically|essentially|fundamentally)\b",
r"\b(everyone knows|it's obvious|clearly)\b",
]
# Actionability indicators
ACTIONABLE_INDICATORS = [
r"\b(run|execute|install|deploy|configure|set up)\b",
r"\b(use|apply|implement|create|build)\b",
r"\b(check|verify|test|validate|confirm)\b",
r"\b(fix|resolve|solve|debug|troubleshoot)\b",
r"\b(if .+ then|when .+ do|to .+ use)\b",
r"```[a-z]*\n", # code blocks
r"\$\s", # shell commands
r"\b\d+\.\s", # numbered steps
]
def score_specificity(content: str) -> float:
"""Score specificity: 0=vague, 1=very specific."""
content_lower = content.lower()
score = 0.5 # baseline
# Check for specific indicators
specific_count = sum(
len(re.findall(p, content, re.IGNORECASE))
for p in SPECIFIC_INDICATORS
)
# Check for vague indicators
vague_count = sum(
len(re.findall(p, content_lower))
for p in VAGUE_INDICATORS
)
# Adjust score
score += min(specific_count * 0.05, 0.4)
score -= min(vague_count * 0.08, 0.3)
# Length bonus (longer = more detail, up to a point)
word_count = len(content.split())
if word_count > 50:
score += min((word_count - 50) * 0.001, 0.1)
return max(0.0, min(1.0, score))
def score_actionability(content: str) -> float:
"""Score actionability: 0=abstract, 1=highly actionable."""
content_lower = content.lower()
score = 0.3 # baseline (most knowledge is informational)
# Check for actionable indicators
actionable_count = sum(
len(re.findall(p, content_lower))
for p in ACTIONABLE_INDICATORS
)
score += min(actionable_count * 0.1, 0.6)
# Code blocks are highly actionable
if "```" in content:
score += 0.2
# Numbered steps are actionable
if re.search(r"\d+\.\s+\w", content):
score += 0.1
return max(0.0, min(1.0, score))
def score_freshness(timestamp: Optional[str]) -> float:
"""Score freshness: 1=new, decays over time."""
if not timestamp:
return 0.5
try:
if isinstance(timestamp, str):
ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
else:
ts = timestamp
now = datetime.now(timezone.utc)
age_days = (now - ts).days
# Exponential decay: 1.0 at day 0, 0.5 at ~180 days, 0.1 at ~365 days
score = math.exp(-age_days / 180)
return max(0.1, min(1.0, score))
except (ValueError, TypeError):
return 0.5
def score_source_quality(model: Optional[str]) -> float:
"""Score source quality based on model/provider."""
if not model:
return DEFAULT_SOURCE_QUALITY
# Normalize model name
model_lower = model.lower()
for key, score in SOURCE_QUALITY.items():
if key in model_lower:
return score
return DEFAULT_SOURCE_QUALITY
def score_entry(entry: dict) -> float:
"""
Score a knowledge entry on quality (0.0-1.0).
Weights:
- specificity: 0.3
- actionability: 0.3
- freshness: 0.2
- source_quality: 0.2
"""
content = entry.get("content", entry.get("text", entry.get("response", "")))
model = entry.get("model", entry.get("provenance", {}).get("model"))
timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
specificity = score_specificity(content)
actionability = score_actionability(content)
freshness = score_freshness(timestamp)
source = score_source_quality(model)
return round(
0.3 * specificity +
0.3 * actionability +
0.2 * freshness +
0.2 * source,
4
)
def score_entry_detailed(entry: dict) -> dict:
"""Score with breakdown."""
content = entry.get("content", entry.get("text", entry.get("response", "")))
model = entry.get("model", entry.get("provenance", {}).get("model"))
timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
specificity = score_specificity(content)
actionability = score_actionability(content)
freshness = score_freshness(timestamp)
source = score_source_quality(model)
return {
"score": round(0.3 * specificity + 0.3 * actionability + 0.2 * freshness + 0.2 * source, 4),
"specificity": round(specificity, 4),
"actionability": round(actionability, 4),
"freshness": round(freshness, 4),
"source_quality": round(source, 4),
}
def filter_entries(entries: List[dict], threshold: float = 0.5) -> List[dict]:
"""Filter entries below quality threshold."""
filtered = []
for entry in entries:
if score_entry(entry) >= threshold:
filtered.append(entry)
return filtered
def quality_report(entries: List[dict]) -> str:
"""Generate quality distribution report."""
if not entries:
return "No entries to analyze."
scores = [score_entry(e) for e in entries]
avg = sum(scores) / len(scores)
min_score = min(scores)
max_score = max(scores)
# Distribution buckets
buckets = {"high": 0, "medium": 0, "low": 0, "rejected": 0}
for s in scores:
if s >= 0.7:
buckets["high"] += 1
elif s >= 0.5:
buckets["medium"] += 1
elif s >= 0.3:
buckets["low"] += 1
else:
buckets["rejected"] += 1
lines = [
"=" * 50,
" QUALITY GATE REPORT",
"=" * 50,
f" Total entries: {len(entries)}",
f" Average score: {avg:.3f}",
f" Min: {min_score:.3f}",
f" Max: {max_score:.3f}",
"",
" Distribution:",
]
for bucket, count in buckets.items():
pct = count / len(entries) * 100
bar = "" * int(pct / 5)
lines.append(f" {bucket:<12} {count:>5} ({pct:>5.1f}%) {bar}")
passed = buckets["high"] + buckets["medium"]
lines.append(f"\n Pass rate (>= 0.5): {passed}/{len(entries)} ({passed/len(entries)*100:.1f}%)")
lines.append("=" * 50)
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Knowledge quality gate")
parser.add_argument("files", nargs="+", help="JSONL files to score")
parser.add_argument("--threshold", type=float, default=0.5, help="Quality threshold")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--filter", action="store_true", help="Filter and write back")
args = parser.parse_args()
all_entries = []
for filepath in args.files:
with open(filepath) as f:
for line in f:
if line.strip():
all_entries.append(json.loads(line))
if args.json:
results = [{"entry": e, **score_entry_detailed(e)} for e in all_entries]
print(json.dumps(results, indent=2))
elif args.filter:
filtered = filter_entries(all_entries, args.threshold)
print(f"Kept {len(filtered)}/{len(all_entries)} entries (threshold: {args.threshold})")
else:
print(quality_report(all_entries))
if __name__ == "__main__":
main()

317
scripts/dedup.py Normal file
View File

@@ -0,0 +1,317 @@
#!/usr/bin/env python3
"""
dedup.py — Knowledge deduplication: content hash + semantic similarity.
Deduplicates harvested knowledge entries to avoid training on duplicates.
Uses content hashing for exact matches and token overlap for near-duplicates.
Usage:
python3 dedup.py --input knowledge/index.json --output knowledge/index_deduped.json
python3 dedup.py --input knowledge/index.json --dry-run
python3 dedup.py --test # Run built-in dedup test
"""
import argparse
import hashlib
import json
import re
import sys
from pathlib import Path
from typing import List, Dict, Optional, Tuple
def normalize_text(text: str) -> str:
"""Normalize text for hashing: lowercase, collapse whitespace, strip."""
text = text.lower().strip()
text = re.sub(r'\s+', ' ', text)
return text
def content_hash(text: str) -> str:
"""SHA256 hash of normalized text for exact dedup."""
normalized = normalize_text(text)
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
def tokenize(text: str) -> set:
"""Simple tokenizer: lowercase words, 3+ chars."""
words = re.findall(r'[a-z0-9_]{3,}', text.lower())
return set(words)
def token_similarity(a: str, b: str) -> float:
"""Token-based Jaccard similarity (0.0-1.0).
Fast local alternative to embedding similarity.
Good enough for near-duplicate detection.
"""
tokens_a = tokenize(a)
tokens_b = tokenize(b)
if not tokens_a or not tokens_b:
return 0.0
intersection = tokens_a & tokens_b
union = tokens_a | tokens_b
return len(intersection) / len(union)
def quality_score(fact: dict) -> float:
"""Compute quality score for merge ranking.
Higher is better. Factors:
- confidence (0-1)
- source_count (more confirmations = better)
- has tags (richer metadata)
"""
confidence = fact.get('confidence', 0.5)
source_count = fact.get('source_count', 1)
has_tags = 1.0 if fact.get('tags') else 0.0
has_related = 1.0 if fact.get('related') else 0.0
# Weighted composite
score = (
confidence * 0.5 +
min(source_count / 10, 1.0) * 0.3 +
has_tags * 0.1 +
has_related * 0.1
)
return round(score, 4)
def merge_facts(keep: dict, drop: dict) -> dict:
"""Merge two near-duplicate facts, keeping higher-quality fields.
The 'keep' fact is enriched with metadata from 'drop'.
"""
# Merge tags (union)
keep_tags = set(keep.get('tags', []))
drop_tags = set(drop.get('tags', []))
keep['tags'] = sorted(keep_tags | drop_tags)
# Merge related (union)
keep_related = set(keep.get('related', []))
drop_related = set(drop.get('related', []))
keep['related'] = sorted(keep_related | drop_related)
# Update source_count (sum)
keep['source_count'] = keep.get('source_count', 1) + drop.get('source_count', 1)
# Update confidence (max — we've now seen it from multiple sources)
keep['confidence'] = max(keep.get('confidence', 0), drop.get('confidence', 0))
# Track that we merged
if '_merged_from' not in keep:
keep['_merged_from'] = []
keep['_merged_from'].append(drop.get('id', 'unknown'))
return keep
def dedup_facts(
facts: List[dict],
exact_threshold: float = 1.0,
near_threshold: float = 0.95,
dry_run: bool = False,
) -> Tuple[List[dict], dict]:
"""Deduplicate a list of knowledge facts.
Args:
facts: List of fact dicts (from index.json)
exact_threshold: Hash match = exact duplicate
near_threshold: Token similarity above this = near-duplicate
dry_run: If True, don't modify, just report
Returns:
(deduped_facts, stats_dict)
"""
if not facts:
return [], {"total": 0, "exact_dupes": 0, "near_dupes": 0, "unique": 0}
# Phase 1: Exact dedup by content hash
hash_seen = {} # hash -> index in deduped list
exact_dupes = 0
deduped = []
for fact in facts:
text = fact.get('fact', '')
h = content_hash(text)
if h in hash_seen:
# Exact duplicate — merge metadata into existing
existing_idx = hash_seen[h]
if not dry_run:
deduped[existing_idx] = merge_facts(deduped[existing_idx], fact)
exact_dupes += 1
else:
hash_seen[h] = len(deduped)
deduped.append(fact)
# Phase 2: Near-dup by token similarity
near_dupes = 0
i = 0
while i < len(deduped):
j = i + 1
while j < len(deduped):
sim = token_similarity(deduped[i].get('fact', ''), deduped[j].get('fact', ''))
if sim >= near_threshold:
# Near-duplicate — keep higher quality
q_i = quality_score(deduped[i])
q_j = quality_score(deduped[j])
if q_i >= q_j:
if not dry_run:
deduped[i] = merge_facts(deduped[i], deduped[j])
deduped.pop(j)
else:
# j is higher quality — merge i into j, then remove i
if not dry_run:
deduped[j] = merge_facts(deduped[j], deduped[i])
deduped.pop(i)
break # i changed, restart inner loop
near_dupes += 1
else:
j += 1
i += 1
stats = {
"total": len(facts),
"exact_dupes": exact_dupes,
"near_dupes": near_dupes,
"unique": len(deduped),
"removed": len(facts) - len(deduped),
}
return deduped, stats
def dedup_index_file(
input_path: str,
output_path: Optional[str] = None,
near_threshold: float = 0.95,
dry_run: bool = False,
) -> dict:
"""Deduplicate an index.json file.
Args:
input_path: Path to index.json
output_path: Where to write deduped file (default: overwrite input)
near_threshold: Token similarity threshold for near-dupes
dry_run: Report only, don't write
Returns stats dict.
"""
path = Path(input_path)
if not path.exists():
raise FileNotFoundError(f"Index file not found: {input_path}")
with open(path) as f:
data = json.load(f)
facts = data.get('facts', [])
deduped, stats = dedup_facts(facts, near_threshold=near_threshold, dry_run=dry_run)
if not dry_run:
data['facts'] = deduped
data['total_facts'] = len(deduped)
data['last_dedup'] = __import__('datetime').datetime.now(
__import__('datetime').timezone.utc
).isoformat()
out_path = Path(output_path) if output_path else path
with open(out_path, 'w') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return stats
def generate_test_duplicates(n: int = 20) -> List[dict]:
"""Generate test facts with intentional duplicates for testing.
Creates n unique facts plus n/4 exact dupes and n/4 near-dupes.
"""
import random
random.seed(42)
unique_facts = []
for i in range(n):
topic = random.choice(["git", "python", "docker", "rust", "nginx"])
tip = random.choice(["use verbose flags", "check logs first", "restart service", "clear cache", "update config"])
unique_facts.append({
"id": f"test:fact:{i:03d}",
"fact": f"When working with {topic}, always {tip} before deploying.",
"category": "fact",
"domain": "test",
"confidence": round(random.uniform(0.5, 1.0), 2),
"source_count": random.randint(1, 5),
"tags": [topic, "test"],
})
# Add exact duplicates (same text, different IDs)
duped = list(unique_facts)
for i in range(n // 4):
original = unique_facts[i]
dupe = dict(original)
dupe["id"] = f"test:fact:dup{i:03d}"
dupe["confidence"] = round(random.uniform(0.3, 0.8), 2)
duped.append(dupe)
# Add near-duplicates (slightly different phrasing)
for i in range(n // 4):
original = unique_facts[i]
near = dict(original)
near["id"] = f"test:fact:near{i:03d}"
near["fact"] = original["fact"].replace("always", "should").replace("before deploying", "prior to deployment")
near["confidence"] = round(random.uniform(0.4, 0.9), 2)
duped.append(near)
return duped
def main():
parser = argparse.ArgumentParser(description="Knowledge deduplication")
parser.add_argument("--input", help="Path to index.json")
parser.add_argument("--output", help="Output path (default: overwrite input)")
parser.add_argument("--threshold", type=float, default=0.95,
help="Near-dup similarity threshold (default: 0.95)")
parser.add_argument("--dry-run", action="store_true", help="Report only, don't write")
parser.add_argument("--test", action="store_true", help="Run built-in dedup test")
parser.add_argument("--json", action="store_true", help="JSON output")
args = parser.parse_args()
if args.test:
test_facts = generate_test_duplicates(20)
print(f"Generated {len(test_facts)} test facts (20 unique + dupes)")
deduped, stats = dedup_facts(test_facts, near_threshold=args.threshold)
print(f"\nDedup results:")
print(f" Total input: {stats['total']}")
print(f" Exact dupes: {stats['exact_dupes']}")
print(f" Near dupes: {stats['near_dupes']}")
print(f" Unique output: {stats['unique']}")
print(f" Removed: {stats['removed']}")
# Verify: should have ~20 unique (some merged)
assert stats['unique'] <= 20, f"Too many unique: {stats['unique']} > 20"
assert stats['unique'] >= 15, f"Too few unique: {stats['unique']} < 15"
assert stats['removed'] > 0, "No duplicates removed"
print("\nOK: Dedup test passed")
return
if not args.input:
print("ERROR: Provide --input or --test")
sys.exit(1)
stats = dedup_index_file(args.input, args.output, args.threshold, args.dry_run)
if args.json:
print(json.dumps(stats, indent=2))
else:
print(f"Dedup results:")
print(f" Total input: {stats['total']}")
print(f" Exact dupes: {stats['exact_dupes']}")
print(f" Near dupes: {stats['near_dupes']}")
print(f" Unique output: {stats['unique']}")
print(f" Removed: {stats['removed']}")
if args.dry_run:
print(" (dry run — no changes written)")
if __name__ == "__main__":
main()

View File

@@ -149,8 +149,8 @@ def to_dot(graph: dict) -> str:
"""Generate DOT format output."""
lines = ["digraph dependencies {"]
lines.append(" rankdir=LR;")
lines.append(" node [shape=box, style=filled, fillcolor="#1a1a2e", fontcolor="#e6edf3"];")
lines.append(" edge [color="#4a4a6a"];")
lines.append(' node [shape=box, style=filled, fillcolor="#1a1a2e", fontcolor="#e6edf3"];')
lines.append(' edge [color="#4a4a6a"];')
lines.append("")
for repo, data in sorted(graph.items()):

387
scripts/freshness.py Normal file
View File

@@ -0,0 +1,387 @@
#!/usr/bin/env python3
"""
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)
Automatically detects when knowledge entries become stale due to code changes.
Detection Method:
1. Track source file hash alongside knowledge entry
2. Compare current file hashes vs stored
3. Mismatch → flag entry as potentially stale
4. Report stale entries and optionally re-extract
Usage:
python3 scripts/freshness.py --knowledge-dir knowledge/
python3 scripts/freshness.py --knowledge-dir knowledge/ --json
python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
"""
import argparse
import hashlib
import json
import os
import subprocess
import sys
import yaml
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
"""
Get files changed in git in the last N days.
Returns dict with 'modified', 'added', 'deleted' lists of file paths.
"""
changes = {"modified": [], "added": [], "deleted": []}
try:
# Get commits from last N days
cmd = [
"git", "-C", repo_path, "log",
f"--since={days} days ago",
"--name-status",
"--pretty=format:",
"--diff-filter=MAD"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return changes
for line in result.stdout.splitlines():
line = line.strip()
if not line:
continue
parts = line.split('\t', 1)
if len(parts) != 2:
continue
status, filepath = parts
if status == 'M':
changes["modified"].append(filepath)
elif status == 'A':
changes["added"].append(filepath)
elif status == 'D':
changes["deleted"].append(filepath)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# Deduplicate
for key in changes:
changes[key] = list(set(changes[key]))
return changes
def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
"""
Load knowledge entries from YAML files in the knowledge directory.
Supports:
- knowledge/index.json (legacy format)
- knowledge/global/*.yaml
- knowledge/repos/*.yaml
- knowledge/agents/*.yaml
"""
entries = []
# Load from index.json if exists
index_path = os.path.join(knowledge_dir, "index.json")
if os.path.exists(index_path):
try:
with open(index_path) as f:
data = json.load(f)
for fact in data.get("facts", []):
entries.append({
"source": "index.json",
"fact": fact.get("fact", ""),
"source_file": fact.get("source_file"),
"source_hash": fact.get("source_hash"),
"category": fact.get("category", "unknown"),
"confidence": fact.get("confidence", 0.5)
})
except (json.JSONDecodeError, KeyError):
pass
# Load from YAML files
for subdir in ["global", "repos", "agents"]:
subdir_path = os.path.join(knowledge_dir, subdir)
if not os.path.isdir(subdir_path):
continue
for filename in os.listdir(subdir_path):
if not filename.endswith((".yaml", ".yml")):
continue
filepath = os.path.join(subdir_path, filename)
try:
with open(filepath) as f:
data = yaml.safe_load(f)
if not data or not isinstance(data, dict):
continue
# Extract entries from YAML structure
for key, value in data.items():
if isinstance(value, list):
for item in value:
if isinstance(item, dict):
entries.append({
"source": f"{subdir}/{filename}",
"fact": item.get("description", item.get("fact", "")),
"source_file": item.get("source_file"),
"source_hash": item.get("source_hash"),
"category": item.get("category", "unknown"),
"confidence": item.get("confidence", 0.5)
})
elif isinstance(value, dict):
entries.append({
"source": f"{subdir}/{filename}",
"fact": value.get("description", value.get("fact", "")),
"source_file": value.get("source_file"),
"source_hash": value.get("source_hash"),
"category": value.get("category", "unknown"),
"confidence": value.get("confidence", 0.5)
})
except (yaml.YAMLError, IOError):
pass
return entries
def check_freshness(knowledge_dir: str, repo_root: str = ".",
days: int = 1) -> Dict[str, Any]:
"""
Check freshness of knowledge entries against recent code changes.
Returns:
{
"timestamp": ISO timestamp,
"total_entries": int,
"stale_entries": [...],
"fresh_entries": [...],
"git_changes": {...},
"summary": {...}
}
"""
entries = load_knowledge_entries(knowledge_dir)
git_changes = get_git_file_changes(repo_root, days)
stale_entries = []
fresh_entries = []
for entry in entries:
source_file = entry.get("source_file")
if not source_file:
# Entry without source file reference
fresh_entries.append({**entry, "status": "no_source"})
continue
# Check if source file was recently modified
is_stale = False
reason = ""
if source_file in git_changes["modified"]:
is_stale = True
reason = "source_modified"
elif source_file in git_changes["deleted"]:
is_stale = True
reason = "source_deleted"
elif source_file in git_changes["added"]:
is_stale = True
reason = "source_added"
# Also check hash if available
stored_hash = entry.get("source_hash")
if stored_hash:
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
is_stale = True
reason = "source_missing"
elif current_hash != stored_hash:
is_stale = True
reason = "hash_mismatch"
if is_stale:
stale_entries.append({
**entry,
"status": "stale",
"reason": reason
})
else:
fresh_entries.append({**entry, "status": "fresh"})
# Compute summary
total = len(entries)
stale_count = len(stale_entries)
fresh_count = len(fresh_entries)
# Group stale entries by reason
stale_by_reason = {}
for entry in stale_entries:
reason = entry.get("reason", "unknown")
if reason not in stale_by_reason:
stale_by_reason[reason] = 0
stale_by_reason[reason] += 1
return {
"timestamp": datetime.now(timezone.utc).isoformat(),
"total_entries": total,
"stale_entries": stale_entries,
"fresh_entries": fresh_entries,
"git_changes": git_changes,
"summary": {
"total": total,
"stale": stale_count,
"fresh": fresh_count,
"stale_percentage": round(stale_count / total * 100, 1) if total > 0 else 0,
"stale_by_reason": stale_by_reason,
"git_changes_summary": {
"modified": len(git_changes["modified"]),
"added": len(git_changes["added"]),
"deleted": len(git_changes["deleted"])
}
}
}
def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
"""
Update hashes for stale entries. Returns count of updated entries.
"""
entries = load_knowledge_entries(knowledge_dir)
updated = 0
# This is a simplified version - in practice, you'd need to
# write back to the specific YAML files
for entry in entries:
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash and entry.get("source_hash") != current_hash:
# Mark for update (in practice, you'd write back to the file)
updated += 1
return updated
def format_report(result: Dict[str, Any], max_items: int = 20) -> str:
"""Format freshness check results as a human-readable report."""
timestamp = result["timestamp"]
summary = result["summary"]
stale_entries = result["stale_entries"]
git_changes = result["git_changes"]
lines = [
"Knowledge Freshness Report",
"=" * 50,
f"Generated: {timestamp}",
f"Total entries: {summary['total']}",
f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)",
f"Fresh entries: {summary['fresh']}",
""
]
# Git changes summary
lines.extend([
"Git Changes (last 24h):",
f" Modified: {len(git_changes['modified'])} files",
f" Added: {len(git_changes['added'])} files",
f" Deleted: {len(git_changes['deleted'])} files",
""
])
# Stale entries by reason
if summary.get("stale_by_reason"):
lines.extend([
"Stale Entries by Reason:",
""
])
for reason, count in summary["stale_by_reason"].items():
lines.append(f" {reason}: {count}")
lines.append("")
# List stale entries
if stale_entries:
lines.extend([
"Stale Entries:",
""
])
for i, entry in enumerate(stale_entries[:max_items], 1):
source = entry.get("source_file", "?")
reason = entry.get("reason", "unknown")
fact = entry.get("fact", "")[:60]
lines.append(f"{i:2d}. [{reason}] {source}")
if fact:
lines.append(f" {fact}")
if len(stale_entries) > max_items:
lines.append(f"\n... and {len(stale_entries) - max_items} more")
else:
lines.append("No stale entries found. All knowledge is fresh!")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Knowledge Freshness Cron — detect stale entries from code changes")
parser.add_argument("--knowledge-dir", required=True,
help="Path to knowledge directory")
parser.add_argument("--repo", default=".",
help="Path to repository for git change detection")
parser.add_argument("--days", type=int, default=1,
help="Number of days to check for git changes (default: 1)")
parser.add_argument("--json", action="store_true",
help="Output as JSON instead of human-readable")
parser.add_argument("--max", type=int, default=20,
help="Maximum stale entries to show (default: 20)")
parser.add_argument("--auto-reextract", action="store_true",
help="Auto-re-extract knowledge for stale entries")
args = parser.parse_args()
if not os.path.isdir(args.knowledge_dir):
print(f"Error: {args.knowledge_dir} is not a directory", file=sys.stderr)
sys.exit(1)
if not os.path.isdir(args.repo):
print(f"Error: {args.repo} is not a directory", file=sys.stderr)
sys.exit(1)
result = check_freshness(args.knowledge_dir, args.repo, args.days)
if args.json:
print(json.dumps(result, indent=2))
else:
print(format_report(result, args.max))
# Auto-re-extract if requested
if args.auto_reextract and result["stale_entries"]:
print(f"\nAuto-re-extracting {len(result['stale_entries'])} stale entries...")
# In a real implementation, this would call the harvester
print("(Auto-re-extraction not yet implemented)")
if __name__ == "__main__":
main()

View File

@@ -113,7 +113,7 @@ def find_slow_tests_by_scan(repo_path: str) -> List[Bottleneck]:
(r"time\.sleep\((\d+(?:\.\d+)?)\)", "Contains time.sleep() — consider using mock or async wait"),
(r"subprocess\.run\(.*timeout=(\d+)", "Subprocess with timeout — may block test"),
(r"requests\.(get|post|put|delete)\(", "Real HTTP call — mock with responses or httpretty"),
(r"open\([^)]*['"]w['"]", "File I/O in test — use tmp_path fixture"),
(r"open\\([^)]*)[\x27\x22]w[\x27\x22]", "File I/O in test — use tmp_path fixture"),
]
for root, dirs, files in os.walk(repo_path):
@@ -506,8 +506,8 @@ def format_markdown(report: PerfReport) -> str:
lines.append(f"- {icon} {b.name}{loc} — ~{b.duration_s:.1f}s — {b.recommendation}")
lines.append(f"")
return "
".join(lines)
return "\n".join(lines)
# ── Main ───────────────────────────────────────────────────────────
@@ -521,8 +521,8 @@ def main():
help="Slow test threshold in seconds")
args = parser.parse_args()
global SLOW_TEST_THRESHOLD_S
SLOW_TEST_THRESHOLD_S = args.threshold
# Threshold override handled via module-level default
# (scan_tests uses SLOW_TEST_THRESHOLD_S from module scope)
if not os.path.isdir(args.repo):
print(f"Error: {args.repo} is not a directory", file=sys.stderr)

View File

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -10,37 +10,273 @@ Usage:
"""
import argparse
import ast
import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Tuple
def generate_proposals():
"""Generate sample proposals for this engine."""
# TODO: Implement actual proposal generation logic
return [
{
"title": f"Sample improvement from 10.4",
"description": "This is a sample improvement proposal",
"impact": 5,
"effort": 3,
"category": "improvement",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat()
}
]
# ── Data Classes ────────────────────────────────────────────────────────
@dataclass
class FileMetrics:
"""Metrics for a single source file."""
path: str
lines: int = 0
complexity: float = 0.0
max_complexity: int = 0
functions: int = 0
classes: int = 0
churn_30d: int = 0
churn_90d: int = 0
test_coverage: Optional[float] = None
refactoring_score: float = 0.0
# ── Complexity Analysis ─────────────────────────────────────────────────
class ComplexityVisitor(ast.NodeVisitor):
"""AST visitor that computes cyclomatic complexity per function."""
def __init__(self):
self.complexities = []
self.function_count = 0
self.class_count = 0
self._current_complexity = 0
self._in_function = False
def visit_FunctionDef(self, node):
self.function_count += 1
old_complexity = self._current_complexity
old_in_function = self._in_function
self._current_complexity = 1 # Base complexity
self._in_function = True
self.generic_visit(node)
self.complexities.append(self._current_complexity)
self._current_complexity = old_complexity
self._in_function = old_in_function
visit_AsyncFunctionDef = visit_FunctionDef
def visit_ClassDef(self, node):
self.class_count += 1
self.generic_visit(node)
def visit_If(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_For(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
visit_AsyncFor = visit_For
def visit_While(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_ExceptHandler(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_With(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
visit_AsyncWith = visit_With
def visit_Assert(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_BoolOp(self, node):
# Each 'and'/'or' adds a branch
if self._in_function:
self._current_complexity += len(node.values) - 1
self.generic_visit(node)
def visit_IfExp(self, node):
# Ternary expression
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
"""
Compute cyclomatic complexity for a Python file.
Returns:
(avg_complexity, max_complexity, function_count, class_count, line_count)
"""
try:
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
source = f.read()
except (IOError, OSError):
return 0.0, 0, 0, 0, 0
try:
tree = ast.parse(source, filename=filepath)
except SyntaxError:
return 0.0, 0, 0, 0, 0
visitor = ComplexityVisitor()
visitor.visit(tree)
line_count = len(source.splitlines())
if not visitor.complexities:
# No functions, but might have classes
return 0.0, 0, visitor.function_count, visitor.class_count, line_count
avg = sum(visitor.complexities) / len(visitor.complexities)
max_c = max(visitor.complexities)
return avg, max_c, visitor.function_count, visitor.class_count, line_count
# ── Refactoring Score ───────────────────────────────────────────────────
def calculate_refactoring_score(metrics: FileMetrics) -> float:
"""
Calculate a refactoring priority score (0-100) based on file metrics.
Higher score = higher priority for refactoring.
Components:
- Complexity (0-30 points): higher avg/max complexity = higher score
- Size (0-20 points): larger files = higher score
- Churn (0-30 points): more changes recently = higher score
- Coverage (0-20 points): lower test coverage = higher score
"""
score = 0.0
# Complexity component (0-30)
# avg=10+ or max=20+ → 30 points
complexity_score = min(30.0, (metrics.complexity * 2) + (metrics.max_complexity * 0.5))
score += max(0.0, complexity_score)
# Size component (0-20)
# 500+ lines → 20 points
size_score = min(20.0, metrics.lines / 25.0)
score += max(0.0, size_score)
# Churn component (0-30)
# Weighted: recent churn (30d) counts more than older (90d)
churn_score = min(30.0, (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5))
score += max(0.0, churn_score)
# Coverage component (0-20)
# Lower coverage → higher score
if metrics.test_coverage is not None:
# coverage=0 → 20 points, coverage=1 → 0 points
coverage_score = (1.0 - metrics.test_coverage) * 20.0
else:
# No data → assume medium risk (10 points)
coverage_score = 10.0
score += max(0.0, coverage_score)
return min(100.0, max(0.0, score))
# ── Proposal Generation ─────────────────────────────────────────────────
def scan_directory(directory: str, extensions: tuple = ('.py',)) -> list:
"""Scan directory for source files."""
files = []
for root, dirs, filenames in os.walk(directory):
# Skip hidden dirs and common non-source dirs
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in (
'__pycache__', 'node_modules', 'venv', '.venv', 'env',
'build', 'dist', '.git', '.tox'
)]
for fname in filenames:
if any(fname.endswith(ext) for ext in extensions):
files.append(os.path.join(root, fname))
return files
def generate_proposals(directory: str = '.', min_score: float = 30.0) -> list:
"""Generate refactoring proposals by analyzing source files."""
proposals = []
files = scan_directory(directory)
for filepath in files:
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
if funcs == 0 and classes == 0:
continue
metrics = FileMetrics(
path=filepath,
lines=lines,
complexity=avg,
max_complexity=max_c,
functions=funcs,
classes=classes
)
score = calculate_refactoring_score(metrics)
metrics.refactoring_score = score
if score >= min_score:
reasons = []
if max_c > 10:
reasons.append(f"high max complexity ({max_c})")
if avg > 5:
reasons.append(f"high avg complexity ({avg:.1f})")
if lines > 300:
reasons.append(f"large file ({lines} lines)")
proposals.append({
"title": f"Refactor {os.path.basename(filepath)} (score: {score:.0f})",
"description": f"{filepath}: {', '.join(reasons) if reasons else 'general improvement candidate'}",
"impact": min(10, int(score / 10)),
"effort": min(10, max(1, int(max_c / 3))),
"category": "refactoring",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {
"path": filepath,
"score": round(score, 2),
"avg_complexity": round(avg, 2),
"max_complexity": max_c,
"lines": lines,
"functions": funcs,
"classes": classes
}
})
# Sort by score descending
proposals.sort(key=lambda p: p.get('metrics', {}).get('score', 0), reverse=True)
return proposals
# ── CLI ─────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
parser.add_argument("--output", required=True, help="Output file for proposals")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
parser.add_argument("--directory", default=".", help="Directory to scan")
parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
args = parser.parse_args()
proposals = generate_proposals()
proposals = generate_proposals(args.directory, args.min_score)
if not args.dry_run:
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
with open(args.output, "w") as f:
json.dump({"proposals": proposals}, f, indent=2)
print(f"Generated {len(proposals)} proposals -> {args.output}")

View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

103
scripts/test_wiki.py Normal file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""Smoke tests for scripts/wiki.py — retrieval and lint basics."""
import json
import os
import sys
import tempfile
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
import wiki
def test_retrieve_facts():
"""Test fact retrieval ranking."""
with tempfile.TemporaryDirectory() as tmpdir:
kdir = Path(tmpdir) / "knowledge"
kdir.mkdir()
index = {
"version": 1,
"total_facts": 3,
"facts": [
{
"id": "test:fact:001",
"fact": "Gitea token is stored at ~/.config/gitea/token",
"category": "tool-quirk",
"domain": "global",
"confidence": 0.95,
"tags": ["token", "gitea", "auth"],
"last_confirmed": "2026-04-01"
},
{
"id": "test:fact:002",
"fact": "Use gitea-api-first-burn worker for large repos",
"category": "pattern",
"domain": "timmy-config",
"confidence": 0.9,
"tags": ["gitea", "burn", "api"],
},
{
"id": "test:fact:003",
"fact": "Hermes gateway restarts required after Telegram config changes",
"category": "pitfall",
"domain": "hermes-agent",
"confidence": 0.85,
"tags": ["telegram", "gateway"],
}
]
}
index_path = kdir / "index.json"
with open(index_path, 'w') as f:
json.dump(index, f)
original_index = wiki.INDEX_PATH
wiki.INDEX_PATH = index_path
try:
results = wiki.retrieve_facts("where is gitea token stored?", limit=5)
assert len(results) >= 1, f"Expected at least 1 result, got {len(results)}"
assert results[0]['id'] == 'test:fact:001', f"Expected fact 001 first, got {results[0]['id']}"
print(" [PASS] retrieve_facts ranks correctly")
results2 = wiki.retrieve_facts("gitea burn large repos", limit=5)
assert len(results2) >= 1
assert results2[0]['id'] == 'test:fact:002'
print(" [PASS] tag-based retrieval works")
finally:
wiki.INDEX_PATH = original_index
def test_format_context():
"""Test context formatting for LLM."""
facts = [
{"id": "a:1", "fact": "Test fact A", "category": "fact", "confidence": 0.9},
{"id": "b:2", "fact": "Test fact B", "category": "pitfall", "confidence": 0.8},
]
ctx = wiki.format_facts_as_context(facts)
assert "[1]" in ctx and "a:1" in ctx
assert "Test fact A" in ctx
assert "Test fact B" in ctx
print(" [PASS] format_facts_as_context includes IDs and facts")
def test_detect_contradictions():
"""Test contradiction detection."""
index = {
"facts": [
{"id": "x:1", "fact": "Deploy uses port 22 for SSH", "category": "fact", "domain": "deploy"},
{"id": "x:2", "fact": "Deploy uses SSH on port 22", "category": "fact", "domain": "deploy"},
{"id": "x:3", "fact": "Cron jobs require model field", "category": "pitfall", "domain": "hermes-agent"},
]
}
contradictions = wiki.detect_contradictions(index)
assert len(contradictions) >= 1, "Expected at least one potential contradiction"
found = any('x:1' in c.get('fact_a','') or 'x:1' in c.get('fact_b','') for c in contradictions)
assert found, "Should detect similarity between x:1 and x:2"
print(" [PASS] detect_contradictions flags similar facts")
if __name__ == "__main__":
print("Running wiki module smoke tests...")
test_retrieve_facts()
test_format_context()
test_detect_contradictions()
print("\nAll wiki tests passed.")

353
scripts/wiki.py Normal file
View File

@@ -0,0 +1,353 @@
#!/usr/bin/env python3
"""
LLM Wiki layer — ingest, query, lint, and session crystallization for compounding-intelligence.
This is the sovereign knowledge interface: a compiled, queryable, lintable
knowledge base that survivies beyond sessions and cites its sources.
Distinct from:
- RAG: Raw chunk retrieval without synthesis or quality gating
- Transcript search: Keyword match over raw session logs without distillation
The Wiki layer sits on top of the knowledge/ index (facts with provenance).
It provides:
ingest — Harvest knowledge from sessions or raw sources
query — Retrieve + synthesize answers with citations
lint — Detect staleness, contradictions, broken links
crystal — (via harvester) session distillation already integrated
Usage:
python3 scripts/wiki.py ingest --session ~/.hermes/sessions/xxx.jsonl
python3 scripts/wiki.py query "How do I fix cron timeouts?"
python3 scripts/wiki.py lint
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
INDEX_PATH = KNOWLEDGE_DIR / "index.json"
# ---------- Utilities ----------
def load_index() -> dict:
if not INDEX_PATH.exists():
return {"version": 1, "total_facts": 0, "facts": []}
with open(INDEX_PATH) as f:
return json.load(f)
def score_fact_for_query(fact: dict, query_terms: set, query_lower: str) -> float:
"""Simple BM25-like relevance scoring for fact retrieval."""
fact_text = fact.get('fact', '').lower()
fact_tags = [t.lower() for t in fact.get('tags', [])]
# Term frequency in fact text
tf = sum(1 for term in query_terms if term in fact_text)
# Tag boost: exact tag match gives strong signal
tag_boost = sum(3.0 for tag in fact_tags if tag in query_lower)
# Confidence boost
confidence = fact.get('confidence', 0.5)
# Recency boost: newer facts get slight preference
last_confirmed = fact.get('last_confirmed', '')
recency_boost = 0.0
if last_confirmed:
try:
dt = datetime.fromisoformat(last_confirmed.rstrip('Z'))
days_old = (datetime.now(timezone.utc) - dt).days
recency_boost = max(0, 1.0 - days_old / 365)
except Exception:
pass
score = (tf * 1.0) + (tag_boost * confidence) + (recency_boost * 0.5)
return score
def retrieve_facts(query: str, limit: int = 10) -> List[dict]:
"""Retrieve the most relevant facts for a query from index.json."""
index = load_index()
facts = index.get('facts', [])
query_lower = query.lower()
query_terms = {t for t in re.split(r'\W+', query_lower) if len(t) > 2}
scored = []
for fact in facts:
score = score_fact_for_query(fact, query_terms, query_lower)
if score > 0:
scored.append((score, fact))
scored.sort(key=lambda x: -x[0])
return [f for _, f in scored[:limit]]
def format_facts_as_context(facts: List[dict]) -> str:
"""Format retrieved facts into a context block for LLM synthesis."""
lines = []
for i, fact in enumerate(facts, 1):
fid = fact.get('id', 'unknown')
fact_text = fact.get('fact', '')
confidence = fact.get('confidence', 0.5)
category = fact.get('category', 'fact')
lines.append(f"[{i}] ID:{fid} | {category} (conf={confidence:.2f}): {fact_text}")
return "\n".join(lines)
def find_api_key() -> str:
for p in [
Path.home() / ".config/nous/key",
Path.home() / ".hermes/keymaxxing/active/minimax.key",
Path.home() / ".config/openrouter/key",
]:
if p.exists():
return p.read_text().strip()
return os.environ.get("HARVESTER_API_KEY") or os.environ.get("OPENROUTER_API_KEY") or ""
def call_llm_synthesize(query: str, context: str, api_base: str, api_key: str, model: str) -> str:
"""Call LLM to synthesize answer from retrieved facts."""
import urllib.request
prompt = f"""You are the LLM Wiki answering from the sovereign knowledge base.
Knowledge facts (with citations):
{context}
Question: {query}
Instructions:
- Answer ONLY from the provided facts. Do not use outside knowledge.
- Cite facts using their [N] index number(s) in brackets.
- If the facts don't contain the answer, say "I don't know from the current knowledge base."
- Be concise (2-3 sentences maximum)."""
messages = [
{"role": "system", "content": "You are a precise knowledge assistant."},
{"role": "user", "content": prompt}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.1,
"max_tokens": 512
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
result = json.loads(resp.read().decode('utf-8'))
return result["choices"][0]["message"]["content"].strip()
except Exception as e:
return f"[ERROR: LLM call failed: {e}]"
def detect_contradictions(index: dict) -> List[dict]:
"""Detect potentially contradictory facts in the same domain/category."""
contradictions = []
facts = index.get('facts', [])
from collections import defaultdict
grouped = defaultdict(list)
for f in facts:
key = (f.get('domain', 'global'), f.get('category', 'fact'))
grouped[key].append(f)
for key, group in grouped.items():
if len(group) < 2:
continue
for i in range(len(group)):
for j in range(i+1, len(group)):
f1, f2 = group[i], group[j]
text1 = f1.get('fact', '').lower()
text2 = f2.get('fact', '').lower()
words1 = set(re.findall(r'\w+', text1))
words2 = set(re.findall(r'\w+', text2))
if len(words1 & words2) >= 3:
contradictions.append({
"type": "potential_contradiction",
"domain": key[0],
"category": key[1],
"fact_a": f1.get('id'),
"fact_b": f2.get('id'),
"similarity": len(words1 & words2) / max(len(words1), len(words2))
})
return contradictions
def lint_knowledge() -> dict:
"""Run all lint checks: freshness, duplicates, contradictions."""
results = {"errors": [], "warnings": [], "suggestions": []}
index = load_index()
facts = index.get('facts', [])
# 1. Freshness check via freshness.py
try:
freshness_script = SCRIPT_DIR / "freshness.py"
if freshness_script.exists():
proc = subprocess.run(
[sys.executable, str(freshness_script), "--knowledge-dir", str(KNOWLEDGE_DIR)],
capture_output=True, text=True, timeout=30
)
if proc.returncode != 0:
results["errors"].append(f"freshness.py failed: {proc.stderr[:200]}")
except Exception as e:
results["errors"].append(f"Could not run freshness check: {e}")
# 2. Duplicate fact text
seen = {}
for f in facts:
txt = f.get('fact', '').strip().lower()
if txt in seen:
results["warnings"].append(f"Duplicate fact text: {txt[:80]}... IDs: {seen[txt]}, {f.get('id')}")
else:
seen[txt] = f.get('id')
# 3. Contradictions
contradictions = detect_contradictions(index)
for c in contradictions:
results["warnings"].append(
f"Potential contradiction in {c['domain']}/{c['category']}: "
f"{c['fact_a']} vs {c['fact_b']} (similarity={c['similarity']:.2f})"
)
return results
# ---------- Subcommands ----------
def cmd_query(args):
"""Query the wiki: retrieve + synthesize."""
if not INDEX_PATH.exists():
print("ERROR: knowledge/index.json not found. Run ingest first.", file=sys.stderr)
return 1
query = args.query
top_k = args.top or 10
facts = retrieve_facts(query, limit=top_k)
if not facts:
print("No relevant facts found in knowledge base.")
return 0
print(f"→ Retrieved {len(facts)} facts:")
for i, f in enumerate(facts, 1):
fid = f.get('id', '?')
print(f" [{i}] {fid}: {f.get('fact', '')[:90]}")
if args.dry_run:
print("\n[dry-run] Skipping LLM synthesis.")
return 0
api_key = find_api_key()
if not api_key:
print("ERROR: No API key. Set HARVESTER_API_KEY or OPENROUTER_API_KEY.", file=sys.stderr)
return 1
api_base = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
model = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
context = format_facts_as_context(facts)
answer = call_llm_synthesize(query, context, api_base, api_key, model)
print(f"\n← Answer: {answer}")
return 0
def cmd_ingest(args):
"""Ingest knowledge from a session transcript."""
session = args.session
if not os.path.exists(session):
print(f"ERROR: Session file not found: {session}", file=sys.stderr)
return 1
harvester = SCRIPT_DIR / "harvester.py"
if not harvester.exists():
print("ERROR: harvester.py not found", file=sys.stderr)
return 1
cmd = [sys.executable, str(harvester), "--session", session, "--output", str(KNOWLEDGE_DIR)]
if args.dry_run:
cmd.append("--dry-run")
env = os.environ.copy()
env["PYTHONPATH"] = str(REPO_ROOT)
result = subprocess.run(cmd, env=env)
return result.returncode
def cmd_lint(args):
"""Lint the knowledge base for quality issues."""
results = lint_knowledge()
if results["errors"]:
print("ERRORS:")
for e in results["errors"]:
print(f"{e}")
return 1
if results["warnings"]:
print(f"WARNINGS ({len(results['warnings'])}):")
for w in results["warnings"]:
print(f"{w}")
else:
print("✓ No lint issues found. Knowledge base is clean.")
return 0 if not results["errors"] else 1
def cmd_crystallize(args):
"""Alias for ingest — session crystallization."""
return cmd_ingest(args)
def main():
parser = argparse.ArgumentParser(
description="LLM Wiki layer — ingest, query, lint, crystallize",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python3 scripts/wiki.py query "How do I fix cron timeouts?"
python3 scripts/wiki.py ingest --session ~/.hermes/sessions/abc.jsonl
python3 scripts/wiki.py lint
python3 scripts/wiki.py crystal --session session.jsonl
"""
)
sub = parser.add_subparsers(dest="command", help="Wiki command")
qp = sub.add_parser("query", help="Ask the wiki a question (RAG + synthesis)")
qp.add_argument("query", help="Natural language question")
qp.add_argument("--top", type=int, default=10, help="Number of facts to retrieve")
qp.add_argument("--dry-run", action="store_true", help="Show retrieval but skip LLM")
qp.set_defaults(func=cmd_query)
ip = sub.add_parser("ingest", help="Ingest a session transcript into knowledge")
ip.add_argument("--session", required=True, help="Path to session JSONL file")
ip.add_argument("--dry-run", action="store_true", help="Preview without writing")
ip.set_defaults(func=cmd_ingest)
lp = sub.add_parser("lint", help="Check knowledge base for issues")
lp.set_defaults(func=cmd_lint)
cp = sub.add_parser("crystal", help="Crystallize a session into durable pages")
cp.add_argument("--session", required=True, help="Path to session JSONL file")
cp.add_argument("--dry-run", action="store_true", help="Preview without writing")
cp.set_defaults(func=cmd_crystallize)
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
return args.func(args)
if __name__ == "__main__":
sys.exit(main())

207
tests/test_dedup.py Normal file
View File

@@ -0,0 +1,207 @@
"""Tests for knowledge deduplication module (Issue #196)."""
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from dedup import (
normalize_text,
content_hash,
tokenize,
token_similarity,
quality_score,
merge_facts,
dedup_facts,
generate_test_duplicates,
)
class TestNormalize:
def test_lowercases(self):
assert normalize_text("Hello World") == "hello world"
def test_collapses_whitespace(self):
assert normalize_text(" hello world ") == "hello world"
def test_strips(self):
assert normalize_text(" text ") == "text"
class TestContentHash:
def test_deterministic(self):
h1 = content_hash("Hello World")
h2 = content_hash("hello world")
h3 = content_hash(" Hello World ")
assert h1 == h2 == h3
def test_different_texts(self):
h1 = content_hash("Hello")
h2 = content_hash("World")
assert h1 != h2
def test_returns_hex(self):
h = content_hash("test")
assert len(h) == 64 # SHA256
assert all(c in '0123456789abcdef' for c in h)
class TestTokenize:
def test_extracts_words(self):
tokens = tokenize("Hello World Test")
assert "hello" in tokens
assert "world" in tokens
assert "test" in tokens
def test_skips_short_words(self):
tokens = tokenize("a to is the hello")
assert "a" not in tokens
assert "to" not in tokens
assert "hello" in tokens
def test_returns_set(self):
tokens = tokenize("hello hello world")
assert isinstance(tokens, set)
assert len(tokens) == 2
class TestTokenSimilarity:
def test_identical(self):
assert token_similarity("hello world", "hello world") == 1.0
def test_no_overlap(self):
assert token_similarity("alpha beta", "gamma delta") == 0.0
def test_partial_overlap(self):
sim = token_similarity("hello world test", "hello universe test")
assert 0.3 < sim < 0.7
def test_empty(self):
assert token_similarity("", "hello") == 0.0
assert token_similarity("hello", "") == 0.0
def test_symmetric(self):
a = "hello world test"
b = "hello universe test"
assert token_similarity(a, b) == token_similarity(b, a)
class TestQualityScore:
def test_high_confidence(self):
fact = {"confidence": 0.95, "source_count": 5, "tags": ["test"], "related": ["x"]}
score = quality_score(fact)
assert score > 0.7
def test_low_confidence(self):
fact = {"confidence": 0.3, "source_count": 1}
score = quality_score(fact)
assert score < 0.5
def test_defaults(self):
score = quality_score({})
assert 0 < score < 1
class TestMergeFacts:
def test_merges_tags(self):
keep = {"id": "a", "fact": "test", "tags": ["git"], "confidence": 0.9}
drop = {"id": "b", "fact": "test", "tags": ["python"], "confidence": 0.8}
merged = merge_facts(keep, drop)
assert "git" in merged["tags"]
assert "python" in merged["tags"]
def test_merges_source_count(self):
keep = {"id": "a", "fact": "test", "source_count": 3}
drop = {"id": "b", "fact": "test", "source_count": 2}
merged = merge_facts(keep, drop)
assert merged["source_count"] == 5
def test_keeps_higher_confidence(self):
keep = {"id": "a", "fact": "test", "confidence": 0.7}
drop = {"id": "b", "fact": "test", "confidence": 0.9}
merged = merge_facts(keep, drop)
assert merged["confidence"] == 0.9
def test_tracks_merged_from(self):
keep = {"id": "a", "fact": "test"}
drop = {"id": "b", "fact": "test"}
merged = merge_facts(keep, drop)
assert "b" in merged["_merged_from"]
class TestDedupFacts:
def test_removes_exact_dupes(self):
facts = [
{"id": "1", "fact": "Always use git rebase"},
{"id": "2", "fact": "Always use git rebase"}, # exact dupe
{"id": "3", "fact": "Check logs first"},
]
deduped, stats = dedup_facts(facts)
assert stats["exact_dupes"] == 1
assert stats["unique"] == 2
def test_removes_near_dupes(self):
facts = [
{"id": "1", "fact": "Always check logs before deploying to production server"},
{"id": "2", "fact": "Always check logs before deploying to production environment"},
{"id": "3", "fact": "Use docker compose for local development environments"},
]
deduped, stats = dedup_facts(facts, near_threshold=0.5)
assert stats["near_dupes"] >= 1
assert stats["unique"] == 2
def test_preserves_unique(self):
facts = [
{"id": "1", "fact": "Use git rebase for clean history"},
{"id": "2", "fact": "Docker containers should be stateless"},
{"id": "3", "fact": "Always write tests before code"},
]
deduped, stats = dedup_facts(facts)
assert stats["unique"] == 3
assert stats["removed"] == 0
def test_empty_input(self):
deduped, stats = dedup_facts([])
assert stats["total"] == 0
assert stats["unique"] == 0
def test_keeps_higher_quality_near_dup(self):
facts = [
{"id": "1", "fact": "Check logs before deploying to production server", "confidence": 0.5, "source_count": 1},
{"id": "2", "fact": "Check logs before deploying to production environment", "confidence": 0.9, "source_count": 5, "tags": ["ops"]},
]
deduped, stats = dedup_facts(facts, near_threshold=0.5)
assert stats["unique"] == 1
# Higher quality fact should be kept
assert deduped[0]["confidence"] == 0.9
def test_dry_run_does_not_modify(self):
facts = [
{"id": "1", "fact": "Same text"},
{"id": "2", "fact": "Same text"},
]
deduped, stats = dedup_facts(facts, dry_run=True)
assert stats["exact_dupes"] == 1
# In dry_run, merge_facts is skipped so facts aren't modified
assert len(deduped) == 1
class TestGenerateTestDuplicates:
def test_generates_correct_count(self):
facts = generate_test_duplicates(20)
assert len(facts) > 20 # 20 unique + duplicates
def test_has_exact_dupes(self):
facts = generate_test_duplicates(20)
hashes = [content_hash(f["fact"]) for f in facts]
# Should have some duplicate hashes
assert len(hashes) != len(set(hashes))
def test_dedup_removes_dupes(self):
facts = generate_test_duplicates(20)
deduped, stats = dedup_facts(facts)
assert stats["unique"] <= 20
assert stats["removed"] > 0

227
tests/test_freshness.py Normal file
View File

@@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""Tests for scripts/freshness.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.join(os.path.dirname(__file__) or ".", ".."))
import importlib.util
spec = importlib.util.spec_from_file_location(
"freshness", os.path.join(os.path.dirname(__file__) or ".", "..", "scripts", "freshness.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
compute_file_hash = mod.compute_file_hash
check_freshness = mod.check_freshness
load_knowledge_entries = mod.load_knowledge_entries
def test_compute_file_hash():
"""File hash should be computed correctly."""
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write("test content")
f.flush()
h = compute_file_hash(f.name)
assert h is not None
assert h.startswith("sha256:")
os.unlink(f.name)
print("PASS: test_compute_file_hash")
def test_compute_file_hash_nonexistent():
"""Nonexistent file should return None."""
h = compute_file_hash("/nonexistent/file.txt")
assert h is None
print("PASS: test_compute_file_hash_nonexistent")
def test_load_knowledge_entries_empty():
"""Empty knowledge dir should return empty list."""
with tempfile.TemporaryDirectory() as tmpdir:
entries = load_knowledge_entries(tmpdir)
assert entries == []
print("PASS: test_load_knowledge_entries_empty")
def test_load_knowledge_entries_from_index():
"""Should load entries from index.json."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create index.json
index_path = os.path.join(tmpdir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "Test fact",
"source_file": "test.py",
"source_hash": "sha256:abc123",
"category": "fact",
"confidence": 0.9
}
]
}, f)
entries = load_knowledge_entries(tmpdir)
assert len(entries) == 1
assert entries[0]["fact"] == "Test fact"
assert entries[0]["source_file"] == "test.py"
print("PASS: test_load_knowledge_entries_from_index")
def test_load_knowledge_entries_from_yaml():
"""Should load entries from YAML files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create global directory
global_dir = os.path.join(tmpdir, "global")
os.makedirs(global_dir)
# Create YAML file
yaml_path = os.path.join(global_dir, "test.yaml")
with open(yaml_path, "w") as f:
f.write("""
pitfalls:
- description: "Test pitfall"
source_file: "test.py"
source_hash: "sha256:def456"
category: "pitfall"
confidence: 0.8
""")
entries = load_knowledge_entries(tmpdir)
assert len(entries) == 1
assert entries[0]["fact"] == "Test pitfall"
assert entries[0]["category"] == "pitfall"
print("PASS: test_load_knowledge_entries_from_yaml")
def test_check_freshness_no_changes():
"""With no source file reference, entries should be counted correctly."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create knowledge dir
knowledge_dir = os.path.join(tmpdir, "knowledge")
os.makedirs(knowledge_dir)
# Create repo dir
repo_dir = os.path.join(tmpdir, "repo")
os.makedirs(repo_dir)
# Create index.json with entry that has no source_file
index_path = os.path.join(knowledge_dir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "General knowledge",
"category": "fact",
"confidence": 0.9
# No source_file or source_hash
}
]
}, f)
result = check_freshness(knowledge_dir, repo_dir, days=1)
# Entry without source_file should be counted as "fresh" (no_source status)
assert result["summary"]["total"] == 1
assert result["summary"]["stale"] == 0
assert result["summary"]["fresh"] == 1
assert result["fresh_entries"][0]["status"] == "no_source"
print("PASS: test_check_freshness_no_changes")
def test_check_freshness_with_hash_mismatch():
"""Hash mismatch should mark entry as stale."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create knowledge dir
knowledge_dir = os.path.join(tmpdir, "knowledge")
os.makedirs(knowledge_dir)
# Create repo dir with a file
repo_dir = os.path.join(tmpdir, "repo")
os.makedirs(repo_dir)
test_file = os.path.join(repo_dir, "test.py")
with open(test_file, "w") as f:
f.write("print('hello')")
# Create index.json with wrong hash
index_path = os.path.join(knowledge_dir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "Test fact",
"source_file": "test.py",
"source_hash": "sha256:wronghash",
"category": "fact",
"confidence": 0.9
}
]
}, f)
# Initialize git repo
os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")
result = check_freshness(knowledge_dir, repo_dir, days=1)
assert result["summary"]["total"] == 1
assert result["summary"]["stale"] == 1
assert result["summary"]["fresh"] == 0
assert result["stale_entries"][0]["reason"] == "hash_mismatch"
print("PASS: test_check_freshness_with_hash_mismatch")
def test_check_freshness_missing_source():
"""Missing source file should mark entry as stale."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create knowledge dir
knowledge_dir = os.path.join(tmpdir, "knowledge")
os.makedirs(knowledge_dir)
# Create repo dir (without the referenced file)
repo_dir = os.path.join(tmpdir, "repo")
os.makedirs(repo_dir)
# Create index.json referencing nonexistent file
index_path = os.path.join(knowledge_dir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "Test fact",
"source_file": "nonexistent.py",
"source_hash": "sha256:abc123",
"category": "fact",
"confidence": 0.9
}
]
}, f)
# Initialize git repo
os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")
result = check_freshness(knowledge_dir, repo_dir, days=1)
assert result["summary"]["total"] == 1
assert result["summary"]["stale"] == 1
assert result["summary"]["fresh"] == 0
assert result["stale_entries"][0]["reason"] == "source_missing"
print("PASS: test_check_freshness_missing_source")
def run_all():
test_compute_file_hash()
test_compute_file_hash_nonexistent()
test_load_knowledge_entries_empty()
test_load_knowledge_entries_from_index()
test_load_knowledge_entries_from_yaml()
test_check_freshness_no_changes()
test_check_freshness_with_hash_mismatch()
test_check_freshness_missing_source()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()

108
tests/test_quality_gate.py Normal file
View File

@@ -0,0 +1,108 @@
"""
Tests for quality_gate.py — Knowledge entry quality scoring.
"""
import unittest
from datetime import datetime, timezone, timedelta
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from quality_gate import (
score_specificity,
score_actionability,
score_freshness,
score_source_quality,
score_entry,
filter_entries,
)
class TestScoreSpecificity(unittest.TestCase):
def test_specific_content_scores_high(self):
content = "Run `python3 deploy.py --env prod` on 2026-04-15. Example: step 1 configure nginx."
score = score_specificity(content)
self.assertGreater(score, 0.6)
def test_vague_content_scores_low(self):
content = "It generally depends. Various factors might affect this. Basically, it varies."
score = score_specificity(content)
self.assertLess(score, 0.5)
def test_empty_scores_baseline(self):
score = score_specificity("")
self.assertAlmostEqual(score, 0.5, delta=0.1)
class TestScoreActionability(unittest.TestCase):
def test_actionable_content_scores_high(self):
content = "1. Run `pip install -r requirements.txt`\n2. Execute `python3 train.py`\n3. Verify with `pytest`"
score = score_actionability(content)
self.assertGreater(score, 0.6)
def test_abstract_content_scores_low(self):
content = "The concept of intelligence is fascinating and multifaceted."
score = score_actionability(content)
self.assertLess(score, 0.5)
class TestScoreFreshness(unittest.TestCase):
def test_recent_timestamp_scores_high(self):
recent = datetime.now(timezone.utc).isoformat()
score = score_freshness(recent)
self.assertGreater(score, 0.9)
def test_old_timestamp_scores_low(self):
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
score = score_freshness(old)
self.assertLess(score, 0.2)
def test_none_returns_baseline(self):
score = score_freshness(None)
self.assertEqual(score, 0.5)
class TestScoreSourceQuality(unittest.TestCase):
def test_claude_scores_high(self):
self.assertGreater(score_source_quality("claude-sonnet"), 0.85)
def test_ollama_scores_lower(self):
self.assertLess(score_source_quality("ollama"), 0.7)
def test_unknown_returns_default(self):
self.assertEqual(score_source_quality("unknown"), 0.5)
class TestScoreEntry(unittest.TestCase):
def test_good_entry_scores_high(self):
entry = {
"content": "To deploy: run `kubectl apply -f deployment.yaml`. Verify with `kubectl get pods`.",
"model": "claude-sonnet",
"timestamp": datetime.now(timezone.utc).isoformat(),
}
score = score_entry(entry)
self.assertGreater(score, 0.6)
def test_poor_entry_scores_low(self):
entry = {
"content": "It depends. Various things might happen.",
"model": "unknown",
}
score = score_entry(entry)
self.assertLess(score, 0.5)
class TestFilterEntries(unittest.TestCase):
def test_filters_low_quality(self):
entries = [
{"content": "Run `deploy.py` to fix the issue.", "model": "claude"},
{"content": "It might work sometimes.", "model": "unknown"},
{"content": "Configure nginx: step 1 edit nginx.conf", "model": "gpt-4"},
]
filtered = filter_entries(entries, threshold=0.5)
self.assertGreaterEqual(len(filtered), 2)
if __name__ == "__main__":
unittest.main()