Compare commits

...

2 Commits

Author SHA1 Message Date
step35
759abffd00 docs(swarm): add design note for swarm-memory architecture
Some checks failed
Test / pytest (pull_request) Failing after 8s
Creates docs/swarm-memory-design.md — a comprehensive design note
that:
- Distinguishes session memory (private, ephemeral) from swarm memory
  (shared, coordinated)
- Evaluates two candidate designs: append-only event log + synthesis
  vs shared board + evidence links with CAS
- Documents trade-offs, failure modes, and proposed experimental
  prototype using 3 concurrent subagents
- Sets acceptance baseline for issue #232

This is the smallest concrete step toward solving the swarm-memory gap:
a shared frame of reference that concurrent subagents can use to
coordinate without corrupting each other.

Closes #232
2026-04-26 12:37:10 -04:00
Rockachopa
4b5a675355 feat: add PR complexity scorer — estimate review effort

Implements issue #135: a script that analyzes open PRs and computes
a complexity score (1-10) based on files changed, lines added/removed,
dependency changes, and test coverage delta. Also estimates review time.

The scorer can be run with --dry-run to preview or --apply to post
score comments directly on PRs.

Output: metrics/pr_complexity.json with full analysis.

Closes #135
Some checks failed
Test / pytest (push) Failing after 10s
2026-04-26 09:34:57 -04:00
3 changed files with 728 additions and 0 deletions

docs/swarm-memory-design.md (new file, 207 lines)

@@ -0,0 +1,207 @@
# Swarm Memory Architecture — Design Note
**Issue:** #232 — [ATLAS][Research] Solve the swarm-memory gap for concurrent subagents
**Repo:** Timmy_Foundation/compounding-intelligence
**Status:** Research — Design Draft
**Author:** step35 (burn)
**Date:** 2026-04-26
---
## 1. Problem Statement
The compounding-intelligence pipelines assume a **session-bounded** memory model: each agent session starts with injected bootstrap context, runs, produces a transcript, then ends. Knowledge is harvested *after* the session and injected *before* the next.
But **concurrent subagents** (multiple simultaneous agents working on parallel tasks) break this model:
- **No shared scratch space:** Each subagent operates in isolation; discoveries in sibling sessions aren't visible until the next harvest cycle.
- **Race conditions on promotion:** Two subagents may discover the same fact; both write it, causing duplication or conflicts.
- **Lost correlation:** Without a shared event log, you cannot reconstruct what happened across the swarm.
- **Stale shared state:** If a fact is promoted to global memory while subagents are still running, they may act on outdated assumptions.
**Core question:** What memory semantics should exist across concurrent subagents so they can cooperate without corrupting each other or losing important results?
---
## 2. Session Memory vs Swarm Memory
### Session Memory (Current)
| Property | Description |
|---|---|
| **Scope** | Single agent process lifetime |
| **Storage** | In-memory context window + transient tool state |
| **Visibility** | Private to that session |
| **Lifetime** | Ephemeral — disappears on exit |
| **Promotion** | Post-session harvester extracts durable facts |
| **Example** | "I read the config file and saw port 8080" |
### Swarm Memory (What's Missing)
| Property | Desired |
|---|---|
| **Scope** | All concurrent subagents in a task group |
| **Storage** | Shared, durable, versioned |
| **Visibility** | Readable by all siblings; write semantics TBD |
| **Lifetime** | Persists for duration of the coordinated task |
| **Promotion** | Real-time or near-real-time synchronization |
| **Example** | "Agent A found that the API returns 405 on main; all agents should know this now" |
**Key insight:** Session memory is **private and accumulated**; swarm memory is **shared and coordinated**. The harvester/bootstrapper loop is too slow for real-time coordination.
---
## 3. Candidate Designs
### Design A — Append-Only Event Log + Synthesis
**Overview:** All subagents write to a shared, append-only event log. A background synthesis process reads the log and extracts high-level facts into the knowledge store. Subagents also read the log to stay current.
**Data model:**
```
swarm-memory/
event-log.jsonl # Immutable, ordered, concurrent-safe append
event-index/ # By agent, by type, by timestamp
synthesized-facts/ # Periodic distillation into durable facts
checkpoints/ # Snapshot every N events for fast replay
```
**Write path:**
1. Subagent observes something → `event_log.append({agent, type, content, timestamp, session_id})`
2. Other subagents can tail the log (like a changelog; see the sketch below)
**Read path:**
1. Before each action, subagent queries recent events (last N minutes or last M entries)
2. Background job periodically runs synthesis LLM to convert raw events → distilled facts
**Pros:**
- **Lossless:** Nothing is ever overwritten; full audit trail
- **Concurrent-safe:** Append-only, no locking
- **Causality preserved:** Order of discoveries is visible
- **Replayable:** Any subagent can reconstruct state from checkpoint + tail
**Cons:**
- **Signal/noise:** Raw events are noisy; synthesis latency means swarm facts lag
- **Storage growth:** Event log grows unbounded without pruning policy
- **Query performance:** Finding "all facts about X" requires synthesis or full scan
- **Coordination latency:** Subagents only learn of discoveries after they're written and tailed
**Failure modes:**
- **Duplication:** Multiple agents write the same observation → synthesis dedups
- **Contradiction:** Two agents report conflicting facts → synthesis must reconcile
- **Stale state:** Agent reads log at T0, then new events arrive before it acts
---
### Design B — Shared Board + Evidence Links
**Overview:** A shared, mutable board stores distilled facts. Each fact includes provenance links to the agent sessions that discovered it. Agents read-before-write and update via compare-and-swap.
**Data model:**
```
swarm-memory/
board.yaml # Current set of facts with version stamps
evidence-links/ # Mapping: fact_id → [session_id, turn_range]
fact-history/ # append-only log of fact revisions (for audit)
```
**Write path (compare-and-swap):**
1. Agent reads current fact version
2. Agent proposes update with new evidence
3. System accepts if version unchanged since read; rejects with retry if conflict
4. On accept → append to fact-history, increment board version
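A minimal sketch of this read-then-CAS cycle, using JSON instead of YAML to stay stdlib-only and a POSIX `flock` to serialize writers; `board.json` and the function names are illustrative assumptions, and the fact-history append is omitted for brevity:
```python
# Sketch of Design B's compare-and-swap write (POSIX-only: fcntl.flock).
import fcntl
import json
from pathlib import Path

BOARD = Path("swarm-memory/board.json")

def read_board() -> dict:
    if not BOARD.exists():
        return {"version": 0, "facts": {}}
    return json.loads(BOARD.read_text())

def cas_update(fact_id: str, value: str, evidence: list[str],
               expected_version: int) -> bool:
    """Apply the update only if the board version is unchanged since the
    read. Returns False on conflict; the caller retries with backoff."""
    BOARD.parent.mkdir(parents=True, exist_ok=True)
    BOARD.touch(exist_ok=True)
    with BOARD.open("r+") as f:
        fcntl.flock(f, fcntl.LOCK_EX)  # released when the file closes
        raw = f.read()
        board = json.loads(raw) if raw else {"version": 0, "facts": {}}
        if board["version"] != expected_version:
            return False  # a sibling won the race; reject with retry
        board["facts"][fact_id] = {"value": value, "evidence": evidence}
        board["version"] += 1
        f.seek(0)
        f.truncate()
        f.write(json.dumps(board, indent=2))
    return True
```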
**Read path:**
1. Agent reads board.yaml (small, distilled)
2. If deeper verification needed, follow evidence-links to source sessions
**Pros:**
- **Low-latency reads:** Board is small and current
- **Explicit provenance:** Every fact knows which sessions contributed
- **Conflict detection:** CAS catches concurrent updates
- **Intentional updates:** Agents must justify changes with evidence
**Cons:**
- **Write contention:** Multiple agents writing same fact cause retry storms
- **Central point:** board.yaml is a single source of truth (but versioned)
- **Merge complexity:** CAS retries need backoff logic and can stall under heavy contention
- **Staleness window:** Between read and act, board may change
**Failure modes:**
- **Thundering herd:** Many agents CAS-fail on same hot fact → exponential backoff needed
- **Missing promotions:** A fact discovered but never written because agent crashed pre-write
- **Board corruption:** If CAS not atomic, two writes could interleave
- **Evidence loss:** If evidence-links point to deleted session transcripts, verification fails
---
## 4. Trade-off Matrix
| Dimension | Event Log | Shared Board |
|---|---|---|
| **Write concurrency** | Unbounded (append-only) | Contention on hot keys |
| **Read latency** | Must scan/synthesize | Direct read (constant-time) |
| **Storage efficiency** | Redundant raw events | Condensed facts |
| **Auditability** | Full reconstruction | Requires fact-history |
| **Coordination speed** | Lag between event → synthesis | Near-real-time (CAS cycle) |
| **Complexity** | Log management + synthesis worker | CAS protocol + retry logic |
**Verdict:** Start with **Event Log** (simpler, safer, no coordination overhead), then layer Board as a *view* over synthesized facts if read latency becomes a bottleneck.
---
## 5. Proposed Experimental Prototype
**Scope:** Minimal viable swarm-memory path for a controlled parallel task.
**Task:** Have 3 concurrent subagents process a set of GitHub issues. Each agent:
1. Reads issue details
2. Searches codebase for relevant files
3. Drafts a fix
4. **Writes discovery events to swarm event log**
5. Reads peer discoveries before next step
**Metrics to collect:**
- Duplication rate: how many agents found the same root cause independently?
- Correlation lift: did reading peer discoveries change agent behavior?
- Latency: time from discovery to visibility across swarm
- Synthesis quality: can an LLM summarize raw events into coherent facts?
**Implementation plan:**
1. `scripts/swarm_event_log.py` — thread-safe JSONL append + tail API
2. `scripts/swarm_synthesizer.py` — periodic batch that consumes event log, emits distilled facts (see the sketch after this list)
3. Patch `hermes-agent` burn worker to emit events at key milestones
4. Simple dashboard: `metrics/swarm_memory_dashboard.md`
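Item 2 could start as small as the sketch below: group raw events by `(type, content)` to dedup, then emit one distilled fact per group; the `summarize()` stub stands in for the synthesis LLM, whose prompt and model choice are out of scope here.
```python
# Sketch for scripts/swarm_synthesizer.py: dedup raw events, emit facts.
import json
from collections import defaultdict
from pathlib import Path

LOG = Path("swarm-memory/event-log.jsonl")
FACTS = Path("swarm-memory/synthesized-facts/facts.jsonl")

def summarize(events: list[dict]) -> str:
    # Stub: a real synthesizer would call an LLM over the grouped events.
    return events[0]["content"]

def synthesize() -> int:
    if not LOG.exists():
        return 0
    groups: dict[tuple, list[dict]] = defaultdict(list)
    for line in LOG.read_text().splitlines():
        if line.strip():
            event = json.loads(line)
            groups[(event["type"], event["content"])].append(event)
    FACTS.parent.mkdir(parents=True, exist_ok=True)
    with FACTS.open("w") as f:
        for (etype, _), events in groups.items():
            fact = {"type": etype, "fact": summarize(events),
                    "agents": sorted({e["agent"] for e in events})}
            f.write(json.dumps(fact) + "\n")
    return len(groups)  # groups with multiple agents signal duplication
```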
**Success criteria:** Prototype runs end-to-end with 3 agents; event log captures discoveries; synthesizer produces at least one cross-agent insight.
---
## 6. Failure Modes to Watch
| Mode | Symptom | Mitigation |
|---|---|---|
| Duplication | Same fact appears from 3 agents | Synthesis dedups; evidence-link count records repeat sightings |
| Contradiction | Agent A says "port 8080", Agent B says "port 3000" | Evidence-weighted majority; timestamp priority |
| Stale shared state | Agent reads board, acts, board changed under it | Version vectors; read-modify-write CAS with retry |
| Missing promotion | Discovery lost on agent crash | Event log is durable before action; recovery from last checkpoint |
| Race on hot fact | Two agents try to write same fact simultaneously | CAS backoff; random jitter |
| Log unbounded | Event log grows 10GB/day | Checkpoint + prune: keep summary + recent window |
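For the hot-fact row, the backoff mitigation might look like the following sketch; `read_board` and `cas_update` are the hypothetical Design B primitives sketched earlier, passed in so the loop stays decoupled from the board implementation.
```python
# Sketch: CAS retry with exponential backoff plus full random jitter,
# per the "race on hot fact" mitigation above. Primitives are injected.
import random
import time

def cas_with_backoff(fact_id, value, evidence, read_board, cas_update,
                     max_retries: int = 6, base_delay: float = 0.05) -> bool:
    for attempt in range(max_retries):
        board = read_board()
        if cas_update(fact_id, value, evidence, board["version"]):
            return True
        # Full jitter spreads retries out and avoids a thundering herd.
        time.sleep(random.uniform(0, base_delay * (2 ** attempt)))
    return False  # give up; caller should log a "missing promotion" event
```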
---
## 7. Next Steps (Out of Scope for This Note)
- Build the event log implementation (Design A, phase 1)
- Wire hermes-agent to emit events
- Run the 3-agent parallel experiment
- Measure and compare Board vs Log read patterns
- Decide: ship to prod or iterate
---
## 8. References
- Parent: Timmy_Foundation/hermes-agent#984 — [ATLAS] Steal highest-leverage ecosystem patterns
- Related: compounding-intelligence#229 — Telemetry ingestion (Tokscale)
- Related: hermes-agent#985 — Lossless context + memory subsystem (LCM/GBrain)

pr_complexity_scorer.py (new file, 351 lines)
@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
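Usage (flags per the argparse setup in main() below):
    pr_complexity_scorer.py --dry-run   # preview scores without commenting
    pr_complexity_scorer.py --apply     # post score comments on each PR
All modes write the full analysis to metrics/pr_complexity.json.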
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
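# The four weights sum to 1.0; score_pr() scales the weighted sum by 3
# so the final score spans the full 1-10 range.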
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
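    # --token may be a literal token or a path to a token file; if the
    # value names an existing file, read the token from that file.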
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

tests for pr_complexity_scorer.py (new file, 170 lines)
@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
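# Minimal self-contained harness: @test(name) runs the decorated function
# immediately at import time and tallies PASS/FAIL (no pytest needed).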
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)