Compare commits
23 Commits
fix/201-py ... step35/233

| SHA1 |
|---|
| 2133b18929 |
| c4cb325568 |
| ca21e3e886 |
| 8628a0d610 |
| 4b5a675355 |
| 345d2451d0 |
| 8aa9c9f018 |
| 277f9e3a2b |
| 21f654a159 |
| 12abaad838 |
| c106db2e28 |
| 242c77cc99 |
| fe94130380 |
| 4181065f60 |
| cc215e3ed7 |
| baa2c84c3f |
| 6dd354385f |
| 55adcb31dc |
| ec0e9d65ca |
| b732172dcc |
| f7c479c4eb |
| c203010e3a |
| e1e42c3f8e |

.gitignore (vendored, new file, 5 lines)
@@ -0,0 +1,5 @@
__pycache__/
*.pyc

.pytest_cache/
.mypy_cache/

GENOME.md (374 lines changed)
@@ -1,16 +1,16 @@
# GENOME.md — compounding-intelligence

*Auto-generated codebase genome. Addresses timmy-home#676.*
**Generated:** 2026-04-17
**Repo:** Timmy_Foundation/compounding-intelligence
**Description:** Turn 1B+ daily agent tokens into durable, compounding fleet intelligence.

---

## Project Overview

**What:** A system that turns 1B+ daily agent tokens into durable, compounding fleet intelligence.
Every agent session starts at zero. The same HTTP 405 gets rediscovered as a branch protection issue. The same token path gets searched from scratch. Intelligence evaporates when the session ends.

**Why:** Every agent session starts at zero. The same mistakes get made repeatedly — the same HTTP 405 is rediscovered as a branch protection issue, the same token path is searched for from scratch. Intelligence evaporates when the session ends.

**How:** Three pipelines form a compounding loop:
Compounding-intelligence solves this with three pipelines forming a loop:

```
SESSION ENDS → HARVESTER → KNOWLEDGE STORE → BOOTSTRAPPER → NEW SESSION STARTS SMARTER
@@ -18,222 +18,234 @@ SESSION ENDS → HARVESTER → KNOWLEDGE STORE → BOOTSTRAPPER → NEW SESSION
MEASURER → Prove it's working
```

**Status:** Early stage. Template and test scaffolding exist. Core pipeline scripts (harvester.py, bootstrapper.py, measurer.py, session_reader.py) are planned but not yet implemented. The knowledge extraction prompt is complete and validated.

---
**Status:** Active development. Core pipelines implemented. 20+ scripts, 14 test files, knowledge store populated with real data.

## Architecture

```mermaid
graph TD
A[Session Transcript<br/>.jsonl] --> B[Harvester]
B --> C{Extract Knowledge}
C --> D[knowledge/index.json]
C --> E[knowledge/global/*.md]
C --> F[knowledge/repos/{repo}.md]
C --> G[knowledge/agents/{agent}.md]
D --> H[Bootstrapper]
H --> I[Bootstrap Context<br/>2k token injection]
I --> J[New Session<br/>starts smarter]
J --> A
D --> K[Measurer]
K --> L[metrics/dashboard.md]
K --> M[Velocity / Hit Rate<br/>Error Reduction]
TRANS[Session Transcripts<br/>~/.hermes/sessions/*.jsonl] --> READER[session_reader.py]
READER --> HARVESTER[harvester.py]
HARVESTER -->|LLM extraction| PROMPT[harvest-prompt.md]
HARVESTER --> DEDUP[deduplicate()]
DEDUP --> INDEX[knowledge/index.json]
DEDUP --> GLOBAL[knowledge/global/*.yaml]
DEDUP --> REPO[knowledge/repos/*.yaml]

INDEX --> BOOTSTRAPPER[bootstrapper.py]
BOOTSTRAPPER -->|filter + rank + truncate| CONTEXT[Bootstrap Context<br/>2k token injection]
CONTEXT --> SESSION[New Session starts smarter]

INDEX --> VALIDATOR[validate_knowledge.py]
INDEX --> STALENESS[knowledge_staleness_check.py]
INDEX --> GAPS[knowledge_gap_identifier.py]

TRANS --> SAMPLER[sampler.py]
SAMPLER -->|score + rank| BEST[High-value sessions]
BEST --> HARVESTER

TRANS --> METADATA[session_metadata.py]
METADATA --> SUMMARY[SessionSummary objects]

KNOWLEDGE --> DIFF[diff_analyzer.py]
DIFF --> PROPOSALS[improvement_proposals.py]
PROPOSALS --> PRIORITIES[priority_rebalancer.py]
```

### Pipeline 1: Harvester
## Entry Points

**Status:** Prompt designed. Script not implemented.
### Core Pipelines

Reads finished session transcripts (JSONL). Uses `templates/harvest-prompt.md` to extract durable knowledge into five categories:

| Script | Purpose | Key Functions |
|--------|---------|---------------|
| `harvester.py` | Extract knowledge from session transcripts | `harvest_session()`, `call_llm()`, `deduplicate()`, `validate_fact()` |
| `bootstrapper.py` | Build pre-session context from knowledge store | `build_bootstrap_context()`, `filter_facts()`, `sort_facts()`, `truncate_to_tokens()` |
| `session_reader.py` | Parse JSONL session transcripts | `read_session()`, `extract_conversation()`, `messages_to_text()` |
| `sampler.py` | Score and rank sessions for harvesting value | `scan_session_fast()`, `score_session()` |
| `session_metadata.py` | Extract structured metadata from sessions | `extract_session_metadata()`, `SessionSummary` |

| Category | Description | Example |
|----------|-------------|---------|
| `fact` | Concrete, verifiable information | "Repository X has 5 files" |
| `pitfall` | Errors encountered, wrong assumptions | "Token is at ~/.config/gitea/token, not env var" |
| `pattern` | Successful action sequences | "Deploy: test → build → push → webhook" |
| `tool-quirk` | Environment-specific behaviors | "URL format requires trailing slash" |
| `question` | Identified but unanswered | "Need optimal batch size for harvesting" |

### Analysis & Quality

Output schema per knowledge item:

```json
{
  "fact": "One sentence description",
  "category": "fact|pitfall|pattern|tool-quirk|question",
  "repo": "repo-name or 'global'",
  "confidence": 0.0-1.0
}
```

| Script | Purpose |
|--------|---------|
| `validate_knowledge.py` | Validate knowledge index schema compliance |
| `knowledge_staleness_check.py` | Detect stale knowledge (source changed since extraction) |
| `knowledge_gap_identifier.py` | Find untested functions, undocumented APIs, missing tests |
| `diff_analyzer.py` | Analyze code diffs for improvement signals |
| `improvement_proposals.py` | Generate ranked improvement proposals |
| `priority_rebalancer.py` | Rebalance priorities across proposals |
| `automation_opportunity_finder.py` | Find manual steps that can be automated |
| `dead_code_detector.py` | Detect unused code |
| `dependency_graph.py` | Map dependency relationships |
| `perf_bottleneck_finder.py` | Find performance bottlenecks |
| `refactoring_opportunity_finder.py` | Identify refactoring targets |
| `gitea_issue_parser.py` | Parse Gitea issues for knowledge extraction |

### Pipeline 2: Bootstrapper
### Automation

**Status:** Not implemented.

| Script | Purpose |
|--------|---------|
| `session_pair_harvester.py` | Extract training pairs from sessions |

Queries knowledge store before session start. Assembles a compact 2k-token context from relevant facts. Injects into session startup so the agent begins with full situational awareness.

### Pipeline 3: Measurer

**Status:** Not implemented.

Tracks compounding metrics: knowledge velocity (facts/day), error reduction (%), hit rate (knowledge used / knowledge available), task completion improvement.

---

## Directory Structure
## Data Flow

```
compounding-intelligence/
├── README.md                  # Project overview and architecture
├── GENOME.md                  # This file (codebase genome)
├── knowledge/                 # [PLANNED] Knowledge store
│   ├── index.json             # Machine-readable fact index
│   ├── global/                # Cross-repo knowledge
│   ├── repos/{repo}.md        # Per-repo knowledge
│   └── agents/{agent}.md      # Agent-type notes
├── scripts/
│   ├── test_harvest_prompt.py                # Basic prompt validation (2.5KB)
│   └── test_harvest_prompt_comprehensive.py  # Full prompt structure test (6.8KB)
├── templates/
│   └── harvest-prompt.md      # Knowledge extraction prompt (3.5KB)
├── test_sessions/
│   ├── session_success.jsonl    # Happy path test data
│   ├── session_failure.jsonl    # Failure path test data
│   ├── session_partial.jsonl    # Incomplete session test data
│   ├── session_patterns.jsonl   # Pattern extraction test data
│   └── session_questions.jsonl  # Question identification test data
└── metrics/                   # [PLANNED] Compounding metrics
    └── dashboard.md
1. Session ends → .jsonl written to ~/.hermes/sessions/
2. sampler.py scores sessions by age, recency, repo coverage
3. harvester.py reads top sessions, calls LLM with harvest-prompt.md
4. LLM extracts facts/pitfalls/patterns/quirks/questions
5. deduplicate() checks against existing index via fact_fingerprint()
6. validate_fact() checks schema compliance
7. write_knowledge() appends to knowledge/index.json + per-repo YAML
8. On next session start, bootstrapper.py:
   a. Loads knowledge/index.json
   b. Filters by session's repo and agent type
   c. Sorts by confidence (high first), then recency
   d. Truncates to 2k token budget
   e. Injects as pre-context
9. Agent starts with full situational awareness instead of zero
```
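
A minimal sketch of step 8 above (filter, sort, truncate), assuming the `facts` array layout that `scripts/dedup.py` also reads from `knowledge/index.json`; the real `bootstrapper.py` exposes `filter_facts()`, `sort_facts()`, and `truncate_to_tokens()` and may differ in detail.

```python
# Sketch only: mirrors the filter -> sort -> truncate flow described above.
# Field names follow the knowledge-item schema in this document; the real
# bootstrapper.py may count tokens differently.
import json
from pathlib import Path

def build_bootstrap_context(index_path: Path, repo: str, token_budget: int = 2000) -> str:
    facts = json.loads(index_path.read_text()).get("facts", [])
    # Keep facts for this repo or global scope
    relevant = [f for f in facts if f.get("repo") in (repo, "global")]
    # High confidence first, then most recently extracted
    relevant.sort(key=lambda f: (f.get("confidence", 0), f.get("extracted_at", "")), reverse=True)
    # Truncate to the token budget (rough 4-chars-per-token heuristic)
    lines, used = [], 0
    for f in relevant:
        cost = len(f.get("fact", "")) // 4 + 1
        if used + cost > token_budget:
            break
        lines.append(f"- [{f.get('category', 'fact')}] {f.get('fact', '')}")
        used += cost
    return "\n".join(lines)
```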

---

## Entry Points and Data Flow

### Entry Point 1: Knowledge Extraction (Harvester)

```
Input: Session transcript (JSONL)
  ↓
templates/harvest-prompt.md (LLM prompt)
  ↓
Knowledge items (JSON array)
  ↓
Output: knowledge/index.json + per-repo/per-agent markdown files
```

### Entry Point 2: Session Bootstrap (Bootstrapper)

```
Input: Session context (repo, agent type, task type)
  ↓
knowledge/index.json (query relevant facts)
  ↓
2k-token bootstrap context
  ↓
Output: Injected into session startup
```

### Entry Point 3: Measurement (Measurer)

```
Input: knowledge/index.json + session history
  ↓
Velocity, hit rate, error reduction calculations
  ↓
Output: metrics/dashboard.md
```
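
The measurer itself is listed as planned, so the following is only an illustrative sketch of the metrics named above; the field names `extracted_at` and `used_count` are assumptions, not the real index schema.

```python
# Illustrative only: computes two of the compounding metrics described above
# from a list of knowledge items. Field names are assumptions.
from datetime import datetime, timezone

def knowledge_velocity(facts: list[dict], days: int = 7) -> float:
    """Facts added per day over the last `days` days."""
    now = datetime.now(timezone.utc)
    recent = [
        f for f in facts
        if (now - datetime.fromisoformat(f["extracted_at"].replace("Z", "+00:00"))).days <= days
    ]
    return len(recent) / days

def hit_rate(facts: list[dict]) -> float:
    """Knowledge used / knowledge available."""
    if not facts:
        return 0.0
    used = sum(1 for f in facts if f.get("used_count", 0) > 0)
    return used / len(facts)
```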

---

## Key Abstractions

### Knowledge Item
The atomic unit. One sentence, one category, one confidence score. Designed to be small enough that 1000 items fit in a 2k-token bootstrap context.
### Knowledge Item (fact/pitfall/pattern/quirk/question)

```json
{
  "fact": "Gitea token is at ~/.config/gitea/token",
  "category": "tool-quirk",
  "repo": "global",
  "confidence": 0.9,
  "evidence": "Found during clone attempt",
  "source_session": "2026-04-13_abc123",
  "extracted_at": "2026-04-13T20:00:00Z"
}
```

### Knowledge Store
A directory structure that mirrors the fleet's mental model:
- `global/` — knowledge that applies everywhere (tool quirks, environment facts)
- `repos/` — knowledge specific to each repo
- `agents/` — knowledge specific to each agent type

### SessionSummary (session_metadata.py)
Extracted metadata per session: duration, token count, tools used, repos touched, error count, outcome.

### Confidence Score
0.0–1.0 scale. Defines how certain the harvester is about each extracted fact:
- 0.9–1.0: Explicitly stated with verification
- 0.7–0.8: Clearly implied by multiple data points
- 0.5–0.6: Suggested but not fully verified
- 0.3–0.4: Inferred from limited data
- 0.1–0.2: Speculative or uncertain

### Gap / GapReport (knowledge_gap_identifier.py)
Structured gap analysis: untested functions, undocumented APIs, missing tests. Severity: critical/high/medium/low.

### Bootstrap Context
The 2k-token injection that a new session receives. Assembled from the most relevant knowledge items for the current task, filtered by confidence > 0.7, deduplicated, and compressed.

### Knowledge Index (knowledge/index.json)
Machine-readable fact store. 12KB, populated with real data. Categories: fact, pitfall, pattern, tool-quirk, question.

---

## Knowledge Store

```
knowledge/
├── index.json             # Master fact store (12KB, populated)
├── SCHEMA.md              # Schema documentation
├── global/
│   ├── pitfalls.yaml      # Cross-repo pitfalls (2KB)
│   └── tool-quirks.yaml   # Tool-specific quirks (2KB)
├── repos/
│   ├── hermes-agent.yaml  # hermes-agent knowledge (2KB)
│   └── the-nexus.yaml     # the-nexus knowledge (2KB)
└── agents/                # Per-agent knowledge (empty)
```

## API Surface

### Internal (scripts not yet implemented)
### LLM API (consumed)

| Provider | Endpoint | Usage |
|----------|----------|-------|
| Nous Research | `https://inference-api.nousresearch.com/v1` | Knowledge extraction |
| Ollama | `http://localhost:11434/v1` | Local fallback |
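
Both endpoints are OpenAI-compatible `/v1` APIs, so a minimal extraction call could look like the sketch below; the model name and the message layout are placeholders, not taken from `harvester.py`.

```python
# Sketch of calling an OpenAI-compatible endpoint with the harvest prompt.
# Model name and prompt assembly are placeholders; see harvester.py for the
# actual implementation.
from pathlib import Path
from openai import OpenAI

client = OpenAI(base_url="https://inference-api.nousresearch.com/v1", api_key="<token>")
prompt = Path("templates/harvest-prompt.md").read_text()
transcript = Path("session.jsonl").read_text()

resp = client.chat.completions.create(
    model="hermes-3-llama-3.1-70b",  # placeholder model name
    messages=[
        {"role": "system", "content": prompt},
        {"role": "user", "content": transcript},
    ],
)
print(resp.choices[0].message.content)  # expected: JSON array of knowledge items
```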

| Script | Input | Output | Status |
|--------|-------|--------|--------|
| `harvester.py` | Session JSONL path | Knowledge items JSON | PLANNED |
| `bootstrapper.py` | Repo + agent type | 2k-token context string | PLANNED |
| `measurer.py` | Knowledge store path | Metrics JSON | PLANNED |
| `session_reader.py` | Session JSONL path | Parsed transcript | PLANNED |

### Prompt (templates/harvest-prompt.md)

The extraction prompt is the core "API." It takes a session transcript and returns structured JSON. It defines:
- Five extraction categories
- Output format (JSON array of knowledge items)
- Confidence scoring rubric
- Constraints (no hallucination, specificity, relevance, brevity)
- Example input/output pair

---

### File API (consumed/produced)

| Path | Format | Direction |
|------|--------|-----------|
| `~/.hermes/sessions/*.jsonl` | JSONL | Input (session transcripts) |
| `knowledge/index.json` | JSON | Output (master fact store) |
| `knowledge/global/*.yaml` | YAML | Output (cross-repo knowledge) |
| `knowledge/repos/*.yaml` | YAML | Output (per-repo knowledge) |
| `templates/harvest-prompt.md` | Markdown | Config (extraction prompt) |

## Test Coverage

### What Exists
**14 test files** covering core pipelines:

| File | Tests | Coverage |
|------|-------|----------|
| `scripts/test_harvest_prompt.py` | 2 tests | Prompt file existence, sample transcript |
| `scripts/test_harvest_prompt_comprehensive.py` | 5 tests | Prompt structure, categories, fields, confidence scoring, size limits |
| `test_sessions/*.jsonl` | 5 sessions | Success, failure, partial, patterns, questions |

| Test File | Covers |
|-----------|--------|
| `test_harvest_prompt.py` | Prompt validation, hallucination detection |
| `test_harvest_prompt_comprehensive.py` | Extended prompt testing |
| `test_harvester_pipeline.py` | Harvester extraction + dedup |
| `test_bootstrapper.py` | Context building, filtering, truncation |
| `test_session_pair_harvester.py` | Training pair extraction |
| `test_improvement_proposals.py` | Proposal generation |
| `test_priority_rebalancer.py` | Priority scoring |
| `test_knowledge_staleness.py` | Staleness detection |
| `test_automation_opportunity_finder.py` | Automation detection |
| `test_diff_analyzer.py` | Diff analysis |
| `test_gitea_issue_parser.py` | Issue parsing |
| `test_refactoring_opportunity_finder.py` | Refactoring signals |
| `test_knowledge_gap_identifier.py` | Gap analysis |
| `test_perf_bottleneck_finder.py` | Perf bottleneck detection |
### What's Missing
### Coverage Gaps

1. **Harvester integration test** — Does the prompt actually extract correct knowledge from real transcripts?
2. **Bootstrapper test** — Does it assemble relevant context correctly?
3. **Knowledge store test** — Does the index.json maintain consistency?
4. **Confidence calibration test** — Do high-confidence facts actually prove true in later sessions?
5. **Deduplication test** — Are duplicate facts across sessions handled?
6. **Staleness test** — How does the system handle outdated knowledge?

---
1. **session_reader.py** — No dedicated test file (tested indirectly)
2. **sampler.py** — No test file (scoring logic untested)
3. **session_metadata.py** — No test file
4. **validate_knowledge.py** — No test file
5. **knowledge_staleness_check.py** — Tested but limited

## Security Considerations

1. **No secrets in knowledge store** — The harvester must filter out API keys, tokens, and credentials from extracted facts. The prompt constraints mention this but there is no automated guard.

### API Key Handling
- `harvester.py` reads API key from `~/.hermes/auth.json` or env vars
- Key passed to LLM API in request headers only
- No key logging

2. **Knowledge poisoning** — A malicious or corrupted session could inject false facts. Confidence scoring partially mitigates this, but there is no verification step.

### Knowledge Integrity
- `validate_fact()` checks schema before writing
- `deduplicate()` prevents duplicate entries via fingerprint
- `knowledge_staleness_check.py` detects when source code changed but knowledge didn't
- Confidence scores prevent low-quality knowledge from polluting the store

3. **Access control** — The knowledge store has no access control. Any process that can read the directory can read all facts. In a multi-tenant setup, this is a concern.

### File Safety
- Knowledge writes are append-only (never deletes)
- Bootstrap context is truncated to budget (no prompt injection via knowledge)
- Session reader handles malformed JSONL gracefully

4. **Transcript privacy** — Session transcripts may contain user data. The harvester must not extract personally identifiable information into the knowledge store.

## File Index

```
scripts/
  harvester.py (473 lines) — Core knowledge extraction
  bootstrapper.py (302 lines) — Pre-session context builder
  session_reader.py (137 lines) — JSONL session parser
  sampler.py (363 lines) — Session scoring + ranking
  session_metadata.py (271 lines) — Session metadata extraction
  validate_knowledge.py (44 lines) — Index validation
  knowledge_staleness_check.py (125 lines) — Staleness detection
  knowledge_gap_identifier.py (291 lines) — Gap analysis engine
  diff_analyzer.py (203 lines) — Diff analysis
  improvement_proposals.py (518 lines) — Proposal generation
  priority_rebalancer.py (745 lines) — Priority scoring
  automation_opportunity_finder.py (600 lines) — Automation detection
  dead_code_detector.py (270 lines) — Dead code detection
  dependency_graph.py (220 lines) — Dependency mapping
  perf_bottleneck_finder.py (635 lines) — Perf analysis
  refactoring_opportunity_finder.py (46 lines) — Refactoring signals
  gitea_issue_parser.py (140 lines) — Gitea issue parsing
  session_pair_harvester.py (224 lines) — Training pair extraction
knowledge/
  index.json (12KB) — Master fact store
  SCHEMA.md (3KB) — Schema docs
  global/pitfalls.yaml (2KB) — Cross-repo pitfalls
  global/tool-quirks.yaml (2KB) — Tool quirks
  repos/hermes-agent.yaml (2KB) — Repo-specific knowledge
  repos/the-nexus.yaml (2KB) — Repo-specific knowledge
templates/
  harvest-prompt.md (4KB) — Extraction prompt
test_sessions/ (5 files) — Sample transcripts
tests/ + scripts/test_* (14 files) — Test suite
```

**Total:** ~6,500 lines of code across 18 scripts + 14 test files.

---

## The 100x Path (from README)

```
Month 1: 15,000 facts, sessions 20% faster
Month 2: 45,000 facts, sessions 40% faster, first-try success up 30%
Month 3: 90,000 facts, fleet measurably smarter per token
```

Each new session is better than the last. The intelligence compounds.

---

*Generated by codebase-genome pipeline. Ref: timmy-home#676.*
*Generated by Codebase Genome pipeline — Issue #676*

Makefile (9 lines changed)
@@ -2,3 +2,12 @@

test:
	python3 -m pytest tests/test_ci_config.py scripts/test_*.py -v

# Connector targets
.PHONY: test-connectors
test-connectors:
	python3 -m pytest tests/test_connectors.py -v

.PHONY: connectors-help
connectors-help:
	python3 scripts/run_connector.py --help

README.md (13 lines changed)
@@ -27,6 +27,11 @@ Before a session starts, queries knowledge store for relevant facts. Assembles c

### Pipeline 3: Measure
Tracks whether compounding is happening. Knowledge velocity, error reduction, hit rate, task completion. Daily report proves the loop works.

### Connector Pack (EPIC #233)
Sovereign personal archive connectors: Twitter/X, Discord, Slack, WhatsApp, Notion, iMessage, Google.
Connectors mirror local exports or explicit API tokens → normalize → redact → index → sync with provenance.
See [`connectors/`](connectors/README.md) for the full connector suite and usage.

## Directory Structure

```
@@ -40,6 +45,12 @@ Tracks whether compounding is happening. Knowledge velocity, error reduction, hi
│   ├── bootstrapper.py      # Pre-session context loader
│   ├── measurer.py          # Compounding metrics
│   └── session_reader.py    # JSONL parser
├── connectors/              # Personal archive connectors (EPIC #233)
│   ├── __init__.py
│   ├── base.py
│   ├── schema.py
│   ├── twitter_archive.py
│   └── README.md
├── metrics/
│   └── dashboard.md         # Human-readable status
└── templates/
@@ -65,4 +76,4 @@ See [all issues](https://forge.alexanderwhitestone.com/Timmy_Foundation/compound
- EPIC 1: Session Harvester (#2)
- EPIC 2: Knowledge Store & Bootstrap (#3)
- EPIC 3: Compounding Measurement (#4)
- EPIC 4: Retroactive Harvest (#5)
- EPIC 4: Retroactive Harvest (#5)

connectors/README.md (new file, 66 lines)
@@ -0,0 +1,66 @@
# Sovereign Personal Archive Connector Pack

This directory contains the connector infrastructure for ingesting personal archives
(Discord, Slack, WhatsApp, Notion, iMessage, X/Twitter, Google) into the compounding-intelligence
knowledge pipeline.

## Quick Start

```bash
# Run the Twitter archive connector
python3 scripts/run_connector.py twitter \
    --source ~/Documents/TwitterArchive \
    --output events.jsonl \
    --limit 100
```

## Connector Output Format

Each connector emits `SourceEvent` objects (one JSON line per event):

```json
{
  "source": "twitter",
  "account": "user_archive",
  "thread_or_channel": "tweet_123456",
  "author": "user_archive",
  "timestamp": "2026-04-26T08:30:00+00:00",
  "content": "Tweet text here",
  "attachments": ["https://..."],
  "raw_ref": "twitter:archive:tweet.js:123456",
  "hash": "sha256...",
  "consent_scope": "memory_only",
  "metadata": { "tweet_id": "123456", "favorite_count": 10 }
}
```

## Connector Registry

| Name | Source Format | Status |
|------|---------------|--------|
| twitter_archive | Official Twitter data export | ✅ Working |
| discord_archive | Discord data package / JSON | ⏳ Planned |
| slack_archive | Slack export / API | ⏳ Planned |
| whatsapp_archive | WhatsApp Desktop export | ⏳ Planned |
| notion_archive | Notion markdown/SQLite | ⏳ Planned |
| imessage_archive | macOS local chat storage | ⏳ Planned |
| google_archive | Google Workspace CLI | ⏳ Planned |

## Design Principles

1. **Local-first**: Connectors operate on user-owned exports or explicit API credentials.
2. **Incremental**: Checkpoint files (~/.cache/connectors/) allow resumable processing.
3. **Consent-gated**: Default `consent_scope=memory_only` — explicit opt-in for broader use.
4. **Provenance-preserving**: `metadata` retains all raw fields; `hash` enables deduplication.
5. **Sovereign**: No ambient scraping. No cloud dependency unless user explicitly configures tokens.

## Writing a New Connector

Subclass `BaseConnector` from `connectors/base.py` and implement:

- `discover_sources(root: Path) -> Iterator[Path|str]` — find source files or IDs
- `parse_source(source) -> Iterator[SourceEvent]` — emit normalized events

Register in `connectors/__init__.py` `_REGISTRY` dict.

See `connectors/twitter_archive.py` for a complete example.
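
For orientation, a hypothetical skeleton of a new connector (the file name, class name, and JSON fields below are invented for illustration, not part of the pack):

```python
# connectors/example_archive.py — hypothetical skeleton for a new connector.
from pathlib import Path
from typing import Iterator
import json

from .base import BaseConnector
from .schema import SourceEvent


class ExampleArchiveConnector(BaseConnector):
    name = "example_archive"
    source_glob = "**/messages*.json"

    def discover_sources(self, root: Path) -> Iterator[Path]:
        # Find every exported messages file under the archive root
        yield from Path(root).rglob("messages*.json")

    def parse_source(self, source: Path) -> Iterator[SourceEvent]:
        # One SourceEvent per exported message; field names are illustrative
        for msg in json.loads(Path(source).read_text(encoding="utf-8")):
            yield SourceEvent(
                source="example",
                account=msg.get("account", "me"),
                thread_or_channel=msg.get("thread", "default"),
                author=msg.get("author", "me"),
                timestamp=msg.get("timestamp", ""),
                content=msg.get("text", ""),
                attachments=[],
                raw_ref=f"example:{source.name}:{msg.get('id', '')}",
                hash="",  # BaseConnector.run() fills this in via compute_event_hash()
            )

# Then register it in connectors/__init__.py:
#   _REGISTRY["example_archive"] = ExampleArchiveConnector
```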

connectors/__init__.py (new file, 50 lines)
@@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""
connectors/__init__.py — Sovereign personal archive connector pack.

Provides:
- BaseConnector: abstract base class for all connectors
- SourceEvent: unified event schema
- compute_event_hash, validate_event: utilities
- Registry: connector discovery and loading

Connectors:
- TwitterArchiveConnector: parse official Twitter/X archive exports
  (Future: Discord, Slack, WhatsApp, Notion, iMessage, Google)
"""

from .base import BaseConnector
from .schema import (
    SourceEvent,
    compute_event_hash,
    validate_event,
    CONSENT_MEMORY_ONLY,
    CONSENT_BOOTSTRAP,
    CONSENT_TRAINING,
)
from .twitter_archive import TwitterArchiveConnector

# Auto-registry: map of connector name → class
_REGISTRY = {
    "twitter_archive": TwitterArchiveConnector,
    # Future connectors:
    # "discord_archive": DiscordArchiveConnector,
    # "slack_archive": SlackArchiveConnector,
    # "whatsapp_archive": WhatsAppArchiveConnector,
    # "notion_archive": NotionArchiveConnector,
    # "imessage_archive": iMessageArchiveConnector,
    # "google_archive": GoogleArchiveConnector,
}


def get_connector(name: str) -> type[BaseConnector]:
    """Get connector class by registry name."""
    cls = _REGISTRY.get(name)
    if cls is None:
        raise ValueError(f"Unknown connector '{name}'. Available: {list(_REGISTRY.keys())}")
    return cls


def list_connectors() -> list[str]:
    """List all registered connector names."""
    return list(_REGISTRY.keys())
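
A short usage sketch of the registry API above (the archive path is illustrative):

```python
# Using the registry directly, without scripts/run_connector.py.
from pathlib import Path
from connectors import get_connector, list_connectors

print(list_connectors())                        # ['twitter_archive']
connector = get_connector("twitter_archive")()  # default checkpoint + consent scope
for event in connector.run(Path("~/Documents/TwitterArchive").expanduser(), limit=5):
    print(event.to_json())
```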

connectors/base.py (new file, 100 lines)
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
connectors/base.py — Abstract base class for all personal archive connectors.

Defines the contract every connector must implement. Connectors read local
exports or API data and yield SourceEvent objects.
"""

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Iterator, Optional, Dict, Any
import json
import logging

logger = logging.getLogger(__name__)


class BaseConnector(ABC):
    name: str = None
    source_glob: Optional[str] = None
    default_consent_scope: str = "memory_only"

    def __init__(self, checkpoint_path: Optional[Path] = None,
                 consent_scope: Optional[str] = None):
        self.consent_scope = consent_scope or self.default_consent_scope
        if checkpoint_path is None:
            cache_dir = Path.home() / ".cache" / "connectors"
            cache_dir.mkdir(parents=True, exist_ok=True)
            checkpoint_path = cache_dir / f"{self.name}.checkpoint.jsonl"
        self.checkpoint_path = checkpoint_path
        self._processed_hashes = self._load_checkpoint()

    def _load_checkpoint(self) -> set:
        if not self.checkpoint_path.exists():
            return set()
        seen = set()
        with open(self.checkpoint_path, 'r') as f:
            for line in f:
                try:
                    entry = json.loads(line)
                    seen.add(entry.get('hash', ''))
                except Exception:
                    continue
        logger.debug("Loaded %d checkpoint hashes for %s", len(seen), self.name)
        return seen

    def _save_checkpoint(self, event) -> None:
        with open(self.checkpoint_path, 'a') as f:
            f.write(json.dumps({'hash': event.hash, 'source': event.source,
                                'raw_ref': event.raw_ref}) + '\n')

    def mark_processed(self, event_hash: str) -> None:
        self._processed_hashes.add(event_hash)

    def is_processed(self, event_hash: str) -> bool:
        return event_hash in self._processed_hashes

    def run(self, source_root: Path, limit: Optional[int] = None) -> Iterator:
        count = 0
        skipped_dedup = 0
        skipped_invalid = 0

        for source in self.discover_sources(source_root):
            for event in self.parse_source(source):
                if not event.hash:
                    from .schema import compute_event_hash
                    event.hash = compute_event_hash(
                        event.source, event.raw_ref, event.content,
                        event.timestamp, event.author
                    )

                if event.hash in self._processed_hashes:
                    skipped_dedup += 1
                    continue

                from .schema import validate_event
                if not validate_event(event):
                    skipped_invalid += 1
                    logger.warning("Invalid event: raw_ref=%s", event.raw_ref)
                    continue

                event.consent_scope = self.consent_scope
                self._save_checkpoint(event)
                self._processed_hashes.add(event.hash)

                yield event
                count += 1
                if limit and count >= limit:
                    return

        logger.info("Connector %s complete: yielded=%d, skipped_dedup=%d, skipped_invalid=%d",
                    self.name, count, skipped_dedup, skipped_invalid)

    @abstractmethod
    def discover_sources(self, root: Path) -> Iterator:
        pass

    @abstractmethod
    def parse_source(self, source) -> Iterator:
        pass

connectors/schema.py (new file, 100 lines)
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
connectors/schema.py — Unified source-event schema for personal archive connectors.

All connectors must produce events conforming to this schema so downstream
pipelines (harvester → knowledge store) can process them uniformly.
"""

from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, Any, Dict
import hashlib
import json


@dataclass
class SourceEvent:
    """
    Canonical event schema for any ingested personal archive entry.

    Fields
    ------
    source : str
        Platform identifier: 'twitter', 'discord', 'slack', 'whatsapp',
        'notion', 'imessage', 'google', etc.
    account : str
        User account/channel identifier on the source platform.
    thread_or_channel : str
        Conversation thread, channel, or chat identifier.
    author : str
        Who created this content (may differ from account in group chats).
    timestamp : str
        ISO-8601 timestamp when the event occurred (not when it was ingested).
    content : str
        Primary text content. May be empty for non-text events (images only).
    attachments : list[str]
        List of local file paths or URLs for attached media.
    raw_ref : str
        Pointer to the raw source record (file path, message ID, URL, etc.).
    hash : str
        SHA-256 hash of the raw content for deduplication and provenance.
    consent_scope : str
        Privacy gate: where this content may be used.
        Examples: 'memory_only', 'bootstrap_context', 'training_data'.
        Default: 'memory_only' for ingested personal archives.
    metadata : dict[str, Any]
        Platform-specific fields retained for provenance but not indexed.
    """
    source: str
    account: str
    thread_or_channel: str
    author: str
    timestamp: str
    content: str
    attachments: list[str]
    raw_ref: str
    hash: str
    consent_scope: str = "memory_only"
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> dict:
        """Convert to plain dict for JSON serialization."""
        d = asdict(self)
        if d['metadata'] is None:
            d['metadata'] = {}
        return d

    def to_json(self) -> str:
        """Serialize to JSON line (one event per line)."""
        return json.dumps(self.to_dict(), ensure_ascii=False)


def compute_event_hash(source: str, raw_ref: str, content: str,
                       timestamp: str, author: str) -> str:
    """
    Compute deterministic SHA-256 hash for an event.

    Hash inputs: source + raw_ref + content + timestamp + author.
    This ensures identical content always produces the same hash,
    enabling cross-connector deduplication.
    """
    canonical = f"{source}|{raw_ref}|{content}|{timestamp}|{author}"
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()


def validate_event(event: SourceEvent) -> bool:
    """
    Minimal structural validation for a SourceEvent.
    Returns True if required fields are present and well-formed.
    """
    required = [event.source, event.account, event.thread_or_channel,
                event.author, event.timestamp, event.content, event.raw_ref,
                event.hash]
    return all(str(x).strip() for x in required)


# Consent scope definitions
CONSENT_MEMORY_ONLY = "memory_only"      # For retrieval only, not bootstrap
CONSENT_BOOTSTRAP = "bootstrap_context"  # Can seed new sessions
CONSENT_TRAINING = "training_data"       # May be used for model training

connectors/twitter_archive.py (new file, 155 lines)
@@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""
connectors/twitter_archive.py — Twitter/X personal archive connector.

Parses official Twitter data exports (Twitter's "Download your data" archive).
Expects the tweet.js / tweet.json files from the archive's data/ directory.

Format (Twitter's archived tweets JSON):
Each entry has: {"tweet": {"id_str": "...", "full_text": "...", "created_at": "...", ...}}

Output: normalized SourceEvent with source='twitter'.
"""

import json
import re
from datetime import datetime
from pathlib import Path
from typing import Iterator, Optional
import logging

from .base import BaseConnector
from .schema import SourceEvent, compute_event_hash

logger = logging.getLogger(__name__)


class TwitterArchiveConnector(BaseConnector):
    """Connector for Twitter/X official archive exports."""
    name = "twitter_archive"
    source_glob = "**/tweet*.json"
    default_consent_scope = "memory_only"

    # Twitter's date format in archives: "Wed Oct 10 20:19:24 +0000 2018"
    TWITTER_DATE_FMT = "%a %b %d %H:%M:%S %z %Y"

    def discover_sources(self, root: Path) -> Iterator[Path]:
        """
        Find tweet.js / tweet.json files in a Twitter archive.

        The official Twitter export places these under:
        root/
            data/
                tweet.js (single-file format, older exports)
            or
            account-XXXX-YYYY/
                tweets.js (per-month split format)
        """
        root = Path(root)
        # Search for .js files that start with 'tweet' — these contain the tweet JSON blobs
        candidates = list(root.rglob("tweet*.js")) + list(root.rglob("tweet*.json"))
        logger.info("Discovered %d Twitter archive files under %s", len(candidates), root)
        for path in candidates:
            yield path

    def parse_source(self, source: Path) -> Iterator[SourceEvent]:
        """
        Parse a Twitter archive file and yield SourceEvents.

        Handles both single-file (old) and per-month split formats.
        Twitter wraps the JSON array in a JS variable assignment: `window.YTD.tweet.part0 = [...]`
        """
        try:
            with open(source, 'r', encoding='utf-8') as f:
                raw = f.read()

            # Extract JSON array from the JS wrapper
            match = re.search(r'=\s*(\[.+?\])\s*;?\s*$', raw, re.DOTALL)
            if match:
                json_str = match.group(1)
                records = json.loads(json_str)
            else:
                # Plain JSON array (no wrapper)
                records = json.loads(raw)

            logger.debug("Parsing %d tweet records from %s", len(records), source)

            for record in records:
                event = self._record_to_event(record, source)
                if event:
                    yield event

        except Exception as e:
            logger.error("Failed to parse %s: %s", source, e)

    def _record_to_event(self, record: dict, source_path: Path) -> Optional[SourceEvent]:
        """
        Convert a single tweet record into a SourceEvent.

        The record can be either the wrapped format {"tweet": {...}} or the bare tweet object.
        """
        # Unwrap the tweet object
        tweet = record.get('tweet', record)

        # Extract core fields
        id_str = tweet.get('id_str') or tweet.get('id')
        full_text = tweet.get('full_text') or tweet.get('text', '')
        created_at = tweet.get('created_at', '')

        # Parse timestamp
        try:
            dt = datetime.strptime(created_at, self.TWITTER_DATE_FMT)
            iso_ts = dt.astimezone().isoformat()
        except Exception:
            iso_ts = created_at  # fallback: keep as-is

        # Author is always the account owner (Twitter archives don't include others' DMs by default)
        account = "user_archive"  # normalized account identifier

        # Thread/channel: individual tweets have no thread ID; threads aren't preserved in basic export
        thread_id = f"tweet_{id_str}"

        # Attachments: extract media URLs
        attachments = []
        extended_entities = tweet.get('extended_entities', {})
        for media in extended_entities.get('media', []):
            url = media.get('media_url_https') or media.get('media_url')
            if url:
                attachments.append(url)

        # Build raw_ref
        raw_ref = f"twitter:archive:{source_path.name}:{id_str}"

        # Compute hash
        content_for_hash = full_text or ""
        hash_val = compute_event_hash(
            source="twitter",
            raw_ref=raw_ref,
            content=content_for_hash,
            timestamp=iso_ts,
            author=account
        )

        # Preserve metadata for provenance
        metadata = {
            "tweet_id": id_str,
            "source_file": str(source_path),
            "favorite_count": tweet.get('favorite_count'),
            "retweet_count": tweet.get('retweet_count'),
            "in_reply_to_status_id": tweet.get('in_reply_to_status_id_str'),
            "lang": tweet.get('lang'),
        }

        return SourceEvent(
            source="twitter",
            account=account,
            thread_or_channel=thread_id,
            author=account,
            timestamp=iso_ts,
            content=full_text,
            attachments=attachments,
            raw_ref=raw_ref,
            hash=hash_val,
            consent_scope=self.consent_scope,
            metadata=metadata
        )

quality_gate.py (new file, 297 lines)
@@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""
quality_gate.py — Score and filter knowledge entries.

Scores each entry on 4 dimensions:
- Specificity: concrete examples vs vague generalities
- Actionability: can this be used to do something?
- Freshness: is this still accurate?
- Source quality: was the model/provider reliable?

Usage:
    from quality_gate import score_entry, filter_entries, quality_report

    score = score_entry(entry)
    filtered = filter_entries(entries, threshold=0.5)
    report = quality_report(entries)
"""

import json
import math
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional

# Source quality scores (higher = more reliable)
SOURCE_QUALITY = {
    "claude-sonnet": 0.9,
    "claude-opus": 0.95,
    "gpt-4": 0.85,
    "gpt-4-turbo": 0.85,
    "gpt-5": 0.9,
    "mimo-v2-pro": 0.8,
    "gemini-pro": 0.8,
    "llama-3-70b": 0.75,
    "llama-3-8b": 0.7,
    "ollama": 0.6,
    "unknown": 0.5,
}

DEFAULT_SOURCE_QUALITY = 0.5

# Specificity indicators
SPECIFIC_INDICATORS = [
    r"\b\d+\.\d+",                  # decimal numbers
    r"\b\d{4}-\d{2}-\d{2}",         # dates
    r"\b[A-Z][a-z]+\s[A-Z][a-z]+",  # proper nouns
    r"`[^`]+`",                     # code/commands
    r"https?://",                   # URLs
    r"\b(example|instance|specifically|concretely)\b",
    r"\b(step \d|first|second|third)\b",
    r"\b(exactly|precisely|measured|counted)\b",
]

# Vagueness indicators (penalty)
VAGUE_INDICATORS = [
    r"\b(generally|usually|often|sometimes|might|could|perhaps)\b",
    r"\b(various|several|many|some|few)\b",
    r"\b(it depends|varies|differs)\b",
    r"\b(basically|essentially|fundamentally)\b",
    r"\b(everyone knows|it's obvious|clearly)\b",
]

# Actionability indicators
ACTIONABLE_INDICATORS = [
    r"\b(run|execute|install|deploy|configure|set up)\b",
    r"\b(use|apply|implement|create|build)\b",
    r"\b(check|verify|test|validate|confirm)\b",
    r"\b(fix|resolve|solve|debug|troubleshoot)\b",
    r"\b(if .+ then|when .+ do|to .+ use)\b",
    r"```[a-z]*\n",  # code blocks
    r"\$\s",         # shell commands
    r"\b\d+\.\s",    # numbered steps
]


def score_specificity(content: str) -> float:
    """Score specificity: 0=vague, 1=very specific."""
    content_lower = content.lower()
    score = 0.5  # baseline

    # Check for specific indicators
    specific_count = sum(
        len(re.findall(p, content, re.IGNORECASE))
        for p in SPECIFIC_INDICATORS
    )

    # Check for vague indicators
    vague_count = sum(
        len(re.findall(p, content_lower))
        for p in VAGUE_INDICATORS
    )

    # Adjust score
    score += min(specific_count * 0.05, 0.4)
    score -= min(vague_count * 0.08, 0.3)

    # Length bonus (longer = more detail, up to a point)
    word_count = len(content.split())
    if word_count > 50:
        score += min((word_count - 50) * 0.001, 0.1)

    return max(0.0, min(1.0, score))


def score_actionability(content: str) -> float:
    """Score actionability: 0=abstract, 1=highly actionable."""
    content_lower = content.lower()
    score = 0.3  # baseline (most knowledge is informational)

    # Check for actionable indicators
    actionable_count = sum(
        len(re.findall(p, content_lower))
        for p in ACTIONABLE_INDICATORS
    )

    score += min(actionable_count * 0.1, 0.6)

    # Code blocks are highly actionable
    if "```" in content:
        score += 0.2

    # Numbered steps are actionable
    if re.search(r"\d+\.\s+\w", content):
        score += 0.1

    return max(0.0, min(1.0, score))


def score_freshness(timestamp: Optional[str]) -> float:
    """Score freshness: 1=new, decays over time."""
    if not timestamp:
        return 0.5

    try:
        if isinstance(timestamp, str):
            ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        else:
            ts = timestamp

        now = datetime.now(timezone.utc)
        age_days = (now - ts).days

        # Exponential decay with a 180-day time constant:
        # 1.0 at day 0, ~0.37 at 180 days, ~0.13 at 365 days (floored at 0.1)
        score = math.exp(-age_days / 180)
        return max(0.1, min(1.0, score))
    except (ValueError, TypeError):
        return 0.5


def score_source_quality(model: Optional[str]) -> float:
    """Score source quality based on model/provider."""
    if not model:
        return DEFAULT_SOURCE_QUALITY

    # Normalize model name
    model_lower = model.lower()
    for key, score in SOURCE_QUALITY.items():
        if key in model_lower:
            return score

    return DEFAULT_SOURCE_QUALITY


def score_entry(entry: dict) -> float:
    """
    Score a knowledge entry on quality (0.0-1.0).

    Weights:
    - specificity: 0.3
    - actionability: 0.3
    - freshness: 0.2
    - source_quality: 0.2
    """
    content = entry.get("content", entry.get("text", entry.get("response", "")))
    model = entry.get("model", entry.get("provenance", {}).get("model"))
    timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))

    specificity = score_specificity(content)
    actionability = score_actionability(content)
    freshness = score_freshness(timestamp)
    source = score_source_quality(model)

    return round(
        0.3 * specificity +
        0.3 * actionability +
        0.2 * freshness +
        0.2 * source,
        4
    )


def score_entry_detailed(entry: dict) -> dict:
    """Score with breakdown."""
    content = entry.get("content", entry.get("text", entry.get("response", "")))
    model = entry.get("model", entry.get("provenance", {}).get("model"))
    timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))

    specificity = score_specificity(content)
    actionability = score_actionability(content)
    freshness = score_freshness(timestamp)
    source = score_source_quality(model)

    return {
        "score": round(0.3 * specificity + 0.3 * actionability + 0.2 * freshness + 0.2 * source, 4),
        "specificity": round(specificity, 4),
        "actionability": round(actionability, 4),
        "freshness": round(freshness, 4),
        "source_quality": round(source, 4),
    }


def filter_entries(entries: List[dict], threshold: float = 0.5) -> List[dict]:
    """Filter entries below quality threshold."""
    filtered = []
    for entry in entries:
        if score_entry(entry) >= threshold:
            filtered.append(entry)
    return filtered


def quality_report(entries: List[dict]) -> str:
    """Generate quality distribution report."""
    if not entries:
        return "No entries to analyze."

    scores = [score_entry(e) for e in entries]

    avg = sum(scores) / len(scores)
    min_score = min(scores)
    max_score = max(scores)

    # Distribution buckets
    buckets = {"high": 0, "medium": 0, "low": 0, "rejected": 0}
    for s in scores:
        if s >= 0.7:
            buckets["high"] += 1
        elif s >= 0.5:
            buckets["medium"] += 1
        elif s >= 0.3:
            buckets["low"] += 1
        else:
            buckets["rejected"] += 1

    lines = [
        "=" * 50,
        " QUALITY GATE REPORT",
        "=" * 50,
        f" Total entries: {len(entries)}",
        f" Average score: {avg:.3f}",
        f" Min: {min_score:.3f}",
        f" Max: {max_score:.3f}",
        "",
        " Distribution:",
    ]

    for bucket, count in buckets.items():
        pct = count / len(entries) * 100
        bar = "█" * int(pct / 5)
        lines.append(f" {bucket:<12} {count:>5} ({pct:>5.1f}%) {bar}")

    passed = buckets["high"] + buckets["medium"]
    lines.append(f"\n Pass rate (>= 0.5): {passed}/{len(entries)} ({passed/len(entries)*100:.1f}%)")
    lines.append("=" * 50)

    return "\n".join(lines)


def main():
    import argparse
    parser = argparse.ArgumentParser(description="Knowledge quality gate")
    parser.add_argument("files", nargs="+", help="JSONL files to score")
    parser.add_argument("--threshold", type=float, default=0.5, help="Quality threshold")
    parser.add_argument("--json", action="store_true", help="JSON output")
    parser.add_argument("--filter", action="store_true", help="Filter and write back")
    args = parser.parse_args()

    all_entries = []
    for filepath in args.files:
        with open(filepath) as f:
            for line in f:
                if line.strip():
                    all_entries.append(json.loads(line))

    if args.json:
        results = [{"entry": e, **score_entry_detailed(e)} for e in all_entries]
        print(json.dumps(results, indent=2))
    elif args.filter:
        filtered = filter_entries(all_entries, args.threshold)
        print(f"Kept {len(filtered)}/{len(all_entries)} entries (threshold: {args.threshold})")
    else:
        print(quality_report(all_entries))


if __name__ == "__main__":
    main()
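
A quick usage sketch of the module above on a hand-written entry:

```python
# Minimal usage of quality_gate on a single hand-written entry.
from quality_gate import score_entry_detailed, filter_entries

entry = {
    "content": "Run `python3 -m pytest tests/test_ci_config.py -v` to verify the CI config.",
    "model": "claude-sonnet",
    "timestamp": "2026-04-17T00:00:00Z",
}
print(score_entry_detailed(entry))   # per-dimension breakdown plus weighted score
print(len(filter_entries([entry])))  # 1 if the entry clears the default 0.5 threshold
```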

scripts/dedup.py (new file, 317 lines)
@@ -0,0 +1,317 @@
#!/usr/bin/env python3
"""
dedup.py — Knowledge deduplication: content hash + semantic similarity.

Deduplicates harvested knowledge entries to avoid training on duplicates.
Uses content hashing for exact matches and token overlap for near-duplicates.

Usage:
    python3 dedup.py --input knowledge/index.json --output knowledge/index_deduped.json
    python3 dedup.py --input knowledge/index.json --dry-run
    python3 dedup.py --test   # Run built-in dedup test
"""

import argparse
import hashlib
import json
import re
import sys
from pathlib import Path
from typing import List, Dict, Optional, Tuple


def normalize_text(text: str) -> str:
    """Normalize text for hashing: lowercase, collapse whitespace, strip."""
    text = text.lower().strip()
    text = re.sub(r'\s+', ' ', text)
    return text


def content_hash(text: str) -> str:
    """SHA256 hash of normalized text for exact dedup."""
    normalized = normalize_text(text)
    return hashlib.sha256(normalized.encode('utf-8')).hexdigest()


def tokenize(text: str) -> set:
    """Simple tokenizer: lowercase words, 3+ chars."""
    words = re.findall(r'[a-z0-9_]{3,}', text.lower())
    return set(words)


def token_similarity(a: str, b: str) -> float:
    """Token-based Jaccard similarity (0.0-1.0).

    Fast local alternative to embedding similarity.
    Good enough for near-duplicate detection.
    """
    tokens_a = tokenize(a)
    tokens_b = tokenize(b)
    if not tokens_a or not tokens_b:
        return 0.0
    intersection = tokens_a & tokens_b
    union = tokens_a | tokens_b
    return len(intersection) / len(union)


def quality_score(fact: dict) -> float:
    """Compute quality score for merge ranking.

    Higher is better. Factors:
    - confidence (0-1)
    - source_count (more confirmations = better)
    - has tags (richer metadata)
    """
    confidence = fact.get('confidence', 0.5)
    source_count = fact.get('source_count', 1)
    has_tags = 1.0 if fact.get('tags') else 0.0
    has_related = 1.0 if fact.get('related') else 0.0

    # Weighted composite
    score = (
        confidence * 0.5 +
        min(source_count / 10, 1.0) * 0.3 +
        has_tags * 0.1 +
        has_related * 0.1
    )
    return round(score, 4)


def merge_facts(keep: dict, drop: dict) -> dict:
    """Merge two near-duplicate facts, keeping higher-quality fields.

    The 'keep' fact is enriched with metadata from 'drop'.
    """
    # Merge tags (union)
    keep_tags = set(keep.get('tags', []))
    drop_tags = set(drop.get('tags', []))
    keep['tags'] = sorted(keep_tags | drop_tags)

    # Merge related (union)
    keep_related = set(keep.get('related', []))
    drop_related = set(drop.get('related', []))
    keep['related'] = sorted(keep_related | drop_related)

    # Update source_count (sum)
    keep['source_count'] = keep.get('source_count', 1) + drop.get('source_count', 1)

    # Update confidence (max — we've now seen it from multiple sources)
    keep['confidence'] = max(keep.get('confidence', 0), drop.get('confidence', 0))

    # Track that we merged
    if '_merged_from' not in keep:
        keep['_merged_from'] = []
    keep['_merged_from'].append(drop.get('id', 'unknown'))

    return keep


def dedup_facts(
    facts: List[dict],
    exact_threshold: float = 1.0,
    near_threshold: float = 0.95,
    dry_run: bool = False,
) -> Tuple[List[dict], dict]:
    """Deduplicate a list of knowledge facts.

    Args:
        facts: List of fact dicts (from index.json)
        exact_threshold: Hash match = exact duplicate
        near_threshold: Token similarity above this = near-duplicate
        dry_run: If True, don't modify, just report

    Returns:
        (deduped_facts, stats_dict)
    """
    if not facts:
        return [], {"total": 0, "exact_dupes": 0, "near_dupes": 0, "unique": 0}

    # Phase 1: Exact dedup by content hash
    hash_seen = {}  # hash -> index in deduped list
    exact_dupes = 0
    deduped = []

    for fact in facts:
        text = fact.get('fact', '')
        h = content_hash(text)

        if h in hash_seen:
            # Exact duplicate — merge metadata into existing
            existing_idx = hash_seen[h]
            if not dry_run:
                deduped[existing_idx] = merge_facts(deduped[existing_idx], fact)
            exact_dupes += 1
        else:
            hash_seen[h] = len(deduped)
            deduped.append(fact)

    # Phase 2: Near-dup by token similarity
    near_dupes = 0
    i = 0
    while i < len(deduped):
        j = i + 1
        while j < len(deduped):
            sim = token_similarity(deduped[i].get('fact', ''), deduped[j].get('fact', ''))
            if sim >= near_threshold:
                # Near-duplicate — keep higher quality
                q_i = quality_score(deduped[i])
                q_j = quality_score(deduped[j])
                if q_i >= q_j:
                    if not dry_run:
                        deduped[i] = merge_facts(deduped[i], deduped[j])
                    deduped.pop(j)
                else:
                    # j is higher quality — merge i into j, then remove i
                    if not dry_run:
                        deduped[j] = merge_facts(deduped[j], deduped[i])
                    deduped.pop(i)
                    break  # i changed, restart inner loop
                near_dupes += 1
            else:
                j += 1
        i += 1

    stats = {
        "total": len(facts),
        "exact_dupes": exact_dupes,
        "near_dupes": near_dupes,
        "unique": len(deduped),
        "removed": len(facts) - len(deduped),
    }

    return deduped, stats
|
||||
|
||||
|
||||
def dedup_index_file(
|
||||
input_path: str,
|
||||
output_path: Optional[str] = None,
|
||||
near_threshold: float = 0.95,
|
||||
dry_run: bool = False,
|
||||
) -> dict:
|
||||
"""Deduplicate an index.json file.
|
||||
|
||||
Args:
|
||||
input_path: Path to index.json
|
||||
output_path: Where to write deduped file (default: overwrite input)
|
||||
near_threshold: Token similarity threshold for near-dupes
|
||||
dry_run: Report only, don't write
|
||||
|
||||
Returns stats dict.
|
||||
"""
|
||||
path = Path(input_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Index file not found: {input_path}")
|
||||
|
||||
with open(path) as f:
|
||||
data = json.load(f)
|
||||
|
||||
facts = data.get('facts', [])
|
||||
deduped, stats = dedup_facts(facts, near_threshold=near_threshold, dry_run=dry_run)
|
||||
|
||||
if not dry_run:
|
||||
data['facts'] = deduped
|
||||
data['total_facts'] = len(deduped)
|
||||
# Record when the store was last deduplicated
from datetime import datetime, timezone
data['last_dedup'] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
out_path = Path(output_path) if output_path else path
|
||||
with open(out_path, 'w') as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
return stats
|
||||
|
||||
|
||||
def generate_test_duplicates(n: int = 20) -> List[dict]:
|
||||
"""Generate test facts with intentional duplicates for testing.
|
||||
|
||||
Creates n unique facts plus n/4 exact dupes and n/4 near-dupes.
|
||||
"""
|
||||
import random
|
||||
random.seed(42)
|
||||
|
||||
unique_facts = []
|
||||
for i in range(n):
|
||||
topic = random.choice(["git", "python", "docker", "rust", "nginx"])
|
||||
tip = random.choice(["use verbose flags", "check logs first", "restart service", "clear cache", "update config"])
|
||||
unique_facts.append({
|
||||
"id": f"test:fact:{i:03d}",
|
||||
"fact": f"When working with {topic}, always {tip} before deploying.",
|
||||
"category": "fact",
|
||||
"domain": "test",
|
||||
"confidence": round(random.uniform(0.5, 1.0), 2),
|
||||
"source_count": random.randint(1, 5),
|
||||
"tags": [topic, "test"],
|
||||
})
|
||||
|
||||
# Add exact duplicates (same text, different IDs)
|
||||
duped = list(unique_facts)
|
||||
for i in range(n // 4):
|
||||
original = unique_facts[i]
|
||||
dupe = dict(original)
|
||||
dupe["id"] = f"test:fact:dup{i:03d}"
|
||||
dupe["confidence"] = round(random.uniform(0.3, 0.8), 2)
|
||||
duped.append(dupe)
|
||||
|
||||
# Add near-duplicates (slightly different phrasing)
|
||||
for i in range(n // 4):
|
||||
original = unique_facts[i]
|
||||
near = dict(original)
|
||||
near["id"] = f"test:fact:near{i:03d}"
|
||||
near["fact"] = original["fact"].replace("always", "should").replace("before deploying", "prior to deployment")
|
||||
near["confidence"] = round(random.uniform(0.4, 0.9), 2)
|
||||
duped.append(near)
|
||||
|
||||
return duped
|
||||
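For the default n=20 this yields 30 facts in total, which is what the `--test` mode below exercises:

```python
facts = generate_test_duplicates(20)
assert len(facts) == 30  # 20 unique + 5 exact duplicates + 5 near-duplicates
```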
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Knowledge deduplication")
|
||||
parser.add_argument("--input", help="Path to index.json")
|
||||
parser.add_argument("--output", help="Output path (default: overwrite input)")
|
||||
parser.add_argument("--threshold", type=float, default=0.95,
|
||||
help="Near-dup similarity threshold (default: 0.95)")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Report only, don't write")
|
||||
parser.add_argument("--test", action="store_true", help="Run built-in dedup test")
|
||||
parser.add_argument("--json", action="store_true", help="JSON output")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.test:
|
||||
test_facts = generate_test_duplicates(20)
|
||||
print(f"Generated {len(test_facts)} test facts (20 unique + dupes)")
|
||||
deduped, stats = dedup_facts(test_facts, near_threshold=args.threshold)
|
||||
print(f"\nDedup results:")
|
||||
print(f" Total input: {stats['total']}")
|
||||
print(f" Exact dupes: {stats['exact_dupes']}")
|
||||
print(f" Near dupes: {stats['near_dupes']}")
|
||||
print(f" Unique output: {stats['unique']}")
|
||||
print(f" Removed: {stats['removed']}")
|
||||
|
||||
# Verify: should have ~20 unique (some merged)
|
||||
assert stats['unique'] <= 20, f"Too many unique: {stats['unique']} > 20"
|
||||
assert stats['unique'] >= 15, f"Too few unique: {stats['unique']} < 15"
|
||||
assert stats['removed'] > 0, "No duplicates removed"
|
||||
print("\nOK: Dedup test passed")
|
||||
return
|
||||
|
||||
if not args.input:
|
||||
print("ERROR: Provide --input or --test")
|
||||
sys.exit(1)
|
||||
|
||||
stats = dedup_index_file(args.input, args.output, args.threshold, args.dry_run)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(stats, indent=2))
|
||||
else:
|
||||
print(f"Dedup results:")
|
||||
print(f" Total input: {stats['total']}")
|
||||
print(f" Exact dupes: {stats['exact_dupes']}")
|
||||
print(f" Near dupes: {stats['near_dupes']}")
|
||||
print(f" Unique output: {stats['unique']}")
|
||||
print(f" Removed: {stats['removed']}")
|
||||
if args.dry_run:
|
||||
print(" (dry run — no changes written)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -149,8 +149,8 @@ def to_dot(graph: dict) -> str:
     """Generate DOT format output."""
     lines = ["digraph dependencies {"]
     lines.append(" rankdir=LR;")
-    lines.append(" node [shape=box, style=filled, fillcolor="#1a1a2e", fontcolor="#e6edf3"];")
-    lines.append(" edge [color="#4a4a6a"];")
+    lines.append(' node [shape=box, style=filled, fillcolor="#1a1a2e", fontcolor="#e6edf3"];')
+    lines.append(' edge [color="#4a4a6a"];')
     lines.append("")

for repo, data in sorted(graph.items()):
|
||||
|
||||
387
scripts/freshness.py
Normal file
@@ -0,0 +1,387 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)
|
||||
|
||||
Automatically detects when knowledge entries become stale due to code changes.
|
||||
|
||||
Detection Method:
|
||||
1. Track source file hash alongside knowledge entry
|
||||
2. Compare current file hashes vs stored
|
||||
3. Mismatch → flag entry as potentially stale
|
||||
4. Report stale entries and optionally re-extract
|
||||
|
||||
Usage:
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/ --json
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import yaml
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Any, Optional, Tuple
|
||||
|
||||
|
||||
def compute_file_hash(filepath: str) -> Optional[str]:
|
||||
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
|
||||
try:
|
||||
with open(filepath, "rb") as f:
|
||||
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
|
||||
except (FileNotFoundError, IsADirectoryError, PermissionError):
|
||||
return None
|
||||
|
||||
|
||||
def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
|
||||
"""
|
||||
Get files changed in git in the last N days.
|
||||
|
||||
Returns dict with 'modified', 'added', 'deleted' lists of file paths.
|
||||
"""
|
||||
changes = {"modified": [], "added": [], "deleted": []}
|
||||
|
||||
try:
|
||||
# Get commits from last N days
|
||||
cmd = [
|
||||
"git", "-C", repo_path, "log",
|
||||
f"--since={days} days ago",
|
||||
"--name-status",
|
||||
"--pretty=format:",
|
||||
"--diff-filter=MAD"
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
|
||||
if result.returncode != 0:
|
||||
return changes
|
||||
|
||||
for line in result.stdout.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
parts = line.split('\t', 1)
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
|
||||
status, filepath = parts
|
||||
if status == 'M':
|
||||
changes["modified"].append(filepath)
|
||||
elif status == 'A':
|
||||
changes["added"].append(filepath)
|
||||
elif status == 'D':
|
||||
changes["deleted"].append(filepath)
|
||||
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass
|
||||
|
||||
# Deduplicate
|
||||
for key in changes:
|
||||
changes[key] = list(set(changes[key]))
|
||||
|
||||
return changes
|
||||
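A usage sketch (the paths shown are illustrative):

```python
changes = get_git_file_changes(".", days=7)
# e.g. {"modified": ["scripts/harvester.py"], "added": ["scripts/freshness.py"], "deleted": []}
```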
|
||||
|
||||
def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Load knowledge entries from YAML files in the knowledge directory.
|
||||
|
||||
Supports:
|
||||
- knowledge/index.json (legacy format)
|
||||
- knowledge/global/*.yaml
|
||||
- knowledge/repos/*.yaml
|
||||
- knowledge/agents/*.yaml
|
||||
"""
|
||||
entries = []
|
||||
|
||||
# Load from index.json if exists
|
||||
index_path = os.path.join(knowledge_dir, "index.json")
|
||||
if os.path.exists(index_path):
|
||||
try:
|
||||
with open(index_path) as f:
|
||||
data = json.load(f)
|
||||
for fact in data.get("facts", []):
|
||||
entries.append({
|
||||
"source": "index.json",
|
||||
"fact": fact.get("fact", ""),
|
||||
"source_file": fact.get("source_file"),
|
||||
"source_hash": fact.get("source_hash"),
|
||||
"category": fact.get("category", "unknown"),
|
||||
"confidence": fact.get("confidence", 0.5)
|
||||
})
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
pass
|
||||
|
||||
# Load from YAML files
|
||||
for subdir in ["global", "repos", "agents"]:
|
||||
subdir_path = os.path.join(knowledge_dir, subdir)
|
||||
if not os.path.isdir(subdir_path):
|
||||
continue
|
||||
|
||||
for filename in os.listdir(subdir_path):
|
||||
if not filename.endswith((".yaml", ".yml")):
|
||||
continue
|
||||
|
||||
filepath = os.path.join(subdir_path, filename)
|
||||
try:
|
||||
with open(filepath) as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
if not data or not isinstance(data, dict):
|
||||
continue
|
||||
|
||||
# Extract entries from YAML structure
|
||||
for key, value in data.items():
|
||||
if isinstance(value, list):
|
||||
for item in value:
|
||||
if isinstance(item, dict):
|
||||
entries.append({
|
||||
"source": f"{subdir}/{filename}",
|
||||
"fact": item.get("description", item.get("fact", "")),
|
||||
"source_file": item.get("source_file"),
|
||||
"source_hash": item.get("source_hash"),
|
||||
"category": item.get("category", "unknown"),
|
||||
"confidence": item.get("confidence", 0.5)
|
||||
})
|
||||
elif isinstance(value, dict):
|
||||
entries.append({
|
||||
"source": f"{subdir}/{filename}",
|
||||
"fact": value.get("description", value.get("fact", "")),
|
||||
"source_file": value.get("source_file"),
|
||||
"source_hash": value.get("source_hash"),
|
||||
"category": value.get("category", "unknown"),
|
||||
"confidence": value.get("confidence", 0.5)
|
||||
})
|
||||
except (yaml.YAMLError, IOError):
|
||||
pass
|
||||
|
||||
return entries
|
||||
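Whatever the on-disk layout, each entry is normalized to the same flat dict; a sketch of one loaded entry (the values are illustrative):

```python
entries = load_knowledge_entries("knowledge/")
# entries[0] might look like:
# {"source": "repos/compounding-intelligence.yaml",
#  "fact": "harvester batches transcripts before extraction",
#  "source_file": "scripts/harvester.py",
#  "source_hash": "sha256:<64 hex chars>",
#  "category": "fact",
#  "confidence": 0.8}
```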
|
||||
|
||||
def check_freshness(knowledge_dir: str, repo_root: str = ".",
|
||||
days: int = 1) -> Dict[str, Any]:
|
||||
"""
|
||||
Check freshness of knowledge entries against recent code changes.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"timestamp": ISO timestamp,
|
||||
"total_entries": int,
|
||||
"stale_entries": [...],
|
||||
"fresh_entries": [...],
|
||||
"git_changes": {...},
|
||||
"summary": {...}
|
||||
}
|
||||
"""
|
||||
entries = load_knowledge_entries(knowledge_dir)
|
||||
git_changes = get_git_file_changes(repo_root, days)
|
||||
|
||||
stale_entries = []
|
||||
fresh_entries = []
|
||||
|
||||
for entry in entries:
|
||||
source_file = entry.get("source_file")
|
||||
if not source_file:
|
||||
# Entry without source file reference
|
||||
fresh_entries.append({**entry, "status": "no_source"})
|
||||
continue
|
||||
|
||||
# Check if source file was recently modified
|
||||
is_stale = False
|
||||
reason = ""
|
||||
|
||||
if source_file in git_changes["modified"]:
|
||||
is_stale = True
|
||||
reason = "source_modified"
|
||||
elif source_file in git_changes["deleted"]:
|
||||
is_stale = True
|
||||
reason = "source_deleted"
|
||||
elif source_file in git_changes["added"]:
|
||||
is_stale = True
|
||||
reason = "source_added"
|
||||
|
||||
# Also check hash if available
|
||||
stored_hash = entry.get("source_hash")
|
||||
if stored_hash:
|
||||
full_path = os.path.join(repo_root, source_file)
|
||||
current_hash = compute_file_hash(full_path)
|
||||
|
||||
if current_hash is None:
|
||||
is_stale = True
|
||||
reason = "source_missing"
|
||||
elif current_hash != stored_hash:
|
||||
is_stale = True
|
||||
reason = "hash_mismatch"
|
||||
|
||||
if is_stale:
|
||||
stale_entries.append({
|
||||
**entry,
|
||||
"status": "stale",
|
||||
"reason": reason
|
||||
})
|
||||
else:
|
||||
fresh_entries.append({**entry, "status": "fresh"})
|
||||
|
||||
# Compute summary
|
||||
total = len(entries)
|
||||
stale_count = len(stale_entries)
|
||||
fresh_count = len(fresh_entries)
|
||||
|
||||
# Group stale entries by reason
|
||||
stale_by_reason = {}
|
||||
for entry in stale_entries:
|
||||
reason = entry.get("reason", "unknown")
|
||||
if reason not in stale_by_reason:
|
||||
stale_by_reason[reason] = 0
|
||||
stale_by_reason[reason] += 1
|
||||
|
||||
return {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"total_entries": total,
|
||||
"stale_entries": stale_entries,
|
||||
"fresh_entries": fresh_entries,
|
||||
"git_changes": git_changes,
|
||||
"summary": {
|
||||
"total": total,
|
||||
"stale": stale_count,
|
||||
"fresh": fresh_count,
|
||||
"stale_percentage": round(stale_count / total * 100, 1) if total > 0 else 0,
|
||||
"stale_by_reason": stale_by_reason,
|
||||
"git_changes_summary": {
|
||||
"modified": len(git_changes["modified"]),
|
||||
"added": len(git_changes["added"]),
|
||||
"deleted": len(git_changes["deleted"])
|
||||
}
|
||||
}
|
||||
}
|
||||
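A usage sketch of the check itself; the stale reasons reported are `source_modified`, `source_added`, `source_deleted`, `source_missing`, and `hash_mismatch`:

```python
result = check_freshness("knowledge/", repo_root=".", days=1)
print(f"{result['summary']['stale']} of {result['summary']['total']} entries are stale")
for entry in result["stale_entries"][:5]:
    print(entry["reason"], entry.get("source_file"))
```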
|
||||
|
||||
def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
|
||||
"""
|
||||
Update hashes for stale entries. Returns count of updated entries.
|
||||
"""
|
||||
entries = load_knowledge_entries(knowledge_dir)
|
||||
updated = 0
|
||||
|
||||
# This is a simplified version - in practice, you'd need to
|
||||
# write back to the specific YAML files
|
||||
for entry in entries:
|
||||
source_file = entry.get("source_file")
|
||||
if not source_file:
|
||||
continue
|
||||
|
||||
full_path = os.path.join(repo_root, source_file)
|
||||
current_hash = compute_file_hash(full_path)
|
||||
|
||||
if current_hash and entry.get("source_hash") != current_hash:
|
||||
# Mark for update (in practice, you'd write back to the file)
|
||||
updated += 1
|
||||
|
||||
return updated
|
||||
|
||||
|
||||
def format_report(result: Dict[str, Any], max_items: int = 20) -> str:
|
||||
"""Format freshness check results as a human-readable report."""
|
||||
timestamp = result["timestamp"]
|
||||
summary = result["summary"]
|
||||
stale_entries = result["stale_entries"]
|
||||
git_changes = result["git_changes"]
|
||||
|
||||
lines = [
|
||||
"Knowledge Freshness Report",
|
||||
"=" * 50,
|
||||
f"Generated: {timestamp}",
|
||||
f"Total entries: {summary['total']}",
|
||||
f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)",
|
||||
f"Fresh entries: {summary['fresh']}",
|
||||
""
|
||||
]
|
||||
|
||||
# Git changes summary
|
||||
lines.extend([
|
||||
"Git Changes (last 24h):",
|
||||
f" Modified: {len(git_changes['modified'])} files",
|
||||
f" Added: {len(git_changes['added'])} files",
|
||||
f" Deleted: {len(git_changes['deleted'])} files",
|
||||
""
|
||||
])
|
||||
|
||||
# Stale entries by reason
|
||||
if summary.get("stale_by_reason"):
|
||||
lines.extend([
|
||||
"Stale Entries by Reason:",
|
||||
""
|
||||
])
|
||||
for reason, count in summary["stale_by_reason"].items():
|
||||
lines.append(f" {reason}: {count}")
|
||||
lines.append("")
|
||||
|
||||
# List stale entries
|
||||
if stale_entries:
|
||||
lines.extend([
|
||||
"Stale Entries:",
|
||||
""
|
||||
])
|
||||
for i, entry in enumerate(stale_entries[:max_items], 1):
|
||||
source = entry.get("source_file", "?")
|
||||
reason = entry.get("reason", "unknown")
|
||||
fact = entry.get("fact", "")[:60]
|
||||
lines.append(f"{i:2d}. [{reason}] {source}")
|
||||
if fact:
|
||||
lines.append(f" {fact}")
|
||||
|
||||
if len(stale_entries) > max_items:
|
||||
lines.append(f"\n... and {len(stale_entries) - max_items} more")
|
||||
else:
|
||||
lines.append("No stale entries found. All knowledge is fresh!")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Knowledge Freshness Cron — detect stale entries from code changes")
|
||||
parser.add_argument("--knowledge-dir", required=True,
|
||||
help="Path to knowledge directory")
|
||||
parser.add_argument("--repo", default=".",
|
||||
help="Path to repository for git change detection")
|
||||
parser.add_argument("--days", type=int, default=1,
|
||||
help="Number of days to check for git changes (default: 1)")
|
||||
parser.add_argument("--json", action="store_true",
|
||||
help="Output as JSON instead of human-readable")
|
||||
parser.add_argument("--max", type=int, default=20,
|
||||
help="Maximum stale entries to show (default: 20)")
|
||||
parser.add_argument("--auto-reextract", action="store_true",
|
||||
help="Auto-re-extract knowledge for stale entries")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not os.path.isdir(args.knowledge_dir):
|
||||
print(f"Error: {args.knowledge_dir} is not a directory", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not os.path.isdir(args.repo):
|
||||
print(f"Error: {args.repo} is not a directory", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
result = check_freshness(args.knowledge_dir, args.repo, args.days)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(result, indent=2))
|
||||
else:
|
||||
print(format_report(result, args.max))
|
||||
|
||||
# Auto-re-extract if requested
|
||||
if args.auto_reextract and result["stale_entries"]:
|
||||
print(f"\nAuto-re-extracting {len(result['stale_entries'])} stale entries...")
|
||||
# In a real implementation, this would call the harvester
|
||||
print("(Auto-re-extraction not yet implemented)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -113,7 +113,7 @@ def find_slow_tests_by_scan(repo_path: str) -> List[Bottleneck]:
         (r"time\.sleep\((\d+(?:\.\d+)?)\)", "Contains time.sleep() — consider using mock or async wait"),
         (r"subprocess\.run\(.*timeout=(\d+)", "Subprocess with timeout — may block test"),
         (r"requests\.(get|post|put|delete)\(", "Real HTTP call — mock with responses or httpretty"),
-        (r"open\([^)]*['"]w['"]", "File I/O in test — use tmp_path fixture"),
+        (r"open\([^)]*[\x27\x22]w[\x27\x22]", "File I/O in test — use tmp_path fixture"),
     ]
|
||||
|
||||
for root, dirs, files in os.walk(repo_path):
|
||||
@@ -506,8 +506,8 @@ def format_markdown(report: PerfReport) -> str:
         lines.append(f"- {icon} {b.name}{loc} — ~{b.duration_s:.1f}s — {b.recommendation}")
         lines.append(f"")

-    return "
-    ".join(lines)
+    return "\n".join(lines)
|
||||
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────
|
||||
@@ -521,8 +521,8 @@ def main():
                         help="Slow test threshold in seconds")
     args = parser.parse_args()

-    global SLOW_TEST_THRESHOLD_S
-    SLOW_TEST_THRESHOLD_S = args.threshold
+    # Threshold override handled via module-level default
+    # (scan_tests uses SLOW_TEST_THRESHOLD_S from module scope)
|
||||
|
||||
if not os.path.isdir(args.repo):
|
||||
print(f"Error: {args.repo} is not a directory", file=sys.stderr)
|
||||
|
||||
351
scripts/pr_complexity_scorer.py
Normal file
@@ -0,0 +1,351 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
PR Complexity Scorer - Estimate review effort for PRs.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
DEPENDENCY_FILES = {
|
||||
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
|
||||
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
|
||||
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
|
||||
}
|
||||
|
||||
TEST_PATTERNS = [
|
||||
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
|
||||
r"spec/.*\.rb$", r".*_spec\.rb$",
|
||||
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
|
||||
]
|
||||
|
||||
WEIGHT_FILES = 0.25
|
||||
WEIGHT_LINES = 0.25
|
||||
WEIGHT_DEPS = 0.30
|
||||
WEIGHT_TEST_COV = 0.20
|
||||
|
||||
SMALL_FILES = 5
|
||||
MEDIUM_FILES = 20
|
||||
LARGE_FILES = 50
|
||||
|
||||
SMALL_LINES = 100
|
||||
MEDIUM_LINES = 500
|
||||
LARGE_LINES = 2000
|
||||
|
||||
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PRComplexity:
|
||||
pr_number: int
|
||||
title: str
|
||||
files_changed: int
|
||||
additions: int
|
||||
deletions: int
|
||||
has_dependency_changes: bool
|
||||
test_coverage_delta: Optional[int]
|
||||
score: int
|
||||
estimated_minutes: int
|
||||
reasons: List[str]
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str):
|
||||
self.token = token
|
||||
self.base_url = GITEA_BASE.rstrip("/")
|
||||
|
||||
def _request(self, path: str, params: Dict = None) -> Any:
|
||||
url = f"{self.base_url}{path}"
|
||||
if params:
|
||||
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
|
||||
url += f"?{qs}"
|
||||
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header("Authorization", f"token {self.token}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
|
||||
return None
|
||||
except urllib.error.URLError as e:
|
||||
print(f"Network error: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
|
||||
if not batch:
|
||||
break
|
||||
prs.extend(batch)
|
||||
if len(batch) < 50:
|
||||
break
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
|
||||
files = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(
|
||||
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
|
||||
{"limit": 100, "page": page}
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
files.extend(batch)
|
||||
if len(batch) < 100:
|
||||
break
|
||||
page += 1
|
||||
return files
|
||||
|
||||
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
|
||||
data = json.dumps({"body": body}).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
|
||||
data=data,
|
||||
method="POST",
|
||||
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status in (200, 201)
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
|
||||
|
||||
def is_dependency_file(filename: str) -> bool:
|
||||
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
|
||||
|
||||
|
||||
def is_test_file(filename: str) -> bool:
|
||||
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
|
||||
|
||||
|
||||
def score_pr(
|
||||
files_changed: int,
|
||||
additions: int,
|
||||
deletions: int,
|
||||
has_dependency_changes: bool,
|
||||
test_coverage_delta: Optional[int] = None
|
||||
) -> tuple[int, int, List[str]]:
|
||||
score = 1.0
|
||||
reasons = []
|
||||
|
||||
# Files changed
|
||||
if files_changed <= SMALL_FILES:
|
||||
fscore = 1.0
|
||||
reasons.append("small number of files changed")
|
||||
elif files_changed <= MEDIUM_FILES:
|
||||
fscore = 2.0
|
||||
reasons.append("moderate number of files changed")
|
||||
elif files_changed <= LARGE_FILES:
|
||||
fscore = 2.5
|
||||
reasons.append("large number of files changed")
|
||||
else:
|
||||
fscore = 3.0
|
||||
reasons.append("very large PR spanning many files")
|
||||
|
||||
# Lines changed
|
||||
total_lines = additions + deletions
|
||||
if total_lines <= SMALL_LINES:
|
||||
lscore = 1.0
|
||||
reasons.append("small change size")
|
||||
elif total_lines <= MEDIUM_LINES:
|
||||
lscore = 2.0
|
||||
reasons.append("moderate change size")
|
||||
elif total_lines <= LARGE_LINES:
|
||||
lscore = 3.0
|
||||
reasons.append("large change size")
|
||||
else:
|
||||
lscore = 4.0
|
||||
reasons.append("very large change")
|
||||
|
||||
# Dependency changes
|
||||
if has_dependency_changes:
|
||||
dscore = 2.5
|
||||
reasons.append("dependency changes (architectural impact)")
|
||||
else:
|
||||
dscore = 0.0
|
||||
|
||||
# Test coverage delta
|
||||
tscore = 0.0
|
||||
if test_coverage_delta is not None:
|
||||
if test_coverage_delta > 0:
|
||||
reasons.append(f"test additions (+{test_coverage_delta} test files)")
|
||||
tscore = -min(2.0, test_coverage_delta / 2.0)
|
||||
elif test_coverage_delta < 0:
|
||||
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
|
||||
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
|
||||
else:
|
||||
reasons.append("test coverage change not assessed")
|
||||
|
||||
# Weighted sum, scaled by 3 to use full 1-10 range
|
||||
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
|
||||
scaled_bonus = bonus * 3.0
|
||||
score = 1.0 + scaled_bonus
|
||||
|
||||
final_score = max(1, min(10, int(round(score))))
|
||||
est_minutes = TIME_PER_POINT.get(final_score, 30)
|
||||
|
||||
return final_score, est_minutes, reasons
|
||||
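A worked example of the weighting using the constants above:

```python
score, minutes, reasons = score_pr(
    files_changed=15,             # <= MEDIUM_FILES           -> fscore 2.0
    additions=400,
    deletions=100,                # 500 lines <= MEDIUM_LINES -> lscore 2.0
    has_dependency_changes=False, # dscore 0.0
    test_coverage_delta=None,     # tscore 0.0
)
# bonus = 2.0*0.25 + 2.0*0.25 + 0.0*0.30 + 0.0*0.20 = 1.0
# score = 1.0 + 1.0*3.0 = 4.0 -> final score 4, ~20 minutes (TIME_PER_POINT[4])
assert (score, minutes) == (4, 20)
```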
|
||||
|
||||
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
|
||||
pr_num = pr_data["number"]
|
||||
title = pr_data.get("title", "")
|
||||
files = client.get_pr_files(org, repo, pr_num)
|
||||
|
||||
additions = sum(f.get("additions", 0) for f in files)
|
||||
deletions = sum(f.get("deletions", 0) for f in files)
|
||||
filenames = [f.get("filename", "") for f in files]
|
||||
|
||||
has_deps = any(is_dependency_file(f) for f in filenames)
|
||||
|
||||
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
|
||||
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
|
||||
test_delta = test_added - test_removed if (test_added or test_removed) else None
|
||||
|
||||
score, est_min, reasons = score_pr(
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta
|
||||
)
|
||||
|
||||
return PRComplexity(
|
||||
pr_number=pr_num,
|
||||
title=title,
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta,
|
||||
score=score,
|
||||
estimated_minutes=est_min,
|
||||
reasons=reasons
|
||||
)
|
||||
|
||||
|
||||
def build_comment(complexity: PRComplexity) -> str:
|
||||
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
|
||||
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
|
||||
test_note = ""
|
||||
if complexity.test_coverage_delta is not None:
|
||||
if complexity.test_coverage_delta > 0:
|
||||
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
|
||||
elif complexity.test_coverage_delta < 0:
|
||||
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
|
||||
|
||||
comment = f"## 📊 PR Complexity Analysis\n\n"
|
||||
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
|
||||
comment += f"| Metric | Value |\n|--------|-------|\n"
|
||||
comment += f"| Changes | {change_desc} |\n"
|
||||
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
|
||||
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
|
||||
comment += f"### Scoring rationale:"
|
||||
for r in complexity.reasons:
|
||||
comment += f"\n- {r}"
|
||||
if deps_note:
|
||||
comment += deps_note
|
||||
if test_note:
|
||||
comment += test_note
|
||||
comment += f"\n\n---\n"
|
||||
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
|
||||
return comment
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
|
||||
parser.add_argument("--org", default="Timmy_Foundation")
|
||||
parser.add_argument("--repo", default="compounding-intelligence")
|
||||
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--apply", action="store_true")
|
||||
parser.add_argument("--output", default="metrics/pr_complexity.json")
|
||||
args = parser.parse_args()
|
||||
|
||||
token_path = args.token
|
||||
if os.path.exists(token_path):
|
||||
with open(token_path) as f:
|
||||
token = f.read().strip()
|
||||
else:
|
||||
token = args.token
|
||||
|
||||
if not token:
|
||||
print("ERROR: No Gitea token provided", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
client = GiteaClient(token)
|
||||
|
||||
print(f"Fetching open PRs for {args.org}/{args.repo}...")
|
||||
prs = client.get_open_prs(args.org, args.repo)
|
||||
if not prs:
|
||||
print("No open PRs found.")
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Found {len(prs)} open PR(s). Analyzing...")
|
||||
|
||||
results = []
|
||||
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for pr in prs:
|
||||
pr_num = pr["number"]
|
||||
title = pr.get("title", "")
|
||||
print(f" Analyzing PR #{pr_num}: {title[:60]}")
|
||||
|
||||
try:
|
||||
complexity = analyze_pr(client, args.org, args.repo, pr)
|
||||
results.append(complexity.to_dict())
|
||||
|
||||
comment = build_comment(complexity)
|
||||
|
||||
if args.dry_run:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
|
||||
elif args.apply:
|
||||
success = client.post_comment(args.org, args.repo, pr_num, comment)
|
||||
status = "[commented]" if success else "[FAILED]"
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
|
||||
else:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump({
|
||||
"org": args.org,
|
||||
"repo": args.repo,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"pr_count": len(results),
|
||||
"results": results
|
||||
}, f, indent=2)
|
||||
|
||||
if results:
|
||||
scores = [r["score"] for r in results]
|
||||
print(f"\nResults saved to {args.output}")
|
||||
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
|
||||
else:
|
||||
print("\nNo results to save.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -10,37 +10,273 @@ Usage:
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import ast
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
def generate_proposals():
|
||||
"""Generate sample proposals for this engine."""
|
||||
# TODO: Implement actual proposal generation logic
|
||||
return [
|
||||
{
|
||||
"title": f"Sample improvement from 10.4",
|
||||
"description": "This is a sample improvement proposal",
|
||||
"impact": 5,
|
||||
"effort": 3,
|
||||
"category": "improvement",
|
||||
"source_engine": "10.4",
|
||||
"timestamp": datetime.now(timezone.utc).isoformat()
|
||||
}
|
||||
]
|
||||
# ── Data Classes ────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class FileMetrics:
|
||||
"""Metrics for a single source file."""
|
||||
path: str
|
||||
lines: int = 0
|
||||
complexity: float = 0.0
|
||||
max_complexity: int = 0
|
||||
functions: int = 0
|
||||
classes: int = 0
|
||||
churn_30d: int = 0
|
||||
churn_90d: int = 0
|
||||
test_coverage: Optional[float] = None
|
||||
refactoring_score: float = 0.0
|
||||
|
||||
|
||||
# ── Complexity Analysis ─────────────────────────────────────────────────
|
||||
|
||||
class ComplexityVisitor(ast.NodeVisitor):
|
||||
"""AST visitor that computes cyclomatic complexity per function."""
|
||||
|
||||
def __init__(self):
|
||||
self.complexities = []
|
||||
self.function_count = 0
|
||||
self.class_count = 0
|
||||
self._current_complexity = 0
|
||||
self._in_function = False
|
||||
|
||||
def visit_FunctionDef(self, node):
|
||||
self.function_count += 1
|
||||
old_complexity = self._current_complexity
|
||||
old_in_function = self._in_function
|
||||
self._current_complexity = 1 # Base complexity
|
||||
self._in_function = True
|
||||
|
||||
self.generic_visit(node)
|
||||
|
||||
self.complexities.append(self._current_complexity)
|
||||
self._current_complexity = old_complexity
|
||||
self._in_function = old_in_function
|
||||
|
||||
visit_AsyncFunctionDef = visit_FunctionDef
|
||||
|
||||
def visit_ClassDef(self, node):
|
||||
self.class_count += 1
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_If(self, node):
|
||||
if self._in_function:
|
||||
self._current_complexity += 1
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_For(self, node):
|
||||
if self._in_function:
|
||||
self._current_complexity += 1
|
||||
self.generic_visit(node)
|
||||
|
||||
visit_AsyncFor = visit_For
|
||||
|
||||
def visit_While(self, node):
|
||||
if self._in_function:
|
||||
self._current_complexity += 1
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_ExceptHandler(self, node):
|
||||
if self._in_function:
|
||||
self._current_complexity += 1
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_With(self, node):
|
||||
if self._in_function:
|
||||
self._current_complexity += 1
|
||||
self.generic_visit(node)
|
||||
|
||||
visit_AsyncWith = visit_With
|
||||
|
||||
def visit_Assert(self, node):
|
||||
if self._in_function:
|
||||
self._current_complexity += 1
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_BoolOp(self, node):
|
||||
# Each 'and'/'or' adds a branch
|
||||
if self._in_function:
|
||||
self._current_complexity += len(node.values) - 1
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_IfExp(self, node):
|
||||
# Ternary expression
|
||||
if self._in_function:
|
||||
self._current_complexity += 1
|
||||
self.generic_visit(node)
|
||||
|
||||
|
||||
def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
|
||||
"""
|
||||
Compute cyclomatic complexity for a Python file.
|
||||
|
||||
Returns:
|
||||
(avg_complexity, max_complexity, function_count, class_count, line_count)
|
||||
"""
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
|
||||
source = f.read()
|
||||
except (IOError, OSError):
|
||||
return 0.0, 0, 0, 0, 0
|
||||
|
||||
try:
|
||||
tree = ast.parse(source, filename=filepath)
|
||||
except SyntaxError:
|
||||
return 0.0, 0, 0, 0, 0
|
||||
|
||||
visitor = ComplexityVisitor()
|
||||
visitor.visit(tree)
|
||||
|
||||
line_count = len(source.splitlines())
|
||||
|
||||
if not visitor.complexities:
|
||||
# No functions, but might have classes
|
||||
return 0.0, 0, visitor.function_count, visitor.class_count, line_count
|
||||
|
||||
avg = sum(visitor.complexities) / len(visitor.complexities)
|
||||
max_c = max(visitor.complexities)
|
||||
|
||||
return avg, max_c, visitor.function_count, visitor.class_count, line_count
|
||||
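A minimal sketch of how the visitor counts branches (the sample function is hypothetical):

```python
import ast

src = '''
def pick(x):
    if x > 0 and x < 10:   # the if adds 1, the 'and' adds 1
        return "small"
    return "other"
'''
v = ComplexityVisitor()
v.visit(ast.parse(src))
assert v.complexities == [3]   # base 1 + if 1 + boolean operator 1
```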
|
||||
|
||||
# ── Refactoring Score ───────────────────────────────────────────────────
|
||||
|
||||
def calculate_refactoring_score(metrics: FileMetrics) -> float:
|
||||
"""
|
||||
Calculate a refactoring priority score (0-100) based on file metrics.
|
||||
|
||||
Higher score = higher priority for refactoring.
|
||||
Components:
|
||||
- Complexity (0-30 points): higher avg/max complexity = higher score
|
||||
- Size (0-20 points): larger files = higher score
|
||||
- Churn (0-30 points): more changes recently = higher score
|
||||
- Coverage (0-20 points): lower test coverage = higher score
|
||||
"""
|
||||
score = 0.0
|
||||
|
||||
# Complexity component (0-30)
|
||||
# avg=10+ or max=20+ → 30 points
|
||||
complexity_score = min(30.0, (metrics.complexity * 2) + (metrics.max_complexity * 0.5))
|
||||
score += max(0.0, complexity_score)
|
||||
|
||||
# Size component (0-20)
|
||||
# 500+ lines → 20 points
|
||||
size_score = min(20.0, metrics.lines / 25.0)
|
||||
score += max(0.0, size_score)
|
||||
|
||||
# Churn component (0-30)
|
||||
# Weighted: recent churn (30d) counts more than older (90d)
|
||||
churn_score = min(30.0, (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5))
|
||||
score += max(0.0, churn_score)
|
||||
|
||||
# Coverage component (0-20)
|
||||
# Lower coverage → higher score
|
||||
if metrics.test_coverage is not None:
|
||||
# coverage=0 → 20 points, coverage=1 → 0 points
|
||||
coverage_score = (1.0 - metrics.test_coverage) * 20.0
|
||||
else:
|
||||
# No data → assume medium risk (10 points)
|
||||
coverage_score = 10.0
|
||||
score += max(0.0, coverage_score)
|
||||
|
||||
return min(100.0, max(0.0, score))
|
||||
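A worked example (the path and numbers are made up):

```python
m = FileMetrics(path="scripts/example.py", lines=400,
                complexity=6.0, max_complexity=14,
                churn_30d=4, churn_90d=10)
# complexity: min(30, 6.0*2 + 14*0.5) = 19
# size:       min(20, 400/25)         = 16
# churn:      min(30, 4*2 + 10*0.5)   = 13
# coverage:   no data                 = 10
assert calculate_refactoring_score(m) == 58.0
```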
|
||||
|
||||
# ── Proposal Generation ─────────────────────────────────────────────────
|
||||
|
||||
def scan_directory(directory: str, extensions: tuple = ('.py',)) -> list:
|
||||
"""Scan directory for source files."""
|
||||
files = []
|
||||
for root, dirs, filenames in os.walk(directory):
|
||||
# Skip hidden dirs and common non-source dirs
|
||||
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in (
|
||||
'__pycache__', 'node_modules', 'venv', '.venv', 'env',
|
||||
'build', 'dist', '.git', '.tox'
|
||||
)]
|
||||
for fname in filenames:
|
||||
if any(fname.endswith(ext) for ext in extensions):
|
||||
files.append(os.path.join(root, fname))
|
||||
return files
|
||||
|
||||
|
||||
def generate_proposals(directory: str = '.', min_score: float = 30.0) -> list:
|
||||
"""Generate refactoring proposals by analyzing source files."""
|
||||
proposals = []
|
||||
files = scan_directory(directory)
|
||||
|
||||
for filepath in files:
|
||||
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
|
||||
|
||||
if funcs == 0 and classes == 0:
|
||||
continue
|
||||
|
||||
metrics = FileMetrics(
|
||||
path=filepath,
|
||||
lines=lines,
|
||||
complexity=avg,
|
||||
max_complexity=max_c,
|
||||
functions=funcs,
|
||||
classes=classes
|
||||
)
|
||||
score = calculate_refactoring_score(metrics)
|
||||
metrics.refactoring_score = score
|
||||
|
||||
if score >= min_score:
|
||||
reasons = []
|
||||
if max_c > 10:
|
||||
reasons.append(f"high max complexity ({max_c})")
|
||||
if avg > 5:
|
||||
reasons.append(f"high avg complexity ({avg:.1f})")
|
||||
if lines > 300:
|
||||
reasons.append(f"large file ({lines} lines)")
|
||||
|
||||
proposals.append({
|
||||
"title": f"Refactor {os.path.basename(filepath)} (score: {score:.0f})",
|
||||
"description": f"{filepath}: {', '.join(reasons) if reasons else 'general improvement candidate'}",
|
||||
"impact": min(10, int(score / 10)),
|
||||
"effort": min(10, max(1, int(max_c / 3))),
|
||||
"category": "refactoring",
|
||||
"source_engine": "10.4",
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"metrics": {
|
||||
"path": filepath,
|
||||
"score": round(score, 2),
|
||||
"avg_complexity": round(avg, 2),
|
||||
"max_complexity": max_c,
|
||||
"lines": lines,
|
||||
"functions": funcs,
|
||||
"classes": classes
|
||||
}
|
||||
})
|
||||
|
||||
# Sort by score descending
|
||||
proposals.sort(key=lambda p: p.get('metrics', {}).get('score', 0), reverse=True)
|
||||
return proposals
|
||||
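A usage sketch (output shown would depend on the scanned tree):

```python
proposals = generate_proposals("scripts/", min_score=30.0)
for p in proposals[:3]:
    print(f"- {p['title']}  (impact {p['impact']}, effort {p['effort']})")
```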
|
||||
|
||||
# ── CLI ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
|
||||
parser.add_argument("--output", required=True, help="Output file for proposals")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
|
||||
|
||||
parser.add_argument("--directory", default=".", help="Directory to scan")
|
||||
parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
proposals = generate_proposals()
|
||||
|
||||
|
||||
proposals = generate_proposals(args.directory, args.min_score)
|
||||
|
||||
if not args.dry_run:
|
||||
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
|
||||
with open(args.output, "w") as f:
|
||||
json.dump({"proposals": proposals}, f, indent=2)
|
||||
print(f"Generated {len(proposals)} proposals -> {args.output}")
|
||||
|
||||
116
scripts/run_connector.py
Normal file
@@ -0,0 +1,116 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
scripts/run_connector.py — Run a personal archive connector and emit SourceEvents.
|
||||
|
||||
Usage:
|
||||
python3 scripts/run_connector.py twitter --source /path/to/twitter/archive --output events.jsonl [--limit 100]
|
||||
|
||||
This is the entry point that ties the connectors pack into the existing compounding-intelligence
|
||||
pipeline. Output is JSONL (one SourceEvent per line), ready for downstream ingestion by
|
||||
harvester.py or a future connector-targeted harvester.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add parent dir to path for sibling imports
|
||||
SCRIPT_DIR = Path(__file__).parent.absolute()
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
from connectors import get_connector, list_connectors
|
||||
from connectors.base import BaseConnector
|
||||
from connectors.schema import SourceEvent
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
|
||||
)
|
||||
logger = logging.getLogger("run_connector")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Run a personal archive connector and emit normalized events."
|
||||
)
|
||||
parser.add_argument(
|
||||
"connector",
|
||||
choices=list_connectors(),
|
||||
help="Connector name to run"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--source", "-s",
|
||||
required=True,
|
||||
help="Path to the source archive root (e.g., ~/Documents/TwitterArchive)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output", "-o",
|
||||
required=True,
|
||||
help="Output file path (JSONL, one SourceEvent per line)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--limit", "-n",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Stop after N events (default: unlimited)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--consent-scope",
|
||||
choices=["memory_only", "bootstrap_context", "training_data"],
|
||||
default="memory_only",
|
||||
help="Consent scope for emitted events (default: memory_only)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--checkpoint",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="Checkpoint file path (default: ~/.cache/connectors/{name}.checkpoint.jsonl)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Parse and count events but do not write output"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Resolve connector
|
||||
connector_cls = get_connector(args.connector)
|
||||
connector: BaseConnector = connector_cls(
|
||||
checkpoint_path=args.checkpoint,
|
||||
consent_scope=args.consent_scope
|
||||
)
|
||||
|
||||
# Resolve source path
|
||||
source_path = Path(args.source).expanduser().resolve()
|
||||
if not source_path.exists():
|
||||
logger.error("Source path does not exist: %s", source_path)
|
||||
sys.exit(1)
|
||||
|
||||
# Run connector
|
||||
logger.info("Running connector '%s' on source: %s", args.connector, source_path)
|
||||
events = connector.run(source_path, limit=args.limit)
|
||||
|
||||
if args.dry_run:
|
||||
count = sum(1 for _ in events)
|
||||
logger.info("[DRY RUN] Would emit %d events", count)
|
||||
return 0
|
||||
|
||||
# Write output
|
||||
output_path = Path(args.output).expanduser().resolve()
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
count = 0
|
||||
with open(output_path, 'w', encoding='utf-8') as out:
|
||||
for event in events:
|
||||
out.write(event.to_json() + '\n')
|
||||
count += 1
|
||||
|
||||
logger.info("Connector complete. Emitted %d events to %s", count, output_path)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
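For reference, one emitted SourceEvent line has roughly the shape below (shown as a Python dict for readability; the field set comes from connectors/schema.py, while the values and the raw_ref/thread formats are illustrative):

```python
event_line = {
    "source": "twitter",
    "account": "user_archive",
    "thread_or_channel": "tweet_1234567890",
    "author": "user_archive",
    "timestamp": "2026-04-26T08:30:00Z",
    "content": "Hello from Twitter archive!",
    "attachments": [],
    "raw_ref": "twitter:tweet:1234567890",
    "hash": "<sha256 hex digest>",
    "consent_scope": "memory_only",
    "metadata": {"tweet_id": "1234567890", "favorite_count": 10},
}
```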
170
scripts/test_pr_complexity_scorer.py
Normal file
@@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for PR Complexity Scorer — unit tests for the scoring logic.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from pr_complexity_scorer import (
|
||||
score_pr,
|
||||
is_dependency_file,
|
||||
is_test_file,
|
||||
TIME_PER_POINT,
|
||||
SMALL_FILES,
|
||||
MEDIUM_FILES,
|
||||
LARGE_FILES,
|
||||
SMALL_LINES,
|
||||
MEDIUM_LINES,
|
||||
LARGE_LINES,
|
||||
)
|
||||
|
||||
PASS = 0
|
||||
FAIL = 0
|
||||
|
||||
def test(name):
|
||||
def decorator(fn):
|
||||
global PASS, FAIL
|
||||
try:
|
||||
fn()
|
||||
PASS += 1
|
||||
print(f" [PASS] {name}")
|
||||
except AssertionError as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: {e}")
|
||||
except Exception as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: Unexpected error: {e}")
|
||||
return decorator
|
||||
|
||||
def assert_eq(a, b, msg=""):
|
||||
if a != b:
|
||||
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
|
||||
|
||||
def assert_true(v, msg=""):
|
||||
if not v:
|
||||
raise AssertionError(msg or "Expected True")
|
||||
|
||||
def assert_false(v, msg=""):
|
||||
if v:
|
||||
raise AssertionError(msg or "Expected False")
|
||||
|
||||
|
||||
print("=== PR Complexity Scorer Tests ===\n")
|
||||
|
||||
print("-- File Classification --")
|
||||
|
||||
@test("dependency file detection — requirements.txt")
|
||||
def _():
|
||||
assert_true(is_dependency_file("requirements.txt"))
|
||||
assert_true(is_dependency_file("src/requirements.txt"))
|
||||
assert_false(is_dependency_file("requirements_test.txt"))
|
||||
|
||||
@test("dependency file detection — pyproject.toml")
|
||||
def _():
|
||||
assert_true(is_dependency_file("pyproject.toml"))
|
||||
assert_false(is_dependency_file("myproject.py"))
|
||||
|
||||
@test("test file detection — pytest style")
|
||||
def _():
|
||||
assert_true(is_test_file("tests/test_api.py"))
|
||||
assert_true(is_test_file("test_module.py"))
|
||||
assert_true(is_test_file("src/module_test.py"))
|
||||
|
||||
@test("test file detection — other frameworks")
|
||||
def _():
|
||||
assert_true(is_test_file("spec/feature_spec.rb"))
|
||||
assert_true(is_test_file("__tests__/component.test.js"))
|
||||
assert_false(is_test_file("testfixtures/helper.py"))
|
||||
|
||||
|
||||
print("\n-- Scoring Logic --")
|
||||
|
||||
@test("small PR gets low score (1-3)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=3,
|
||||
additions=50,
|
||||
deletions=10,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
|
||||
assert_true(minutes < 20)
|
||||
|
||||
@test("medium PR gets medium score (4-6)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=15,
|
||||
additions=400,
|
||||
deletions=100,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
|
||||
assert_true(20 <= minutes <= 45)
|
||||
|
||||
@test("large PR gets high score (7-9)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=60,
|
||||
additions=3000,
|
||||
deletions=1500,
|
||||
has_dependency_changes=True,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
|
||||
assert_true(minutes >= 45)
|
||||
|
||||
@test("dependency changes boost score")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
dep_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=True, test_coverage_delta=None
|
||||
)
|
||||
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
|
||||
|
||||
@test("adding tests lowers complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
better_score, _, _ = score_pr(
|
||||
files_changed=8, additions=180, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=3
|
||||
)
|
||||
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
|
||||
|
||||
@test("removing tests increases complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
worse_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=-2
|
||||
)
|
||||
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
|
||||
|
||||
@test("score bounded 1-10")
|
||||
def _():
|
||||
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
|
||||
score, _, _ = score_pr(files, adds, dels, False, None)
|
||||
assert_true(1 <= score <= 10, f"Score {score} out of range")
|
||||
|
||||
@test("estimated minutes exist for all scores")
|
||||
def _():
|
||||
for s in range(1, 11):
|
||||
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
|
||||
|
||||
|
||||
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
|
||||
sys.exit(0 if FAIL == 0 else 1)
|
||||
270
tests/test_connectors.py
Normal file
@@ -0,0 +1,270 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
tests/test_connectors.py — Test suite for the personal archive connector pack.
|
||||
|
||||
Tests cover:
|
||||
- SourceEvent schema validation
|
||||
- Event hash determinism
|
||||
- TwitterArchiveConnector parsing of standard Twitter export format
|
||||
- Deduplication gate
|
||||
"""
|
||||
|
||||
import json
|
||||
import hashlib
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import pytest
|
||||
|
||||
# Add scripts dir to path for sibling imports
|
||||
import sys
|
||||
SCRIPT_DIR = Path(__file__).parent.parent / "scripts"
|
||||
sys.path.insert(0, str(SCRIPT_DIR.parent))
|
||||
|
||||
from connectors.schema import (
|
||||
SourceEvent,
|
||||
compute_event_hash,
|
||||
validate_event,
|
||||
CONSENT_MEMORY_ONLY,
|
||||
CONSENT_BOOTSTRAP,
|
||||
)
|
||||
from connectors.twitter_archive import TwitterArchiveConnector
|
||||
|
||||
|
||||
class TestSourceEventSchema:
|
||||
"""Tests for SourceEvent dataclass and helpers."""
|
||||
|
||||
def test_create_minimal_event(self):
|
||||
event = SourceEvent(
|
||||
source="twitter",
|
||||
account="user123",
|
||||
thread_or_channel="tweet_456",
|
||||
author="user123",
|
||||
timestamp="2026-04-26T12:00:00Z",
|
||||
content="Hello world",
|
||||
attachments=[],
|
||||
raw_ref="twitter:test:456",
|
||||
hash="",
|
||||
)
|
||||
assert event.source == "twitter"
|
||||
assert event.consent_scope == CONSENT_MEMORY_ONLY # default
|
||||
|
||||
    def test_compute_event_hash_deterministic(self):
        h1 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice"
        )
        h2 = compute_event_hash(
            source="twitter",
            raw_ref="ref:123",
            content="test content",
            timestamp="2026-04-26T12:00:00Z",
            author="alice"
        )
        assert h1 == h2
        assert len(h1) == 64  # SHA-256 hex

    def test_compute_event_hash_different_inputs(self):
        h1 = compute_event_hash("twitter", "ref:1", "content", "ts", "alice")
        h2 = compute_event_hash("twitter", "ref:1", "different", "ts", "alice")
        assert h1 != h2

    def test_validate_event_accepts_valid(self):
        event = SourceEvent(
            source="discord",
            account="user#1234",
            thread_or_channel="channel_abc",
            author="user#1234",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="discord:msg:123",
            hash="a" * 64,
        )
        assert validate_event(event) is True

    def test_validate_event_rejects_empty_content(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="",  # empty
            attachments=[],
            raw_ref="ref",
            hash="a" * 64,
        )
        assert validate_event(event) is False

    def test_validate_event_rejects_missing_hash(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="thread",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="test",
            attachments=[],
            raw_ref="ref",
            hash=" ",  # whitespace only
        )
        assert validate_event(event) is False

    def test_event_to_json_roundtrip(self):
        event = SourceEvent(
            source="twitter",
            account="user",
            thread_or_channel="t1",
            author="user",
            timestamp="2026-04-26T12:00:00Z",
            content="hello",
            attachments=["https://example.com/img.jpg"],
            raw_ref="twitter:123",
            hash="b" * 64,
            metadata={"retweet_count": 5}
        )
        json_str = event.to_json()
        parsed = json.loads(json_str)
        assert parsed["source"] == "twitter"
        assert parsed["metadata"]["retweet_count"] == 5


class TestTwitterArchiveConnector:
    """Tests for the Twitter/X archive connector."""

    def test_connector_name(self):
        assert TwitterArchiveConnector.name == "twitter_archive"

    def test_discover_sources_finds_tweet_js(self, tmp_path: Path):
        # Arrange: create a fake Twitter archive structure
        archive = tmp_path / "twitter_archive"
        archive.mkdir()
        data_dir = archive / "data"
        data_dir.mkdir()
        (data_dir / "tweet.js").write_text("[]")
        (data_dir / "tweets_2024_01.js").write_text("[]")

        connector = TwitterArchiveConnector()
        sources = list(connector.discover_sources(archive))

        assert len(sources) == 2
        assert any("tweet.js" in str(p) for p in sources)

    def test_parse_single_tweet_wrapped_format(self, tmp_path: Path):
        """
        Twitter's official export wraps the JSON array in a JS assignment:
        window.YTD.tweet.part0 = [ {...tweet...}, ... ];
        """
        # Create a minimal tweet record
        tweet = {
            "id_str": "1234567890",
            "full_text": "Hello from Twitter archive!",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "favorite_count": 10,
            "retweet_count": 2,
            "lang": "en"
        }
        wrapped = "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n"

        js_file = tmp_path / "tweet.js"
        js_file.write_text(wrapped)

        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(js_file))

        assert len(events) == 1
        ev = events[0]
        assert ev.source == "twitter"
        assert ev.content == "Hello from Twitter archive!"
        assert ev.author == "user_archive"
        assert ev.consent_scope == "memory_only"
        # Hash must be computed
        assert len(ev.hash) == 64
        # Metadata preservation
        assert ev.metadata["tweet_id"] == "1234567890"
        assert ev.metadata["favorite_count"] == 10

    def test_parse_tweet_array_without_wrapper(self, tmp_path: Path):
        """Some Twitter exports are plain JSON arrays (no JS wrapper)."""
        tweet = {
            "id_str": "999",
            "full_text": "Plain JSON tweet",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
        }
        json_file = tmp_path / "tweets.json"
        json_file.write_text(json.dumps([{"tweet": tweet}]))

        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(json_file))

        assert len(events) == 1
        assert events[0].content == "Plain JSON tweet"

    def test_parse_with_media_attachments(self, tmp_path: Path):
        tweet = {
            "id_str": "111",
            "full_text": "Check this photo",
            "created_at": "Mon Apr 26 08:30:00 +0000 2026",
            "extended_entities": {
                "media": [
                    {"media_url_https": "https://pbs.twimg.com/media/example1.jpg"},
                    {"media_url_https": "https://pbs.twimg.com/media/example2.jpg"},
                ]
            }
        }
        js_file = tmp_path / "tweet.js"
        js_file.write_text("window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet}]) + ";\n")

        connector = TwitterArchiveConnector()
        events = list(connector.parse_source(js_file))

        assert len(events) == 1
        atts = events[0].attachments
        assert len(atts) == 2
        assert "example1.jpg" in atts[0]

    def test_integration_run_connector(self, tmp_path: Path):
        """End-to-end: create a mini archive, run connector, write JSONL output."""
        # Arrange: create archive with two tweets
        archive_root = tmp_path / "my_twitter_archive" / "data"
        archive_root.mkdir(parents=True)

        tweet1 = {
            "id_str": "1",
            "full_text": "First tweet",
            "created_at": "Mon Apr 26 08:00:00 +0000 2026",
        }
        tweet2 = {
            "id_str": "2",
            "full_text": "Second tweet",
            "created_at": "Mon Apr 26 09:00:00 +0000 2026",
        }
        (archive_root / "tweet.js").write_text(
            "window.YTD.tweet.part0 = " + json.dumps([{'tweet': tweet1}, {'tweet': tweet2}]) + "\n"
        )

        connector = TwitterArchiveConnector(checkpoint_path=tmp_path / "ckpt.jsonl")
        output_path = tmp_path / "events.jsonl"

        # Act
        count = 0
        with open(output_path, 'w') as out:
            for event in connector.run(archive_root):
                out.write(event.to_json() + '\n')
                count += 1

        # Assert
        assert count == 2
        lines = output_path.read_text().strip().split('\n')
        assert len(lines) == 2
        ev1 = json.loads(lines[0])
        assert ev1["content"] == "First tweet"
        ev2 = json.loads(lines[1])
        assert ev2["content"] == "Second tweet"
        # Check duplicates are filtered on re-run
        count2 = sum(1 for _ in connector.run(archive_root))
        assert count2 == 0  # all deduped via checkpoint
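The assertions above pin down the `compute_event_hash` contract: the same inputs always yield the same digest, the digest is a 64-character hex string, and changing any field changes the result. A minimal sketch consistent with those assertions follows; the field order and separator are assumptions, since the module's actual implementation is not shown in this diff.

```python
import hashlib


def compute_event_hash(source: str, raw_ref: str, content: str,
                       timestamp: str, author: str) -> str:
    """Deterministic SHA-256 hex digest over the identifying event fields.

    Sketch only: the module under test may join the fields in a different
    order or with a different separator; any canonical join satisfies the
    tests above (deterministic, 64 hex chars, sensitive to every field).
    """
    canonical = "\x1f".join([source, raw_ref, content, timestamp, author])
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```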
207
tests/test_dedup.py
Normal file
@@ -0,0 +1,207 @@
"""Tests for knowledge deduplication module (Issue #196)."""

import json
import sys
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))

from dedup import (
    normalize_text,
    content_hash,
    tokenize,
    token_similarity,
    quality_score,
    merge_facts,
    dedup_facts,
    generate_test_duplicates,
)


class TestNormalize:
    def test_lowercases(self):
        assert normalize_text("Hello World") == "hello world"

    def test_collapses_whitespace(self):
        assert normalize_text(" hello world ") == "hello world"

    def test_strips(self):
        assert normalize_text(" text ") == "text"


class TestContentHash:
    def test_deterministic(self):
        h1 = content_hash("Hello World")
        h2 = content_hash("hello world")
        h3 = content_hash(" Hello World ")
        assert h1 == h2 == h3

    def test_different_texts(self):
        h1 = content_hash("Hello")
        h2 = content_hash("World")
        assert h1 != h2

    def test_returns_hex(self):
        h = content_hash("test")
        assert len(h) == 64  # SHA-256
        assert all(c in '0123456789abcdef' for c in h)


class TestTokenize:
    def test_extracts_words(self):
        tokens = tokenize("Hello World Test")
        assert "hello" in tokens
        assert "world" in tokens
        assert "test" in tokens

    def test_skips_short_words(self):
        tokens = tokenize("a to is the hello")
        assert "a" not in tokens
        assert "to" not in tokens
        assert "hello" in tokens

    def test_returns_set(self):
        tokens = tokenize("hello hello world")
        assert isinstance(tokens, set)
        assert len(tokens) == 2


class TestTokenSimilarity:
    def test_identical(self):
        assert token_similarity("hello world", "hello world") == 1.0

    def test_no_overlap(self):
        assert token_similarity("alpha beta", "gamma delta") == 0.0

    def test_partial_overlap(self):
        sim = token_similarity("hello world test", "hello universe test")
        assert 0.3 < sim < 0.7

    def test_empty(self):
        assert token_similarity("", "hello") == 0.0
        assert token_similarity("hello", "") == 0.0

    def test_symmetric(self):
        a = "hello world test"
        b = "hello universe test"
        assert token_similarity(a, b) == token_similarity(b, a)


class TestQualityScore:
    def test_high_confidence(self):
        fact = {"confidence": 0.95, "source_count": 5, "tags": ["test"], "related": ["x"]}
        score = quality_score(fact)
        assert score > 0.7

    def test_low_confidence(self):
        fact = {"confidence": 0.3, "source_count": 1}
        score = quality_score(fact)
        assert score < 0.5

    def test_defaults(self):
        score = quality_score({})
        assert 0 < score < 1


class TestMergeFacts:
    def test_merges_tags(self):
        keep = {"id": "a", "fact": "test", "tags": ["git"], "confidence": 0.9}
        drop = {"id": "b", "fact": "test", "tags": ["python"], "confidence": 0.8}
        merged = merge_facts(keep, drop)
        assert "git" in merged["tags"]
        assert "python" in merged["tags"]

    def test_merges_source_count(self):
        keep = {"id": "a", "fact": "test", "source_count": 3}
        drop = {"id": "b", "fact": "test", "source_count": 2}
        merged = merge_facts(keep, drop)
        assert merged["source_count"] == 5

    def test_keeps_higher_confidence(self):
        keep = {"id": "a", "fact": "test", "confidence": 0.7}
        drop = {"id": "b", "fact": "test", "confidence": 0.9}
        merged = merge_facts(keep, drop)
        assert merged["confidence"] == 0.9

    def test_tracks_merged_from(self):
        keep = {"id": "a", "fact": "test"}
        drop = {"id": "b", "fact": "test"}
        merged = merge_facts(keep, drop)
        assert "b" in merged["_merged_from"]


class TestDedupFacts:
    def test_removes_exact_dupes(self):
        facts = [
            {"id": "1", "fact": "Always use git rebase"},
            {"id": "2", "fact": "Always use git rebase"},  # exact dupe
            {"id": "3", "fact": "Check logs first"},
        ]
        deduped, stats = dedup_facts(facts)
        assert stats["exact_dupes"] == 1
        assert stats["unique"] == 2

    def test_removes_near_dupes(self):
        facts = [
            {"id": "1", "fact": "Always check logs before deploying to production server"},
            {"id": "2", "fact": "Always check logs before deploying to production environment"},
            {"id": "3", "fact": "Use docker compose for local development environments"},
        ]
        deduped, stats = dedup_facts(facts, near_threshold=0.5)
        assert stats["near_dupes"] >= 1
        assert stats["unique"] == 2

    def test_preserves_unique(self):
        facts = [
            {"id": "1", "fact": "Use git rebase for clean history"},
            {"id": "2", "fact": "Docker containers should be stateless"},
            {"id": "3", "fact": "Always write tests before code"},
        ]
        deduped, stats = dedup_facts(facts)
        assert stats["unique"] == 3
        assert stats["removed"] == 0

    def test_empty_input(self):
        deduped, stats = dedup_facts([])
        assert stats["total"] == 0
        assert stats["unique"] == 0

    def test_keeps_higher_quality_near_dup(self):
        facts = [
            {"id": "1", "fact": "Check logs before deploying to production server", "confidence": 0.5, "source_count": 1},
            {"id": "2", "fact": "Check logs before deploying to production environment", "confidence": 0.9, "source_count": 5, "tags": ["ops"]},
        ]
        deduped, stats = dedup_facts(facts, near_threshold=0.5)
        assert stats["unique"] == 1
        # Higher quality fact should be kept
        assert deduped[0]["confidence"] == 0.9

    def test_dry_run_does_not_modify(self):
        facts = [
            {"id": "1", "fact": "Same text"},
            {"id": "2", "fact": "Same text"},
        ]
        deduped, stats = dedup_facts(facts, dry_run=True)
        assert stats["exact_dupes"] == 1
        # In dry_run, merge_facts is skipped so facts aren't modified
        assert len(deduped) == 1


class TestGenerateTestDuplicates:
    def test_generates_correct_count(self):
        facts = generate_test_duplicates(20)
        assert len(facts) > 20  # 20 unique + duplicates

    def test_has_exact_dupes(self):
        facts = generate_test_duplicates(20)
        hashes = [content_hash(f["fact"]) for f in facts]
        # Should have some duplicate hashes
        assert len(hashes) != len(set(hashes))

    def test_dedup_removes_dupes(self):
        facts = generate_test_duplicates(20)
        deduped, stats = dedup_facts(facts)
        assert stats["unique"] <= 20
        assert stats["removed"] > 0
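These tests constrain the dedup helpers fairly tightly: `normalize_text` lowercases, collapses whitespace, and strips; `content_hash` is a 64-character SHA-256 over the normalized text; `tokenize` returns a set and drops very short words; and `token_similarity` behaves like Jaccard overlap (identical sets score 1.0, disjoint sets 0.0, two shared tokens out of four score 0.5). A sketch consistent with those constraints follows; the short-word cutoff and the exact token regex are assumptions.

```python
import hashlib
import re


def normalize_text(text: str) -> str:
    # Lowercase, collapse runs of whitespace, strip the ends.
    return re.sub(r"\s+", " ", text.lower()).strip()


def content_hash(text: str) -> str:
    # Exact-duplicate key: SHA-256 hex (64 chars) of the normalized text.
    return hashlib.sha256(normalize_text(text).encode("utf-8")).hexdigest()


def tokenize(text: str) -> set:
    # Word set for fuzzy matching; dropping tokens shorter than 4 chars is an
    # assumed cutoff that satisfies test_skips_short_words.
    return {w for w in re.findall(r"[a-z0-9_]+", normalize_text(text)) if len(w) > 3}


def token_similarity(a: str, b: str) -> float:
    # Jaccard similarity over token sets; empty input counts as no overlap.
    ta, tb = tokenize(a), tokenize(b)
    if not ta or not tb:
        return 0.0
    return len(ta & tb) / len(ta | tb)
```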
227
tests/test_freshness.py
Normal file
@@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""Tests for scripts/freshness.py — 8 tests."""

import json
import os
import sys
import tempfile

sys.path.insert(0, os.path.join(os.path.dirname(__file__) or ".", ".."))
import importlib.util
spec = importlib.util.spec_from_file_location(
    "freshness", os.path.join(os.path.dirname(__file__) or ".", "..", "scripts", "freshness.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)

compute_file_hash = mod.compute_file_hash
check_freshness = mod.check_freshness
load_knowledge_entries = mod.load_knowledge_entries


def test_compute_file_hash():
    """File hash should be computed correctly."""
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
        f.write("test content")
        f.flush()
        h = compute_file_hash(f.name)
        assert h is not None
        assert h.startswith("sha256:")
    os.unlink(f.name)
    print("PASS: test_compute_file_hash")


def test_compute_file_hash_nonexistent():
    """Nonexistent file should return None."""
    h = compute_file_hash("/nonexistent/file.txt")
    assert h is None
    print("PASS: test_compute_file_hash_nonexistent")


def test_load_knowledge_entries_empty():
    """Empty knowledge dir should return empty list."""
    with tempfile.TemporaryDirectory() as tmpdir:
        entries = load_knowledge_entries(tmpdir)
        assert entries == []
    print("PASS: test_load_knowledge_entries_empty")


def test_load_knowledge_entries_from_index():
    """Should load entries from index.json."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create index.json
        index_path = os.path.join(tmpdir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "Test fact",
                        "source_file": "test.py",
                        "source_hash": "sha256:abc123",
                        "category": "fact",
                        "confidence": 0.9
                    }
                ]
            }, f)

        entries = load_knowledge_entries(tmpdir)
        assert len(entries) == 1
        assert entries[0]["fact"] == "Test fact"
        assert entries[0]["source_file"] == "test.py"
    print("PASS: test_load_knowledge_entries_from_index")


def test_load_knowledge_entries_from_yaml():
    """Should load entries from YAML files."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create global directory
        global_dir = os.path.join(tmpdir, "global")
        os.makedirs(global_dir)

        # Create YAML file
        yaml_path = os.path.join(global_dir, "test.yaml")
        with open(yaml_path, "w") as f:
            f.write("""
pitfalls:
  - description: "Test pitfall"
    source_file: "test.py"
    source_hash: "sha256:def456"
    category: "pitfall"
    confidence: 0.8
""")

        entries = load_knowledge_entries(tmpdir)
        assert len(entries) == 1
        assert entries[0]["fact"] == "Test pitfall"
        assert entries[0]["category"] == "pitfall"
    print("PASS: test_load_knowledge_entries_from_yaml")


def test_check_freshness_no_changes():
    """With no source file reference, entries should be counted correctly."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create knowledge dir
        knowledge_dir = os.path.join(tmpdir, "knowledge")
        os.makedirs(knowledge_dir)

        # Create repo dir
        repo_dir = os.path.join(tmpdir, "repo")
        os.makedirs(repo_dir)

        # Create index.json with entry that has no source_file
        index_path = os.path.join(knowledge_dir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "General knowledge",
                        "category": "fact",
                        "confidence": 0.9
                        # No source_file or source_hash
                    }
                ]
            }, f)

        result = check_freshness(knowledge_dir, repo_dir, days=1)

        # Entry without source_file should be counted as "fresh" (no_source status)
        assert result["summary"]["total"] == 1
        assert result["summary"]["stale"] == 0
        assert result["summary"]["fresh"] == 1
        assert result["fresh_entries"][0]["status"] == "no_source"
    print("PASS: test_check_freshness_no_changes")


def test_check_freshness_with_hash_mismatch():
    """Hash mismatch should mark entry as stale."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create knowledge dir
        knowledge_dir = os.path.join(tmpdir, "knowledge")
        os.makedirs(knowledge_dir)

        # Create repo dir with a file
        repo_dir = os.path.join(tmpdir, "repo")
        os.makedirs(repo_dir)

        test_file = os.path.join(repo_dir, "test.py")
        with open(test_file, "w") as f:
            f.write("print('hello')")

        # Create index.json with wrong hash
        index_path = os.path.join(knowledge_dir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "Test fact",
                        "source_file": "test.py",
                        "source_hash": "sha256:wronghash",
                        "category": "fact",
                        "confidence": 0.9
                    }
                ]
            }, f)

        # Initialize git repo
        os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")

        result = check_freshness(knowledge_dir, repo_dir, days=1)

        assert result["summary"]["total"] == 1
        assert result["summary"]["stale"] == 1
        assert result["summary"]["fresh"] == 0
        assert result["stale_entries"][0]["reason"] == "hash_mismatch"
    print("PASS: test_check_freshness_with_hash_mismatch")


def test_check_freshness_missing_source():
    """Missing source file should mark entry as stale."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create knowledge dir
        knowledge_dir = os.path.join(tmpdir, "knowledge")
        os.makedirs(knowledge_dir)

        # Create repo dir (without the referenced file)
        repo_dir = os.path.join(tmpdir, "repo")
        os.makedirs(repo_dir)

        # Create index.json referencing nonexistent file
        index_path = os.path.join(knowledge_dir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "Test fact",
                        "source_file": "nonexistent.py",
                        "source_hash": "sha256:abc123",
                        "category": "fact",
                        "confidence": 0.9
                    }
                ]
            }, f)

        # Initialize git repo
        os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")

        result = check_freshness(knowledge_dir, repo_dir, days=1)

        assert result["summary"]["total"] == 1
        assert result["summary"]["stale"] == 1
        assert result["summary"]["fresh"] == 0
        assert result["stale_entries"][0]["reason"] == "source_missing"
    print("PASS: test_check_freshness_missing_source")


def run_all():
    test_compute_file_hash()
    test_compute_file_hash_nonexistent()
    test_load_knowledge_entries_empty()
    test_load_knowledge_entries_from_index()
    test_load_knowledge_entries_from_yaml()
    test_check_freshness_no_changes()
    test_check_freshness_with_hash_mismatch()
    test_check_freshness_missing_source()
    print("\nAll 8 tests passed!")


if __name__ == "__main__":
    run_all()
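Taken together, these tests imply a simple per-entry decision path in freshness.py: entries with no source_file are counted fresh with status no_source, a missing source file is stale with reason source_missing, and a digest that differs from the stored source_hash is stale with reason hash_mismatch. A sketch of that path follows; the helper name `classify_entry` and the "hash_match" label for the happy path are assumptions, while the stale reasons and the "sha256:" prefix come straight from the assertions above.

```python
import hashlib
import os
from typing import Optional, Tuple


def compute_file_hash(path: str) -> Optional[str]:
    """Return "sha256:<hex>" for an existing file, or None if the path is missing."""
    if not os.path.isfile(path):
        return None
    with open(path, "rb") as fh:
        return "sha256:" + hashlib.sha256(fh.read()).hexdigest()


def classify_entry(entry: dict, repo_dir: str) -> Tuple[str, str]:
    """Bucket a knowledge entry the way the tests above expect: (bucket, status/reason)."""
    source_file = entry.get("source_file")
    if not source_file:
        return "fresh", "no_source"        # nothing on disk to verify against
    current = compute_file_hash(os.path.join(repo_dir, source_file))
    if current is None:
        return "stale", "source_missing"   # referenced file no longer exists
    if current != entry.get("source_hash"):
        return "stale", "hash_mismatch"    # source changed since harvest
    return "fresh", "hash_match"
```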
108
tests/test_quality_gate.py
Normal file
@@ -0,0 +1,108 @@
"""
Tests for quality_gate.py — Knowledge entry quality scoring.
"""

import unittest
from datetime import datetime, timezone, timedelta

import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))

from quality_gate import (
    score_specificity,
    score_actionability,
    score_freshness,
    score_source_quality,
    score_entry,
    filter_entries,
)


class TestScoreSpecificity(unittest.TestCase):
    def test_specific_content_scores_high(self):
        content = "Run `python3 deploy.py --env prod` on 2026-04-15. Example: step 1 configure nginx."
        score = score_specificity(content)
        self.assertGreater(score, 0.6)

    def test_vague_content_scores_low(self):
        content = "It generally depends. Various factors might affect this. Basically, it varies."
        score = score_specificity(content)
        self.assertLess(score, 0.5)

    def test_empty_scores_baseline(self):
        score = score_specificity("")
        self.assertAlmostEqual(score, 0.5, delta=0.1)


class TestScoreActionability(unittest.TestCase):
    def test_actionable_content_scores_high(self):
        content = "1. Run `pip install -r requirements.txt`\n2. Execute `python3 train.py`\n3. Verify with `pytest`"
        score = score_actionability(content)
        self.assertGreater(score, 0.6)

    def test_abstract_content_scores_low(self):
        content = "The concept of intelligence is fascinating and multifaceted."
        score = score_actionability(content)
        self.assertLess(score, 0.5)


class TestScoreFreshness(unittest.TestCase):
    def test_recent_timestamp_scores_high(self):
        recent = datetime.now(timezone.utc).isoformat()
        score = score_freshness(recent)
        self.assertGreater(score, 0.9)

    def test_old_timestamp_scores_low(self):
        old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
        score = score_freshness(old)
        self.assertLess(score, 0.2)

    def test_none_returns_baseline(self):
        score = score_freshness(None)
        self.assertEqual(score, 0.5)


class TestScoreSourceQuality(unittest.TestCase):
    def test_claude_scores_high(self):
        self.assertGreater(score_source_quality("claude-sonnet"), 0.85)

    def test_ollama_scores_lower(self):
        self.assertLess(score_source_quality("ollama"), 0.7)

    def test_unknown_returns_default(self):
        self.assertEqual(score_source_quality("unknown"), 0.5)


class TestScoreEntry(unittest.TestCase):
    def test_good_entry_scores_high(self):
        entry = {
            "content": "To deploy: run `kubectl apply -f deployment.yaml`. Verify with `kubectl get pods`.",
            "model": "claude-sonnet",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        score = score_entry(entry)
        self.assertGreater(score, 0.6)

    def test_poor_entry_scores_low(self):
        entry = {
            "content": "It depends. Various things might happen.",
            "model": "unknown",
        }
        score = score_entry(entry)
        self.assertLess(score, 0.5)


class TestFilterEntries(unittest.TestCase):
    def test_filters_low_quality(self):
        entries = [
            {"content": "Run `deploy.py` to fix the issue.", "model": "claude"},
            {"content": "It might work sometimes.", "model": "unknown"},
            {"content": "Configure nginx: step 1 edit nginx.conf", "model": "gpt-4"},
        ]
        filtered = filter_entries(entries, threshold=0.5)
        self.assertGreaterEqual(len(filtered), 2)


if __name__ == "__main__":
    unittest.main()
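Of the four scoring dimensions, freshness is the most precisely bounded by these tests: a current timestamp must score above 0.9, a year-old one below 0.2, and a missing timestamp exactly 0.5. A self-contained sketch of one curve that satisfies those bounds follows; the exponential decay and the 90-day half-life are assumptions, not the documented implementation in quality_gate.py.

```python
from datetime import datetime, timezone
from typing import Optional


def score_freshness(timestamp: Optional[str], half_life_days: float = 90.0) -> float:
    """Freshness in [0, 1]: missing timestamp -> 0.5, recent -> ~1.0, old -> ~0.

    Assumed curve: exponential decay with a 90-day half-life. Any monotone
    decay meeting the test bounds (>0.9 for "now", <0.2 at one year) would do.
    """
    if not timestamp:
        return 0.5
    age_days = (datetime.now(timezone.utc) - datetime.fromisoformat(timestamp)).days
    return 0.5 ** (max(age_days, 0) / half_life_days)
```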