Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6416b776db |
141
GENOME.md
Normal file
141
GENOME.md
Normal file
@@ -0,0 +1,141 @@
|
||||
# GENOME.md — Timmy_Foundation/timmy-home
|
||||
|
||||
Generated by `pipelines/codebase_genome.py`.
|
||||
|
||||
## Project Overview
|
||||
|
||||
Timmy Foundation's home repository for development operations and configurations.
|
||||
|
||||
- Text files indexed: 3004
|
||||
- Source and script files: 186
|
||||
- Test files: 28
|
||||
- Documentation files: 701
|
||||
|
||||
## Architecture
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
repo_root["repo"]
|
||||
angband["angband"]
|
||||
briefings["briefings"]
|
||||
config["config"]
|
||||
conftest["conftest"]
|
||||
evennia["evennia"]
|
||||
evennia_tools["evennia_tools"]
|
||||
evolution["evolution"]
|
||||
gemini_fallback_setup["gemini-fallback-setup"]
|
||||
heartbeat["heartbeat"]
|
||||
infrastructure["infrastructure"]
|
||||
repo_root --> angband
|
||||
repo_root --> briefings
|
||||
repo_root --> config
|
||||
repo_root --> conftest
|
||||
repo_root --> evennia
|
||||
repo_root --> evennia_tools
|
||||
```
|
||||
|
||||
## Entry Points
|
||||
|
||||
- `gemini-fallback-setup.sh` — operational script (`bash gemini-fallback-setup.sh`)
|
||||
- `morrowind/hud.sh` — operational script (`bash morrowind/hud.sh`)
|
||||
- `pipelines/codebase_genome.py` — python main guard (`python3 pipelines/codebase_genome.py`)
|
||||
- `scripts/auto_restart_agent.sh` — operational script (`bash scripts/auto_restart_agent.sh`)
|
||||
- `scripts/backup_pipeline.sh` — operational script (`bash scripts/backup_pipeline.sh`)
|
||||
- `scripts/big_brain_manager.py` — operational script (`python3 scripts/big_brain_manager.py`)
|
||||
- `scripts/big_brain_repo_audit.py` — operational script (`python3 scripts/big_brain_repo_audit.py`)
|
||||
- `scripts/codebase_genome_nightly.py` — operational script (`python3 scripts/codebase_genome_nightly.py`)
|
||||
- `scripts/detect_secrets.py` — operational script (`python3 scripts/detect_secrets.py`)
|
||||
- `scripts/dynamic_dispatch_optimizer.py` — operational script (`python3 scripts/dynamic_dispatch_optimizer.py`)
|
||||
- `scripts/emacs-fleet-bridge.py` — operational script (`python3 scripts/emacs-fleet-bridge.py`)
|
||||
- `scripts/emacs-fleet-poll.sh` — operational script (`bash scripts/emacs-fleet-poll.sh`)
|
||||
|
||||
## Data Flow
|
||||
|
||||
1. Operators enter through `gemini-fallback-setup.sh`, `morrowind/hud.sh`, `pipelines/codebase_genome.py`.
|
||||
2. Core logic fans into top-level components: `angband`, `briefings`, `config`, `conftest`, `evennia`, `evennia_tools`.
|
||||
3. Validation is incomplete around `wizards/allegro/home/skills/red-teaming/godmode/scripts/auto_jailbreak.py`, `timmy-local/cache/agent_cache.py`, `wizards/allegro/home/skills/red-teaming/godmode/scripts/parseltongue.py`, so changes there carry regression risk.
|
||||
4. Final artifacts land as repository files, docs, or runtime side effects depending on the selected entry point.
|
||||
|
||||
## Key Abstractions
|
||||
|
||||
- `evennia/timmy_world/game.py` — classes `World`:91, `ActionSystem`:421, `TimmyAI`:539, `NPCAI`:550; functions `get_narrative_phase()`:55, `get_phase_transition_event()`:65
|
||||
- `evennia/timmy_world/world/game.py` — classes `World`:19, `ActionSystem`:326, `TimmyAI`:444, `NPCAI`:455; functions none detected
|
||||
- `timmy-world/game.py` — classes `World`:19, `ActionSystem`:349, `TimmyAI`:467, `NPCAI`:478; functions none detected
|
||||
- `wizards/allegro/home/skills/red-teaming/godmode/scripts/auto_jailbreak.py` — classes none detected; functions none detected
|
||||
- `uniwizard/self_grader.py` — classes `SessionGrade`:23, `WeeklyReport`:55, `SelfGrader`:74; functions `main()`:713
|
||||
- `uni-wizard/v3/intelligence_engine.py` — classes `ExecutionPattern`:27, `ModelPerformance`:44, `AdaptationEvent`:58, `PatternDatabase`:69; functions none detected
|
||||
- `scripts/know_thy_father/crossref_audit.py` — classes `ThemeCategory`:30, `Principle`:160, `MeaningKernel`:169, `CrossRefFinding`:178; functions `extract_themes_from_text()`:192, `parse_soul_md()`:206, `parse_kernels()`:264, `cross_reference()`:296, `generate_report()`:440, `main()`:561
|
||||
- `timmy-local/cache/agent_cache.py` — classes `CacheStats`:28, `LRUCache`:52, `ResponseCache`:94, `ToolCache`:205; functions none detected
|
||||
|
||||
## API Surface
|
||||
|
||||
- CLI: `bash gemini-fallback-setup.sh` — operational script (`gemini-fallback-setup.sh`)
|
||||
- CLI: `bash morrowind/hud.sh` — operational script (`morrowind/hud.sh`)
|
||||
- CLI: `python3 pipelines/codebase_genome.py` — python main guard (`pipelines/codebase_genome.py`)
|
||||
- CLI: `bash scripts/auto_restart_agent.sh` — operational script (`scripts/auto_restart_agent.sh`)
|
||||
- CLI: `bash scripts/backup_pipeline.sh` — operational script (`scripts/backup_pipeline.sh`)
|
||||
- CLI: `python3 scripts/big_brain_manager.py` — operational script (`scripts/big_brain_manager.py`)
|
||||
- CLI: `python3 scripts/big_brain_repo_audit.py` — operational script (`scripts/big_brain_repo_audit.py`)
|
||||
- CLI: `python3 scripts/codebase_genome_nightly.py` — operational script (`scripts/codebase_genome_nightly.py`)
|
||||
- Python: `get_narrative_phase()` from `evennia/timmy_world/game.py:55`
|
||||
- Python: `get_phase_transition_event()` from `evennia/timmy_world/game.py:65`
|
||||
- Python: `main()` from `uniwizard/self_grader.py:713`
|
||||
|
||||
## Test Coverage Report
|
||||
|
||||
- Source and script files inspected: 186
|
||||
- Test files inspected: 28
|
||||
- Coverage gaps:
|
||||
- `wizards/allegro/home/skills/red-teaming/godmode/scripts/auto_jailbreak.py` — no matching test reference detected
|
||||
- `timmy-local/cache/agent_cache.py` — no matching test reference detected
|
||||
- `wizards/allegro/home/skills/red-teaming/godmode/scripts/parseltongue.py` — no matching test reference detected
|
||||
- `twitter-archive/multimodal_pipeline.py` — no matching test reference detected
|
||||
- `wizards/allegro/home/skills/red-teaming/godmode/scripts/godmode_race.py` — no matching test reference detected
|
||||
- `skills/productivity/google-workspace/scripts/google_api.py` — no matching test reference detected
|
||||
- `wizards/allegro/home/skills/productivity/google-workspace/scripts/google_api.py` — no matching test reference detected
|
||||
- `morrowind/pilot.py` — no matching test reference detected
|
||||
- `morrowind/mcp_server.py` — no matching test reference detected
|
||||
- `skills/research/domain-intel/scripts/domain_intel.py` — no matching test reference detected
|
||||
- `wizards/allegro/home/skills/research/domain-intel/scripts/domain_intel.py` — no matching test reference detected
|
||||
- `timmy-local/scripts/ingest.py` — no matching test reference detected
|
||||
|
||||
## Security Audit Findings
|
||||
|
||||
- [medium] `briefings/briefing_20260325.json:37` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `"gitea_error": "Gitea 404: {\"errors\":null,\"message\":\"not found\",\"url\":\"http://143.198.27.163:3000/api/swagger\"}\n [http://143.198.27.163:3000/api/v1/repos/Timmy_Foundation/sovereign-orchestration/issues?state=open&type=issues&sort=created&direction=desc&limit=1&page=1]",`
|
||||
- [medium] `briefings/briefing_20260328.json:11` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `"provider_base_url": "http://localhost:8081/v1",`
|
||||
- [medium] `briefings/briefing_20260329.json:11` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `"provider_base_url": "http://localhost:8081/v1",`
|
||||
- [medium] `config.yaml:37` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `summary_base_url: http://localhost:11434/v1`
|
||||
- [medium] `config.yaml:47` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `base_url: 'http://localhost:11434/v1'`
|
||||
- [medium] `config.yaml:52` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `base_url: 'http://localhost:11434/v1'`
|
||||
- [medium] `config.yaml:57` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `base_url: 'http://localhost:11434/v1'`
|
||||
- [medium] `config.yaml:62` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `base_url: 'http://localhost:11434/v1'`
|
||||
- [medium] `config.yaml:67` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `base_url: 'http://localhost:11434/v1'`
|
||||
- [medium] `config.yaml:77` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `base_url: 'http://localhost:11434/v1'`
|
||||
- [medium] `config.yaml:82` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `base_url: 'http://localhost:11434/v1'`
|
||||
- [medium] `config.yaml:174` — hardcoded http endpoint: plaintext or fixed HTTP endpoints can drift or leak across environments. Evidence: `base_url: http://localhost:11434/v1`
|
||||
|
||||
## Dead Code Candidates
|
||||
|
||||
- `wizards/allegro/home/skills/red-teaming/godmode/scripts/auto_jailbreak.py` — not imported by indexed Python modules and not referenced by tests
|
||||
- `timmy-local/cache/agent_cache.py` — not imported by indexed Python modules and not referenced by tests
|
||||
- `wizards/allegro/home/skills/red-teaming/godmode/scripts/parseltongue.py` — not imported by indexed Python modules and not referenced by tests
|
||||
- `twitter-archive/multimodal_pipeline.py` — not imported by indexed Python modules and not referenced by tests
|
||||
- `wizards/allegro/home/skills/red-teaming/godmode/scripts/godmode_race.py` — not imported by indexed Python modules and not referenced by tests
|
||||
- `skills/productivity/google-workspace/scripts/google_api.py` — not imported by indexed Python modules and not referenced by tests
|
||||
- `wizards/allegro/home/skills/productivity/google-workspace/scripts/google_api.py` — not imported by indexed Python modules and not referenced by tests
|
||||
- `morrowind/pilot.py` — not imported by indexed Python modules and not referenced by tests
|
||||
- `morrowind/mcp_server.py` — not imported by indexed Python modules and not referenced by tests
|
||||
- `skills/research/domain-intel/scripts/domain_intel.py` — not imported by indexed Python modules and not referenced by tests
|
||||
|
||||
## Performance Bottleneck Analysis
|
||||
|
||||
- `angband/mcp_server.py` — large module (353 lines) likely hides multiple responsibilities
|
||||
- `evennia/timmy_world/game.py` — large module (1541 lines) likely hides multiple responsibilities
|
||||
- `evennia/timmy_world/world/game.py` — large module (1345 lines) likely hides multiple responsibilities
|
||||
- `morrowind/mcp_server.py` — large module (451 lines) likely hides multiple responsibilities
|
||||
- `morrowind/pilot.py` — large module (459 lines) likely hides multiple responsibilities
|
||||
- `pipelines/codebase_genome.py` — large module (557 lines) likely hides multiple responsibilities
|
||||
- `scripts/know_thy_father/crossref_audit.py` — large module (657 lines) likely hides multiple responsibilities
|
||||
- `scripts/know_thy_father/index_media.py` — large module (405 lines) likely hides multiple responsibilities
|
||||
- `scripts/know_thy_father/synthesize_kernels.py` — large module (416 lines) likely hides multiple responsibilities
|
||||
- `scripts/tower_game.py` — large module (395 lines) likely hides multiple responsibilities
|
||||
79
docs/CODEBASE_GENOME_PIPELINE.md
Normal file
79
docs/CODEBASE_GENOME_PIPELINE.md
Normal file
@@ -0,0 +1,79 @@
|
||||
# Codebase Genome Pipeline
|
||||
|
||||
Issue: `timmy-home#665`
|
||||
|
||||
This pipeline gives Timmy a repeatable way to generate a deterministic `GENOME.md` for any repository and rotate through the org nightly.
|
||||
|
||||
## What landed
|
||||
|
||||
- `pipelines/codebase_genome.py` — static analyzer that writes `GENOME.md`
|
||||
- `pipelines/codebase-genome.py` — thin CLI wrapper matching the expected pipeline-style entrypoint
|
||||
- `scripts/codebase_genome_nightly.py` — org-aware nightly runner that selects the next repo, updates a local checkout, and writes the genome artifact
|
||||
- `GENOME.md` — generated analysis for `timmy-home` itself
|
||||
|
||||
## Genome output
|
||||
|
||||
Each generated `GENOME.md` includes:
|
||||
|
||||
- project overview and repository size metrics
|
||||
- Mermaid architecture diagram
|
||||
- entry points and API surface
|
||||
- data flow summary
|
||||
- key abstractions from Python source
|
||||
- test coverage gaps
|
||||
- security audit findings
|
||||
- dead code candidates
|
||||
- performance bottleneck analysis
|
||||
|
||||
## Single-repo usage
|
||||
|
||||
```bash
|
||||
python3 pipelines/codebase_genome.py \
|
||||
--repo-root /path/to/repo \
|
||||
--repo-name Timmy_Foundation/some-repo \
|
||||
--output /path/to/repo/GENOME.md
|
||||
```
|
||||
|
||||
The hyphenated wrapper also works:
|
||||
|
||||
```bash
|
||||
python3 pipelines/codebase-genome.py --repo-root /path/to/repo --repo Timmy_Foundation/some-repo
|
||||
```
|
||||
|
||||
## Nightly org rotation
|
||||
|
||||
Dry-run the next selection:
|
||||
|
||||
```bash
|
||||
python3 scripts/codebase_genome_nightly.py --dry-run
|
||||
```
|
||||
|
||||
Run one real pass:
|
||||
|
||||
```bash
|
||||
python3 scripts/codebase_genome_nightly.py \
|
||||
--org Timmy_Foundation \
|
||||
--workspace-root ~/timmy-foundation-repos \
|
||||
--output-root ~/.timmy/codebase-genomes \
|
||||
--state-path ~/.timmy/codebase_genome_state.json
|
||||
```
|
||||
|
||||
Behavior:
|
||||
|
||||
1. fetches the current repo list from Gitea
|
||||
2. selects the next repo after the last recorded run
|
||||
3. clones or fast-forwards the local checkout
|
||||
4. writes `GENOME.md` into the configured output tree
|
||||
5. updates the rotation state file
|
||||
|
||||
## Example cron entry
|
||||
|
||||
```cron
|
||||
30 2 * * * cd ~/timmy-home && /usr/bin/env python3 scripts/codebase_genome_nightly.py --org Timmy_Foundation --workspace-root ~/timmy-foundation-repos --output-root ~/.timmy/codebase-genomes --state-path ~/.timmy/codebase_genome_state.json >> ~/.timmy/logs/codebase_genome_nightly.log 2>&1
|
||||
```
|
||||
|
||||
## Limits and follow-ons
|
||||
|
||||
- the generator is deterministic and static; it does not hallucinate architecture, but it also does not replace a full human review pass
|
||||
- nightly rotation handles genome generation; auto-generated test expansion remains a separate follow-on lane
|
||||
- large repos may still need a second-pass human edit after the initial genome artifact lands
|
||||
@@ -12,6 +12,7 @@ Quick-reference index for common operational tasks across the Timmy Foundation i
|
||||
| Check fleet health | fleet-ops | `python3 scripts/fleet_readiness.py` |
|
||||
| Agent scorecard | fleet-ops | `python3 scripts/agent_scorecard.py` |
|
||||
| View fleet manifest | fleet-ops | `cat manifest.yaml` |
|
||||
| Run nightly codebase genome pass | timmy-home | `python3 scripts/codebase_genome_nightly.py --dry-run` |
|
||||
|
||||
## the-nexus (Frontend + Brain)
|
||||
|
||||
|
||||
1
pipelines/__init__.py
Normal file
1
pipelines/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Codebase genome pipeline helpers."""
|
||||
6
pipelines/codebase-genome.py
Normal file
6
pipelines/codebase-genome.py
Normal file
@@ -0,0 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
from codebase_genome import main
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
557
pipelines/codebase_genome.py
Normal file
557
pipelines/codebase_genome.py
Normal file
@@ -0,0 +1,557 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate a deterministic GENOME.md for a repository."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import ast
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple
|
||||
|
||||
|
||||
IGNORED_DIRS = {
|
||||
".git",
|
||||
".hg",
|
||||
".svn",
|
||||
".venv",
|
||||
"venv",
|
||||
"node_modules",
|
||||
"__pycache__",
|
||||
".mypy_cache",
|
||||
".pytest_cache",
|
||||
"dist",
|
||||
"build",
|
||||
"coverage",
|
||||
}
|
||||
|
||||
TEXT_SUFFIXES = {
|
||||
".py",
|
||||
".js",
|
||||
".mjs",
|
||||
".cjs",
|
||||
".ts",
|
||||
".tsx",
|
||||
".jsx",
|
||||
".html",
|
||||
".css",
|
||||
".md",
|
||||
".txt",
|
||||
".json",
|
||||
".yaml",
|
||||
".yml",
|
||||
".sh",
|
||||
".ini",
|
||||
".cfg",
|
||||
".toml",
|
||||
}
|
||||
|
||||
SOURCE_SUFFIXES = {".py", ".js", ".mjs", ".cjs", ".ts", ".tsx", ".jsx", ".sh"}
|
||||
DOC_FILENAMES = {"README.md", "CONTRIBUTING.md", "SOUL.md"}
|
||||
|
||||
|
||||
class RepoFile(NamedTuple):
|
||||
path: str
|
||||
abs_path: Path
|
||||
size_bytes: int
|
||||
line_count: int
|
||||
kind: str
|
||||
|
||||
|
||||
class RunSummary(NamedTuple):
|
||||
markdown: str
|
||||
source_count: int
|
||||
test_count: int
|
||||
doc_count: int
|
||||
|
||||
|
||||
def _is_text_file(path: Path) -> bool:
|
||||
return path.suffix.lower() in TEXT_SUFFIXES or path.name in {"Dockerfile", "Makefile"}
|
||||
|
||||
|
||||
def _file_kind(rel_path: str, path: Path) -> str:
|
||||
suffix = path.suffix.lower()
|
||||
if rel_path.startswith("tests/") or path.name.startswith("test_"):
|
||||
return "test"
|
||||
if rel_path.startswith("docs/") or path.name in DOC_FILENAMES or suffix == ".md":
|
||||
return "doc"
|
||||
if suffix in {".json", ".yaml", ".yml", ".toml", ".ini", ".cfg"}:
|
||||
return "config"
|
||||
if suffix == ".sh":
|
||||
return "script"
|
||||
if rel_path.startswith("scripts/") and suffix == ".py" and path.name != "__init__.py":
|
||||
return "script"
|
||||
if suffix in SOURCE_SUFFIXES:
|
||||
return "source"
|
||||
return "other"
|
||||
|
||||
|
||||
def collect_repo_files(repo_root: str | Path) -> list[RepoFile]:
|
||||
root = Path(repo_root).resolve()
|
||||
files: list[RepoFile] = []
|
||||
for current_root, dirnames, filenames in os.walk(root):
|
||||
dirnames[:] = sorted(d for d in dirnames if d not in IGNORED_DIRS)
|
||||
base = Path(current_root)
|
||||
for filename in sorted(filenames):
|
||||
path = base / filename
|
||||
if not _is_text_file(path):
|
||||
continue
|
||||
rel_path = path.relative_to(root).as_posix()
|
||||
text = path.read_text(encoding="utf-8", errors="replace")
|
||||
files.append(
|
||||
RepoFile(
|
||||
path=rel_path,
|
||||
abs_path=path,
|
||||
size_bytes=path.stat().st_size,
|
||||
line_count=max(1, len(text.splitlines())),
|
||||
kind=_file_kind(rel_path, path),
|
||||
)
|
||||
)
|
||||
return sorted(files, key=lambda item: item.path)
|
||||
|
||||
|
||||
def _safe_text(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8", errors="replace")
|
||||
|
||||
|
||||
def _sanitize_node_id(name: str) -> str:
|
||||
cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
|
||||
return cleaned or "node"
|
||||
|
||||
|
||||
def _component_name(path: str) -> str:
|
||||
if "/" in path:
|
||||
return path.split("/", 1)[0]
|
||||
return Path(path).stem or path
|
||||
|
||||
|
||||
def _priority_files(files: list[RepoFile], kinds: tuple[str, ...], limit: int = 8) -> list[RepoFile]:
|
||||
items = [item for item in files if item.kind in kinds]
|
||||
items.sort(key=lambda item: (-int(item.path.count("/") == 0), -item.line_count, item.path))
|
||||
return items[:limit]
|
||||
|
||||
|
||||
def _readme_summary(root: Path) -> str:
|
||||
readme = root / "README.md"
|
||||
if not readme.exists():
|
||||
return "Repository-specific overview missing from README.md. Genome generated from code structure and tests."
|
||||
paragraphs: list[str] = []
|
||||
current: list[str] = []
|
||||
for raw_line in _safe_text(readme).splitlines():
|
||||
line = raw_line.strip()
|
||||
if not line:
|
||||
if current:
|
||||
paragraphs.append(" ".join(current).strip())
|
||||
current = []
|
||||
continue
|
||||
if line.startswith("#"):
|
||||
continue
|
||||
current.append(line)
|
||||
if current:
|
||||
paragraphs.append(" ".join(current).strip())
|
||||
return paragraphs[0] if paragraphs else "README.md exists but does not contain a prose overview paragraph."
|
||||
|
||||
|
||||
def _extract_python_imports(text: str) -> set[str]:
|
||||
try:
|
||||
tree = ast.parse(text)
|
||||
except SyntaxError:
|
||||
return set()
|
||||
imports: set[str] = set()
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
imports.add(alias.name.split(".", 1)[0])
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
if node.module:
|
||||
imports.add(node.module.split(".", 1)[0])
|
||||
return imports
|
||||
|
||||
|
||||
def _extract_python_symbols(text: str) -> tuple[list[tuple[str, int]], list[tuple[str, int]]]:
|
||||
try:
|
||||
tree = ast.parse(text)
|
||||
except SyntaxError:
|
||||
return [], []
|
||||
classes: list[tuple[str, int]] = []
|
||||
functions: list[tuple[str, int]] = []
|
||||
for node in tree.body:
|
||||
if isinstance(node, ast.ClassDef):
|
||||
classes.append((node.name, node.lineno))
|
||||
elif isinstance(node, ast.FunctionDef):
|
||||
functions.append((node.name, node.lineno))
|
||||
return classes, functions
|
||||
|
||||
|
||||
def _build_component_edges(files: list[RepoFile]) -> list[tuple[str, str]]:
|
||||
known_components = {_component_name(item.path) for item in files if item.kind in {"source", "script", "test"}}
|
||||
edges: set[tuple[str, str]] = set()
|
||||
for item in files:
|
||||
if item.kind not in {"source", "script", "test"} or item.abs_path.suffix.lower() != ".py":
|
||||
continue
|
||||
src = _component_name(item.path)
|
||||
imports = _extract_python_imports(_safe_text(item.abs_path))
|
||||
for imported in imports:
|
||||
if imported in known_components and imported != src:
|
||||
edges.add((src, imported))
|
||||
return sorted(edges)
|
||||
|
||||
|
||||
def _render_mermaid(files: list[RepoFile]) -> str:
|
||||
components = sorted(
|
||||
{
|
||||
_component_name(item.path)
|
||||
for item in files
|
||||
if item.kind in {"source", "script", "test", "config"}
|
||||
and not _component_name(item.path).startswith(".")
|
||||
}
|
||||
)
|
||||
edges = _build_component_edges(files)
|
||||
lines = ["graph TD"]
|
||||
if not components:
|
||||
lines.append(" repo[\"repository\"]")
|
||||
return "\n".join(lines)
|
||||
|
||||
for component in components[:10]:
|
||||
node_id = _sanitize_node_id(component)
|
||||
lines.append(f" {node_id}[\"{component}\"]")
|
||||
|
||||
seen_components = set(components[:10])
|
||||
emitted = False
|
||||
for src, dst in edges:
|
||||
if src in seen_components and dst in seen_components:
|
||||
lines.append(f" {_sanitize_node_id(src)} --> {_sanitize_node_id(dst)}")
|
||||
emitted = True
|
||||
if not emitted:
|
||||
root_id = "repo_root"
|
||||
lines.insert(1, f" {root_id}[\"repo\"]")
|
||||
for component in components[:6]:
|
||||
lines.append(f" {root_id} --> {_sanitize_node_id(component)}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _entry_points(files: list[RepoFile]) -> list[dict[str, str]]:
|
||||
points: list[dict[str, str]] = []
|
||||
for item in files:
|
||||
text = _safe_text(item.abs_path)
|
||||
if item.kind == "script":
|
||||
points.append({"path": item.path, "reason": "operational script", "command": f"python3 {item.path}" if item.abs_path.suffix == ".py" else f"bash {item.path}"})
|
||||
continue
|
||||
if item.abs_path.suffix == ".py" and "if __name__ == '__main__':" in text:
|
||||
points.append({"path": item.path, "reason": "python main guard", "command": f"python3 {item.path}"})
|
||||
elif item.path in {"app.py", "server.py", "main.py"}:
|
||||
points.append({"path": item.path, "reason": "top-level executable", "command": f"python3 {item.path}"})
|
||||
seen: set[str] = set()
|
||||
deduped: list[dict[str, str]] = []
|
||||
for point in points:
|
||||
if point["path"] in seen:
|
||||
continue
|
||||
seen.add(point["path"])
|
||||
deduped.append(point)
|
||||
return deduped[:12]
|
||||
|
||||
|
||||
def _test_coverage(files: list[RepoFile]) -> tuple[list[RepoFile], list[RepoFile], list[RepoFile]]:
|
||||
source_files = [
|
||||
item
|
||||
for item in files
|
||||
if item.kind in {"source", "script"}
|
||||
and item.path not in {"pipelines/codebase-genome.py", "pipelines/codebase_genome.py"}
|
||||
and not item.path.endswith("/__init__.py")
|
||||
]
|
||||
test_files = [item for item in files if item.kind == "test"]
|
||||
combined_test_text = "\n".join(_safe_text(item.abs_path) for item in test_files)
|
||||
entry_paths = {point["path"] for point in _entry_points(files)}
|
||||
|
||||
gaps: list[RepoFile] = []
|
||||
for item in source_files:
|
||||
stem = item.abs_path.stem
|
||||
if item.path in entry_paths:
|
||||
continue
|
||||
if stem and stem in combined_test_text:
|
||||
continue
|
||||
gaps.append(item)
|
||||
gaps.sort(key=lambda item: (-item.line_count, item.path))
|
||||
return source_files, test_files, gaps
|
||||
|
||||
|
||||
def _security_findings(files: list[RepoFile]) -> list[dict[str, str]]:
|
||||
rules = [
|
||||
("high", "shell execution", re.compile(r"shell\s*=\s*True"), "shell=True expands blast radius for command execution"),
|
||||
("high", "dynamic evaluation", re.compile(r"\b(eval|exec)\s*\("), "dynamic evaluation bypasses static guarantees"),
|
||||
("medium", "unsafe deserialization", re.compile(r"pickle\.load\(|yaml\.load\("), "deserialization of untrusted data can execute code"),
|
||||
("medium", "network egress", re.compile(r"urllib\.request\.urlopen\(|requests\.(get|post|put|delete)\("), "outbound network calls create runtime dependency and failure surface"),
|
||||
("medium", "hardcoded http endpoint", re.compile(r"http://[^\s\"']+"), "plaintext or fixed HTTP endpoints can drift or leak across environments"),
|
||||
]
|
||||
findings: list[dict[str, str]] = []
|
||||
for item in files:
|
||||
if item.kind not in {"source", "script", "config"}:
|
||||
continue
|
||||
for lineno, line in enumerate(_safe_text(item.abs_path).splitlines(), start=1):
|
||||
for severity, category, pattern, detail in rules:
|
||||
if pattern.search(line):
|
||||
findings.append(
|
||||
{
|
||||
"severity": severity,
|
||||
"category": category,
|
||||
"ref": f"{item.path}:{lineno}",
|
||||
"line": line.strip(),
|
||||
"detail": detail,
|
||||
}
|
||||
)
|
||||
break
|
||||
if len(findings) >= 12:
|
||||
return findings
|
||||
return findings
|
||||
|
||||
|
||||
def _dead_code_candidates(files: list[RepoFile]) -> list[RepoFile]:
|
||||
source_files = [item for item in files if item.kind in {"source", "script"} and item.abs_path.suffix == ".py"]
|
||||
imports_by_file = {
|
||||
item.path: _extract_python_imports(_safe_text(item.abs_path))
|
||||
for item in source_files
|
||||
}
|
||||
imported_names = {name for imports in imports_by_file.values() for name in imports}
|
||||
referenced_by_tests = "\n".join(_safe_text(item.abs_path) for item in files if item.kind == "test")
|
||||
entry_paths = {point["path"] for point in _entry_points(files)}
|
||||
|
||||
candidates: list[RepoFile] = []
|
||||
for item in source_files:
|
||||
stem = item.abs_path.stem
|
||||
if item.path in entry_paths:
|
||||
continue
|
||||
if stem in imported_names:
|
||||
continue
|
||||
if stem in referenced_by_tests:
|
||||
continue
|
||||
if stem in {"__init__", "conftest"}:
|
||||
continue
|
||||
candidates.append(item)
|
||||
candidates.sort(key=lambda item: (-item.line_count, item.path))
|
||||
return candidates[:10]
|
||||
|
||||
|
||||
def _performance_findings(files: list[RepoFile]) -> list[dict[str, str]]:
|
||||
findings: list[dict[str, str]] = []
|
||||
for item in files:
|
||||
if item.kind in {"source", "script"} and item.line_count >= 350:
|
||||
findings.append({
|
||||
"ref": item.path,
|
||||
"detail": f"large module ({item.line_count} lines) likely hides multiple responsibilities",
|
||||
})
|
||||
for item in files:
|
||||
if item.kind not in {"source", "script"}:
|
||||
continue
|
||||
text = _safe_text(item.abs_path)
|
||||
if "os.walk(" in text or ".rglob(" in text or "glob.glob(" in text:
|
||||
findings.append({
|
||||
"ref": item.path,
|
||||
"detail": "per-run filesystem scan detected; performance scales with repo size",
|
||||
})
|
||||
if "urllib.request.urlopen(" in text or "requests.get(" in text or "requests.post(" in text:
|
||||
findings.append({
|
||||
"ref": item.path,
|
||||
"detail": "network-bound execution path can dominate runtime and create flaky throughput",
|
||||
})
|
||||
deduped: list[dict[str, str]] = []
|
||||
seen: set[tuple[str, str]] = set()
|
||||
for finding in findings:
|
||||
key = (finding["ref"], finding["detail"])
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
deduped.append(finding)
|
||||
return deduped[:10]
|
||||
|
||||
|
||||
def _key_abstractions(files: list[RepoFile]) -> list[dict[str, object]]:
|
||||
abstractions: list[dict[str, object]] = []
|
||||
for item in _priority_files(files, ("source", "script"), limit=10):
|
||||
if item.abs_path.suffix != ".py":
|
||||
continue
|
||||
classes, functions = _extract_python_symbols(_safe_text(item.abs_path))
|
||||
if not classes and not functions:
|
||||
continue
|
||||
abstractions.append(
|
||||
{
|
||||
"path": item.path,
|
||||
"classes": classes[:4],
|
||||
"functions": [entry for entry in functions[:6] if not entry[0].startswith("_")],
|
||||
}
|
||||
)
|
||||
return abstractions[:8]
|
||||
|
||||
|
||||
def _api_surface(entry_points: list[dict[str, str]], abstractions: list[dict[str, object]]) -> list[str]:
|
||||
api_lines: list[str] = []
|
||||
for entry in entry_points[:8]:
|
||||
api_lines.append(f"- CLI: `{entry['command']}` — {entry['reason']} (`{entry['path']}`)")
|
||||
for abstraction in abstractions[:5]:
|
||||
for func_name, lineno in abstraction["functions"]:
|
||||
api_lines.append(f"- Python: `{func_name}()` from `{abstraction['path']}:{lineno}`")
|
||||
if len(api_lines) >= 14:
|
||||
return api_lines
|
||||
return api_lines
|
||||
|
||||
|
||||
def _data_flow(entry_points: list[dict[str, str]], files: list[RepoFile], gaps: list[RepoFile]) -> list[str]:
|
||||
components = sorted(
|
||||
{
|
||||
_component_name(item.path)
|
||||
for item in files
|
||||
if item.kind in {"source", "script", "test", "config"} and not _component_name(item.path).startswith(".")
|
||||
}
|
||||
)
|
||||
lines = []
|
||||
if entry_points:
|
||||
lines.append(f"1. Operators enter through {', '.join(f'`{item['path']}`' for item in entry_points[:3])}.")
|
||||
else:
|
||||
lines.append("1. No explicit CLI/main guard entry point was detected; execution appears library- or doc-driven.")
|
||||
if components:
|
||||
lines.append(f"2. Core logic fans into top-level components: {', '.join(f'`{name}`' for name in components[:6])}.")
|
||||
if gaps:
|
||||
lines.append(f"3. Validation is incomplete around {', '.join(f'`{item.path}`' for item in gaps[:3])}, so changes there carry regression risk.")
|
||||
else:
|
||||
lines.append("3. Tests appear to reference the currently indexed source set, reducing blind spots in the hot path.")
|
||||
lines.append("4. Final artifacts land as repository files, docs, or runtime side effects depending on the selected entry point.")
|
||||
return lines
|
||||
|
||||
|
||||
def generate_genome_markdown(repo_root: str | Path, repo_name: str | None = None) -> str:
|
||||
root = Path(repo_root).resolve()
|
||||
files = collect_repo_files(root)
|
||||
repo_display = repo_name or root.name
|
||||
summary = _readme_summary(root)
|
||||
entry_points = _entry_points(files)
|
||||
source_files, test_files, coverage_gaps = _test_coverage(files)
|
||||
security = _security_findings(files)
|
||||
dead_code = _dead_code_candidates(files)
|
||||
performance = _performance_findings(files)
|
||||
abstractions = _key_abstractions(files)
|
||||
api_surface = _api_surface(entry_points, abstractions)
|
||||
data_flow = _data_flow(entry_points, files, coverage_gaps)
|
||||
mermaid = _render_mermaid(files)
|
||||
|
||||
lines: list[str] = [
|
||||
f"# GENOME.md — {repo_display}",
|
||||
"",
|
||||
"Generated by `pipelines/codebase_genome.py`.",
|
||||
"",
|
||||
"## Project Overview",
|
||||
"",
|
||||
summary,
|
||||
"",
|
||||
f"- Text files indexed: {len(files)}",
|
||||
f"- Source and script files: {len(source_files)}",
|
||||
f"- Test files: {len(test_files)}",
|
||||
f"- Documentation files: {len([item for item in files if item.kind == 'doc'])}",
|
||||
"",
|
||||
"## Architecture",
|
||||
"",
|
||||
"```mermaid",
|
||||
mermaid,
|
||||
"```",
|
||||
"",
|
||||
"## Entry Points",
|
||||
"",
|
||||
]
|
||||
|
||||
if entry_points:
|
||||
for item in entry_points:
|
||||
lines.append(f"- `{item['path']}` — {item['reason']} (`{item['command']}`)")
|
||||
else:
|
||||
lines.append("- No explicit entry point detected.")
|
||||
|
||||
lines.extend(["", "## Data Flow", ""])
|
||||
lines.extend(data_flow)
|
||||
|
||||
lines.extend(["", "## Key Abstractions", ""])
|
||||
if abstractions:
|
||||
for abstraction in abstractions:
|
||||
path = abstraction["path"]
|
||||
classes = abstraction["classes"]
|
||||
functions = abstraction["functions"]
|
||||
class_bits = ", ".join(f"`{name}`:{lineno}" for name, lineno in classes) or "none detected"
|
||||
function_bits = ", ".join(f"`{name}()`:{lineno}" for name, lineno in functions) or "none detected"
|
||||
lines.append(f"- `{path}` — classes {class_bits}; functions {function_bits}")
|
||||
else:
|
||||
lines.append("- No Python classes or top-level functions detected in the highest-priority source files.")
|
||||
|
||||
lines.extend(["", "## API Surface", ""])
|
||||
if api_surface:
|
||||
lines.extend(api_surface)
|
||||
else:
|
||||
lines.append("- No obvious public API surface detected.")
|
||||
|
||||
lines.extend(["", "## Test Coverage Report", ""])
|
||||
lines.append(f"- Source and script files inspected: {len(source_files)}")
|
||||
lines.append(f"- Test files inspected: {len(test_files)}")
|
||||
if coverage_gaps:
|
||||
lines.append("- Coverage gaps:")
|
||||
for item in coverage_gaps[:12]:
|
||||
lines.append(f" - `{item.path}` — no matching test reference detected")
|
||||
else:
|
||||
lines.append("- No obvious coverage gaps detected by the stem-matching heuristic.")
|
||||
|
||||
lines.extend(["", "## Security Audit Findings", ""])
|
||||
if security:
|
||||
for finding in security:
|
||||
lines.append(
|
||||
f"- [{finding['severity']}] `{finding['ref']}` — {finding['category']}: {finding['detail']}. Evidence: `{finding['line']}`"
|
||||
)
|
||||
else:
|
||||
lines.append("- No high-signal security findings detected by the static heuristics in this pass.")
|
||||
|
||||
lines.extend(["", "## Dead Code Candidates", ""])
|
||||
if dead_code:
|
||||
for item in dead_code:
|
||||
lines.append(f"- `{item.path}` — not imported by indexed Python modules and not referenced by tests")
|
||||
else:
|
||||
lines.append("- No obvious dead-code candidates detected.")
|
||||
|
||||
lines.extend(["", "## Performance Bottleneck Analysis", ""])
|
||||
if performance:
|
||||
for finding in performance:
|
||||
lines.append(f"- `{finding['ref']}` — {finding['detail']}")
|
||||
else:
|
||||
lines.append("- No obvious performance hotspots detected by the static heuristics in this pass.")
|
||||
|
||||
return "\n".join(lines).rstrip() + "\n"
|
||||
|
||||
|
||||
def write_genome(repo_root: str | Path, repo_name: str | None = None, output_path: str | Path | None = None) -> RunSummary:
|
||||
root = Path(repo_root).resolve()
|
||||
markdown = generate_genome_markdown(root, repo_name=repo_name)
|
||||
out_path = Path(output_path) if output_path else root / "GENOME.md"
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
out_path.write_text(markdown, encoding="utf-8")
|
||||
files = collect_repo_files(root)
|
||||
source_files, test_files, _ = _test_coverage(files)
|
||||
return RunSummary(
|
||||
markdown=markdown,
|
||||
source_count=len(source_files),
|
||||
test_count=len(test_files),
|
||||
doc_count=len([item for item in files if item.kind == "doc"]),
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Generate a deterministic GENOME.md for a repository")
|
||||
parser.add_argument("--repo-root", required=True, help="Path to the repository to analyze")
|
||||
parser.add_argument("--repo", dest="repo_name", default=None, help="Optional repo display name")
|
||||
parser.add_argument("--repo-name", dest="repo_name_override", default=None, help="Optional repo display name")
|
||||
parser.add_argument("--output", default=None, help="Path to write GENOME.md (defaults to <repo-root>/GENOME.md)")
|
||||
args = parser.parse_args()
|
||||
|
||||
repo_name = args.repo_name_override or args.repo_name
|
||||
summary = write_genome(args.repo_root, repo_name=repo_name, output_path=args.output)
|
||||
target = Path(args.output) if args.output else Path(args.repo_root).resolve() / "GENOME.md"
|
||||
print(
|
||||
f"GENOME.md saved to {target} "
|
||||
f"(sources={summary.source_count}, tests={summary.test_count}, docs={summary.doc_count})"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
171
scripts/codebase_genome_nightly.py
Normal file
171
scripts/codebase_genome_nightly.py
Normal file
@@ -0,0 +1,171 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Nightly runner for the codebase genome pipeline."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple
|
||||
|
||||
|
||||
class RunPlan(NamedTuple):
|
||||
repo: dict
|
||||
repo_dir: Path
|
||||
output_path: Path
|
||||
command: list[str]
|
||||
|
||||
|
||||
def load_state(path: Path) -> dict:
|
||||
if not path.exists():
|
||||
return {}
|
||||
return json.loads(path.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
def save_state(path: Path, state: dict) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
|
||||
|
||||
|
||||
def select_next_repo(repos: list[dict], state: dict) -> dict:
|
||||
if not repos:
|
||||
raise ValueError("no repositories available for nightly genome run")
|
||||
ordered = sorted(repos, key=lambda item: item.get("full_name", item.get("name", "")).lower())
|
||||
last_repo = state.get("last_repo")
|
||||
for index, repo in enumerate(ordered):
|
||||
if repo.get("name") == last_repo or repo.get("full_name") == last_repo:
|
||||
return ordered[(index + 1) % len(ordered)]
|
||||
last_index = int(state.get("last_index", -1))
|
||||
return ordered[(last_index + 1) % len(ordered)]
|
||||
|
||||
|
||||
def build_run_plan(repo: dict, workspace_root: Path, output_root: Path, pipeline_script: Path) -> RunPlan:
|
||||
repo_dir = workspace_root / repo["name"]
|
||||
output_path = output_root / repo["name"] / "GENOME.md"
|
||||
command = [
|
||||
sys.executable,
|
||||
str(pipeline_script),
|
||||
"--repo-root",
|
||||
str(repo_dir),
|
||||
"--repo-name",
|
||||
repo.get("full_name", repo["name"]),
|
||||
"--output",
|
||||
str(output_path),
|
||||
]
|
||||
return RunPlan(repo=repo, repo_dir=repo_dir, output_path=output_path, command=command)
|
||||
|
||||
|
||||
def fetch_org_repos(org: str, host: str, token_file: Path, include_archived: bool = False) -> list[dict]:
|
||||
token = token_file.read_text(encoding="utf-8").strip()
|
||||
page = 1
|
||||
repos: list[dict] = []
|
||||
while True:
|
||||
req = urllib.request.Request(
|
||||
f"{host.rstrip('/')}/api/v1/orgs/{org}/repos?limit=100&page={page}",
|
||||
headers={"Authorization": f"token {token}", "Accept": "application/json"},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
chunk = json.loads(resp.read().decode("utf-8"))
|
||||
if not chunk:
|
||||
break
|
||||
for item in chunk:
|
||||
if item.get("archived") and not include_archived:
|
||||
continue
|
||||
repos.append(
|
||||
{
|
||||
"name": item["name"],
|
||||
"full_name": item["full_name"],
|
||||
"clone_url": item["clone_url"],
|
||||
"default_branch": item.get("default_branch") or "main",
|
||||
}
|
||||
)
|
||||
page += 1
|
||||
return repos
|
||||
|
||||
|
||||
def _authenticated_clone_url(clone_url: str, token_file: Path) -> str:
|
||||
token = token_file.read_text(encoding="utf-8").strip()
|
||||
if clone_url.startswith("https://"):
|
||||
return f"https://{token}@{clone_url[len('https://') :]}"
|
||||
return clone_url
|
||||
|
||||
|
||||
def ensure_checkout(repo: dict, workspace_root: Path, token_file: Path) -> Path:
|
||||
workspace_root.mkdir(parents=True, exist_ok=True)
|
||||
repo_dir = workspace_root / repo["name"]
|
||||
branch = repo.get("default_branch") or "main"
|
||||
clone_url = _authenticated_clone_url(repo["clone_url"], token_file)
|
||||
|
||||
if (repo_dir / ".git").exists():
|
||||
subprocess.run(["git", "-C", str(repo_dir), "fetch", "origin", branch, "--depth", "1"], check=True)
|
||||
subprocess.run(["git", "-C", str(repo_dir), "checkout", branch], check=True)
|
||||
subprocess.run(["git", "-C", str(repo_dir), "reset", "--hard", f"origin/{branch}"], check=True)
|
||||
else:
|
||||
subprocess.run(
|
||||
["git", "clone", "--depth", "1", "--single-branch", "--branch", branch, clone_url, str(repo_dir)],
|
||||
check=True,
|
||||
)
|
||||
return repo_dir
|
||||
|
||||
|
||||
def run_plan(plan: RunPlan) -> None:
|
||||
plan.output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
subprocess.run(plan.command, check=True)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Run one nightly codebase genome pass for the next repo in an org")
|
||||
parser.add_argument("--org", default="Timmy_Foundation")
|
||||
parser.add_argument("--host", default="https://forge.alexanderwhitestone.com")
|
||||
parser.add_argument("--token-file", default=os.path.expanduser("~/.config/gitea/token"))
|
||||
parser.add_argument("--workspace-root", default=os.path.expanduser("~/timmy-foundation-repos"))
|
||||
parser.add_argument("--output-root", default=os.path.expanduser("~/.timmy/codebase-genomes"))
|
||||
parser.add_argument("--state-path", default=os.path.expanduser("~/.timmy/codebase_genome_state.json"))
|
||||
parser.add_argument("--pipeline-script", default=str(Path(__file__).resolve().parents[1] / "pipelines" / "codebase_genome.py"))
|
||||
parser.add_argument("--include-archived", action="store_true")
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
token_file = Path(args.token_file).expanduser()
|
||||
workspace_root = Path(args.workspace_root).expanduser()
|
||||
output_root = Path(args.output_root).expanduser()
|
||||
state_path = Path(args.state_path).expanduser()
|
||||
pipeline_script = Path(args.pipeline_script).expanduser()
|
||||
|
||||
repos = fetch_org_repos(args.org, args.host, token_file, include_archived=args.include_archived)
|
||||
state = load_state(state_path)
|
||||
repo = select_next_repo(repos, state)
|
||||
plan = build_run_plan(repo, workspace_root=workspace_root, output_root=output_root, pipeline_script=pipeline_script)
|
||||
|
||||
if args.dry_run:
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"repo": repo,
|
||||
"repo_dir": str(plan.repo_dir),
|
||||
"output_path": str(plan.output_path),
|
||||
"command": plan.command,
|
||||
},
|
||||
indent=2,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
ensure_checkout(repo, workspace_root=workspace_root, token_file=token_file)
|
||||
run_plan(plan)
|
||||
save_state(
|
||||
state_path,
|
||||
{
|
||||
"last_index": sorted(repos, key=lambda item: item.get("full_name", item.get("name", "")).lower()).index(repo),
|
||||
"last_repo": repo.get("name"),
|
||||
},
|
||||
)
|
||||
print(f"Completed genome run for {repo['full_name']} -> {plan.output_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
115
tests/test_codebase_genome_pipeline.py
Normal file
115
tests/test_codebase_genome_pipeline.py
Normal file
@@ -0,0 +1,115 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
PIPELINE_PATH = ROOT / "pipelines" / "codebase_genome.py"
|
||||
NIGHTLY_PATH = ROOT / "scripts" / "codebase_genome_nightly.py"
|
||||
GENOME_PATH = ROOT / "GENOME.md"
|
||||
|
||||
|
||||
def _load_module(path: Path, name: str):
|
||||
assert path.exists(), f"missing {path.relative_to(ROOT)}"
|
||||
spec = importlib.util.spec_from_file_location(name, path)
|
||||
assert spec and spec.loader
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def test_generate_genome_markdown_contains_required_sections(tmp_path: Path) -> None:
|
||||
genome_mod = _load_module(PIPELINE_PATH, "codebase_genome")
|
||||
|
||||
repo = tmp_path / "repo"
|
||||
(repo / "tests").mkdir(parents=True)
|
||||
(repo / "README.md").write_text("# Demo Repo\n\nA tiny example repo.\n")
|
||||
(repo / "app.py").write_text(
|
||||
"import module\n\n"
|
||||
"def main():\n"
|
||||
" return module.Helper().answer()\n\n"
|
||||
"if __name__ == '__main__':\n"
|
||||
" raise SystemExit(main())\n"
|
||||
)
|
||||
(repo / "module.py").write_text(
|
||||
"class Helper:\n"
|
||||
" def answer(self):\n"
|
||||
" return 42\n"
|
||||
)
|
||||
(repo / "dangerous.py").write_text(
|
||||
"import subprocess\n\n"
|
||||
"def run_shell(cmd):\n"
|
||||
" return subprocess.run(cmd, shell=True, check=False)\n"
|
||||
)
|
||||
(repo / "extra.py").write_text("VALUE = 7\n")
|
||||
(repo / "tests" / "test_app.py").write_text(
|
||||
"from app import main\n\n"
|
||||
"def test_main():\n"
|
||||
" assert main() == 42\n"
|
||||
)
|
||||
|
||||
genome = genome_mod.generate_genome_markdown(repo, repo_name="org/repo")
|
||||
|
||||
for heading in (
|
||||
"# GENOME.md — org/repo",
|
||||
"## Project Overview",
|
||||
"## Architecture",
|
||||
"```mermaid",
|
||||
"## Entry Points",
|
||||
"## Data Flow",
|
||||
"## Key Abstractions",
|
||||
"## API Surface",
|
||||
"## Test Coverage Report",
|
||||
"## Security Audit Findings",
|
||||
"## Dead Code Candidates",
|
||||
"## Performance Bottleneck Analysis",
|
||||
):
|
||||
assert heading in genome
|
||||
|
||||
assert "app.py" in genome
|
||||
assert "module.py" in genome
|
||||
assert "dangerous.py" in genome
|
||||
assert "extra.py" in genome
|
||||
assert "shell=True" in genome
|
||||
|
||||
|
||||
def test_nightly_runner_rotates_repos_and_builds_plan() -> None:
|
||||
nightly_mod = _load_module(NIGHTLY_PATH, "codebase_genome_nightly")
|
||||
|
||||
repos = [
|
||||
{"name": "alpha", "full_name": "Timmy_Foundation/alpha", "clone_url": "https://example/alpha.git"},
|
||||
{"name": "beta", "full_name": "Timmy_Foundation/beta", "clone_url": "https://example/beta.git"},
|
||||
]
|
||||
state = {"last_index": 0, "last_repo": "alpha"}
|
||||
|
||||
next_repo = nightly_mod.select_next_repo(repos, state)
|
||||
assert next_repo["name"] == "beta"
|
||||
|
||||
plan = nightly_mod.build_run_plan(
|
||||
repo=next_repo,
|
||||
workspace_root=Path("/tmp/repos"),
|
||||
output_root=Path("/tmp/genomes"),
|
||||
pipeline_script=Path("/tmp/timmy-home/pipelines/codebase_genome.py"),
|
||||
)
|
||||
|
||||
assert plan.repo_dir == Path("/tmp/repos/beta")
|
||||
assert plan.output_path == Path("/tmp/genomes/beta/GENOME.md")
|
||||
assert "codebase_genome.py" in plan.command[1]
|
||||
assert plan.command[-1] == "/tmp/genomes/beta/GENOME.md"
|
||||
|
||||
|
||||
def test_repo_contains_generated_timmy_home_genome() -> None:
|
||||
assert GENOME_PATH.exists(), "missing generated GENOME.md for timmy-home"
|
||||
text = GENOME_PATH.read_text(encoding="utf-8")
|
||||
for snippet in (
|
||||
"# GENOME.md — Timmy_Foundation/timmy-home",
|
||||
"## Project Overview",
|
||||
"## Architecture",
|
||||
"## Entry Points",
|
||||
"## API Surface",
|
||||
"## Test Coverage Report",
|
||||
"## Security Audit Findings",
|
||||
"## Performance Bottleneck Analysis",
|
||||
):
|
||||
assert snippet in text
|
||||
Reference in New Issue
Block a user