Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
da073ad7cf feat: add harvester.py — session knowledge extractor (#8)
Main harvester module that chains:
  session_reader → extraction prompt → LLM → validate → deduplicate → store

Includes:
- scripts/harvester.py — main module (reader + prompt + storage pipeline)
- scripts/session_reader.py — JSONL transcript parser
- scripts/test_harvester_pipeline.py — smoke tests (all passing)

Pipeline:
  1. Read session JSONL via session_reader
  2. Truncate long sessions (first 50 + last 50 messages)
  3. Send transcript + extraction prompt to LLM (mimo-v2-pro)
  4. Parse structured JSON response (facts/pitfalls/patterns/quirks/questions)
  5. Validate fields + confidence threshold
  6. Deduplicate against knowledge/index.json (fingerprint + word overlap)
  7. Write to knowledge store (index.json + per-repo markdown)

CLI:
  Single:  python3 harvester.py --session <path> --output knowledge/
  Batch:   python3 harvester.py --batch --since 2026-04-01 --limit 100
  Dry-run: python3 harvester.py --session <path> --dry-run
2026-04-14 14:03:30 -04:00
13 changed files with 751 additions and 1409 deletions

447
scripts/harvester.py Normal file
View File

@@ -0,0 +1,447 @@
#!/usr/bin/env python3
"""
harvester.py — Extract durable knowledge from Hermes session transcripts.
Combines session_reader + extraction prompt + LLM inference to pull
facts, pitfalls, patterns, and tool quirks from finished sessions.
Usage:
python3 harvester.py --session ~/.hermes/sessions/session_xxx.jsonl --output knowledge/
python3 harvester.py --batch --since 2026-04-01 --limit 100
python3 harvester.py --session session.jsonl --dry-run # Preview without writing
"""
import argparse
import json
import os
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Add scripts dir to path for sibling imports
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
# --- Configuration ---
DEFAULT_API_BASE = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
DEFAULT_API_KEY = os.environ.get("HARVESTER_API_KEY", "")
DEFAULT_MODEL = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
KNOWLEDGE_DIR = os.environ.get("HARVESTER_KNOWLEDGE_DIR", "knowledge")
PROMPT_PATH = os.environ.get("HARVESTER_PROMPT_PATH", str(SCRIPT_DIR.parent / "templates" / "harvest-prompt.md"))
# Where to look for API keys if not set via env
API_KEY_PATHS = [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
]
def find_api_key() -> str:
"""Find API key from common locations."""
for path in API_KEY_PATHS:
if os.path.exists(path):
with open(path) as f:
key = f.read().strip()
if key:
return key
return ""
def load_extraction_prompt() -> str:
"""Load the extraction prompt template."""
path = Path(PROMPT_PATH)
if not path.exists():
print(f"ERROR: Extraction prompt not found at {path}", file=sys.stderr)
print("Expected templates/harvest-prompt.md from issue #7", file=sys.stderr)
sys.exit(1)
return path.read_text(encoding='utf-8')
def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[list[dict]]:
"""Call the LLM API to extract knowledge from a transcript."""
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": f"Extract knowledge from this session transcript:\n\n{transcript}"}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.1, # Low temp for consistent extraction
"max_tokens": 4096
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode('utf-8'))
content = result["choices"][0]["message"]["content"]
return parse_extraction_response(content)
except Exception as e:
print(f"ERROR: LLM API call failed: {e}", file=sys.stderr)
return None
def parse_extraction_response(content: str) -> Optional[list[dict]]:
"""Parse the LLM response to extract knowledge items.
Handles various response formats: raw JSON, markdown-wrapped JSON, etc.
"""
# Try direct JSON parse first
try:
data = json.loads(content)
if isinstance(data, dict) and 'knowledge' in data:
return data['knowledge']
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
# Try extracting JSON from markdown code blocks
import re
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(1))
if isinstance(data, dict) and 'knowledge' in data:
return data['knowledge']
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
# Try finding any JSON object with knowledge array
json_match = re.search(r'({[^{}]*"knowledge"[^{}]*[[sS]*?][^{}]*})', content)
if json_match:
try:
data = json.loads(json_match.group(1))
return data.get('knowledge', [])
except json.JSONDecodeError:
pass
print(f"WARNING: Could not parse LLM response as JSON", file=sys.stderr)
print(f"Response preview: {content[:500]}", file=sys.stderr)
return None
def load_existing_knowledge(knowledge_dir: str) -> dict:
"""Load the existing knowledge index."""
index_path = Path(knowledge_dir) / "index.json"
if not index_path.exists():
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
try:
with open(index_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
print(f"WARNING: Could not load knowledge index: {e}", file=sys.stderr)
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
def fact_fingerprint(fact: dict) -> str:
"""Generate a deduplication fingerprint for a fact.
Uses the fact text normalized (lowercase, stripped) as the key.
Similar facts will have similar fingerprints.
"""
text = fact.get('fact', '').lower().strip()
# Normalize whitespace
text = ' '.join(text.split())
return hashlib.md5(text.encode('utf-8')).hexdigest()
def deduplicate(new_facts: list[dict], existing: list[dict], similarity_threshold: float = 0.8) -> list[dict]:
"""Remove duplicate facts from new_facts that already exist in the knowledge store.
Uses fingerprint matching for exact dedup and simple overlap check for near-dupes.
"""
existing_fingerprints = set()
existing_texts = []
for f in existing:
fp = fact_fingerprint(f)
existing_fingerprints.add(fp)
existing_texts.append(f.get('fact', '').lower().strip())
unique = []
for fact in new_facts:
fp = fact_fingerprint(fact)
if fp in existing_fingerprints:
continue
# Check for near-duplicates using simple word overlap
fact_words = set(fact.get('fact', '').lower().split())
is_dup = False
for existing_text in existing_texts:
existing_words = set(existing_text.split())
if not fact_words or not existing_words:
continue
overlap = len(fact_words & existing_words) / max(len(fact_words | existing_words), 1)
if overlap >= similarity_threshold:
is_dup = True
break
if not is_dup:
unique.append(fact)
existing_fingerprints.add(fp)
existing_texts.append(fact.get('fact', '').lower().strip())
return unique
def validate_fact(fact: dict) -> bool:
"""Validate a single knowledge item has required fields."""
required = ['fact', 'category', 'repo', 'confidence']
for field in required:
if field not in fact:
return False
if not isinstance(fact['fact'], str) or not fact['fact'].strip():
return False
valid_categories = ['fact', 'pitfall', 'pattern', 'tool-quirk', 'question']
if fact['category'] not in valid_categories:
return False
if not isinstance(fact.get('confidence', 0), (int, float)):
return False
if not (0.0 <= fact['confidence'] <= 1.0):
return False
return True
def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: str, source_session: str = ""):
"""Write new facts to the knowledge store."""
kdir = Path(knowledge_dir)
kdir.mkdir(parents=True, exist_ok=True)
# Add source tracking to each fact
for fact in new_facts:
fact['source_session'] = source_session
fact['harvested_at'] = datetime.now(timezone.utc).isoformat()
# Update index
index['facts'].extend(new_facts)
index['total_facts'] = len(index['facts'])
index['last_updated'] = datetime.now(timezone.utc).isoformat()
# Write index
index_path = kdir / "index.json"
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(index, f, indent=2, ensure_ascii=False)
# Also write per-repo markdown files for human reading
repos = {}
for fact in new_facts:
repo = fact.get('repo', 'global')
repos.setdefault(repo, []).append(fact)
for repo, facts in repos.items():
if repo == 'global':
md_path = kdir / "global" / "harvested.md"
else:
md_path = kdir / "repos" / f"{repo}.md"
md_path.parent.mkdir(parents=True, exist_ok=True)
# Append to existing or create new
mode = 'a' if md_path.exists() else 'w'
with open(md_path, mode, encoding='utf-8') as f:
if mode == 'w':
f.write(f"# Knowledge: {repo}\n\n")
f.write(f"## Harvested {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}\n\n")
for fact in facts:
icon = {'fact': '📋', 'pitfall': '⚠️', 'pattern': '🔄', 'tool-quirk': '🔧', 'question': ''}.get(fact['category'], '')
f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
f.write("\n")
def harvest_session(session_path: str, knowledge_dir: str, api_base: str, api_key: str,
model: str, dry_run: bool = False, min_confidence: float = 0.3) -> dict:
"""Harvest knowledge from a single session.
Returns: dict with stats (facts_found, facts_new, facts_dup, elapsed_seconds, error)
"""
start_time = time.time()
stats = {
'session': session_path,
'facts_found': 0,
'facts_new': 0,
'facts_dup': 0,
'elapsed_seconds': 0,
'error': None
}
try:
# 1. Read session
messages = read_session(session_path)
if not messages:
stats['error'] = "Empty session file"
return stats
# 2. Extract conversation
conv = extract_conversation(messages)
if not conv:
stats['error'] = "No conversation turns found"
return stats
# 3. Truncate for context window
truncated = truncate_for_context(conv, head=50, tail=50)
transcript = messages_to_text(truncated)
# 4. Load extraction prompt
prompt = load_extraction_prompt()
# 5. Call LLM
raw_facts = call_llm(prompt, transcript, api_base, api_key, model)
if raw_facts is None:
stats['error'] = "LLM extraction failed"
return stats
# 6. Validate
valid_facts = [f for f in raw_facts if validate_fact(f) and f.get('confidence', 0) >= min_confidence]
stats['facts_found'] = len(valid_facts)
# 7. Deduplicate
existing_index = load_existing_knowledge(knowledge_dir)
existing_facts = existing_index.get('facts', [])
new_facts = deduplicate(valid_facts, existing_facts)
stats['facts_new'] = len(new_facts)
stats['facts_dup'] = len(valid_facts) - len(new_facts)
# 8. Write (unless dry run)
if new_facts and not dry_run:
write_knowledge(existing_index, new_facts, knowledge_dir, source_session=session_path)
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
return stats
except Exception as e:
stats['error'] = str(e)
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
return stats
def batch_harvest(sessions_dir: str, knowledge_dir: str, api_base: str, api_key: str,
model: str, since: str = "", limit: int = 0, dry_run: bool = False) -> list[dict]:
"""Harvest knowledge from multiple sessions in batch."""
sessions_path = Path(sessions_dir)
if not sessions_path.is_dir():
print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
return []
# Find session files
session_files = sorted(sessions_path.glob("*.jsonl"), reverse=True) # Newest first
# Filter by date if --since provided
if since:
since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
filtered = []
for sf in session_files:
# Try to parse timestamp from filename (common format: session_YYYYMMDD_HHMMSS_hash.jsonl)
try:
parts = sf.stem.split('_')
if len(parts) >= 3:
date_str = parts[1]
file_dt = datetime.strptime(date_str, '%Y%m%d').replace(tzinfo=timezone.utc)
if file_dt >= since_dt:
filtered.append(sf)
except (ValueError, IndexError):
# If we can't parse the date, include the file (be permissive)
filtered.append(sf)
session_files = filtered
# Apply limit
if limit > 0:
session_files = session_files[:limit]
print(f"Harvesting {len(session_files)} sessions...")
results = []
for i, sf in enumerate(session_files, 1):
print(f"[{i}/{len(session_files)}] {sf.name}...", end=" ", flush=True)
stats = harvest_session(str(sf), knowledge_dir, api_base, api_key, model, dry_run)
if stats['error']:
print(f"ERROR: {stats['error']}")
else:
print(f"{stats['facts_new']} new, {stats['facts_dup']} dup ({stats['elapsed_seconds']}s)")
results.append(stats)
return results
def main():
parser = argparse.ArgumentParser(description="Harvest knowledge from session transcripts")
parser.add_argument('--session', help='Path to a single session JSONL file')
parser.add_argument('--batch', action='store_true', help='Batch mode: process multiple sessions')
parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
help='Directory containing session files (default: ~/.hermes/sessions)')
parser.add_argument('--output', default='knowledge', help='Output directory for knowledge store')
parser.add_argument('--since', default='', help='Only process sessions after this date (YYYY-MM-DD)')
parser.add_argument('--limit', type=int, default=0, help='Max sessions to process (0=unlimited)')
parser.add_argument('--api-base', default=DEFAULT_API_BASE, help='LLM API base URL')
parser.add_argument('--api-key', default='', help='LLM API key (or set HARVESTER_API_KEY)')
parser.add_argument('--model', default=DEFAULT_MODEL, help='Model to use for extraction')
parser.add_argument('--dry-run', action='store_true', help='Preview without writing to knowledge store')
parser.add_argument('--min-confidence', type=float, default=0.3, help='Minimum confidence threshold')
args = parser.parse_args()
# Resolve API key
api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
if not api_key:
print("ERROR: No API key found. Set HARVESTER_API_KEY or store in one of:", file=sys.stderr)
for p in API_KEY_PATHS:
print(f" {p}", file=sys.stderr)
sys.exit(1)
# Resolve knowledge directory
knowledge_dir = args.output
if not os.path.isabs(knowledge_dir):
knowledge_dir = os.path.join(SCRIPT_DIR.parent, knowledge_dir)
if args.session:
# Single session mode
stats = harvest_session(
args.session, knowledge_dir, args.api_base, api_key, args.model,
dry_run=args.dry_run, min_confidence=args.min_confidence
)
print(json.dumps(stats, indent=2))
if stats['error']:
sys.exit(1)
elif args.batch:
# Batch mode
results = batch_harvest(
args.sessions_dir, knowledge_dir, args.api_base, api_key, args.model,
since=args.since, limit=args.limit, dry_run=args.dry_run
)
total_new = sum(r['facts_new'] for r in results)
total_dup = sum(r['facts_dup'] for r in results)
errors = sum(1 for r in results if r['error'])
print(f"\nDone: {total_new} new facts, {total_dup} duplicates, {errors} errors")
else:
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -1,131 +0,0 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
Usage:
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""
import argparse
import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
"""Check all entries in knowledge index for staleness.
Returns list of entries with staleness info:
- status: "fresh" | "stale" | "missing_source" | "no_hash"
- current_hash: computed hash (if source exists)
- stored_hash: hash from index
"""
with open(index_path) as f:
data = json.load(f)
facts = data.get("facts", [])
results = []
for entry in facts:
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results.append({**entry, "status": "no_source", "current_hash": None})
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
results.append({**entry, "status": "missing_source", "current_hash": None})
elif not stored_hash:
results.append({**entry, "status": "no_hash", "current_hash": current_hash})
elif current_hash != stored_hash:
results.append({**entry, "status": "stale", "current_hash": current_hash})
else:
results.append({**entry, "status": "fresh", "current_hash": current_hash})
return results
def fix_hashes(index_path: str, repo_root: str = ".") -> int:
"""Add hashes to entries missing them. Returns count of fixed entries."""
with open(index_path) as f:
data = json.load(f)
fixed = 0
for entry in data.get("facts", []):
if entry.get("source_hash"):
continue
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
h = compute_file_hash(full_path)
if h:
entry["source_hash"] = h
fixed += 1
with open(index_path, "w") as f:
json.dump(data, f, indent=2)
return fixed
def main():
parser = argparse.ArgumentParser(description="Check knowledge store staleness")
parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
args = parser.parse_args()
if args.fix:
fixed = fix_hashes(args.index, args.repo)
print(f"Fixed {fixed} entries with missing hashes.")
return
results = check_staleness(args.index, args.repo)
if args.json:
print(json.dumps(results, indent=2))
else:
stale = [r for r in results if r["status"] != "fresh"]
fresh = [r for r in results if r["status"] == "fresh"]
print(f"Knowledge Store Staleness Check")
print(f" Total entries: {len(results)}")
print(f" Fresh: {len(fresh)}")
print(f" Stale/Issues: {len(stale)}")
print()
if stale:
print("Issues found:")
for r in stale:
status = r["status"]
fact = r.get("fact", "?")[:60]
source = r.get("source_file", "?")
print(f" [{status}] {source}: {fact}")
else:
print("All entries are fresh!")
if __name__ == "__main__":
main()

