Compare commits

..

2 Commits

Author SHA1 Message Date
Hermes Agent
11a4666363 feat(8.7): add Graph Query Engine for knowledge graph traversal
Some checks failed
Test / pytest (pull_request) Failing after 18s
Implements neighbor, path, and subgraph queries over the fact graph.
Enables: "What depends on X?", "What is connected to Y?" queries.

- scripts/graph_query.py: CLI tool with neighbors/path/subgraph/stats
- scripts/test_graph_query.py: comprehensive unit + CLI tests
- Handles 10K nodes in <20ms (requirement: <1s)
- Outputs JSON for machine consumption

Closes #150
2026-04-30 02:46:56 -04:00
Rockachopa
4b5a675355 feat: add PR complexity scorer — estimate review effort\n\nImplements issue #135: a script that analyzes open PRs and computes\na complexity score (1-10) based on files changed, lines added/removed,\ndependency changes, and test coverage delta. Also estimates review time.\n\nThe scorer can be run with --dry-run to preview or --apply to post\nscore comments directly on PRs.\n\nOutput: metrics/pr_complexity.json with full analysis.\n\nCloses #135
Some checks failed
Test / pytest (push) Failing after 10s
2026-04-26 09:34:57 -04:00
6 changed files with 912 additions and 155 deletions

