Compare commits

..

5 Commits

Author SHA1 Message Date
b3592e14ad test: add tests for Performance Bottleneck Finder
Refs #171
2026-04-15 14:48:59 +00:00
be805a1b4c feat: add Performance Bottleneck Finder (#171)
Analyzes: slow tests, build artifacts, CI workflows, heavy imports.
Outputs: markdown report or JSON. Designed for weekly cron.

Closes #171
2026-04-15 14:47:27 +00:00
e6f1b07f16 Merge pull request 'feat: Knowledge store staleness detector (closes #179)' (#185) from feat/179-staleness-check into main 2026-04-15 06:09:14 +00:00
81c02f6709 feat: Add staleness detector tests (closes #179) 2026-04-15 04:00:46 +00:00
c2c3c6a3b9 feat: Add knowledge staleness detector (closes #179) 2026-04-15 04:00:12 +00:00
5 changed files with 1130 additions and 353 deletions

View File

@@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
Usage:
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""
import argparse
import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
"""Check all entries in knowledge index for staleness.
Returns list of entries with staleness info:
- status: "fresh" | "stale" | "missing_source" | "no_hash"
- current_hash: computed hash (if source exists)
- stored_hash: hash from index
"""
with open(index_path) as f:
data = json.load(f)
facts = data.get("facts", [])
results = []
for entry in facts:
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results.append({**entry, "status": "no_source", "current_hash": None})
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
results.append({**entry, "status": "missing_source", "current_hash": None})
elif not stored_hash:
results.append({**entry, "status": "no_hash", "current_hash": current_hash})
elif current_hash != stored_hash:
results.append({**entry, "status": "stale", "current_hash": current_hash})
else:
results.append({**entry, "status": "fresh", "current_hash": current_hash})
return results
def fix_hashes(index_path: str, repo_root: str = ".") -> int:
"""Add hashes to entries missing them. Returns count of fixed entries."""
with open(index_path) as f:
data = json.load(f)
fixed = 0
for entry in data.get("facts", []):
if entry.get("source_hash"):
continue
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
h = compute_file_hash(full_path)
if h:
entry["source_hash"] = h
fixed += 1
with open(index_path, "w") as f:
json.dump(data, f, indent=2)
return fixed
def main():
parser = argparse.ArgumentParser(description="Check knowledge store staleness")
parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
args = parser.parse_args()
if args.fix:
fixed = fix_hashes(args.index, args.repo)
print(f"Fixed {fixed} entries with missing hashes.")
return
results = check_staleness(args.index, args.repo)
if args.json:
print(json.dumps(results, indent=2))
else:
stale = [r for r in results if r["status"] != "fresh"]
fresh = [r for r in results if r["status"] == "fresh"]
print(f"Knowledge Store Staleness Check")
print(f" Total entries: {len(results)}")
print(f" Fresh: {len(fresh)}")
print(f" Stale/Issues: {len(stale)}")
print()
if stale:
print("Issues found:")
for r in stale:
status = r["status"]
fact = r.get("fact", "?")[:60]
source = r.get("source_file", "?")
print(f" [{status}] {source}: {fact}")
else:
print("All entries are fresh!")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,551 @@
#!/usr/bin/env python3
"""
Performance Bottleneck Finder — Identify slow tests, builds, and CI steps.
Analyzes:
1. Pytest output for slow tests
2. Build logs for slow steps
3. CI workflow durations
4. File system for large/slow artifacts
Usage:
python3 scripts/perf_bottleneck_finder.py --repo /path/to/repo
python3 scripts/perf_bottleneck_finder.py --repo /path/to/repo --json
python3 scripts/perf_bottleneck_finder.py --repo /path/to/repo --report metrics/perf_report.md
Weekly cron:
0 9 * * 1 cd /path/to/compounding-intelligence && python3 scripts/perf_bottleneck_finder.py --repo /path/to/target --report metrics/perf_report.md
"""
import argparse
import json
import os
import re
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ── Configuration ──────────────────────────────────────────────────
SLOW_TEST_THRESHOLD_S = 2.0 # Tests slower than this are flagged
SLOW_BUILD_STEP_THRESHOLD_S = 10.0
TOP_N_BOTTLENECKS = 10 # Report top N bottlenecks
PYTEST_DURATIONS_COUNT = 20 # Number of slow tests to collect
LOG_EXTENSIONS = {".log", ".txt"}
@dataclass
class Bottleneck:
"""A single performance bottleneck."""
category: str # "test", "build", "ci", "artifact", "import"
name: str # What's slow
duration_s: float # How long it takes
severity: str # "critical", "warning", "info"
recommendation: str # How to fix
file_path: Optional[str] = None
line_number: Optional[int] = None
@dataclass
class PerfReport:
"""Full performance report."""
timestamp: str
repo_path: str
bottlenecks: List[Bottleneck] = field(default_factory=list)
summary: Dict[str, Any] = field(default_factory=dict)
test_stats: Dict[str, Any] = field(default_factory=dict)
build_stats: Dict[str, Any] = field(default_factory=dict)
ci_stats: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict:
d = asdict(self)
return d
# ── Test Analysis ──────────────────────────────────────────────────
def find_slow_tests_pytest(repo_path: str) -> List[Bottleneck]:
"""Run pytest --durations and parse slow tests."""
bottlenecks = []
# Try to run pytest with durations
try:
result = subprocess.run(
["python3", "-m", "pytest", "--co", "-q", "--durations=0"],
cwd=repo_path, capture_output=True, text=True, timeout=30
)
# If tests exist, try to get durations from last run
durations_file = os.path.join(repo_path, ".pytest_cache", "v", "cache", "durations")
if os.path.exists(durations_file):
with open(durations_file) as f:
for line in f:
parts = line.strip().split()
if len(parts) >= 2:
try:
duration = float(parts[0])
test_name = " ".join(parts[1:])
if duration > SLOW_TEST_THRESHOLD_S:
severity = "critical" if duration > 10 else "warning"
bottlenecks.append(Bottleneck(
category="test",
name=test_name,
duration_s=duration,
severity=severity,
recommendation=f"Test takes {duration:.1f}s. Consider mocking slow I/O, using fixtures, or marking with @pytest.mark.slow."
))
except ValueError:
continue
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return bottlenecks
def find_slow_tests_by_scan(repo_path: str) -> List[Bottleneck]:
"""Scan test files for patterns that indicate slow tests."""
bottlenecks = []
test_patterns = [
(r"time\.sleep\((\d+(?:\.\d+)?)\)", "Contains time.sleep() — consider using mock or async wait"),
(r"subprocess\.run\(.*timeout=(\d+)", "Subprocess with timeout — may block test"),
(r"requests\.(get|post|put|delete)\(", "Real HTTP call — mock with responses or httpretty"),
(r"open\([^)]*['"]w['"]", "File I/O in test — use tmp_path fixture"),
]
for root, dirs, files in os.walk(repo_path):
# Skip hidden and cache dirs
dirs[:] = [d for d in dirs if not d.startswith(('.', '__pycache__', 'node_modules', '.git'))]
for fname in files:
if not (fname.startswith("test_") or fname.endswith("_test.py")):
continue
if not fname.endswith(".py"):
continue
fpath = os.path.join(root, fname)
rel_path = os.path.relpath(fpath, repo_path)
try:
with open(fpath) as f:
lines = f.readlines()
except (PermissionError, UnicodeDecodeError):
continue
for i, line in enumerate(lines):
for pattern, recommendation in test_patterns:
match = re.search(pattern, line)
if match:
duration = 1.0 # Default estimate
if "sleep" in pattern:
try:
duration = float(match.group(1))
except (ValueError, IndexError):
duration = 1.0
elif "timeout" in pattern:
try:
duration = float(match.group(1))
except (ValueError, IndexError):
duration = 10.0
else:
duration = 2.0 # Estimated
bottlenecks.append(Bottleneck(
category="test",
name=f"{rel_path}:{i+1}",
duration_s=duration,
severity="warning" if duration < 5 else "critical",
recommendation=recommendation,
file_path=rel_path,
line_number=i + 1
))
return bottlenecks
# ── Build Analysis ─────────────────────────────────────────────────
def analyze_build_artifacts(repo_path: str) -> List[Bottleneck]:
"""Find large build artifacts that slow down builds."""
bottlenecks = []
large_dirs = {
"node_modules": "Consider using npm ci --production or yarn --production",
"__pycache__": "Consider .gitignore and cleaning before builds",
".tox": "Consider caching tox environments",
".pytest_cache": "Consider cleaning between CI runs",
"dist": "Check if dist/ artifacts are being rebuilt unnecessarily",
"build": "Check if build/ artifacts are being rebuilt unnecessarily",
".next": "Next.js cache — consider incremental builds",
"venv": "Virtual env in repo — move outside or use Docker",
}
for dirname, recommendation in large_dirs.items():
dirpath = os.path.join(repo_path, dirname)
if os.path.isdir(dirpath):
total_size = 0
file_count = 0
for root, dirs, files in os.walk(dirpath):
for f in files:
try:
fpath = os.path.join(root, f)
total_size += os.path.getsize(fpath)
file_count += 1
except OSError:
pass
if total_size > 10 * 1024 * 1024: # > 10MB
size_mb = total_size / (1024 * 1024)
bottlenecks.append(Bottleneck(
category="build",
name=f"{dirname}/ ({size_mb:.1f}MB, {file_count} files)",
duration_s=size_mb * 0.5, # Rough estimate
severity="critical" if size_mb > 100 else "warning",
recommendation=recommendation
))
return bottlenecks
def analyze_makefile_targets(repo_path: str) -> List[Bottleneck]:
"""Analyze Makefile for potentially slow targets."""
bottlenecks = []
makefiles = []
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith(('.', '__pycache__'))]
for f in files:
if f in ("Makefile", "makefile", "GNUmakefile"):
makefiles.append(os.path.join(root, f))
slow_patterns = [
(r"pip install", "pip install without --no-deps or constraints"),
(r"npm install(?!.*--production)", "npm install without --production flag"),
(r"docker build", "Docker build — consider multi-stage and layer caching"),
(r"pytest(?!.*-x|--maxfail)", "pytest without early exit on failure"),
(r"mypy|mypy --strict", "Type checking — consider incremental mode"),
]
for mfile in makefiles:
rel_path = os.path.relpath(mfile, repo_path)
try:
with open(mfile) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
for pattern, recommendation in slow_patterns:
if re.search(pattern, content):
bottlenecks.append(Bottleneck(
category="build",
name=f"{rel_path}: {pattern}",
duration_s=5.0,
severity="info",
recommendation=recommendation,
file_path=rel_path
))
return bottlenecks
# ── CI Analysis ────────────────────────────────────────────────────
def analyze_github_actions(repo_path: str) -> List[Bottleneck]:
"""Analyze GitHub Actions workflow files for inefficiencies."""
bottlenecks = []
workflow_dir = os.path.join(repo_path, ".github", "workflows")
if not os.path.isdir(workflow_dir):
return bottlenecks
slow_patterns = [
(r"runs-on:\s*ubuntu-latest", 0, "Consider caching dependencies between runs"),
(r"npm install", 2, "Use npm ci instead of npm install for reproducible builds"),
(r"pip install(?!.*--cache-dir)", 2, "Add --cache-dir or use pip cache action"),
(r"docker build(?!.*--cache-from)", 5, "Use Docker layer caching"),
(r"python -m pytest(?!.*-n|--numprocesses)", 3, "Consider pytest-xdist for parallel test execution"),
]
for fname in os.listdir(workflow_dir):
if not fname.endswith(('.yml', '.yaml')):
continue
fpath = os.path.join(workflow_dir, fname)
try:
with open(fpath) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
for pattern, est_savings, recommendation in slow_patterns:
if re.search(pattern, content):
bottlenecks.append(Bottleneck(
category="ci",
name=f"{fname}: {pattern}",
duration_s=est_savings,
severity="info",
recommendation=recommendation,
file_path=f".github/workflows/{fname}"
))
return bottlenecks
def analyze_gitea_ci(repo_path: str) -> List[Bottleneck]:
"""Analyze Gitea/Drone CI config files."""
bottlenecks = []
ci_files = [".gitea/workflows", ".drone.yml", ".woodpecker.yml"]
for ci_path in ci_files:
full_path = os.path.join(repo_path, ci_path)
if os.path.isfile(full_path):
try:
with open(full_path) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
if "pip install" in content and "--cache-dir" not in content:
bottlenecks.append(Bottleneck(
category="ci",
name=f"{ci_path}: pip without cache",
duration_s=5.0,
severity="warning",
recommendation="Add --cache-dir or mount pip cache volume",
file_path=ci_path
))
elif os.path.isdir(full_path):
for fname in os.listdir(full_path):
if not fname.endswith(('.yml', '.yaml')):
continue
fpath = os.path.join(full_path, fname)
try:
with open(fpath) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
if "pip install" in content and "--cache-dir" not in content:
bottlenecks.append(Bottleneck(
category="ci",
name=f"{ci_path}/{fname}: pip without cache",
duration_s=5.0,
severity="warning",
recommendation="Add --cache-dir or mount pip cache volume",
file_path=f"{ci_path}/{fname}"
))
return bottlenecks
# ── Import Analysis ────────────────────────────────────────────────
def find_slow_imports(repo_path: str) -> List[Bottleneck]:
"""Find Python files with heavy import chains."""
bottlenecks = []
heavy_imports = {
"pandas": 0.5,
"numpy": 0.3,
"torch": 2.0,
"tensorflow": 3.0,
"scipy": 0.5,
"matplotlib": 0.8,
"sklearn": 0.5,
"transformers": 1.5,
}
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith(('.', '__pycache__', 'node_modules'))]
for fname in files:
if not fname.endswith(".py"):
continue
fpath = os.path.join(root, fname)
rel_path = os.path.relpath(fpath, repo_path)
try:
with open(fpath) as f:
lines = f.readlines()
except (PermissionError, UnicodeDecodeError):
continue
for i, line in enumerate(lines):
stripped = line.strip()
if stripped.startswith("import ") or stripped.startswith("from "):
for heavy, est_time in heavy_imports.items():
if heavy in stripped:
bottlenecks.append(Bottleneck(
category="import",
name=f"{rel_path}:{i+1}: import {heavy}",
duration_s=est_time,
severity="info" if est_time < 1.0 else "warning",
recommendation=f"Heavy import ({heavy} ~{est_time}s). Consider lazy import or conditional import.",
file_path=rel_path,
line_number=i + 1
))
return bottlenecks
# ── Report Generation ──────────────────────────────────────────────
def severity_sort_key(b: Bottleneck) -> Tuple[int, float]:
"""Sort by severity then duration."""
sev_order = {"critical": 0, "warning": 1, "info": 2}
return (sev_order.get(b.severity, 3), -b.duration_s)
def generate_report(repo_path: str) -> PerfReport:
"""Run all analyses and generate a performance report."""
report = PerfReport(
timestamp=datetime.now(timezone.utc).isoformat(),
repo_path=os.path.abspath(repo_path)
)
# Collect all bottlenecks
all_bottlenecks = []
print("Scanning for slow tests (pytest cache)...")
all_bottlenecks.extend(find_slow_tests_pytest(repo_path))
print("Scanning for slow test patterns...")
all_bottlenecks.extend(find_slow_tests_by_scan(repo_path))
print("Analyzing build artifacts...")
all_bottlenecks.extend(analyze_build_artifacts(repo_path))
print("Analyzing Makefiles...")
all_bottlenecks.extend(analyze_makefile_targets(repo_path))
print("Analyzing CI workflows...")
all_bottlenecks.extend(analyze_github_actions(repo_path))
all_bottlenecks.extend(analyze_gitea_ci(repo_path))
print("Scanning for heavy imports...")
all_bottlenecks.extend(find_slow_imports(repo_path))
# Sort by severity and duration
all_bottlenecks.sort(key=severity_sort_key)
report.bottlenecks = all_bottlenecks[:TOP_N_BOTTLENECKS * 2] # Keep more for stats
# Compute summary
by_category = defaultdict(list)
for b in all_bottlenecks:
by_category[b.category].append(b)
report.summary = {
"total_bottlenecks": len(all_bottlenecks),
"critical": sum(1 for b in all_bottlenecks if b.severity == "critical"),
"warning": sum(1 for b in all_bottlenecks if b.severity == "warning"),
"info": sum(1 for b in all_bottlenecks if b.severity == "info"),
"estimated_total_slowdown_s": sum(b.duration_s for b in all_bottlenecks),
"by_category": {cat: len(items) for cat, items in by_category.items()},
}
report.test_stats = {
"slow_tests": len(by_category.get("test", [])),
"total_estimated_s": sum(b.duration_s for b in by_category.get("test", [])),
}
report.build_stats = {
"build_issues": len(by_category.get("build", [])),
"total_estimated_s": sum(b.duration_s for b in by_category.get("build", [])),
}
report.ci_stats = {
"ci_issues": len(by_category.get("ci", [])),
"total_estimated_s": sum(b.duration_s for b in by_category.get("ci", [])),
}
return report
def format_markdown(report: PerfReport) -> str:
"""Format report as markdown."""
lines = []
lines.append(f"# Performance Bottleneck Report")
lines.append(f"")
lines.append(f"Generated: {report.timestamp}")
lines.append(f"Repository: {report.repo_path}")
lines.append(f"")
# Summary
s = report.summary
lines.append(f"## Summary")
lines.append(f"")
lines.append(f"- **Total bottlenecks:** {s['total_bottlenecks']}")
lines.append(f"- **Critical:** {s['critical']} | **Warning:** {s['warning']} | **Info:** {s['info']}")
lines.append(f"- **Estimated total slowdown:** {s['estimated_total_slowdown_s']:.1f}s")
lines.append(f"- **By category:** {', '.join(f'{k}: {v}' for k, v in s['by_category'].items())}")
lines.append(f"")
# Top bottlenecks
lines.append(f"## Top {min(TOP_N_BOTTLENECKS, len(report.bottlenecks))} Bottlenecks")
lines.append(f"")
for i, b in enumerate(report.bottlenecks[:TOP_N_BOTTLENECKS], 1):
icon = {"critical": "🔴", "warning": "🟡", "info": "🔵"}.get(b.severity, "")
loc = f" ({b.file_path}:{b.line_number})" if b.file_path else ""
lines.append(f"{i}. {icon} **{b.category}** — {b.name}{loc}")
lines.append(f" - Duration: ~{b.duration_s:.1f}s | Severity: {b.severity}")
lines.append(f" - Fix: {b.recommendation}")
lines.append(f"")
# Category breakdowns
for cat in ["test", "build", "ci", "import"]:
items = [b for b in report.bottlenecks if b.category == cat]
if items:
lines.append(f"## {cat.title()} Bottlenecks")
lines.append(f"")
for b in items:
icon = {"critical": "🔴", "warning": "🟡", "info": "🔵"}.get(b.severity, "")
loc = f" ({b.file_path}:{b.line_number})" if b.file_path else ""
lines.append(f"- {icon} {b.name}{loc} — ~{b.duration_s:.1f}s — {b.recommendation}")
lines.append(f"")
return "
".join(lines)
# ── Main ───────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Performance Bottleneck Finder")
parser.add_argument("--repo", default=".", help="Path to repository to analyze")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--report", help="Write markdown report to file")
parser.add_argument("--threshold", type=float, default=SLOW_TEST_THRESHOLD_S,
help="Slow test threshold in seconds")
args = parser.parse_args()
global SLOW_TEST_THRESHOLD_S
SLOW_TEST_THRESHOLD_S = args.threshold
if not os.path.isdir(args.repo):
print(f"Error: {args.repo} is not a directory", file=sys.stderr)
sys.exit(1)
report = generate_report(args.repo)
if args.json:
print(json.dumps(report.to_dict(), indent=2))
else:
md = format_markdown(report)
if args.report:
os.makedirs(os.path.dirname(args.report) or ".", exist_ok=True)
with open(args.report, "w") as f:
f.write(md)
print(f"Report written to {args.report}")
else:
print(md)
# Exit code: 1 if critical bottlenecks found
if report.summary.get("critical", 0) > 0:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,353 +0,0 @@
#!/usr/bin/env python3
"""
sampler.py — Score and rank sessions by harvest value.
With 20k+ sessions on disk, we can't harvest all at once. This script
scores each session by how likely it is to contain valuable knowledge,
so the harvester processes the best ones first.
Scoring strategy:
- Recency: last 7d=3pts, last 30d=2pts, older=1pt
- Length: >50 messages=3pts, >20=2pts, <20=1pt
- Repo uniqueness: first session for a repo=5pts, otherwise=1pt
- Outcome: failure=3pts (most to learn), success=2pts, unknown=1pt
- Tool calls: >10 tool invocations=2pts (complex sessions)
Usage:
python3 sampler.py --count 100 # Top 100 sessions
python3 sampler.py --repo the-nexus --count 20 # Top 20 for a repo
python3 sampler.py --since 2026-04-01 # All sessions since date
python3 sampler.py --count 50 --min-score 8 # Only high-value sessions
python3 sampler.py --count 100 --output sample.json # Save to file
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
# --- Fast session scanning (no full parse) ---
def scan_session_fast(path: str) -> dict:
"""Extract scoring metadata from a session without parsing the full JSONL.
Reads only: first line, last ~20 lines, and line count. This processes
20k sessions in seconds instead of minutes.
"""
meta = {
'path': path,
'message_count': 0,
'has_tool_calls': False,
'tool_call_count': 0,
'first_timestamp': '',
'last_timestamp': '',
'is_failure': False,
'repos_mentioned': set(),
'first_role': '',
'last_content_preview': '',
}
try:
file_size = os.path.getsize(path)
if file_size == 0:
return meta
with open(path, 'r', encoding='utf-8', errors='replace') as f:
# Read first line for timestamp + role
first_line = f.readline().strip()
if first_line:
try:
first_msg = json.loads(first_line)
meta['first_timestamp'] = first_msg.get('timestamp', '')
meta['first_role'] = first_msg.get('role', '')
except json.JSONDecodeError:
pass
# Fast line count + collect tail lines
# For the tail, seek to near end of file
tail_lines = []
line_count = 1 # already read first
if file_size > 8192:
# Seek to last 8KB for tail sampling
f.seek(max(0, file_size - 8192))
f.readline() # skip partial line
for line in f:
line = line.strip()
if line:
tail_lines.append(line)
line_count += 1
# We lost the exact count for big files — estimate from file size
# Average JSONL line is ~500 bytes
if line_count < 100:
line_count = max(line_count, file_size // 500)
else:
# Small file — read all
for line in f:
line = line.strip()
if line:
tail_lines.append(line)
line_count += 1
meta['message_count'] = line_count
# Parse tail lines for outcome, tool calls, repos
for line in tail_lines[-30:]: # last 30 non-empty lines
try:
msg = json.loads(line)
# Track last timestamp
ts = msg.get('timestamp', '')
if ts:
meta['last_timestamp'] = ts
# Count tool calls
if msg.get('tool_calls'):
meta['has_tool_calls'] = True
meta['tool_call_count'] += len(msg['tool_calls'])
# Detect failure signals in content
content = ''
if isinstance(msg.get('content'), str):
content = msg['content'].lower()
elif isinstance(msg.get('content'), list):
for part in msg['content']:
if isinstance(part, dict) and part.get('type') == 'text':
content += part.get('text', '').lower()
if content:
meta['last_content_preview'] = content[:200]
failure_signals = ['error', 'failed', 'cannot', 'unable',
'exception', 'traceback', 'rejected', 'denied']
if any(sig in content for sig in failure_signals):
meta['is_failure'] = True
# Extract repo references from tool call arguments
if msg.get('tool_calls'):
for tc in msg['tool_calls']:
args = tc.get('function', {}).get('arguments', '')
if isinstance(args, str):
# Look for repo patterns
for pattern in ['Timmy_Foundation/', 'Rockachopa/', 'compounding-intelligence', 'the-nexus', 'timmy-home', 'hermes-agent', 'the-beacon', 'the-door']:
if pattern in args:
repo = pattern.rstrip('/')
meta['repos_mentioned'].add(repo)
except json.JSONDecodeError:
continue
except (IOError, OSError):
pass
meta['repos_mentioned'] = list(meta['repos_mentioned'])
return meta
# --- Filename timestamp parsing ---
def parse_session_timestamp(filename: str) -> Optional[datetime]:
"""Parse timestamp from session filename.
Common formats:
session_20260413_123456_hash.jsonl
20260413_123456_hash.jsonl
"""
stem = Path(filename).stem
parts = stem.split('_')
# Try session_YYYYMMDD_HHMMSS format
for i, part in enumerate(parts):
if len(part) == 8 and part.isdigit():
date_part = part
time_part = parts[i + 1] if i + 1 < len(parts) and len(parts[i + 1]) == 6 else '000000'
try:
return datetime.strptime(f"{date_part}_{time_part}", '%Y%m%d_%H%M%S').replace(tzinfo=timezone.utc)
except ValueError:
continue
# Fallback: use file modification time
return None
# --- Scoring ---
def score_session(meta: dict, now: datetime, seen_repos: set) -> tuple[int, dict]:
"""Score a session for harvest value. Returns (score, breakdown)."""
score = 0
breakdown = {}
# 1. Recency
ts = parse_session_timestamp(os.path.basename(meta['path']))
if ts is None:
# Fallback to mtime
try:
ts = datetime.fromtimestamp(os.path.getmtime(meta['path']), tz=timezone.utc)
except OSError:
ts = now - timedelta(days=365)
age_days = (now - ts).days
if age_days <= 7:
recency = 3
elif age_days <= 30:
recency = 2
else:
recency = 1
score += recency
breakdown['recency'] = recency
# 2. Length
count = meta['message_count']
if count > 50:
length = 3
elif count > 20:
length = 2
else:
length = 1
score += length
breakdown['length'] = length
# 3. Repo uniqueness (first session mentioning a repo gets bonus)
repo_score = 0
for repo in meta.get('repos_mentioned', []):
if repo not in seen_repos:
seen_repos.add(repo)
repo_score = max(repo_score, 5)
else:
repo_score = max(repo_score, 1)
score += repo_score
breakdown['repo_unique'] = repo_score
# 4. Outcome
if meta.get('is_failure'):
outcome = 3
elif meta.get('last_content_preview', '').strip():
outcome = 2 # has some content = likely completed
else:
outcome = 1
score += outcome
breakdown['outcome'] = outcome
# 5. Tool calls
if meta.get('tool_call_count', 0) > 10:
tool = 2
else:
tool = 0
score += tool
breakdown['tool_calls'] = tool
return score, breakdown
# --- Main ---
def main():
parser = argparse.ArgumentParser(description="Score and rank sessions for harvesting")
parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
help='Directory containing session files')
parser.add_argument('--count', type=int, default=100, help='Number of top sessions to return')
parser.add_argument('--repo', default='', help='Filter to sessions mentioning this repo')
parser.add_argument('--since', default='', help='Only score sessions after this date (YYYY-MM-DD)')
parser.add_argument('--min-score', type=int, default=0, help='Minimum score threshold')
parser.add_argument('--output', default='', help='Output file (JSON). Default: stdout')
parser.add_argument('--format', choices=['json', 'paths', 'table'], default='table',
help='Output format: json (full), paths (one per line), table (human)')
parser.add_argument('--top-percent', type=float, default=0, help='Return top N%% instead of --count')
args = parser.parse_args()
sessions_dir = Path(args.sessions_dir)
if not sessions_dir.is_dir():
print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
sys.exit(1)
# Find all JSONL files
print(f"Scanning {sessions_dir}...", file=sys.stderr)
t0 = time.time()
session_files = list(sessions_dir.glob('*.jsonl'))
total = len(session_files)
print(f"Found {total} session files", file=sys.stderr)
# Parse since date
since_dt = None
if args.since:
since_dt = datetime.strptime(args.since, '%Y-%m-%d').replace(tzinfo=timezone.utc)
# Score all sessions
now = datetime.now(timezone.utc)
seen_repos = set() # Track repos for uniqueness scoring
scored = []
for i, sf in enumerate(session_files):
# Date filter (fast path: check filename first)
if since_dt:
ts = parse_session_timestamp(sf.name)
if ts and ts < since_dt:
continue
meta = scan_session_fast(str(sf))
# Repo filter
if args.repo:
repos = meta.get('repos_mentioned', [])
if args.repo.lower() not in [r.lower() for r in repos]:
# Also check filename
if args.repo.lower() not in sf.name.lower():
continue
score, breakdown = score_session(meta, now, seen_repos)
if score >= args.min_score:
scored.append({
'path': str(sf),
'filename': sf.name,
'score': score,
'breakdown': breakdown,
'message_count': meta['message_count'],
'repos': meta['repos_mentioned'],
'is_failure': meta['is_failure'],
})
if (i + 1) % 5000 == 0:
elapsed = time.time() - t0
print(f" Scanned {i + 1}/{total} ({elapsed:.1f}s)", file=sys.stderr)
elapsed = time.time() - t0
print(f"Scored {len(scored)} sessions in {elapsed:.1f}s", file=sys.stderr)
# Sort by score descending
scored.sort(key=lambda x: x['score'], reverse=True)
# Apply count or percent
if args.top_percent > 0:
count = max(1, int(len(scored) * args.top_percent / 100))
else:
count = args.count
scored = scored[:count]
# Output
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(scored, f, indent=2)
print(f"Wrote {len(scored)} sessions to {args.output}", file=sys.stderr)
elif args.format == 'json':
json.dump(scored, sys.stdout, indent=2)
elif args.format == 'paths':
for s in scored:
print(s['path'])
else: # table
print(f"{'SCORE':>5} {'MSGS':>5} {'REPOS':<25} {'FILE'}")
print(f"{'-'*5} {'-'*5} {'-'*25} {'-'*40}")
for s in scored:
repos = ', '.join(s['repos'][:2]) if s['repos'] else '-'
fail = ' FAIL' if s['is_failure'] else ''
print(f"{s['score']:>5} {s['message_count']:>5} {repos:<25} {s['filename'][:40]}{fail}")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Tests for scripts/knowledge_staleness_check.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("ks", os.path.join(os.path.dirname(__file__) or ".", "knowledge_staleness_check.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
check_staleness = mod.check_staleness
fix_hashes = mod.fix_hashes
compute_file_hash = mod.compute_file_hash
def test_fresh_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("print('hello')")
h = compute_file_hash(src)
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "hello", "source_file": "source.py", "source_hash": h}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "fresh"
print("PASS: test_fresh_entry")
def test_stale_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("original content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "old", "source_file": "source.py", "source_hash": "sha256:wrong"}]}, f)
# Now change the source
with open(src, "w") as f:
f.write("modified content")
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "stale"
print("PASS: test_stale_entry")
def test_missing_source():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "gone", "source_file": "nonexistent.py", "source_hash": "sha256:abc"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "missing_source"
print("PASS: test_missing_source")
def test_no_hash():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "no hash", "source_file": "source.py"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_hash"
assert results[0]["current_hash"].startswith("sha256:")
print("PASS: test_no_hash")
def test_no_source_field():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "orphan"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_source"
print("PASS: test_no_source_field")
def test_fix_hashes():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content for hashing")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "needs hash", "source_file": "source.py"}]}, f)
fixed = fix_hashes(idx, tmpdir)
assert fixed == 1
# Verify hash was added
with open(idx) as f:
data = json.load(f)
assert data["facts"][0]["source_hash"].startswith("sha256:")
print("PASS: test_fix_hashes")
def test_empty_index():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": []}, f)
results = check_staleness(idx, tmpdir)
assert results == []
print("PASS: test_empty_index")
def test_compute_hash_nonexistent():
h = compute_file_hash("/nonexistent/path/file.py")
assert h is None
print("PASS: test_compute_hash_nonexistent")
def run_all():
test_fresh_entry()
test_stale_entry()
test_missing_source()
test_no_hash()
test_no_source_field()
test_fix_hashes()
test_empty_index()
test_compute_hash_nonexistent()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()

View File

@@ -0,0 +1,319 @@
#!/usr/bin/env python3
"""
Tests for Performance Bottleneck Finder.
"""
import json
import os
import tempfile
import textwrap
from pathlib import Path
import pytest
# Add scripts to path
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
from perf_bottleneck_finder import (
Bottleneck,
PerfReport,
find_slow_tests_by_scan,
analyze_build_artifacts,
analyze_makefile_targets,
find_slow_imports,
generate_report,
format_markdown,
severity_sort_key,
)
class TestBottleneck:
"""Test Bottleneck dataclass."""
def test_creation(self):
b = Bottleneck(
category="test",
name="test_foo",
duration_s=5.0,
severity="warning",
recommendation="Mock it"
)
assert b.category == "test"
assert b.name == "test_foo"
assert b.duration_s == 5.0
assert b.severity == "warning"
assert b.recommendation == "Mock it"
assert b.file_path is None
assert b.line_number is None
def test_with_location(self):
b = Bottleneck(
category="test",
name="test_bar",
duration_s=2.0,
severity="info",
recommendation="Consider",
file_path="tests/test_bar.py",
line_number=42
)
assert b.file_path == "tests/test_bar.py"
assert b.line_number == 42
def test_to_dict(self):
b = Bottleneck("test", "x", 1.0, "info", "y")
d = b.__dict__
assert "category" in d
assert "duration_s" in d
class TestPerfReport:
"""Test PerfReport dataclass."""
def test_creation(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo"
)
assert report.timestamp == "2026-01-01T00:00:00Z"
assert report.bottlenecks == []
assert report.summary == {}
def test_to_dict(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo",
bottlenecks=[Bottleneck("test", "x", 1.0, "info", "y")]
)
d = report.to_dict()
assert "bottlenecks" in d
assert len(d["bottlenecks"]) == 1
class TestSeveritySort:
"""Test severity sorting."""
def test_critical_first(self):
items = [
Bottleneck("test", "a", 1.0, "info", ""),
Bottleneck("test", "b", 0.5, "critical", ""),
Bottleneck("test", "c", 2.0, "warning", ""),
]
items.sort(key=severity_sort_key)
assert items[0].severity == "critical"
assert items[1].severity == "warning"
assert items[2].severity == "info"
def test_duration_within_severity(self):
items = [
Bottleneck("test", "slow", 10.0, "warning", ""),
Bottleneck("test", "fast", 1.0, "warning", ""),
]
items.sort(key=severity_sort_key)
assert items[0].name == "slow" # Higher duration first within same severity
class TestSlowTestScan:
"""Test slow test pattern scanning."""
def test_finds_sleep(self, tmp_path):
test_file = tmp_path / "test_sleepy.py"
test_file.write_text(textwrap.dedent('''
import time
def test_slow():
time.sleep(5)
assert True
'''))
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("sleep" in b.recommendation.lower() for b in bottlenecks)
def test_finds_http_calls(self, tmp_path):
test_file = tmp_path / "test_http.py"
test_file.write_text(textwrap.dedent('''
import requests
def test_api():
resp = requests.get("https://example.com")
assert resp.status_code == 200
'''))
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("HTTP" in b.recommendation or "mock" in b.recommendation.lower() for b in bottlenecks)
def test_skips_non_test_files(self, tmp_path):
src_file = tmp_path / "main.py"
src_file.write_text("import time\ntime.sleep(10)\n")
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) == 0
def test_handles_missing_dir(self):
bottlenecks = find_slow_tests_by_scan("/nonexistent/path")
assert bottlenecks == []
def test_file_path_populated(self, tmp_path):
test_file = tmp_path / "test_example.py"
test_file.write_text("import time\n\ndef test_it():\n time.sleep(2)\n")
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) >= 1
assert bottlenecks[0].file_path is not None
assert bottlenecks[0].line_number is not None
class TestBuildArtifacts:
"""Test build artifact analysis."""
def test_finds_large_node_modules(self, tmp_path):
nm = tmp_path / "node_modules"
nm.mkdir()
# Create a file > 10MB
big_file = nm / "big.txt"
big_file.write_bytes(b"x" * (11 * 1024 * 1024))
bottlenecks = analyze_build_artifacts(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("node_modules" in b.name for b in bottlenecks)
def test_ignores_small_dirs(self, tmp_path):
nm = tmp_path / "node_modules"
nm.mkdir()
small_file = nm / "small.txt"
small_file.write_bytes(b"x" * 100)
bottlenecks = analyze_build_artifacts(str(tmp_path))
assert not any("node_modules" in b.name for b in bottlenecks)
def test_finds_pycache(self, tmp_path):
cache = tmp_path / "__pycache__"
cache.mkdir()
big_file = cache / "big.pyc"
big_file.write_bytes(b"x" * (11 * 1024 * 1024))
bottlenecks = analyze_build_artifacts(str(tmp_path))
assert any("__pycache__" in b.name for b in bottlenecks)
class TestMakefileAnalysis:
"""Test Makefile analysis."""
def test_finds_pip_install(self, tmp_path):
makefile = tmp_path / "Makefile"
makefile.write_text(textwrap.dedent('''
install:
pip install -r requirements.txt
test:
pytest
'''))
bottlenecks = analyze_makefile_targets(str(tmp_path))
assert len(bottlenecks) >= 1
def test_no_makefile(self, tmp_path):
bottlenecks = analyze_makefile_targets(str(tmp_path))
assert bottlenecks == []
class TestImportAnalysis:
"""Test heavy import detection."""
def test_finds_pandas(self, tmp_path):
src = tmp_path / "analysis.py"
src.write_text("import pandas as pd\n")
bottlenecks = find_slow_imports(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("pandas" in b.name for b in bottlenecks)
def test_finds_torch(self, tmp_path):
src = tmp_path / "model.py"
src.write_text("import torch\n")
bottlenecks = find_slow_imports(str(tmp_path))
assert any("torch" in b.name for b in bottlenecks)
def test_skips_light_imports(self, tmp_path):
src = tmp_path / "utils.py"
src.write_text("import json\nimport os\nimport sys\n")
bottlenecks = find_slow_imports(str(tmp_path))
assert len(bottlenecks) == 0
class TestGenerateReport:
"""Test full report generation."""
def test_empty_repo(self, tmp_path):
report = generate_report(str(tmp_path))
assert report.summary["total_bottlenecks"] >= 0
assert "critical" in report.summary
assert "warning" in report.summary
def test_with_findings(self, tmp_path):
# Create a test file with issues
test_file = tmp_path / "test_slow.py"
test_file.write_text(textwrap.dedent('''
import time
import requests
def test_sleepy():
time.sleep(3)
def test_http():
requests.get("https://example.com")
'''))
report = generate_report(str(tmp_path))
assert report.summary["total_bottlenecks"] >= 2
assert len(report.bottlenecks) > 0
def test_summary_categories(self, tmp_path):
report = generate_report(str(tmp_path))
assert "by_category" in report.summary
class TestMarkdownReport:
"""Test markdown output."""
def test_format(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo",
bottlenecks=[
Bottleneck("test", "slow_test", 5.0, "critical", "Fix it")
],
summary={
"total_bottlenecks": 1,
"critical": 1,
"warning": 0,
"info": 0,
"estimated_total_slowdown_s": 5.0,
"by_category": {"test": 1},
}
)
md = format_markdown(report)
assert "# Performance Bottleneck Report" in md
assert "slow_test" in md
assert "🔴" in md
assert "Fix it" in md
def test_empty_report(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo",
summary={
"total_bottlenecks": 0,
"critical": 0,
"warning": 0,
"info": 0,
"estimated_total_slowdown_s": 0,
"by_category": {},
}
)
md = format_markdown(report)
assert "Total bottlenecks:** 0" in md