View File

@@ -1,551 +0,0 @@
#!/usr/bin/env python3
"""
Performance Bottleneck Finder — Identify slow tests, builds, and CI steps.
Analyzes:
1. Pytest output for slow tests
2. Build logs for slow steps
3. CI workflow durations
4. File system for large/slow artifacts
Usage:
python3 scripts/perf_bottleneck_finder.py --repo /path/to/repo
python3 scripts/perf_bottleneck_finder.py --repo /path/to/repo --json
python3 scripts/perf_bottleneck_finder.py --repo /path/to/repo --report metrics/perf_report.md
Weekly cron:
0 9 * * 1 cd /path/to/compounding-intelligence && python3 scripts/perf_bottleneck_finder.py --repo /path/to/target --report metrics/perf_report.md
"""
import argparse
import json
import os
import re
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ── Configuration ──────────────────────────────────────────────────
SLOW_TEST_THRESHOLD_S = 2.0 # Tests slower than this are flagged
SLOW_BUILD_STEP_THRESHOLD_S = 10.0
TOP_N_BOTTLENECKS = 10 # Report top N bottlenecks
PYTEST_DURATIONS_COUNT = 20 # Number of slow tests to collect
LOG_EXTENSIONS = {".log", ".txt"}
@dataclass
class Bottleneck:
"""A single performance bottleneck."""
category: str # "test", "build", "ci", "artifact", "import"
name: str # What's slow
duration_s: float # How long it takes
severity: str # "critical", "warning", "info"
recommendation: str # How to fix
file_path: Optional[str] = None
line_number: Optional[int] = None
@dataclass
class PerfReport:
"""Full performance report."""
timestamp: str
repo_path: str
bottlenecks: List[Bottleneck] = field(default_factory=list)
summary: Dict[str, Any] = field(default_factory=dict)
test_stats: Dict[str, Any] = field(default_factory=dict)
build_stats: Dict[str, Any] = field(default_factory=dict)
ci_stats: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict:
d = asdict(self)
return d
# ── Test Analysis ──────────────────────────────────────────────────
def find_slow_tests_pytest(repo_path: str) -> List[Bottleneck]:
"""Run pytest --durations and parse slow tests."""
bottlenecks = []
# Try to run pytest with durations
try:
result = subprocess.run(
["python3", "-m", "pytest", "--co", "-q", "--durations=0"],
cwd=repo_path, capture_output=True, text=True, timeout=30
)
# If tests exist, try to get durations from last run
durations_file = os.path.join(repo_path, ".pytest_cache", "v", "cache", "durations")
if os.path.exists(durations_file):
with open(durations_file) as f:
for line in f:
parts = line.strip().split()
if len(parts) >= 2:
try:
duration = float(parts[0])
test_name = " ".join(parts[1:])
if duration > SLOW_TEST_THRESHOLD_S:
severity = "critical" if duration > 10 else "warning"
bottlenecks.append(Bottleneck(
category="test",
name=test_name,
duration_s=duration,
severity=severity,
recommendation=f"Test takes {duration:.1f}s. Consider mocking slow I/O, using fixtures, or marking with @pytest.mark.slow."
))
except ValueError:
continue
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return bottlenecks
def find_slow_tests_by_scan(repo_path: str) -> List[Bottleneck]:
"""Scan test files for patterns that indicate slow tests."""
bottlenecks = []
test_patterns = [
(r"time\.sleep\((\d+(?:\.\d+)?)\)", "Contains time.sleep() — consider using mock or async wait"),
(r"subprocess\.run\(.*timeout=(\d+)", "Subprocess with timeout — may block test"),
(r"requests\.(get|post|put|delete)\(", "Real HTTP call — mock with responses or httpretty"),
(r"open\([^)]*['"]w['"]", "File I/O in test — use tmp_path fixture"),
]
for root, dirs, files in os.walk(repo_path):
# Skip hidden and cache dirs
dirs[:] = [d for d in dirs if not d.startswith(('.', '__pycache__', 'node_modules', '.git'))]
for fname in files:
if not (fname.startswith("test_") or fname.endswith("_test.py")):
continue
if not fname.endswith(".py"):
continue
fpath = os.path.join(root, fname)
rel_path = os.path.relpath(fpath, repo_path)
try:
with open(fpath) as f:
lines = f.readlines()
except (PermissionError, UnicodeDecodeError):
continue
for i, line in enumerate(lines):
for pattern, recommendation in test_patterns:
match = re.search(pattern, line)
if match:
duration = 1.0 # Default estimate
if "sleep" in pattern:
try:
duration = float(match.group(1))
except (ValueError, IndexError):
duration = 1.0
elif "timeout" in pattern:
try:
duration = float(match.group(1))
except (ValueError, IndexError):
duration = 10.0
else:
duration = 2.0 # Estimated
bottlenecks.append(Bottleneck(
category="test",
name=f"{rel_path}:{i+1}",
duration_s=duration,
severity="warning" if duration < 5 else "critical",
recommendation=recommendation,
file_path=rel_path,
line_number=i + 1
))
return bottlenecks
# ── Build Analysis ─────────────────────────────────────────────────
def analyze_build_artifacts(repo_path: str) -> List[Bottleneck]:
"""Find large build artifacts that slow down builds."""
bottlenecks = []
large_dirs = {
"node_modules": "Consider using npm ci --production or yarn --production",
"__pycache__": "Consider .gitignore and cleaning before builds",
".tox": "Consider caching tox environments",
".pytest_cache": "Consider cleaning between CI runs",
"dist": "Check if dist/ artifacts are being rebuilt unnecessarily",
"build": "Check if build/ artifacts are being rebuilt unnecessarily",
".next": "Next.js cache — consider incremental builds",
"venv": "Virtual env in repo — move outside or use Docker",
}
for dirname, recommendation in large_dirs.items():
dirpath = os.path.join(repo_path, dirname)
if os.path.isdir(dirpath):
total_size = 0
file_count = 0
for root, dirs, files in os.walk(dirpath):
for f in files:
try:
fpath = os.path.join(root, f)
total_size += os.path.getsize(fpath)
file_count += 1
except OSError:
pass
if total_size > 10 * 1024 * 1024: # > 10MB
size_mb = total_size / (1024 * 1024)
bottlenecks.append(Bottleneck(
category="build",
name=f"{dirname}/ ({size_mb:.1f}MB, {file_count} files)",
duration_s=size_mb * 0.5, # Rough estimate
severity="critical" if size_mb > 100 else "warning",
recommendation=recommendation
))
return bottlenecks
def analyze_makefile_targets(repo_path: str) -> List[Bottleneck]:
"""Analyze Makefile for potentially slow targets."""
bottlenecks = []
makefiles = []
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith(('.', '__pycache__'))]
for f in files:
if f in ("Makefile", "makefile", "GNUmakefile"):
makefiles.append(os.path.join(root, f))
slow_patterns = [
(r"pip install", "pip install without --no-deps or constraints"),
(r"npm install(?!.*--production)", "npm install without --production flag"),
(r"docker build", "Docker build — consider multi-stage and layer caching"),
(r"pytest(?!.*-x|--maxfail)", "pytest without early exit on failure"),
(r"mypy|mypy --strict", "Type checking — consider incremental mode"),
]
for mfile in makefiles:
rel_path = os.path.relpath(mfile, repo_path)
try:
with open(mfile) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
for pattern, recommendation in slow_patterns:
if re.search(pattern, content):
bottlenecks.append(Bottleneck(
category="build",
name=f"{rel_path}: {pattern}",
duration_s=5.0,
severity="info",
recommendation=recommendation,
file_path=rel_path
))
return bottlenecks
# ── CI Analysis ────────────────────────────────────────────────────
def analyze_github_actions(repo_path: str) -> List[Bottleneck]:
"""Analyze GitHub Actions workflow files for inefficiencies."""
bottlenecks = []
workflow_dir = os.path.join(repo_path, ".github", "workflows")
if not os.path.isdir(workflow_dir):
return bottlenecks
slow_patterns = [
(r"runs-on:\s*ubuntu-latest", 0, "Consider caching dependencies between runs"),
(r"npm install", 2, "Use npm ci instead of npm install for reproducible builds"),
(r"pip install(?!.*--cache-dir)", 2, "Add --cache-dir or use pip cache action"),
(r"docker build(?!.*--cache-from)", 5, "Use Docker layer caching"),
(r"python -m pytest(?!.*-n|--numprocesses)", 3, "Consider pytest-xdist for parallel test execution"),
]
for fname in os.listdir(workflow_dir):
if not fname.endswith(('.yml', '.yaml')):
continue
fpath = os.path.join(workflow_dir, fname)
try:
with open(fpath) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
for pattern, est_savings, recommendation in slow_patterns:
if re.search(pattern, content):
bottlenecks.append(Bottleneck(
category="ci",
name=f"{fname}: {pattern}",
duration_s=est_savings,
severity="info",
recommendation=recommendation,
file_path=f".github/workflows/{fname}"
))
return bottlenecks
def analyze_gitea_ci(repo_path: str) -> List[Bottleneck]:
"""Analyze Gitea/Drone CI config files."""
bottlenecks = []
ci_files = [".gitea/workflows", ".drone.yml", ".woodpecker.yml"]
for ci_path in ci_files:
full_path = os.path.join(repo_path, ci_path)
if os.path.isfile(full_path):
try:
with open(full_path) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
if "pip install" in content and "--cache-dir" not in content:
bottlenecks.append(Bottleneck(
category="ci",
name=f"{ci_path}: pip without cache",
duration_s=5.0,
severity="warning",
recommendation="Add --cache-dir or mount pip cache volume",
file_path=ci_path
))
elif os.path.isdir(full_path):
for fname in os.listdir(full_path):
if not fname.endswith(('.yml', '.yaml')):
continue
fpath = os.path.join(full_path, fname)
try:
with open(fpath) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
if "pip install" in content and "--cache-dir" not in content:
bottlenecks.append(Bottleneck(
category="ci",
name=f"{ci_path}/{fname}: pip without cache",
duration_s=5.0,
severity="warning",
recommendation="Add --cache-dir or mount pip cache volume",
file_path=f"{ci_path}/{fname}"
))
return bottlenecks
# ── Import Analysis ────────────────────────────────────────────────
def find_slow_imports(repo_path: str) -> List[Bottleneck]:
"""Find Python files with heavy import chains."""
bottlenecks = []
heavy_imports = {
"pandas": 0.5,
"numpy": 0.3,
"torch": 2.0,
"tensorflow": 3.0,
"scipy": 0.5,
"matplotlib": 0.8,
"sklearn": 0.5,
"transformers": 1.5,
}
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith(('.', '__pycache__', 'node_modules'))]
for fname in files:
if not fname.endswith(".py"):
continue
fpath = os.path.join(root, fname)
rel_path = os.path.relpath(fpath, repo_path)
try:
with open(fpath) as f:
lines = f.readlines()
except (PermissionError, UnicodeDecodeError):
continue
for i, line in enumerate(lines):
stripped = line.strip()
if stripped.startswith("import ") or stripped.startswith("from "):
for heavy, est_time in heavy_imports.items():
if heavy in stripped:
bottlenecks.append(Bottleneck(
category="import",
name=f"{rel_path}:{i+1}: import {heavy}",
duration_s=est_time,
severity="info" if est_time < 1.0 else "warning",
recommendation=f"Heavy import ({heavy} ~{est_time}s). Consider lazy import or conditional import.",
file_path=rel_path,
line_number=i + 1
))
return bottlenecks
# ── Report Generation ──────────────────────────────────────────────
def severity_sort_key(b: Bottleneck) -> Tuple[int, float]:
"""Sort by severity then duration."""
sev_order = {"critical": 0, "warning": 1, "info": 2}
return (sev_order.get(b.severity, 3), -b.duration_s)
def generate_report(repo_path: str) -> PerfReport:
"""Run all analyses and generate a performance report."""
report = PerfReport(
timestamp=datetime.now(timezone.utc).isoformat(),
repo_path=os.path.abspath(repo_path)
)
# Collect all bottlenecks
all_bottlenecks = []
print("Scanning for slow tests (pytest cache)...")
all_bottlenecks.extend(find_slow_tests_pytest(repo_path))
print("Scanning for slow test patterns...")
all_bottlenecks.extend(find_slow_tests_by_scan(repo_path))
print("Analyzing build artifacts...")
all_bottlenecks.extend(analyze_build_artifacts(repo_path))
print("Analyzing Makefiles...")
all_bottlenecks.extend(analyze_makefile_targets(repo_path))
print("Analyzing CI workflows...")
all_bottlenecks.extend(analyze_github_actions(repo_path))
all_bottlenecks.extend(analyze_gitea_ci(repo_path))
print("Scanning for heavy imports...")
all_bottlenecks.extend(find_slow_imports(repo_path))
# Sort by severity and duration
all_bottlenecks.sort(key=severity_sort_key)
report.bottlenecks = all_bottlenecks[:TOP_N_BOTTLENECKS * 2] # Keep more for stats
# Compute summary
by_category = defaultdict(list)
for b in all_bottlenecks:
by_category[b.category].append(b)
report.summary = {
"total_bottlenecks": len(all_bottlenecks),
"critical": sum(1 for b in all_bottlenecks if b.severity == "critical"),
"warning": sum(1 for b in all_bottlenecks if b.severity == "warning"),
"info": sum(1 for b in all_bottlenecks if b.severity == "info"),
"estimated_total_slowdown_s": sum(b.duration_s for b in all_bottlenecks),
"by_category": {cat: len(items) for cat, items in by_category.items()},
}
report.test_stats = {
"slow_tests": len(by_category.get("test", [])),
"total_estimated_s": sum(b.duration_s for b in by_category.get("test", [])),
}
report.build_stats = {
"build_issues": len(by_category.get("build", [])),
"total_estimated_s": sum(b.duration_s for b in by_category.get("build", [])),
}
report.ci_stats = {
"ci_issues": len(by_category.get("ci", [])),
"total_estimated_s": sum(b.duration_s for b in by_category.get("ci", [])),
}
return report
def format_markdown(report: PerfReport) -> str:
"""Format report as markdown."""
lines = []
lines.append(f"# Performance Bottleneck Report")
lines.append(f"")
lines.append(f"Generated: {report.timestamp}")
lines.append(f"Repository: {report.repo_path}")
lines.append(f"")
# Summary
s = report.summary
lines.append(f"## Summary")
lines.append(f"")
lines.append(f"- **Total bottlenecks:** {s['total_bottlenecks']}")
lines.append(f"- **Critical:** {s['critical']} | **Warning:** {s['warning']} | **Info:** {s['info']}")
lines.append(f"- **Estimated total slowdown:** {s['estimated_total_slowdown_s']:.1f}s")
lines.append(f"- **By category:** {', '.join(f'{k}: {v}' for k, v in s['by_category'].items())}")
lines.append(f"")
# Top bottlenecks
lines.append(f"## Top {min(TOP_N_BOTTLENECKS, len(report.bottlenecks))} Bottlenecks")
lines.append(f"")
for i, b in enumerate(report.bottlenecks[:TOP_N_BOTTLENECKS], 1):
icon = {"critical": "🔴", "warning": "🟡", "info": "🔵"}.get(b.severity, "")
loc = f" ({b.file_path}:{b.line_number})" if b.file_path else ""
lines.append(f"{i}. {icon} **{b.category}** — {b.name}{loc}")
lines.append(f" - Duration: ~{b.duration_s:.1f}s | Severity: {b.severity}")
lines.append(f" - Fix: {b.recommendation}")
lines.append(f"")
# Category breakdowns
for cat in ["test", "build", "ci", "import"]:
items = [b for b in report.bottlenecks if b.category == cat]
if items:
lines.append(f"## {cat.title()} Bottlenecks")
lines.append(f"")
for b in items:
icon = {"critical": "🔴", "warning": "🟡", "info": "🔵"}.get(b.severity, "")
loc = f" ({b.file_path}:{b.line_number})" if b.file_path else ""
lines.append(f"- {icon} {b.name}{loc} — ~{b.duration_s:.1f}s — {b.recommendation}")
lines.append(f"")
return "
".join(lines)
# ── Main ───────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Performance Bottleneck Finder")
parser.add_argument("--repo", default=".", help="Path to repository to analyze")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--report", help="Write markdown report to file")
parser.add_argument("--threshold", type=float, default=SLOW_TEST_THRESHOLD_S,
help="Slow test threshold in seconds")
args = parser.parse_args()
global SLOW_TEST_THRESHOLD_S
SLOW_TEST_THRESHOLD_S = args.threshold
if not os.path.isdir(args.repo):
print(f"Error: {args.repo} is not a directory", file=sys.stderr)
sys.exit(1)
report = generate_report(args.repo)
if args.json:
print(json.dumps(report.to_dict(), indent=2))
else:
md = format_markdown(report)
if args.report:
os.makedirs(os.path.dirname(args.report) or ".", exist_ok=True)
with open(args.report, "w") as f:
f.write(md)
print(f"Report written to {args.report}")
else:
print(md)
# Exit code: 1 if critical bottlenecks found
if report.summary.get("critical", 0) > 0:
sys.exit(1)
if __name__ == "__main__":
main()

