Compare commits

..

1 Commits

Author SHA1 Message Date
77a753f6f2 feat: dead code detector for Python codebases (#94) 2026-04-15 03:46:43 +00:00
2 changed files with 282 additions and 353 deletions

View File

@@ -0,0 +1,282 @@
#!/usr/bin/env python3
"""
Dead Code Detector for Python Codebases
AST-based analysis to find defined but never-called functions and classes.
Excludes entry points, plugin hooks, __init__ exports.
Usage:
python3 scripts/dead_code_detector.py /path/to/repo/
python3 scripts/dead_code_detector.py hermes-agent/ --format json
python3 scripts/dead_code_detector.py . --exclude tests/,venv/
Output: file:line, function/class name, last git author (if available)
"""
import argparse
import ast
import json
import os
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
from typing import Optional
# Names that are expected to be unused (entry points, protocol methods, etc.)
SAFE_UNUSED_PATTERNS = {
# Python dunders
"__init__", "__str__", "__repr__", "__eq__", "__hash__", "__len__",
"__getitem__", "__setitem__", "__contains__", "__iter__", "__next__",
"__enter__", "__exit__", "__call__", "__bool__", "__del__",
"__post_init__", "__class_getitem__",
# Common entry points
"main", "app", "handler", "setup", "teardown", "fixture",
# pytest
"conftest", "test_", "pytest_", # prefix patterns
# Protocols / abstract
"abstractmethod", "abc_",
}
def is_safe_unused(name: str, filepath: str) -> bool:
"""Check if an unused name is expected to be unused."""
# Test files are exempt
if "test" in filepath.lower():
return True
# Known patterns
for pattern in SAFE_UNUSED_PATTERNS:
if name.startswith(pattern) or name == pattern:
return True
# __init__.py exports are often unused internally
if filepath.endswith("__init__.py"):
return True
return False
def get_git_blame(filepath: str, lineno: int) -> Optional[str]:
"""Get last author of a line via git blame."""
try:
result = subprocess.run(
["git", "blame", "-L", f"{lineno},{lineno}", "--porcelain", filepath],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.split("\n"):
if line.startswith("author "):
return line[7:]
except:
pass
return None
class DefinitionCollector(ast.NodeVisitor):
"""Collect all function and class definitions."""
def __init__(self):
self.definitions = [] # (name, type, lineno, filepath)
def visit_FunctionDef(self, node):
self.definitions.append((node.name, "function", node.lineno))
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node):
self.definitions.append((node.name, "async_function", node.lineno))
self.generic_visit(node)
def visit_ClassDef(self, node):
self.definitions.append((node.name, "class", node.lineno))
self.generic_visit(node)
class NameUsageCollector(ast.NodeVisitor):
"""Collect all name references (calls, imports, attribute access)."""
def __init__(self):
self.names = set()
self.calls = set()
self.imports = set()
def visit_Name(self, node):
self.names.add(node.id)
self.generic_visit(node)
def visit_Attribute(self, node):
if isinstance(node.value, ast.Name):
self.names.add(node.value.id)
self.generic_visit(node)
def visit_Call(self, node):
if isinstance(node.func, ast.Name):
self.calls.add(node.func.id)
elif isinstance(node.func, ast.Attribute):
if isinstance(node.func.value, ast.Name):
self.names.add(node.func.value.id)
self.calls.add(node.func.attr)
self.generic_visit(node)
def visit_Import(self, node):
for alias in node.names:
self.imports.add(alias.asname or alias.name)
self.generic_visit(node)
def visit_ImportFrom(self, node):
for alias in node.names:
self.imports.add(alias.asname or alias.name)
self.generic_visit(node)
def analyze_file(filepath: str) -> dict:
"""Analyze a single Python file for dead code."""
path = Path(filepath)
try:
content = path.read_text()
tree = ast.parse(content, filename=str(filepath))
except (SyntaxError, UnicodeDecodeError):
return {"error": f"Could not parse {filepath}"}
# Collect definitions
def_collector = DefinitionCollector()
def_collector.visit(tree)
definitions = def_collector.definitions
# Collect usage
usage_collector = NameUsageCollector()
usage_collector.visit(tree)
used_names = usage_collector.names | usage_collector.calls | usage_collector.imports
# Also scan the entire repo for references to this file's definitions
# (this is done at the repo level, not file level)
dead = []
for name, def_type, lineno in definitions:
if name.startswith("_") and not name.startswith("__"):
# Private functions — might be used externally, less likely dead
pass
if name not in used_names:
if not is_safe_unused(name, filepath):
dead.append({
"name": name,
"type": def_type,
"file": filepath,
"line": lineno,
})
return {"definitions": len(definitions), "dead": dead}
def scan_repo(repo_path: str, exclude_patterns: list = None) -> dict:
"""Scan an entire repo for dead code."""
path = Path(repo_path)
exclude = exclude_patterns or ["venv", ".venv", "node_modules", "__pycache__",
".git", "dist", "build", ".tox", "vendor"]
all_definitions = {} # name -> [{file, line, type}]
all_files = []
dead_code = []
# First pass: collect all definitions across repo
for fpath in path.rglob("*.py"):
parts = fpath.parts
if any(ex in parts for ex in exclude):
continue
if fpath.name.startswith("."):
continue
try:
content = fpath.read_text(errors="ignore")
tree = ast.parse(content, filename=str(fpath))
except:
continue
all_files.append(str(fpath))
collector = DefinitionCollector()
collector.visit(tree)
for name, def_type, lineno in collector.definitions:
rel_path = str(fpath.relative_to(path))
if name not in all_definitions:
all_definitions[name] = []
all_definitions[name].append({
"file": rel_path,
"line": lineno,
"type": def_type,
})
# Second pass: check each name for usage across entire repo
all_used_names = set()
for fpath_str in all_files:
try:
content = Path(fpath_str).read_text(errors="ignore")
tree = ast.parse(content)
except:
continue
usage = NameUsageCollector()
usage.visit(tree)
all_used_names.update(usage.names)
all_used_names.update(usage.calls)
all_used_names.update(usage.imports)
# Find dead code
for name, locations in all_definitions.items():
if name not in all_used_names:
for loc in locations:
if not is_safe_unused(name, loc["file"]):
dead_code.append({
"name": name,
"type": loc["type"],
"file": loc["file"],
"line": loc["line"],
})
return {
"repo": path.name,
"files_scanned": len(all_files),
"total_definitions": sum(len(v) for v in all_definitions.values()),
"dead_code_count": len(dead_code),
"dead_code": sorted(dead_code, key=lambda x: (x["file"], x["line"])),
}
def main():
parser = argparse.ArgumentParser(description="Find dead code in Python codebases")
parser.add_argument("repo", help="Repository path to scan")
parser.add_argument("--format", choices=["text", "json"], default="text")
parser.add_argument("--exclude", help="Comma-separated patterns to exclude")
parser.add_argument("--git-blame", action="store_true", help="Include git blame info")
args = parser.parse_args()
exclude = args.exclude.split(",") if args.exclude else None
result = scan_repo(args.repo, exclude)
if args.format == "json":
print(json.dumps(result, indent=2))
else:
print(f"Dead Code Report: {result['repo']}")
print(f"Files scanned: {result['files_scanned']}")
print(f"Total definitions: {result['total_definitions']}")
print(f"Dead code found: {result['dead_code_count']}")
print()
if result["dead_code"]:
print(f"{'File':<45} {'Line':>4} {'Type':<10} {'Name'}")
print("-" * 85)
for item in result["dead_code"]:
author = ""
if args.git_blame:
author = get_git_blame(
os.path.join(args.repo, item["file"]),
item["line"]
) or ""
author = f" ({author})" if author else ""
print(f"{item['file']:<45} {item['line']:>4} {item['type']:<10} {item['name']}{author}")
else:
print("No dead code detected!")
if __name__ == "__main__":
main()

View File

@@ -1,353 +0,0 @@
#!/usr/bin/env python3
"""
sampler.py — Score and rank sessions by harvest value.
With 20k+ sessions on disk, we can't harvest all at once. This script
scores each session by how likely it is to contain valuable knowledge,
so the harvester processes the best ones first.
Scoring strategy:
- Recency: last 7d=3pts, last 30d=2pts, older=1pt
- Length: >50 messages=3pts, >20=2pts, <20=1pt
- Repo uniqueness: first session for a repo=5pts, otherwise=1pt
- Outcome: failure=3pts (most to learn), success=2pts, unknown=1pt
- Tool calls: >10 tool invocations=2pts (complex sessions)
Usage:
python3 sampler.py --count 100 # Top 100 sessions
python3 sampler.py --repo the-nexus --count 20 # Top 20 for a repo
python3 sampler.py --since 2026-04-01 # All sessions since date
python3 sampler.py --count 50 --min-score 8 # Only high-value sessions
python3 sampler.py --count 100 --output sample.json # Save to file
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
# --- Fast session scanning (no full parse) ---
def scan_session_fast(path: str) -> dict:
"""Extract scoring metadata from a session without parsing the full JSONL.
Reads only: first line, last ~20 lines, and line count. This processes
20k sessions in seconds instead of minutes.
"""
meta = {
'path': path,
'message_count': 0,
'has_tool_calls': False,
'tool_call_count': 0,
'first_timestamp': '',
'last_timestamp': '',
'is_failure': False,
'repos_mentioned': set(),
'first_role': '',
'last_content_preview': '',
}
try:
file_size = os.path.getsize(path)
if file_size == 0:
return meta
with open(path, 'r', encoding='utf-8', errors='replace') as f:
# Read first line for timestamp + role
first_line = f.readline().strip()
if first_line:
try:
first_msg = json.loads(first_line)
meta['first_timestamp'] = first_msg.get('timestamp', '')
meta['first_role'] = first_msg.get('role', '')
except json.JSONDecodeError:
pass
# Fast line count + collect tail lines
# For the tail, seek to near end of file
tail_lines = []
line_count = 1 # already read first
if file_size > 8192:
# Seek to last 8KB for tail sampling
f.seek(max(0, file_size - 8192))
f.readline() # skip partial line
for line in f:
line = line.strip()
if line:
tail_lines.append(line)
line_count += 1
# We lost the exact count for big files — estimate from file size
# Average JSONL line is ~500 bytes
if line_count < 100:
line_count = max(line_count, file_size // 500)
else:
# Small file — read all
for line in f:
line = line.strip()
if line:
tail_lines.append(line)
line_count += 1
meta['message_count'] = line_count
# Parse tail lines for outcome, tool calls, repos
for line in tail_lines[-30:]: # last 30 non-empty lines
try:
msg = json.loads(line)
# Track last timestamp
ts = msg.get('timestamp', '')
if ts:
meta['last_timestamp'] = ts
# Count tool calls
if msg.get('tool_calls'):
meta['has_tool_calls'] = True
meta['tool_call_count'] += len(msg['tool_calls'])
# Detect failure signals in content
content = ''
if isinstance(msg.get('content'), str):
content = msg['content'].lower()
elif isinstance(msg.get('content'), list):
for part in msg['content']:
if isinstance(part, dict) and part.get('type') == 'text':
content += part.get('text', '').lower()
if content:
meta['last_content_preview'] = content[:200]
failure_signals = ['error', 'failed', 'cannot', 'unable',
'exception', 'traceback', 'rejected', 'denied']
if any(sig in content for sig in failure_signals):
meta['is_failure'] = True
# Extract repo references from tool call arguments
if msg.get('tool_calls'):
for tc in msg['tool_calls']:
args = tc.get('function', {}).get('arguments', '')
if isinstance(args, str):
# Look for repo patterns
for pattern in ['Timmy_Foundation/', 'Rockachopa/', 'compounding-intelligence', 'the-nexus', 'timmy-home', 'hermes-agent', 'the-beacon', 'the-door']:
if pattern in args:
repo = pattern.rstrip('/')
meta['repos_mentioned'].add(repo)
except json.JSONDecodeError:
continue
except (IOError, OSError):
pass
meta['repos_mentioned'] = list(meta['repos_mentioned'])
return meta
# --- Filename timestamp parsing ---
def parse_session_timestamp(filename: str) -> Optional[datetime]:
"""Parse timestamp from session filename.
Common formats:
session_20260413_123456_hash.jsonl
20260413_123456_hash.jsonl
"""
stem = Path(filename).stem
parts = stem.split('_')
# Try session_YYYYMMDD_HHMMSS format
for i, part in enumerate(parts):
if len(part) == 8 and part.isdigit():
date_part = part
time_part = parts[i + 1] if i + 1 < len(parts) and len(parts[i + 1]) == 6 else '000000'
try:
return datetime.strptime(f"{date_part}_{time_part}", '%Y%m%d_%H%M%S').replace(tzinfo=timezone.utc)
except ValueError:
continue
# Fallback: use file modification time
return None
# --- Scoring ---
def score_session(meta: dict, now: datetime, seen_repos: set) -> tuple[int, dict]:
"""Score a session for harvest value. Returns (score, breakdown)."""
score = 0
breakdown = {}
# 1. Recency
ts = parse_session_timestamp(os.path.basename(meta['path']))
if ts is None:
# Fallback to mtime
try:
ts = datetime.fromtimestamp(os.path.getmtime(meta['path']), tz=timezone.utc)
except OSError:
ts = now - timedelta(days=365)
age_days = (now - ts).days
if age_days <= 7:
recency = 3
elif age_days <= 30:
recency = 2
else:
recency = 1
score += recency
breakdown['recency'] = recency
# 2. Length
count = meta['message_count']
if count > 50:
length = 3
elif count > 20:
length = 2
else:
length = 1
score += length
breakdown['length'] = length
# 3. Repo uniqueness (first session mentioning a repo gets bonus)
repo_score = 0
for repo in meta.get('repos_mentioned', []):
if repo not in seen_repos:
seen_repos.add(repo)
repo_score = max(repo_score, 5)
else:
repo_score = max(repo_score, 1)
score += repo_score
breakdown['repo_unique'] = repo_score
# 4. Outcome
if meta.get('is_failure'):
outcome = 3
elif meta.get('last_content_preview', '').strip():
outcome = 2 # has some content = likely completed
else:
outcome = 1
score += outcome
breakdown['outcome'] = outcome
# 5. Tool calls
if meta.get('tool_call_count', 0) > 10:
tool = 2
else:
tool = 0
score += tool
breakdown['tool_calls'] = tool
return score, breakdown
# --- Main ---
def main():
parser = argparse.ArgumentParser(description="Score and rank sessions for harvesting")
parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
help='Directory containing session files')
parser.add_argument('--count', type=int, default=100, help='Number of top sessions to return')
parser.add_argument('--repo', default='', help='Filter to sessions mentioning this repo')
parser.add_argument('--since', default='', help='Only score sessions after this date (YYYY-MM-DD)')
parser.add_argument('--min-score', type=int, default=0, help='Minimum score threshold')
parser.add_argument('--output', default='', help='Output file (JSON). Default: stdout')
parser.add_argument('--format', choices=['json', 'paths', 'table'], default='table',
help='Output format: json (full), paths (one per line), table (human)')
parser.add_argument('--top-percent', type=float, default=0, help='Return top N%% instead of --count')
args = parser.parse_args()
sessions_dir = Path(args.sessions_dir)
if not sessions_dir.is_dir():
print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
sys.exit(1)
# Find all JSONL files
print(f"Scanning {sessions_dir}...", file=sys.stderr)
t0 = time.time()
session_files = list(sessions_dir.glob('*.jsonl'))
total = len(session_files)
print(f"Found {total} session files", file=sys.stderr)
# Parse since date
since_dt = None
if args.since:
since_dt = datetime.strptime(args.since, '%Y-%m-%d').replace(tzinfo=timezone.utc)
# Score all sessions
now = datetime.now(timezone.utc)
seen_repos = set() # Track repos for uniqueness scoring
scored = []
for i, sf in enumerate(session_files):
# Date filter (fast path: check filename first)
if since_dt:
ts = parse_session_timestamp(sf.name)
if ts and ts < since_dt:
continue
meta = scan_session_fast(str(sf))
# Repo filter
if args.repo:
repos = meta.get('repos_mentioned', [])
if args.repo.lower() not in [r.lower() for r in repos]:
# Also check filename
if args.repo.lower() not in sf.name.lower():
continue
score, breakdown = score_session(meta, now, seen_repos)
if score >= args.min_score:
scored.append({
'path': str(sf),
'filename': sf.name,
'score': score,
'breakdown': breakdown,
'message_count': meta['message_count'],
'repos': meta['repos_mentioned'],
'is_failure': meta['is_failure'],
})
if (i + 1) % 5000 == 0:
elapsed = time.time() - t0
print(f" Scanned {i + 1}/{total} ({elapsed:.1f}s)", file=sys.stderr)
elapsed = time.time() - t0
print(f"Scored {len(scored)} sessions in {elapsed:.1f}s", file=sys.stderr)
# Sort by score descending
scored.sort(key=lambda x: x['score'], reverse=True)
# Apply count or percent
if args.top_percent > 0:
count = max(1, int(len(scored) * args.top_percent / 100))
else:
count = args.count
scored = scored[:count]
# Output
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(scored, f, indent=2)
print(f"Wrote {len(scored)} sessions to {args.output}", file=sys.stderr)
elif args.format == 'json':
json.dump(scored, sys.stdout, indent=2)
elif args.format == 'paths':
for s in scored:
print(s['path'])
else: # table
print(f"{'SCORE':>5} {'MSGS':>5} {'REPOS':<25} {'FILE'}")
print(f"{'-'*5} {'-'*5} {'-'*25} {'-'*40}")
for s in scored:
repos = ', '.join(s['repos'][:2]) if s['repos'] else '-'
fail = ' FAIL' if s['is_failure'] else ''
print(f"{s['score']:>5} {s['message_count']:>5} {repos:<25} {s['filename'][:40]}{fail}")
if __name__ == '__main__':
main()