Compare commits

..

1 Commits

Author SHA1 Message Date
step35
5a6f41689f feat: add API Doc Generator — issue #98
Some checks failed
Test / pytest (pull_request) Failing after 7s
- scripts/api_doc_generator.py: AST-based scanner for scripts/ Python modules
- docs/API.md: generated API reference (33 modules, ~500 lines)
- tests/test_api_doc_generator.py: 12 smoke tests (all passing)

The generator extracts module docstrings and public function signatures (name, args, summary) and produces a markdown table per script. One consolidated document per repo (docs/API.md).

Closes #98
2026-04-26 07:02:49 -04:00
6 changed files with 839 additions and 700 deletions

472
docs/API.md Normal file
View File

@@ -0,0 +1,472 @@
# Compounding Intelligence — Scripts API Reference
*Generated: 2026-04-26 11:02 UTC*
This document auto-documents the public API surface of all scripts
in `scripts/`. Each section covers one script: module purpose,
public functions, and their signatures.
---
## `scripts/api_doc_generator.py`
API Doc Generator — Issue #98
| Function | Signature | Description |
|----------|-----------|-------------|
| `extract_functions_from_ast` | `extract_functions_from_ast(tree, file_rel)` | Extract public function names, signatures, and first-line doc summaries. |
| `parse_module` | `parse_module(filepath)` | Parse a Python file and return its module-level docstring and public functions. |
| `scan_scripts_dir` | `scan_scripts_dir(scripts_dir)` | Scan all .py files in scripts/ and extract API info. |
| `render_markdown` | `render_markdown(modules)` | Generate full docs/API.md content from the scanned modules. |
| `render_json` | `render_json(modules)` | Emit machine-readable JSON version of the API reference. |
| `main` | `main()` | - |
## `scripts/automation_opportunity_finder.py`
Automation Opportunity Finder — Scan fleet for manual processes that could be automated.
| Function | Signature | Description |
|----------|-----------|-------------|
| `analyze_cron_jobs` | `analyze_cron_jobs(hermes_home)` | Analyze cron job definitions for automation gaps. |
| `analyze_documents` | `analyze_documents(root_dirs)` | Scan documentation for manual step patterns. |
| `analyze_scripts` | `analyze_scripts(root_dirs)` | Detect repeated command sequences in scripts. |
| `analyze_session_transcripts` | `analyze_session_transcripts(session_dirs)` | Find repeated tool-call patterns in session transcripts. |
| `analyze_shell_history` | `analyze_shell_history(root_dirs)` | Find repeated shell commands from history files. |
| `deduplicate_proposals` | `deduplicate_proposals(proposals)` | Remove duplicate proposals based on title similarity. |
| `rank_proposals` | `rank_proposals(proposals)` | Sort proposals by impact * confidence (highest first). |
| `format_text_report` | `format_text_report(proposals)` | Format proposals as human-readable text. |
| `main` | `main()` | - |
## `scripts/bootstrapper.py`
Bootstrapper — assemble pre-session context from knowledge store.
| Function | Signature | Description |
|----------|-----------|-------------|
| `load_index` | `load_index(index_path)` | Load and validate the knowledge index. |
| `filter_facts` | `filter_facts(facts, repo, agent, include_global)` | Filter facts by repo, agent, and global scope. |
| `sort_facts` | `sort_facts(facts)` | Sort facts by: confidence (desc), then category priority, then fact text. |
| `load_repo_knowledge` | `load_repo_knowledge(repo)` | Load per-repo knowledge markdown if it exists. |
| `load_agent_knowledge` | `load_agent_knowledge(agent)` | Load per-agent knowledge markdown if it exists. |
| `load_global_knowledge` | `load_global_knowledge()` | Load all global knowledge markdown files. |
| `render_facts_section` | `render_facts_section(facts, category, label)` | Render a section of facts for a single category. |
| `estimate_tokens` | `estimate_tokens(text)` | Rough token estimate. |
| `truncate_to_tokens` | `truncate_to_tokens(text, max_tokens)` | Truncate text to approximately max_tokens, cutting at line boundaries. |
| `build_bootstrap_context` | `build_bootstrap_context(repo, agent, include_global, max_tokens, index_path)` | Build the full bootstrap context block. |
| `main` | `main()` | - |
## `scripts/dead_code_detector.py`
Dead Code Detector for Python Codebases
| Function | Signature | Description |
|----------|-----------|-------------|
| `is_safe_unused` | `is_safe_unused(name, filepath)` | Check if an unused name is expected to be unused. |
| `get_git_blame` | `get_git_blame(filepath, lineno)` | Get last author of a line via git blame. |
| `analyze_file` | `analyze_file(filepath)` | Analyze a single Python file for dead code. |
| `scan_repo` | `scan_repo(repo_path, exclude_patterns)` | Scan an entire repo for dead code. |
| `main` | `main()` | - |
## `scripts/dedup.py`
dedup.py — Knowledge deduplication: content hash + semantic similarity.
| Function | Signature | Description |
|----------|-----------|-------------|
| `normalize_text` | `normalize_text(text)` | Normalize text for hashing: lowercase, collapse whitespace, strip. |
| `content_hash` | `content_hash(text)` | SHA256 hash of normalized text for exact dedup. |
| `tokenize` | `tokenize(text)` | Simple tokenizer: lowercase words, 3+ chars. |
| `token_similarity` | `token_similarity(a, b)` | Token-based Jaccard similarity (0.0-1.0). |
| `quality_score` | `quality_score(fact)` | Compute quality score for merge ranking. |
| `merge_facts` | `merge_facts(keep, drop)` | Merge two near-duplicate facts, keeping higher-quality fields. |
| `dedup_facts` | `dedup_facts(facts, exact_threshold, near_threshold, dry_run)` | Deduplicate a list of knowledge facts. |
| `dedup_index_file` | `dedup_index_file(input_path, output_path, near_threshold, dry_run)` | Deduplicate an index.json file. |
| `generate_test_duplicates` | `generate_test_duplicates(n)` | Generate test facts with intentional duplicates for testing. |
| `main` | `main()` | - |
## `scripts/dependency_graph.py`
Cross-Repo Dependency Graph Builder
| Function | Signature | Description |
|----------|-----------|-------------|
| `normalize_repo_name` | `normalize_repo_name(name)` | Normalize a repo name for comparison. |
| `scan_file_for_deps` | `scan_file_for_deps(filepath, content, own_repo)` | Scan a file's content for references to other repos. |
| `scan_repo` | `scan_repo(repo_path, repo_name)` | Scan a repo directory for dependencies. |
| `detect_cycles` | `detect_cycles(graph)` | Detect circular dependencies using DFS. |
| `to_dot` | `to_dot(graph)` | Generate DOT format output. |
| `to_mermaid` | `to_mermaid(graph)` | Generate Mermaid format output. |
| `main` | `main()` | - |
## `scripts/diff_analyzer.py`
Diff Analyzer — Parse unified diffs and categorize every change.
*(no public functions — script runs as `main()` only)*
## `scripts/freshness.py`
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)
| Function | Signature | Description |
|----------|-----------|-------------|
| `compute_file_hash` | `compute_file_hash(filepath)` | Compute SHA-256 hash of a file. Returns None if file doesn't exist. |
| `get_git_file_changes` | `get_git_file_changes(repo_path, days)` | Get files changed in git in the last N days. |
| `load_knowledge_entries` | `load_knowledge_entries(knowledge_dir)` | Load knowledge entries from YAML files in the knowledge directory. |
| `check_freshness` | `check_freshness(knowledge_dir, repo_root, days)` | Check freshness of knowledge entries against recent code changes. |
| `update_stale_hashes` | `update_stale_hashes(knowledge_dir, repo_root)` | Update hashes for stale entries. Returns count of updated entries. |
| `format_report` | `format_report(result, max_items)` | Format freshness check results as a human-readable report. |
| `main` | `main()` | - |
## `scripts/gitea_issue_parser.py`
Gitea Issue Body Parser — Extract structured data from markdown issue bodies.
| Function | Signature | Description |
|----------|-----------|-------------|
| `parse_issue_body` | `parse_issue_body(body, title, labels)` | Parse a Gitea issue markdown body into structured JSON. |
| `fetch_issue_from_url` | `fetch_issue_from_url(url)` | Fetch an issue from a Gitea API URL and parse it. |
| `main` | `main()` | - |
## `scripts/harvester.py`
harvester.py — Extract durable knowledge from Hermes session transcripts.
| Function | Signature | Description |
|----------|-----------|-------------|
| `find_api_key` | `find_api_key()` | Find API key from common locations. |
| `load_extraction_prompt` | `load_extraction_prompt()` | Load the extraction prompt template. |
| `call_llm` | `call_llm(prompt, transcript, api_base, api_key, model)` | Call the LLM API to extract knowledge from a transcript. |
| `parse_extraction_response` | `parse_extraction_response(content)` | Parse the LLM response to extract knowledge items. |
| `load_existing_knowledge` | `load_existing_knowledge(knowledge_dir)` | Load the existing knowledge index. |
| `fact_fingerprint` | `fact_fingerprint(fact)` | Generate a deduplication fingerprint for a fact. |
| `deduplicate` | `deduplicate(new_facts, existing, similarity_threshold)` | Remove duplicate facts from new_facts that already exist in the knowledge store. |
| `validate_fact` | `validate_fact(fact)` | Validate a single knowledge item has required fields. |
| `write_knowledge` | `write_knowledge(index, new_facts, knowledge_dir, source_session)` | Write new facts to the knowledge store. |
| `harvest_session` | `harvest_session(session_path, knowledge_dir, api_base, api_key, model, dry_run, min_confidence)` | Harvest knowledge from a single session. |
| `batch_harvest` | `batch_harvest(sessions_dir, knowledge_dir, api_base, api_key, model, since, limit, dry_run)` | Harvest knowledge from multiple sessions in batch. |
| `main` | `main()` | - |
## `scripts/improvement_proposals.py`
Improvement Proposal Generator for compounding-intelligence.
| Function | Signature | Description |
|----------|-----------|-------------|
| `analyze_sessions` | `analyze_sessions(sessions)` | Analyze session data to find waste patterns. |
| `generate_proposals` | `generate_proposals(patterns, hourly_rate, implementation_overhead)` | Generate improvement proposals from waste patterns. |
| `format_proposals_markdown` | `format_proposals_markdown(proposals, patterns, generated_at)` | Format proposals as a markdown document. |
| `format_proposals_json` | `format_proposals_json(proposals)` | Format proposals as JSON. |
| `main` | `main()` | - |
## `scripts/knowledge_gap_identifier.py`
Knowledge Gap Identifier — Pipeline 10.7
*(no public functions — script runs as `main()` only)*
## `scripts/knowledge_staleness_check.py`
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
| Function | Signature | Description |
|----------|-----------|-------------|
| `compute_file_hash` | `compute_file_hash(filepath)` | Compute SHA-256 hash of a file. Returns None if file doesn't exist. |
| `check_staleness` | `check_staleness(index_path, repo_root)` | Check all entries in knowledge index for staleness. |
| `fix_hashes` | `fix_hashes(index_path, repo_root)` | Add hashes to entries missing them. Returns count of fixed entries. |
| `main` | `main()` | - |
## `scripts/perf_bottleneck_finder.py`
Performance Bottleneck Finder — Identify slow tests, builds, and CI steps.
| Function | Signature | Description |
|----------|-----------|-------------|
| `find_slow_tests_pytest` | `find_slow_tests_pytest(repo_path)` | Run pytest --durations and parse slow tests. |
| `find_slow_tests_by_scan` | `find_slow_tests_by_scan(repo_path)` | Scan test files for patterns that indicate slow tests. |
| `analyze_build_artifacts` | `analyze_build_artifacts(repo_path)` | Find large build artifacts that slow down builds. |
| `analyze_makefile_targets` | `analyze_makefile_targets(repo_path)` | Analyze Makefile for potentially slow targets. |
| `analyze_github_actions` | `analyze_github_actions(repo_path)` | Analyze GitHub Actions workflow files for inefficiencies. |
| `analyze_gitea_ci` | `analyze_gitea_ci(repo_path)` | Analyze Gitea/Drone CI config files. |
| `find_slow_imports` | `find_slow_imports(repo_path)` | Find Python files with heavy import chains. |
| `severity_sort_key` | `severity_sort_key(b)` | Sort by severity then duration. |
| `generate_report` | `generate_report(repo_path)` | Run all analyses and generate a performance report. |
| `format_markdown` | `format_markdown(report)` | Format report as markdown. |
| `main` | `main()` | - |
## `scripts/priority_rebalancer.py`
Priority Rebalancer — Re-evaluate issue priorities based on accumulated data.
| Function | Signature | Description |
|----------|-----------|-------------|
| `collect_knowledge_signals` | `collect_knowledge_signals(knowledge_dir)` | Analyze knowledge store for coverage gaps and staleness. |
| `collect_staleness_signals` | `collect_staleness_signals(scripts_dir, knowledge_dir)` | Run staleness checker if available. |
| `collect_metrics_signals` | `collect_metrics_signals(metrics_dir)` | Analyze metrics directory for pipeline health. |
| `extract_priority` | `extract_priority(labels)` | Extract priority level from issue labels. |
| `compute_issue_score` | `compute_issue_score(issue, repo, signals, now)` | Compute priority score for a single issue. |
| `generate_report` | `generate_report(scores, signals, org, repos_scanned)` | Generate the full priority report. |
| `generate_markdown_report` | `generate_markdown_report(report)` | Generate human-readable markdown report. |
| `main` | `main()` | - |
## `scripts/refactoring_opportunity_finder.py`
Finds refactoring opportunities in codebases
| Function | Signature | Description |
|----------|-----------|-------------|
| `compute_file_complexity` | `compute_file_complexity(filepath)` | Compute cyclomatic complexity for a Python file. |
| `calculate_refactoring_score` | `calculate_refactoring_score(metrics)` | Calculate a refactoring priority score (0-100) based on file metrics. |
| `scan_directory` | `scan_directory(directory, extensions)` | Scan directory for source files. |
| `generate_proposals` | `generate_proposals(directory, min_score)` | Generate refactoring proposals by analyzing source files. |
| `main` | `main()` | - |
## `scripts/sampler.py`
sampler.py — Score and rank sessions by harvest value.
| Function | Signature | Description |
|----------|-----------|-------------|
| `scan_session_fast` | `scan_session_fast(path)` | Extract scoring metadata from a session without parsing the full JSONL. |
| `parse_session_timestamp` | `parse_session_timestamp(filename)` | Parse timestamp from session filename. |
| `score_session` | `score_session(meta, now, seen_repos)` | Score a session for harvest value. Returns (score, breakdown). |
| `main` | `main()` | - |
## `scripts/session_metadata.py`
session_metadata.py - Extract structured metadata from Hermes session transcripts.
| Function | Signature | Description |
|----------|-----------|-------------|
| `extract_session_metadata` | `extract_session_metadata(file_path)` | Extract structured metadata from a Hermes session JSONL transcript. |
| `process_session_directory` | `process_session_directory(directory_path, output_file)` | Process all JSONL files in a directory. |
| `main` | `main()` | CLI entry point. |
## `scripts/session_pair_harvester.py`
Session Transcript → Training Pair Harvester
| Function | Signature | Description |
|----------|-----------|-------------|
| `compute_hash` | `compute_hash(text)` | Content hash for deduplication. |
| `extract_pairs_from_session` | `extract_pairs_from_session(session_data, min_ratio, min_response_words)` | Extract terse→rich pairs from a single session object. |
| `extract_from_jsonl_file` | `extract_from_jsonl_file(filepath, **kwargs)` | Extract pairs from a session JSONL file. |
| `deduplicate_pairs` | `deduplicate_pairs(pairs)` | Remove duplicate pairs across files. |
| `main` | `main()` | - |
## `scripts/session_reader.py`
session_reader.py — Parse Hermes session JSONL transcripts.
| Function | Signature | Description |
|----------|-----------|-------------|
| `read_session` | `read_session(path)` | Read a session JSONL file and return all messages as a list. |
| `read_session_iter` | `read_session_iter(path)` | Iterate over session messages without loading all into memory. |
| `extract_conversation` | `extract_conversation(messages)` | Extract user/assistant conversation turns, skipping tool-only messages. |
| `truncate_for_context` | `truncate_for_context(messages, head, tail)` | Truncate long sessions: keep first N + last N messages. |
| `messages_to_text` | `messages_to_text(messages)` | Convert message list to plain text for LLM consumption. |
| `get_session_metadata` | `get_session_metadata(path)` | Extract metadata from a session file (first message often has config info). |
## `scripts/test_automation_opportunity_finder.py`
Tests for scripts/automation_opportunity_finder.py — 8 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_analyze_cron_jobs_no_file` | `test_analyze_cron_jobs_no_file()` | Returns empty list when no cron jobs file exists. |
| `test_analyze_cron_jobs_disabled` | `test_analyze_cron_jobs_disabled()` | Detects disabled cron jobs. |
| `test_analyze_cron_jobs_errors` | `test_analyze_cron_jobs_errors()` | Detects cron jobs with error status. |
| `test_analyze_documents_finds_todos` | `test_analyze_documents_finds_todos()` | Detects TODO markers in documents. |
| `test_analyze_scripts_repeated_commands` | `test_analyze_scripts_repeated_commands()` | Detects repeated shell commands across scripts. |
| `test_analyze_session_transcripts` | `test_analyze_session_transcripts()` | Detects repeated tool-call sequences. |
| `test_deduplicate_proposals` | `test_deduplicate_proposals()` | Deduplicates proposals with similar titles. |
| `test_rank_proposals` | `test_rank_proposals()` | Ranks proposals by impact * confidence. |
## `scripts/test_bootstrapper.py`
Tests for bootstrapper.py — context assembly from knowledge store.
| Function | Signature | Description |
|----------|-----------|-------------|
| `make_index` | `make_index(facts, tmp_dir)` | Create a temporary index.json with given facts. |
| `test_empty_index` | `test_empty_index()` | Empty knowledge store produces graceful output. |
| `test_filter_by_repo` | `test_filter_by_repo()` | Filter facts by repository. |
| `test_filter_by_agent` | `test_filter_by_agent()` | Filter facts by agent type. |
| `test_no_global_flag` | `test_no_global_flag()` | Excluding global facts works. |
| `test_sort_by_confidence` | `test_sort_by_confidence()` | Facts sort by confidence descending. |
| `test_sort_pitfalls_first` | `test_sort_pitfalls_first()` | Pitfalls sort before facts at same confidence. |
| `test_truncate_to_tokens` | `test_truncate_to_tokens()` | Truncation cuts at line boundary. |
| `test_estimate_tokens` | `test_estimate_tokens()` | Token estimation is reasonable. |
| `test_build_full_context` | `test_build_full_context()` | Full context with facts renders correctly. |
| `test_max_tokens_respected` | `test_max_tokens_respected()` | Output respects max_tokens limit. |
| `test_missing_index_graceful` | `test_missing_index_graceful()` | Missing index.json doesn't crash. |
## `scripts/test_diff_analyzer.py`
Tests for scripts/diff_analyzer.py — 10 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_empty` | `test_empty()` | - |
| `test_addition` | `test_addition()` | - |
| `test_deletion` | `test_deletion()` | - |
| `test_modification` | `test_modification()` | - |
| `test_rename` | `test_rename()` | - |
| `test_multiple_files` | `test_multiple_files()` | - |
| `test_binary` | `test_binary()` | - |
| `test_to_dict` | `test_to_dict()` | - |
| `test_context_only` | `test_context_only()` | - |
| `test_multi_hunk` | `test_multi_hunk()` | - |
| `run_all` | `run_all()` | - |
## `scripts/test_gitea_issue_parser.py`
Tests for scripts/gitea_issue_parser.py
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_basic_parsing` | `test_basic_parsing()` | - |
| `test_numbered_criteria` | `test_numbered_criteria()` | - |
| `test_epic_ref_from_body` | `test_epic_ref_from_body()` | - |
| `test_empty_body` | `test_empty_body()` | - |
| `test_no_sections` | `test_no_sections()` | - |
| `test_multiple_sections` | `test_multiple_sections()` | - |
| `run_all` | `run_all()` | - |
## `scripts/test_harvest_prompt.py`
Test harness for knowledge extraction prompt.
| Function | Signature | Description |
|----------|-----------|-------------|
| `validate_knowledge_item` | `validate_knowledge_item(item, idx)` | Validate a single knowledge item. Returns list of errors. |
| `validate_extraction` | `validate_extraction(data)` | Validate a full extraction result. Returns (is_valid, errors, warnings). |
| `validate_transcript_coverage` | `validate_transcript_coverage(data, transcript)` | Check that extracted facts are actually supported by the transcript. |
| `run_tests` | `run_tests()` | Run the built-in test suite. |
| `validate_file` | `validate_file(filepath)` | Validate an existing extraction JSON file. |
## `scripts/test_harvest_prompt_comprehensive.py`
Comprehensive tests for knowledge extraction prompt.
| Function | Signature | Description |
|----------|-----------|-------------|
| `check_prompt_structure` | `check_prompt_structure()` | - |
| `check_confidence_scoring` | `check_confidence_scoring()` | - |
| `check_example_quality` | `check_example_quality()` | - |
| `check_constraint_coverage` | `check_constraint_coverage()` | - |
| `check_test_sessions` | `check_test_sessions()` | - |
| `test_prompt_structure` | `test_prompt_structure()` | - |
| `test_confidence_scoring` | `test_confidence_scoring()` | - |
| `test_example_quality` | `test_example_quality()` | - |
| `test_constraint_coverage` | `test_constraint_coverage()` | - |
| `test_test_sessions` | `test_test_sessions()` | - |
## `scripts/test_harvester_pipeline.py`
Smoke test for harvester pipeline — verifies the full chain:
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_session_reader` | `test_session_reader()` | Test that session_reader parses JSONL correctly. |
| `test_validate_fact` | `test_validate_fact()` | Test fact validation. |
| `test_deduplicate` | `test_deduplicate()` | Test deduplication. |
| `test_knowledge_store_roundtrip` | `test_knowledge_store_roundtrip()` | Test loading and writing knowledge index. |
| `test_full_chain_no_llm` | `test_full_chain_no_llm()` | Test the full pipeline minus the LLM call. |
## `scripts/test_improvement_proposals.py`
Tests for scripts/improvement_proposals.py — 15 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_empty_sessions` | `test_empty_sessions()` | - |
| `test_no_patterns_on_clean_sessions` | `test_no_patterns_on_clean_sessions()` | - |
| `test_repeated_error_detection` | `test_repeated_error_detection()` | Same error across 3+ sessions triggers pattern. |
| `test_repeated_error_threshold` | `test_repeated_error_threshold()` | 2 occurrences should NOT trigger (threshold is 3). |
| `test_slow_tool_detection` | `test_slow_tool_detection()` | Tool with avg latency > 5000ms across 5+ calls. |
| `test_fast_tool_not_flagged` | `test_fast_tool_not_flagged()` | Tool under 5000ms avg should not trigger. |
| `test_failed_retry_detection` | `test_failed_retry_detection()` | 3+ consecutive calls to same tool triggers retry pattern. |
| `test_manual_process_detection` | `test_manual_process_detection()` | 10+ tool calls with <= 3 unique tools. |
| `test_generate_proposals_from_patterns` | `test_generate_proposals_from_patterns()` | Proposals generated from waste patterns. |
| `test_proposal_roi_positive` | `test_proposal_roi_positive()` | ROI weeks should be a positive number for recoverable time. |
| `test_proposals_sorted_by_impact` | `test_proposals_sorted_by_impact()` | Proposals should be sorted by monthly hours saved (descending). |
| `test_format_markdown` | `test_format_markdown()` | Markdown output should contain expected sections. |
| `test_format_json` | `test_format_json()` | JSON output should be valid and parseable. |
| `test_normalize_error` | `test_normalize_error()` | Error normalization should remove paths and hashes. |
| `test_cli_integration` | `test_cli_integration()` | End-to-end test: write input JSON, run script, check output. |
| `run_all` | `run_all()` | - |
## `scripts/test_knowledge_staleness.py`
Tests for scripts/knowledge_staleness_check.py — 8 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_fresh_entry` | `test_fresh_entry()` | - |
| `test_stale_entry` | `test_stale_entry()` | - |
| `test_missing_source` | `test_missing_source()` | - |
| `test_no_hash` | `test_no_hash()` | - |
| `test_no_source_field` | `test_no_source_field()` | - |
| `test_fix_hashes` | `test_fix_hashes()` | - |
| `test_empty_index` | `test_empty_index()` | - |
| `test_compute_hash_nonexistent` | `test_compute_hash_nonexistent()` | - |
| `run_all` | `run_all()` | - |
## `scripts/test_priority_rebalancer.py`
Tests for Priority Rebalancer
| Function | Signature | Description |
|----------|-----------|-------------|
| `test` | `test(name)` | - |
| `assert_eq` | `assert_eq(a, b, msg)` | - |
| `assert_true` | `assert_true(v, msg)` | - |
| `assert_false` | `assert_false(v, msg)` | - |
| `make_issue` | `make_issue(**kwargs)` | - |
## `scripts/test_refactoring_opportunity_finder.py`
Tests for scripts/refactoring_opportunity_finder.py — 10 tests.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_complexity_simple_function` | `test_complexity_simple_function()` | Simple function should have low complexity. |
| `test_complexity_with_conditionals` | `test_complexity_with_conditionals()` | Function with if/else should have higher complexity. |
| `test_complexity_with_loops` | `test_complexity_with_loops()` | Function with loops should increase complexity. |
| `test_complexity_with_class` | `test_complexity_with_class()` | Class with methods should count both. |
| `test_complexity_syntax_error` | `test_complexity_syntax_error()` | File with syntax error should return zeros. |
| `test_refactoring_score_high_complexity` | `test_refactoring_score_high_complexity()` | High complexity should give high score. |
| `test_refactoring_score_low_complexity` | `test_refactoring_score_low_complexity()` | Low complexity should give lower score. |
| `test_refactoring_score_high_churn` | `test_refactoring_score_high_churn()` | High churn should increase score. |
| `test_refactoring_score_no_coverage` | `test_refactoring_score_no_coverage()` | No coverage data should assume medium risk. |
| `test_refactoring_score_large_file` | `test_refactoring_score_large_file()` | Large files should score higher. |
| `run_all` | `run_all()` | - |
## `scripts/test_session_pair_harvester.py`
Tests for session_pair_harvester.
| Function | Signature | Description |
|----------|-----------|-------------|
| `test_basic_extraction` | `test_basic_extraction()` | - |
| `test_filters_short_responses` | `test_filters_short_responses()` | - |
| `test_skips_tool_results` | `test_skips_tool_results()` | - |
| `test_deduplication` | `test_deduplication()` | - |
| `test_ratio_filter` | `test_ratio_filter()` | - |
## `scripts/validate_knowledge.py`
Validate knowledge files and index.json against the schema.
| Function | Signature | Description |
|----------|-----------|-------------|
| `validate_fact` | `validate_fact(fact, src)` | - |
| `main` | `main()` | - |
---
**Total scripts documented:** 33
*Generated by `scripts/api_doc_generator.py` (Issue #98)*

View File

@@ -0,0 +1,219 @@
#!/usr/bin/env python3
"""
API Doc Generator — Issue #98
Scans all Python modules in `scripts/`, extracts their public API surface
(module docstring + public function signatures + first-line doc summaries),
and produces a single markdown reference document at `docs/API.md`.
Usage:
python3 scripts/api_doc_generator.py # Write docs/API.md
python3 scripts/api_doc_generator.py --check # Verify docs/API.md is up-to-date
python3 scripts/api_doc_generator.py --json # Emit JSON for downstream tooling
"""
from __future__ import annotations
import ast
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import TypedDict, List, Optional
# ─── Paths ────────────────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
SCRIPTS_DIR = REPO_ROOT / "scripts"
DOCS_DIR = REPO_ROOT / "docs"
OUTPUT_PATH = DOCS_DIR / "API.md"
# ─── Data structures ───────────────────────────────────────────────────────────
class FunctionInfo(TypedDict):
name: str
signature: str
summary: str
class ModuleInfo(TypedDict):
path: str # relative to repo root, e.g. "scripts/harvester.py"
docstring: str
functions: List[FunctionInfo]
# ─── AST extraction ────────────────────────────────────────────────────────────
def extract_functions_from_ast(tree: ast.AST, file_rel: str) -> List[FunctionInfo]:
"""Extract public function names, signatures, and first-line doc summaries."""
funcs: list[FunctionInfo] = []
for node in ast.iter_child_nodes(tree):
if not isinstance(node, ast.FunctionDef):
continue
# Skip private functions
if node.name.startswith("_"):
continue
# Build signature: arg1, arg2=default, *args, **kwargs
args = []
for arg in node.args.args:
args.append(arg.arg)
if node.args.vararg:
args.append(f"*{node.args.vararg.arg}")
if node.args.kwarg:
args.append(f"**{node.args.kwarg.arg}")
# Get first line of docstring
summary = ""
if (node.body and isinstance(node.body[0], ast.Expr) and
isinstance(node.body[0].value, ast.Constant) and
isinstance(node.body[0].value.value, str)):
raw = node.body[0].value.value.strip()
summary = raw.split("\n")[0].strip()
if len(summary) > 100:
summary = summary[:97] + "..."
funcs.append({
"name": node.name,
"signature": ", ".join(args),
"summary": summary,
})
return funcs
def parse_module(filepath: Path) -> Optional[ModuleInfo]:
"""Parse a Python file and return its module-level docstring and public functions."""
try:
with open(filepath, "r", encoding="utf-8") as f:
source = f.read()
tree = ast.parse(source, filename=str(filepath))
except Exception as e:
print(f"WARNING: Could not parse {filepath}: {e}", file=sys.stderr)
return None
# Module docstring
module_doc = ast.get_docstring(tree) or ""
module_doc = module_doc.strip().split("\n")[0] # first line only
# Public functions
functions = extract_functions_from_ast(tree, filepath.name)
rel = filepath.relative_to(REPO_ROOT)
return {
"path": str(rel),
"docstring": module_doc,
"functions": functions,
}
# ─── Scanning ──────────────────────────────────────────────────────────────────
def scan_scripts_dir(scripts_dir: Path) -> List[ModuleInfo]:
"""Scan all .py files in scripts/ and extract API info."""
modules: list[ModuleInfo] = []
for pyfile in sorted(scripts_dir.glob("*.py")):
info = parse_module(pyfile)
if info is not None:
modules.append(info)
return modules
# ─── Markdown rendering ─────────────────────────────────────────────────────────
def render_markdown(modules: List[ModuleInfo]) -> str:
"""Generate full docs/API.md content from the scanned modules."""
lines = [
"# Compounding Intelligence — Scripts API Reference",
"",
f"*Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}*",
"",
"This document auto-documents the public API surface of all scripts",
"in `scripts/`. Each section covers one script: module purpose,",
"public functions, and their signatures.",
"",
"---",
"",
]
for mod in modules:
rel = mod["path"]
name = Path(rel).stem # e.g. harvester
lines.append(f"## `{rel}`")
lines.append("")
if mod["docstring"]:
lines.append(mod["docstring"])
lines.append("")
if mod["functions"]:
lines.append("| Function | Signature | Description |")
lines.append("|----------|-----------|-------------|")
for fn in mod["functions"]:
sig = fn["name"] + "(" + fn["signature"] + ")"
desc = fn["summary"] or "-"
lines.append(f"| `{fn['name']}` | `{sig}` | {desc} |")
lines.append("")
else:
lines.append("*(no public functions — script runs as `main()` only)*")
lines.append("")
lines.extend([
"",
"---",
"",
f"**Total scripts documented:** {len(modules)}",
"",
"*Generated by `scripts/api_doc_generator.py` (Issue #98)*",
])
return "\n".join(lines)
# ─── JSON output (optional, for automation) ───────────────────────────────────
def render_json(modules: List[ModuleInfo]) -> str:
"""Emit machine-readable JSON version of the API reference."""
import json
payload = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"generator": "scripts/api_doc_generator.py",
"repo": "Timmy_Foundation/compounding-intelligence",
"modules": modules,
}
return json.dumps(payload, indent=2)
# ─── Main ──────────────────────────────────────────────────────────────────────
def main() -> int:
import argparse
parser = argparse.ArgumentParser(description="Generate API docs for scripts/")
parser.add_argument("--check", action="store_true",
help="Exit 1 if docs/API.md is out-of-date")
parser.add_argument("--json", action="store_true",
help="Emit JSON to stdout instead of writing markdown")
args = parser.parse_args()
modules = scan_scripts_dir(SCRIPTS_DIR)
modules.sort(key=lambda m: m["path"])
if args.json:
print(render_json(modules))
return 0
md = render_markdown(modules)
if args.check:
if OUTPUT_PATH.exists():
existing = OUTPUT_PATH.read_text(encoding="utf-8")
if existing == md:
print("✅ docs/API.md is up-to-date")
return 0
print("❌ docs/API.md is missing or out-of-date — regenerate with "
"`python3 scripts/api_doc_generator.py`", file=sys.stderr)
return 1
DOCS_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_PATH.write_text(md, encoding="utf-8")
print(f"✅ Wrote {OUTPUT_PATH} ({len(modules)} modules documented)")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,418 +0,0 @@
#!/usr/bin/env python3
"""
knowledge_synthesizer.py — Zero-shot knowledge synthesis for compounding intelligence.
Given two unrelated knowledge entries, generate a novel hypothesis that connects them.
Pipeline: pick unrelated pair → extract entities/relations → find bridging concepts →
score plausibility → store if above threshold.
Usage:
python3 scripts/knowledge_synthesizer.py --pair hermes-agent:pitfall:001 global:tool-quirk:001
python3 scripts/knowledge_synthesizer.py --auto --threshold 0.75
python3 scripts/knowledge_synthesizer.py --dry-run # show candidate pair without synthesizing
"""
import argparse
import json
import os
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple, List, Dict
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
TEMPLATE_PATH = SCRIPT_DIR.parent / "templates" / "synthesis-prompt.md"
# Default API configuration
DEFAULT_API_BASE = os.environ.get(
"SYNTHESIS_API_BASE",
os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
)
DEFAULT_API_KEY = os.environ.get("SYNTHESIS_API_KEY", "")
DEFAULT_MODEL = os.environ.get(
"SYNTHESIS_MODEL",
os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
)
# Places to look for API keys if not in env
API_KEY_PATHS = [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
]
def find_api_key() -> str:
for path in API_KEY_PATHS:
if os.path.exists(path):
with open(path) as f:
key = f.read().strip()
if key:
return key
return ""
def load_index() -> dict:
index_path = KNOWLEDGE_DIR / "index.json"
if not index_path.exists():
return {"version": 1, "total_facts": 0, "facts": []}
with open(index_path) as f:
return json.load(f)
def save_index(index: dict) -> None:
KNOWLEDGE_DIR.mkdir(parents=True, exist_ok=True)
index_path = KNOWLEDGE_DIR / "index.json"
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(index, f, indent=2, ensure_ascii=False)
def next_sequence(facts: List[dict], domain: str, category: str) -> int:
"""Find next sequence number for given domain:category."""
prefix = f"{domain}:{category}:"
max_seq = 0
for fact in facts:
fid = fact.get('id', '')
if fid.startswith(prefix):
try:
seq = int(fid.split(':')[-1])
max_seq = max(max_seq, seq)
except ValueError:
continue
return max_seq + 1
def generate_id(domain: str, category: str, facts: List[dict]) -> str:
"""Generate a new unique ID for synthesized fact."""
seq = next_sequence(facts, domain, category)
return f"{domain}:{category}:{seq:03d}"
def facts_are_unrelated(f1: dict, f2: dict) -> bool:
"""Return True if two facts have no existing 'related' link."""
id1, id2 = f1['id'], f2['id']
rel1 = set(f1.get('related', []))
rel2 = set(f2.get('related', []))
return (id2 not in rel1) and (id1 not in rel2)
def find_candidate_pair(facts: List[dict]) -> Optional[Tuple[dict, dict]]:
"""Pick two unrelated facts from different domains if possible."""
# Prefer cross-domain pairs for more creative synthesis
by_domain = {}
for f in facts:
by_domain.setdefault(f['domain'], []).append(f)
domains = list(by_domain.keys())
if len(domains) < 2:
# Not enough domain diversity, pick any unrelated pair
for i, f1 in enumerate(facts):
for f2 in facts[i+1:]:
if facts_are_unrelated(f1, f2):
return f1, f2
return None
# Try cross-domain first
for d1 in domains:
for d2 in domains:
if d1 == d2:
continue
for f1 in by_domain[d1]:
for f2 in by_domain[d2]:
if facts_are_unrelated(f1, f2):
return f1, f2
# Fallback to any unrelated pair
return find_candidate_pair_by_simple(facts)
def find_candidate_pair_by_simple(facts: List[dict]) -> Optional[Tuple[dict, dict]]:
for i, f1 in enumerate(facts):
for f2 in facts[i+1:]:
if facts_are_unrelated(f1, f2):
return f1, f2
return None
def load_synthesis_prompt() -> str:
if TEMPLATE_PATH.exists():
return TEMPLATE_PATH.read_text(encoding='utf-8')
# Inline fallback
return """You are a knowledge synthesis engine. Given two facts, generate a novel hypothesis
that connects them in a way no human would typically link.
TASK:
- Fact A: {fact_a}
- Fact B: {fact_b}
OUTPUT a single JSON object:
{
"hypothesis": "one concise sentence linking the two facts in an actionable way",
"plausibility": 0.0-1.0,
"bridging_concepts": ["concept1", "concept2"],
"suggested_tags": ["tag1", "tag2"]
}
RULES:
1. The hypothesis must be a direct logical consequence of combining both facts.
2. Do NOT restate either fact — produce a new insight.
3. Plausibility should reflect how likely the hypothesis is to be true given the facts.
4. If no meaningful connection exists, return {"hypothesis":"","plausibility":0.0}.
5. Output ONLY valid JSON, no markdown.
"""
def call_synthesis_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[dict]:
"""Call LLM to synthesize a hypothesis from two facts."""
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": transcript}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.7, # More creative for synthesis
"max_tokens": 512
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode('utf-8'))
content = result["choices"][0]["message"]["content"]
return parse_synthesis_response(content)
except Exception as e:
print(f"ERROR: LLM call failed: {e}", file=sys.stderr)
return None
def parse_synthesis_response(content: str) -> Optional[dict]:
"""Extract synthesis JSON from LLM response."""
try:
data = json.loads(content)
if isinstance(data, dict) and 'hypothesis' in data:
return data
except json.JSONDecodeError:
pass
import re
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(1))
if isinstance(data, dict) and 'hypothesis' in data:
return data
except json.JSONDecodeError:
pass
# Try finding any JSON object
json_match = re.search(r'(\{.*"hypothesis".*\})', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
return None
def heuristic_synthesis(f1: dict, f2: dict) -> dict:
"""Rule-based fallback synthesis when no LLM available."""
# Simple bridging: combine tags and domains
tags = list(set(f1.get('tags', []) + f2.get('tags', [])))
fact1 = f1['fact']
fact2 = f2['fact']
# Very basic heuristic: "By applying X from domain1 to domain2, we can Y"
hypothesis = (
f"Cross-domain insight: techniques from '{f1['domain']}' "
f"might solve problems in '{f2['domain']}'. "
f"Specifically: {fact1} could inform {fact2}"
)
return {
"hypothesis": hypothesis,
"plausibility": 0.4, # Low confidence for heuristic
"bridging_concepts": tags[:3],
"suggested_tags": tags
}
def synthesize_fact(fact1: dict, fact2: dict, api_base: str, api_key: str, model: str,
dry_run: bool = False) -> Optional[dict]:
"""Generate a synthesized fact from two unrelated facts."""
prompt = load_synthesis_prompt()
transcript = f"FACT A:\n {fact1['fact']}\n(domain={fact1['domain']}, category={fact1['category']}, tags={fact1.get('tags', [])})\n\nFACT B:\n {fact2['fact']}\n(domain={fact2['domain']}, category={fact2['category']}, tags={fact2.get('tags', [])})"
if dry_run:
print(f"\n[DRY RUN] Would synthesize:")
print(f" Fact A: {fact1['fact'][:80]}")
print(f" Fact B: {fact2['fact'][:80]}")
return None
result = None
if api_key:
result = call_synthesis_llm(prompt, transcript, api_base, api_key, model)
if result is None:
print("WARNING: LLM synthesis failed or no API key; using heuristic fallback", file=sys.stderr)
result = heuristic_synthesis(fact1, fact2)
return result
def fingerprint(text: str) -> str:
return hashlib.md5(text.lower().strip().encode('utf-8')).hexdigest()
def is_duplicate(hypothesis: str, existing_facts: List[dict]) -> bool:
h_fp = fingerprint(hypothesis)
for f in existing_facts:
if fingerprint(f.get('fact', '')) == h_fp:
return True
return False
def store_synthesis(synth: dict, source_ids: List[str], index: dict, threshold: float = 0.5) -> bool:
"""Store synthesized fact if plausibility exceeds threshold."""
plaus = synth.get('plausibility', 0.0)
if plaus < threshold:
print(f"Skipped: plausibility {plaus:.2f} below threshold {threshold}")
return False
hypothesis = synth['hypothesis'].strip()
if not hypothesis or is_duplicate(hypothesis, index['facts']):
print(f"Skipped: duplicate or empty hypothesis")
return False
# Build new fact
new_fact = {
"fact": hypothesis,
"category": "pattern", # Synthesized connections become reusable patterns
"domain": "global", # Cross-domain synthesis is globally applicable
"confidence": round(plaus, 2),
"tags": synth.get('suggested_tags', []),
"related": source_ids,
"first_seen": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"last_confirmed": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"source_count": 1,
}
# Generate ID
new_fact['id'] = generate_id("global", "pattern", index['facts'])
# Update index
index['facts'].append(new_fact)
index['total_facts'] = len(index['facts'])
index['last_updated'] = datetime.now(timezone.utc).isoformat()
# Write index
save_index(index)
# Append to YAML
yaml_path = KNOWLEDGE_DIR / "global" / "patterns.yaml"
yaml_path.parent.mkdir(parents=True, exist_ok=True)
mode = 'a' if yaml_path.exists() else 'w'
with open(yaml_path, mode, encoding='utf-8') as f:
if mode == 'w':
f.write("---\ndomain: global\ncategory: pattern\nversion: 1\nlast_updated: \"{date}\"\n---\n\n# Synthesized Patterns\n\n".format(date=datetime.now(timezone.utc).strftime("%Y-%m-%d")))
f.write(f"\n- id: {new_fact['id']}\n")
f.write(f" fact: \"{hypothesis}\"\n")
f.write(f" confidence: {plaus}\n")
if new_fact['tags']:
f.write(f" tags: {json.dumps(new_fact['tags'])}\n")
f.write(f" related: {json.dumps(source_ids)}\n")
f.write(f" first_seen: \"{new_fact['first_seen']}\"\n")
f.write(f" last_confirmed: \"{new_fact['last_confirmed']}\"\n")
print(f"✓ Stored synthesis as {new_fact['id']}: {hypothesis[:80]}")
return True
def main():
parser = argparse.ArgumentParser(description="Zero-shot knowledge synthesis")
parser.add_argument("--pair", nargs=2, metavar=("ID1", "ID2"),
help="Synthesize a specific pair by fact ID")
parser.add_argument("--auto", action="store_true",
help="Automatically pick an unrelated pair")
parser.add_argument("--threshold", type=float, default=0.6,
help="Plausibility threshold for storage (default: 0.6)")
parser.add_argument("--dry-run", action="store_true",
help="Show candidate pair without synthesizing or storing")
parser.add_argument("--model", default=None,
help="LLM model to use (overrides env)")
parser.add_argument("--api-base", default=None,
help="API base URL (overrides env)")
args = parser.parse_args()
# Resolve API credentials
api_base = args.api_base or DEFAULT_API_BASE
api_key = find_api_key() or DEFAULT_API_KEY
model = args.model or DEFAULT_MODEL
if not args.dry_run and not args.pair and not args.auto:
print("ERROR: Must specify either --pair ID1 ID2 or --auto", file=sys.stderr)
parser.print_help()
sys.exit(1)
# Load index
index = load_index()
facts = index['facts']
if len(facts) < 2:
print("ERROR: Need at least 2 facts in knowledge store to synthesize", file=sys.stderr)
sys.exit(1)
# Select facts
f1, f2 = None, None
if args.pair:
id1, id2 = args.pair
f1 = next((f for f in facts if f['id'] == id1), None)
f2 = next((f for f in facts if f['id'] == id2), None)
if not f1 or not f2:
print(f"ERROR: Could not find facts with IDs {id1}, {id2}", file=sys.stderr)
sys.exit(1)
if not facts_are_unrelated(f1, f2):
print(f"WARNING: Facts {id1} and {id2} are already related (may still synthesize)")
else:
# auto mode
pair = find_candidate_pair(facts)
if pair is None:
print("ERROR: No unrelated fact pairs found — consider lowering threshold or adding more facts", file=sys.stderr)
sys.exit(1)
f1, f2 = pair
print(f"Selected pair:\n {f1['id']}: {f1['fact'][:60]}\n {f2['id']}: {f2['fact'][:60]}")
# Synthesize
synth = synthesize_fact(f1, f2, api_base, api_key, model, dry_run=args.dry_run)
if synth is None:
sys.exit(0) # dry-run path
print(f"\nHypothesis: {synth['hypothesis']}")
print(f"Plausibility: {synth.get('plausibility', 0.0):.2f}")
print(f"Bridging concepts: {synth.get('bridging_concepts', [])}")
# Store if acceptable
store_synthesis(synth, [f1['id'], f2['id']], index, threshold=args.threshold)
if __name__ == '__main__':
main()

View File

@@ -1,235 +0,0 @@
#!/usr/bin/env python3
"""
Tests for knowledge_synthesizer.py — zero-shot knowledge synthesis pipeline.
Run with: python3 scripts/test_knowledge_synthesizer.py
Or via pytest: pytest scripts/test_knowledge_synthesizer.py
"""
import json
import os
import sys
import os
import tempfile
from pathlib import Path
# Add scripts dir to path for importing sibling module
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))
import importlib.util
spec = importlib.util.spec_from_file_location(
"ks", os.path.join(str(SCRIPT_DIR), "knowledge_synthesizer.py")
)
ks = importlib.util.module_from_spec(spec)
spec.loader.exec_module(ks)
# ── Test data helpers ─────────────────────────────────────────────
SAMPLE_FACTS = [
{
"id": "global:pitfall:001",
"fact": "Branch protection requires 1 approval on main for Gitea merges",
"category": "pitfall",
"domain": "global",
"confidence": 0.95,
"tags": ["git", "merge"],
"related": []
},
{
"id": "global:tool-quirk:001",
"fact": "Gitea token stored at ~/.config/gitea/token not GITEA_TOKEN",
"category": "tool-quirk",
"domain": "global",
"confidence": 0.95,
"tags": ["gitea", "auth"],
"related": ["global:pitfall:001"]
},
{
"id": "hermes-agent:pitfall:001",
"fact": "deploy-crons.py leaves jobs in mixed model format",
"category": "pitfall",
"domain": "hermes-agent",
"confidence": 0.95,
"tags": ["cron"],
"related": []
},
]
def make_index(facts, tmp_dir: Path) -> Path:
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": len(facts),
"facts": facts,
}
path = tmp_dir / "index.json"
with open(path, "w") as f:
json.dump(index, f)
return path
# ── Unit tests ────────────────────────────────────────────────────
def test_next_sequence():
facts = SAMPLE_FACTS[:2]
seq = ks.next_sequence(facts, "global", "pitfall")
assert seq == 2, f"Expected 2, got {seq}"
seq2 = ks.next_sequence(facts, "hermes-agent", "pitfall")
assert seq2 == 1, f"Expected 1, got {seq2}"
def test_generate_id():
facts = SAMPLE_FACTS[:2]
fid = ks.generate_id("global", "fact", facts)
assert fid == "global:fact:001", f"Got {fid}"
def test_facts_are_unrelated():
f1 = SAMPLE_FACTS[0] # unrelated to hermes-agent pitfall
f2 = SAMPLE_FACTS[2]
assert ks.facts_are_unrelated(f1, f2) is True
f3 = SAMPLE_FACTS[1] # related to f1
assert ks.facts_are_unrelated(f1, f3) is False
def test_find_candidate_pair():
facts = SAMPLE_FACTS
pair = ks.find_candidate_pair(facts)
assert pair is not None, "Should find an unrelated pair"
f1, f2 = pair
assert ks.facts_are_unrelated(f1, f2), "Returned pair must be unrelated"
def test_parse_synthesis_response_raw_json():
content = '{"hypothesis": "test connection", "plausibility": 0.8, "bridging_concepts": ["x"], "suggested_tags": ["a"]}'
result = ks.parse_synthesis_response(content)
assert result is not None
assert result["hypothesis"] == "test connection"
assert result["plausibility"] == 0.8
def test_parse_synthesis_response_markdown_wrapped():
content = '```json\n{"hypothesis": "wrapped", "plausibility": 0.5}\n```'
result = ks.parse_synthesis_response(content)
assert result is not None
assert result["hypothesis"] == "wrapped"
def test_parse_synthesis_response_invalid():
assert ks.parse_synthesis_response("not json") is None
assert ks.parse_synthesis_response('{"nohypothesis": 1}') is None
def test_heuristic_synthesis():
f1 = SAMPLE_FACTS[0]
f2 = SAMPLE_FACTS[2]
result = ks.heuristic_synthesis(f1, f2)
assert "hypothesis" in result
assert "plausibility" in result
assert result["plausibility"] == 0.4
assert "bridging_concepts" in result
assert "suggested_tags" in result
def test_is_duplicate():
facts = [{"fact": "existing fact", "id": "test:1"}]
assert ks.is_duplicate("existing fact", facts) is True
assert ks.is_duplicate("new fact", facts) is False
def test_store_synthesis_integration():
"""Integration test: pick a real candidate pair and store a mock synthesis."""
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
# Create fake knowledge dir with index
kdir = tmp_path / "knowledge"
kdir.mkdir()
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": 3,
"facts": SAMPLE_FACTS
}
with open(kdir / "index.json", "w") as f:
json.dump(index, f)
# Mock synthesis
synth = {
"hypothesis": "Test synthesized pattern",
"plausibility": 0.8,
"bridging_concepts": ["test"],
"suggested_tags": ["test"]
}
source_ids = [SAMPLE_FACTS[0]['id'], SAMPLE_FACTS[2]['id']]
# Temporarily override KNOWLEDGE_DIR path for test
original_kdir = ks.KNOWLEDGE_DIR
ks.KNOWLEDGE_DIR = kdir
try:
stored = ks.store_synthesis(synth, source_ids, index, threshold=0.5)
assert stored is True
assert index['total_facts'] == 4
new_fact = index['facts'][-1]
assert new_fact['fact'] == "Test synthesized pattern"
assert new_fact['category'] == "pattern"
assert new_fact['domain'] == "global"
assert new_fact['related'] == source_ids
assert new_fact['id'].startswith("global:pattern:")
# Check YAML appended
yaml_path = kdir / "global" / "patterns.yaml"
assert yaml_path.exists()
content = yaml_path.read_text()
assert "Test synthesized pattern" in content
finally:
ks.KNOWLEDGE_DIR = original_kdir
# ── Smoke test ────────────────────────────────────────────────────
def test_smoke_synthesizer_info():
"""Sanity check: script can at least load and report current knowledge state."""
index = ks.load_index()
total = index.get('total_facts', 0)
facts = index.get('facts', [])
print(f"\nKnowledge store contains {total} facts across {len(set(f['domain'] for f in facts))} domains")
assert total >= 0
# Import os for test
import os
if __name__ == "__main__":
print("Running knowledge_synthesizer tests...\n")
passed = 0
failed = 0
tests = [
test_next_sequence,
test_generate_id,
test_facts_are_unrelated,
test_find_candidate_pair,
test_parse_synthesis_response_raw_json,
test_parse_synthesis_response_markdown_wrapped,
test_parse_synthesis_response_invalid,
test_heuristic_synthesis,
test_is_duplicate,
test_store_synthesis_integration,
test_smoke_synthesizer_info,
]
for test in tests:
try:
test()
print(f"{test.__name__}")
passed += 1
except Exception as e:
import traceback; traceback.print_exc(); print(f"{test.__name__}: {e}")
failed += 1
print(f"\n{passed} passed, {failed} failed")
sys.exit(0 if failed == 0 else 1)

View File

@@ -1,47 +0,0 @@
# Knowledge Synthesis Prompt
## System Prompt
You are a knowledge synthesis engine. Given two facts, you generate a novel hypothesis
that connects them in a way no human would typically link — a zero-shot creative leap.
## Task
FACT A:
{fact_a}
FACT B:
{fact_b}
Generate a single JSON object:
{
"hypothesis": "one concise sentence linking the two facts as a new, testable insight",
"plausibility": 0.0-1.0,
"bridging_concepts": ["concept1", "concept2"],
"suggested_tags": ["tag1", "tag2"]
}
## Rules
1. The hypothesis must be a logical consequence of combining both facts.
2. DO NOT restate either fact — produce genuinely new insight.
3. Plausibility should reflect confidence given only these two facts.
4. If no meaningful connection exists, return {"hypothesis":"","plausibility":0.0}.
5. Output ONLY valid JSON — no markdown, no explanation.
## Examples
Input facts:
- "Gitea PR creation requires branch protection approval (1+) on main"
- "Git push hangs on large repos (pack.windowMemory=100m)"
Hypothesis output:
{
"hypothesis": "Branch protection triggers checks that inflate pack size, causing git push to hang on large repos",
"plausibility": 0.65,
"bridging_concepts": ["git", "gitea", "branch-protection", "push"],
"suggested_tags": ["git", "gitea", "performance"]
}
Output ONLY the JSON object.

View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
Smoke tests for API Doc Generator — Issue #98
Validates that the generator runs, produces docs/API.md, and that
the generated markdown contains expected sections for the known scripts.
"""
from __future__ import annotations
import subprocess
import sys
from pathlib import Path
import pytest
# Resolve repo root
REPO_ROOT = Path(__file__).resolve().parents[1]
SCRIPTS_DIR = REPO_ROOT / "scripts"
DOCS_DIR = REPO_ROOT / "docs"
API_MD = DOCS_DIR / "API.md"
GENERATOR = SCRIPTS_DIR / "api_doc_generator.py"
# ─── Generator presence ─────────────────────────────────────────────────────────
class TestGeneratorPresence:
def test_generator_script_exists(self):
assert GENERATOR.exists(), f"Missing: {GENERATOR}"
def test_generator_is_executable(self):
with open(GENERATOR) as f:
first = f.readline().strip()
assert first.startswith("#!"), "Missing shebang"
assert "python" in first.lower()
# ─── API.md generation ──────────────────────────────────────────────────────────
class TestAPIDocGeneration:
def test_generator_runs_successfully(self):
"""Run the generator and verify exit code 0."""
result = subprocess.run(
[sys.executable, str(GENERATOR)],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
assert result.returncode == 0, (
f"Generator failed (code {{result.returncode}})\n"
f"STDERR: {{result.stderr[:500]}}"
)
def test_api_md_is_created(self):
"""docs/API.md must exist after generation."""
assert API_MD.exists(), f"Missing output: {API_MD}"
def test_api_md_is_not_empty(self):
"""Generate markdown must have substantial content."""
content = API_MD.read_text(encoding="utf-8")
assert len(content) > 1000, "API.md is suspiciously small"
def test_api_md_has_expected_structure(self):
"""Top-level headings and table markers must be present."""
content = API_MD.read_text(encoding="utf-8")
assert "# Compounding Intelligence — Scripts API Reference" in content
assert "## `scripts/" in content
assert "| Function | Signature | Description |" in content
def test_api_md_covers_expected_scripts(self):
"""At minimum the core scripts should be documented."""
content = API_MD.read_text(encoding="utf-8")
# Core scripts that must appear
core = ["scripts/harvester.py", "scripts/bootstrapper.py",
"scripts/session_reader.py", "scripts/dedup.py"]
for rel in core:
assert f"## `{rel}`" in content, f"Missing section for {rel}"
def test_api_md_contains_function_names(self):
"""Spot-check: known public functions from key modules must appear."""
content = API_MD.read_text(encoding="utf-8")
checks = [
("harvester", "read_session"),
("bootstrapper", "load_index"),
("session_reader", "extract_conversation"),
("dedup", "normalize_text"),
]
for module_stem, func_name in checks:
assert f"| `{func_name}` |" in content, f"Missing function {func_name} from {module_stem}"
# ─── Idempotence / --check ─────────────────────────────────────────────────────
class TestIdempotence:
def test_check_flag_passes_when_current(self):
"""`--check` should exit 0 immediately after generation."""
result = subprocess.run(
[sys.executable, str(GENERATOR), "--check"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
assert result.returncode == 0, (
f"--check failed\nSTDOUT: {{result.stdout}}\nSTDERR: {{result.stderr[:200]}}"
)
def test_check_fails_when_api_md_stale(self):
"""If docs/API.md is manually altered, --check should detect staleness."""
# Generate fresh baseline first
subprocess.run([sys.executable, str(GENERATOR)], capture_output=True, cwd=REPO_ROOT, timeout=30)
# Corrupt API.md slightly (append a line at the end)
original = API_MD.read_text(encoding="utf-8")
corrupted = original + "\n<!-- corrupted -->\n"
API_MD.write_text(corrupted, encoding="utf-8")
# --check should now fail
result = subprocess.run(
[sys.executable, str(GENERATOR), "--check"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
assert result.returncode != 0, "--check should detect stale API.md"
assert "out-of-date" in result.stderr.lower() or "out-of-date" in result.stdout.lower()
# Restore clean state
subprocess.run([sys.executable, str(GENERATOR)], capture_output=True, cwd=REPO_ROOT, timeout=30)
assert API_MD.read_text(encoding="utf-8") == original
# ─── JSON output ────────────────────────────────────────────────────────────────
class TestJSONOutput:
def test_json_flag_emits_valid_json(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
assert result.returncode == 0
import json
payload = json.loads(result.stdout)
assert "modules" in payload
assert len(payload["modules"]) >= 30
def test_json_has_expected_fields(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=30
)
import json
payload = json.loads(result.stdout)
mod = payload["modules"][0]
for key in ("path", "docstring", "functions"):
assert key in mod, f"Missing key {{key}} in module payload"
if __name__ == "__main__":
pytest.main([__file__, "-v"])