142
scripts/session_reader.py Normal file
View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
session_reader.py — Parse Hermes session JSONL transcripts.
Each line in a session file is a JSON object representing a message.
Standard fields: role (user|assistant|system), content (str), timestamp (str).
Tool calls and tool results are also captured.
"""
import json
import sys
from pathlib import Path
from typing import Iterator, Optional
def read_session(path: str) -> list[dict]:
"""Read a session JSONL file and return all messages as a list."""
messages = []
with open(path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
messages.append(msg)
except json.JSONDecodeError as e:
print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
return messages
def read_session_iter(path: str) -> Iterator[dict]:
"""Iterate over session messages without loading all into memory."""
with open(path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError as e:
print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
def extract_conversation(messages: list[dict]) -> list[dict]:
"""Extract user/assistant conversation turns, skipping tool-only messages."""
conversation = []
for msg in messages:
role = msg.get('role', '')
content = msg.get('content', '')
# Skip empty messages and pure tool calls
if role in ('user', 'assistant', 'system'):
if isinstance(content, str) and content.strip():
conversation.append({
'role': role,
'content': content.strip(),
'timestamp': msg.get('timestamp', '')
})
elif isinstance(content, list):
# Multimodal content — extract text parts
text_parts = []
for part in content:
if isinstance(part, dict) and part.get('type') == 'text':
text_parts.append(part.get('text', ''))
if text_parts:
conversation.append({
'role': role,
'content': '\n'.join(text_parts),
'timestamp': msg.get('timestamp', '')
})
return conversation
def truncate_for_context(messages: list[dict], head: int = 50, tail: int = 50) -> list[dict]:
"""Truncate long sessions: keep first N + last N messages.
This preserves session start (initial context) and end (final results),
skipping the messy middle of long debugging sessions.
"""
if len(messages) <= head + tail:
return messages
truncated = messages[:head]
truncated.append({
'role': 'system',
'content': f'[{len(messages) - head - tail} messages truncated]',
'timestamp': ''
})
truncated.extend(messages[-tail:])
return truncated
def messages_to_text(messages: list[dict]) -> str:
"""Convert message list to plain text for LLM consumption."""
lines = []
for msg in messages:
role = msg.get('role', 'unknown').upper()
content = msg.get('content', '')
if msg.get('role') == 'system' and 'truncated' in content:
lines.append(f'--- {content} ---')
else:
lines.append(f'{role}: {content}')
return '\n\n'.join(lines)
def get_session_metadata(path: str) -> dict:
"""Extract metadata from a session file (first message often has config info)."""
messages = read_session(path)
if not messages:
return {'path': path, 'message_count': 0}
first = messages[0]
last = messages[-1]
return {
'path': path,
'message_count': len(messages),
'first_timestamp': first.get('timestamp', ''),
'last_timestamp': last.get('timestamp', ''),
'first_role': first.get('role', ''),
'has_tool_calls': any(m.get('tool_calls') for m in messages),
}
if __name__ == '__main__':
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <session.jsonl>")
sys.exit(1)
path = sys.argv[1]
meta = get_session_metadata(path)
print(json.dumps(meta, indent=2))
messages = read_session(path)
conv = extract_conversation(messages)
print(f"\nConversation: {len(conv)} turns")
truncated = truncate_for_context(conv)
print(f"After truncation: {len(truncated)} turns")
print(f"\nPreview (first 500 chars):")
print(messages_to_text(truncated[:5])[:500])

View File

@@ -1,212 +0,0 @@
#!/usr/bin/env python3
"""
Comprehensive test script for knowledge extraction prompt.
Validates prompt structure, requirements, and consistency.
"""
import json
import re
from pathlib import Path
def test_prompt_structure():
"""Test that the prompt has the required structure."""
prompt_path = Path("templates/harvest-prompt.md")
if not prompt_path.exists():
return False, "harvest-prompt.md not found"
content = prompt_path.read_text()
# Check for required sections
required_sections = [
"System Prompt",
"Instructions",
"Categories",
"Output Format",
"Confidence Scoring",
"Constraints",
"Example"
]
for section in required_sections:
if section.lower() not in content.lower():
return False, f"Missing required section: {section}"
# Check for required categories
required_categories = ["fact", "pitfall", "pattern", "tool-quirk", "question"]
for category in required_categories:
if category not in content:
return False, f"Missing required category: {category}"
# Check for required output fields
required_fields = ["fact", "category", "repo", "confidence"]
for field in required_fields:
if field not in content:
return False, f"Missing required output field: {field}"
# Check prompt size (should be ~1k tokens, roughly 4k chars)
if len(content) > 5000:
return False, f"Prompt too large: {len(content)} chars (max ~5000)"
if len(content) < 1000:
return False, f"Prompt too small: {len(content)} chars (min ~1000)"
return True, "Prompt structure is valid"
def test_confidence_scoring():
"""Test that confidence scoring is properly defined."""
prompt_path = Path("templates/harvest-prompt.md")
content = prompt_path.read_text()
# Check for confidence scale definitions
confidence_levels = [
("0.9-1.0", "explicitly stated"),
("0.7-0.8", "clearly implied"),
("0.5-0.6", "suggested"),
("0.3-0.4", "inferred"),
("0.1-0.2", "speculative")
]
for level, description in confidence_levels:
if level not in content:
return False, f"Missing confidence level: {level}"
if description.lower() not in content.lower():
return False, f"Missing confidence description: {description}"
return True, "Confidence scoring is properly defined"
def test_example_quality():
"""Test that examples are clear and complete."""
prompt_path = Path("templates/harvest-prompt.md")
content = prompt_path.read_text()
# Check for example input/output
if "example" not in content.lower():
return False, "No examples provided"
# Check that example includes all categories
example_section = content[content.lower().find("example"):]
# Look for JSON example
json_match = re.search(r'\{[\s\S]*"knowledge"[\s\S]*\}', example_section)
if not json_match:
return False, "No JSON example found"
example_json = json_match.group(0)
# Check for all categories in example
for category in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
if category not in example_json:
return False, f"Example missing category: {category}"
return True, "Examples are clear and complete"
def test_constraint_coverage():
"""Test that constraints cover all requirements."""
prompt_path = Path("templates/harvest-prompt.md")
content = prompt_path.read_text()
required_constraints = [
"No hallucination",
"only extract",
"explicitly",
"partial",
"failed sessions",
"1k tokens"
]
for constraint in required_constraints:
if constraint.lower() not in content.lower():
return False, f"Missing constraint: {constraint}"
return True, "Constraints cover all requirements"
def test_test_sessions():
"""Test that test sessions exist and are valid."""
test_sessions_dir = Path("test_sessions")
if not test_sessions_dir.exists():
return False, "test_sessions directory not found"
session_files = list(test_sessions_dir.glob("*.jsonl"))
if len(session_files) < 5:
return False, f"Only {len(session_files)} test sessions found, need 5"
# Check each session file
for session_file in session_files:
content = session_file.read_text()
lines = content.strip().split("\n")
# Check that each line is valid JSON
for i, line in enumerate(lines, 1):
try:
json.loads(line)
except json.JSONDecodeError as e:
return False, f"Invalid JSON in {session_file.name}, line {i}: {e}"
return True, f"Found {len(session_files)} valid test sessions"
def run_all_tests():
"""Run all tests and return results."""
tests = [
("Prompt Structure", test_prompt_structure),
("Confidence Scoring", test_confidence_scoring),
("Example Quality", test_example_quality),
("Constraint Coverage", test_constraint_coverage),
("Test Sessions", test_test_sessions)
]
results = []
all_passed = True
for test_name, test_func in tests:
try:
passed, message = test_func()
results.append({
"test": test_name,
"passed": passed,
"message": message
})
if not passed:
all_passed = False
except Exception as e:
results.append({
"test": test_name,
"passed": False,
"message": f"Error: {str(e)}"
})
all_passed = False
# Print results
print("=" * 60)
print("HARVEST PROMPT TEST RESULTS")
print("=" * 60)
for result in results:
status = "✓ PASS" if result["passed"] else "✗ FAIL"
print(f"{status}: {result['test']}")
print(f" {result['message']}")
print()
print("=" * 60)
if all_passed:
print("ALL TESTS PASSED!")
else:
print("SOME TESTS FAILED!")
print("=" * 60)
return all_passed, results
if __name__ == "__main__":
all_passed, results = run_all_tests()
# Save results to file
with open("test_results.json", "w") as f:
json.dump({
"all_passed": all_passed,
"results": results,
"timestamp": "2026-04-14T19:05:00Z"
}, f, indent=2)
print(f"Results saved to test_results.json")
# Exit with appropriate code
exit(0 if all_passed else 1)

View File

@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""
Smoke test for harvester pipeline — verifies the full chain:
session_reader -> prompt -> LLM (mocked) -> validate -> deduplicate -> store
Does NOT call the real LLM. Tests plumbing only.
"""
import json
import sys
import tempfile
import os
from pathlib import Path
# Setup path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
from harvester import validate_fact, deduplicate, load_existing_knowledge, fact_fingerprint
def test_session_reader():
"""Test that session_reader parses JSONL correctly."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
f.write('{"role": "user", "content": "Hello", "timestamp": "2026-04-13T10:00:00Z"}\n')
f.write('{"role": "assistant", "content": "Hi there", "timestamp": "2026-04-13T10:00:01Z"}\n')
f.write('{"role": "user", "content": "Clone the repo", "timestamp": "2026-04-13T10:00:02Z"}\n')
f.write('{"role": "assistant", "content": "Cloned successfully", "timestamp": "2026-04-13T10:00:05Z"}\n')
path = f.name
messages = read_session(path)
assert len(messages) == 4, f"Expected 4 messages, got {len(messages)}"
conv = extract_conversation(messages)
assert len(conv) == 4, f"Expected 4 conversation turns, got {len(conv)}"
text = messages_to_text(conv)
assert "USER: Hello" in text
assert "ASSISTANT: Hi there" in text
truncated = truncate_for_context(conv, head=2, tail=2)
assert len(truncated) == 4 # 4 <= head+tail, so no truncation
os.unlink(path)
print(" [PASS] session_reader pipeline works")
def test_validate_fact():
"""Test fact validation."""
good = {"fact": "Gitea token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9}
assert validate_fact(good), "Valid fact should pass"
bad_missing = {"fact": "Something", "category": "fact"}
assert not validate_fact(bad_missing), "Missing fields should fail"
bad_category = {"fact": "Something", "category": "nonsense", "repo": "x", "confidence": 0.5}
assert not validate_fact(bad_category), "Bad category should fail"
bad_conf = {"fact": "Something", "category": "fact", "repo": "x", "confidence": 1.5}
assert not validate_fact(bad_conf), "Confidence > 1.0 should fail"
print(" [PASS] fact validation works")
def test_deduplicate():
"""Test deduplication."""
existing = [
{"fact": "Token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9}
]
new = [
{"fact": "Token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9}, # exact dup
{"fact": "Deploy uses Ansible on port 22", "category": "pattern", "repo": "fleet", "confidence": 0.8}, # unique
]
result = deduplicate(new, existing)
assert len(result) == 1, f"Expected 1 unique, got {len(result)}"
assert result[0]["fact"] == "Deploy uses Ansible on port 22"
print(" [PASS] deduplication works")
def test_knowledge_store_roundtrip():
"""Test loading and writing knowledge index."""
with tempfile.TemporaryDirectory() as tmpdir:
# Load empty index
index = load_existing_knowledge(tmpdir)
assert index["total_facts"] == 0
# Write a fact
new_facts = [{"fact": "Test fact", "category": "fact", "repo": "test", "confidence": 0.9}]
# Use harvester's write function
from harvester import write_knowledge
write_knowledge(index, new_facts, tmpdir, source_session="test.jsonl")
# Reload and verify
index2 = load_existing_knowledge(tmpdir)
assert index2["total_facts"] == 1
assert index2["facts"][0]["fact"] == "Test fact"
assert index2["facts"][0]["source_session"] == "test.jsonl"
# Check markdown was written
md_path = Path(tmpdir) / "repos" / "test.md"
assert md_path.exists(), "Markdown file should be created"
print(" [PASS] knowledge store roundtrip works")
def test_full_chain_no_llm():
"""Test the full pipeline minus the LLM call."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
f.write('{"role": "user", "content": "Clone compounding-intelligence", "timestamp": "2026-04-13T10:00:00Z"}\n')
f.write('{"role": "assistant", "content": "Cloned successfully", "timestamp": "2026-04-13T10:00:05Z"}\n')
session_path = f.name
with tempfile.TemporaryDirectory() as knowledge_dir:
# Step 1: Read
messages = read_session(session_path)
assert len(messages) == 2
# Step 2: Extract conversation
conv = extract_conversation(messages)
assert len(conv) == 2
# Step 3: Truncate
truncated = truncate_for_context(conv, head=50, tail=50)
# Step 4: Convert to text (this goes to the LLM)
transcript = messages_to_text(truncated)
assert "Clone compounding-intelligence" in transcript
# Step 5-7: Would be LLM call, validate, deduplicate
# We simulate LLM output here
mock_facts = [
{"fact": "compounding-intelligence repo was cloned", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9}
]
valid = [f for f in mock_facts if validate_fact(f)]
# Step 6: Deduplicate
index = load_existing_knowledge(knowledge_dir)
new_facts = deduplicate(valid, index.get("facts", []))
assert len(new_facts) == 1
# Step 7: Store
from harvester import write_knowledge
write_knowledge(index, new_facts, knowledge_dir, source_session=session_path)
# Verify
index2 = load_existing_knowledge(knowledge_dir)
assert index2["total_facts"] == 1
os.unlink(session_path)
print(" [PASS] full chain (reader -> validate -> dedup -> store) works")
if __name__ == "__main__":
print("Running harvester pipeline smoke tests...")
test_session_reader()
test_validate_fact()
test_deduplicate()
test_knowledge_store_roundtrip()
test_full_chain_no_llm()
print("\nAll tests passed.")

