Compare commits

..

1 Commits

Author SHA1 Message Date
Step35 Burn Agent
e2b1a9f8ac feat: add Review Comment Generator (Issue #126)
Some checks failed
Test / pytest (pull_request) Failing after 7s
- Introduces scripts/review_comment_generator.py: reads JSONL findings,
  deduplicates by content hash, formats as review comments, and posts
  to Gitea PR via API.
- Includes dry-run and JSON output modes.
- Comprehensive smoke test suite: 20 tests covering deduplication,
  formatting, CLI modes, and error handling — all passing.

Closes #126
2026-04-26 07:22:40 -04:00
6 changed files with 424 additions and 839 deletions

View File

@@ -1,472 +0,0 @@
# Compounding Intelligence — Scripts API Reference
*Generated: 2026-04-26 11:02 UTC*
This document auto-documents the public API surface of all scripts
in `scripts/`. Each section covers one script: module purpose,
public functions, and their signatures.
---
## `scripts/api_doc_generator.py`
API Doc Generator — Issue #98
| Function | Signature | Description |
|----------|-----------|-------------|
| `extract_functions_from_ast` | `extract_functions_from_ast(tree, file_rel)` | Extract public function names, signatures, and first-line doc summaries. |
| `parse_module` | `parse_module(filepath)` | Parse a Python file and return its module-level docstring and public functions. |
| `scan_scripts_dir` | `scan_scripts_dir(scripts_dir)` | Scan all .py files in scripts/ and extract API info. |
| `render_markdown` | `render_markdown(modules)` | Generate full docs/API.md content from the scanned modules. |
| `render_json` | `render_json(modules)` | Emit machine-readable JSON version of the API reference. |
| `main` | `main()` | - |
## `scripts/automation_opportunity_finder.py`
Automation Opportunity Finder — Scan fleet for manual processes that could be automated.
| Function | Signature | Description |
|----------|-----------|-------------|
| `analyze_cron_jobs` | `analyze_cron_jobs(hermes_home)` | Analyze cron job definitions for automation gaps. |
| `analyze_documents` | `analyze_documents(root_dirs)` | Scan documentation for manual step patterns. |
| `analyze_scripts` | `analyze_scripts(root_dirs)` | Detect repeated command sequences in scripts. |
| `analyze_session_transcripts` | `analyze_session_transcripts(session_dirs)` | Find repeated tool-call patterns in session transcripts. |
| `analyze_shell_history` | `analyze_shell_history(root_dirs)` | Find repeated shell commands from history files. |
| `deduplicate_proposals` | `deduplicate_proposals(proposals)` | Remove duplicate proposals based on title similarity. |
| `rank_proposals` | `rank_proposals(proposals)` | Sort proposals by impact * confidence (highest first). |
| `format_text_report` | `format_text_report(proposals)` | Format proposals as human-readable text. |
| `main` | `main()` | - |
## `scripts/bootstrapper.py`
Bootstrapper — assemble pre-session context from knowledge store.
| Function | Signature | Description |
|----------|-----------|-------------|
| `load_index` | `load_index(index_path)` | Load and validate the knowledge index. |
| `filter_facts` | `filter_facts(facts, repo, agent, include_global)` | Filter facts by repo, agent, and global scope. |
| `sort_facts` | `sort_facts(facts)` | Sort facts by: confidence (desc), then category priority, then fact text. |
| `load_repo_knowledge` | `load_repo_knowledge(repo)` | Load per-repo knowledge markdown if it exists. |
| `load_agent_knowledge` | `load_agent_knowledge(agent)` | Load per-agent knowledge markdown if it exists. |
| `load_global_knowledge` | `load_global_knowledge()` | Load all global knowledge markdown files. |
| `render_facts_section` | `render_facts_section(facts, category, label)` | Render a section of facts for a single category. |
| `estimate_tokens` | `estimate_tokens(text)` | Rough token estimate. |
| `truncate_to_tokens` | `truncate_to_tokens(text, max_tokens)` | Truncate text to approximately max_tokens, cutting at line boundaries. |
| `build_bootstrap_context` | `build_bootstrap_context(repo, agent, include_global, max_tokens, index_path)` | Build the full bootstrap context block. |
| `main` | `main()` | - |
## `scripts/dead_code_detector.py`
Dead Code Detector for Python Codebases
| Function | Signature | Description |
|----------|-----------|-------------|
| `is_safe_unused` | `is_safe_unused(name, filepath)` | Check if an unused name is expected to be unused. |
| `get_git_blame` | `get_git_blame(filepath, lineno)` | Get last author of a line via git blame. |
| `analyze_file` | `analyze_file(filepath)` | Analyze a single Python file for dead code. |
| `scan_repo` | `scan_repo(repo_path, exclude_patterns)` | Scan an entire repo for dead code. |
| `main` | `main()` | - |
## `scripts/dedup.py`
dedup.py — Knowledge deduplication: content hash + semantic similarity.
| Function | Signature | Description |
|----------|-----------|-------------|
| `normalize_text` | `normalize_text(text)` | Normalize text for hashing: lowercase, collapse whitespace, strip. |
| `content_hash` | `content_hash(text)` | SHA256 hash of normalized text for exact dedup. |
| `tokenize` | `tokenize(text)` | Simple tokenizer: lowercase words, 3+ chars. |
| `token_similarity` | `token_similarity(a, b)` | Token-based Jaccard similarity (0.0-1.0). |
| `quality_score` | `quality_score(fact)` | Compute quality score for merge ranking. |
| `merge_facts` | `merge_facts(keep, drop)` | Merge two near-duplicate facts, keeping higher-quality fields. |
| `dedup_facts` | `dedup_facts(facts, exact_threshold, near_threshold, dry_run)` | Deduplicate a list of knowledge facts. |
| `dedup_index_file` | `dedup_index_file(input_path, output_path, near_threshold, dry_run)` | Deduplicate an index.json file. |
| `generate_test_duplicates` | `generate_test_duplicates(n)` | Generate test facts with intentional duplicates for testing. |
| `main` | `main()` | - |
## `scripts/dependency_graph.py`
Cross-Repo Dependency Graph Builder
| Function | Signature | Description |
|----------|-----------|-------------|
| `normalize_repo_name` | `normalize_repo_name(name)` | Normalize a repo name for comparison. |
| `scan_file_for_deps` | `scan_file_for_deps(filepath, content, own_repo)` | Scan a file's content for references to other repos. |
| `scan_repo` | `scan_repo(repo_path, repo_name)` | Scan a repo directory for dependencies. |
| `detect_cycles` | `detect_cycles(graph)` | Detect circular dependencies using DFS. |
| `to_dot` | `to_dot(graph)` | Generate DOT format output. |
| `to_mermaid` | `to_mermaid(graph)` | Generate Mermaid format output. |
| `main` | `main()` | - |
## `scripts/diff_analyzer.py`
Diff Analyzer — Parse unified diffs and categorize every change.
*(no public functions — script runs as `main()` only)*
## `scripts/freshness.py`
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)
| Function | Signature | Description |
|----------|-----------|-------------|
| `compute_file_hash` | `compute_file_hash(filepath)` | Compute SHA-256 hash of a file. Returns None if file doesn't exist. |
| `get_git_file_changes` | `get_git_file_changes(repo_path, days)` | Get files changed in git in the last N days. |
| `load_knowledge_entries` | `load_knowledge_entries(knowledge_dir)` | Load knowledge entries from YAML files in the knowledge directory. |
| `check_freshness` | `check_freshness(knowledge_dir, repo_root, days)` | Check freshness of knowledge entries against recent code changes. |
| `update_stale_hashes` | `update_stale_hashes(knowledge_dir, repo_root)` | Update hashes for stale entries. Returns count of updated entries. |
| `format_report` | `format_report(result, max_items)` | Format freshness check results as a human-readable report. |
| `main` | `main()` | - |
## `scripts/gitea_issue_parser.py`
Gitea Issue Body Parser — Extract structured data from markdown issue bodies.
| Function | Signature | Description |
|----------|-----------|-------------|
| `parse_issue_body` | `parse_issue_body(body, title, labels)` | Parse a Gitea issue markdown body into structured JSON. |
| `fetch_issue_from_url` | `fetch_issue_from_url(url)` | Fetch an issue from a Gitea API URL and parse it. |
| `main` | `main()` | - |
## `scripts/harvester.py`
harvester.py — Extract durable knowledge from Hermes session transcripts.
| Function | Signature | Description |
|----------|-----------|-------------|
| `find_api_key` | `find_api_key()` | Find API key from common locations. |
| `load_extraction_prompt` | `load_extraction_prompt()` | Load the extraction prompt template. |
| `call_llm` | `call_llm(prompt, transcript, api_base, api_key, model)` | Call the LLM API to extract knowledge from a transcript. |
| `parse_extraction_response` | `parse_extraction_response(content)` | Parse the LLM response to extract knowledge items. |
| `load_existing_knowledge` | `load_existing_knowledge(knowledge_dir)` | Load the existing knowledge index. |
| `fact_fingerprint` | `fact_fingerprint(fact)` | Generate a deduplication fingerprint for a fact. |
| `deduplicate` | `deduplicate(new_facts, existing, similarity_threshold)` | Remove duplicate facts from new_facts that already exist in the knowledge store. |
| `validate_fact` | `validate_fact(fact)` | Validate a single knowledge item has required fields. |
| `write_knowledge` | `write_knowledge(index, new_facts, knowledge_dir, source_session)` | Write new facts to the knowledge store. |
| `harvest_session` | `harvest_session(session_path, knowledge_dir, api_base, api_key, model, dry_run, min_confidence)` | Harvest knowledge from a single session. |
| `batch_harvest` | `batch_harvest(sessions_dir, knowledge_dir, api_base, api_key, model, since, limit, dry_run)` | Harvest knowledge from multiple sessions in batch. |
| `main` | `main()` | - |
## `scripts/improvement_proposals.py`
Improvement Proposal Generator for compounding-intelligence.
| Function | Signature | Description |
|----------|-----------|-------------|
| `analyze_sessions` | `analyze_sessions(sessions)` | Analyze session data to find waste patterns. |
| `generate_proposals` | `generate_proposals(patterns, hourly_rate, implementation_overhead)` | Generate improvement proposals from waste patterns. |
| `format_proposals_markdown` | `format_proposals_markdown(proposals, patterns, generated_at)` | Format proposals as a markdown document. |
| `format_proposals_json` | `format_proposals_json(proposals)` | Format proposals as JSON. |
| `main` | `main()` | - |
## `scripts/knowledge_gap_identifier.py`
Knowledge Gap Identifier — Pipeline 10.7
*(no public functions — script runs as `main()` only)*
## `scripts/knowledge_staleness_check.py`
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
| Function | Signature | Description |
|----------|-----------|-------------|
| `compute_file_hash` | `compute_file_hash(filepath)` | Compute SHA-256 hash of a file. Returns None if file doesn't exist. |
| `check_staleness` | `check_staleness(index_path, repo_root)` | Check all entries in knowledge index for staleness. |
| `fix_hashes` | `fix_hashes(index_path, repo_root)` | Add hashes to entries missing them. Returns count of fixed entries. |
| `main` | `main()` | - |
## `scripts/perf_bottleneck_finder.py`
Performance Bottleneck Finder — Identify slow tests, builds, and CI steps.
| Function | Signature | Description |
|----------|-----------|-------------|
| `find_slow_tests_pytest` | `find_slow_tests_pytest(repo_path)` | Run pytest --durations and parse slow tests. |
| `find_slow_tests_by_scan` | `find_slow_tests_by_scan(repo_path)` | Scan test files for patterns that indicate slow tests. |
| `analyze_build_artifacts` | `analyze_build_artifacts(repo_path)` | Find large build artifacts that slow down builds. |
| `analyze_makefile_targets` | `analyze_makefile_targets(repo_path)` | Analyze Makefile for potentially slow targets. |
| `analyze_github_actions` | `analyze_github_actions(repo_path)` | Analyze GitHub Actions workflow files for inefficiencies. |
| `analyze_gitea_ci` | `analyze_gitea_ci(repo_path)` | Analyze Gitea/Drone CI config files. |
| `find_slow_imports` | `find_slow_imports(repo_path)` | Find Python files with heavy import chains. |
| `severity_sort_key` | `severity_sort_key(b)` | Sort by severity then duration. |
| `generate_report` | `generate_report(repo_path)` | Run all analyses and generate a performance report. |
| `format_markdown` | `format_markdown(report)` | Format report as markdown. |
| `main` | `main()` | - |
## `scripts/priority_rebalancer.py`
Priority Rebalancer — Re-evaluate issue priorities based on accumulated data.
| Function | Signature | Description |
|----------|-----------|-------------|
| `collect_knowledge_signals` | `collect_knowledge_signals(knowledge_dir)` | Analyze knowledge store for coverage gaps and staleness. |
| `collect_staleness_signals` | `collect_staleness_signals(scripts_dir, knowledge_dir)` | Run staleness checker if available. |
| `collect_metrics_signals` | `collect_metrics_signals(metrics_dir)` | Analyze metrics directory for pipeline health. |
| `extract_priority` | `extract_priority(labels)` | Extract priority level from issue labels. |
| `compute_issue_score` | `compute_issue_score(issue, repo, signals, now)` | Compute priority score for a single issue. |
| `generate_report` | `generate_report(scores, signals, org, repos_scanned)` | Generate the full priority report. |
| `generate_markdown_report` | `generate_markdown_report(report)` | Generate human-readable markdown report. |
| `main` | `main()` | - |
## `scripts/refactoring_opportunity_finder.py`
Finds refactoring opportunities in codebases
| Function | Signature | Description |
|----------|-----------|-------------|
| `compute_file_complexity` | `compute_file_complexity(filepath)` | Compute cyclomatic complexity for a Python file. |
| `calculate_refactoring_score` | `calculate_refactoring_score(metrics)` | Calculate a refactoring priority score (0-100) based on file metrics. |
| `scan_directory` | `scan_directory(directory, extensions)` | Scan directory for source files. |
| `generate_proposals` | `generate_proposals(directory, min_score)` | Generate refactoring proposals by analyzing source files. |
| `main` | `main()` | - |
## `scripts/sampler.py`
sampler.py — Score and rank sessions by harvest value.
| Function | Signature | Description |
|----------|-----------|-------------|
| `scan_session_fast` | `scan_session_fast(path)` | Extract scoring metadata from a session without parsing the full JSONL. |
| `parse_session_timestamp` | `parse_session_timestamp(filename)` | Parse timestamp from session filename. |
| `score_session` | `score_session(meta, now, seen_repos)` | Score a session for harvest value. Returns (score, breakdown). |
| `main` | `main()` | - |
## `scripts/session_metadata.py`
session_metadata.py - Extract structured metadata from Hermes session transcripts.
| Function | Signature | Description |
|----------|-----------|-------------|
| `extract_session_metadata` | `extract_session_metadata(file_path)` | Extract structured metadata from a Hermes session JSONL transcript. |
| `process_session_directory` | `process_session_directory(directory_path, output_file)` | Process all JSONL files in a directory. |
| `main` | `main()` | CLI entry point. |
## `scripts/session_pair_harvester.py`
Session Transcript → Training Pair Harvester
| Function | Signature | Description |
|----------|-----------|-------------|
| `compute_hash` | `compute_hash(text)` | Content hash for deduplication. |
| `extract_pairs_from_session` | `extract_pairs_from_session(session_data, min_ratio, min_response_words)` | Extract terse→rich pairs from a single session object. |
| `extract_from_jsonl_file` | `extract_from_jsonl_file(filepath, **kwargs)` | Extract pairs from a session JSONL file. |
| `deduplicate_pairs` | `deduplicate_pairs(pairs)` | Remove duplicate pairs across files. |
| `main` | `main()` | - |
## `scripts/session_reader.py`
session_reader.py — Parse Hermes session JSONL transcripts.
| Function | Signature | Description |
|----------|-----------|-------------|
| `read_session` | `read_session(path)` | Read a session JSONL file and return all messages as a list. |
| `read_session_iter` | `read_session_iter(path)` | Iterate over session messages without loading all into memory. |
| `extract_conversation` | `extract_conversation(messages)` | Extract user/assistant conversation turns, skipping tool-only messages. |
| `truncate_for_context` | `truncate_for_context(messages, head, tail)` | Truncate long sessions: keep first N + last N messages. |
| `messages_to_text` | `messages_to_text(messages)` | Convert message list to plain text for LLM consumption. |
| `get_session_metadata` | `get_session_metadata(path)` | Extract metadata from a session file (first message often has config info). |
## `scripts/test_automation_opportunity_finder.py`
Tests for scripts/automation_opportunity_finder.py — 8 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_analyze_cron_jobs_no_file` | `test_analyze_cron_jobs_no_file()` | Returns empty list when no cron jobs file exists. |
| `test_analyze_cron_jobs_disabled` | `test_analyze_cron_jobs_disabled()` | Detects disabled cron jobs. |
| `test_analyze_cron_jobs_errors` | `test_analyze_cron_jobs_errors()` | Detects cron jobs with error status. |
| `test_analyze_documents_finds_todos` | `test_analyze_documents_finds_todos()` | Detects TODO markers in documents. |
| `test_analyze_scripts_repeated_commands` | `test_analyze_scripts_repeated_commands()` | Detects repeated shell commands across scripts. |
| `test_analyze_session_transcripts` | `test_analyze_session_transcripts()` | Detects repeated tool-call sequences. |
| `test_deduplicate_proposals` | `test_deduplicate_proposals()` | Deduplicates proposals with similar titles. |
| `test_rank_proposals` | `test_rank_proposals()` | Ranks proposals by impact * confidence. |
## `scripts/test_bootstrapper.py`
Tests for bootstrapper.py — context assembly from knowledge store.
| Function | Signature | Description |
|----------|-----------|-------------|
| `make_index` | `make_index(facts, tmp_dir)` | Create a temporary index.json with given facts. |
| `test_empty_index` | `test_empty_index()` | Empty knowledge store produces graceful output. |
| `test_filter_by_repo` | `test_filter_by_repo()` | Filter facts by repository. |
| `test_filter_by_agent` | `test_filter_by_agent()` | Filter facts by agent type. |
| `test_no_global_flag` | `test_no_global_flag()` | Excluding global facts works. |
| `test_sort_by_confidence` | `test_sort_by_confidence()` | Facts sort by confidence descending. |
| `test_sort_pitfalls_first` | `test_sort_pitfalls_first()` | Pitfalls sort before facts at same confidence. |
| `test_truncate_to_tokens` | `test_truncate_to_tokens()` | Truncation cuts at line boundary. |
| `test_estimate_tokens` | `test_estimate_tokens()` | Token estimation is reasonable. |
| `test_build_full_context` | `test_build_full_context()` | Full context with facts renders correctly. |
| `test_max_tokens_respected` | `test_max_tokens_respected()` | Output respects max_tokens limit. |
| `test_missing_index_graceful` | `test_missing_index_graceful()` | Missing index.json doesn't crash. |
## `scripts/test_diff_analyzer.py`
Tests for scripts/diff_analyzer.py — 10 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_empty` | `test_empty()` | - |
| `test_addition` | `test_addition()` | - |
| `test_deletion` | `test_deletion()` | - |
| `test_modification` | `test_modification()` | - |
| `test_rename` | `test_rename()` | - |
| `test_multiple_files` | `test_multiple_files()` | - |
| `test_binary` | `test_binary()` | - |
| `test_to_dict` | `test_to_dict()` | - |
| `test_context_only` | `test_context_only()` | - |
| `test_multi_hunk` | `test_multi_hunk()` | - |
| `run_all` | `run_all()` | - |
## `scripts/test_gitea_issue_parser.py`
Tests for scripts/gitea_issue_parser.py
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_basic_parsing` | `test_basic_parsing()` | - |
| `test_numbered_criteria` | `test_numbered_criteria()` | - |
| `test_epic_ref_from_body` | `test_epic_ref_from_body()` | - |
| `test_empty_body` | `test_empty_body()` | - |
| `test_no_sections` | `test_no_sections()` | - |
| `test_multiple_sections` | `test_multiple_sections()` | - |
| `run_all` | `run_all()` | - |
## `scripts/test_harvest_prompt.py`
Test harness for knowledge extraction prompt.
| Function | Signature | Description |
|----------|-----------|-------------|
| `validate_knowledge_item` | `validate_knowledge_item(item, idx)` | Validate a single knowledge item. Returns list of errors. |
| `validate_extraction` | `validate_extraction(data)` | Validate a full extraction result. Returns (is_valid, errors, warnings). |
| `validate_transcript_coverage` | `validate_transcript_coverage(data, transcript)` | Check that extracted facts are actually supported by the transcript. |
| `run_tests` | `run_tests()` | Run the built-in test suite. |
| `validate_file` | `validate_file(filepath)` | Validate an existing extraction JSON file. |
## `scripts/test_harvest_prompt_comprehensive.py`
Comprehensive tests for knowledge extraction prompt.
| Function | Signature | Description |
|----------|-----------|-------------|
| `check_prompt_structure` | `check_prompt_structure()` | - |
| `check_confidence_scoring` | `check_confidence_scoring()` | - |
| `check_example_quality` | `check_example_quality()` | - |
| `check_constraint_coverage` | `check_constraint_coverage()` | - |
| `check_test_sessions` | `check_test_sessions()` | - |
| `test_prompt_structure` | `test_prompt_structure()` | - |
| `test_confidence_scoring` | `test_confidence_scoring()` | - |
| `test_example_quality` | `test_example_quality()` | - |
| `test_constraint_coverage` | `test_constraint_coverage()` | - |
| `test_test_sessions` | `test_test_sessions()` | - |
## `scripts/test_harvester_pipeline.py`
Smoke test for harvester pipeline — verifies the full chain:
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_session_reader` | `test_session_reader()` | Test that session_reader parses JSONL correctly. |
| `test_validate_fact` | `test_validate_fact()` | Test fact validation. |
| `test_deduplicate` | `test_deduplicate()` | Test deduplication. |
| `test_knowledge_store_roundtrip` | `test_knowledge_store_roundtrip()` | Test loading and writing knowledge index. |
| `test_full_chain_no_llm` | `test_full_chain_no_llm()` | Test the full pipeline minus the LLM call. |
## `scripts/test_improvement_proposals.py`
Tests for scripts/improvement_proposals.py — 15 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_empty_sessions` | `test_empty_sessions()` | - |
| `test_no_patterns_on_clean_sessions` | `test_no_patterns_on_clean_sessions()` | - |
| `test_repeated_error_detection` | `test_repeated_error_detection()` | Same error across 3+ sessions triggers pattern. |
| `test_repeated_error_threshold` | `test_repeated_error_threshold()` | 2 occurrences should NOT trigger (threshold is 3). |
| `test_slow_tool_detection` | `test_slow_tool_detection()` | Tool with avg latency > 5000ms across 5+ calls. |
| `test_fast_tool_not_flagged` | `test_fast_tool_not_flagged()` | Tool under 5000ms avg should not trigger. |
| `test_failed_retry_detection` | `test_failed_retry_detection()` | 3+ consecutive calls to same tool triggers retry pattern. |
| `test_manual_process_detection` | `test_manual_process_detection()` | 10+ tool calls with <= 3 unique tools. |
| `test_generate_proposals_from_patterns` | `test_generate_proposals_from_patterns()` | Proposals generated from waste patterns. |
| `test_proposal_roi_positive` | `test_proposal_roi_positive()` | ROI weeks should be a positive number for recoverable time. |
| `test_proposals_sorted_by_impact` | `test_proposals_sorted_by_impact()` | Proposals should be sorted by monthly hours saved (descending). |
| `test_format_markdown` | `test_format_markdown()` | Markdown output should contain expected sections. |
| `test_format_json` | `test_format_json()` | JSON output should be valid and parseable. |
| `test_normalize_error` | `test_normalize_error()` | Error normalization should remove paths and hashes. |
| `test_cli_integration` | `test_cli_integration()` | End-to-end test: write input JSON, run script, check output. |
| `run_all` | `run_all()` | - |
## `scripts/test_knowledge_staleness.py`
Tests for scripts/knowledge_staleness_check.py — 8 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_fresh_entry` | `test_fresh_entry()` | - |
| `test_stale_entry` | `test_stale_entry()` | - |
| `test_missing_source` | `test_missing_source()` | - |
| `test_no_hash` | `test_no_hash()` | - |
| `test_no_source_field` | `test_no_source_field()` | - |
| `test_fix_hashes` | `test_fix_hashes()` | - |
| `test_empty_index` | `test_empty_index()` | - |
| `test_compute_hash_nonexistent` | `test_compute_hash_nonexistent()` | - |
| `run_all` | `run_all()` | - |
## `scripts/test_priority_rebalancer.py`
Tests for Priority Rebalancer
| Function | Signature | Description |
|----------|-----------|-------------|
| `test` | `test(name)` | - |
| `assert_eq` | `assert_eq(a, b, msg)` | - |
| `assert_true` | `assert_true(v, msg)` | - |
| `assert_false` | `assert_false(v, msg)` | - |
| `make_issue` | `make_issue(**kwargs)` | - |
## `scripts/test_refactoring_opportunity_finder.py`
Tests for scripts/refactoring_opportunity_finder.py — 10 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_complexity_simple_function` | `test_complexity_simple_function()` | Simple function should have low complexity. |
| `test_complexity_with_conditionals` | `test_complexity_with_conditionals()` | Function with if/else should have higher complexity. |
| `test_complexity_with_loops` | `test_complexity_with_loops()` | Function with loops should increase complexity. |
| `test_complexity_with_class` | `test_complexity_with_class()` | Class with methods should count both. |
| `test_complexity_syntax_error` | `test_complexity_syntax_error()` | File with syntax error should return zeros. |
| `test_refactoring_score_high_complexity` | `test_refactoring_score_high_complexity()` | High complexity should give high score. |
| `test_refactoring_score_low_complexity` | `test_refactoring_score_low_complexity()` | Low complexity should give lower score. |
| `test_refactoring_score_high_churn` | `test_refactoring_score_high_churn()` | High churn should increase score. |
| `test_refactoring_score_no_coverage` | `test_refactoring_score_no_coverage()` | No coverage data should assume medium risk. |
| `test_refactoring_score_large_file` | `test_refactoring_score_large_file()` | Large files should score higher. |
| `run_all` | `run_all()` | - |
## `scripts/test_session_pair_harvester.py`
Tests for session_pair_harvester.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_basic_extraction` | `test_basic_extraction()` | - |
| `test_filters_short_responses` | `test_filters_short_responses()` | - |
| `test_skips_tool_results` | `test_skips_tool_results()` | - |
| `test_deduplication` | `test_deduplication()` | - |
| `test_ratio_filter` | `test_ratio_filter()` | - |
## `scripts/validate_knowledge.py`
Validate knowledge files and index.json against the schema.
| Function | Signature | Description |
|----------|-----------|-------------|
| `validate_fact` | `validate_fact(fact, src)` | - |
| `main` | `main()` | - |
---
**Total scripts documented:** 33
*Generated by `scripts/api_doc_generator.py` (Issue #98)*

View File

@@ -1,219 +0,0 @@
#!/usr/bin/env python3
"""
API Doc Generator — Issue #98
Scans all Python modules in `scripts/`, extracts their public API surface
(module docstring + public function signatures + first-line doc summaries),
and produces a single markdown reference document at `docs/API.md`.
Usage:
python3 scripts/api_doc_generator.py # Write docs/API.md
python3 scripts/api_doc_generator.py --check # Verify docs/API.md is up-to-date
python3 scripts/api_doc_generator.py --json # Emit JSON for downstream tooling
"""
from __future__ import annotations
import ast
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import TypedDict, List, Optional
# ─── Paths ────────────────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
SCRIPTS_DIR = REPO_ROOT / "scripts"
DOCS_DIR = REPO_ROOT / "docs"
OUTPUT_PATH = DOCS_DIR / "API.md"
# ─── Data structures ───────────────────────────────────────────────────────────
class FunctionInfo(TypedDict):
name: str
signature: str
summary: str
class ModuleInfo(TypedDict):
path: str # relative to repo root, e.g. "scripts/harvester.py"
docstring: str
functions: List[FunctionInfo]
# ─── AST extraction ────────────────────────────────────────────────────────────
def extract_functions_from_ast(tree: ast.AST, file_rel: str) -> List[FunctionInfo]:
"""Extract public function names, signatures, and first-line doc summaries."""
funcs: list[FunctionInfo] = []
for node in ast.iter_child_nodes(tree):
if not isinstance(node, ast.FunctionDef):
continue
# Skip private functions
if node.name.startswith("_"):
continue
# Build signature: arg1, arg2=default, *args, **kwargs
args = []
for arg in node.args.args:
args.append(arg.arg)
if node.args.vararg:
args.append(f"*{node.args.vararg.arg}")
if node.args.kwarg:
args.append(f"**{node.args.kwarg.arg}")
# Get first line of docstring
summary = ""
if (node.body and isinstance(node.body[0], ast.Expr) and
isinstance(node.body[0].value, ast.Constant) and
isinstance(node.body[0].value.value, str)):
raw = node.body[0].value.value.strip()
summary = raw.split("\n")[0].strip()
if len(summary) > 100:
summary = summary[:97] + "..."
funcs.append({
"name": node.name,
"signature": ", ".join(args),
"summary": summary,
})
return funcs
def parse_module(filepath: Path) -> Optional[ModuleInfo]:
"""Parse a Python file and return its module-level docstring and public functions."""
try:
with open(filepath, "r", encoding="utf-8") as f:
source = f.read()
tree = ast.parse(source, filename=str(filepath))
except Exception as e:
print(f"WARNING: Could not parse {filepath}: {e}", file=sys.stderr)
return None
# Module docstring
module_doc = ast.get_docstring(tree) or ""
module_doc = module_doc.strip().split("\n")[0] # first line only
# Public functions
functions = extract_functions_from_ast(tree, filepath.name)
rel = filepath.relative_to(REPO_ROOT)
return {
"path": str(rel),
"docstring": module_doc,
"functions": functions,
}
# ─── Scanning ──────────────────────────────────────────────────────────────────
def scan_scripts_dir(scripts_dir: Path) -> List[ModuleInfo]:
"""Scan all .py files in scripts/ and extract API info."""
modules: list[ModuleInfo] = []
for pyfile in sorted(scripts_dir.glob("*.py")):
info = parse_module(pyfile)
if info is not None:
modules.append(info)
return modules
# ─── Markdown rendering ─────────────────────────────────────────────────────────
def render_markdown(modules: List[ModuleInfo]) -> str:
"""Generate full docs/API.md content from the scanned modules."""
lines = [
"# Compounding Intelligence — Scripts API Reference",
"",
f"*Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*",
"",
"This document auto-documents the public API surface of all scripts",
"in `scripts/`. Each section covers one script: module purpose,",
"public functions, and their signatures.",
"",
"---",
"",
]
for mod in modules:
rel = mod["path"]
name = Path(rel).stem # e.g. harvester
lines.append(f"## `{rel}`")
lines.append("")
if mod["docstring"]:
lines.append(mod["docstring"])
lines.append("")
if mod["functions"]:
lines.append("| Function | Signature | Description |")
lines.append("|----------|-----------|-------------|")
for fn in mod["functions"]:
sig = fn["name"] + "(" + fn["signature"] + ")"
desc = fn["summary"] or "-"
lines.append(f"| `{fn['name']}` | `{sig}` | {desc} |")
lines.append("")
else:
lines.append("*(no public functions — script runs as `main()` only)*")
lines.append("")
lines.extend([
"",
"---",
"",
f"**Total scripts documented:** {len(modules)}",
"",
"*Generated by `scripts/api_doc_generator.py` (Issue #98)*",
])
return "\n".join(lines)
# ─── JSON output (optional, for automation) ───────────────────────────────────
def render_json(modules: List[ModuleInfo]) -> str:
"""Emit machine-readable JSON version of the API reference."""
import json
payload = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"generator": "scripts/api_doc_generator.py",
"repo": "Timmy_Foundation/compounding-intelligence",
"modules": modules,
}
return json.dumps(payload, indent=2)
# ─── Main ──────────────────────────────────────────────────────────────────────
def main() -> int:
import argparse
parser = argparse.ArgumentParser(description="Generate API docs for scripts/")
parser.add_argument("--check", action="store_true",
help="Exit 1 if docs/API.md is out-of-date")
parser.add_argument("--json", action="store_true",
help="Emit JSON to stdout instead of writing markdown")
args = parser.parse_args()
modules = scan_scripts_dir(SCRIPTS_DIR)
modules.sort(key=lambda m: m["path"])
if args.json:
print(render_json(modules))
return 0
md = render_markdown(modules)
if args.check:
if OUTPUT_PATH.exists():
existing = OUTPUT_PATH.read_text(encoding="utf-8")
if existing == md:
print("✅ docs/API.md is up-to-date")
return 0
print("❌ docs/API.md is missing or out-of-date — regenerate with "
"`python3 scripts/api_doc_generator.py`", file=sys.stderr)
return 1
DOCS_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_PATH.write_text(md, encoding="utf-8")
print(f"✅ Wrote {OUTPUT_PATH} ({len(modules)} modules documented)")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,185 @@
#!/usr/bin/env python3
"""
Review Comment Generator — Issue #126
Reads JSONL findings, deduplicates, posts as Gitea PR comments.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import sys
import urllib.request
import urllib.error
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
DEFAULT_API_BASE = os.environ.get(
"GITEA_API_BASE",
"https://forge.alexanderwhitestone.com"
)
TOKEN_PATHS = [
os.path.expanduser("~/.config/gitea/token"),
os.path.expanduser("~/.hermes/gitea.token"),
os.environ.get("GITEA_TOKEN", ""),
]
def load_token() -> Optional[str]:
token = os.environ.get("GITEA_TOKEN", "")
if token:
return token
for path in TOKEN_PATHS:
if path and os.path.exists(path):
with open(path) as f:
t = f.read().strip()
if t:
return t
return None
class GiteaClient:
def __init__(self, base_url: str, token: str, org: str, repo: str):
self.base_url = base_url.rstrip("/")
self.token = token
self.org = org
self.repo = repo
def _post(self, path: str, data: Dict) -> Optional[Dict]:
url = f"{self.base_url}/api/v1{path}"
body = json.dumps(data).encode("utf-8")
req = urllib.request.Request(url, data=body, method="POST")
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
err = e.read().decode() if e.read() else str(e)
print(f"[ERROR] HTTP {e.code}: {err}", file=sys.stderr)
return None
except Exception as e:
print(f"[ERROR] {e}", file=sys.stderr)
return None
def post_issue_comment(self, issue_num: int, body: str) -> Optional[Dict]:
return self._post(
f"/repos/{self.org}/{self.repo}/issues/{issue_num}/comments",
{"body": body}
)
def content_hash(finding: Dict) -> str:
key = f"{finding['file']}:{finding['line']}:{finding['text']}"
return hashlib.sha256(key.encode("utf-8")).hexdigest()
def format_comment(finding: Dict) -> str:
emoji = {
"error": "🛑",
"warning": "⚠️",
"info": "",
}.get(finding.get("severity", ""), "📝")
f = finding["file"]
ln = finding["line"]
txt = finding["text"]
return f"{emoji} **Review Comment**\n\nFile: `{f}`\nLine: {ln}\n\n> {txt}\n"
def load_findings(path: Optional[Path], from_stdin: bool) -> List[Dict]:
import fileinput
findings = []
sources = ["-"] if from_stdin else [str(path)]
for line in fileinput.input(files=sources):
line = line.strip()
if not line or line.startswith("#"):
continue
try:
f = json.loads(line)
for key in ("file", "line", "text"):
if key not in f:
raise ValueError(f"Missing key: {key}")
findings.append(f)
except json.JSONDecodeError as e:
print(f"WARNING: Skipping invalid JSON: {e}", file=sys.stderr)
return findings
def main() -> int:
parser = argparse.ArgumentParser(
description="Post review findings as comments to a Gitea PR/issue"
)
parser.add_argument("--pr", type=int, required=True, help="PR/issue number")
parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org")
parser.add_argument("--repo", default="compounding-intelligence", help="Repo name")
parser.add_argument("--api-base", default=DEFAULT_API_BASE, help="Gitea API base")
parser.add_argument("--token", default=None, help="API token (or env/file)")
parser.add_argument("--input", type=Path, default=None, help="JSONL input file")
parser.add_argument("--stdin", action="store_true", help="Read from stdin")
parser.add_argument("--dry-run", action="store_true", help="Show without posting")
parser.add_argument("--json", action="store_true", help="Emit JSON report")
args = parser.parse_args()
if not args.stdin and args.input is None:
print("ERROR: --input or --stdin required", file=sys.stderr)
return 1
if args.stdin and args.input:
print("ERROR: --stdin and --input exclusive", file=sys.stderr)
return 1
token = args.token or load_token()
if not token:
print("ERROR: Token not found. Set GITEA_TOKEN or ~/.config/gitea/token", file=sys.stderr)
return 1
findings = load_findings(args.input, args.stdin)
if not findings:
print("ERROR: No findings loaded", file=sys.stderr)
return 1
if not args.json: print(f"Loaded {len(findings)} finding(s)")
seen: Dict[str, Dict] = {}
for f in findings:
h = content_hash(f)
if h not in seen:
seen[h] = f
unique = list(seen.values())
if not args.json: print(f"After dedup: {len(unique)} unique")
if args.json:
report = {
"total": len(findings),
"unique": len(unique),
"findings": unique,
"generated_at": datetime.now(timezone.utc).isoformat(),
}
print(json.dumps(report, indent=2))
return 0
if args.dry_run:
print("\n=== DRY RUN — would post ===")
for i, f in enumerate(unique, 1):
print(f"\n--- Comment {i}/{len(unique)} ---")
print(format_comment(f))
return 0
client = GiteaClient(args.api_base, token, args.org, args.repo)
posted = 0
for f in unique:
body = format_comment(f)
result = client.post_issue_comment(args.pr, body)
if result:
print(f"✅ Posted: {f['file']}:{f['line']} (id={result.get('id')})")
posted += 1
else:
print(f"❌ Failed: {f['file']}:{f['line']}")
print(f"\nPosted {posted}/{len(unique)} to PR #{args.pr}")
return 0 if posted == len(unique) else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,5 @@
{"file": "scripts/harvester.py", "line": 47, "text": "Consider adding type hints to improve readability", "severity": "info"}
{"file": "scripts/dedup.py", "line": 89, "text": "Add null check before accessing fact['confidence'] to avoid KeyError", "severity": "warning"}
{"file": "scripts/bootstrapper.py", "line": 102, "text": "This loop is O(n^2) — could be optimized with a dict lookup", "severity": "info"}
{"file": "scripts/harvester.py", "line": 47, "text": "Consider adding type hints to improve readability", "severity": "info"}
{"file": "scripts/harvester.py", "line": 120, "text": "File handle not closed in error path — use context manager", "severity": "error"}

View File

@@ -1,148 +0,0 @@
#!/usr/bin/env python3
"""
Smoke tests for API Doc Generator — Issue #98
Validates that the generator runs, produces docs/API.md, and that
the generated markdown contains expected sections for the known scripts.
"""
from __future__ import annotations
import subprocess
import sys
from pathlib import Path
import pytest
# Resolve repo root
REPO_ROOT = Path(__file__).resolve().parents[1]
SCRIPTS_DIR = REPO_ROOT / "scripts"
DOCS_DIR = REPO_ROOT / "docs"
API_MD = DOCS_DIR / "API.md"
GENERATOR = SCRIPTS_DIR / "api_doc_generator.py"
# ─── Generator presence ─────────────────────────────────────────────────────────
class TestGeneratorPresence:
def test_generator_script_exists(self):
assert GENERATOR.exists(), f"Missing: {GENERATOR}"
def test_generator_is_executable(self):
with open(GENERATOR) as f:
first = f.readline().strip()
assert first.startswith("#!"), "Missing shebang"
assert "python" in first.lower()
# ─── API.md generation ──────────────────────────────────────────────────────────
class TestAPIDocGeneration:
def test_generator_runs_successfully(self):
"""Run the generator and verify exit code 0."""
result = subprocess.run(
[sys.executable, str(GENERATOR)],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
assert result.returncode == 0, (
f"Generator failed (code {{result.returncode}})\n"
f"STDERR: {{result.stderr[:500]}}"
)
def test_api_md_is_created(self):
"""docs/API.md must exist after generation."""
assert API_MD.exists(), f"Missing output: {API_MD}"
def test_api_md_is_not_empty(self):
"""Generate markdown must have substantial content."""
content = API_MD.read_text(encoding="utf-8")
assert len(content) > 1000, "API.md is suspiciously small"
def test_api_md_has_expected_structure(self):
"""Top-level headings and table markers must be present."""
content = API_MD.read_text(encoding="utf-8")
assert "# Compounding Intelligence — Scripts API Reference" in content
assert "## `scripts/" in content
assert "| Function | Signature | Description |" in content
def test_api_md_covers_expected_scripts(self):
"""At minimum the core scripts should be documented."""
content = API_MD.read_text(encoding="utf-8")
# Core scripts that must appear
core = ["scripts/harvester.py", "scripts/bootstrapper.py",
"scripts/session_reader.py", "scripts/dedup.py"]
for rel in core:
assert f"## `{rel}`" in content, f"Missing section for {rel}"
def test_api_md_contains_function_names(self):
"""Spot-check: known public functions from key modules must appear."""
content = API_MD.read_text(encoding="utf-8")
checks = [
("harvester", "read_session"),
("bootstrapper", "load_index"),
("session_reader", "extract_conversation"),
("dedup", "normalize_text"),
]
for module_stem, func_name in checks:
assert f"| `{func_name}` |" in content, f"Missing function {func_name} from {module_stem}"
# ─── Idempotence / --check ─────────────────────────────────────────────────────
class TestIdempotence:
def test_check_flag_passes_when_current(self):
"""`--check` should exit 0 immediately after generation."""
result = subprocess.run(
[sys.executable, str(GENERATOR), "--check"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
assert result.returncode == 0, (
f"--check failed\nSTDOUT: {{result.stdout}}\nSTDERR: {{result.stderr[:200]}}"
)
def test_check_fails_when_api_md_stale(self):
"""If docs/API.md is manually altered, --check should detect staleness."""
# Generate fresh baseline first
subprocess.run([sys.executable, str(GENERATOR)], capture_output=True, cwd=REPO_ROOT, timeout=30)
# Corrupt API.md slightly (append a line at the end)
original = API_MD.read_text(encoding="utf-8")
corrupted = original + "\n<!-- corrupted -->\n"
API_MD.write_text(corrupted, encoding="utf-8")
# --check should now fail
result = subprocess.run(
[sys.executable, str(GENERATOR), "--check"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
assert result.returncode != 0, "--check should detect stale API.md"
assert "out-of-date" in result.stderr.lower() or "out-of-date" in result.stdout.lower()
# Restore clean state
subprocess.run([sys.executable, str(GENERATOR)], capture_output=True, cwd=REPO_ROOT, timeout=30)
assert API_MD.read_text(encoding="utf-8") == original
# ─── JSON output ────────────────────────────────────────────────────────────────
class TestJSONOutput:
def test_json_flag_emits_valid_json(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
assert result.returncode == 0
import json
payload = json.loads(result.stdout)
assert "modules" in payload
assert len(payload["modules"]) >= 30
def test_json_has_expected_fields(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
import json
payload = json.loads(result.stdout)
mod = payload["modules"][0]
for key in ("path", "docstring", "functions"):
assert key in mod, f"Missing key {{key}} in module payload"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,234 @@
#!/usr/bin/env python3
"""
Smoke tests for Review Comment Generator — Issue #126
"""
from __future__ import annotations
import json
import subprocess
import sys
import hashlib
from io import StringIO
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[1]
SCRIPTS_DIR = REPO_ROOT / "scripts"
GENERATOR = SCRIPTS_DIR / "review_comment_generator.py"
SAMPLE_FINDINGS = SCRIPTS_DIR / "sample_findings.jsonl"
class TestGeneratorPresence:
def test_script_exists(self):
assert GENERATOR.exists(), f"Missing: {GENERATOR}"
def test_shebang_is_python(self):
with open(GENERATOR) as f:
first = f.readline().strip()
assert first.startswith("#!"), "No shebang"
assert "python" in first.lower()
class TestDeduplication:
def test_content_hash_deterministic(self):
from hashlib import sha256
def ch(f):
key = f"{f['file']}:{f['line']}:{f['text']}"
return sha256(key.encode()).hexdigest()
finding = {"file": "a.py", "line": 1, "text": "test"}
assert ch(finding) == ch(finding)
def test_duplicate_findings_are_removed(self):
findings = [
{"file": "a.py", "line": 1, "text": "foo", "severity": "info"},
{"file": "a.py", "line": 1, "text": "foo", "severity": "warning"},
{"file": "b.py", "line": 2, "text": "bar", "severity": "info"},
]
seen = {}
for f in findings:
key = f"{f['file']}:{f['line']}:{f['text']}"
seen[key] = f
assert len(seen) == 2
def test_different_findings_are_kept(self):
findings = [
{"file": "a.py", "line": 1, "text": "foo"},
{"file": "a.py", "line": 2, "text": "foo"},
{"file": "a.py", "line": 1, "text": "bar"},
]
seen = {}
for f in findings:
key = f"{f['file']}:{f['line']}:{f['text']}"
seen[key] = f
assert len(seen) == 3
class TestCommentFormatting:
def test_format_basic(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import format_comment
f = {"file": "scripts/foo.py", "line": 10, "text": "Fix this bug", "severity": "warning"}
body = format_comment(f)
assert "📝 **Review Comment**" not in body # warning uses ⚠️
assert "⚠️ **Review Comment**" in body
assert "`scripts/foo.py`" in body
assert "Line: 10" in body
assert "> Fix this bug" in body
def test_format_severity_emoji(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import format_comment
cases = [("error", "🛑"), ("warning", "⚠️"), ("info", ""), ("unknown", "📝")]
for severity, emoji in cases:
f = {"file": "x.py", "line": 1, "text": "test", "severity": severity}
assert emoji in format_comment(f)
class TestFindingsLoader:
def test_load_from_file(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import load_findings
findings = load_findings(SAMPLE_FINDINGS, from_stdin=False)
assert len(findings) >= 4
def test_load_ignores_blank_and_comments(self):
import tempfile, os
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tf:
tf.write('{"file":"a.py","line":1,"text":"valid"}\n')
tf.write('\n')
tf.write('# this is a comment\n')
tf.write('{"file":"b.py","line":2,"text":"also valid"}\n')
tfname = tf.name
try:
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import load_findings
assert len(load_findings(Path(tfname), from_stdin=False)) == 2
finally:
os.unlink(tfname)
def test_invalid_json_line_skipped(self, capsys):
import tempfile, os
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tf:
tf.write('invalid json\n')
tf.write('{"file":"ok.py","line":1,"text":"valid"}\n')
tfname = tf.name
try:
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import load_findings
assert len(load_findings(Path(tfname), from_stdin=False)) == 1
finally:
os.unlink(tfname)
class TestDryRunMode:
def test_dry_run_counts_unique(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--dry-run"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.returncode == 0
assert "DRY RUN" in result.stdout
assert "Review Comment" in result.stdout
def test_dry_run_shows_all_unique(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--dry-run"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.stdout.count("--- Comment") == 4
class TestJSONOutputMode:
def test_json_flag_emits_valid_json(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.returncode == 0
payload = json.loads(result.stdout)
assert "total" in payload and "unique" in payload and "findings" in payload
assert payload["total"] >= payload["unique"]
def test_json_findings_have_required_fields(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
payload = json.loads(result.stdout)
for f in payload["findings"]:
assert "file" in f and "line" in f and "text" in f
class TestGiteaClient:
def test_post_issue_comment_builds_correct_url(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import GiteaClient
client = GiteaClient("https://example.com", "token123", "MyOrg", "myrepo")
assert client.org == "MyOrg" and client.repo == "myrepo"
def test_generate_comment_body_has_required_fields(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import format_comment
f = {"file": "x.py", "line": 5, "text": "Fix this", "severity": "error"}
body = format_comment(f)
assert "x.py" in body and "5" in body and "Fix this" in body
class TestFullPipeline:
def test_end_to_end_json_output(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.returncode == 0
data = json.loads(result.stdout)
assert data["total"] == 5
assert data["unique"] == 4
f = data["findings"][0]
for key in ("file", "line", "text", "severity"):
assert key in f
def test_token_loading_fallback(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import load_token
token = load_token()
assert token is None or isinstance(token, str)
class TestErrorHandling:
def test_missing_input_shows_error(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.returncode != 0
assert "--input" in result.stderr or "--stdin" in result.stderr
def test_invalid_json_line_skipped(self):
import tempfile, os
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tf:
tf.write('invalid json\n')
tf.write('{"file":"ok.py","line":1,"text":"valid"}\n')
tfname = tf.name
try:
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", tfname, "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
data = json.loads(result.stdout)
assert data["total"] == 1
assert data["unique"] == 1
finally:
os.unlink(tfname)
if __name__ == "__main__":
pytest.main([__file__, "-v"])