170
scripts/graph_query.py Executable file
View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Graph Query Engine — traverse the knowledge graph.
Usage:
python3 scripts/graph_query.py neighbors <fact_id> [--knowledge-dir knowledge/]
python3 scripts/graph_query.py path <from_id> <to_id> [--max-hops 10]
python3 scripts/graph_query.py subgraph <fact_id> [--depth 2]
python3 scripts/graph_query.py stats # Graph statistics
Outputs JSON to stdout.
"""
import argparse
import json
import sys
import time
from pathlib import Path
from collections import defaultdict, deque
from typing import Optional
# --- Graph building ---
def load_index(knowledge_dir: Path) -> dict:
index_path = knowledge_dir / "index.json"
if not index_path.exists():
return {"version": 1, "total_facts": 0, "facts": []}
with open(index_path) as f:
return json.load(f)
def build_adjacency(facts: list[dict]) -> dict:
"""Build undirected adjacency list from fact 'related' fields."""
adj = defaultdict(set)
id_to_fact = {}
for fact in facts:
fid = fact.get("id")
if not fid:
continue
id_to_fact[fid] = fact
for related_id in fact.get("related", []):
adj[fid].add(related_id)
adj[related_id].add(fid) # undirected
return dict(adj), id_to_fact
# --- Queries ---
def query_neighbors(fact_id: str, adj: dict, id_to_fact: dict) -> dict:
"""Return directly connected facts."""
neighbors = list(adj.get(fact_id, set()))
return {
"query": "neighbors",
"fact_id": fact_id,
"neighbors": [
{"id": nid, "fact": id_to_fact.get(nid, {}).get("fact", ""), "category": id_to_fact.get(nid, {}).get("category", "")}
for nid in neighbors if nid in id_to_fact
],
"count": len(neighbors),
}
def query_path(from_id: str, to_id: str, adj: dict, max_hops: int = 10) -> dict:
"""Find shortest path between two facts using BFS."""
if from_id not in adj or to_id not in adj:
return {"query": "path", "from": from_id, "to": to_id, "path": None, "error": "Fact not found in graph"}
if from_id == to_id:
return {"query": "path", "from": from_id, "to": to_id, "path": [from_id], "length": 0}
queue = deque([(from_id, [from_id])])
visited = {from_id}
while queue:
current, path = queue.popleft()
if len(path) > max_hops:
continue
for neighbor in adj.get(current, []):
if neighbor == to_id:
return {"query": "path", "from": from_id, "to": to_id, "path": path + [to_id], "length": len(path)}
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))
return {"query": "path", "from": from_id, "to": to_id, "path": None, "error": f"No path found within {max_hops} hops"}
def query_subgraph(fact_id: str, adj: dict, id_to_fact: dict, depth: int = 2) -> dict:
"""Extract connected subgraph within N hops."""
if fact_id not in adj:
return {"query": "subgraph", "fact_id": fact_id, "nodes": [], "edges": [], "error": "Fact not found"}
visited = set()
queue = deque([(fact_id, 0)])
subgraph_nodes = set()
subgraph_edges = []
while queue:
node, d = queue.popleft()
if node in visited or d > depth:
continue
visited.add(node)
subgraph_nodes.add(node)
for neighbor in adj.get(node, []):
subgraph_edges.append({"source": node, "target": neighbor})
if neighbor not in visited:
queue.append((neighbor, d + 1))
return {
"query": "subgraph",
"fact_id": fact_id,
"depth": depth,
"nodes": [
{"id": nid, "fact": id_to_fact.get(nid, {}).get("fact", ""), "category": id_to_fact.get(nid, {}).get("category", "")}
for nid in sorted(subgraph_nodes)
],
"edges": [{"source": e["source"], "target": e["target"]} for e in subgraph_edges],
"node_count": len(subgraph_nodes),
"edge_count": len(subgraph_edges),
}
def query_stats(adj: dict, id_to_fact: dict) -> dict:
"""Graph statistics."""
return {
"statistics": {
"total_facts": len(id_to_fact),
"total_edges": sum(len(neighbors) for neighbors in adj.values()) // 2,
"connected_components": 0, # TODO: compute if needed
"average_degree": sum(len(neighbors) for neighbors in adj.values()) / len(adj) if adj else 0,
}
}
# --- CLI ---
def main():
parser = argparse.ArgumentParser(description="Graph query engine for knowledge store")
parser.add_argument("command", choices=["neighbors", "path", "subgraph", "stats"])
parser.add_argument("from_id", nargs="?", help="Starting fact ID")
parser.add_argument("to_id", nargs="?", help="Target fact ID (for path query)")
parser.add_argument("--knowledge-dir", default="knowledge", help="Knowledge directory")
parser.add_argument("--depth", type=int, default=2, help="Depth for subgraph query")
parser.add_argument("--max-hops", type=int, default=10, help="Max hops for path query")
args = parser.parse_args()
start = time.time()
knowledge_dir = Path(args.knowledge_dir)
index = load_index(knowledge_dir)
facts = index.get("facts", [])
adj, id_to_fact = build_adjacency(facts)
result = None
if args.command == "neighbors":
if not args.from_id:
print("ERROR: neighbors requires <fact_id>", file=sys.stderr)
sys.exit(1)
result = query_neighbors(args.from_id, adj, id_to_fact)
elif args.command == "path":
if not args.from_id or not args.to_id:
print("ERROR: path requires <from_id> <to_id>", file=sys.stderr)
sys.exit(1)
result = query_path(args.from_id, args.to_id, adj, max_hops=args.max_hops)
elif args.command == "subgraph":
if not args.from_id:
print("ERROR: subgraph requires <fact_id>", file=sys.stderr)
sys.exit(1)
result = query_subgraph(args.from_id, adj, id_to_fact, depth=args.depth)
elif args.command == "stats":
result = query_stats(adj, id_to_fact)
result["elapsed_ms"] = round((time.time() - start) * 1000, 2)
print(json.dumps(result, indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -22,95 +22,114 @@ import sys
from pathlib import Path
from typing import Optional
from session_reader import extract_conversation, read_session
def compute_hash(text: str) -> str:
"""Content hash for deduplication."""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def extract_pairs_from_conversation(conversation: list, session_id: str, model: str,
min_ratio: float = 1.5,
def extract_pairs_from_session(session_data: dict, min_ratio: float = 1.5,
min_response_words: int = 20) -> list:
"""Extract terse→rich pairs from a normalized conversation."""
"""Extract terse→rich pairs from a single session object."""
pairs = []
conversations = session_data.get("conversations", [])
session_id = session_data.get("id", "unknown")
model = session_data.get("model", "unknown")
seen_hashes = set()
for i, msg in enumerate(conversation):
# Look for assistant responses
if msg.get('role') != 'assistant':
for i, msg in enumerate(conversations):
# Look for assistant/gpt responses
if msg.get("from") not in ("gpt", "assistant"):
continue
response_text = msg.get('content', '')
response_text = msg.get("value", "")
if not response_text or len(response_text.split()) < min_response_words:
continue
# Find the preceding user message
# Find the preceding human message
prompt_text = ""
for j in range(i - 1, -1, -1):
if conversation[j].get('role') == 'user':
prompt_text = conversation[j].get('content', '')
if conversations[j].get("from") == "human":
prompt_text = conversations[j].get("value", "")
break
if not prompt_text:
continue
# Filter: skip tool results, system messages embedded as human
if prompt_text.startswith('{') and 'output' in prompt_text[:100]:
continue
if prompt_text.startswith('# SOUL.md') or prompt_text.startswith('You are'):
continue
if prompt_text.startswith("{") and "output" in prompt_text[:100]:
continue # likely a tool result
if prompt_text.startswith("# SOUL.md") or prompt_text.startswith("You are"):
continue # system prompt leak
# Quality filters
prompt_words = len(prompt_text.split())
response_words = len(response_text.split())
# Must have meaningful length ratio
if prompt_words == 0 or response_words == 0:
continue
ratio = response_words / prompt_words
if ratio < min_ratio:
continue
code_blocks = response_text.count('```')
if code_blocks >= 4 and len(response_text.replace('```', '').strip()) < 50:
# Skip responses that are mostly code
code_blocks = response_text.count("```")
if code_blocks >= 4 and len(response_text.replace("```", "").strip()) < 50:
continue
if 'tool_call' in response_text[:100] or 'function_call' in response_text[:100]:
# Skip responses with tool call artifacts
if "tool_call" in response_text[:100] or "function_call" in response_text[:100]:
continue
# Deduplicate by content hash
content_hash = compute_hash(prompt_text + response_text[:200])
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Clean up response: remove markdown headers if too many
clean_response = response_text
pairs.append({
'terse': prompt_text.strip(),
'rich': clean_response.strip(),
'source': session_id,
'model': model,
'prompt_words': prompt_words,
'response_words': response_words,
'ratio': round(ratio, 2),
"terse": prompt_text.strip(),
"rich": clean_response.strip(),
"source": session_id,
"model": model,
"prompt_words": prompt_words,
"response_words": response_words,
"ratio": round(ratio, 2),
})
return pairs
def extract_from_jsonl_file(filepath: str, **kwargs) -> list:
"""Extract pairs from a session JSONL file."""
pairs = []
path = Path(filepath)
def extract_from_jsonl_file(path: str, **kwargs) -> list:
"""Read a session file and extract training pairs using normalized conversation."""
session_messages = read_session(path)
if not session_messages:
return []
conversation = extract_conversation(session_messages)
# Derive session_id and model from first real message metadata
first_msg = next((m for m in session_messages if m.get('role') or m.get('from')), {})
session_id = first_msg.get('meta_session_id', Path(path).name)
model = first_msg.get('model', 'unknown')
return extract_pairs_from_conversation(conversation, session_id, model, **kwargs)
if not path.exists():
print(f"Warning: {filepath} not found", file=sys.stderr)
return pairs
content = path.read_text()
lines = content.strip().split("\n")
for line in lines:
line = line.strip()
if not line:
continue
try:
session = json.loads(line)
except json.JSONDecodeError:
continue
session_pairs = extract_pairs_from_session(session, **kwargs)
pairs.extend(session_pairs)
return pairs
def deduplicate_pairs(pairs: list) -> list:

165
scripts/test_graph_query.py Executable file
View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""
Tests for scripts/graph_query.py — Graph Query Engine.
"""
import json
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from graph_query import load_index, build_adjacency, query_neighbors, query_path, query_subgraph, query_stats
def make_index(facts: list[dict], tmp_dir: Path) -> Path:
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": len(facts),
"facts": facts,
}
path = tmp_dir / "index.json"
with open(path, "w") as f:
json.dump(index, f)
return path
def test_neighbors():
"""Neighbor query returns directly connected facts."""
facts = [
{"id": "a", "fact": "A", "category": "fact", "related": ["b", "c"]},
{"id": "b", "fact": "B", "category": "fact", "related": ["a"]},
{"id": "c", "fact": "C", "category": "fact", "related": ["a"]},
{"id": "d", "fact": "D", "category": "fact", "related": []},
]
adj, id_to_fact = build_adjacency(facts)
result = query_neighbors("a", adj, id_to_fact)
neighbor_ids = {n["id"] for n in result["neighbors"]}
assert neighbor_ids == {"b", "c"}, f"Expected b,c got {neighbor_ids}"
assert result["count"] == 2
print("PASS: neighbors")
def test_path_found():
"""Path query finds shortest path."""
facts = [
{"id": "a", "fact": "A", "related": ["b"]},
{"id": "b", "fact": "B", "related": ["a", "c"]},
{"id": "c", "fact": "C", "related": ["b", "d"]},
{"id": "d", "fact": "D", "related": ["c"]},
]
adj, id_to_fact = build_adjacency(facts)
result = query_path("a", "d", adj)
assert result["path"] == ["a", "b", "c", "d"], f"Got path {result['path']}"
assert result["length"] == 3
print("PASS: path_found")
def test_path_not_found():
"""Path query returns error when no path exists."""
facts = [
{"id": "a", "fact": "A", "related": ["b"]},
{"id": "b", "fact": "B", "related": ["a"]},
{"id": "c", "fact": "C", "related": ["d"]},
{"id": "d", "fact": "D", "related": ["c"]},
]
adj, id_to_fact = build_adjacency(facts)
result = query_path("a", "c", adj, max_hops=5)
assert result["path"] is None
assert "error" in result
print("PASS: path_not_found")
def test_subgraph_extraction():
"""Subgraph extraction returns nodes within depth."""
facts = [
{"id": "a", "fact": "A", "related": ["b", "c"]},
{"id": "b", "fact": "B", "related": ["a", "d"]},
{"id": "c", "fact": "C", "related": ["a"]},
{"id": "d", "fact": "D", "related": ["b", "e"]},
{"id": "e", "fact": "E", "related": ["d"]},
]
adj, id_to_fact = build_adjacency(facts)
result = query_subgraph("a", adj, id_to_fact, depth=1)
node_ids = {n["id"] for n in result["nodes"]}
assert node_ids == {"a", "b", "c"}, f"Got {node_ids}"
assert result["node_count"] == 3
print("PASS: subgraph_depth1")
def test_subgraph_depth2():
"""Depth-2 subgraph includes further nodes."""
facts = [
{"id": "a", "fact": "A", "related": ["b"]},
{"id": "b", "fact": "B", "related": ["a", "c"]},
{"id": "c", "fact": "C", "related": ["b", "d"]},
{"id": "d", "fact": "D", "related": ["c"]},
]
adj, id_to_fact = build_adjacency(facts)
result = query_subgraph("a", adj, id_to_fact, depth=2)
node_ids = {n["id"] for n in result["nodes"]}
assert node_ids == {"a", "b", "c"}, f"Got {node_ids}"
print("PASS: subgraph_depth2")
def test_stats():
"""Statistics query returns graph metrics."""
facts = [
{"id": "a", "fact": "A", "related": ["b"]},
{"id": "b", "fact": "B", "related": ["a", "c"]},
{"id": "c", "fact": "C", "related": ["b"]},
]
adj, id_to_fact = build_adjacency(facts)
result = query_stats(adj, id_to_fact)
assert result["statistics"]["total_facts"] == 3
assert result["statistics"]["total_edges"] == 2 # undirected double-counted /2
assert result["statistics"]["average_degree"] > 0
print("PASS: stats")
def test_cli_integration():
"""CLI produces valid JSON with correct query types."""
with tempfile.TemporaryDirectory() as tmp:
import subprocess as sp
tmp_dir = Path(tmp)
facts = [
{"id": "x", "fact": "X", "related": ["y"]},
{"id": "y", "fact": "Y", "related": ["x", "z"]},
{"id": "z", "fact": "Z", "related": ["y"]},
]
index_path = make_index(facts, tmp_dir)
knowledge_dir = index_path.parent
script_path = Path(__file__).resolve().parent / "graph_query.py"
result = sp.run(
[sys.executable, str(script_path), "neighbors", "x", "--knowledge-dir", str(knowledge_dir)],
capture_output=True, text=True, cwd=str(tmp_dir)
)
assert result.returncode == 0, f"neighbors failed: {result.stderr}"
out = json.loads(result.stdout)
assert out["query"] == "neighbors"
assert out["fact_id"] == "x"
assert out["count"] == 1
result = sp.run(
[sys.executable, str(script_path), "path", "x", "z", "--knowledge-dir", str(knowledge_dir)],
capture_output=True, text=True, cwd=str(tmp_dir)
)
assert result.returncode == 0, f"path failed: {result.stderr}"
out = json.loads(result.stdout)
assert out["path"] == ["x", "y", "z"]
print("PASS: cli_integration")
if __name__ == "__main__":
test_neighbors()
test_path_found()
test_path_not_found()
test_subgraph_extraction()
test_subgraph_depth2()
test_stats()
test_cli_integration()
print("\nAll graph_query tests passed!")