View File

@@ -1,129 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/knowledge_staleness_check.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("ks", os.path.join(os.path.dirname(__file__) or ".", "knowledge_staleness_check.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
check_staleness = mod.check_staleness
fix_hashes = mod.fix_hashes
compute_file_hash = mod.compute_file_hash
def test_fresh_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("print('hello')")
h = compute_file_hash(src)
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "hello", "source_file": "source.py", "source_hash": h}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "fresh"
print("PASS: test_fresh_entry")
def test_stale_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("original content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "old", "source_file": "source.py", "source_hash": "sha256:wrong"}]}, f)
# Now change the source
with open(src, "w") as f:
f.write("modified content")
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "stale"
print("PASS: test_stale_entry")
def test_missing_source():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "gone", "source_file": "nonexistent.py", "source_hash": "sha256:abc"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "missing_source"
print("PASS: test_missing_source")
def test_no_hash():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "no hash", "source_file": "source.py"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_hash"
assert results[0]["current_hash"].startswith("sha256:")
print("PASS: test_no_hash")
def test_no_source_field():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "orphan"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_source"
print("PASS: test_no_source_field")
def test_fix_hashes():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content for hashing")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "needs hash", "source_file": "source.py"}]}, f)
fixed = fix_hashes(idx, tmpdir)
assert fixed == 1
# Verify hash was added
with open(idx) as f:
data = json.load(f)
assert data["facts"][0]["source_hash"].startswith("sha256:")
print("PASS: test_fix_hashes")
def test_empty_index():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": []}, f)
results = check_staleness(idx, tmpdir)
assert results == []
print("PASS: test_empty_index")
def test_compute_hash_nonexistent():
h = compute_file_hash("/nonexistent/path/file.py")
assert h is None
print("PASS: test_compute_hash_nonexistent")
def run_all():
test_fresh_entry()
test_stale_entry()
test_missing_source()
test_no_hash()
test_no_source_field()
test_fix_hashes()
test_empty_index()
test_compute_hash_nonexistent()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()

View File

@@ -1,18 +0,0 @@
{"role": "user", "content": "Deploy the application to production"}
{"role": "assistant", "content": "I'll deploy the application to production.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ssh deploy@prod-server 'cd /app && git pull && docker-compose up -d'"}}]}
{"role": "tool", "content": "ssh: connect to host prod-server port 22: Connection refused"}
{"role": "assistant", "content": "Connection refused. Let me check the server status.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ping -c 3 prod-server"}}]}
{"role": "tool", "content": "PING prod-server (192.168.1.100): 56 data bytes\nRequest timeout for icmp_seq 0\nRequest timeout for icmp_seq 1\nRequest timeout for icmp_seq 2\n--- prod-server ping statistics ---\n3 packets transmitted, 0 packets received, 100.0% packet loss"}
{"role": "assistant", "content": "The server appears to be down. Let me check if there's an alternative deployment method."}
{"role": "user", "content": "Try using the VPS instead"}
{"role": "assistant", "content": "I'll try deploying to the VPS.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ssh deploy@vps-server 'cd /app && git pull && docker-compose up -d'"}}]}
{"role": "tool", "content": "Permission denied (publickey)."}
{"role": "assistant", "content": "Permission denied. The SSH key might not be set up correctly. Let me check the SSH configuration.", "tool_calls": [{"function": "terminal", "arguments": {"command": "cat ~/.ssh/config"}}]}
{"role": "tool", "content": "Host vps-server\n HostName 167.99.126.228\n User deploy\n IdentityFile ~/.ssh/id_rsa_vps"}
{"role": "assistant", "content": "The SSH config looks correct. Let me check if the key exists.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ls -la ~/.ssh/id_rsa_vps"}}]}
{"role": "tool", "content": "ls: /Users/user/.ssh/id_rsa_vps: No such file or directory"}
{"role": "assistant", "content": "The SSH key doesn't exist. This is a pitfall - we need to generate the SSH key first."}
{"role": "user", "content": "Generate a new SSH key for the VPS"}
{"role": "assistant", "content": "I'll generate a new SSH key.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ssh-keygen -t rsa -b 4096 -f ~/.ssh/id_rsa_vps -N ''"}}]}
{"role": "tool", "content": "Generating public/private rsa key pair.\nYour identification has been saved in /Users/user/.ssh/id_rsa_vps\nYour public key has been saved in /Users/user/.ssh/id_rsa_vps.pub\nThe key fingerprint is:\nSHA256:abc123... user@machine"}
{"role": "assistant", "content": "SSH key generated. Now I need to add it to the VPS."}

