Compare commits
21 Commits
fix/198-qu ... step35/138

| SHA1 |
|------|
| 181d4129ea |
| 345d2451d0 |
| 8aa9c9f018 |
| 277f9e3a2b |
| 21f654a159 |
| 12abaad838 |
| c106db2e28 |
| 242c77cc99 |
| fe94130380 |
| 4181065f60 |
| cc215e3ed7 |
| baa2c84c3f |
| 6dd354385f |
| 55adcb31dc |
| ec0e9d65ca |
| b732172dcc |
| f7c479c4eb |
| c203010e3a |
| 7a4677c752 |
| 229c327c9e |
| 537bb1b61b |
.gitignore (vendored, Normal file, 2 lines)

@@ -0,0 +1,2 @@
__pycache__/
*.pyc

GENOME.md (374 lines)

@@ -1,16 +1,16 @@
# GENOME.md — compounding-intelligence

*Auto-generated codebase genome. Addresses timmy-home#676.*

**Generated:** 2026-04-17
**Repo:** Timmy_Foundation/compounding-intelligence
**Description:** Turn 1B+ daily agent tokens into durable, compounding fleet intelligence.

---

## Project Overview

**What:** A system that turns 1B+ daily agent tokens into durable, compounding fleet intelligence.

Every agent session starts at zero. The same HTTP 405 gets rediscovered as a branch protection issue. The same token path gets searched from scratch. Intelligence evaporates when the session ends.

**Why:** Every agent session starts at zero. The same mistakes get made repeatedly — the same HTTP 405 is rediscovered as a branch protection issue, the same token path is searched for from scratch. Intelligence evaporates when the session ends.

**How:** Three pipelines form a compounding loop:

Compounding-intelligence solves this with three pipelines forming a loop:

```
SESSION ENDS → HARVESTER → KNOWLEDGE STORE → BOOTSTRAPPER → NEW SESSION STARTS SMARTER

@@ -18,222 +18,234 @@ SESSION ENDS → HARVESTER → KNOWLEDGE STORE → BOOTSTRAPPER → NEW SESSION

MEASURER → Prove it's working
```

**Status:** Early stage. Template and test scaffolding exist. Core pipeline scripts (harvester.py, bootstrapper.py, measurer.py, session_reader.py) are planned but not yet implemented. The knowledge extraction prompt is complete and validated.

---

**Status:** Active development. Core pipelines implemented. 20+ scripts, 14 test files, knowledge store populated with real data.

## Architecture

```mermaid
graph TD
    A[Session Transcript<br/>.jsonl] --> B[Harvester]
    B --> C{Extract Knowledge}
    C --> D[knowledge/index.json]
    C --> E[knowledge/global/*.md]
    C --> F[knowledge/repos/{repo}.md]
    C --> G[knowledge/agents/{agent}.md]
    D --> H[Bootstrapper]
    H --> I[Bootstrap Context<br/>2k token injection]
    I --> J[New Session<br/>starts smarter]
    J --> A
    D --> K[Measurer]
    K --> L[metrics/dashboard.md]
    K --> M[Velocity / Hit Rate<br/>Error Reduction]

    TRANS[Session Transcripts<br/>~/.hermes/sessions/*.jsonl] --> READER[session_reader.py]
    READER --> HARVESTER[harvester.py]
    HARVESTER -->|LLM extraction| PROMPT[harvest-prompt.md]
    HARVESTER --> DEDUP[deduplicate()]
    DEDUP --> INDEX[knowledge/index.json]
    DEDUP --> GLOBAL[knowledge/global/*.yaml]
    DEDUP --> REPO[knowledge/repos/*.yaml]

    INDEX --> BOOTSTRAPPER[bootstrapper.py]
    BOOTSTRAPPER -->|filter + rank + truncate| CONTEXT[Bootstrap Context<br/>2k token injection]
    CONTEXT --> SESSION[New Session starts smarter]

    INDEX --> VALIDATOR[validate_knowledge.py]
    INDEX --> STALENESS[knowledge_staleness_check.py]
    INDEX --> GAPS[knowledge_gap_identifier.py]

    TRANS --> SAMPLER[sampler.py]
    SAMPLER -->|score + rank| BEST[High-value sessions]
    BEST --> HARVESTER

    TRANS --> METADATA[session_metadata.py]
    METADATA --> SUMMARY[SessionSummary objects]

    KNOWLEDGE --> DIFF[diff_analyzer.py]
    DIFF --> PROPOSALS[improvement_proposals.py]
    PROPOSALS --> PRIORITIES[priority_rebalancer.py]
```

### Pipeline 1: Harvester

## Entry Points

**Status:** Prompt designed. Script not implemented.

### Core Pipelines

Reads finished session transcripts (JSONL). Uses `templates/harvest-prompt.md` to extract durable knowledge into five categories:

| Script | Purpose | Key Functions |
|--------|---------|---------------|
| `harvester.py` | Extract knowledge from session transcripts | `harvest_session()`, `call_llm()`, `deduplicate()`, `validate_fact()` |
| `bootstrapper.py` | Build pre-session context from knowledge store | `build_bootstrap_context()`, `filter_facts()`, `sort_facts()`, `truncate_to_tokens()` |
| `session_reader.py` | Parse JSONL session transcripts | `read_session()`, `extract_conversation()`, `messages_to_text()` |
| `sampler.py` | Score and rank sessions for harvesting value | `scan_session_fast()`, `score_session()` |
| `session_metadata.py` | Extract structured metadata from sessions | `extract_session_metadata()`, `SessionSummary` |

| Category | Description | Example |
|----------|-------------|---------|
| `fact` | Concrete, verifiable information | "Repository X has 5 files" |
| `pitfall` | Errors encountered, wrong assumptions | "Token is at ~/.config/gitea/token, not env var" |
| `pattern` | Successful action sequences | "Deploy: test → build → push → webhook" |
| `tool-quirk` | Environment-specific behaviors | "URL format requires trailing slash" |
| `question` | Identified but unanswered | "Need optimal batch size for harvesting" |

### Analysis & Quality

Output schema per knowledge item:

```json
{
  "fact": "One sentence description",
  "category": "fact|pitfall|pattern|tool-quirk|question",
  "repo": "repo-name or 'global'",
  "confidence": 0.0-1.0
}
```
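
As a minimal sketch of how one item could be checked against this schema (the shipped `validate_knowledge.py` may differ; the script's own `validate_fact()` appears later in this diff using a `domain` field instead of `repo`):

```python
# Hedged sketch — mirrors the schema above, not the shipped validator.
VALID_CATEGORIES = {"fact", "pitfall", "pattern", "tool-quirk", "question"}

def is_valid_item(item: dict) -> bool:
    """Check one knowledge item against the output schema."""
    if not isinstance(item.get("fact"), str) or not item["fact"].strip():
        return False
    if item.get("category") not in VALID_CATEGORIES:
        return False
    if not isinstance(item.get("repo"), str):
        return False
    conf = item.get("confidence")
    return isinstance(conf, (int, float)) and 0.0 <= conf <= 1.0
```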

| Script | Purpose |
|--------|---------|
| `validate_knowledge.py` | Validate knowledge index schema compliance |
| `knowledge_staleness_check.py` | Detect stale knowledge (source changed since extraction) |
| `knowledge_gap_identifier.py` | Find untested functions, undocumented APIs, missing tests |
| `diff_analyzer.py` | Analyze code diffs for improvement signals |
| `improvement_proposals.py` | Generate ranked improvement proposals |
| `priority_rebalancer.py` | Rebalance priorities across proposals |
| `automation_opportunity_finder.py` | Find manual steps that can be automated |
| `dead_code_detector.py` | Detect unused code |
| `dependency_graph.py` | Map dependency relationships |
| `perf_bottleneck_finder.py` | Find performance bottlenecks |
| `refactoring_opportunity_finder.py` | Identify refactoring targets |
| `gitea_issue_parser.py` | Parse Gitea issues for knowledge extraction |

### Pipeline 2: Bootstrapper

### Automation

**Status:** Not implemented.

| Script | Purpose |
|--------|---------|
| `session_pair_harvester.py` | Extract training pairs from sessions |

Queries knowledge store before session start. Assembles a compact 2k-token context from relevant facts. Injects into session startup so the agent begins with full situational awareness.
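
A hedged sketch of that filter → sort → truncate flow, under stated assumptions: the real `bootstrapper.py` signatures may differ, and the 4-characters-per-token estimate is an assumption, not a measured value:

```python
import json

CHARS_PER_TOKEN = 4  # rough heuristic — an assumption

def build_bootstrap_context(index_path: str, repo: str, budget_tokens: int = 2000) -> str:
    """Assemble a compact pre-session context from the knowledge index."""
    with open(index_path) as f:
        facts = json.load(f).get("facts", [])
    # Filter: facts for this repo or global, above the confidence floor
    relevant = [x for x in facts
                if x.get("repo") in (repo, "global") and x.get("confidence", 0) > 0.7]
    # Sort: confidence first, then recency
    relevant.sort(key=lambda x: (x.get("confidence", 0), x.get("extracted_at", "")),
                  reverse=True)
    # Truncate to the token budget
    lines, used = [], 0
    for x in relevant:
        line = f"- [{x['category']}] {x['fact']}"
        cost = len(line) // CHARS_PER_TOKEN + 1
        if used + cost > budget_tokens:
            break
        lines.append(line)
        used += cost
    return "\n".join(lines)
```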

### Pipeline 3: Measurer

**Status:** Not implemented.

Tracks compounding metrics: knowledge velocity (facts/day), error reduction (%), hit rate (knowledge used / knowledge available), task completion improvement.
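
Since measurer.py is not implemented, the formulas below are assumptions read off the definitions above, sketched for concreteness:

```python
def knowledge_velocity(facts_added: int, days: float) -> float:
    """Facts per day over the measurement window."""
    return facts_added / days if days > 0 else 0.0

def hit_rate(knowledge_used: int, knowledge_available: int) -> float:
    """Fraction of available knowledge actually used in sessions."""
    return knowledge_used / knowledge_available if knowledge_available else 0.0

def error_reduction(errors_before: int, errors_after: int) -> float:
    """Percentage drop in repeated errors between measurement windows."""
    if errors_before == 0:
        return 0.0
    return (errors_before - errors_after) / errors_before * 100.0
```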

---

## Directory Structure

## Data Flow

```
compounding-intelligence/
├── README.md                                # Project overview and architecture
├── GENOME.md                                # This file (codebase genome)
├── knowledge/                               # [PLANNED] Knowledge store
│   ├── index.json                           # Machine-readable fact index
│   ├── global/                              # Cross-repo knowledge
│   ├── repos/{repo}.md                      # Per-repo knowledge
│   └── agents/{agent}.md                    # Agent-type notes
├── scripts/
│   ├── test_harvest_prompt.py               # Basic prompt validation (2.5KB)
│   └── test_harvest_prompt_comprehensive.py # Full prompt structure test (6.8KB)
├── templates/
│   └── harvest-prompt.md                    # Knowledge extraction prompt (3.5KB)
├── test_sessions/
│   ├── session_success.jsonl                # Happy path test data
│   ├── session_failure.jsonl                # Failure path test data
│   ├── session_partial.jsonl                # Incomplete session test data
│   ├── session_patterns.jsonl               # Pattern extraction test data
│   └── session_questions.jsonl              # Question identification test data
└── metrics/                                 # [PLANNED] Compounding metrics
    └── dashboard.md

1. Session ends → .jsonl written to ~/.hermes/sessions/
2. sampler.py scores sessions by age, recency, repo coverage
3. harvester.py reads top sessions, calls LLM with harvest-prompt.md
4. LLM extracts facts/pitfalls/patterns/quirks/questions
5. deduplicate() checks against existing index via fact_fingerprint()
6. validate_fact() checks schema compliance
7. write_knowledge() appends to knowledge/index.json + per-repo YAML
8. On next session start, bootstrapper.py:
   a. Loads knowledge/index.json
   b. Filters by session's repo and agent type
   c. Sorts by confidence (high first), then recency
   d. Truncates to 2k token budget
   e. Injects as pre-context
9. Agent starts with full situational awareness instead of zero
```
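
Step 5 relies on `fact_fingerprint()`. A plausible minimal form, consistent with the normalize-then-SHA-256 `content_hash()` helpers that appear in the scripts later in this diff (the harvester's exact implementation may differ):

```python
import hashlib

def fact_fingerprint(fact_text: str) -> str:
    """Stable fingerprint: lowercase, collapse whitespace, then SHA-256."""
    normalized = " ".join(fact_text.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
```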

---

## Entry Points and Data Flow

### Entry Point 1: Knowledge Extraction (Harvester)

```
Input: Session transcript (JSONL)
    ↓
templates/harvest-prompt.md (LLM prompt)
    ↓
Knowledge items (JSON array)
    ↓
Output: knowledge/index.json + per-repo/per-agent markdown files
```

### Entry Point 2: Session Bootstrap (Bootstrapper)

```
Input: Session context (repo, agent type, task type)
    ↓
knowledge/index.json (query relevant facts)
    ↓
2k-token bootstrap context
    ↓
Output: Injected into session startup
```

### Entry Point 3: Measurement (Measurer)

```
Input: knowledge/index.json + session history
    ↓
Velocity, hit rate, error reduction calculations
    ↓
Output: metrics/dashboard.md
```

---

## Key Abstractions

### Knowledge Item

The atomic unit. One sentence, one category, one confidence score. Designed to be small enough that 1000 items fit in a 2k-token bootstrap context.

### Knowledge Item (fact/pitfall/pattern/quirk/question)

```json
{
  "fact": "Gitea token is at ~/.config/gitea/token",
  "category": "tool-quirk",
  "repo": "global",
  "confidence": 0.9,
  "evidence": "Found during clone attempt",
  "source_session": "2026-04-13_abc123",
  "extracted_at": "2026-04-13T20:00:00Z"
}
```

### Knowledge Store

A directory structure that mirrors the fleet's mental model:

- `global/` — knowledge that applies everywhere (tool quirks, environment facts)
- `repos/` — knowledge specific to each repo
- `agents/` — knowledge specific to each agent type

### SessionSummary (session_metadata.py)

Extracted metadata per session: duration, token count, tools used, repos touched, error count, outcome.
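
A hedged sketch of the shape those fields imply — the field names beyond the ones listed above are illustrative assumptions; `session_metadata.py` holds the real definition:

```python
from dataclasses import dataclass, field

@dataclass
class SessionSummary:
    """Per-session metadata, as described above (names are hypothetical)."""
    duration_s: float = 0.0
    token_count: int = 0
    tools_used: list[str] = field(default_factory=list)
    repos_touched: list[str] = field(default_factory=list)
    error_count: int = 0
    outcome: str = "unknown"  # e.g. success / failure / partial
```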

### Confidence Score

0.0–1.0 scale. Defines how certain the harvester is about each extracted fact:

- 0.9–1.0: Explicitly stated with verification
- 0.7–0.8: Clearly implied by multiple data points
- 0.5–0.6: Suggested but not fully verified
- 0.3–0.4: Inferred from limited data
- 0.1–0.2: Speculative or uncertain

### Gap / GapReport (knowledge_gap_identifier.py)

Structured gap analysis: untested functions, undocumented APIs, missing tests. Severity: critical/high/medium/low.

### Bootstrap Context

The 2k-token injection that a new session receives. Assembled from the most relevant knowledge items for the current task, filtered by confidence > 0.7, deduplicated, and compressed.

### Knowledge Index (knowledge/index.json)

Machine-readable fact store. 12KB, populated with real data. Categories: fact, pitfall, pattern, tool-quirk, question.

---

## Knowledge Store

```
knowledge/
├── index.json               # Master fact store (12KB, populated)
├── SCHEMA.md                # Schema documentation
├── global/
│   ├── pitfalls.yaml        # Cross-repo pitfalls (2KB)
│   └── tool-quirks.yaml     # Tool-specific quirks (2KB)
├── repos/
│   ├── hermes-agent.yaml    # hermes-agent knowledge (2KB)
│   └── the-nexus.yaml       # the-nexus knowledge (2KB)
└── agents/                  # Per-agent knowledge (empty)
```

## API Surface

### Internal (scripts not yet implemented)

### LLM API (consumed)

| Provider | Endpoint | Usage |
|----------|----------|-------|
| Nous Research | `https://inference-api.nousresearch.com/v1` | Knowledge extraction |
| Ollama | `http://localhost:11434/v1` | Local fallback |

| Script | Input | Output | Status |
|--------|-------|--------|--------|
| `harvester.py` | Session JSONL path | Knowledge items JSON | PLANNED |
| `bootstrapper.py` | Repo + agent type | 2k-token context string | PLANNED |
| `measurer.py` | Knowledge store path | Metrics JSON | PLANNED |
| `session_reader.py` | Session JSONL path | Parsed transcript | PLANNED |

### Prompt (templates/harvest-prompt.md)

The extraction prompt is the core "API." It takes a session transcript and returns structured JSON; a sketch of the call pattern follows the list. It defines:

- Five extraction categories
- Output format (JSON array of knowledge items)
- Confidence scoring rubric
- Constraints (no hallucination, specificity, relevance, brevity)
- Example input/output pair
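
How the prompt is consumed, sketched after the `call_llm()` pattern that `scripts/conference_summarizer.py` uses later in this diff — model name, endpoint, and function name here are illustrative, not the shipped harvester:

```python
import json
import urllib.request

def extract_knowledge(prompt: str, transcript: str,
                      api_base: str, api_key: str, model: str) -> list[dict]:
    """Send the harvest prompt plus a transcript; expect a JSON array back."""
    payload = json.dumps({
        "model": model,
        "messages": [
            {"role": "system", "content": prompt},  # templates/harvest-prompt.md
            {"role": "user", "content": f"Transcript:\n\n{transcript}"},
        ],
        "temperature": 0.1,
    }).encode("utf-8")
    req = urllib.request.Request(
        f"{api_base}/chat/completions",
        data=payload,
        headers={"Authorization": f"Bearer {api_key}",
                 "Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=60) as resp:
        content = json.loads(resp.read())["choices"][0]["message"]["content"]
    return json.loads(content)  # JSON array of knowledge items
```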

---

### File API (consumed/produced)

| Path | Format | Direction |
|------|--------|-----------|
| `~/.hermes/sessions/*.jsonl` | JSONL | Input (session transcripts) |
| `knowledge/index.json` | JSON | Output (master fact store) |
| `knowledge/global/*.yaml` | YAML | Output (cross-repo knowledge) |
| `knowledge/repos/*.yaml` | YAML | Output (per-repo knowledge) |
| `templates/harvest-prompt.md` | Markdown | Config (extraction prompt) |

## Test Coverage

### What Exists

**14 test files** covering core pipelines:

| File | Tests | Coverage |
|------|-------|----------|
| `scripts/test_harvest_prompt.py` | 2 tests | Prompt file existence, sample transcript |
| `scripts/test_harvest_prompt_comprehensive.py` | 5 tests | Prompt structure, categories, fields, confidence scoring, size limits |
| `test_sessions/*.jsonl` | 5 sessions | Success, failure, partial, patterns, questions |

| Test File | Covers |
|-----------|--------|
| `test_harvest_prompt.py` | Prompt validation, hallucination detection |
| `test_harvest_prompt_comprehensive.py` | Extended prompt testing |
| `test_harvester_pipeline.py` | Harvester extraction + dedup |
| `test_bootstrapper.py` | Context building, filtering, truncation |
| `test_session_pair_harvester.py` | Training pair extraction |
| `test_improvement_proposals.py` | Proposal generation |
| `test_priority_rebalancer.py` | Priority scoring |
| `test_knowledge_staleness.py` | Staleness detection |
| `test_automation_opportunity_finder.py` | Automation detection |
| `test_diff_analyzer.py` | Diff analysis |
| `test_gitea_issue_parser.py` | Issue parsing |
| `test_refactoring_opportunity_finder.py` | Refactoring signals |
| `test_knowledge_gap_identifier.py` | Gap analysis |
| `test_perf_bottleneck_finder.py` | Perf bottleneck detection |

### What's Missing

### Coverage Gaps

1. **Harvester integration test** — Does the prompt actually extract correct knowledge from real transcripts?
2. **Bootstrapper test** — Does it assemble relevant context correctly?
3. **Knowledge store test** — Does the index.json maintain consistency?
4. **Confidence calibration test** — Do high-confidence facts actually prove true in later sessions?
5. **Deduplication test** — Are duplicate facts across sessions handled?
6. **Staleness test** — How does the system handle outdated knowledge?

---

1. **session_reader.py** — No dedicated test file (tested indirectly)
2. **sampler.py** — No test file (scoring logic untested)
3. **session_metadata.py** — No test file
4. **validate_knowledge.py** — No test file
5. **knowledge_staleness_check.py** — Tested but limited

## Security Considerations

1. **No secrets in knowledge store** — The harvester must filter out API keys, tokens, and credentials from extracted facts. The prompt constraints mention this but there is no automated guard.

### API Key Handling

- `harvester.py` reads API key from `~/.hermes/auth.json` or env vars
- Key passed to LLM API in request headers only
- No key logging

2. **Knowledge poisoning** — A malicious or corrupted session could inject false facts. Confidence scoring partially mitigates this, but there is no verification step.

### Knowledge Integrity

- `validate_fact()` checks schema before writing
- `deduplicate()` prevents duplicate entries via fingerprint
- `knowledge_staleness_check.py` detects when source code changed but knowledge didn't
- Confidence scores prevent low-quality knowledge from polluting the store

3. **Access control** — The knowledge store has no access control. Any process that can read the directory can read all facts. In a multi-tenant setup, this is a concern.

### File Safety

- Knowledge writes are append-only (never deletes)
- Bootstrap context is truncated to budget (no prompt injection via knowledge)
- Session reader handles malformed JSONL gracefully

4. **Transcript privacy** — Session transcripts may contain user data. The harvester must not extract personally identifiable information into the knowledge store.

## File Index

```
scripts/
  harvester.py (473 lines)                     — Core knowledge extraction
  bootstrapper.py (302 lines)                  — Pre-session context builder
  session_reader.py (137 lines)                — JSONL session parser
  sampler.py (363 lines)                       — Session scoring + ranking
  session_metadata.py (271 lines)              — Session metadata extraction
  validate_knowledge.py (44 lines)             — Index validation
  knowledge_staleness_check.py (125 lines)     — Staleness detection
  knowledge_gap_identifier.py (291 lines)      — Gap analysis engine
  diff_analyzer.py (203 lines)                 — Diff analysis
  improvement_proposals.py (518 lines)         — Proposal generation
  priority_rebalancer.py (745 lines)           — Priority scoring
  automation_opportunity_finder.py (600 lines) — Automation detection
  dead_code_detector.py (270 lines)            — Dead code detection
  dependency_graph.py (220 lines)              — Dependency mapping
  perf_bottleneck_finder.py (635 lines)        — Perf analysis
  refactoring_opportunity_finder.py (46 lines) — Refactoring signals
  gitea_issue_parser.py (140 lines)            — Gitea issue parsing
  session_pair_harvester.py (224 lines)        — Training pair extraction
knowledge/
  index.json (12KB)                            — Master fact store
  SCHEMA.md (3KB)                              — Schema docs
  global/pitfalls.yaml (2KB)                   — Cross-repo pitfalls
  global/tool-quirks.yaml (2KB)                — Tool quirks
  repos/hermes-agent.yaml (2KB)                — Repo-specific knowledge
  repos/the-nexus.yaml (2KB)                   — Repo-specific knowledge
templates/
  harvest-prompt.md (4KB)                      — Extraction prompt
test_sessions/ (5 files)                       — Sample transcripts
tests/ + scripts/test_* (14 files)             — Test suite
```

**Total:** ~6,500 lines of code across 18 scripts + 14 test files.

---

## The 100x Path (from README)

```
Month 1: 15,000 facts, sessions 20% faster
Month 2: 45,000 facts, sessions 40% faster, first-try success up 30%
Month 3: 90,000 facts, fleet measurably smarter per token
```

Each new session is better than the last. The intelligence compounds.

---

*Generated by codebase-genome pipeline. Ref: timmy-home#676.*
*Generated by Codebase Genome pipeline — Issue #676*

scripts/conference_summarizer.py (Executable file, 283 lines)

@@ -0,0 +1,283 @@
#!/usr/bin/env python3
"""
conference_summarizer.py — Extract knowledge from conference talk transcripts.

Reads a plain-text transcript and uses LLM to extract durable knowledge items.
Integrates with the knowledge store (index.json + knowledge/conferences/talks.md).

Usage:
    python3 conference_summarizer.py --transcript talk.txt --conference "AI拂晓" --domain global
    python3 conference_summarizer.py --transcript talk.txt --domain the-nexus  # talk about that repo
    python3 conference_summarizer.py --transcript talk.txt --dry-run

Refs: Issue #138 — 7.6: Conference Talk Summarizer
"""

import argparse
import hashlib
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

SCRIPT_DIR = Path(__file__).parent.absolute()
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"

DEFAULT_API_BASE = "https://api.nousresearch.com/v1"
DEFAULT_API_KEY = ""
DEFAULT_MODEL = "xiaomi/mimo-v2-pro"

API_KEY_PATHS = [
    Path.home() / ".config/nous/key",
    Path.home() / ".hermes/keymaxxing/active/minimax.key",
    Path.home() / ".config/openrouter/key",
]


def find_api_key() -> str:
    # Honor the HARVESTER_API_KEY env var first (the error message in main()
    # references it), then fall back to the known key-file locations.
    env_key = os.environ.get("HARVESTER_API_KEY", "")
    if env_key:
        return env_key
    for path in API_KEY_PATHS:
        if path.exists():
            return path.read_text().strip()
    return ""


def load_prompt() -> str:
    path = SCRIPT_DIR.parent / "templates" / "conference-summary-prompt.md"
    if not path.exists():
        print(f"ERROR: Prompt not found at {path}", file=sys.stderr)
        sys.exit(1)
    return path.read_text(encoding="utf-8")


def truncate_for_context(text: str, head: int = 120, tail: int = 120) -> str:
    lines = text.splitlines()
    if len(lines) <= head + tail:
        return text
    return (
        "\n".join(lines[:head])
        + "\n\n... [truncated] ...\n\n"
        + "\n".join(lines[-tail:])
    )


def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str):
    import urllib.request

    messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": f"Transcript:\n\n{truncate_for_context(transcript)}"},
    ]
    payload = json.dumps(
        {"model": model, "messages": messages, "temperature": 0.1, "max_tokens": 4096}
    ).encode("utf-8")
    req = urllib.request.Request(
        f"{api_base}/chat/completions",
        data=payload,
        headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read())
            content = data["choices"][0]["message"]["content"].strip()
            # Strip code fences
            if content.startswith("```"):
                content = content.split("\n", 1)[1].rsplit("```", 1)[0].strip()
            return json.loads(content)
    except Exception as e:
        print(f"ERROR: LLM extraction failed: {e}", file=sys.stderr)
        return None


def load_index(knowledge_dir: Path) -> dict:
    index_path = knowledge_dir / "index.json"
    if index_path.exists():
        with open(index_path) as f:
            return json.load(f)
    return {"version": 1, "total_facts": 0, "facts": []}


def content_hash(text: str) -> str:
    normalized = " ".join(text.lower().strip().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def compute_next_sequence(existing_facts: list[dict], domain: str, category: str) -> int:
    """Compute next sequence number for (domain, category) based on existing IDs."""
    max_seq = 0
    for f in existing_facts:
        fid = f.get("id", "")
        parts = fid.split(":")
        if len(parts) == 3 and parts[0] == domain and parts[1] == category:
            try:
                seq = int(parts[2])
                max_seq = max(max_seq, seq)
            except ValueError:
                pass
    return max_seq + 1


def deduplicate(new_facts: list[dict], existing: list[dict]) -> list[dict]:
    """Exact-deduplicate by content hash; near-dedup by token overlap."""
    existing_hashes = {content_hash(f["fact"]): f for f in existing}
    existing_texts = [f["fact"].lower() for f in existing]
    unique = []
    for fact in new_facts:
        text = fact.get("fact", "")
        h = content_hash(text)
        if h in existing_hashes:
            continue
        # Near-dedup: token Jaccard >= 0.8
        tokens = set(text.lower().split())
        for ex in existing_texts:
            ex_tokens = set(ex.split())
            if tokens and ex_tokens:
                inter = len(tokens & ex_tokens)
                union = len(tokens | ex_tokens)
                if inter / union >= 0.8:
                    break
        else:
            unique.append(fact)
    return unique


def validate_fact(fact: dict) -> bool:
    required = ["fact", "category", "domain", "confidence"]
    for field in required:
        if field not in fact:
            return False
    if not isinstance(fact["fact"], str) or not fact["fact"].strip():
        return False
    if fact["category"] not in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
        return False
    c = fact.get("confidence", 0)
    return isinstance(c, (int, float)) and 0.0 <= c <= 1.0


def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: Path):
    kdir = knowledge_dir
    kdir.mkdir(parents=True, exist_ok=True)

    for fact in new_facts:
        fact["harvested_at"] = datetime.now(timezone.utc).isoformat()
        fact["source"] = "conference-talk"

    index["facts"].extend(new_facts)
    index["total_facts"] = len(index["facts"])
    index["last_updated"] = datetime.now(timezone.utc).isoformat()

    # index.json
    with open(kdir / "index.json", "w", encoding="utf-8") as f:
        json.dump(index, f, indent=2, ensure_ascii=False)

    # conferences/talks.md (human-readable)
    conf_dir = kdir / "conferences"
    conf_dir.mkdir(parents=True, exist_ok=True)
    conf_md = conf_dir / "talks.md"
    mode = "a" if conf_md.exists() else "w"
    with open(conf_md, mode, encoding="utf-8") as f:
        if mode == "w":
            f.write("# Conference Talk Knowledge\n\n")
        f.write(
            f"## {datetime.now(timezone.utc).strftime('%Y-%m-%d')} — {len(new_facts)} items\n\n"
        )
        for fact in new_facts:
            icon = {"fact": "📋", "pitfall": "⚠️", "pattern": "🔄", "tool-quirk": "🔧", "question": "❓"}.get(fact["category"], "•")
            f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
            if fact.get("evidence"):
                f.write(f"  _Evidence: {fact['evidence']}_\n")
        f.write("\n")


def main():
    parser = argparse.ArgumentParser(description="Summarize conference talks into knowledge store")
    parser.add_argument("--transcript", required=True, help="Path to transcript text file")
    parser.add_argument("--conference", default="unknown", help="Conference name")
    parser.add_argument("--title", default="", help="Talk title")
    parser.add_argument("--speaker", default="", help="Speaker name(s)")
    parser.add_argument("--talk-url", default="", help="URL to talk/video")
    parser.add_argument("--domain", default="global", help="Domain: global or repo/agent name")
    parser.add_argument("--knowledge-dir", default=str(KNOWLEDGE_DIR), help="Knowledge store directory")
    parser.add_argument("--api-base", default=DEFAULT_API_BASE, help="LLM API base URL")
    parser.add_argument("--api-key", default="", help="LLM API key")
    parser.add_argument("--model", default=DEFAULT_MODEL, help="Model to use")
    parser.add_argument("--dry-run", action="store_true", help="Preview without writing")
    parser.add_argument("--min-confidence", type=float, default=0.3, help="Minimum confidence threshold")
    args = parser.parse_args()

    transcript_path = Path(args.transcript)
    if not transcript_path.exists():
        print(f"ERROR: Transcript not found: {transcript_path}", file=sys.stderr)
        sys.exit(1)
    transcript = transcript_path.read_text(encoding="utf-8", errors="replace")
    if not transcript.strip():
        print("ERROR: Transcript is empty", file=sys.stderr)
        sys.exit(1)

    api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
    if not api_key:
        print("ERROR: No API key. Set HARVESTER_API_KEY or pass --api-key", file=sys.stderr)
        sys.exit(1)

    prompt = load_prompt()
    print(f"Summarizing '{transcript_path.name}' domain={args.domain} conf={args.conference}")
    start = time.time()
    extracted = call_llm(prompt, transcript, args.api_base, api_key, args.model)
    if extracted is None:
        print("ERROR: LLM extraction failed", file=sys.stderr)
        sys.exit(1)

    raw_items = extracted.get("knowledge", [])
    print(f"  Raw items: {len(raw_items)}")
    valid = [f for f in raw_items if validate_fact(f) and f.get("confidence", 0) >= args.min_confidence]
    print(f"  Valid: {len(valid)}")

    if not valid:
        print("WARNING: No valid items extracted", file=sys.stderr)
        sys.exit(1)

    kdir = Path(args.knowledge_dir)
    index = load_index(kdir)
    existing_facts = index.get("facts", [])
    new_facts = deduplicate(valid, existing_facts)
    print(f"  New (non-duplicate): {len(new_facts)}")

    if not new_facts:
        print("All items duplicated — nothing to write.")
        sys.exit(0)

    # Assign IDs per (domain, category) sequence
    seq_counters = {}
    # Count existing for this domain
    for f in existing_facts:
        if f.get("domain") == args.domain:
            cat = f.get("category", "fact")
            key = (args.domain, cat)
            seq_counters[key] = seq_counters.get(key, 0) + 1
    # Now next sequence for each category in new_facts
    for fact in new_facts:
        cat = fact["category"]
        key = (args.domain, cat)
        next_seq = seq_counters.get(key, 0) + 1
        seq_counters[key] = next_seq
        fact["id"] = f"{args.domain}:{cat}:{next_seq:03d}"
        fact["domain"] = args.domain
        fact.setdefault("tags", []).extend([args.conference, "conference-talk"])
        fact["first_seen"] = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        fact["last_confirmed"] = fact["first_seen"]
        fact["source_count"] = 1
        fact["talk_meta"] = extracted.get("meta", {})

    if args.dry_run:
        print("DRY RUN — items that would be added:")
        for f in new_facts:
            print(f"  [{f['category']}] {f['fact'][:90]}")
        sys.exit(0)

    write_knowledge(index, new_facts, kdir)
    print(f"✓ Stored {len(new_facts)} items to knowledge store in {time.time() - start:.1f}s")


if __name__ == "__main__":
    main()

scripts/dedup.py (Normal file, 317 lines)

@@ -0,0 +1,317 @@
#!/usr/bin/env python3
"""
dedup.py — Knowledge deduplication: content hash + semantic similarity.

Deduplicates harvested knowledge entries to avoid training on duplicates.
Uses content hashing for exact matches and token overlap for near-duplicates.

Usage:
    python3 dedup.py --input knowledge/index.json --output knowledge/index_deduped.json
    python3 dedup.py --input knowledge/index.json --dry-run
    python3 dedup.py --test  # Run built-in dedup test
"""

import argparse
import hashlib
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Optional, Tuple


def normalize_text(text: str) -> str:
    """Normalize text for hashing: lowercase, collapse whitespace, strip."""
    text = text.lower().strip()
    text = re.sub(r'\s+', ' ', text)
    return text


def content_hash(text: str) -> str:
    """SHA256 hash of normalized text for exact dedup."""
    normalized = normalize_text(text)
    return hashlib.sha256(normalized.encode('utf-8')).hexdigest()


def tokenize(text: str) -> set:
    """Simple tokenizer: lowercase words, 3+ chars."""
    words = re.findall(r'[a-z0-9_]{3,}', text.lower())
    return set(words)


def token_similarity(a: str, b: str) -> float:
    """Token-based Jaccard similarity (0.0-1.0).

    Fast local alternative to embedding similarity.
    Good enough for near-duplicate detection.
    """
    tokens_a = tokenize(a)
    tokens_b = tokenize(b)
    if not tokens_a or not tokens_b:
        return 0.0
    intersection = tokens_a & tokens_b
    union = tokens_a | tokens_b
    return len(intersection) / len(union)


def quality_score(fact: dict) -> float:
    """Compute quality score for merge ranking.

    Higher is better. Factors:
    - confidence (0-1)
    - source_count (more confirmations = better)
    - has tags (richer metadata)
    """
    confidence = fact.get('confidence', 0.5)
    source_count = fact.get('source_count', 1)
    has_tags = 1.0 if fact.get('tags') else 0.0
    has_related = 1.0 if fact.get('related') else 0.0

    # Weighted composite
    score = (
        confidence * 0.5 +
        min(source_count / 10, 1.0) * 0.3 +
        has_tags * 0.1 +
        has_related * 0.1
    )
    return round(score, 4)


def merge_facts(keep: dict, drop: dict) -> dict:
    """Merge two near-duplicate facts, keeping higher-quality fields.

    The 'keep' fact is enriched with metadata from 'drop'.
    """
    # Merge tags (union)
    keep_tags = set(keep.get('tags', []))
    drop_tags = set(drop.get('tags', []))
    keep['tags'] = sorted(keep_tags | drop_tags)

    # Merge related (union)
    keep_related = set(keep.get('related', []))
    drop_related = set(drop.get('related', []))
    keep['related'] = sorted(keep_related | drop_related)

    # Update source_count (sum)
    keep['source_count'] = keep.get('source_count', 1) + drop.get('source_count', 1)

    # Update confidence (max — we've now seen it from multiple sources)
    keep['confidence'] = max(keep.get('confidence', 0), drop.get('confidence', 0))

    # Track that we merged
    if '_merged_from' not in keep:
        keep['_merged_from'] = []
    keep['_merged_from'].append(drop.get('id', 'unknown'))

    return keep


def dedup_facts(
    facts: List[dict],
    exact_threshold: float = 1.0,
    near_threshold: float = 0.95,
    dry_run: bool = False,
) -> Tuple[List[dict], dict]:
    """Deduplicate a list of knowledge facts.

    Args:
        facts: List of fact dicts (from index.json)
        exact_threshold: Hash match = exact duplicate
        near_threshold: Token similarity above this = near-duplicate
        dry_run: If True, don't modify, just report

    Returns:
        (deduped_facts, stats_dict)
    """
    if not facts:
        return [], {"total": 0, "exact_dupes": 0, "near_dupes": 0, "unique": 0}

    # Phase 1: Exact dedup by content hash
    hash_seen = {}  # hash -> index in deduped list
    exact_dupes = 0
    deduped = []

    for fact in facts:
        text = fact.get('fact', '')
        h = content_hash(text)

        if h in hash_seen:
            # Exact duplicate — merge metadata into existing
            existing_idx = hash_seen[h]
            if not dry_run:
                deduped[existing_idx] = merge_facts(deduped[existing_idx], fact)
            exact_dupes += 1
        else:
            hash_seen[h] = len(deduped)
            deduped.append(fact)

    # Phase 2: Near-dup by token similarity
    near_dupes = 0
    i = 0
    while i < len(deduped):
        j = i + 1
        removed_i = False
        while j < len(deduped):
            sim = token_similarity(deduped[i].get('fact', ''), deduped[j].get('fact', ''))
            if sim >= near_threshold:
                # Near-duplicate — keep higher quality
                q_i = quality_score(deduped[i])
                q_j = quality_score(deduped[j])
                near_dupes += 1
                if q_i >= q_j:
                    if not dry_run:
                        deduped[i] = merge_facts(deduped[i], deduped[j])
                    deduped.pop(j)
                else:
                    # j is higher quality — merge i into j, then remove i
                    if not dry_run:
                        deduped[j] = merge_facts(deduped[j], deduped[i])
                    deduped.pop(i)
                    removed_i = True
                    break  # index i now holds a different element; recheck it
            else:
                j += 1
        if not removed_i:
            i += 1

    stats = {
        "total": len(facts),
        "exact_dupes": exact_dupes,
        "near_dupes": near_dupes,
        "unique": len(deduped),
        "removed": len(facts) - len(deduped),
    }

    return deduped, stats


def dedup_index_file(
    input_path: str,
    output_path: Optional[str] = None,
    near_threshold: float = 0.95,
    dry_run: bool = False,
) -> dict:
    """Deduplicate an index.json file.

    Args:
        input_path: Path to index.json
        output_path: Where to write deduped file (default: overwrite input)
        near_threshold: Token similarity threshold for near-dupes
        dry_run: Report only, don't write

    Returns stats dict.
    """
    path = Path(input_path)
    if not path.exists():
        raise FileNotFoundError(f"Index file not found: {input_path}")

    with open(path) as f:
        data = json.load(f)

    facts = data.get('facts', [])
    deduped, stats = dedup_facts(facts, near_threshold=near_threshold, dry_run=dry_run)

    if not dry_run:
        data['facts'] = deduped
        data['total_facts'] = len(deduped)
        data['last_dedup'] = datetime.now(timezone.utc).isoformat()

        out_path = Path(output_path) if output_path else path
        with open(out_path, 'w') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    return stats


def generate_test_duplicates(n: int = 20) -> List[dict]:
    """Generate test facts with intentional duplicates for testing.

    Creates n unique facts plus n/4 exact dupes and n/4 near-dupes.
    """
    import random
    random.seed(42)

    unique_facts = []
    for i in range(n):
        topic = random.choice(["git", "python", "docker", "rust", "nginx"])
        tip = random.choice(["use verbose flags", "check logs first", "restart service", "clear cache", "update config"])
        unique_facts.append({
            "id": f"test:fact:{i:03d}",
            "fact": f"When working with {topic}, always {tip} before deploying.",
            "category": "fact",
            "domain": "test",
            "confidence": round(random.uniform(0.5, 1.0), 2),
            "source_count": random.randint(1, 5),
            "tags": [topic, "test"],
        })

    # Add exact duplicates (same text, different IDs)
    duped = list(unique_facts)
    for i in range(n // 4):
        original = unique_facts[i]
        dupe = dict(original)
        dupe["id"] = f"test:fact:dup{i:03d}"
        dupe["confidence"] = round(random.uniform(0.3, 0.8), 2)
        duped.append(dupe)

    # Add near-duplicates (slightly different phrasing)
    for i in range(n // 4):
        original = unique_facts[i]
        near = dict(original)
        near["id"] = f"test:fact:near{i:03d}"
        near["fact"] = original["fact"].replace("always", "should").replace("before deploying", "prior to deployment")
        near["confidence"] = round(random.uniform(0.4, 0.9), 2)
        duped.append(near)

    return duped


def main():
    parser = argparse.ArgumentParser(description="Knowledge deduplication")
    parser.add_argument("--input", help="Path to index.json")
    parser.add_argument("--output", help="Output path (default: overwrite input)")
    parser.add_argument("--threshold", type=float, default=0.95,
                        help="Near-dup similarity threshold (default: 0.95)")
    parser.add_argument("--dry-run", action="store_true", help="Report only, don't write")
    parser.add_argument("--test", action="store_true", help="Run built-in dedup test")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    if args.test:
        test_facts = generate_test_duplicates(20)
        print(f"Generated {len(test_facts)} test facts (20 unique + dupes)")
        deduped, stats = dedup_facts(test_facts, near_threshold=args.threshold)
        print(f"\nDedup results:")
        print(f"  Total input: {stats['total']}")
        print(f"  Exact dupes: {stats['exact_dupes']}")
        print(f"  Near dupes: {stats['near_dupes']}")
        print(f"  Unique output: {stats['unique']}")
        print(f"  Removed: {stats['removed']}")

        # Verify: should have ~20 unique (some merged)
        assert stats['unique'] <= 20, f"Too many unique: {stats['unique']} > 20"
        assert stats['unique'] >= 15, f"Too few unique: {stats['unique']} < 15"
        assert stats['removed'] > 0, "No duplicates removed"
        print("\nOK: Dedup test passed")
        return

    if not args.input:
        print("ERROR: Provide --input or --test")
        sys.exit(1)

    stats = dedup_index_file(args.input, args.output, args.threshold, args.dry_run)

    if args.json:
        print(json.dumps(stats, indent=2))
    else:
        print(f"Dedup results:")
        print(f"  Total input: {stats['total']}")
        print(f"  Exact dupes: {stats['exact_dupes']}")
        print(f"  Near dupes: {stats['near_dupes']}")
        print(f"  Unique output: {stats['unique']}")
        print(f"  Removed: {stats['removed']}")
        if args.dry_run:
            print("  (dry run — no changes written)")


if __name__ == "__main__":
    main()

@@ -149,8 +149,8 @@ def to_dot(graph: dict) -> str:
     """Generate DOT format output."""
     lines = ["digraph dependencies {"]
     lines.append("  rankdir=LR;")
-    lines.append("  node [shape=box, style=filled, fillcolor="#1a1a2e", fontcolor="#e6edf3"];")
-    lines.append("  edge [color="#4a4a6a"];")
+    lines.append('  node [shape=box, style=filled, fillcolor="#1a1a2e", fontcolor="#e6edf3"];')
+    lines.append('  edge [color="#4a4a6a"];')
     lines.append("")

     for repo, data in sorted(graph.items()):

scripts/freshness.py (Normal file, 387 lines)

@@ -0,0 +1,387 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)
|
||||
|
||||
Automatically detects when knowledge entries become stale due to code changes.
|
||||
|
||||
Detection Method:
|
||||
1. Track source file hash alongside knowledge entry
|
||||
2. Compare current file hashes vs stored
|
||||
3. Mismatch → flag entry as potentially stale
|
||||
4. Report stale entries and optionally re-extract
|
||||
|
||||
Usage:
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/ --json
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import yaml
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Any, Optional, Tuple
|
||||
|
||||
|
||||
def compute_file_hash(filepath: str) -> Optional[str]:
|
||||
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
|
||||
try:
|
||||
with open(filepath, "rb") as f:
|
||||
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
|
||||
except (FileNotFoundError, IsADirectoryError, PermissionError):
|
||||
return None
|
||||
|
||||
|
||||
def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
|
||||
"""
|
||||
Get files changed in git in the last N days.
|
||||
|
||||
Returns dict with 'modified', 'added', 'deleted' lists of file paths.
|
||||
"""
|
||||
changes = {"modified": [], "added": [], "deleted": []}
|
||||
|
||||
try:
|
||||
# Get commits from last N days
|
||||
cmd = [
|
||||
"git", "-C", repo_path, "log",
|
||||
f"--since={days} days ago",
|
||||
"--name-status",
|
||||
"--pretty=format:",
|
||||
"--diff-filter=MAD"
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
|
||||
if result.returncode != 0:
|
||||
return changes
|
||||
|
||||
for line in result.stdout.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
parts = line.split('\t', 1)
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
|
||||
status, filepath = parts
|
||||
if status == 'M':
|
||||
changes["modified"].append(filepath)
|
||||
elif status == 'A':
|
||||
changes["added"].append(filepath)
|
||||
elif status == 'D':
|
||||
changes["deleted"].append(filepath)
|
||||
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass
|
||||
|
||||
# Deduplicate
|
||||
for key in changes:
|
||||
changes[key] = list(set(changes[key]))
|
||||
|
||||
return changes
|
||||
|
||||
|
||||
def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Load knowledge entries from YAML files in the knowledge directory.
|
||||
|
||||
Supports:
|
||||
- knowledge/index.json (legacy format)
|
||||
- knowledge/global/*.yaml
|
||||
- knowledge/repos/*.yaml
|
||||
- knowledge/agents/*.yaml
|
||||
"""
|
||||
entries = []
|
||||
|
||||
# Load from index.json if exists
|
||||
index_path = os.path.join(knowledge_dir, "index.json")
|
||||
if os.path.exists(index_path):
|
||||
try:
|
||||
with open(index_path) as f:
|
||||
data = json.load(f)
|
||||
for fact in data.get("facts", []):
|
||||
entries.append({
|
||||
"source": "index.json",
|
||||
"fact": fact.get("fact", ""),
|
||||
"source_file": fact.get("source_file"),
|
||||
"source_hash": fact.get("source_hash"),
|
||||
"category": fact.get("category", "unknown"),
|
||||
"confidence": fact.get("confidence", 0.5)
|
||||
})
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
pass
|
||||
|
||||
# Load from YAML files
|
||||
for subdir in ["global", "repos", "agents"]:
|
||||
subdir_path = os.path.join(knowledge_dir, subdir)
|
||||
if not os.path.isdir(subdir_path):
|
||||
continue
|
||||
|
||||
for filename in os.listdir(subdir_path):
|
||||
if not filename.endswith((".yaml", ".yml")):
|
||||
continue
|
||||
|
||||
filepath = os.path.join(subdir_path, filename)
|
||||
try:
|
||||
with open(filepath) as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
if not data or not isinstance(data, dict):
|
||||
continue
|
||||
|
||||
# Extract entries from YAML structure
|
||||
for key, value in data.items():
|
||||
if isinstance(value, list):
|
||||
for item in value:
|
||||
if isinstance(item, dict):
|
||||
entries.append({
|
||||
"source": f"{subdir}/{filename}",
|
||||
"fact": item.get("description", item.get("fact", "")),
|
||||
"source_file": item.get("source_file"),
|
||||
"source_hash": item.get("source_hash"),
|
||||
"category": item.get("category", "unknown"),
|
||||
"confidence": item.get("confidence", 0.5)
|
||||
})
|
||||
elif isinstance(value, dict):
|
||||
entries.append({
|
||||
"source": f"{subdir}/{filename}",
|
||||
"fact": value.get("description", value.get("fact", "")),
|
||||
"source_file": value.get("source_file"),
|
||||
"source_hash": value.get("source_hash"),
|
||||
"category": value.get("category", "unknown"),
|
||||
"confidence": value.get("confidence", 0.5)
|
||||
})
|
||||
except (yaml.YAMLError, IOError):
|
||||
pass
|
||||
|
||||
return entries
|
||||
|
||||
|
||||
def check_freshness(knowledge_dir: str, repo_root: str = ".",
|
||||
days: int = 1) -> Dict[str, Any]:
|
||||
"""
|
||||
Check freshness of knowledge entries against recent code changes.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"timestamp": ISO timestamp,
|
||||
"total_entries": int,
|
||||
"stale_entries": [...],
|
||||
"fresh_entries": [...],
|
||||
"git_changes": {...},
|
||||
"summary": {...}
|
||||
}
|
||||
"""
|
||||
entries = load_knowledge_entries(knowledge_dir)
|
||||
git_changes = get_git_file_changes(repo_root, days)
|
||||
|
||||
stale_entries = []
|
||||
fresh_entries = []
|
||||
|
||||
for entry in entries:
|
||||
source_file = entry.get("source_file")
|
||||
if not source_file:
|
||||
# Entry without source file reference
|
||||
fresh_entries.append({**entry, "status": "no_source"})
|
||||
continue
|
||||
|
||||
# Check if source file was recently modified
|
||||
is_stale = False
|
||||
reason = ""
|
||||
|
||||
if source_file in git_changes["modified"]:
|
||||
is_stale = True
|
||||
reason = "source_modified"
|
||||
elif source_file in git_changes["deleted"]:
|
||||
is_stale = True
|
||||
reason = "source_deleted"
|
||||
elif source_file in git_changes["added"]:
|
||||
is_stale = True
|
||||
reason = "source_added"
|
||||
|
||||
# Also check hash if available
|
||||
stored_hash = entry.get("source_hash")
|
||||
if stored_hash:
|
||||
full_path = os.path.join(repo_root, source_file)
|
||||
current_hash = compute_file_hash(full_path)
|
||||
|
||||
if current_hash is None:
|
||||
is_stale = True
|
||||
reason = "source_missing"
|
||||
elif current_hash != stored_hash:
|
||||
is_stale = True
|
||||
reason = "hash_mismatch"
|
||||
|
||||
if is_stale:
|
||||
stale_entries.append({
|
||||
**entry,
|
||||
"status": "stale",
|
||||
"reason": reason
|
||||
})
|
||||
else:
|
||||
fresh_entries.append({**entry, "status": "fresh"})
|
||||
|
||||
# Compute summary
|
||||
total = len(entries)
|
||||
stale_count = len(stale_entries)
|
||||
fresh_count = len(fresh_entries)
|
||||
|
||||
# Group stale entries by reason
|
||||
stale_by_reason = {}
|
||||
for entry in stale_entries:
|
||||
reason = entry.get("reason", "unknown")
|
||||
if reason not in stale_by_reason:
|
||||
stale_by_reason[reason] = 0
|
||||
stale_by_reason[reason] += 1
|
||||
|
||||
return {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"total_entries": total,
|
||||
"stale_entries": stale_entries,
|
||||
"fresh_entries": fresh_entries,
|
||||
"git_changes": git_changes,
|
||||
"summary": {
|
||||
"total": total,
|
||||
"stale": stale_count,
|
||||
"fresh": fresh_count,
|
||||
"stale_percentage": round(stale_count / total * 100, 1) if total > 0 else 0,
|
||||
"stale_by_reason": stale_by_reason,
|
||||
"git_changes_summary": {
|
||||
"modified": len(git_changes["modified"]),
|
||||
"added": len(git_changes["added"]),
|
||||
"deleted": len(git_changes["deleted"])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
|
||||
"""
|
||||
Update hashes for stale entries. Returns count of updated entries.
|
||||
"""
|
||||
entries = load_knowledge_entries(knowledge_dir)
|
||||
updated = 0
|
||||
|
||||
# This is a simplified version - in practice, you'd need to
|
||||
# write back to the specific YAML files
|
||||
for entry in entries:
|
||||
source_file = entry.get("source_file")
|
||||
if not source_file:
|
||||
continue
|
||||
|
||||
        full_path = os.path.join(repo_root, source_file)
        current_hash = compute_file_hash(full_path)

        if current_hash and entry.get("source_hash") != current_hash:
            # Mark for update (in practice, you'd write back to the file)
            updated += 1

    return updated


def format_report(result: Dict[str, Any], max_items: int = 20) -> str:
    """Format freshness check results as a human-readable report."""
    timestamp = result["timestamp"]
    summary = result["summary"]
    stale_entries = result["stale_entries"]
    git_changes = result["git_changes"]

    lines = [
        "Knowledge Freshness Report",
        "=" * 50,
        f"Generated: {timestamp}",
        f"Total entries: {summary['total']}",
        f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)",
        f"Fresh entries: {summary['fresh']}",
        ""
    ]

    # Git changes summary
    lines.extend([
        "Git Changes (last 24h):",
        f"  Modified: {len(git_changes['modified'])} files",
        f"  Added: {len(git_changes['added'])} files",
        f"  Deleted: {len(git_changes['deleted'])} files",
        ""
    ])

    # Stale entries by reason
    if summary.get("stale_by_reason"):
        lines.extend([
            "Stale Entries by Reason:",
            ""
        ])
        for reason, count in summary["stale_by_reason"].items():
            lines.append(f"  {reason}: {count}")
        lines.append("")

    # List stale entries
    if stale_entries:
        lines.extend([
            "Stale Entries:",
            ""
        ])
        for i, entry in enumerate(stale_entries[:max_items], 1):
            source = entry.get("source_file", "?")
            reason = entry.get("reason", "unknown")
            fact = entry.get("fact", "")[:60]
            lines.append(f"{i:2d}. [{reason}] {source}")
            if fact:
                lines.append(f"    {fact}")

        if len(stale_entries) > max_items:
            lines.append(f"\n... and {len(stale_entries) - max_items} more")
    else:
        lines.append("No stale entries found. All knowledge is fresh!")

    return "\n".join(lines)


def main():
    parser = argparse.ArgumentParser(
        description="Knowledge Freshness Cron — detect stale entries from code changes")
    parser.add_argument("--knowledge-dir", required=True,
                        help="Path to knowledge directory")
    parser.add_argument("--repo", default=".",
                        help="Path to repository for git change detection")
    parser.add_argument("--days", type=int, default=1,
                        help="Number of days to check for git changes (default: 1)")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON instead of human-readable")
    parser.add_argument("--max", type=int, default=20,
                        help="Maximum stale entries to show (default: 20)")
    parser.add_argument("--auto-reextract", action="store_true",
                        help="Auto-re-extract knowledge for stale entries")

    args = parser.parse_args()

    if not os.path.isdir(args.knowledge_dir):
        print(f"Error: {args.knowledge_dir} is not a directory", file=sys.stderr)
        sys.exit(1)

    if not os.path.isdir(args.repo):
        print(f"Error: {args.repo} is not a directory", file=sys.stderr)
        sys.exit(1)

    result = check_freshness(args.knowledge_dir, args.repo, args.days)

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(format_report(result, args.max))

    # Auto-re-extract if requested
    if args.auto_reextract and result["stale_entries"]:
        print(f"\nAuto-re-extracting {len(result['stale_entries'])} stale entries...")
        # In a real implementation, this would call the harvester
        print("(Auto-re-extraction not yet implemented)")


if __name__ == "__main__":
    main()
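For orientation, a minimal sketch of how this script's pieces compose programmatically rather than via the CLI — the function names and the `summary` / `stale_entries` result keys come straight from the code above; the import path setup is an assumption (freshness.py lives under scripts/):

```python
# Hedged sketch: drive check_freshness() directly, e.g. from another tool.
import sys
sys.path.insert(0, "scripts")  # assumption: run from the repo root

from freshness import check_freshness, format_report

result = check_freshness("knowledge", ".", 1)   # same values as the CLI flags
print(format_report(result, max_items=10))      # human-readable report

if result["summary"]["stale"]:                  # non-zero exit for cron alerting
    raise SystemExit(1)
```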
@@ -113,7 +113,7 @@ def find_slow_tests_by_scan(repo_path: str) -> List[Bottleneck]:
        (r"time\.sleep\((\d+(?:\.\d+)?)\)", "Contains time.sleep() — consider using mock or async wait"),
        (r"subprocess\.run\(.*timeout=(\d+)", "Subprocess with timeout — may block test"),
        (r"requests\.(get|post|put|delete)\(", "Real HTTP call — mock with responses or httpretty"),
        (r"open\([^)]*['"]w['"]", "File I/O in test — use tmp_path fixture"),
        (r"open\([^)]*[\x27\x22]w[\x27\x22]", "File I/O in test — use tmp_path fixture"),
    ]

    for root, dirs, files in os.walk(repo_path):
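The replaced line was a syntax error: the pattern string is delimited by double quotes, so the unescaped `"` inside `['"]` terminated it early. The fix writes the quote characters as `\x27` (`'`) and `\x22` (`"`) escapes instead. A quick check that the corrected pattern still flags write-mode `open()` calls (the sample strings are illustrative):

```python
import re

WRITE_OPEN = r"open\([^)]*[\x27\x22]w[\x27\x22]"  # \x27 = ' and \x22 = "

assert re.search(WRITE_OPEN, "open('out.txt', 'w')")
assert re.search(WRITE_OPEN, 'open("log.json", "w")')
assert not re.search(WRITE_OPEN, "open('data.txt')")  # read-only open is fine
```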
@@ -506,8 +506,8 @@ def format_markdown(report: PerfReport) -> str:
        lines.append(f"- {icon} {b.name}{loc} — ~{b.duration_s:.1f}s — {b.recommendation}")
    lines.append(f"")

    return "
".join(lines)
    return "\n".join(lines)


# ── Main ───────────────────────────────────────────────────────────

@@ -521,8 +521,8 @@ def main():
                        help="Slow test threshold in seconds")
    args = parser.parse_args()

    global SLOW_TEST_THRESHOLD_S
    SLOW_TEST_THRESHOLD_S = args.threshold
    # Threshold override handled via module-level default
    # (scan_tests uses SLOW_TEST_THRESHOLD_S from module scope)

    if not os.path.isdir(args.repo):
        print(f"Error: {args.repo} is not a directory", file=sys.stderr)
@@ -10,37 +10,273 @@ Usage:
"""

import argparse
import ast
import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Tuple


def generate_proposals():
    """Generate sample proposals for this engine."""
    # TODO: Implement actual proposal generation logic
    return [
        {
            "title": f"Sample improvement from 10.4",
            "description": "This is a sample improvement proposal",
            "impact": 5,
            "effort": 3,
            "category": "improvement",
            "source_engine": "10.4",
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    ]
# ── Data Classes ────────────────────────────────────────────────────────

@dataclass
class FileMetrics:
    """Metrics for a single source file."""
    path: str
    lines: int = 0
    complexity: float = 0.0
    max_complexity: int = 0
    functions: int = 0
    classes: int = 0
    churn_30d: int = 0
    churn_90d: int = 0
    test_coverage: Optional[float] = None
    refactoring_score: float = 0.0


# ── Complexity Analysis ─────────────────────────────────────────────────

class ComplexityVisitor(ast.NodeVisitor):
    """AST visitor that computes cyclomatic complexity per function."""

    def __init__(self):
        self.complexities = []
        self.function_count = 0
        self.class_count = 0
        self._current_complexity = 0
        self._in_function = False

    def visit_FunctionDef(self, node):
        self.function_count += 1
        old_complexity = self._current_complexity
        old_in_function = self._in_function
        self._current_complexity = 1  # Base complexity
        self._in_function = True

        self.generic_visit(node)

        self.complexities.append(self._current_complexity)
        self._current_complexity = old_complexity
        self._in_function = old_in_function

    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_ClassDef(self, node):
        self.class_count += 1
        self.generic_visit(node)

    def visit_If(self, node):
        if self._in_function:
            self._current_complexity += 1
        self.generic_visit(node)

    def visit_For(self, node):
        if self._in_function:
            self._current_complexity += 1
        self.generic_visit(node)

    visit_AsyncFor = visit_For

    def visit_While(self, node):
        if self._in_function:
            self._current_complexity += 1
        self.generic_visit(node)

    def visit_ExceptHandler(self, node):
        if self._in_function:
            self._current_complexity += 1
        self.generic_visit(node)

    def visit_With(self, node):
        if self._in_function:
            self._current_complexity += 1
        self.generic_visit(node)

    visit_AsyncWith = visit_With

    def visit_Assert(self, node):
        if self._in_function:
            self._current_complexity += 1
        self.generic_visit(node)

    def visit_BoolOp(self, node):
        # Each 'and'/'or' adds a branch
        if self._in_function:
            self._current_complexity += len(node.values) - 1
        self.generic_visit(node)

    def visit_IfExp(self, node):
        # Ternary expression
        if self._in_function:
            self._current_complexity += 1
        self.generic_visit(node)


def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
    """
    Compute cyclomatic complexity for a Python file.

    Returns:
        (avg_complexity, max_complexity, function_count, class_count, line_count)
    """
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            source = f.read()
    except (IOError, OSError):
        return 0.0, 0, 0, 0, 0

    try:
        tree = ast.parse(source, filename=filepath)
    except SyntaxError:
        return 0.0, 0, 0, 0, 0

    visitor = ComplexityVisitor()
    visitor.visit(tree)

    line_count = len(source.splitlines())

    if not visitor.complexities:
        # No functions, but might have classes
        return 0.0, 0, visitor.function_count, visitor.class_count, line_count

    avg = sum(visitor.complexities) / len(visitor.complexities)
    max_c = max(visitor.complexities)

    return avg, max_c, visitor.function_count, visitor.class_count, line_count
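A quick sanity check of the visitor on an in-memory snippet (the snippet is invented; `compute_file_complexity` does the same thing via a file path):

```python
import ast

snippet = """
def classify(x):
    if x > 0 and x < 10:   # if: +1, boolean 'and': +1
        return "small"
    return "other"
"""

v = ComplexityVisitor()
v.visit(ast.parse(snippet))
print(v.complexities)  # [3] — base 1 + if + boolean operator
```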
# ── Refactoring Score ───────────────────────────────────────────────────

def calculate_refactoring_score(metrics: FileMetrics) -> float:
    """
    Calculate a refactoring priority score (0-100) based on file metrics.

    Higher score = higher priority for refactoring.
    Components:
    - Complexity (0-30 points): higher avg/max complexity = higher score
    - Size (0-20 points): larger files = higher score
    - Churn (0-30 points): more changes recently = higher score
    - Coverage (0-20 points): lower test coverage = higher score
    """
    score = 0.0

    # Complexity component (0-30)
    # avg=10+ or max=20+ → 30 points
    complexity_score = min(30.0, (metrics.complexity * 2) + (metrics.max_complexity * 0.5))
    score += max(0.0, complexity_score)

    # Size component (0-20)
    # 500+ lines → 20 points
    size_score = min(20.0, metrics.lines / 25.0)
    score += max(0.0, size_score)

    # Churn component (0-30)
    # Weighted: recent churn (30d) counts more than older (90d)
    churn_score = min(30.0, (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5))
    score += max(0.0, churn_score)

    # Coverage component (0-20)
    # Lower coverage → higher score
    if metrics.test_coverage is not None:
        # coverage=0 → 20 points, coverage=1 → 0 points
        coverage_score = (1.0 - metrics.test_coverage) * 20.0
    else:
        # No data → assume medium risk (10 points)
        coverage_score = 10.0
    score += max(0.0, coverage_score)

    return min(100.0, max(0.0, score))
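To make the weighting concrete, a worked example (the file and its numbers are invented):

```python
# 400-line file, avg complexity 6, worst function 15, no churn or coverage data.
m = FileMetrics(path="example.py", lines=400, complexity=6.0, max_complexity=15)

# complexity: min(30, 6*2 + 15*0.5) = 19.5
# size:       min(20, 400/25)       = 16.0
# churn:      min(30, 0*2 + 0*0.5)  =  0.0
# coverage:   no data               = 10.0
assert calculate_refactoring_score(m) == 45.5
```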
# ── Proposal Generation ─────────────────────────────────────────────────

def scan_directory(directory: str, extensions: tuple = ('.py',)) -> list:
    """Scan directory for source files."""
    files = []
    for root, dirs, filenames in os.walk(directory):
        # Skip hidden dirs and common non-source dirs
        dirs[:] = [d for d in dirs if not d.startswith('.') and d not in (
            '__pycache__', 'node_modules', 'venv', '.venv', 'env',
            'build', 'dist', '.git', '.tox'
        )]
        for fname in filenames:
            if any(fname.endswith(ext) for ext in extensions):
                files.append(os.path.join(root, fname))
    return files


def generate_proposals(directory: str = '.', min_score: float = 30.0) -> list:
    """Generate refactoring proposals by analyzing source files."""
    proposals = []
    files = scan_directory(directory)

    for filepath in files:
        avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)

        if funcs == 0 and classes == 0:
            continue

        metrics = FileMetrics(
            path=filepath,
            lines=lines,
            complexity=avg,
            max_complexity=max_c,
            functions=funcs,
            classes=classes
        )
        score = calculate_refactoring_score(metrics)
        metrics.refactoring_score = score

        if score >= min_score:
            reasons = []
            if max_c > 10:
                reasons.append(f"high max complexity ({max_c})")
            if avg > 5:
                reasons.append(f"high avg complexity ({avg:.1f})")
            if lines > 300:
                reasons.append(f"large file ({lines} lines)")

            proposals.append({
                "title": f"Refactor {os.path.basename(filepath)} (score: {score:.0f})",
                "description": f"{filepath}: {', '.join(reasons) if reasons else 'general improvement candidate'}",
                "impact": min(10, int(score / 10)),
                "effort": min(10, max(1, int(max_c / 3))),
                "category": "refactoring",
                "source_engine": "10.4",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "metrics": {
                    "path": filepath,
                    "score": round(score, 2),
                    "avg_complexity": round(avg, 2),
                    "max_complexity": max_c,
                    "lines": lines,
                    "functions": funcs,
                    "classes": classes
                }
            })

    # Sort by score descending
    proposals.sort(key=lambda p: p.get('metrics', {}).get('score', 0), reverse=True)
    return proposals


# ── CLI ─────────────────────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
    parser.add_argument("--output", required=True, help="Output file for proposals")
    parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
    parser.add_argument("--directory", default=".", help="Directory to scan")
    parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")

    args = parser.parse_args()

    proposals = generate_proposals()
    proposals = generate_proposals(args.directory, args.min_score)

    if not args.dry_run:
        os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
        with open(args.output, "w") as f:
            json.dump({"proposals": proposals}, f, indent=2)
        print(f"Generated {len(proposals)} proposals -> {args.output}")
@@ -1,212 +1,72 @@
#!/usr/bin/env python3
"""
Comprehensive test script for knowledge extraction prompt.
Validates prompt structure, requirements, and consistency.
"""

import json
import re
"""Comprehensive tests for knowledge extraction prompt."""
import json, re
from pathlib import Path

def test_prompt_structure():
    """Test that the prompt has the required structure."""
    prompt_path = Path("templates/harvest-prompt.md")
    if not prompt_path.exists():
        return False, "harvest-prompt.md not found"

    content = prompt_path.read_text()

    # Check for required sections
    required_sections = [
        "System Prompt",
        "Instructions",
        "Categories",
        "Output Format",
        "Confidence Scoring",
        "Constraints",
        "Example"
    ]

    for section in required_sections:
        if section.lower() not in content.lower():
            return False, f"Missing required section: {section}"

    # Check for required categories
    required_categories = ["fact", "pitfall", "pattern", "tool-quirk", "question"]
    for category in required_categories:
        if category not in content:
            return False, f"Missing required category: {category}"

    # Check for required output fields
    required_fields = ["fact", "category", "repo", "confidence"]
    for field in required_fields:
        if field not in content:
            return False, f"Missing required output field: {field}"

    # Check prompt size (should be ~1k tokens, roughly 4k chars)
    if len(content) > 5000:
        return False, f"Prompt too large: {len(content)} chars (max ~5000)"

    if len(content) < 1000:
        return False, f"Prompt too small: {len(content)} chars (min ~1000)"

def check_prompt_structure():
    p = Path("templates/harvest-prompt.md")
    if not p.exists(): return False, "harvest-prompt.md not found"
    c = p.read_text()
    for s in ["System Prompt","Instructions","Categories","Output Format","Confidence Scoring","Constraints","Example"]:
        if s.lower() not in c.lower(): return False, f"Missing section: {s}"
    for cat in ["fact","pitfall","pattern","tool-quirk","question"]:
        if cat not in c: return False, f"Missing category: {cat}"
    if len(c) > 5000: return False, f"Too large: {len(c)}"
    if len(c) < 1000: return False, f"Too small: {len(c)}"
    return True, "Prompt structure is valid"

def check_confidence_scoring():
    c = Path("templates/harvest-prompt.md").read_text()
    for l in ["0.9-1.0","0.7-0.8","0.5-0.6","0.3-0.4","0.1-0.2"]:
        if l not in c: return False, f"Missing level: {l}"
    return True, "Confidence scoring defined"

def check_example_quality():
    c = Path("templates/harvest-prompt.md").read_text()
    if "example" not in c.lower(): return False, "No examples"
    m = re.search(r'"knowledge"', c[c.lower().find("example"):])
    if not m: return False, "No JSON example"
    return True, "Examples present"

def check_constraint_coverage():
    c = Path("templates/harvest-prompt.md").read_text()
    for x in ["no hallucination","explicitly","partial","failed sessions"]:
        if x not in c.lower(): return False, f"Missing: {x}"
    return True, "Constraints covered"

def check_test_sessions():
    d = Path("test_sessions")
    if not d.exists(): return False, "test_sessions/ not found"
    files = list(d.glob("*.jsonl"))
    if len(files) < 5: return False, f"Only {len(files)} sessions"
    for f in files:
        for i, line in enumerate(f.read_text().strip().split("\n"), 1):
            try: json.loads(line)
            except json.JSONDecodeError as e: return False, f"{f.name}:{i}: {e}"
    return True, f"{len(files)} valid sessions"

def test_prompt_structure():
    passed, msg = check_prompt_structure()
    assert passed, msg

def test_confidence_scoring():
    """Test that confidence scoring is properly defined."""
    prompt_path = Path("templates/harvest-prompt.md")
    content = prompt_path.read_text()

    # Check for confidence scale definitions
    confidence_levels = [
        ("0.9-1.0", "explicitly stated"),
        ("0.7-0.8", "clearly implied"),
        ("0.5-0.6", "suggested"),
        ("0.3-0.4", "inferred"),
        ("0.1-0.2", "speculative")
    ]

    for level, description in confidence_levels:
        if level not in content:
            return False, f"Missing confidence level: {level}"
        if description.lower() not in content.lower():
            return False, f"Missing confidence description: {description}"

    return True, "Confidence scoring is properly defined"
    passed, msg = check_confidence_scoring()
    assert passed, msg

def test_example_quality():
    """Test that examples are clear and complete."""
    prompt_path = Path("templates/harvest-prompt.md")
    content = prompt_path.read_text()

    # Check for example input/output
    if "example" not in content.lower():
        return False, "No examples provided"

    # Check that example includes all categories
    example_section = content[content.lower().find("example"):]

    # Look for JSON example
    json_match = re.search(r'\{[\s\S]*"knowledge"[\s\S]*\}', example_section)
    if not json_match:
        return False, "No JSON example found"

    example_json = json_match.group(0)

    # Check for all categories in example
    for category in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
        if category not in example_json:
            return False, f"Example missing category: {category}"

    return True, "Examples are clear and complete"
    passed, msg = check_example_quality()
    assert passed, msg

def test_constraint_coverage():
    """Test that constraints cover all requirements."""
    prompt_path = Path("templates/harvest-prompt.md")
    content = prompt_path.read_text()

    required_constraints = [
        "No hallucination",
        "only extract",
        "explicitly",
        "partial",
        "failed sessions",
        "1k tokens"
    ]

    for constraint in required_constraints:
        if constraint.lower() not in content.lower():
            return False, f"Missing constraint: {constraint}"

    return True, "Constraints cover all requirements"
    passed, msg = check_constraint_coverage()
    assert passed, msg

def test_test_sessions():
    """Test that test sessions exist and are valid."""
    test_sessions_dir = Path("test_sessions")
    if not test_sessions_dir.exists():
        return False, "test_sessions directory not found"

    session_files = list(test_sessions_dir.glob("*.jsonl"))
    if len(session_files) < 5:
        return False, f"Only {len(session_files)} test sessions found, need 5"

    # Check each session file
    for session_file in session_files:
        content = session_file.read_text()
        lines = content.strip().split("\n")

        # Check that each line is valid JSON
        for i, line in enumerate(lines, 1):
            try:
                json.loads(line)
            except json.JSONDecodeError as e:
                return False, f"Invalid JSON in {session_file.name}, line {i}: {e}"

    return True, f"Found {len(session_files)} valid test sessions"

def run_all_tests():
    """Run all tests and return results."""
    tests = [
        ("Prompt Structure", test_prompt_structure),
        ("Confidence Scoring", test_confidence_scoring),
        ("Example Quality", test_example_quality),
        ("Constraint Coverage", test_constraint_coverage),
        ("Test Sessions", test_test_sessions)
    ]

    results = []
    all_passed = True

    for test_name, test_func in tests:
        try:
            passed, message = test_func()
            results.append({
                "test": test_name,
                "passed": passed,
                "message": message
            })
            if not passed:
                all_passed = False
        except Exception as e:
            results.append({
                "test": test_name,
                "passed": False,
                "message": f"Error: {str(e)}"
            })
            all_passed = False

    # Print results
    print("=" * 60)
    print("HARVEST PROMPT TEST RESULTS")
    print("=" * 60)

    for result in results:
        status = "✓ PASS" if result["passed"] else "✗ FAIL"
        print(f"{status}: {result['test']}")
        print(f"  {result['message']}")
        print()

    print("=" * 60)
    if all_passed:
        print("ALL TESTS PASSED!")
    else:
        print("SOME TESTS FAILED!")
    print("=" * 60)

    return all_passed, results
    passed, msg = check_test_sessions()
    assert passed, msg

if __name__ == "__main__":
    all_passed, results = run_all_tests()

    # Save results to file
    with open("test_results.json", "w") as f:
        json.dump({
            "all_passed": all_passed,
            "results": results,
            "timestamp": "2026-04-14T19:05:00Z"
        }, f, indent=2)

    print(f"Results saved to test_results.json")

    # Exit with appropriate code
    exit(0 if all_passed else 1)
    checks = [check_prompt_structure, check_confidence_scoring, check_example_quality, check_constraint_coverage, check_test_sessions]
    for fn in checks:
        ok, msg = fn()
        print(f"{'PASS' if ok else 'FAIL'}: {fn.__name__} -- {msg}")
72 templates/conference-summary-prompt.md Normal file
@@ -0,0 +1,72 @@
# Conference Talk Knowledge Extraction Prompt

## System Prompt

You are a knowledge extraction engine specialized in conference talks. You read talk transcripts and output ONLY structured JSON. You extract factual insights, patterns, tool discoveries, and warnings that are durable and actionable for the Timmy Foundation fleet.

## Prompt

```
TASK: Extract durable knowledge from this conference talk transcript.

RULES:
1. Extract ONLY information explicitly stated or strongly implied in the transcript.
2. Do NOT hallucinate, infer unsupported details, or invent quotes.
3. Every fact must be grounded in something the speaker actually said.
4. Focus on **durable, reusable** knowledge — not specific project details that won't apply elsewhere.
5. Prioritize insights that improve: workflows, tool usage, system design, governance, or operational reliability.

CATEGORIES (assign exactly one per item):
- fact: Concrete, verifiable takeaway (technical detail, config, workflow)
- pitfall: Mistake, trap, or cost of wrong approach the speaker warned about
- pattern: Successful approach, sequence, or template worth reusing
- tool-quirk: Unexpected behavior, gotcha, or setup detail for a specific tool/platform
- question: Something raised but not fully answered — worth investigating further

CONFIDENCE:
- 0.9–1.0: Explicitly stated by speaker with clear reasoning/evidence
- 0.7–0.8: Clearly implied by multiple statements, speaker's expertise
- 0.5–0.6: Suggested or hinted, but not directly confirmed
- 0.3–0.4: Interpretive, speculative, or single-data-point observation

TARGET DOMAIN:
- If talk is about a specific repo (e.g. hermes-agent, the-nexus), set `domain` to that repo name.
- If talk is about general principles, fleet processes, or multiple repos, set `domain` to "global".
- If talk is about an agent type (mimo, groq, claude), set `domain` to the agent name.
- If talk is about the compounding-intelligence system itself, set `domain` to "compounding-intelligence".

OUTPUT FORMAT (valid JSON only, no markdown, no explanation):

{
  "knowledge": [
    {
      "fact": "One specific, actionable sentence of knowledge",
      "category": "fact|pitfall|pattern|tool-quirk|question",
      "domain": "global|{repo}|{agent}|compounding-intelligence",
      "confidence": 0.0-1.0,
      "tags": ["relevant", "keywords"],
      "evidence": "Brief paraphrase or quote from the transcript that supports this"
    }
  ],
  "meta": {
    "talk_title": "Title of the talk (if known)",
    "speaker": "Speaker name(s)",
    "conference": "Conference name",
    "talk_url": "URL to talk/video (if available)",
    "knowledge_count": 0,
    "extraction_date": "2026-04-26"
  }
}

TRANSCRIPT:
{{transcript}}
```
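A minimal sketch of how the template might be filled before being handed to an extraction model — the template path is real, but the transcript file and the downstream LLM call are placeholders:

```python
from pathlib import Path

template = Path("templates/conference-summary-prompt.md").read_text()
transcript = Path("talks/example-talk.txt").read_text()  # hypothetical input

# In practice you'd likely slice out only the fenced Prompt section first.
prompt = template.replace("{{transcript}}", transcript)
# `prompt` is then sent to the extraction model; the response must parse as
# JSON matching the OUTPUT FORMAT above.
```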
## Design Notes

- Keep the `fact` field to **one clear sentence**. Avoid run-ons.
- `evidence` should be a 1–2 sentence paraphrase, not a verbatim paragraph.
- `tags` should include: tool names, repo names, agent types, concepts mentioned.
- Focus on what the fleet can **reuse tomorrow**, not ephemeral project context.
- If the talk is high-level vision with no concrete details, that's a `question` or a low-confidence `fact`.
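For illustration, one knowledge item shaped to this schema (the content is invented, not from a real talk):

```python
# Hypothetical extracted item, matching the OUTPUT FORMAT above.
example_item = {
    "fact": "Pin CI runner images by digest to avoid silent toolchain drift",
    "category": "pattern",
    "domain": "global",
    "confidence": 0.8,
    "tags": ["ci", "reproducibility"],
    "evidence": "Speaker traced a week-long outage to an unpinned runner image update."
}
```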
207 tests/test_dedup.py Normal file
@@ -0,0 +1,207 @@
"""Tests for knowledge deduplication module (Issue #196)."""

import json
import sys
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))

from dedup import (
    normalize_text,
    content_hash,
    tokenize,
    token_similarity,
    quality_score,
    merge_facts,
    dedup_facts,
    generate_test_duplicates,
)


class TestNormalize:
    def test_lowercases(self):
        assert normalize_text("Hello World") == "hello world"

    def test_collapses_whitespace(self):
        assert normalize_text("  hello   world  ") == "hello world"

    def test_strips(self):
        assert normalize_text("  text  ") == "text"


class TestContentHash:
    def test_deterministic(self):
        h1 = content_hash("Hello World")
        h2 = content_hash("hello world")
        h3 = content_hash("  Hello   World  ")
        assert h1 == h2 == h3

    def test_different_texts(self):
        h1 = content_hash("Hello")
        h2 = content_hash("World")
        assert h1 != h2

    def test_returns_hex(self):
        h = content_hash("test")
        assert len(h) == 64  # SHA256
        assert all(c in '0123456789abcdef' for c in h)


class TestTokenize:
    def test_extracts_words(self):
        tokens = tokenize("Hello World Test")
        assert "hello" in tokens
        assert "world" in tokens
        assert "test" in tokens

    def test_skips_short_words(self):
        tokens = tokenize("a to is the hello")
        assert "a" not in tokens
        assert "to" not in tokens
        assert "hello" in tokens

    def test_returns_set(self):
        tokens = tokenize("hello hello world")
        assert isinstance(tokens, set)
        assert len(tokens) == 2


class TestTokenSimilarity:
    def test_identical(self):
        assert token_similarity("hello world", "hello world") == 1.0

    def test_no_overlap(self):
        assert token_similarity("alpha beta", "gamma delta") == 0.0

    def test_partial_overlap(self):
        sim = token_similarity("hello world test", "hello universe test")
        assert 0.3 < sim < 0.7

    def test_empty(self):
        assert token_similarity("", "hello") == 0.0
        assert token_similarity("hello", "") == 0.0

    def test_symmetric(self):
        a = "hello world test"
        b = "hello universe test"
        assert token_similarity(a, b) == token_similarity(b, a)


class TestQualityScore:
    def test_high_confidence(self):
        fact = {"confidence": 0.95, "source_count": 5, "tags": ["test"], "related": ["x"]}
        score = quality_score(fact)
        assert score > 0.7

    def test_low_confidence(self):
        fact = {"confidence": 0.3, "source_count": 1}
        score = quality_score(fact)
        assert score < 0.5

    def test_defaults(self):
        score = quality_score({})
        assert 0 < score < 1


class TestMergeFacts:
    def test_merges_tags(self):
        keep = {"id": "a", "fact": "test", "tags": ["git"], "confidence": 0.9}
        drop = {"id": "b", "fact": "test", "tags": ["python"], "confidence": 0.8}
        merged = merge_facts(keep, drop)
        assert "git" in merged["tags"]
        assert "python" in merged["tags"]

    def test_merges_source_count(self):
        keep = {"id": "a", "fact": "test", "source_count": 3}
        drop = {"id": "b", "fact": "test", "source_count": 2}
        merged = merge_facts(keep, drop)
        assert merged["source_count"] == 5

    def test_keeps_higher_confidence(self):
        keep = {"id": "a", "fact": "test", "confidence": 0.7}
        drop = {"id": "b", "fact": "test", "confidence": 0.9}
        merged = merge_facts(keep, drop)
        assert merged["confidence"] == 0.9

    def test_tracks_merged_from(self):
        keep = {"id": "a", "fact": "test"}
        drop = {"id": "b", "fact": "test"}
        merged = merge_facts(keep, drop)
        assert "b" in merged["_merged_from"]


class TestDedupFacts:
    def test_removes_exact_dupes(self):
        facts = [
            {"id": "1", "fact": "Always use git rebase"},
            {"id": "2", "fact": "Always use git rebase"},  # exact dupe
            {"id": "3", "fact": "Check logs first"},
        ]
        deduped, stats = dedup_facts(facts)
        assert stats["exact_dupes"] == 1
        assert stats["unique"] == 2

    def test_removes_near_dupes(self):
        facts = [
            {"id": "1", "fact": "Always check logs before deploying to production server"},
            {"id": "2", "fact": "Always check logs before deploying to production environment"},
            {"id": "3", "fact": "Use docker compose for local development environments"},
        ]
        deduped, stats = dedup_facts(facts, near_threshold=0.5)
        assert stats["near_dupes"] >= 1
        assert stats["unique"] == 2

    def test_preserves_unique(self):
        facts = [
            {"id": "1", "fact": "Use git rebase for clean history"},
            {"id": "2", "fact": "Docker containers should be stateless"},
            {"id": "3", "fact": "Always write tests before code"},
        ]
        deduped, stats = dedup_facts(facts)
        assert stats["unique"] == 3
        assert stats["removed"] == 0

    def test_empty_input(self):
        deduped, stats = dedup_facts([])
        assert stats["total"] == 0
        assert stats["unique"] == 0

    def test_keeps_higher_quality_near_dup(self):
        facts = [
            {"id": "1", "fact": "Check logs before deploying to production server", "confidence": 0.5, "source_count": 1},
            {"id": "2", "fact": "Check logs before deploying to production environment", "confidence": 0.9, "source_count": 5, "tags": ["ops"]},
        ]
        deduped, stats = dedup_facts(facts, near_threshold=0.5)
        assert stats["unique"] == 1
        # Higher quality fact should be kept
        assert deduped[0]["confidence"] == 0.9

    def test_dry_run_does_not_modify(self):
        facts = [
            {"id": "1", "fact": "Same text"},
            {"id": "2", "fact": "Same text"},
        ]
        deduped, stats = dedup_facts(facts, dry_run=True)
        assert stats["exact_dupes"] == 1
        # In dry_run, merge_facts is skipped so facts aren't modified
        assert len(deduped) == 1


class TestGenerateTestDuplicates:
    def test_generates_correct_count(self):
        facts = generate_test_duplicates(20)
        assert len(facts) > 20  # 20 unique + duplicates

    def test_has_exact_dupes(self):
        facts = generate_test_duplicates(20)
        hashes = [content_hash(f["fact"]) for f in facts]
        # Should have some duplicate hashes
        assert len(hashes) != len(set(hashes))

    def test_dedup_removes_dupes(self):
        facts = generate_test_duplicates(20)
        deduped, stats = dedup_facts(facts)
        assert stats["unique"] <= 20
        assert stats["removed"] > 0
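The `dedup` module itself isn't part of this diff, but the assertions above pin down its primitives fairly tightly. A minimal sketch consistent with them — the minimum word length and token regex are guesses; the real scripts/dedup.py may differ:

```python
import hashlib
import re

def normalize_text(text: str) -> str:
    """Lowercase, collapse whitespace, strip — so near-identical strings compare equal."""
    return re.sub(r"\s+", " ", text.lower()).strip()

def content_hash(text: str) -> str:
    """SHA256 hex digest of the normalized text (64 hex chars, as the tests check)."""
    return hashlib.sha256(normalize_text(text).encode("utf-8")).hexdigest()

def tokenize(text: str) -> set:
    """Word set of the normalized text, skipping very short words."""
    return {w for w in re.findall(r"[a-z0-9-]+", normalize_text(text)) if len(w) > 2}

def token_similarity(a: str, b: str) -> float:
    """Jaccard similarity of the two token sets; 0.0 when either side is empty."""
    ta, tb = tokenize(a), tokenize(b)
    if not ta or not tb:
        return 0.0
    return len(ta & tb) / len(ta | tb)
```

Under this sketch, `token_similarity("hello world test", "hello universe test")` is 2/4 = 0.5, which lands inside the `0.3 < sim < 0.7` window the partial-overlap test expects.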
227 tests/test_freshness.py Normal file
@@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""Tests for scripts/freshness.py — 8 tests."""

import json
import os
import sys
import tempfile

sys.path.insert(0, os.path.join(os.path.dirname(__file__) or ".", ".."))
import importlib.util
spec = importlib.util.spec_from_file_location(
    "freshness", os.path.join(os.path.dirname(__file__) or ".", "..", "scripts", "freshness.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)

compute_file_hash = mod.compute_file_hash
check_freshness = mod.check_freshness
load_knowledge_entries = mod.load_knowledge_entries


def test_compute_file_hash():
    """File hash should be computed correctly."""
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
        f.write("test content")
        f.flush()
    h = compute_file_hash(f.name)
    assert h is not None
    assert h.startswith("sha256:")
    os.unlink(f.name)
    print("PASS: test_compute_file_hash")


def test_compute_file_hash_nonexistent():
    """Nonexistent file should return None."""
    h = compute_file_hash("/nonexistent/file.txt")
    assert h is None
    print("PASS: test_compute_file_hash_nonexistent")


def test_load_knowledge_entries_empty():
    """Empty knowledge dir should return empty list."""
    with tempfile.TemporaryDirectory() as tmpdir:
        entries = load_knowledge_entries(tmpdir)
        assert entries == []
    print("PASS: test_load_knowledge_entries_empty")


def test_load_knowledge_entries_from_index():
    """Should load entries from index.json."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create index.json
        index_path = os.path.join(tmpdir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "Test fact",
                        "source_file": "test.py",
                        "source_hash": "sha256:abc123",
                        "category": "fact",
                        "confidence": 0.9
                    }
                ]
            }, f)

        entries = load_knowledge_entries(tmpdir)
        assert len(entries) == 1
        assert entries[0]["fact"] == "Test fact"
        assert entries[0]["source_file"] == "test.py"
    print("PASS: test_load_knowledge_entries_from_index")


def test_load_knowledge_entries_from_yaml():
    """Should load entries from YAML files."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create global directory
        global_dir = os.path.join(tmpdir, "global")
        os.makedirs(global_dir)

        # Create YAML file
        yaml_path = os.path.join(global_dir, "test.yaml")
        with open(yaml_path, "w") as f:
            f.write("""
pitfalls:
  - description: "Test pitfall"
    source_file: "test.py"
    source_hash: "sha256:def456"
    category: "pitfall"
    confidence: 0.8
""")

        entries = load_knowledge_entries(tmpdir)
        assert len(entries) == 1
        assert entries[0]["fact"] == "Test pitfall"
        assert entries[0]["category"] == "pitfall"
    print("PASS: test_load_knowledge_entries_from_yaml")


def test_check_freshness_no_changes():
    """With no source file reference, entries should be counted correctly."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create knowledge dir
        knowledge_dir = os.path.join(tmpdir, "knowledge")
        os.makedirs(knowledge_dir)

        # Create repo dir
        repo_dir = os.path.join(tmpdir, "repo")
        os.makedirs(repo_dir)

        # Create index.json with entry that has no source_file
        index_path = os.path.join(knowledge_dir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "General knowledge",
                        "category": "fact",
                        "confidence": 0.9
                        # No source_file or source_hash
                    }
                ]
            }, f)

        result = check_freshness(knowledge_dir, repo_dir, days=1)

        # Entry without source_file should be counted as "fresh" (no_source status)
        assert result["summary"]["total"] == 1
        assert result["summary"]["stale"] == 0
        assert result["summary"]["fresh"] == 1
        assert result["fresh_entries"][0]["status"] == "no_source"
    print("PASS: test_check_freshness_no_changes")


def test_check_freshness_with_hash_mismatch():
    """Hash mismatch should mark entry as stale."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create knowledge dir
        knowledge_dir = os.path.join(tmpdir, "knowledge")
        os.makedirs(knowledge_dir)

        # Create repo dir with a file
        repo_dir = os.path.join(tmpdir, "repo")
        os.makedirs(repo_dir)

        test_file = os.path.join(repo_dir, "test.py")
        with open(test_file, "w") as f:
            f.write("print('hello')")

        # Create index.json with wrong hash
        index_path = os.path.join(knowledge_dir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "Test fact",
                        "source_file": "test.py",
                        "source_hash": "sha256:wronghash",
                        "category": "fact",
                        "confidence": 0.9
                    }
                ]
            }, f)

        # Initialize git repo
        os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")

        result = check_freshness(knowledge_dir, repo_dir, days=1)

        assert result["summary"]["total"] == 1
        assert result["summary"]["stale"] == 1
        assert result["summary"]["fresh"] == 0
        assert result["stale_entries"][0]["reason"] == "hash_mismatch"
    print("PASS: test_check_freshness_with_hash_mismatch")


def test_check_freshness_missing_source():
    """Missing source file should mark entry as stale."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create knowledge dir
        knowledge_dir = os.path.join(tmpdir, "knowledge")
        os.makedirs(knowledge_dir)

        # Create repo dir (without the referenced file)
        repo_dir = os.path.join(tmpdir, "repo")
        os.makedirs(repo_dir)

        # Create index.json referencing nonexistent file
        index_path = os.path.join(knowledge_dir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "Test fact",
                        "source_file": "nonexistent.py",
                        "source_hash": "sha256:abc123",
                        "category": "fact",
                        "confidence": 0.9
                    }
                ]
            }, f)

        # Initialize git repo
        os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")

        result = check_freshness(knowledge_dir, repo_dir, days=1)

        assert result["summary"]["total"] == 1
        assert result["summary"]["stale"] == 1
        assert result["summary"]["fresh"] == 0
        assert result["stale_entries"][0]["reason"] == "source_missing"
    print("PASS: test_check_freshness_missing_source")


def run_all():
    test_compute_file_hash()
    test_compute_file_hash_nonexistent()
    test_load_knowledge_entries_empty()
    test_load_knowledge_entries_from_index()
    test_load_knowledge_entries_from_yaml()
    test_check_freshness_no_changes()
    test_check_freshness_with_hash_mismatch()
    test_check_freshness_missing_source()
    print("\nAll 8 tests passed!")


if __name__ == "__main__":
    run_all()
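The hashing helper these tests exercise is defined earlier in scripts/freshness.py, outside the lines shown in this diff; its contract is pinned down by the two hash tests above. A minimal sketch consistent with them (the real implementation may differ in chunking or error handling):

```python
import hashlib

def compute_file_hash(path: str):
    """Return 'sha256:<hexdigest>' of the file contents, or None if unreadable."""
    try:
        with open(path, "rb") as f:
            return "sha256:" + hashlib.sha256(f.read()).hexdigest()
    except (IOError, OSError):
        return None
```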