Compare commits

..

2 Commits

Author SHA1 Message Date
d7cfda8f03 test: sync refactoring opportunity finder tests (#210)
Some checks failed
Test / pytest (pull_request) Failing after 49s
2026-04-21 11:30:59 +00:00
b172e720e4 feat: implement refactoring opportunity finder — AST complexity + scoring (#210) 2026-04-21 11:30:58 +00:00
15 changed files with 517 additions and 2553 deletions

2
.gitignore vendored
View File

@@ -1,2 +0,0 @@
__pycache__/
*.pyc

412
GENOME.md
View File

@@ -1,16 +1,16 @@
# GENOME.md — compounding-intelligence
**Generated:** 2026-04-17
**Repo:** Timmy_Foundation/compounding-intelligence
**Description:** Turn 1B+ daily agent tokens into durable, compounding fleet intelligence.
*Auto-generated codebase genome. Addresses timmy-home#676.*
---
## Project Overview
Every agent session starts at zero. The same HTTP 405 gets rediscovered as a branch protection issue. The same token path gets searched from scratch. Intelligence evaporates when the session ends.
**What:** A system that turns 1B+ daily agent tokens into durable, compounding fleet intelligence.
Compounding-intelligence solves this with three pipelines forming a loop:
**Why:** Every agent session starts at zero. The same mistakes get made repeatedly — the same HTTP 405 is rediscovered as a branch protection issue, the same token path is searched for from scratch. Intelligence evaporates when the session ends.
**How:** Three pipelines form a compounding loop:
```
SESSION ENDS → HARVESTER → KNOWLEDGE STORE → BOOTSTRAPPER → NEW SESSION STARTS SMARTER
@@ -18,234 +18,222 @@ SESSION ENDS → HARVESTER → KNOWLEDGE STORE → BOOTSTRAPPER → NEW SESSION
MEASURER → Prove it's working
```
**Status:** Active development. Core pipelines implemented. 20+ scripts, 14 test files, knowledge store populated with real data.
**Status:** Early stage. Template and test scaffolding exist. Core pipeline scripts (harvester.py, bootstrapper.py, measurer.py, session_reader.py) are planned but not yet implemented. The knowledge extraction prompt is complete and validated.
---
## Architecture
```mermaid
graph TD
TRANS[Session Transcripts<br/>~/.hermes/sessions/*.jsonl] --> READER[session_reader.py]
READER --> HARVESTER[harvester.py]
HARVESTER -->|LLM extraction| PROMPT[harvest-prompt.md]
HARVESTER --> DEDUP[deduplicate()]
DEDUP --> INDEX[knowledge/index.json]
DEDUP --> GLOBAL[knowledge/global/*.yaml]
DEDUP --> REPO[knowledge/repos/*.yaml]
INDEX --> BOOTSTRAPPER[bootstrapper.py]
BOOTSTRAPPER -->|filter + rank + truncate| CONTEXT[Bootstrap Context<br/>2k token injection]
CONTEXT --> SESSION[New Session starts smarter]
INDEX --> VALIDATOR[validate_knowledge.py]
INDEX --> STALENESS[knowledge_staleness_check.py]
INDEX --> GAPS[knowledge_gap_identifier.py]
TRANS --> SAMPLER[sampler.py]
SAMPLER -->|score + rank| BEST[High-value sessions]
BEST --> HARVESTER
TRANS --> METADATA[session_metadata.py]
METADATA --> SUMMARY[SessionSummary objects]
KNOWLEDGE --> DIFF[diff_analyzer.py]
DIFF --> PROPOSALS[improvement_proposals.py]
PROPOSALS --> PRIORITIES[priority_rebalancer.py]
A[Session Transcript<br/>.jsonl] --> B[Harvester]
B --> C{Extract Knowledge}
C --> D[knowledge/index.json]
C --> E[knowledge/global/*.md]
C --> F[knowledge/repos/{repo}.md]
C --> G[knowledge/agents/{agent}.md]
D --> H[Bootstrapper]
H --> I[Bootstrap Context<br/>2k token injection]
I --> J[New Session<br/>starts smarter]
J --> A
D --> K[Measurer]
K --> L[metrics/dashboard.md]
K --> M[Velocity / Hit Rate<br/>Error Reduction]
```
## Entry Points
### Pipeline 1: Harvester
### Core Pipelines
**Status:** Prompt designed. Script not implemented.
| Script | Purpose | Key Functions |
|--------|---------|---------------|
| `harvester.py` | Extract knowledge from session transcripts | `harvest_session()`, `call_llm()`, `deduplicate()`, `validate_fact()` |
| `bootstrapper.py` | Build pre-session context from knowledge store | `build_bootstrap_context()`, `filter_facts()`, `sort_facts()`, `truncate_to_tokens()` |
| `session_reader.py` | Parse JSONL session transcripts | `read_session()`, `extract_conversation()`, `messages_to_text()` |
| `sampler.py` | Score and rank sessions for harvesting value | `scan_session_fast()`, `score_session()` |
| `session_metadata.py` | Extract structured metadata from sessions | `extract_session_metadata()`, `SessionSummary` |
Reads finished session transcripts (JSONL). Uses `templates/harvest-prompt.md` to extract durable knowledge into five categories:
### Analysis & Quality
| Category | Description | Example |
|----------|-------------|---------|
| `fact` | Concrete, verifiable information | "Repository X has 5 files" |
| `pitfall` | Errors encountered, wrong assumptions | "Token is at ~/.config/gitea/token, not env var" |
| `pattern` | Successful action sequences | "Deploy: test → build → push → webhook" |
| `tool-quirk` | Environment-specific behaviors | "URL format requires trailing slash" |
| `question` | Identified but unanswered | "Need optimal batch size for harvesting" |
| Script | Purpose |
|--------|---------|
| `validate_knowledge.py` | Validate knowledge index schema compliance |
| `knowledge_staleness_check.py` | Detect stale knowledge (source changed since extraction) |
| `knowledge_gap_identifier.py` | Find untested functions, undocumented APIs, missing tests |
| `diff_analyzer.py` | Analyze code diffs for improvement signals |
| `improvement_proposals.py` | Generate ranked improvement proposals |
| `priority_rebalancer.py` | Rebalance priorities across proposals |
| `automation_opportunity_finder.py` | Find manual steps that can be automated |
| `dead_code_detector.py` | Detect unused code |
| `dependency_graph.py` | Map dependency relationships |
| `perf_bottleneck_finder.py` | Find performance bottlenecks |
| `refactoring_opportunity_finder.py` | Identify refactoring targets |
| `gitea_issue_parser.py` | Parse Gitea issues for knowledge extraction |
### Automation
| Script | Purpose |
|--------|---------|
| `session_pair_harvester.py` | Extract training pairs from sessions |
## Data Flow
```
1. Session ends → .jsonl written to ~/.hermes/sessions/
2. sampler.py scores sessions by age, recency, repo coverage
3. harvester.py reads top sessions, calls LLM with harvest-prompt.md
4. LLM extracts facts/pitfalls/patterns/quirks/questions
5. deduplicate() checks against existing index via fact_fingerprint()
6. validate_fact() checks schema compliance
7. write_knowledge() appends to knowledge/index.json + per-repo YAML
8. On next session start, bootstrapper.py:
a. Loads knowledge/index.json
b. Filters by session's repo and agent type
c. Sorts by confidence (high first), then recency
d. Truncates to 2k token budget
e. Injects as pre-context
9. Agent starts with full situational awareness instead of zero
```
## Key Abstractions
### Knowledge Item (fact/pitfall/pattern/quirk/question)
Output schema per knowledge item:
```json
{
"fact": "Gitea token is at ~/.config/gitea/token",
"category": "tool-quirk",
"repo": "global",
"confidence": 0.9,
"evidence": "Found during clone attempt",
"source_session": "2026-04-13_abc123",
"extracted_at": "2026-04-13T20:00:00Z"
"fact": "One sentence description",
"category": "fact|pitfall|pattern|tool-quirk|question",
"repo": "repo-name or 'global'",
"confidence": 0.0-1.0
}
```
### SessionSummary (session_metadata.py)
Extracted metadata per session: duration, token count, tools used, repos touched, error count, outcome.
### Pipeline 2: Bootstrapper
### Gap / GapReport (knowledge_gap_identifier.py)
Structured gap analysis: untested functions, undocumented APIs, missing tests. Severity: critical/high/medium/low.
**Status:** Not implemented.
### Knowledge Index (knowledge/index.json)
Machine-readable fact store. 12KB, populated with real data. Categories: fact, pitfall, pattern, tool-quirk, question.
Queries knowledge store before session start. Assembles a compact 2k-token context from relevant facts. Injects into session startup so the agent begins with full situational awareness.
## Knowledge Store
### Pipeline 3: Measurer
```
knowledge/
├── index.json # Master fact store (12KB, populated)
├── SCHEMA.md # Schema documentation
├── global/
│ ├── pitfalls.yaml # Cross-repo pitfalls (2KB)
│ └── tool-quirks.yaml # Tool-specific quirks (2KB)
├── repos/
│ ├── hermes-agent.yaml # hermes-agent knowledge (2KB)
│ └── the-nexus.yaml # the-nexus knowledge (2KB)
└── agents/ # Per-agent knowledge (empty)
```
**Status:** Not implemented.
## API Surface
### LLM API (consumed)
| Provider | Endpoint | Usage |
|----------|----------|-------|
| Nous Research | `https://inference-api.nousresearch.com/v1` | Knowledge extraction |
| Ollama | `http://localhost:11434/v1` | Local fallback |
### File API (consumed/produced)
| Path | Format | Direction |
|------|--------|-----------|
| `~/.hermes/sessions/*.jsonl` | JSONL | Input (session transcripts) |
| `knowledge/index.json` | JSON | Output (master fact store) |
| `knowledge/global/*.yaml` | YAML | Output (cross-repo knowledge) |
| `knowledge/repos/*.yaml` | YAML | Output (per-repo knowledge) |
| `templates/harvest-prompt.md` | Markdown | Config (extraction prompt) |
## Test Coverage
**14 test files** covering core pipelines:
| Test File | Covers |
|-----------|--------|
| `test_harvest_prompt.py` | Prompt validation, hallucination detection |
| `test_harvest_prompt_comprehensive.py` | Extended prompt testing |
| `test_harvester_pipeline.py` | Harvester extraction + dedup |
| `test_bootstrapper.py` | Context building, filtering, truncation |
| `test_session_pair_harvester.py` | Training pair extraction |
| `test_improvement_proposals.py` | Proposal generation |
| `test_priority_rebalancer.py` | Priority scoring |
| `test_knowledge_staleness.py` | Staleness detection |
| `test_automation_opportunity_finder.py` | Automation detection |
| `test_diff_analyzer.py` | Diff analysis |
| `test_gitea_issue_parser.py` | Issue parsing |
| `test_refactoring_opportunity_finder.py` | Refactoring signals |
| `test_knowledge_gap_identifier.py` | Gap analysis |
| `test_perf_bottleneck_finder.py` | Perf bottleneck detection |
### Coverage Gaps
1. **session_reader.py** — No dedicated test file (tested indirectly)
2. **sampler.py** — No test file (scoring logic untested)
3. **session_metadata.py** — No test file
4. **validate_knowledge.py** — No test file
5. **knowledge_staleness_check.py** — Tested but limited
## Security Considerations
### API Key Handling
- `harvester.py` reads API key from `~/.hermes/auth.json` or env vars
- Key passed to LLM API in request headers only
- No key logging
### Knowledge Integrity
- `validate_fact()` checks schema before writing
- `deduplicate()` prevents duplicate entries via fingerprint
- `knowledge_staleness_check.py` detects when source code changed but knowledge didn't
- Confidence scores prevent low-quality knowledge from polluting the store
### File Safety
- Knowledge writes are append-only (never deletes)
- Bootstrap context is truncated to budget (no prompt injection via knowledge)
- Session reader handles malformed JSONL gracefully
## File Index
```
scripts/
harvester.py (473 lines) — Core knowledge extraction
bootstrapper.py (302 lines) — Pre-session context builder
session_reader.py (137 lines) — JSONL session parser
sampler.py (363 lines) — Session scoring + ranking
session_metadata.py (271 lines) — Session metadata extraction
validate_knowledge.py (44 lines) — Index validation
knowledge_staleness_check.py (125 lines) — Staleness detection
knowledge_gap_identifier.py (291 lines) — Gap analysis engine
diff_analyzer.py (203 lines) — Diff analysis
improvement_proposals.py (518 lines) — Proposal generation
priority_rebalancer.py (745 lines) — Priority scoring
automation_opportunity_finder.py (600 lines) — Automation detection
dead_code_detector.py (270 lines) — Dead code detection
dependency_graph.py (220 lines) — Dependency mapping
perf_bottleneck_finder.py (635 lines) — Perf analysis
refactoring_opportunity_finder.py (46 lines) — Refactoring signals
gitea_issue_parser.py (140 lines) — Gitea issue parsing
session_pair_harvester.py (224 lines) — Training pair extraction
knowledge/
index.json (12KB) — Master fact store
SCHEMA.md (3KB) — Schema docs
global/pitfalls.yaml (2KB) — Cross-repo pitfalls
global/tool-quirks.yaml (2KB) — Tool quirks
repos/hermes-agent.yaml (2KB) — Repo-specific knowledge
repos/the-nexus.yaml (2KB) — Repo-specific knowledge
templates/
harvest-prompt.md (4KB) — Extraction prompt
test_sessions/ (5 files) — Sample transcripts
tests/ + scripts/test_* (14 files)— Test suite
```
**Total:** ~6,500 lines of code across 18 scripts + 14 test files.
Tracks compounding metrics: knowledge velocity (facts/day), error reduction (%), hit rate (knowledge used / knowledge available), task completion improvement.
---
*Generated by Codebase Genome pipeline — Issue #676*
## Directory Structure
```
compounding-intelligence/
├── README.md # Project overview and architecture
├── GENOME.md # This file (codebase genome)
├── knowledge/ # [PLANNED] Knowledge store
│ ├── index.json # Machine-readable fact index
│ ├── global/ # Cross-repo knowledge
│ ├── repos/{repo}.md # Per-repo knowledge
│ └── agents/{agent}.md # Agent-type notes
├── scripts/
│ ├── test_harvest_prompt.py # Basic prompt validation (2.5KB)
│ └── test_harvest_prompt_comprehensive.py # Full prompt structure test (6.8KB)
├── templates/
│ └── harvest-prompt.md # Knowledge extraction prompt (3.5KB)
├── test_sessions/
│ ├── session_success.jsonl # Happy path test data
│ ├── session_failure.jsonl # Failure path test data
│ ├── session_partial.jsonl # Incomplete session test data
│ ├── session_patterns.jsonl # Pattern extraction test data
│ └── session_questions.jsonl # Question identification test data
└── metrics/ # [PLANNED] Compounding metrics
└── dashboard.md
```
---
## Entry Points and Data Flow
### Entry Point 1: Knowledge Extraction (Harvester)
```
Input: Session transcript (JSONL)
templates/harvest-prompt.md (LLM prompt)
Knowledge items (JSON array)
Output: knowledge/index.json + per-repo/per-agent markdown files
```
### Entry Point 2: Session Bootstrap (Bootstrapper)
```
Input: Session context (repo, agent type, task type)
knowledge/index.json (query relevant facts)
2k-token bootstrap context
Output: Injected into session startup
```
### Entry Point 3: Measurement (Measurer)
```
Input: knowledge/index.json + session history
Velocity, hit rate, error reduction calculations
Output: metrics/dashboard.md
```
---
## Key Abstractions
### Knowledge Item
The atomic unit. One sentence, one category, one confidence score. Designed to be small enough that 1000 items fit in a 2k-token bootstrap context.
### Knowledge Store
A directory structure that mirrors the fleet's mental model:
- `global/` — knowledge that applies everywhere (tool quirks, environment facts)
- `repos/` — knowledge specific to each repo
- `agents/` — knowledge specific to each agent type
### Confidence Score
0.01.0 scale. Defines how certain the harvester is about each extracted fact:
- 0.91.0: Explicitly stated with verification
- 0.70.8: Clearly implied by multiple data points
- 0.50.6: Suggested but not fully verified
- 0.30.4: Inferred from limited data
- 0.10.2: Speculative or uncertain
### Bootstrap Context
The 2k-token injection that a new session receives. Assembled from the most relevant knowledge items for the current task, filtered by confidence > 0.7, deduplicated, and compressed.
---
## API Surface
### Internal (scripts not yet implemented)
| Script | Input | Output | Status |
|--------|-------|--------|--------|
| `harvester.py` | Session JSONL path | Knowledge items JSON | PLANNED |
| `bootstrapper.py` | Repo + agent type | 2k-token context string | PLANNED |
| `measurer.py` | Knowledge store path | Metrics JSON | PLANNED |
| `session_reader.py` | Session JSONL path | Parsed transcript | PLANNED |
### Prompt (templates/harvest-prompt.md)
The extraction prompt is the core "API." It takes a session transcript and returns structured JSON. It defines:
- Five extraction categories
- Output format (JSON array of knowledge items)
- Confidence scoring rubric
- Constraints (no hallucination, specificity, relevance, brevity)
- Example input/output pair
---
## Test Coverage
### What Exists
| File | Tests | Coverage |
|------|-------|----------|
| `scripts/test_harvest_prompt.py` | 2 tests | Prompt file existence, sample transcript |
| `scripts/test_harvest_prompt_comprehensive.py` | 5 tests | Prompt structure, categories, fields, confidence scoring, size limits |
| `test_sessions/*.jsonl` | 5 sessions | Success, failure, partial, patterns, questions |
### What's Missing
1. **Harvester integration test** — Does the prompt actually extract correct knowledge from real transcripts?
2. **Bootstrapper test** — Does it assemble relevant context correctly?
3. **Knowledge store test** — Does the index.json maintain consistency?
4. **Confidence calibration test** — Do high-confidence facts actually prove true in later sessions?
5. **Deduplication test** — Are duplicate facts across sessions handled?
6. **Staleness test** — How does the system handle outdated knowledge?
---
## Security Considerations
1. **No secrets in knowledge store** — The harvester must filter out API keys, tokens, and credentials from extracted facts. The prompt constraints mention this but there is no automated guard.
2. **Knowledge poisoning** — A malicious or corrupted session could inject false facts. Confidence scoring partially mitigates this, but there is no verification step.
3. **Access control** — The knowledge store has no access control. Any process that can read the directory can read all facts. In a multi-tenant setup, this is a concern.
4. **Transcript privacy** — Session transcripts may contain user data. The harvester must not extract personally identifiable information into the knowledge store.
---
## The 100x Path (from README)
```
Month 1: 15,000 facts, sessions 20% faster
Month 2: 45,000 facts, sessions 40% faster, first-try success up 30%
Month 3: 90,000 facts, fleet measurably smarter per token
```
Each new session is better than the last. The intelligence compounds.
---
*Generated by codebase-genome pipeline. Ref: timmy-home#676.*

View File

@@ -1,297 +0,0 @@
#!/usr/bin/env python3
"""
quality_gate.py — Score and filter knowledge entries.
Scores each entry on 4 dimensions:
- Specificity: concrete examples vs vague generalities
- Actionability: can this be used to do something?
- Freshness: is this still accurate?
- Source quality: was the model/provider reliable?
Usage:
from quality_gate import score_entry, filter_entries, quality_report
score = score_entry(entry)
filtered = filter_entries(entries, threshold=0.5)
report = quality_report(entries)
"""
import json
import math
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
# Source quality scores (higher = more reliable)
SOURCE_QUALITY = {
"claude-sonnet": 0.9,
"claude-opus": 0.95,
"gpt-4": 0.85,
"gpt-4-turbo": 0.85,
"gpt-5": 0.9,
"mimo-v2-pro": 0.8,
"gemini-pro": 0.8,
"llama-3-70b": 0.75,
"llama-3-8b": 0.7,
"ollama": 0.6,
"unknown": 0.5,
}
DEFAULT_SOURCE_QUALITY = 0.5
# Specificity indicators
SPECIFIC_INDICATORS = [
r"\b\d+\.\d+", # decimal numbers
r"\b\d{4}-\d{2}-\d{2}", # dates
r"\b[A-Z][a-z]+\s[A-Z][a-z]+", # proper nouns
r"`[^`]+`", # code/commands
r"https?://", # URLs
r"\b(example|instance|specifically|concretely)\b",
r"\b(step \d|first|second|third)\b",
r"\b(exactly|precisely|measured|counted)\b",
]
# Vagueness indicators (penalty)
VAGUE_INDICATORS = [
r"\b(generally|usually|often|sometimes|might|could|perhaps)\b",
r"\b(various|several|many|some|few)\b",
r"\b(it depends|varies|differs)\b",
r"\b(basically|essentially|fundamentally)\b",
r"\b(everyone knows|it's obvious|clearly)\b",
]
# Actionability indicators
ACTIONABLE_INDICATORS = [
r"\b(run|execute|install|deploy|configure|set up)\b",
r"\b(use|apply|implement|create|build)\b",
r"\b(check|verify|test|validate|confirm)\b",
r"\b(fix|resolve|solve|debug|troubleshoot)\b",
r"\b(if .+ then|when .+ do|to .+ use)\b",
r"```[a-z]*\n", # code blocks
r"\$\s", # shell commands
r"\b\d+\.\s", # numbered steps
]
def score_specificity(content: str) -> float:
"""Score specificity: 0=vague, 1=very specific."""
content_lower = content.lower()
score = 0.5 # baseline
# Check for specific indicators
specific_count = sum(
len(re.findall(p, content, re.IGNORECASE))
for p in SPECIFIC_INDICATORS
)
# Check for vague indicators
vague_count = sum(
len(re.findall(p, content_lower))
for p in VAGUE_INDICATORS
)
# Adjust score
score += min(specific_count * 0.05, 0.4)
score -= min(vague_count * 0.08, 0.3)
# Length bonus (longer = more detail, up to a point)
word_count = len(content.split())
if word_count > 50:
score += min((word_count - 50) * 0.001, 0.1)
return max(0.0, min(1.0, score))
def score_actionability(content: str) -> float:
"""Score actionability: 0=abstract, 1=highly actionable."""
content_lower = content.lower()
score = 0.3 # baseline (most knowledge is informational)
# Check for actionable indicators
actionable_count = sum(
len(re.findall(p, content_lower))
for p in ACTIONABLE_INDICATORS
)
score += min(actionable_count * 0.1, 0.6)
# Code blocks are highly actionable
if "```" in content:
score += 0.2
# Numbered steps are actionable
if re.search(r"\d+\.\s+\w", content):
score += 0.1
return max(0.0, min(1.0, score))
def score_freshness(timestamp: Optional[str]) -> float:
"""Score freshness: 1=new, decays over time."""
if not timestamp:
return 0.5
try:
if isinstance(timestamp, str):
ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
else:
ts = timestamp
now = datetime.now(timezone.utc)
age_days = (now - ts).days
# Exponential decay: 1.0 at day 0, 0.5 at ~180 days, 0.1 at ~365 days
score = math.exp(-age_days / 180)
return max(0.1, min(1.0, score))
except (ValueError, TypeError):
return 0.5
def score_source_quality(model: Optional[str]) -> float:
"""Score source quality based on model/provider."""
if not model:
return DEFAULT_SOURCE_QUALITY
# Normalize model name
model_lower = model.lower()
for key, score in SOURCE_QUALITY.items():
if key in model_lower:
return score
return DEFAULT_SOURCE_QUALITY
def score_entry(entry: dict) -> float:
"""
Score a knowledge entry on quality (0.0-1.0).
Weights:
- specificity: 0.3
- actionability: 0.3
- freshness: 0.2
- source_quality: 0.2
"""
content = entry.get("content", entry.get("text", entry.get("response", "")))
model = entry.get("model", entry.get("provenance", {}).get("model"))
timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
specificity = score_specificity(content)
actionability = score_actionability(content)
freshness = score_freshness(timestamp)
source = score_source_quality(model)
return round(
0.3 * specificity +
0.3 * actionability +
0.2 * freshness +
0.2 * source,
4
)
def score_entry_detailed(entry: dict) -> dict:
"""Score with breakdown."""
content = entry.get("content", entry.get("text", entry.get("response", "")))
model = entry.get("model", entry.get("provenance", {}).get("model"))
timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
specificity = score_specificity(content)
actionability = score_actionability(content)
freshness = score_freshness(timestamp)
source = score_source_quality(model)
return {
"score": round(0.3 * specificity + 0.3 * actionability + 0.2 * freshness + 0.2 * source, 4),
"specificity": round(specificity, 4),
"actionability": round(actionability, 4),
"freshness": round(freshness, 4),
"source_quality": round(source, 4),
}
def filter_entries(entries: List[dict], threshold: float = 0.5) -> List[dict]:
"""Filter entries below quality threshold."""
filtered = []
for entry in entries:
if score_entry(entry) >= threshold:
filtered.append(entry)
return filtered
def quality_report(entries: List[dict]) -> str:
"""Generate quality distribution report."""
if not entries:
return "No entries to analyze."
scores = [score_entry(e) for e in entries]
avg = sum(scores) / len(scores)
min_score = min(scores)
max_score = max(scores)
# Distribution buckets
buckets = {"high": 0, "medium": 0, "low": 0, "rejected": 0}
for s in scores:
if s >= 0.7:
buckets["high"] += 1
elif s >= 0.5:
buckets["medium"] += 1
elif s >= 0.3:
buckets["low"] += 1
else:
buckets["rejected"] += 1
lines = [
"=" * 50,
" QUALITY GATE REPORT",
"=" * 50,
f" Total entries: {len(entries)}",
f" Average score: {avg:.3f}",
f" Min: {min_score:.3f}",
f" Max: {max_score:.3f}",
"",
" Distribution:",
]
for bucket, count in buckets.items():
pct = count / len(entries) * 100
bar = "" * int(pct / 5)
lines.append(f" {bucket:<12} {count:>5} ({pct:>5.1f}%) {bar}")
passed = buckets["high"] + buckets["medium"]
lines.append(f"\n Pass rate (>= 0.5): {passed}/{len(entries)} ({passed/len(entries)*100:.1f}%)")
lines.append("=" * 50)
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Knowledge quality gate")
parser.add_argument("files", nargs="+", help="JSONL files to score")
parser.add_argument("--threshold", type=float, default=0.5, help="Quality threshold")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--filter", action="store_true", help="Filter and write back")
args = parser.parse_args()
all_entries = []
for filepath in args.files:
with open(filepath) as f:
for line in f:
if line.strip():
all_entries.append(json.loads(line))
if args.json:
results = [{"entry": e, **score_entry_detailed(e)} for e in all_entries]
print(json.dumps(results, indent=2))
elif args.filter:
filtered = filter_entries(all_entries, args.threshold)
print(f"Kept {len(filtered)}/{len(all_entries)} entries (threshold: {args.threshold})")
else:
print(quality_report(all_entries))
if __name__ == "__main__":
main()

View File

@@ -1,317 +0,0 @@
#!/usr/bin/env python3
"""
dedup.py — Knowledge deduplication: content hash + semantic similarity.
Deduplicates harvested knowledge entries to avoid training on duplicates.
Uses content hashing for exact matches and token overlap for near-duplicates.
Usage:
python3 dedup.py --input knowledge/index.json --output knowledge/index_deduped.json
python3 dedup.py --input knowledge/index.json --dry-run
python3 dedup.py --test # Run built-in dedup test
"""
import argparse
import hashlib
import json
import re
import sys
from pathlib import Path
from typing import List, Dict, Optional, Tuple
def normalize_text(text: str) -> str:
"""Normalize text for hashing: lowercase, collapse whitespace, strip."""
text = text.lower().strip()
text = re.sub(r'\s+', ' ', text)
return text
def content_hash(text: str) -> str:
"""SHA256 hash of normalized text for exact dedup."""
normalized = normalize_text(text)
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
def tokenize(text: str) -> set:
"""Simple tokenizer: lowercase words, 3+ chars."""
words = re.findall(r'[a-z0-9_]{3,}', text.lower())
return set(words)
def token_similarity(a: str, b: str) -> float:
"""Token-based Jaccard similarity (0.0-1.0).
Fast local alternative to embedding similarity.
Good enough for near-duplicate detection.
"""
tokens_a = tokenize(a)
tokens_b = tokenize(b)
if not tokens_a or not tokens_b:
return 0.0
intersection = tokens_a & tokens_b
union = tokens_a | tokens_b
return len(intersection) / len(union)
def quality_score(fact: dict) -> float:
"""Compute quality score for merge ranking.
Higher is better. Factors:
- confidence (0-1)
- source_count (more confirmations = better)
- has tags (richer metadata)
"""
confidence = fact.get('confidence', 0.5)
source_count = fact.get('source_count', 1)
has_tags = 1.0 if fact.get('tags') else 0.0
has_related = 1.0 if fact.get('related') else 0.0
# Weighted composite
score = (
confidence * 0.5 +
min(source_count / 10, 1.0) * 0.3 +
has_tags * 0.1 +
has_related * 0.1
)
return round(score, 4)
def merge_facts(keep: dict, drop: dict) -> dict:
"""Merge two near-duplicate facts, keeping higher-quality fields.
The 'keep' fact is enriched with metadata from 'drop'.
"""
# Merge tags (union)
keep_tags = set(keep.get('tags', []))
drop_tags = set(drop.get('tags', []))
keep['tags'] = sorted(keep_tags | drop_tags)
# Merge related (union)
keep_related = set(keep.get('related', []))
drop_related = set(drop.get('related', []))
keep['related'] = sorted(keep_related | drop_related)
# Update source_count (sum)
keep['source_count'] = keep.get('source_count', 1) + drop.get('source_count', 1)
# Update confidence (max — we've now seen it from multiple sources)
keep['confidence'] = max(keep.get('confidence', 0), drop.get('confidence', 0))
# Track that we merged
if '_merged_from' not in keep:
keep['_merged_from'] = []
keep['_merged_from'].append(drop.get('id', 'unknown'))
return keep
def dedup_facts(
facts: List[dict],
exact_threshold: float = 1.0,
near_threshold: float = 0.95,
dry_run: bool = False,
) -> Tuple[List[dict], dict]:
"""Deduplicate a list of knowledge facts.
Args:
facts: List of fact dicts (from index.json)
exact_threshold: Hash match = exact duplicate
near_threshold: Token similarity above this = near-duplicate
dry_run: If True, don't modify, just report
Returns:
(deduped_facts, stats_dict)
"""
if not facts:
return [], {"total": 0, "exact_dupes": 0, "near_dupes": 0, "unique": 0}
# Phase 1: Exact dedup by content hash
hash_seen = {} # hash -> index in deduped list
exact_dupes = 0
deduped = []
for fact in facts:
text = fact.get('fact', '')
h = content_hash(text)
if h in hash_seen:
# Exact duplicate — merge metadata into existing
existing_idx = hash_seen[h]
if not dry_run:
deduped[existing_idx] = merge_facts(deduped[existing_idx], fact)
exact_dupes += 1
else:
hash_seen[h] = len(deduped)
deduped.append(fact)
# Phase 2: Near-dup by token similarity
near_dupes = 0
i = 0
while i < len(deduped):
j = i + 1
while j < len(deduped):
sim = token_similarity(deduped[i].get('fact', ''), deduped[j].get('fact', ''))
if sim >= near_threshold:
# Near-duplicate — keep higher quality
q_i = quality_score(deduped[i])
q_j = quality_score(deduped[j])
if q_i >= q_j:
if not dry_run:
deduped[i] = merge_facts(deduped[i], deduped[j])
deduped.pop(j)
else:
# j is higher quality — merge i into j, then remove i
if not dry_run:
deduped[j] = merge_facts(deduped[j], deduped[i])
deduped.pop(i)
break # i changed, restart inner loop
near_dupes += 1
else:
j += 1
i += 1
stats = {
"total": len(facts),
"exact_dupes": exact_dupes,
"near_dupes": near_dupes,
"unique": len(deduped),
"removed": len(facts) - len(deduped),
}
return deduped, stats
def dedup_index_file(
input_path: str,
output_path: Optional[str] = None,
near_threshold: float = 0.95,
dry_run: bool = False,
) -> dict:
"""Deduplicate an index.json file.
Args:
input_path: Path to index.json
output_path: Where to write deduped file (default: overwrite input)
near_threshold: Token similarity threshold for near-dupes
dry_run: Report only, don't write
Returns stats dict.
"""
path = Path(input_path)
if not path.exists():
raise FileNotFoundError(f"Index file not found: {input_path}")
with open(path) as f:
data = json.load(f)
facts = data.get('facts', [])
deduped, stats = dedup_facts(facts, near_threshold=near_threshold, dry_run=dry_run)
if not dry_run:
data['facts'] = deduped
data['total_facts'] = len(deduped)
data['last_dedup'] = __import__('datetime').datetime.now(
__import__('datetime').timezone.utc
).isoformat()
out_path = Path(output_path) if output_path else path
with open(out_path, 'w') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return stats
def generate_test_duplicates(n: int = 20) -> List[dict]:
"""Generate test facts with intentional duplicates for testing.
Creates n unique facts plus n/4 exact dupes and n/4 near-dupes.
"""
import random
random.seed(42)
unique_facts = []
for i in range(n):
topic = random.choice(["git", "python", "docker", "rust", "nginx"])
tip = random.choice(["use verbose flags", "check logs first", "restart service", "clear cache", "update config"])
unique_facts.append({
"id": f"test:fact:{i:03d}",
"fact": f"When working with {topic}, always {tip} before deploying.",
"category": "fact",
"domain": "test",
"confidence": round(random.uniform(0.5, 1.0), 2),
"source_count": random.randint(1, 5),
"tags": [topic, "test"],
})
# Add exact duplicates (same text, different IDs)
duped = list(unique_facts)
for i in range(n // 4):
original = unique_facts[i]
dupe = dict(original)
dupe["id"] = f"test:fact:dup{i:03d}"
dupe["confidence"] = round(random.uniform(0.3, 0.8), 2)
duped.append(dupe)
# Add near-duplicates (slightly different phrasing)
for i in range(n // 4):
original = unique_facts[i]
near = dict(original)
near["id"] = f"test:fact:near{i:03d}"
near["fact"] = original["fact"].replace("always", "should").replace("before deploying", "prior to deployment")
near["confidence"] = round(random.uniform(0.4, 0.9), 2)
duped.append(near)
return duped
def main():
parser = argparse.ArgumentParser(description="Knowledge deduplication")
parser.add_argument("--input", help="Path to index.json")
parser.add_argument("--output", help="Output path (default: overwrite input)")
parser.add_argument("--threshold", type=float, default=0.95,
help="Near-dup similarity threshold (default: 0.95)")
parser.add_argument("--dry-run", action="store_true", help="Report only, don't write")
parser.add_argument("--test", action="store_true", help="Run built-in dedup test")
parser.add_argument("--json", action="store_true", help="JSON output")
args = parser.parse_args()
if args.test:
test_facts = generate_test_duplicates(20)
print(f"Generated {len(test_facts)} test facts (20 unique + dupes)")
deduped, stats = dedup_facts(test_facts, near_threshold=args.threshold)
print(f"\nDedup results:")
print(f" Total input: {stats['total']}")
print(f" Exact dupes: {stats['exact_dupes']}")
print(f" Near dupes: {stats['near_dupes']}")
print(f" Unique output: {stats['unique']}")
print(f" Removed: {stats['removed']}")
# Verify: should have ~20 unique (some merged)
assert stats['unique'] <= 20, f"Too many unique: {stats['unique']} > 20"
assert stats['unique'] >= 15, f"Too few unique: {stats['unique']} < 15"
assert stats['removed'] > 0, "No duplicates removed"
print("\nOK: Dedup test passed")
return
if not args.input:
print("ERROR: Provide --input or --test")
sys.exit(1)
stats = dedup_index_file(args.input, args.output, args.threshold, args.dry_run)
if args.json:
print(json.dumps(stats, indent=2))
else:
print(f"Dedup results:")
print(f" Total input: {stats['total']}")
print(f" Exact dupes: {stats['exact_dupes']}")
print(f" Near dupes: {stats['near_dupes']}")
print(f" Unique output: {stats['unique']}")
print(f" Removed: {stats['removed']}")
if args.dry_run:
print(" (dry run — no changes written)")
if __name__ == "__main__":
main()

View File

@@ -149,8 +149,8 @@ def to_dot(graph: dict) -> str:
"""Generate DOT format output."""
lines = ["digraph dependencies {"]
lines.append(" rankdir=LR;")
lines.append(' node [shape=box, style=filled, fillcolor="#1a1a2e", fontcolor="#e6edf3"];')
lines.append(' edge [color="#4a4a6a"];')
lines.append(" node [shape=box, style=filled, fillcolor="#1a1a2e", fontcolor="#e6edf3"];")
lines.append(" edge [color="#4a4a6a"];")
lines.append("")
for repo, data in sorted(graph.items()):

View File

@@ -1,308 +0,0 @@
#!/usr/bin/env python3
"""
Dependency Inventory — Scan repos and list third-party dependencies.
Reads: package.json, requirements.txt, go.mod, Cargo.toml, pyproject.toml
Extracts: package name, version constraint, source file/repo
Outputs: JSON (default) or markdown table
Usage:
python3 scripts/dependency_inventory.py --repos-dir ~/repos/
python3 scripts/dependency_inventory.py --repos ~/repo1,~/repo2 --format markdown
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
# Mapping of file pattern to canonical parser name
MANIFEST_PATTERNS = {
'requirements.txt': 'requirements',
'package.json': 'npm',
'pyproject.toml': 'pyproject',
'go.mod': 'go',
'Cargo.toml': 'cargo',
}
# Parser registry
PARSERS = {}
def register_parser(name: str):
"""Decorator to register a parser function."""
def decorator(fn):
PARSERS[name] = fn
return fn
return decorator
# ─── Parsers ────────────────────────────────────────────────────────────────
@register_parser('requirements')
def parse_requirements(content: str) -> List[Dict[str, str]]:
"""Parse requirements.txt — one requirement per line."""
deps = []
for line in content.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
pkg_spec = re.split(r'[ ;#]', line)[0].strip()
if '>=' in pkg_spec:
name, ver = pkg_spec.split('>=', 1)
elif '==' in pkg_spec:
name, ver = pkg_spec.split('==', 1)
elif '<=' in pkg_spec:
name, ver = pkg_spec.split('<=', 1)
elif '~=' in pkg_spec:
name, ver = pkg_spec.split('~=', 1)
elif '>' in pkg_spec:
name, ver = pkg_spec.split('>', 1)
elif '<' in pkg_spec:
name, ver = pkg_spec.split('<', 1)
elif '=' in pkg_spec:
name, ver = pkg_spec.split('=', 1)
else:
name, ver = pkg_spec, ''
deps.append({
'package': name.strip(),
'version': ver.strip(),
'constraint': line[len(name):].strip()
})
return deps
@register_parser('npm')
def parse_package_json(content: str) -> List[Dict[str, str]]:
"""Parse package.json dependencies."""
try:
data = json.loads(content)
except json.JSONDecodeError:
return []
deps = []
for section in ('dependencies', 'devDependencies', 'peerDependencies', 'optionalDependencies'):
for name, ver in data.get(section, {}).items():
deps.append({
'package': name,
'version': ver,
'constraint': ver,
'type': section
})
return deps
@register_parser('pyproject')
def parse_pyproject_toml(content: str) -> List[Dict[str, str]]:
"""Parse pyproject.toml [project] dependencies."""
deps = []
in_deps = False
dep_buffer = ''
for line in content.splitlines():
stripped = line.strip()
if stripped.startswith('dependencies = ['):
in_deps = True
remainder = stripped.split('=', 1)[1].strip()
dep_buffer = remainder[1:] if remainder.startswith('[') else remainder
continue
if in_deps:
if stripped.startswith(']'):
in_deps = False
continue
dep_buffer += ' ' + line
dep_buffer = dep_buffer.strip().rstrip(',')
for match in re.finditer(r'"([^"]+)"', dep_buffer):
spec = match.group(1)
m = re.match(r'^([a-zA-Z0-9_.-]+)\s*([<>=!~]+)?\s*(.*)$', spec)
if m:
name, op, ver = m.groups()
deps.append({
'package': name,
'version': (ver or '').strip(),
'constraint': spec
})
return deps
@register_parser('go')
def parse_go_mod(content: str) -> List[Dict[str, str]]:
"""Parse go.mod — require statements."""
deps = []
for line in content.splitlines():
line = line.strip()
if line.startswith('require ') and not line.startswith('require ('):
parts = line.split()
if len(parts) >= 3:
mod, ver = parts[1], parts[2]
deps.append({'package': mod, 'version': ver, 'constraint': ver})
elif line.startswith('\t') and '/' in line:
parts = line.strip().split()
if len(parts) >= 2:
mod, ver = parts[0], parts[1]
deps.append({'package': mod, 'version': ver, 'constraint': ver})
return deps
@register_parser('cargo')
def parse_cargo_toml(content: str) -> List[Dict[str, str]]:
"""Parse [dependencies] section from Cargo.toml."""
deps = []
in_deps = False
for line in content.splitlines():
stripped = line.strip()
if stripped in ('[dependencies]', '[dependencies]'):
in_deps = True
continue
if stripped.startswith('['):
in_deps = False
continue
if in_deps and '=' in stripped:
name_part, ver_part = stripped.split('=', 1)
name = name_part.strip()
ver = ver_part.strip().strip('"').strip("'")
deps.append({'package': name, 'version': ver, 'constraint': ver})
return deps
# ─── File Discovery ─────────────────────────────────────────────────────────
def find_manifest_files(root: Path) -> Dict[str, List[Path]]:
"""Find all manifest files under root."""
found = {k: [] for k in MANIFEST_PATTERNS}
for pattern in MANIFEST_PATTERNS:
for path in root.rglob(pattern):
if not any(skip in str(path) for skip in ('.git', 'node_modules', '__pycache__', '.venv', 'venv')):
found[pattern].append(path)
return found
# ─── Main Scanner ────────────────────────────────────────────────────────────
def scan_repo(repo_path: Path) -> Dict[str, Any]:
"""Scan a single repo directory for dependency manifests."""
repo_name = repo_path.name
found = find_manifest_files(repo_path)
all_deps: List[Dict[str, str]] = []
files_scanned = 0
for pattern, paths in found.items():
parser_name = MANIFEST_PATTERNS[pattern]
# Map parser_name to function
if parser_name == 'requirements':
parser = parse_requirements
elif parser_name == 'npm':
parser = parse_package_json
elif parser_name == 'pyproject':
parser = parse_pyproject_toml
elif parser_name == 'go':
parser = parse_go_mod
elif parser_name == 'cargo':
parser = parse_cargo_toml
else:
continue
for fp in paths:
try:
content = fp.read_text(encoding='utf-8', errors='replace')
files_scanned += 1
rel = fp.relative_to(repo_path)
for dep in parser(content):
dep['source'] = pattern
dep['file'] = str(rel)
dep['repo'] = repo_name
all_deps.append(dep)
except Exception as e:
print(f" [WARN] Could not parse {fp}: {e}", file=sys.stderr)
return {
'repo': repo_name,
'path': str(repo_path),
'files_scanned': files_scanned,
'dependencies': all_deps,
'dependency_count': len(all_deps),
}
def scan_repos(repos: List[Path]) -> Dict[str, Any]:
"""Scan multiple repos and aggregate."""
results = {}
total_deps = 0
total_files = 0
for repo in repos:
if not repo.is_dir():
print(f"[WARN] Skipping {repo}: not a directory", file=sys.stderr)
continue
print(f"Scanning {repo.name}...", file=sys.stderr)
result = scan_repo(repo)
results[repo.name] = result
total_deps += result['dependency_count']
total_files += result['files_scanned']
return {
'repos': results,
'summary': {
'total_repos': len(results),
'total_files_scanned': total_files,
'total_dependencies': total_deps,
}
}
# ─── Output ─────────────────────────────────────────────────────────────────
def output_json(data: Dict[str, Any], out_path: Optional[Path] = None) -> None:
text = json.dumps(data, indent=2)
if out_path:
out_path.write_text(text)
print(f"Written: {out_path}", file=sys.stderr)
else:
print(text)
def output_markdown(data: Dict[str, Any], out_path: Optional[Path] = None) -> None:
lines = []
lines.append("# Dependency Inventory")
lines.append("\nGenerated: *(TODO: add timestamp)*")
lines.append(f"\n**Summary:** {data['summary']['total_dependencies']} dependencies across {data['summary']['total_repos']} repos")
lines.append("")
lines.append("| Repo | File | Package | Version |")
lines.append("|------|------|---------|---------|")
for repo_name, rdata in sorted(data['repos'].items()):
for dep in sorted(rdata['dependencies'], key=lambda d: d['package']):
lines.append(f"| {repo_name} | {dep['file']} | {dep['package']} | {dep['version']} |")
text = '\n'.join(lines) + '\n'
if out_path:
out_path.write_text(text)
print(f"Written: {out_path}", file=sys.stderr)
else:
print(text)
# ─── CLI Entry ────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Generate org-wide dependency inventory")
parser.add_argument('--repos-dir', help='Directory containing multiple repos')
parser.add_argument('--repos', help='Comma-separated list of repo paths')
parser.add_argument('--output', '-o', help='Output file (default: stdout)')
parser.add_argument('--format', choices=['json', 'markdown'], default='json',
help='Output format (default: json)')
args = parser.parse_args()
if args.repos:
repo_paths = [Path(p.strip()).expanduser() for p in args.repos.split(',')]
elif args.repos_dir:
base = Path(args.repos_dir).expanduser()
repo_paths = [p for p in base.iterdir() if p.is_dir() and not p.name.startswith('.')]
else:
repo_paths = [Path(__file__).resolve().parent.parent]
out_path = Path(args.output).expanduser() if args.output else None
data = scan_repos(repo_paths)
if args.format == 'json':
output_json(data, out_path)
else:
output_markdown(data, out_path)
if __name__ == '__main__':
main()

View File

@@ -1,387 +0,0 @@
#!/usr/bin/env python3
"""
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)
Automatically detects when knowledge entries become stale due to code changes.
Detection Method:
1. Track source file hash alongside knowledge entry
2. Compare current file hashes vs stored
3. Mismatch → flag entry as potentially stale
4. Report stale entries and optionally re-extract
Usage:
python3 scripts/freshness.py --knowledge-dir knowledge/
python3 scripts/freshness.py --knowledge-dir knowledge/ --json
python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
"""
import argparse
import hashlib
import json
import os
import subprocess
import sys
import yaml
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
"""
Get files changed in git in the last N days.
Returns dict with 'modified', 'added', 'deleted' lists of file paths.
"""
changes = {"modified": [], "added": [], "deleted": []}
try:
# Get commits from last N days
cmd = [
"git", "-C", repo_path, "log",
f"--since={days} days ago",
"--name-status",
"--pretty=format:",
"--diff-filter=MAD"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return changes
for line in result.stdout.splitlines():
line = line.strip()
if not line:
continue
parts = line.split('\t', 1)
if len(parts) != 2:
continue
status, filepath = parts
if status == 'M':
changes["modified"].append(filepath)
elif status == 'A':
changes["added"].append(filepath)
elif status == 'D':
changes["deleted"].append(filepath)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# Deduplicate
for key in changes:
changes[key] = list(set(changes[key]))
return changes
def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
"""
Load knowledge entries from YAML files in the knowledge directory.
Supports:
- knowledge/index.json (legacy format)
- knowledge/global/*.yaml
- knowledge/repos/*.yaml
- knowledge/agents/*.yaml
"""
entries = []
# Load from index.json if exists
index_path = os.path.join(knowledge_dir, "index.json")
if os.path.exists(index_path):
try:
with open(index_path) as f:
data = json.load(f)
for fact in data.get("facts", []):
entries.append({
"source": "index.json",
"fact": fact.get("fact", ""),
"source_file": fact.get("source_file"),
"source_hash": fact.get("source_hash"),
"category": fact.get("category", "unknown"),
"confidence": fact.get("confidence", 0.5)
})
except (json.JSONDecodeError, KeyError):
pass
# Load from YAML files
for subdir in ["global", "repos", "agents"]:
subdir_path = os.path.join(knowledge_dir, subdir)
if not os.path.isdir(subdir_path):
continue
for filename in os.listdir(subdir_path):
if not filename.endswith((".yaml", ".yml")):
continue
filepath = os.path.join(subdir_path, filename)
try:
with open(filepath) as f:
data = yaml.safe_load(f)
if not data or not isinstance(data, dict):
continue
# Extract entries from YAML structure
for key, value in data.items():
if isinstance(value, list):
for item in value:
if isinstance(item, dict):
entries.append({
"source": f"{subdir}/{filename}",
"fact": item.get("description", item.get("fact", "")),
"source_file": item.get("source_file"),
"source_hash": item.get("source_hash"),
"category": item.get("category", "unknown"),
"confidence": item.get("confidence", 0.5)
})
elif isinstance(value, dict):
entries.append({
"source": f"{subdir}/{filename}",
"fact": value.get("description", value.get("fact", "")),
"source_file": value.get("source_file"),
"source_hash": value.get("source_hash"),
"category": value.get("category", "unknown"),
"confidence": value.get("confidence", 0.5)
})
except (yaml.YAMLError, IOError):
pass
return entries
def check_freshness(knowledge_dir: str, repo_root: str = ".",
days: int = 1) -> Dict[str, Any]:
"""
Check freshness of knowledge entries against recent code changes.
Returns:
{
"timestamp": ISO timestamp,
"total_entries": int,
"stale_entries": [...],
"fresh_entries": [...],
"git_changes": {...},
"summary": {...}
}
"""
entries = load_knowledge_entries(knowledge_dir)
git_changes = get_git_file_changes(repo_root, days)
stale_entries = []
fresh_entries = []
for entry in entries:
source_file = entry.get("source_file")
if not source_file:
# Entry without source file reference
fresh_entries.append({**entry, "status": "no_source"})
continue
# Check if source file was recently modified
is_stale = False
reason = ""
if source_file in git_changes["modified"]:
is_stale = True
reason = "source_modified"
elif source_file in git_changes["deleted"]:
is_stale = True
reason = "source_deleted"
elif source_file in git_changes["added"]:
is_stale = True
reason = "source_added"
# Also check hash if available
stored_hash = entry.get("source_hash")
if stored_hash:
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
is_stale = True
reason = "source_missing"
elif current_hash != stored_hash:
is_stale = True
reason = "hash_mismatch"
if is_stale:
stale_entries.append({
**entry,
"status": "stale",
"reason": reason
})
else:
fresh_entries.append({**entry, "status": "fresh"})
# Compute summary
total = len(entries)
stale_count = len(stale_entries)
fresh_count = len(fresh_entries)
# Group stale entries by reason
stale_by_reason = {}
for entry in stale_entries:
reason = entry.get("reason", "unknown")
if reason not in stale_by_reason:
stale_by_reason[reason] = 0
stale_by_reason[reason] += 1
return {
"timestamp": datetime.now(timezone.utc).isoformat(),
"total_entries": total,
"stale_entries": stale_entries,
"fresh_entries": fresh_entries,
"git_changes": git_changes,
"summary": {
"total": total,
"stale": stale_count,
"fresh": fresh_count,
"stale_percentage": round(stale_count / total * 100, 1) if total > 0 else 0,
"stale_by_reason": stale_by_reason,
"git_changes_summary": {
"modified": len(git_changes["modified"]),
"added": len(git_changes["added"]),
"deleted": len(git_changes["deleted"])
}
}
}
def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
"""
Update hashes for stale entries. Returns count of updated entries.
"""
entries = load_knowledge_entries(knowledge_dir)
updated = 0
# This is a simplified version - in practice, you'd need to
# write back to the specific YAML files
for entry in entries:
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash and entry.get("source_hash") != current_hash:
# Mark for update (in practice, you'd write back to the file)
updated += 1
return updated
def format_report(result: Dict[str, Any], max_items: int = 20) -> str:
"""Format freshness check results as a human-readable report."""
timestamp = result["timestamp"]
summary = result["summary"]
stale_entries = result["stale_entries"]
git_changes = result["git_changes"]
lines = [
"Knowledge Freshness Report",
"=" * 50,
f"Generated: {timestamp}",
f"Total entries: {summary['total']}",
f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)",
f"Fresh entries: {summary['fresh']}",
""
]
# Git changes summary
lines.extend([
"Git Changes (last 24h):",
f" Modified: {len(git_changes['modified'])} files",
f" Added: {len(git_changes['added'])} files",
f" Deleted: {len(git_changes['deleted'])} files",
""
])
# Stale entries by reason
if summary.get("stale_by_reason"):
lines.extend([
"Stale Entries by Reason:",
""
])
for reason, count in summary["stale_by_reason"].items():
lines.append(f" {reason}: {count}")
lines.append("")
# List stale entries
if stale_entries:
lines.extend([
"Stale Entries:",
""
])
for i, entry in enumerate(stale_entries[:max_items], 1):
source = entry.get("source_file", "?")
reason = entry.get("reason", "unknown")
fact = entry.get("fact", "")[:60]
lines.append(f"{i:2d}. [{reason}] {source}")
if fact:
lines.append(f" {fact}")
if len(stale_entries) > max_items:
lines.append(f"\n... and {len(stale_entries) - max_items} more")
else:
lines.append("No stale entries found. All knowledge is fresh!")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Knowledge Freshness Cron — detect stale entries from code changes")
parser.add_argument("--knowledge-dir", required=True,
help="Path to knowledge directory")
parser.add_argument("--repo", default=".",
help="Path to repository for git change detection")
parser.add_argument("--days", type=int, default=1,
help="Number of days to check for git changes (default: 1)")
parser.add_argument("--json", action="store_true",
help="Output as JSON instead of human-readable")
parser.add_argument("--max", type=int, default=20,
help="Maximum stale entries to show (default: 20)")
parser.add_argument("--auto-reextract", action="store_true",
help="Auto-re-extract knowledge for stale entries")
args = parser.parse_args()
if not os.path.isdir(args.knowledge_dir):
print(f"Error: {args.knowledge_dir} is not a directory", file=sys.stderr)
sys.exit(1)
if not os.path.isdir(args.repo):
print(f"Error: {args.repo} is not a directory", file=sys.stderr)
sys.exit(1)
result = check_freshness(args.knowledge_dir, args.repo, args.days)
if args.json:
print(json.dumps(result, indent=2))
else:
print(format_report(result, args.max))
# Auto-re-extract if requested
if args.auto_reextract and result["stale_entries"]:
print(f"\nAuto-re-extracting {len(result['stale_entries'])} stale entries...")
# In a real implementation, this would call the harvester
print("(Auto-re-extraction not yet implemented)")
if __name__ == "__main__":
main()

View File

@@ -113,7 +113,7 @@ def find_slow_tests_by_scan(repo_path: str) -> List[Bottleneck]:
(r"time\.sleep\((\d+(?:\.\d+)?)\)", "Contains time.sleep() — consider using mock or async wait"),
(r"subprocess\.run\(.*timeout=(\d+)", "Subprocess with timeout — may block test"),
(r"requests\.(get|post|put|delete)\(", "Real HTTP call — mock with responses or httpretty"),
(r"open\\([^)]*)[\x27\x22]w[\x27\x22]", "File I/O in test — use tmp_path fixture"),
(r"open\([^)]*['"]w['"]", "File I/O in test — use tmp_path fixture"),
]
for root, dirs, files in os.walk(repo_path):
@@ -506,8 +506,8 @@ def format_markdown(report: PerfReport) -> str:
lines.append(f"- {icon} {b.name}{loc} — ~{b.duration_s:.1f}s — {b.recommendation}")
lines.append(f"")
return "\n".join(lines)
return "
".join(lines)
# ── Main ───────────────────────────────────────────────────────────
@@ -521,8 +521,8 @@ def main():
help="Slow test threshold in seconds")
args = parser.parse_args()
# Threshold override handled via module-level default
# (scan_tests uses SLOW_TEST_THRESHOLD_S from module scope)
global SLOW_TEST_THRESHOLD_S
SLOW_TEST_THRESHOLD_S = args.threshold
if not os.path.isdir(args.repo):
print(f"Error: {args.repo} is not a directory", file=sys.stderr)

View File

@@ -1,12 +1,6 @@
#!/usr/bin/env python3
"""
Finds refactoring opportunities in codebases
Engine ID: 10.4
Usage:
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
Refactoring Opportunity Finder — Engine 10.4
"""
import argparse
@@ -14,277 +8,137 @@ import ast
import json
import os
import sys
from dataclasses import dataclass, field
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Optional, Tuple
from typing import List, Optional, Tuple
# ── Data Classes ────────────────────────────────────────────────────────
@dataclass
class FileMetrics:
"""Metrics for a single source file."""
path: str
lines: int = 0
complexity: float = 0.0
max_complexity: int = 0
functions: int = 0
classes: int = 0
lines: int
complexity: float
max_complexity: int
functions: int
classes: int
churn_30d: int = 0
churn_90d: int = 0
test_coverage: Optional[float] = None
refactoring_score: float = 0.0
# ── Complexity Analysis ─────────────────────────────────────────────────
class ComplexityVisitor(ast.NodeVisitor):
"""AST visitor that computes cyclomatic complexity per function."""
def __init__(self):
self.complexities = []
self.function_count = 0
self.class_count = 0
self._current_complexity = 0
self._in_function = False
self.functions = []
self.classes = 0
def visit_FunctionDef(self, node):
self.function_count += 1
old_complexity = self._current_complexity
old_in_function = self._in_function
self._current_complexity = 1 # Base complexity
self._in_function = True
complexity = 1
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For)):
complexity += 1
elif isinstance(child, ast.BoolOp):
complexity += len(child.values) - 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.Assert):
complexity += 1
self.functions.append((node.name, complexity, node.lineno))
self.generic_visit(node)
self.complexities.append(self._current_complexity)
self._current_complexity = old_complexity
self._in_function = old_in_function
visit_AsyncFunctionDef = visit_FunctionDef
def visit_AsyncFunctionDef(self, node):
self.visit_FunctionDef(node)
def visit_ClassDef(self, node):
self.class_count += 1
self.generic_visit(node)
def visit_If(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_For(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
visit_AsyncFor = visit_For
def visit_While(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_ExceptHandler(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_With(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
visit_AsyncWith = visit_With
def visit_Assert(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_BoolOp(self, node):
# Each 'and'/'or' adds a branch
if self._in_function:
self._current_complexity += len(node.values) - 1
self.generic_visit(node)
def visit_IfExp(self, node):
# Ternary expression
if self._in_function:
self._current_complexity += 1
self.classes += 1
self.generic_visit(node)
def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
"""
Compute cyclomatic complexity for a Python file.
Returns:
(avg_complexity, max_complexity, function_count, class_count, line_count)
"""
def compute_file_complexity(filepath):
try:
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
with open(filepath) as f:
source = f.read()
except (IOError, OSError):
return 0.0, 0, 0, 0, 0
except (FileNotFoundError, IsADirectoryError, PermissionError):
return (0.0, 0, 0, 0, 0)
try:
tree = ast.parse(source, filename=filepath)
tree = ast.parse(source)
except SyntaxError:
return 0.0, 0, 0, 0, 0
return (0.0, 0, 0, 0, 0)
lines = source.count("\n") + 1 if source.strip() else 0
visitor = ComplexityVisitor()
visitor.visit(tree)
line_count = len(source.splitlines())
if not visitor.complexities:
# No functions, but might have classes
return 0.0, 0, visitor.function_count, visitor.class_count, line_count
avg = sum(visitor.complexities) / len(visitor.complexities)
max_c = max(visitor.complexities)
return avg, max_c, visitor.function_count, visitor.class_count, line_count
if not visitor.functions:
return (0.0, 0, 0, visitor.classes, lines)
complexities = [c for _, c, _ in visitor.functions]
avg = sum(complexities) / len(complexities)
max_c = max(complexities)
return (round(avg, 1), max_c, len(visitor.functions), visitor.classes, lines)
# ── Refactoring Score ───────────────────────────────────────────────────
def calculate_refactoring_score(metrics: FileMetrics) -> float:
"""
Calculate a refactoring priority score (0-100) based on file metrics.
Higher score = higher priority for refactoring.
Components:
- Complexity (0-30 points): higher avg/max complexity = higher score
- Size (0-20 points): larger files = higher score
- Churn (0-30 points): more changes recently = higher score
- Coverage (0-20 points): lower test coverage = higher score
"""
def calculate_refactoring_score(metrics):
score = 0.0
# Complexity component (0-30)
# avg=10+ or max=20+ → 30 points
complexity_score = min(30.0, (metrics.complexity * 2) + (metrics.max_complexity * 0.5))
score += max(0.0, complexity_score)
# Size component (0-20)
# 500+ lines → 20 points
size_score = min(20.0, metrics.lines / 25.0)
score += max(0.0, size_score)
# Churn component (0-30)
# Weighted: recent churn (30d) counts more than older (90d)
churn_score = min(30.0, (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5))
score += max(0.0, churn_score)
# Coverage component (0-20)
# Lower coverage → higher score
if metrics.test_coverage is not None:
# coverage=0 → 20 points, coverage=1 → 0 points
coverage_score = (1.0 - metrics.test_coverage) * 20.0
complexity_score = min(40, metrics.complexity * 4)
if metrics.max_complexity > 10:
complexity_score = min(40, complexity_score + (metrics.max_complexity - 10))
score += complexity_score
if metrics.lines <= 0:
pass
elif metrics.lines <= 100:
score += metrics.lines * 0.1
elif metrics.lines <= 500:
score += 10 + (metrics.lines - 100) * 0.0125
else:
# No data → assume medium risk (10 points)
coverage_score = 10.0
score += max(0.0, coverage_score)
return min(100.0, max(0.0, score))
score += min(20, 15 + (metrics.lines - 500) * 0.01)
churn_score = (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5)
score += min(30, churn_score)
if metrics.test_coverage is None:
score += 5
elif metrics.test_coverage < 0.3:
score += 10
elif metrics.test_coverage < 0.6:
score += 7
elif metrics.test_coverage < 0.8:
score += 4
else:
score += 1
return round(min(100, max(0, score)), 1)
# ── Proposal Generation ─────────────────────────────────────────────────
def scan_directory(directory: str, extensions: tuple = ('.py',)) -> list:
"""Scan directory for source files."""
files = []
for root, dirs, filenames in os.walk(directory):
# Skip hidden dirs and common non-source dirs
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in (
'__pycache__', 'node_modules', 'venv', '.venv', 'env',
'build', 'dist', '.git', '.tox'
)]
for fname in filenames:
if any(fname.endswith(ext) for ext in extensions):
files.append(os.path.join(root, fname))
return files
def generate_proposals(directory: str = '.', min_score: float = 30.0) -> list:
"""Generate refactoring proposals by analyzing source files."""
def generate_proposals(repo_path=".", threshold=30.0):
proposals = []
files = scan_directory(directory)
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith((".", "__pycache__", "node_modules", ".git", "venv"))]
for fname in files:
if not fname.endswith(".py"):
continue
filepath = os.path.join(root, fname)
relpath = os.path.relpath(filepath, repo_path)
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
if funcs == 0 and classes == 0:
continue
metrics = FileMetrics(path=relpath, lines=lines, complexity=avg, max_complexity=max_c, functions=funcs, classes=classes)
score = calculate_refactoring_score(metrics)
metrics.refactoring_score = score
if score >= threshold:
proposals.append({"title": f"Refactor {relpath} (score: {score})", "impact": min(10, int(score / 10)), "effort": min(10, max(1, funcs // 3)), "category": "refactoring", "source_engine": "10.4", "timestamp": datetime.now(timezone.utc).isoformat(), "metrics": asdict(metrics)})
return sorted(proposals, key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
for filepath in files:
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
if funcs == 0 and classes == 0:
continue
metrics = FileMetrics(
path=filepath,
lines=lines,
complexity=avg,
max_complexity=max_c,
functions=funcs,
classes=classes
)
score = calculate_refactoring_score(metrics)
metrics.refactoring_score = score
if score >= min_score:
reasons = []
if max_c > 10:
reasons.append(f"high max complexity ({max_c})")
if avg > 5:
reasons.append(f"high avg complexity ({avg:.1f})")
if lines > 300:
reasons.append(f"large file ({lines} lines)")
proposals.append({
"title": f"Refactor {os.path.basename(filepath)} (score: {score:.0f})",
"description": f"{filepath}: {', '.join(reasons) if reasons else 'general improvement candidate'}",
"impact": min(10, int(score / 10)),
"effort": min(10, max(1, int(max_c / 3))),
"category": "refactoring",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {
"path": filepath,
"score": round(score, 2),
"avg_complexity": round(avg, 2),
"max_complexity": max_c,
"lines": lines,
"functions": funcs,
"classes": classes
}
})
# Sort by score descending
proposals.sort(key=lambda p: p.get('metrics', {}).get('score', 0), reverse=True)
return proposals
# ── CLI ─────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
parser.add_argument("--output", required=True, help="Output file for proposals")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
parser.add_argument("--directory", default=".", help="Directory to scan")
parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
parser = argparse.ArgumentParser(description="Find refactoring opportunities")
parser.add_argument("--repo", default=".")
parser.add_argument("--output", required=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--threshold", type=float, default=30.0)
args = parser.parse_args()
proposals = generate_proposals(args.directory, args.min_score)
proposals = generate_proposals(args.repo, args.threshold)
if not args.dry_run:
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
with open(args.output, "w") as f:
json.dump({"proposals": proposals}, f, indent=2)
print(f"Generated {len(proposals)} proposals -> {args.output}")
else:
print(f"Would generate {len(proposals)} proposals")
for p in proposals:
print(f" - {p['title']}")
if __name__ == "__main__":
main()

View File

@@ -1,72 +1,212 @@
#!/usr/bin/env python3
"""Comprehensive tests for knowledge extraction prompt."""
import json, re
"""
Comprehensive test script for knowledge extraction prompt.
Validates prompt structure, requirements, and consistency.
"""
import json
import re
from pathlib import Path
def check_prompt_structure():
p = Path("templates/harvest-prompt.md")
if not p.exists(): return False, "harvest-prompt.md not found"
c = p.read_text()
for s in ["System Prompt","Instructions","Categories","Output Format","Confidence Scoring","Constraints","Example"]:
if s.lower() not in c.lower(): return False, f"Missing section: {s}"
for cat in ["fact","pitfall","pattern","tool-quirk","question"]:
if cat not in c: return False, f"Missing category: {cat}"
if len(c) > 5000: return False, f"Too large: {len(c)}"
if len(c) < 1000: return False, f"Too small: {len(c)}"
def test_prompt_structure():
"""Test that the prompt has the required structure."""
prompt_path = Path("templates/harvest-prompt.md")
if not prompt_path.exists():
return False, "harvest-prompt.md not found"
content = prompt_path.read_text()
# Check for required sections
required_sections = [
"System Prompt",
"Instructions",
"Categories",
"Output Format",
"Confidence Scoring",
"Constraints",
"Example"
]
for section in required_sections:
if section.lower() not in content.lower():
return False, f"Missing required section: {section}"
# Check for required categories
required_categories = ["fact", "pitfall", "pattern", "tool-quirk", "question"]
for category in required_categories:
if category not in content:
return False, f"Missing required category: {category}"
# Check for required output fields
required_fields = ["fact", "category", "repo", "confidence"]
for field in required_fields:
if field not in content:
return False, f"Missing required output field: {field}"
# Check prompt size (should be ~1k tokens, roughly 4k chars)
if len(content) > 5000:
return False, f"Prompt too large: {len(content)} chars (max ~5000)"
if len(content) < 1000:
return False, f"Prompt too small: {len(content)} chars (min ~1000)"
return True, "Prompt structure is valid"
def check_confidence_scoring():
c = Path("templates/harvest-prompt.md").read_text()
for l in ["0.9-1.0","0.7-0.8","0.5-0.6","0.3-0.4","0.1-0.2"]:
if l not in c: return False, f"Missing level: {l}"
return True, "Confidence scoring defined"
def check_example_quality():
c = Path("templates/harvest-prompt.md").read_text()
if "example" not in c.lower(): return False, "No examples"
m = re.search(r'"knowledge"', c[c.lower().find("example"):])
if not m: return False, "No JSON example"
return True, "Examples present"
def check_constraint_coverage():
c = Path("templates/harvest-prompt.md").read_text()
for x in ["no hallucination","explicitly","partial","failed sessions"]:
if x not in c.lower(): return False, f"Missing: {x}"
return True, "Constraints covered"
def check_test_sessions():
d = Path("test_sessions")
if not d.exists(): return False, "test_sessions/ not found"
files = list(d.glob("*.jsonl"))
if len(files) < 5: return False, f"Only {len(files)} sessions"
for f in files:
for i, line in enumerate(f.read_text().strip().split("\n"), 1):
try: json.loads(line)
except json.JSONDecodeError as e: return False, f"{f.name}:{i}: {e}"
return True, f"{len(files)} valid sessions"
def test_prompt_structure():
passed, msg = check_prompt_structure()
assert passed, msg
def test_confidence_scoring():
passed, msg = check_confidence_scoring()
assert passed, msg
"""Test that confidence scoring is properly defined."""
prompt_path = Path("templates/harvest-prompt.md")
content = prompt_path.read_text()
# Check for confidence scale definitions
confidence_levels = [
("0.9-1.0", "explicitly stated"),
("0.7-0.8", "clearly implied"),
("0.5-0.6", "suggested"),
("0.3-0.4", "inferred"),
("0.1-0.2", "speculative")
]
for level, description in confidence_levels:
if level not in content:
return False, f"Missing confidence level: {level}"
if description.lower() not in content.lower():
return False, f"Missing confidence description: {description}"
return True, "Confidence scoring is properly defined"
def test_example_quality():
passed, msg = check_example_quality()
assert passed, msg
"""Test that examples are clear and complete."""
prompt_path = Path("templates/harvest-prompt.md")
content = prompt_path.read_text()
# Check for example input/output
if "example" not in content.lower():
return False, "No examples provided"
# Check that example includes all categories
example_section = content[content.lower().find("example"):]
# Look for JSON example
json_match = re.search(r'\{[\s\S]*"knowledge"[\s\S]*\}', example_section)
if not json_match:
return False, "No JSON example found"
example_json = json_match.group(0)
# Check for all categories in example
for category in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
if category not in example_json:
return False, f"Example missing category: {category}"
return True, "Examples are clear and complete"
def test_constraint_coverage():
passed, msg = check_constraint_coverage()
assert passed, msg
"""Test that constraints cover all requirements."""
prompt_path = Path("templates/harvest-prompt.md")
content = prompt_path.read_text()
required_constraints = [
"No hallucination",
"only extract",
"explicitly",
"partial",
"failed sessions",
"1k tokens"
]
for constraint in required_constraints:
if constraint.lower() not in content.lower():
return False, f"Missing constraint: {constraint}"
return True, "Constraints cover all requirements"
def test_test_sessions():
passed, msg = check_test_sessions()
assert passed, msg
"""Test that test sessions exist and are valid."""
test_sessions_dir = Path("test_sessions")
if not test_sessions_dir.exists():
return False, "test_sessions directory not found"
session_files = list(test_sessions_dir.glob("*.jsonl"))
if len(session_files) < 5:
return False, f"Only {len(session_files)} test sessions found, need 5"
# Check each session file
for session_file in session_files:
content = session_file.read_text()
lines = content.strip().split("\n")
# Check that each line is valid JSON
for i, line in enumerate(lines, 1):
try:
json.loads(line)
except json.JSONDecodeError as e:
return False, f"Invalid JSON in {session_file.name}, line {i}: {e}"
return True, f"Found {len(session_files)} valid test sessions"
def run_all_tests():
"""Run all tests and return results."""
tests = [
("Prompt Structure", test_prompt_structure),
("Confidence Scoring", test_confidence_scoring),
("Example Quality", test_example_quality),
("Constraint Coverage", test_constraint_coverage),
("Test Sessions", test_test_sessions)
]
results = []
all_passed = True
for test_name, test_func in tests:
try:
passed, message = test_func()
results.append({
"test": test_name,
"passed": passed,
"message": message
})
if not passed:
all_passed = False
except Exception as e:
results.append({
"test": test_name,
"passed": False,
"message": f"Error: {str(e)}"
})
all_passed = False
# Print results
print("=" * 60)
print("HARVEST PROMPT TEST RESULTS")
print("=" * 60)
for result in results:
status = "✓ PASS" if result["passed"] else "✗ FAIL"
print(f"{status}: {result['test']}")
print(f" {result['message']}")
print()
print("=" * 60)
if all_passed:
print("ALL TESTS PASSED!")
else:
print("SOME TESTS FAILED!")
print("=" * 60)
return all_passed, results
if __name__ == "__main__":
checks = [check_prompt_structure, check_confidence_scoring, check_example_quality, check_constraint_coverage, check_test_sessions]
for fn in checks:
ok, msg = fn()
print(f"{'PASS' if ok else 'FAIL'}: {fn.__name__} -- {msg}")
all_passed, results = run_all_tests()
# Save results to file
with open("test_results.json", "w") as f:
json.dump({
"all_passed": all_passed,
"results": results,
"timestamp": "2026-04-14T19:05:00Z"
}, f, indent=2)
print(f"Results saved to test_results.json")
# Exit with appropriate code
exit(0 if all_passed else 1)

View File

@@ -19,208 +19,95 @@ FileMetrics = mod.FileMetrics
def test_complexity_simple_function():
"""Simple function should have low complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def simple():
return 42
""")
f.write("\ndef simple():\n return 42\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 1.0, f"Expected 1.0, got {avg}"
assert max_c == 1, f"Expected 1, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
assert classes == 0, f"Expected 0, got {classes}"
assert avg == 1.0
assert max_c == 1
assert funcs == 1
assert classes == 0
os.unlink(f.name)
print("PASS: test_complexity_simple_function")
def test_complexity_with_conditionals():
"""Function with if/else should have higher complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def complex_func(x):
if x > 0:
if x > 10:
return "big"
else:
return "small"
elif x < 0:
return "negative"
else:
return "zero"
""")
f.write("\ndef complex_func(x):\n if x > 0:\n if x > 10:\n return 'big'\n else:\n return 'small'\n elif x < 0:\n return 'negative'\n else:\n return 'zero'\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 3 if/elif + 1 nested if = 5
assert max_c >= 4, f"Expected max_c >= 4, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
assert max_c >= 4
assert funcs == 1
os.unlink(f.name)
print("PASS: test_complexity_with_conditionals")
def test_complexity_with_loops():
"""Function with loops should increase complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def loop_func(items):
result = []
for item in items:
if item > 0:
result.append(item)
while len(result) > 10:
result.pop()
return result
""")
f.write("\ndef loop_func(items):\n result = []\n for item in items:\n if item > 0:\n result.append(item)\n while len(result) > 10:\n result.pop()\n return result\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 1 for + 1 if + 1 while = 4
assert max_c >= 3, f"Expected max_c >= 3, got {max_c}"
assert max_c >= 3
os.unlink(f.name)
print("PASS: test_complexity_with_loops")
def test_complexity_with_class():
"""Class with methods should count both."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
class MyClass:
def method1(self):
if True:
pass
def method2(self):
for i in range(10):
pass
""")
f.write("\nclass MyClass:\n def method1(self):\n if True:\n pass\n def method2(self):\n for i in range(10):\n pass\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert classes == 1, f"Expected 1 class, got {classes}"
assert funcs == 2, f"Expected 2 functions, got {funcs}"
assert classes == 1
assert funcs == 2
os.unlink(f.name)
print("PASS: test_complexity_with_class")
def test_complexity_syntax_error():
"""File with syntax error should return zeros."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("def broken(:\n pass")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 0.0, f"Expected 0.0, got {avg}"
assert funcs == 0, f"Expected 0, got {funcs}"
assert avg == 0.0
assert funcs == 0
os.unlink(f.name)
print("PASS: test_complexity_syntax_error")
def test_refactoring_score_high_complexity():
"""High complexity should give high score."""
metrics = FileMetrics(
path="test.py",
lines=200,
complexity=15.0,
max_complexity=25,
functions=10,
classes=2,
churn_30d=5,
churn_90d=15,
test_coverage=0.3,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=200, complexity=15.0, max_complexity=25, functions=10, classes=2, churn_30d=5, churn_90d=15, test_coverage=0.3, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
assert score > 50, f"Expected score > 50, got {score}"
assert score > 50
print("PASS: test_refactoring_score_high_complexity")
def test_refactoring_score_low_complexity():
"""Low complexity should give lower score."""
metrics = FileMetrics(
path="test.py",
lines=50,
complexity=2.0,
max_complexity=3,
functions=3,
classes=0,
churn_30d=0,
churn_90d=1,
test_coverage=0.9,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=50, complexity=2.0, max_complexity=3, functions=3, classes=0, churn_30d=0, churn_90d=1, test_coverage=0.9, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
assert score < 30, f"Expected score < 30, got {score}"
assert score < 30
print("PASS: test_refactoring_score_low_complexity")
def test_refactoring_score_high_churn():
"""High churn should increase score."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=10,
churn_90d=20,
test_coverage=0.5,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=10, churn_90d=20, test_coverage=0.5, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
# Churn should contribute significantly
assert score > 40, f"Expected score > 40 for high churn, got {score}"
assert score > 40
print("PASS: test_refactoring_score_high_churn")
def test_refactoring_score_no_coverage():
"""No coverage data should assume medium risk."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=None,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=1, churn_90d=2, test_coverage=None, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
# Should have some score from the 5-point coverage component
assert score > 0, f"Expected positive score, got {score}"
assert score > 0
print("PASS: test_refactoring_score_no_coverage")
def test_refactoring_score_large_file():
"""Large files should score higher."""
metrics_small = FileMetrics(
path="small.py",
lines=50,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
metrics_large = FileMetrics(
path="large.py",
lines=1000,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
metrics_small = FileMetrics(path="small.py", lines=50, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
metrics_large = FileMetrics(path="large.py", lines=1000, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
score_small = calculate_refactoring_score(metrics_small)
score_large = calculate_refactoring_score(metrics_large)
assert score_large > score_small, \
f"Large file ({score_large}) should score higher than small ({score_small})"
assert score_large > score_small
print("PASS: test_refactoring_score_large_file")
@@ -239,4 +126,4 @@ def run_all():
if __name__ == "__main__":
run_all()
run_all()

View File

@@ -1,207 +0,0 @@
"""Tests for knowledge deduplication module (Issue #196)."""
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from dedup import (
normalize_text,
content_hash,
tokenize,
token_similarity,
quality_score,
merge_facts,
dedup_facts,
generate_test_duplicates,
)
class TestNormalize:
def test_lowercases(self):
assert normalize_text("Hello World") == "hello world"
def test_collapses_whitespace(self):
assert normalize_text(" hello world ") == "hello world"
def test_strips(self):
assert normalize_text(" text ") == "text"
class TestContentHash:
def test_deterministic(self):
h1 = content_hash("Hello World")
h2 = content_hash("hello world")
h3 = content_hash(" Hello World ")
assert h1 == h2 == h3
def test_different_texts(self):
h1 = content_hash("Hello")
h2 = content_hash("World")
assert h1 != h2
def test_returns_hex(self):
h = content_hash("test")
assert len(h) == 64 # SHA256
assert all(c in '0123456789abcdef' for c in h)
class TestTokenize:
def test_extracts_words(self):
tokens = tokenize("Hello World Test")
assert "hello" in tokens
assert "world" in tokens
assert "test" in tokens
def test_skips_short_words(self):
tokens = tokenize("a to is the hello")
assert "a" not in tokens
assert "to" not in tokens
assert "hello" in tokens
def test_returns_set(self):
tokens = tokenize("hello hello world")
assert isinstance(tokens, set)
assert len(tokens) == 2
class TestTokenSimilarity:
def test_identical(self):
assert token_similarity("hello world", "hello world") == 1.0
def test_no_overlap(self):
assert token_similarity("alpha beta", "gamma delta") == 0.0
def test_partial_overlap(self):
sim = token_similarity("hello world test", "hello universe test")
assert 0.3 < sim < 0.7
def test_empty(self):
assert token_similarity("", "hello") == 0.0
assert token_similarity("hello", "") == 0.0
def test_symmetric(self):
a = "hello world test"
b = "hello universe test"
assert token_similarity(a, b) == token_similarity(b, a)
class TestQualityScore:
def test_high_confidence(self):
fact = {"confidence": 0.95, "source_count": 5, "tags": ["test"], "related": ["x"]}
score = quality_score(fact)
assert score > 0.7
def test_low_confidence(self):
fact = {"confidence": 0.3, "source_count": 1}
score = quality_score(fact)
assert score < 0.5
def test_defaults(self):
score = quality_score({})
assert 0 < score < 1
class TestMergeFacts:
def test_merges_tags(self):
keep = {"id": "a", "fact": "test", "tags": ["git"], "confidence": 0.9}
drop = {"id": "b", "fact": "test", "tags": ["python"], "confidence": 0.8}
merged = merge_facts(keep, drop)
assert "git" in merged["tags"]
assert "python" in merged["tags"]
def test_merges_source_count(self):
keep = {"id": "a", "fact": "test", "source_count": 3}
drop = {"id": "b", "fact": "test", "source_count": 2}
merged = merge_facts(keep, drop)
assert merged["source_count"] == 5
def test_keeps_higher_confidence(self):
keep = {"id": "a", "fact": "test", "confidence": 0.7}
drop = {"id": "b", "fact": "test", "confidence": 0.9}
merged = merge_facts(keep, drop)
assert merged["confidence"] == 0.9
def test_tracks_merged_from(self):
keep = {"id": "a", "fact": "test"}
drop = {"id": "b", "fact": "test"}
merged = merge_facts(keep, drop)
assert "b" in merged["_merged_from"]
class TestDedupFacts:
def test_removes_exact_dupes(self):
facts = [
{"id": "1", "fact": "Always use git rebase"},
{"id": "2", "fact": "Always use git rebase"}, # exact dupe
{"id": "3", "fact": "Check logs first"},
]
deduped, stats = dedup_facts(facts)
assert stats["exact_dupes"] == 1
assert stats["unique"] == 2
def test_removes_near_dupes(self):
facts = [
{"id": "1", "fact": "Always check logs before deploying to production server"},
{"id": "2", "fact": "Always check logs before deploying to production environment"},
{"id": "3", "fact": "Use docker compose for local development environments"},
]
deduped, stats = dedup_facts(facts, near_threshold=0.5)
assert stats["near_dupes"] >= 1
assert stats["unique"] == 2
def test_preserves_unique(self):
facts = [
{"id": "1", "fact": "Use git rebase for clean history"},
{"id": "2", "fact": "Docker containers should be stateless"},
{"id": "3", "fact": "Always write tests before code"},
]
deduped, stats = dedup_facts(facts)
assert stats["unique"] == 3
assert stats["removed"] == 0
def test_empty_input(self):
deduped, stats = dedup_facts([])
assert stats["total"] == 0
assert stats["unique"] == 0
def test_keeps_higher_quality_near_dup(self):
facts = [
{"id": "1", "fact": "Check logs before deploying to production server", "confidence": 0.5, "source_count": 1},
{"id": "2", "fact": "Check logs before deploying to production environment", "confidence": 0.9, "source_count": 5, "tags": ["ops"]},
]
deduped, stats = dedup_facts(facts, near_threshold=0.5)
assert stats["unique"] == 1
# Higher quality fact should be kept
assert deduped[0]["confidence"] == 0.9
def test_dry_run_does_not_modify(self):
facts = [
{"id": "1", "fact": "Same text"},
{"id": "2", "fact": "Same text"},
]
deduped, stats = dedup_facts(facts, dry_run=True)
assert stats["exact_dupes"] == 1
# In dry_run, merge_facts is skipped so facts aren't modified
assert len(deduped) == 1
class TestGenerateTestDuplicates:
def test_generates_correct_count(self):
facts = generate_test_duplicates(20)
assert len(facts) > 20 # 20 unique + duplicates
def test_has_exact_dupes(self):
facts = generate_test_duplicates(20)
hashes = [content_hash(f["fact"]) for f in facts]
# Should have some duplicate hashes
assert len(hashes) != len(set(hashes))
def test_dedup_removes_dupes(self):
facts = generate_test_duplicates(20)
deduped, stats = dedup_facts(facts)
assert stats["unique"] <= 20
assert stats["removed"] > 0

View File

@@ -1,52 +0,0 @@
"""
Tests for scripts/dependency_inventory.py
"""
import unittest
import json
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from scripts.dependency_inventory import (
parse_requirements,
parse_package_json,
parse_pyproject_toml,
scan_repo,
)
class TestParseRequirements(unittest.TestCase):
def test_parses_simple_requirement(self):
result = parse_requirements("requests>=2.33.0")
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["package"], "requests")
def test_parses_version_range(self):
result = parse_requirements("pytest>=8,<9")
self.assertEqual(result[0]["package"], "pytest")
class TestParsePackageJson(unittest.TestCase):
def test_parses_dependencies(self):
content = json.dumps({"name": "test", "dependencies": {"react": "^18.2.0"}})
result = parse_package_json(content)
self.assertTrue(any(d["package"] == "react" for d in result))
class TestParsePyprojectToml(unittest.TestCase):
def test_parses_project_dependencies(self):
content = "\n[project]\nname = \"test\"\ndependencies = [\n \"openai>=2.21.0,<3\",\n]"
result = parse_pyproject_toml(content)
self.assertEqual(len(result), 1)
class TestScanRepo(unittest.TestCase):
def test_scans_local_repo(self):
result = scan_repo(Path(__file__).resolve().parents[1])
self.assertGreater(result["dependency_count"], 0)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,227 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/freshness.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.join(os.path.dirname(__file__) or ".", ".."))
import importlib.util
spec = importlib.util.spec_from_file_location(
"freshness", os.path.join(os.path.dirname(__file__) or ".", "..", "scripts", "freshness.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
compute_file_hash = mod.compute_file_hash
check_freshness = mod.check_freshness
load_knowledge_entries = mod.load_knowledge_entries
def test_compute_file_hash():
"""File hash should be computed correctly."""
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write("test content")
f.flush()
h = compute_file_hash(f.name)
assert h is not None
assert h.startswith("sha256:")
os.unlink(f.name)
print("PASS: test_compute_file_hash")
def test_compute_file_hash_nonexistent():
"""Nonexistent file should return None."""
h = compute_file_hash("/nonexistent/file.txt")
assert h is None
print("PASS: test_compute_file_hash_nonexistent")
def test_load_knowledge_entries_empty():
"""Empty knowledge dir should return empty list."""
with tempfile.TemporaryDirectory() as tmpdir:
entries = load_knowledge_entries(tmpdir)
assert entries == []
print("PASS: test_load_knowledge_entries_empty")
def test_load_knowledge_entries_from_index():
"""Should load entries from index.json."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create index.json
index_path = os.path.join(tmpdir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "Test fact",
"source_file": "test.py",
"source_hash": "sha256:abc123",
"category": "fact",
"confidence": 0.9
}
]
}, f)
entries = load_knowledge_entries(tmpdir)
assert len(entries) == 1
assert entries[0]["fact"] == "Test fact"
assert entries[0]["source_file"] == "test.py"
print("PASS: test_load_knowledge_entries_from_index")
def test_load_knowledge_entries_from_yaml():
"""Should load entries from YAML files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create global directory
global_dir = os.path.join(tmpdir, "global")
os.makedirs(global_dir)
# Create YAML file
yaml_path = os.path.join(global_dir, "test.yaml")
with open(yaml_path, "w") as f:
f.write("""
pitfalls:
- description: "Test pitfall"
source_file: "test.py"
source_hash: "sha256:def456"
category: "pitfall"
confidence: 0.8
""")
entries = load_knowledge_entries(tmpdir)
assert len(entries) == 1
assert entries[0]["fact"] == "Test pitfall"
assert entries[0]["category"] == "pitfall"
print("PASS: test_load_knowledge_entries_from_yaml")
def test_check_freshness_no_changes():
"""With no source file reference, entries should be counted correctly."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create knowledge dir
knowledge_dir = os.path.join(tmpdir, "knowledge")
os.makedirs(knowledge_dir)
# Create repo dir
repo_dir = os.path.join(tmpdir, "repo")
os.makedirs(repo_dir)
# Create index.json with entry that has no source_file
index_path = os.path.join(knowledge_dir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "General knowledge",
"category": "fact",
"confidence": 0.9
# No source_file or source_hash
}
]
}, f)
result = check_freshness(knowledge_dir, repo_dir, days=1)
# Entry without source_file should be counted as "fresh" (no_source status)
assert result["summary"]["total"] == 1
assert result["summary"]["stale"] == 0
assert result["summary"]["fresh"] == 1
assert result["fresh_entries"][0]["status"] == "no_source"
print("PASS: test_check_freshness_no_changes")
def test_check_freshness_with_hash_mismatch():
"""Hash mismatch should mark entry as stale."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create knowledge dir
knowledge_dir = os.path.join(tmpdir, "knowledge")
os.makedirs(knowledge_dir)
# Create repo dir with a file
repo_dir = os.path.join(tmpdir, "repo")
os.makedirs(repo_dir)
test_file = os.path.join(repo_dir, "test.py")
with open(test_file, "w") as f:
f.write("print('hello')")
# Create index.json with wrong hash
index_path = os.path.join(knowledge_dir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "Test fact",
"source_file": "test.py",
"source_hash": "sha256:wronghash",
"category": "fact",
"confidence": 0.9
}
]
}, f)
# Initialize git repo
os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")
result = check_freshness(knowledge_dir, repo_dir, days=1)
assert result["summary"]["total"] == 1
assert result["summary"]["stale"] == 1
assert result["summary"]["fresh"] == 0
assert result["stale_entries"][0]["reason"] == "hash_mismatch"
print("PASS: test_check_freshness_with_hash_mismatch")
def test_check_freshness_missing_source():
"""Missing source file should mark entry as stale."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create knowledge dir
knowledge_dir = os.path.join(tmpdir, "knowledge")
os.makedirs(knowledge_dir)
# Create repo dir (without the referenced file)
repo_dir = os.path.join(tmpdir, "repo")
os.makedirs(repo_dir)
# Create index.json referencing nonexistent file
index_path = os.path.join(knowledge_dir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "Test fact",
"source_file": "nonexistent.py",
"source_hash": "sha256:abc123",
"category": "fact",
"confidence": 0.9
}
]
}, f)
# Initialize git repo
os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")
result = check_freshness(knowledge_dir, repo_dir, days=1)
assert result["summary"]["total"] == 1
assert result["summary"]["stale"] == 1
assert result["summary"]["fresh"] == 0
assert result["stale_entries"][0]["reason"] == "source_missing"
print("PASS: test_check_freshness_missing_source")
def run_all():
test_compute_file_hash()
test_compute_file_hash_nonexistent()
test_load_knowledge_entries_empty()
test_load_knowledge_entries_from_index()
test_load_knowledge_entries_from_yaml()
test_check_freshness_no_changes()
test_check_freshness_with_hash_mismatch()
test_check_freshness_missing_source()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()

View File

@@ -1,108 +0,0 @@
"""
Tests for quality_gate.py — Knowledge entry quality scoring.
"""
import unittest
from datetime import datetime, timezone, timedelta
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from quality_gate import (
score_specificity,
score_actionability,
score_freshness,
score_source_quality,
score_entry,
filter_entries,
)
class TestScoreSpecificity(unittest.TestCase):
def test_specific_content_scores_high(self):
content = "Run `python3 deploy.py --env prod` on 2026-04-15. Example: step 1 configure nginx."
score = score_specificity(content)
self.assertGreater(score, 0.6)
def test_vague_content_scores_low(self):
content = "It generally depends. Various factors might affect this. Basically, it varies."
score = score_specificity(content)
self.assertLess(score, 0.5)
def test_empty_scores_baseline(self):
score = score_specificity("")
self.assertAlmostEqual(score, 0.5, delta=0.1)
class TestScoreActionability(unittest.TestCase):
def test_actionable_content_scores_high(self):
content = "1. Run `pip install -r requirements.txt`\n2. Execute `python3 train.py`\n3. Verify with `pytest`"
score = score_actionability(content)
self.assertGreater(score, 0.6)
def test_abstract_content_scores_low(self):
content = "The concept of intelligence is fascinating and multifaceted."
score = score_actionability(content)
self.assertLess(score, 0.5)
class TestScoreFreshness(unittest.TestCase):
def test_recent_timestamp_scores_high(self):
recent = datetime.now(timezone.utc).isoformat()
score = score_freshness(recent)
self.assertGreater(score, 0.9)
def test_old_timestamp_scores_low(self):
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
score = score_freshness(old)
self.assertLess(score, 0.2)
def test_none_returns_baseline(self):
score = score_freshness(None)
self.assertEqual(score, 0.5)
class TestScoreSourceQuality(unittest.TestCase):
def test_claude_scores_high(self):
self.assertGreater(score_source_quality("claude-sonnet"), 0.85)
def test_ollama_scores_lower(self):
self.assertLess(score_source_quality("ollama"), 0.7)
def test_unknown_returns_default(self):
self.assertEqual(score_source_quality("unknown"), 0.5)
class TestScoreEntry(unittest.TestCase):
def test_good_entry_scores_high(self):
entry = {
"content": "To deploy: run `kubectl apply -f deployment.yaml`. Verify with `kubectl get pods`.",
"model": "claude-sonnet",
"timestamp": datetime.now(timezone.utc).isoformat(),
}
score = score_entry(entry)
self.assertGreater(score, 0.6)
def test_poor_entry_scores_low(self):
entry = {
"content": "It depends. Various things might happen.",
"model": "unknown",
}
score = score_entry(entry)
self.assertLess(score, 0.5)
class TestFilterEntries(unittest.TestCase):
def test_filters_low_quality(self):
entries = [
{"content": "Run `deploy.py` to fix the issue.", "model": "claude"},
{"content": "It might work sometimes.", "model": "unknown"},
{"content": "Configure nginx: step 1 edit nginx.conf", "model": "gpt-4"},
]
filtered = filter_entries(entries, threshold=0.5)
self.assertGreaterEqual(len(filtered), 2)
if __name__ == "__main__":
unittest.main()