View File

@@ -1,14 +0,0 @@
{"role": "user", "content": "Check the Gitea API for open issues"}
{"role": "assistant", "content": "I'll check the Gitea API for open issues.", "tool_calls": [{"function": "terminal", "arguments": {"command": "curl -H 'Authorization: token $GITEA_TOKEN' https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/compounding-intelligence/issues?state=open"}}]}
{"role": "tool", "content": "curl: (6) Could not resolve host: forge.alexanderwhitestone.com"}
{"role": "assistant", "content": "DNS resolution failed. Let me check the network connectivity.", "tool_calls": [{"function": "terminal", "arguments": {"command": "nslookup forge.alexanderwhitestone.com"}}]}
{"role": "tool", "content": "Server: 8.8.8.8\nAddress: 8.8.8.8#53\n\n** server can't find forge.alexanderwhitestone.com: NXDOMAIN"}
{"role": "assistant", "content": "The domain doesn't resolve. This might be a DNS issue. Let me try using the IP address directly.", "tool_calls": [{"function": "terminal", "arguments": {"command": "curl -H 'Authorization: token $GITEA_TOKEN' https://167.99.126.228/api/v1/repos/Timmy_Foundation/compounding-intelligence/issues?state=open"}}]}
{"role": "tool", "content": "curl: (60) SSL certificate problem: self signed certificate"}
{"role": "assistant", "content": "SSL certificate issue. The server is using a self-signed certificate. Let me try with -k flag to skip SSL verification.", "tool_calls": [{"function": "terminal", "arguments": {"command": "curl -k -H 'Authorization: token $GITEA_TOKEN' https://167.99.126.228/api/v1/repos/Timmy_Foundation/compounding-intelligence/issues?state=open"}}]}
{"role": "tool", "content": "[]"}
{"role": "assistant", "content": "No open issues found. The API returned an empty array."}
{"role": "user", "content": "Check if there are any pull requests"}
{"role": "assistant", "content": "Let me check for pull requests.", "tool_calls": [{"function": "terminal", "arguments": {"command": "curl -k -H 'Authorization: token $GITEA_TOKEN' https://167.99.126.228/api/v1/repos/Timmy_Foundation/compounding-intelligence/pulls?state=open"}}]}
{"role": "tool", "content": "[{"id": 20, "title": "feat: add harvester.py + session_reader.py — session knowledge extractor (closes #8)", "state": "open", "user": {"login": "Rockachopa"}, "created_at": "2026-04-14T17:28:20Z"}]"}
{"role": "assistant", "content": "Found 1 open pull request: PR #20 - "feat: add harvester.py + session_reader.py — session knowledge extractor (closes #8)" by Rockachopa."}

View File

@@ -1,10 +0,0 @@
{"role": "user", "content": "How do I deploy a new version of the application?"}
{"role": "assistant", "content": "Here's the deployment process:\n1. Run tests: pytest\n2. Build Docker image: docker build -t app:latest .\n3. Push to registry: docker push registry.example.com/app:latest\n4. Update production: ssh prod 'cd /app && docker-compose pull && docker-compose up -d'\n5. Verify: curl https://app.example.com/health"}
{"role": "user", "content": "What if tests fail?"}
{"role": "assistant", "content": "If tests fail:\n1. Check the error logs: pytest -v\n2. Fix the failing tests\n3. Re-run tests: pytest\n4. If still failing, check recent changes: git log --oneline -10\n5. Consider reverting: git revert HEAD"}
{"role": "user", "content": "What if the Docker build fails?"}
{"role": "assistant", "content": "If Docker build fails:\n1. Check Dockerfile syntax: docker build --no-cache -t app:latest .\n2. Check for missing dependencies: cat requirements.txt\n3. Check disk space: df -h\n4. Clean up Docker: docker system prune -f\n5. Try building with verbose output: docker build --progress=plain -t app:latest ."}
{"role": "user", "content": "What if the push to registry fails?"}
{"role": "assistant", "content": "If push to registry fails:\n1. Check registry credentials: docker login registry.example.com\n2. Check registry space: curl https://registry.example.com/v2/_catalog\n3. Check image size: docker images app:latest\n4. Try tagging with version: docker tag app:latest registry.example.com/app:v1.0.0\n5. Push with version tag: docker push registry.example.com/app:v1.0.0"}
{"role": "user", "content": "What if the production update fails?"}
{"role": "assistant", "content": "If production update fails:\n1. Check SSH connection: ssh prod 'echo connected'\n2. Check disk space on prod: ssh prod 'df -h'\n3. Check Docker status: ssh prod 'docker ps -a'\n4. Check logs: ssh prod 'docker-compose logs --tail=100'\n5. Rollback: ssh prod 'cd /app && git checkout HEAD~1 && docker-compose up -d'"}

View File

@@ -1,12 +0,0 @@
{"role": "user", "content": "I need to implement a new feature for the application"}
{"role": "assistant", "content": "I can help you implement a new feature. What feature would you like to add?"}
{"role": "user", "content": "I want to add user authentication"}
{"role": "assistant", "content": "User authentication is a good feature. I have a few questions:\n1. What authentication method do you prefer? (JWT, OAuth, Session-based)\n2. Do you need social login? (Google, GitHub, etc.)\n3. What user data do you need to store?\n4. Do you need role-based access control?\n5. What's your database? (PostgreSQL, MySQL, SQLite)"}
{"role": "user", "content": "I prefer JWT, no social login, just email and password, no roles, PostgreSQL"}
{"role": "assistant", "content": "Great choices. Here's the implementation plan:\n1. Create user model with email and password hash\n2. Create JWT token generation and validation\n3. Create login endpoint: POST /api/auth/login\n4. Create register endpoint: POST /api/auth/register\n5. Create middleware for protected routes\n6. Add password hashing with bcrypt\n7. Add rate limiting for login attempts\n8. Add password reset functionality\n9. Add email verification\n10. Test all endpoints"}
{"role": "user", "content": "What about password reset?"}
{"role": "assistant", "content": "For password reset:\n1. Create reset token table\n2. Create forgot password endpoint: POST /api/auth/forgot-password\n3. Send reset email with token\n4. Create reset password endpoint: POST /api/auth/reset-password\n5. Validate reset token and expiration\n6. Update password and delete token\n7. Send confirmation email"}
{"role": "user", "content": "What about email verification?"}
{"role": "assistant", "content": "For email verification:\n1. Add email_verified field to user model\n2. Create verification token table\n3. Send verification email on registration\n4. Create verify email endpoint: GET /api/auth/verify-email?token=...\n5. Validate token and update email_verified\n6. Add middleware to check email_verified for certain routes\n7. Add resend verification email endpoint"}
{"role": "user", "content": "What about rate limiting?"}
{"role": "assistant", "content": "For rate limiting:\n1. Use Redis for rate limit storage\n2. Create rate limit middleware\n3. Set limits: 5 login attempts per minute, 3 password resets per hour\n4. Return 429 Too Many Requests when limit exceeded\n5. Add X-RateLimit headers to responses\n6. Log rate limit violations\n7. Add whitelist for trusted IPs"}

View File

@@ -1,13 +0,0 @@
{"role": "user", "content": "Clone the repo at forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence"}
{"role": "assistant", "content": "I'll clone the repository for you.", "tool_calls": [{"function": "terminal", "arguments": {"command": "git clone https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence.git"}}]}
{"role": "tool", "content": "Cloning into 'compounding-intelligence'...\nremote: Enumerating objects: 15, done.\nremote: Counting objects: 100% (15/15), done.\nremote: Compressing objects: 100% (12/12), done.\nremote: Total 15 (delta 2), reused 0 (delta 0), pack-reused 0\nReceiving objects: 100% (15/15), done.\nResolving deltas: 100% (2/2), done."}
{"role": "assistant", "content": "Repository cloned successfully. Let me check the structure.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ls -la compounding-intelligence/"}}]}
{"role": "tool", "content": "total 8\ndrwxr-xr-x 5 user staff 160 Apr 14 17:21 .\ndrwxr-xr-x 12 user staff 384 Apr 14 17:21 ..\ndrwxr-xr-x 13 user staff 416 Apr 14 17:21 .git\n-rw-r--r-- 1 user staff 1234 Apr 14 17:21 README.md\ndrwxr-xr-x 3 user staff 96 Apr 14 17:21 knowledge\n-rw-r--r-- 1 user staff 456 Apr 14 17:21 metrics\ndrwxr-xr-x 4 user staff 128 Apr 14 17:21 scripts\ndrwxr-xr-x 3 user staff 96 Apr 14 17:21 templates"}
{"role": "user", "content": "What files are in the templates directory?"}
{"role": "assistant", "content": "Let me check the templates directory.", "tool_calls": [{"function": "terminal", "arguments": {"command": "ls -la compounding-intelligence/templates/"}}]}
{"role": "tool", "content": "total 0\ndrwxr-xr-x 3 user staff 96 Apr 14 17:21 .\ndrwxr-xr-x 5 user staff 160 Apr 14 17:21 ..\n-rw-r--r-- 1 user staff 0 Apr 14 17:21 .gitkeep"}
{"role": "assistant", "content": "The templates directory currently only contains a .gitkeep file. No templates have been added yet."}
{"role": "user", "content": "Create a harvest-prompt.md template in the templates directory."}
{"role": "assistant", "content": "I'll create the harvest-prompt.md template for you.", "tool_calls": [{"function": "terminal", "arguments": {"command": "cat > compounding-intelligence/templates/harvest-prompt.md << 'EOF'\n# Knowledge Extraction Prompt\n\nExtract knowledge from session transcripts.\nEOF"}}]}
{"role": "tool", "content": ""}
{"role": "assistant", "content": "Template created successfully. The harvest-prompt.md file is now in the templates directory."}

View File

@@ -1,319 +0,0 @@
#!/usr/bin/env python3
"""
Tests for Performance Bottleneck Finder.
"""
import json
import os
import tempfile
import textwrap
from pathlib import Path
import pytest
# Add scripts to path
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
from perf_bottleneck_finder import (
Bottleneck,
PerfReport,
find_slow_tests_by_scan,
analyze_build_artifacts,
analyze_makefile_targets,
find_slow_imports,
generate_report,
format_markdown,
severity_sort_key,
)
class TestBottleneck:
"""Test Bottleneck dataclass."""
def test_creation(self):
b = Bottleneck(
category="test",
name="test_foo",
duration_s=5.0,
severity="warning",
recommendation="Mock it"
)
assert b.category == "test"
assert b.name == "test_foo"
assert b.duration_s == 5.0
assert b.severity == "warning"
assert b.recommendation == "Mock it"
assert b.file_path is None
assert b.line_number is None
def test_with_location(self):
b = Bottleneck(
category="test",
name="test_bar",
duration_s=2.0,
severity="info",
recommendation="Consider",
file_path="tests/test_bar.py",
line_number=42
)
assert b.file_path == "tests/test_bar.py"
assert b.line_number == 42
def test_to_dict(self):
b = Bottleneck("test", "x", 1.0, "info", "y")
d = b.__dict__
assert "category" in d
assert "duration_s" in d
class TestPerfReport:
"""Test PerfReport dataclass."""
def test_creation(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo"
)
assert report.timestamp == "2026-01-01T00:00:00Z"
assert report.bottlenecks == []
assert report.summary == {}
def test_to_dict(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo",
bottlenecks=[Bottleneck("test", "x", 1.0, "info", "y")]
)
d = report.to_dict()
assert "bottlenecks" in d
assert len(d["bottlenecks"]) == 1
class TestSeveritySort:
"""Test severity sorting."""
def test_critical_first(self):
items = [
Bottleneck("test", "a", 1.0, "info", ""),
Bottleneck("test", "b", 0.5, "critical", ""),
Bottleneck("test", "c", 2.0, "warning", ""),
]
items.sort(key=severity_sort_key)
assert items[0].severity == "critical"
assert items[1].severity == "warning"
assert items[2].severity == "info"
def test_duration_within_severity(self):
items = [
Bottleneck("test", "slow", 10.0, "warning", ""),
Bottleneck("test", "fast", 1.0, "warning", ""),
]
items.sort(key=severity_sort_key)
assert items[0].name == "slow" # Higher duration first within same severity
class TestSlowTestScan:
"""Test slow test pattern scanning."""
def test_finds_sleep(self, tmp_path):
test_file = tmp_path / "test_sleepy.py"
test_file.write_text(textwrap.dedent('''
import time
def test_slow():
time.sleep(5)
assert True
'''))
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("sleep" in b.recommendation.lower() for b in bottlenecks)
def test_finds_http_calls(self, tmp_path):
test_file = tmp_path / "test_http.py"
test_file.write_text(textwrap.dedent('''
import requests
def test_api():
resp = requests.get("https://example.com")
assert resp.status_code == 200
'''))
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("HTTP" in b.recommendation or "mock" in b.recommendation.lower() for b in bottlenecks)
def test_skips_non_test_files(self, tmp_path):
src_file = tmp_path / "main.py"
src_file.write_text("import time\ntime.sleep(10)\n")
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) == 0
def test_handles_missing_dir(self):
bottlenecks = find_slow_tests_by_scan("/nonexistent/path")
assert bottlenecks == []
def test_file_path_populated(self, tmp_path):
test_file = tmp_path / "test_example.py"
test_file.write_text("import time\n\ndef test_it():\n time.sleep(2)\n")
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) >= 1
assert bottlenecks[0].file_path is not None
assert bottlenecks[0].line_number is not None
class TestBuildArtifacts:
"""Test build artifact analysis."""
def test_finds_large_node_modules(self, tmp_path):
nm = tmp_path / "node_modules"
nm.mkdir()
# Create a file > 10MB
big_file = nm / "big.txt"
big_file.write_bytes(b"x" * (11 * 1024 * 1024))
bottlenecks = analyze_build_artifacts(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("node_modules" in b.name for b in bottlenecks)
def test_ignores_small_dirs(self, tmp_path):
nm = tmp_path / "node_modules"
nm.mkdir()
small_file = nm / "small.txt"
small_file.write_bytes(b"x" * 100)
bottlenecks = analyze_build_artifacts(str(tmp_path))
assert not any("node_modules" in b.name for b in bottlenecks)
def test_finds_pycache(self, tmp_path):
cache = tmp_path / "__pycache__"
cache.mkdir()
big_file = cache / "big.pyc"
big_file.write_bytes(b"x" * (11 * 1024 * 1024))
bottlenecks = analyze_build_artifacts(str(tmp_path))
assert any("__pycache__" in b.name for b in bottlenecks)
class TestMakefileAnalysis:
"""Test Makefile analysis."""
def test_finds_pip_install(self, tmp_path):
makefile = tmp_path / "Makefile"
makefile.write_text(textwrap.dedent('''
install:
pip install -r requirements.txt
test:
pytest
'''))
bottlenecks = analyze_makefile_targets(str(tmp_path))
assert len(bottlenecks) >= 1
def test_no_makefile(self, tmp_path):
bottlenecks = analyze_makefile_targets(str(tmp_path))
assert bottlenecks == []
class TestImportAnalysis:
"""Test heavy import detection."""
def test_finds_pandas(self, tmp_path):
src = tmp_path / "analysis.py"
src.write_text("import pandas as pd\n")
bottlenecks = find_slow_imports(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("pandas" in b.name for b in bottlenecks)
def test_finds_torch(self, tmp_path):
src = tmp_path / "model.py"
src.write_text("import torch\n")
bottlenecks = find_slow_imports(str(tmp_path))
assert any("torch" in b.name for b in bottlenecks)
def test_skips_light_imports(self, tmp_path):
src = tmp_path / "utils.py"
src.write_text("import json\nimport os\nimport sys\n")
bottlenecks = find_slow_imports(str(tmp_path))
assert len(bottlenecks) == 0
class TestGenerateReport:
"""Test full report generation."""
def test_empty_repo(self, tmp_path):
report = generate_report(str(tmp_path))
assert report.summary["total_bottlenecks"] >= 0
assert "critical" in report.summary
assert "warning" in report.summary
def test_with_findings(self, tmp_path):
# Create a test file with issues
test_file = tmp_path / "test_slow.py"
test_file.write_text(textwrap.dedent('''
import time
import requests
def test_sleepy():
time.sleep(3)
def test_http():
requests.get("https://example.com")
'''))
report = generate_report(str(tmp_path))
assert report.summary["total_bottlenecks"] >= 2
assert len(report.bottlenecks) > 0
def test_summary_categories(self, tmp_path):
report = generate_report(str(tmp_path))
assert "by_category" in report.summary
class TestMarkdownReport:
"""Test markdown output."""
def test_format(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo",
bottlenecks=[
Bottleneck("test", "slow_test", 5.0, "critical", "Fix it")
],
summary={
"total_bottlenecks": 1,
"critical": 1,
"warning": 0,
"info": 0,
"estimated_total_slowdown_s": 5.0,
"by_category": {"test": 1},
}
)
md = format_markdown(report)
assert "# Performance Bottleneck Report" in md
assert "slow_test" in md
assert "🔴" in md
assert "Fix it" in md
def test_empty_report(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo",
summary={
"total_bottlenecks": 0,
"critical": 0,
"warning": 0,
"info": 0,
"estimated_total_slowdown_s": 0,
"by_category": {},
}
)
md = format_markdown(report)
assert "Total bottlenecks:** 0" in md