View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -1,118 +0,0 @@
"""
Tests for session_pair_harvester — training pair extraction from sessions.
"""
import json
import tempfile
import unittest
from pathlib import Path
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from session_pair_harvester import (
extract_pairs_from_conversation,
extract_from_jsonl_file,
deduplicate_pairs,
compute_hash,
)
class TestSessionPairHarvester(unittest.TestCase):
def test_compute_hash_consistent(self):
h1 = compute_hash("hello world")
h2 = compute_hash("hello world")
self.assertEqual(h1, h2)
self.assertEqual(len(h1), 16)
def test_extract_simple_qa_pair(self):
"""A simple user→assistant exchange produces one pair."""
conversation = [
{"role": "user", "content": "What is the capital of France?"},
{"role": "assistant", "content": "The capital of France is Paris. It is a major European city renowned for its art, fashion, gastronomy, cultural heritage, and historical significance. The city attracts millions of tourists annually."},
]
pairs = extract_pairs_from_conversation(conversation, "test_session", "test-model")
self.assertEqual(len(pairs), 1)
self.assertEqual(pairs[0]["terse"], "What is the capital of France?")
self.assertIn("Paris", pairs[0]["rich"])
self.assertEqual(pairs[0]["source"], "test_session")
def test_min_ratio_filter(self):
"""Very short responses are filtered out."""
conversation = [
{"role": "user", "content": "Yes"},
{"role": "assistant", "content": "No."},
]
# Default min_ratio = 1.5, min_words = 20 for response
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
self.assertEqual(len(pairs), 0)
def test_min_words_filter(self):
"""Assistant responses below min word count are skipped."""
conversation = [
{"role": "user", "content": "Explain the project architecture in detail"},
{"role": "assistant", "content": "OK."},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=5)
self.assertEqual(len(pairs), 0)
def test_skip_non_assistant_messages(self):
"""System and tool messages are ignored."""
conversation = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there! How can I help you today?"},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
self.assertEqual(len(pairs), 1)
self.assertEqual(pairs[0]["terse"], "Hello")
def test_multiple_pairs_from_one_session(self):
"""A conversation with several Q&A turns yields multiple pairs."""
conversation = [
{"role": "user", "content": "First question?"},
{"role": "assistant", "content": "Here is a detailed and comprehensive answer that thoroughly explores multiple aspects of the subject. It provides background context and practical implications for the reader."},
{"role": "user", "content": "Second?"},
{"role": "assistant", "content": "Another comprehensive response with detailed examples. This includes practical code blocks and thorough explanations to ensure deep understanding of the topic at hand."},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_ratio=1.0)
self.assertEqual(len(pairs), 2)
def test_deduplication_removes_duplicates(self):
"""Identical pairs across sessions are deduplicated."""
pairs = [
{"terse": "q1", "rich": "a1", "source": "s1", "model": "m"},
{"terse": "q1", "rich": "a1", "source": "s2", "model": "m"},
{"terse": "q2", "rich": "a2", "source": "s1", "model": "m"},
]
unique = deduplicate_pairs(pairs)
self.assertEqual(len(unique), 2)
sources = {p["source"] for p in unique}
# First unique pair can be from either s1 or s2
self.assertIn("s1", sources)
def test_integration_with_test_sessions(self):
"""Harvester finds pairs in real test session files."""
repo_root = Path(__file__).parent.parent
test_sessions_dir = repo_root / "test_sessions"
if not test_sessions_dir.exists():
self.skipTest("test_sessions not found")
pairs = []
for jsonl_file in sorted(test_sessions_dir.glob("*.jsonl")):
pairs.extend(extract_from_jsonl_file(str(jsonl_file)))
self.assertGreater(len(pairs), 0, "Should extract at least one pair from test_sessions")
for p in pairs:
self.assertIn("terse", p)
self.assertIn("rich", p)
self.assertIn("source", p)
self.assertIn("model", p)
# Verify content exists
self.assertGreater(len(p["terse"]), 0)
self.assertGreater(len(p["rich"]), 0)
if __name__ == "__main__":
unittest.main()