Files
compounding-intelligence/scripts/logic_reviewer.py
STEP35 Burn Agent db264b333b
Some checks failed
Test / pytest (pull_request) Failing after 7s
chore: remove debug prints from logic_reviewer.py
2026-04-26 11:13:45 -04:00

262 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
Logic Reviewer — Scan diffs for common logic bugs in Python code.
Pipeline 6.3 for Compounding Intelligence.
Covers:
• Potential null / None attribute or item access
• Off-by-one patterns (range(len(...)) direct indexing)
• Mutable default argument anti-pattern
• Identity comparison with literals (is vs ==)
Usage:
python3 scripts/logic_reviewer.py --diff <diff_file>
python3 scripts/logic_reviewer.py --diff <diff_file> --format json
git diff | python3 scripts/logic_reviewer.py --stdin
"""
import argparse
import ast
import json
import re
import sys
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from typing import List
class Severity(Enum):
    """Severity levels for reported findings, most to least critical."""
    HIGH = "high"      # likely runtime failure (e.g. None dereference)
    MEDIUM = "medium"  # probable logic bug (e.g. mutable default)
    LOW = "low"        # style-level correctness smell
@dataclass
class LogicIssue:
    """A single logic-bug finding located in a reviewed diff."""
    file: str      # path (from the diff header) where the issue was found
    line: int      # line number in the new file
    bug_type: str  # machine-readable category, e.g. "mutable_default"
    severity: str  # one of Severity's values
    message: str   # human-readable explanation
    snippet: str   # offending source text, stripped

    def to_dict(self) -> dict:
        """Return the finding as a plain dict for JSON serialization."""
        return asdict(self)
class LogicReviewer:
"""Scan added/modified Python code for common logic errors."""
# Mutable default: def f(x=[]): or def f(x={})
MUTABLE_DEFAULT_RE = re.compile(
r'def\s+\w+\s*\([^)]*=\s*(\[\s*\]|\{\}\s*|dict\(\)|list\(\])'
)
# Identity comparison with literal value.
# Use (?!\w) at end instead of \b because literals don't end on word-chars.
IDENTITY_LITERAL_RE = re.compile(
r'\bis\s+(?:"[^"]*"|\'[^\']*\'|True|False|None)(?!\w)'
)
# Off-by-one: for i in range(len(x)): accessing x[i]
OFF_BY_ONE_RE = re.compile(
r'for\s+(\w+)\s+in\s+range\s*\(\s*len\s*\(\s*(\w+)\s*\)\s*\)\s*:'
)
def __init__(self):
self.issues: List[LogicIssue] = []
def review_hunk(self, filepath: str, hunk_lines: List[str], hunk_start_line: int):
"""Analyze a single diff hunk for logic issues."""
# Build a string of added lines only (for multi-line patterns like AST/off-by-one)
added_only = []
for line in hunk_lines:
if line.startswith('+') and not line.startswith('++'):
added_only.append(line[1:].rstrip('\n'))
else:
added_only.append('') # preserve hunk line alignment
added_text_full = '\n'.join(added_only)
for i, line in enumerate(hunk_lines):
if not line.startswith('+') or line.startswith('++'):
continue
code = line[1:].rstrip('\n')
if not code.strip():
continue
lineno = hunk_start_line + i
# --- Mutable default argument ---
if self.MUTABLE_DEFAULT_RE.search(code):
self.issues.append(LogicIssue(
file=filepath, line=lineno,
bug_type="mutable_default",
severity=Severity.MEDIUM.value,
message="Mutable default argument — creates shared state across calls",
snippet=code.strip()
))
# --- Identity comparison with literal ---
if self.IDENTITY_LITERAL_RE.search(code):
self.issues.append(LogicIssue(
file=filepath, line=lineno,
bug_type="identity_literal",
severity=Severity.LOW.value,
message="Use '==' not 'is' for value comparison with literals",
snippet=code.strip()
))
# --- Off-by-one (multi-line) ---
for match in self.OFF_BY_ONE_RE.finditer(added_text_full):
# Flag any `for i in range(len(collection))` pattern — better to use enumerate()
idx_var = match.group(1)
arr_var = match.group(2)
before = added_text_full[:match.start()]
lineno_offset = before.count('\n')
lineno = hunk_start_line + lineno_offset
self.issues.append(LogicIssue(
file=filepath, line=lineno,
bug_type="off_by_one",
severity=Severity.MEDIUM.value,
message=f"Consider enumerate({arr_var}) instead of range(len({arr_var})) to avoid off-by-one",
snippet=match.group(0).strip()
))
# --- None-attribute risk via AST ---
try:
tree = ast.parse(added_text_full)
for node in ast.walk(tree):
if isinstance(node, ast.Attribute):
# Attribute access: x.attr — check if x may be None
if isinstance(node.value, ast.Name):
varname = node.value.id
if self._var_assigned_none(added_text_full, varname):
# Get the line number for the attribute access from AST
lineno = hunk_start_line + (node.lineno - 1) if hasattr(node, 'lineno') else hunk_start_line
snippet = ast.get_source_segment(added_text_full, node)
if snippet is None:
snippet = code.strip() if 'code' in locals() else ''
self.issues.append(LogicIssue(
file=filepath, line=lineno,
bug_type="none_dereference",
severity=Severity.HIGH.value,
message=f"Potential None dereference: '{varname}' may be None before accessing attribute",
snippet=snippet.strip()
))
except (SyntaxError, ValueError):
pass # Incomplete code snippet or AST error (acceptable)
def _var_assigned_none(self, text: str, var: str) -> bool:
"""Check if `var = None` appears earlier in the same hunk."""
pattern = re.compile(rf'{re.escape(var)}\s*=\s*None\b')
return bool(pattern.search(text))
def review_diff(self, diff_text: str, filename: str = "<stdin>"):
"""Parse a unified diff and review all Python hunks."""
files = self._split_diff(diff_text)
for path, file_diff in files.items():
if not path.endswith('.py'):
continue
for hunk in file_diff['hunks']:
self.review_hunk(path, hunk['lines'], hunk['start'])
def _split_diff(self, diff: str) -> dict:
"""Minimal unified diff parser — returns {path: {hunks: [...]} }."""
files = {}
current_file = None
current_hunks = []
in_hunk = False
hunk_start = 1
hunk_lines = []
for line in diff.split('\n'):
if line.startswith('diff --git a/'):
if current_file:
files[current_file] = {'hunks': current_hunks}
parts = line.split(' b/')
current_file = parts[1] if len(parts) > 1 else None
current_hunks = []
in_hunk = False
elif line.startswith('@@'):
if in_hunk and current_file:
current_hunks.append({'start': hunk_start, 'lines': hunk_lines})
m = re.search(r'@@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))?', line)
hunk_start = int(m.group(1)) if m else 1
hunk_lines = []
in_hunk = True
elif in_hunk and current_file:
hunk_lines.append(line)
if current_file and in_hunk:
current_hunks.append({'start': hunk_start, 'lines': hunk_lines})
if current_file and current_file not in files:
files[current_file] = {'hunks': current_hunks}
return files
def to_dict(self) -> dict:
return {
'summary': {
'total_issues': len(self.issues),
'by_severity': {
'high': sum(1 for i in self.issues if i.severity == 'high'),
'medium': sum(1 for i in self.issues if i.severity == 'medium'),
'low': sum(1 for i in self.issues if i.severity == 'low'),
}
},
'findings': [i.to_dict() for i in self.issues]
}
def format_text(reviewer: LogicReviewer) -> str:
    """Render the reviewer's findings as a human-readable text report."""
    summary = reviewer.to_dict()['summary']
    counts = summary['by_severity']
    report = [
        "Logic Review Report",
        "=" * 40,
        f"Total issues: {summary['total_issues']}",
        f" HIGH: {counts['high']}",
        f" MEDIUM: {counts['medium']}",
        f" LOW: {counts['low']}",
        "",
    ]
    # Detailed per-finding section only when something was flagged.
    if reviewer.issues:
        report.append("Findings:")
        for issue in reviewer.issues:
            report.extend([
                f" [{issue.severity.upper()}] {issue.file}:{issue.line}",
                f" {issue.bug_type}: {issue.message}",
                f" --> {issue.snippet}",
                "",
            ])
    return '\n'.join(report)
def main():
    """CLI entry point: read a diff, review it, emit a text or JSON report."""
    parser = argparse.ArgumentParser(description="Review code diffs for common logic errors")
    parser.add_argument('--diff', type=str, help='Path to unified diff file')
    parser.add_argument('--stdin', action='store_true', help='Read diff from stdin')
    parser.add_argument('--format', choices=['json', 'text'], default='text', help='Output format')
    parser.add_argument('--output', type=str, help='Output file (default: stdout)')
    args = parser.parse_args()

    # Pick the diff source; exactly one of --stdin / --diff is required.
    if args.stdin:
        diff_text = sys.stdin.read()
    elif args.diff:
        diff_text = Path(args.diff).read_text()
    else:
        parser.error("Must provide --diff or --stdin")

    reviewer = LogicReviewer()
    reviewer.review_diff(diff_text, args.diff or '<stdin>')

    if args.format == 'json':
        output = json.dumps(reviewer.to_dict(), indent=2)
    else:
        output = format_text(reviewer)

    if args.output:
        Path(args.output).write_text(output + '\n')
    else:
        print(output)
# Script entry point — run the CLI only when executed directly, not on import.
if __name__ == '__main__':
    main()