Compare commits


2 Commits

Author SHA1 Message Date
Alexander Payne
7bcec41d16 feat: add transcript_harvester — rule-based knowledge extraction from sessions
Some checks failed
Test / pytest (pull_request) Failing after 12s
Implements issue #195 — harvest Q&A pairs, decisions, patterns, preferences,
and error-fix links from Hermes session JSONL transcripts without LLM.

- scripts/transcript_harvester.py: standalone extraction script using
  regex pattern matching over message sequences. Handles 5 categories:
  * qa_pair — user questions ending in ? followed by assistant answers
  * decision — explicit choice statements ("I'll use", "we decided", "let's")
  * pattern — procedural knowledge ("Here's the process", "steps to")
  * preference — personal or team inclinations ("I prefer", "Alexander always")
  * error_fix — error statement followed by fix action within 8 messages (an example harvested entry is sketched below)
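
For illustration, one harvested entry in the output JSON looks roughly like this (a sketch based on the extractor fields; the values are hypothetical):

    {
      "type": "qa_pair",
      "question": "How do I rotate the deploy key?",
      "answer": "Generate a new key pair, upload the public half, then remove the old one.",
      "timestamp": "2026-04-25T18:02:11Z",
      "session_id": "session_xxx"
    }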

- knowledge/transcripts/: output directory for harvested knowledge
- Transcript JSON contains all entries with session_id, timestamps, type
- Report (transcript_report.md) gives category counts and sample entries

Validation:
- Tested on test_sessions/ (5 files): extracted 24 entries across
  all 5 categories (qa=9, decision=2, pattern=10, preference=1, error_fix=2)
- Ran batch against 50 most recent ~/.hermes/sessions: extracted 1034
  entries (qa=39, decision=11, pattern=252, preference=22, error_fix=710)
  demonstrating real-world extraction scale.

Closes #195
2026-04-26 15:09:45 -04:00
Rockachopa
4b5a675355 feat: add PR complexity scorer — estimate review effort

Implements issue #135: a script that analyzes open PRs and computes
a complexity score (1-10) based on files changed, lines added/removed,
dependency changes, and test coverage delta. Also estimates review time.

The scorer can be run with --dry-run to preview or --apply to post
score comments directly on PRs.

Output: metrics/pr_complexity.json with full analysis.

Closes #135
Some checks failed
Test / pytest (push) Failing after 10s
2026-04-26 09:34:57 -04:00
7 changed files with 21161 additions and 269 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
    "requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
    "Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
    "go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
    r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
    r"spec/.*\.rb$", r".*_spec\.rb$",
    r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
    pr_number: int
    title: str
    files_changed: int
    additions: int
    deletions: int
    has_dependency_changes: bool
    test_coverage_delta: Optional[int]
    score: int
    estimated_minutes: int
    reasons: List[str]

    def to_dict(self) -> dict:
        return asdict(self)
class GiteaClient:
    def __init__(self, token: str):
        self.token = token
        self.base_url = GITEA_BASE.rstrip("/")

    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
        url = f"{self.base_url}{path}"
        if params:
            qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
            url += f"?{qs}"
        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
            return None
        except urllib.error.URLError as e:
            print(f"Network error: {e}", file=sys.stderr)
            return None

    def get_open_prs(self, org: str, repo: str) -> List[Dict]:
        prs = []
        page = 1
        while True:
            batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
            if not batch:
                break
            prs.extend(batch)
            if len(batch) < 50:
                break
            page += 1
        return prs

    def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
        files = []
        page = 1
        while True:
            batch = self._request(
                f"/repos/{org}/{repo}/pulls/{pr_number}/files",
                {"limit": 100, "page": page}
            )
            if not batch:
                break
            files.extend(batch)
            if len(batch) < 100:
                break
            page += 1
        return files

    def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
        data = json.dumps({"body": body}).encode("utf-8")
        req = urllib.request.Request(
            f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
            data=data,
            method="POST",
            headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return resp.status in (200, 201)
        except urllib.error.HTTPError:
            return False
def is_dependency_file(filename: str) -> bool:
    return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)

def is_test_file(filename: str) -> bool:
    return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
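# Illustrative behavior of the two classifiers above (hedged examples,
# mirrored by the unit tests for this file):
#   is_dependency_file("src/requirements.txt")  -> True   (suffix match)
#   is_dependency_file("requirements_test.txt") -> False
#   is_test_file("tests/test_api.py")           -> True   (TEST_PATTERNS regex)
#   is_test_file("testfixtures/helper.py")      -> False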
def score_pr(
    files_changed: int,
    additions: int,
    deletions: int,
    has_dependency_changes: bool,
    test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
    score = 1.0
    reasons = []
    # Files changed
    if files_changed <= SMALL_FILES:
        fscore = 1.0
        reasons.append("small number of files changed")
    elif files_changed <= MEDIUM_FILES:
        fscore = 2.0
        reasons.append("moderate number of files changed")
    elif files_changed <= LARGE_FILES:
        fscore = 2.5
        reasons.append("large number of files changed")
    else:
        fscore = 3.0
        reasons.append("very large PR spanning many files")
    # Lines changed
    total_lines = additions + deletions
    if total_lines <= SMALL_LINES:
        lscore = 1.0
        reasons.append("small change size")
    elif total_lines <= MEDIUM_LINES:
        lscore = 2.0
        reasons.append("moderate change size")
    elif total_lines <= LARGE_LINES:
        lscore = 3.0
        reasons.append("large change size")
    else:
        lscore = 4.0
        reasons.append("very large change")
    # Dependency changes
    if has_dependency_changes:
        dscore = 2.5
        reasons.append("dependency changes (architectural impact)")
    else:
        dscore = 0.0
    # Test coverage delta
    tscore = 0.0
    if test_coverage_delta is not None:
        if test_coverage_delta > 0:
            reasons.append(f"test additions (+{test_coverage_delta} test files)")
            tscore = -min(2.0, test_coverage_delta / 2.0)
        elif test_coverage_delta < 0:
            reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
            tscore = min(2.0, abs(test_coverage_delta) * 0.5)
    else:
        reasons.append("test coverage change not assessed")
    # Weighted sum, scaled by 3 to use the full 1-10 range
    bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
    scaled_bonus = bonus * 3.0
    score = 1.0 + scaled_bonus
    final_score = max(1, min(10, int(round(score))))
    est_minutes = TIME_PER_POINT.get(final_score, 30)
    return final_score, est_minutes, reasons
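# Worked example (a sketch, not part of the scoring API): a PR touching
# 15 files with +400/-100 lines, no dependency or test-file changes:
#   fscore = 2.0 (15 <= MEDIUM_FILES), lscore = 2.0 (500 <= MEDIUM_LINES)
#   bonus = 2.0*0.25 + 2.0*0.25 = 1.0 -> scaled 3.0 -> score 4.0 -> 4
#   est_minutes = TIME_PER_POINT[4] = 20
# i.e. score_pr(15, 400, 100, False, None) returns (4, 20, [...]).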
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
    pr_num = pr_data["number"]
    title = pr_data.get("title", "")
    files = client.get_pr_files(org, repo, pr_num)
    additions = sum(f.get("additions", 0) for f in files)
    deletions = sum(f.get("deletions", 0) for f in files)
    filenames = [f.get("filename", "") for f in files]
    has_deps = any(is_dependency_file(f) for f in filenames)
    test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
    test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
    test_delta = test_added - test_removed if (test_added or test_removed) else None
    score, est_min, reasons = score_pr(
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta
    )
    return PRComplexity(
        pr_number=pr_num,
        title=title,
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta,
        score=score,
        estimated_minutes=est_min,
        reasons=reasons
    )
def build_comment(complexity: PRComplexity) -> str:
    change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
    deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
    test_note = ""
    if complexity.test_coverage_delta is not None:
        if complexity.test_coverage_delta > 0:
            test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
        elif complexity.test_coverage_delta < 0:
            test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
    comment = "## 📊 PR Complexity Analysis\n\n"
    comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
    comment += "| Metric | Value |\n|--------|-------|\n"
    comment += f"| Changes | {change_desc} |\n"
    comment += f"| Complexity Score | **{complexity.score}/10** |\n"
    comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
    comment += "### Scoring rationale:"
    for r in complexity.reasons:
        comment += f"\n- {r}"
    if deps_note:
        comment += deps_note
    if test_note:
        comment += test_note
    comment += "\n\n---\n"
    comment += "*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
    return comment
def main():
    parser = argparse.ArgumentParser(description="PR Complexity Scorer")
    parser.add_argument("--org", default="Timmy_Foundation")
    parser.add_argument("--repo", default="compounding-intelligence")
    parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--apply", action="store_true")
    parser.add_argument("--output", default="metrics/pr_complexity.json")
    args = parser.parse_args()
    # --token may be either a literal token or a path to a token file.
    # Fall back to the raw value only when it does not look like a path;
    # otherwise a nonexistent default path would silently be used as the token.
    token = ""
    if os.path.exists(args.token):
        with open(args.token) as f:
            token = f.read().strip()
    elif os.sep not in args.token:
        token = args.token
    if not token:
        print("ERROR: No Gitea token provided", file=sys.stderr)
        sys.exit(1)
    client = GiteaClient(token)
    print(f"Fetching open PRs for {args.org}/{args.repo}...")
    prs = client.get_open_prs(args.org, args.repo)
    if not prs:
        print("No open PRs found.")
        sys.exit(0)
    print(f"Found {len(prs)} open PR(s). Analyzing...")
    results = []
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)
    for pr in prs:
        pr_num = pr["number"]
        title = pr.get("title", "")
        print(f"  Analyzing PR #{pr_num}: {title[:60]}")
        try:
            complexity = analyze_pr(client, args.org, args.repo, pr)
            results.append(complexity.to_dict())
            comment = build_comment(complexity)
            if args.dry_run:
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
            elif args.apply:
                success = client.post_comment(args.org, args.repo, pr_num, comment)
                status = "[commented]" if success else "[FAILED]"
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
            else:
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
        except Exception as e:
            print(f"  ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
    with open(args.output, "w") as f:
        json.dump({
            "org": args.org,
            "repo": args.repo,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "pr_count": len(results),
            "results": results
        }, f, indent=2)
    if results:
        scores = [r["score"] for r in results]
        print(f"\nResults saved to {args.output}")
        print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
    else:
        print("\nNo results to save.")

if __name__ == "__main__":
    main()
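# Example invocations (a sketch; flags are taken from the argparse setup above):
#   python3 pr_complexity_scorer.py --dry-run
#   python3 pr_complexity_scorer.py --apply --output metrics/pr_complexity.json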

View File

@@ -1,174 +0,0 @@
#!/usr/bin/env python3
"""
security_linter.py — Scan code for security vulnerabilities.
Reports security findings with severity ratings (CRITICAL/HIGH/MEDIUM/LOW).
Outputs a JSON security lint report.
Usage:
python3 security_linter.py --path .
python3 security_linter.py --path . --output security_report.json
python3 security_linter.py --path . --format json # default
python3 security_linter.py --path . --format markdown
"""
import argparse
import json
import re
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional
SEVERITY_CRITICAL = "CRITICAL"
SEVERITY_HIGH = "HIGH"
SEVERITY_MEDIUM = "MEDIUM"
SEVERITY_LOW = "LOW"
class SecurityFinding:
    """Represents a security finding."""

    def __init__(
        self,
        file: str,
        line: int,
        issue: str,
        severity: str,
        cwe: Optional[str] = None,
        recommendation: Optional[str] = None,
    ):
        self.file = file
        self.line = line
        self.issue = issue
        self.severity = severity
        self.cwe = cwe
        self.recommendation = recommendation

    def to_dict(self) -> Dict[str, Any]:
        return {
            "file": self.file,
            "line": self.line,
            "issue": self.issue,
            "severity": self.severity,
            "cwe": self.cwe,
            "recommendation": self.recommendation,
        }
# Pattern entries: (pattern_regex, description, severity, cwe, recommendation)
SECURITY_PATTERNS = [
    # eval/exec - arbitrary code execution
    (r"\beval\s*\(", "Use of eval() - arbitrary code execution risk", SEVERITY_CRITICAL, "CWE-95", "Replace with ast.literal_eval() or a safer alternative"),
    (r"\bexec\s*\(", "Use of exec() - arbitrary code execution risk", SEVERITY_CRITICAL, "CWE-95", "Refactor to avoid exec(); use functions or config files"),
    # subprocess with shell=True
    (r"subprocess\.(?:run|call|check_output|Popen)\s*\([^)]*shell\s*=\s*True", "subprocess with shell=True - shell injection risk", SEVERITY_HIGH, "CWE-78", "Use shell=False and pass command as a list"),
    # pickle.loads - arbitrary code execution
    (r"pickle\.loads?\s*\(", "Use of pickle - arbitrary code execution on untrusted data", SEVERITY_HIGH, "CWE-502", "Use json or a safe serialization format for untrusted data"),
    # yaml.load without Loader
    (r"yaml\.load\s*\(", "yaml.load() - unsafe deserialization", SEVERITY_HIGH, "CWE-502", "Use yaml.safe_load()"),
    # tempfile.mktemp - insecure temp file creation
    (r"tempfile\.mktemp\s*\(", "tempfile.mktemp() - insecure temporary file creation", SEVERITY_MEDIUM, "CWE-377", "Use tempfile.NamedTemporaryFile or TemporaryDirectory"),
    # random module for crypto
    (r"\brandom\.(?:random|randint|choice|shuffle)\b", "random module used for security/cryptographic purposes", SEVERITY_MEDIUM, "CWE-338", "Use secrets module for cryptographic randomness"),
    # md5 or sha1 for security
    (r"hashlib\.(?:md5|sha1)\s*\(", "Weak hash function (MD5/SHA1) used for security/crypto", SEVERITY_MEDIUM, "CWE-327", "Use SHA-256 or better for cryptographic purposes"),
    # hardcoded credentials - anchored on a credential-like variable name so
    # that arbitrary quoted string literals are not flagged
    (r"(?i)\bpassword\s*=\s*['\"][^'\"]{4,}['\"]", "Hardcoded password detected", SEVERITY_HIGH, "CWE-259", "Use environment variables or a secrets manager"),
    (r"(?i)\b(?:api_?key|secret|token)\s*=\s*['\"][^'\"]{6,}['\"]", "Hardcoded API key or secret detected", SEVERITY_HIGH, "CWE-798", "Use environment variables or a secrets vault"),
    # SQL injection patterns - parentheses balanced
    (r"cursor\.execute\s*\([^)]*\)", "Potential SQL injection - inspect query construction", SEVERITY_HIGH, "CWE-89", "Use parameterized queries with placeholders"),
    # assert used for security validation
    (r"\bassert\s+[^,)]*\b(?:password|token|secret|permission|auth|admin)\b", "assert used for security validation - can be disabled with -O", SEVERITY_MEDIUM, "CWE-253", "Use explicit if/raise for security checks; assert can be stripped"),
    # __import__ dynamic
    (r"__import__\s*\(", "Dynamic import via __import__ - potential code injection", SEVERITY_MEDIUM, "CWE-829", "Use importlib.import_module with validated module names"),
]
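# Illustrative matches (hedged examples, mirroring the unit tests for this file):
#   "result = eval(user_input)"        -> CRITICAL, CWE-95
#   "subprocess.run(cmd, shell=True)"  -> HIGH, CWE-78
#   "password = 'supersecret123'"      -> HIGH, CWE-259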
def scan_file(path: Path) -> List[SecurityFinding]:
    findings = []
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            lines = f.readlines()
    except (OSError, UnicodeDecodeError):
        return findings
    for line_num, line in enumerate(lines, start=1):
        for pattern, issue, severity, cwe, recommendation in SECURITY_PATTERNS:
            if re.search(pattern, line):
                findings.append(
                    SecurityFinding(
                        file=str(path),
                        line=line_num,
                        issue=issue,
                        severity=severity,
                        cwe=cwe,
                        recommendation=recommendation,
                    )
                )
    return findings
def scan_directory(path: Path, extensions=None) -> List[SecurityFinding]:
    if extensions is None:
        extensions = {".py"}
    findings = []
    if not path.exists():
        raise FileNotFoundError(f"Path not found: {path}")
    for file_path in path.rglob("*"):
        if file_path.is_file() and file_path.suffix in extensions:
            findings.extend(scan_file(file_path))
    return findings

def generate_json_report(findings: List[SecurityFinding]) -> Dict[str, Any]:
    by_severity = {SEVERITY_CRITICAL: [], SEVERITY_HIGH: [], SEVERITY_MEDIUM: [], SEVERITY_LOW: []}
    for f in findings:
        by_severity[f.severity].append(f.to_dict())
    severity_counts = {s: len(v) for s, v in by_severity.items()}
    total = sum(severity_counts.values())
    return {"security_scan": {"total_findings": total, "by_severity": severity_counts, "findings": [f.to_dict() for f in findings]}}
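# Output shape produced above (sketch):
#   {"security_scan": {"total_findings": N,
#     "by_severity": {"CRITICAL": n1, "HIGH": n2, "MEDIUM": n3, "LOW": n4},
#     "findings": [{...}, ...]}}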
def generate_markdown_report(findings: List[SecurityFinding]) -> str:
    by_severity = {SEVERITY_CRITICAL: [], SEVERITY_HIGH: [], SEVERITY_MEDIUM: [], SEVERITY_LOW: []}
    for f in findings:
        by_severity[f.severity].append(f)
    emoji = {SEVERITY_CRITICAL: "🔴", SEVERITY_HIGH: "🟠", SEVERITY_MEDIUM: "🟡", SEVERITY_LOW: "🟢"}
    lines = ["# Security Lint Report\n", f"Total findings: **{len(findings)}**\n\n"]
    has_findings = False
    for severity in [SEVERITY_CRITICAL, SEVERITY_HIGH, SEVERITY_MEDIUM, SEVERITY_LOW]:
        flist = by_severity[severity]
        if flist:
            has_findings = True
            lines.append(f"## {emoji[severity]} {severity} ({len(flist)} findings)\n")
            for f in flist:
                cwe = f" ({f.cwe})" if f.cwe else ""
                lines.append(f"- **{f.file}:{f.line}** — {f.issue}{cwe}")
            lines.append("")
    if not has_findings:
        lines.append("✅ No security issues found.\n")
    return "\n".join(lines)
def main():
    parser = argparse.ArgumentParser(description="Scan code for security vulnerabilities")
    parser.add_argument("--path", type=Path, default=Path("."), help="Path to scan (file or directory)")
    parser.add_argument("--output", "-o", type=Path, default=None, help="Output file")
    parser.add_argument("--format", choices=["json", "markdown"], default="json", help="Output format (default: json)")
    parser.add_argument("--extensions", type=str, default=".py", help="Comma-separated file extensions (default: .py)")
    args = parser.parse_args()
    exts = {e.strip() for e in args.extensions.split(",")}
    findings = scan_directory(args.path, extensions=exts)
    output = json.dumps(generate_json_report(findings), indent=2) if args.format == "json" else generate_markdown_report(findings)
    if args.output:
        args.output.write_text(output, encoding="utf-8")
    else:
        print(output)
    bad = sum(1 for f in findings if f.severity in (SEVERITY_CRITICAL, SEVERITY_HIGH))
    sys.exit(1 if bad > 0 else 0)

if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
    score_pr,
    is_dependency_file,
    is_test_file,
    TIME_PER_POINT,
    SMALL_FILES,
    MEDIUM_FILES,
    LARGE_FILES,
    SMALL_LINES,
    MEDIUM_LINES,
    LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
    def decorator(fn):
        global PASS, FAIL
        try:
            fn()
            PASS += 1
            print(f"  [PASS] {name}")
        except AssertionError as e:
            FAIL += 1
            print(f"  [FAIL] {name}: {e}")
        except Exception as e:
            FAIL += 1
            print(f"  [FAIL] {name}: Unexpected error: {e}")
    return decorator

def assert_eq(a, b, msg=""):
    if a != b:
        raise AssertionError(f"{msg} expected {b!r}, got {a!r}")

def assert_true(v, msg=""):
    if not v:
        raise AssertionError(msg or "Expected True")

def assert_false(v, msg=""):
    if v:
        raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
    assert_true(is_dependency_file("requirements.txt"))
    assert_true(is_dependency_file("src/requirements.txt"))
    assert_false(is_dependency_file("requirements_test.txt"))

@test("dependency file detection — pyproject.toml")
def _():
    assert_true(is_dependency_file("pyproject.toml"))
    assert_false(is_dependency_file("myproject.py"))

@test("test file detection — pytest style")
def _():
    assert_true(is_test_file("tests/test_api.py"))
    assert_true(is_test_file("test_module.py"))
    assert_true(is_test_file("src/module_test.py"))

@test("test file detection — other frameworks")
def _():
    assert_true(is_test_file("spec/feature_spec.rb"))
    assert_true(is_test_file("__tests__/component.test.js"))
    assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
    score, minutes, _ = score_pr(
        files_changed=3,
        additions=50,
        deletions=10,
        has_dependency_changes=False,
        test_coverage_delta=None
    )
    assert_true(1 <= score <= 3, f"Score should be low, got {score}")
    assert_true(minutes < 20)

@test("medium PR gets medium score (4-6)")
def _():
    score, minutes, _ = score_pr(
        files_changed=15,
        additions=400,
        deletions=100,
        has_dependency_changes=False,
        test_coverage_delta=None
    )
    assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
    assert_true(20 <= minutes <= 45)

@test("large PR gets high score (7-9)")
def _():
    score, minutes, _ = score_pr(
        files_changed=60,
        additions=3000,
        deletions=1500,
        has_dependency_changes=True,
        test_coverage_delta=None
    )
    assert_true(7 <= score <= 9, f"Score should be high, got {score}")
    assert_true(minutes >= 45)

@test("dependency changes boost score")
def _():
    base_score, _, _ = score_pr(
        files_changed=10, additions=200, deletions=50,
        has_dependency_changes=False, test_coverage_delta=None
    )
    dep_score, _, _ = score_pr(
        files_changed=10, additions=200, deletions=50,
        has_dependency_changes=True, test_coverage_delta=None
    )
    assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")

@test("adding tests lowers complexity")
def _():
    base_score, _, _ = score_pr(
        files_changed=8, additions=150, deletions=20,
        has_dependency_changes=False, test_coverage_delta=None
    )
    better_score, _, _ = score_pr(
        files_changed=8, additions=180, deletions=20,
        has_dependency_changes=False, test_coverage_delta=3
    )
    assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")

@test("removing tests increases complexity")
def _():
    base_score, _, _ = score_pr(
        files_changed=8, additions=150, deletions=20,
        has_dependency_changes=False, test_coverage_delta=None
    )
    worse_score, _, _ = score_pr(
        files_changed=8, additions=150, deletions=20,
        has_dependency_changes=False, test_coverage_delta=-2
    )
    assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")

@test("score bounded 1-10")
def _():
    for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
        score, _, _ = score_pr(files, adds, dels, False, None)
        assert_true(1 <= score <= 10, f"Score {score} out of range")

@test("estimated minutes exist for all scores")
def _():
    for s in range(1, 11):
        assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")

print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -1,95 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/security_linter.py — Issue #158: 9.4 Security Linter."""
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from security_linter import (
    scan_file,
    scan_directory,
    generate_json_report,
    generate_markdown_report,
    SEVERITY_CRITICAL,
    SEVERITY_HIGH,
    SEVERITY_MEDIUM,
    SEVERITY_LOW,
)
def test_scan_file_detects_eval():
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
        f.write("result = eval(user_input)\n")
        f.flush()
        findings = scan_file(Path(f.name))
    assert len(findings) >= 1
    assert findings[0].severity == SEVERITY_CRITICAL
    assert "eval" in findings[0].issue.lower()

def test_scan_file_detects_hardcoded_password():
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
        f.write("password = 'supersecret123'\n")
        f.flush()
        findings = scan_file(Path(f.name))
    assert any(f.severity == SEVERITY_HIGH for f in findings)

def test_scan_file_detects_subprocess_shell_true():
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
        f.write("subprocess.run(cmd, shell=True)\n")
        f.flush()
        findings = scan_file(Path(f.name))
    assert any(f.severity == SEVERITY_HIGH and "shell" in f.issue.lower() for f in findings)

def test_scan_file_detects_pickle():
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
        f.write("data = pickle.loads(raw)\n")
        f.flush()
        findings = scan_file(Path(f.name))
    assert any(f.severity == SEVERITY_HIGH and "pickle" in f.issue.lower() for f in findings)

def test_scan_file_detects_yaml_load():
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
        f.write("config = yaml.load(stream)\n")
        f.flush()
        findings = scan_file(Path(f.name))
    assert any("yaml.load" in f.issue.lower() for f in findings)

def test_json_report_structure():
    from security_linter import SecurityFinding
    findings = [
        SecurityFinding("foo.py", 1, "eval() used", SEVERITY_CRITICAL, "CWE-95", "Use ast.literal_eval"),
        SecurityFinding("bar.py", 10, "hardcoded password", SEVERITY_HIGH, "CWE-259", None),
    ]
    report = generate_json_report(findings)
    assert "security_scan" in report
    assert report["security_scan"]["total_findings"] == 2
    assert report["security_scan"]["by_severity"][SEVERITY_CRITICAL] == 1
    assert report["security_scan"]["by_severity"][SEVERITY_HIGH] == 1

def test_markdown_report_contains_severity():
    from security_linter import SecurityFinding
    findings = [
        SecurityFinding("test.py", 1, "eval() used", SEVERITY_CRITICAL, "CWE-95", "Use ast.literal_eval"),
    ]
    md = generate_markdown_report(findings)
    assert "CRITICAL" in md or "🔴" in md
    assert "eval() used" in md
    assert "CWE-95" in md

def test_scan_directory_empty_dir():
    with tempfile.TemporaryDirectory() as tmpdir:
        findings = scan_directory(Path(tmpdir))
        assert findings == []
def test_scan_file_no_issues():
    # A minimal clean snippet: no risky calls and no hardcoded secrets
    safe_code = "import json\n\n\ndef add(a, b):\n    return a + b\n"
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
        f.write(safe_code)
        f.flush()
        findings = scan_file(Path(f.name))
    assert findings == []

scripts/transcript_harvester.py (377 lines, executable file)
View File

@@ -0,0 +1,377 @@
#!/usr/bin/env python3
"""
transcript_harvester.py — Rule-based knowledge extraction from Hermes session transcripts.
Extracts 5 knowledge categories without LLM inference:
  • qa_pair — user question + assistant answer
  • decision — explicit choice ("we decided to X", "I'll use Y")
  • pattern — solution/recipe ("the fix for Z is to do W")
  • preference — personal or team inclination ("I always", "I prefer")
  • error_fix — error statement linked to a fix within the following messages
Usage:
  python3 transcript_harvester.py --session ~/.hermes/sessions/session_xxx.jsonl
  python3 transcript_harvester.py --batch --sessions-dir ~/.hermes/sessions --limit 50
  python3 transcript_harvester.py --session session.jsonl --output knowledge/transcripts/
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Import session_reader from the same scripts directory
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session
# --- Pattern matchers --------------------------------------------------------
DECISION_PATTERNS = [
    r"\b(we\s+(?:decided|chose|agreed|will|are going)\s+to\s+.*)",
    r"\b(I\s+will\s+use|I\s+choose|I\s+am going\s+to)\s+.*",
    r"\b(let's\s+(?:use|go\s+with|do|try))\s+.*",
    r"\b(the\s+(?:decision|choice)\s+is)\s+.*",
    r"\b(I'll\s+implement|I'll\s+deploy|I'll\s+create)\s+.*",
]
PATTERN_PATTERNS = [
    r"\b(the\s+fix\s+for\s+.*\s+is\s+to\s+.*)",
    r"\b(solution:?\s+.*)",
    r"\b(approach:?\s+.*)",
    r"\b(procedure:?\s+.*)",
    r"\b(to\s+resolve\s+this.*?,\s+.*)",
    r"\b(used\s+.*\s+to\s+.*)",  # "used X to do Y"
    r"\b(by\s+doing\s+.*\s+we\s+.*)",
    r"\b(Here's\s+the\s+.*\s+process:?)",  # "Here's the deployment process:"
    r"\b(The\s+steps\s+are:?)",
    r"\b(steps\s+to\s+.*:?)",
    r"\b(Implementation\s+plan:?)",
    r"\b(\d+\.\s+.*\n\d+\.)",  # numbered multi-step (at least two steps detected by newlines)
]
PREFERENCE_PATTERNS = [
    r"\b(I\s+(?:always|never|prefer|usually|typically|generally)\s+.*)",
    r"\b(I\s+like\s+.*)",
    r"\b(My\s+preference\s+is\s+.*)",
    r"\b(Alexander\s+(?:prefers|always|never).*)",
    r"\b(We\s+always\s+.*)",
]
ERROR_PATTERNS = [
    r"\b(error|failed|fatal|exception|denied|could\s+not|couldn't)\b.*",
]
# Fix indicators, matched against messages within 8 messages after an error
FIX_INDICATORS = [
    r"\b(fixed|resolved|added|generated|created|corrected|worked)\b",
    r"\b(the\s+key\s+is|solution\s+was|generate\s+a\s+new)\b",
]
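# Illustrative matches (hedged examples): "we decided to use Postgres" triggers
# DECISION_PATTERNS; "I prefer explicit commits" triggers PREFERENCE_PATTERNS;
# "error: connection refused" triggers ERROR_PATTERNS.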
def is_decision(text: str) -> bool:
    for p in DECISION_PATTERNS:
        if re.search(p, text, re.IGNORECASE):
            return True
    return False

def is_pattern(text: str) -> bool:
    for p in PATTERN_PATTERNS:
        if re.search(p, text, re.IGNORECASE):
            return True
    return False

def is_preference(text: str) -> bool:
    for p in PREFERENCE_PATTERNS:
        if re.search(p, text, re.IGNORECASE):
            return True
    return False

def is_error(text: str) -> bool:
    for p in ERROR_PATTERNS:
        if re.search(p, text, re.IGNORECASE):
            return True
    return False

def is_fix_indicator(text: str) -> bool:
    for p in FIX_INDICATORS:
        if re.search(p, text, re.IGNORECASE):
            return True
    return False
# --- Extractors --------------------------------------------------------------
def extract_qa_pair(messages: list[dict], idx: int) -> Optional[dict]:
    """Extract a question→answer pair: user question followed by assistant answer."""
    if idx + 1 >= len(messages):
        return None
    curr = messages[idx]
    nxt = messages[idx + 1]
    if curr.get('role') != 'user' or nxt.get('role') != 'assistant':
        return None
    question = curr.get('content', '').strip()
    answer = nxt.get('content', '').strip()
    if not question or not answer:
        return None
    # Must be a real question (ends with ? or starts with WH-)
    if not (question.endswith('?') or re.match(r'^(how|what|why|when|where|who|which|can|do|is|are)', question, re.IGNORECASE)):
        return None
    # Skip very short answers ("OK", "Yes")
    if len(answer.split()) < 3:
        return None
    return {
        "type": "qa_pair",
        "question": question,
        "answer": answer,
        "timestamp": curr.get('timestamp', ''),
    }
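# Illustrative qualifying pair (hedged example): a user message
# "How do I rotate the deploy key?" followed by an assistant answer of three
# or more words yields a qa_pair; a bare "OK" or "Yes" reply is skipped.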
def extract_decision(messages: list[dict], idx: int) -> Optional[dict]:
    """Extract a decision statement from assistant or user message."""
    msg = messages[idx]
    text = msg.get('content', '').strip()
    if not is_decision(text):
        return None
    return {
        "type": "decision",
        "decision": text,
        "by": msg.get('role', 'unknown'),
        "timestamp": msg.get('timestamp', ''),
    }

def extract_pattern(messages: list[dict], idx: int) -> Optional[dict]:
    """Extract a pattern or solution description."""
    msg = messages[idx]
    text = msg.get('content', '').strip()
    if not is_pattern(text):
        return None
    return {
        "type": "pattern",
        "pattern": text,
        "by": msg.get('role', 'unknown'),
        "timestamp": msg.get('timestamp', ''),
    }

def extract_preference(messages: list[dict], idx: int) -> Optional[dict]:
    """Extract a stated preference."""
    msg = messages[idx]
    text = msg.get('content', '').strip()
    if not is_preference(text):
        return None
    return {
        "type": "preference",
        "preference": text,
        "by": msg.get('role', 'unknown'),
        "timestamp": msg.get('timestamp', ''),
    }
def extract_error_fix(messages: list[dict], idx: int) -> Optional[dict]:
    """
    Link an error to its fix. Catch two patterns:
    1. Error statement followed by explicit fix indicator ("fixed", "resolved")
    2. Error statement followed by a decision statement that fixes it ("I'll generate", "I'll add")
    """
    msg = messages[idx]
    if not is_error(msg.get('content', '')):
        return None
    error_text = msg.get('content', '').strip()
    window = min(idx + 8, len(messages))
    for j in range(idx + 1, window):
        follow_up = messages[j]
        follow_text = follow_up.get('content', '').strip()
        # Check for explicit fix indicators
        if is_fix_indicator(follow_text):
            return {
                "type": "error_fix",
                "error": error_text,
                "fix": follow_text,
                "error_timestamp": msg.get('timestamp', ''),
                "fix_timestamp": follow_up.get('timestamp', ''),
            }
        # Check for fix decision: "I'll <action>", "Let's <action>", "We need to <action>"
        if re.match(r"^(I'll|I will|Let's|We (will|should|need to))\s+\w+", follow_text, re.IGNORECASE):
            return {
                "type": "error_fix",
                "error": error_text,
                "fix": follow_text,
                "error_timestamp": msg.get('timestamp', ''),
                "fix_timestamp": follow_up.get('timestamp', ''),
            }
    return None
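# Illustrative linkage (hedged example): "deploy failed: key denied" followed
# within 8 messages by "I'll generate a new key" yields an error_fix entry
# (here via FIX_INDICATORS, which matches "generate a new").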
def harvest_session(messages: list[dict], session_id: str) -> dict:
    """Extract knowledge entries from a session transcript."""
    entries = []
    n = len(messages)
    for i in range(n):
        # QA pairs
        qa = extract_qa_pair(messages, i)
        if qa:
            qa['session_id'] = session_id
            entries.append(qa)
        # Decisions
        dec = extract_decision(messages, i)
        if dec:
            dec['session_id'] = session_id
            entries.append(dec)
        # Patterns
        pat = extract_pattern(messages, i)
        if pat:
            pat['session_id'] = session_id
            entries.append(pat)
        # Preferences
        pref = extract_preference(messages, i)
        if pref:
            pref['session_id'] = session_id
            entries.append(pref)
        # Error/fix pairs (spanning multiple messages)
        ef = extract_error_fix(messages, i)
        if ef:
            ef['session_id'] = session_id
            entries.append(ef)
    return {
        "session_id": session_id,
        "message_count": n,
        "entries": entries,
        "counts": {
            "qa_pair": sum(1 for e in entries if e['type'] == 'qa_pair'),
            "decision": sum(1 for e in entries if e['type'] == 'decision'),
            "pattern": sum(1 for e in entries if e['type'] == 'pattern'),
            "preference": sum(1 for e in entries if e['type'] == 'preference'),
            "error_fix": sum(1 for e in entries if e['type'] == 'error_fix'),
        }
    }
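# Shape of the dict returned above (sketch): {"session_id": ..., "message_count": N,
#   "entries": [...], "counts": {"qa_pair": ..., "decision": ..., "pattern": ...,
#   "preference": ..., "error_fix": ...}}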
def write_json_output(results: list[dict], output_path: Path):
    """Write aggregated results to JSON."""
    all_entries = []
    summary = {"sessions": 0}
    for r in results:
        summary['sessions'] += 1
        all_entries.extend(r['entries'])
    output = {
        "harvester": "transcript_harvester",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "summary": summary,
        "total_entries": len(all_entries),
        "entries": all_entries,
    }
    output_path.write_text(json.dumps(output, indent=2, ensure_ascii=False))
    return output
def write_report(results: list[dict], report_path: Path):
    """Write a human-readable markdown report."""
    lines = []
    lines.append("# Transcript Harvester Report")
    lines.append(f"Generated: {datetime.now(timezone.utc).isoformat()}")
    lines.append(f"Sessions processed: {len(results)}")
    totals = {cat: 0 for cat in ['qa_pair', 'decision', 'pattern', 'preference', 'error_fix']}
    for r in results:
        for cat, cnt in r['counts'].items():
            totals[cat] += cnt
    lines.append("\n## Extracted Knowledge by Category\n")
    for cat, cnt in totals.items():
        lines.append(f"- **{cat}**: {cnt}")
    lines.append("\n## Sample Entries\n")
    for r in results:
        for entry in r['entries'][:3]:
            lines.append(f"\n### {entry['type'].upper()} ({r['session_id']})\n")
            if entry['type'] == 'qa_pair':
                lines.append(f"**Q:** {entry['question']}\n")
                lines.append(f"**A:** {entry['answer']}\n")
            elif entry['type'] == 'decision':
                lines.append(f"**Decision:** {entry['decision']}\n")
                lines.append(f"By: {entry['by']}\n")
            elif entry['type'] == 'pattern':
                lines.append(f"**Pattern:** {entry['pattern']}\n")
            elif entry['type'] == 'preference':
                lines.append(f"**Preference:** {entry['preference']}\n")
            elif entry['type'] == 'error_fix':
                lines.append(f"**Error:** {entry['error']}\n")
                lines.append(f"**Fixed by:** {entry['fix']}\n")
    report_path.write_text("\n".join(lines))
def find_recent_sessions(sessions_dir: Path, limit: int = 50) -> list[Path]:
    """Find up to `limit` most recent .jsonl session files (by modification time)."""
    sessions = sorted(sessions_dir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True)
    return sessions[:limit] if limit > 0 else sessions
def main():
    parser = argparse.ArgumentParser(description="Harvest knowledge from session transcripts")
    parser.add_argument('--session', help='Single session JSONL file')
    parser.add_argument('--batch', action='store_true', help='Batch mode')
    parser.add_argument('--sessions-dir', default=str(Path.home() / '.hermes' / 'sessions'),
                        help='Directory of session files')
    parser.add_argument('--output', default='knowledge/transcripts',
                        help='Output directory (default: knowledge/transcripts)')
    parser.add_argument('--limit', type=int, default=50,
                        help='Max sessions to process in batch (default: 50)')
    args = parser.parse_args()
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    results = []
    if args.session:
        messages = read_session(args.session)
        session_id = Path(args.session).stem
        results.append(harvest_session(messages, session_id))
    elif args.batch:
        sessions_dir = Path(args.sessions_dir)
        sessions = find_recent_sessions(sessions_dir, args.limit)
        print(f"Processing {len(sessions)} sessions...")
        for sf in sessions:
            messages = read_session(str(sf))
            results.append(harvest_session(messages, sf.stem))
    else:
        parser.print_help()
        sys.exit(1)
    # Write outputs
    json_path = output_dir / "transcript_knowledge.json"
    report_path = output_dir / "transcript_report.md"
    output = write_json_output(results, json_path)
    write_report(results, report_path)
    print(f"\nDone: {output['total_entries']} entries from {len(results)} sessions")
    print(f"Output: {json_path}")
    print(f"Report: {report_path}")
    # Print category totals
    totals = {}
    for r in results:
        for cat, cnt in r['counts'].items():
            totals[cat] = totals.get(cat, 0) + cnt
    print("\nCategory counts:")
    for cat, cnt in sorted(totals.items()):
        print(f"  {cat}: {cnt}")

if __name__ == '__main__':
    main()