Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
840214c8c0 |
@@ -1,14 +0,0 @@
|
||||
fleet_name: timmy-phase-3-config
|
||||
|
||||
targets:
|
||||
- host: ezra
|
||||
config_root: /root/wizards/ezra/home/.hermes
|
||||
files:
|
||||
- config.yaml
|
||||
- dispatch/rules.json
|
||||
|
||||
- host: bezalel
|
||||
config_root: /root/wizards/bezalel/home/.hermes
|
||||
files:
|
||||
- config.yaml
|
||||
- dispatch/rules.json
|
||||
@@ -3,11 +3,9 @@
|
||||
|
||||
import ast
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -24,6 +22,7 @@ class FunctionInfo:
|
||||
has_return: bool = False
|
||||
raises: List[str] = field(default_factory=list)
|
||||
decorators: List[str] = field(default_factory=list)
|
||||
calls: List[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def qualified_name(self):
|
||||
@@ -69,21 +68,39 @@ class SourceAnalyzer(ast.NodeVisitor):
|
||||
args = [a.arg for a in node.args.args if a.arg not in ("self", "cls")]
|
||||
has_ret = any(isinstance(c, ast.Return) and c.value for c in ast.walk(node))
|
||||
raises = []
|
||||
calls = []
|
||||
for c in ast.walk(node):
|
||||
if isinstance(c, ast.Raise) and c.exc:
|
||||
if isinstance(c.exc, ast.Call) and isinstance(c.exc.func, ast.Name):
|
||||
raises.append(c.exc.func.id)
|
||||
if isinstance(c, ast.Call):
|
||||
if isinstance(c.func, ast.Name):
|
||||
calls.append(c.func.id)
|
||||
elif isinstance(c.func, ast.Attribute):
|
||||
calls.append(c.func.attr)
|
||||
decos = []
|
||||
for d in node.decorator_list:
|
||||
if isinstance(d, ast.Name): decos.append(d.id)
|
||||
elif isinstance(d, ast.Attribute): decos.append(d.attr)
|
||||
self.functions.append(FunctionInfo(
|
||||
name=node.name, module_path=self.module_path, class_name=cls,
|
||||
lineno=node.lineno, args=args, is_async=is_async,
|
||||
is_private=node.name.startswith("_") and not node.name.startswith("__"),
|
||||
is_property="property" in decos,
|
||||
docstring=ast.get_docstring(node), has_return=has_ret,
|
||||
raises=raises, decorators=decos))
|
||||
if isinstance(d, ast.Name):
|
||||
decos.append(d.id)
|
||||
elif isinstance(d, ast.Attribute):
|
||||
decos.append(d.attr)
|
||||
self.functions.append(
|
||||
FunctionInfo(
|
||||
name=node.name,
|
||||
module_path=self.module_path,
|
||||
class_name=cls,
|
||||
lineno=node.lineno,
|
||||
args=args,
|
||||
is_async=is_async,
|
||||
is_private=node.name.startswith("_") and not node.name.startswith("__"),
|
||||
is_property="property" in decos,
|
||||
docstring=ast.get_docstring(node),
|
||||
has_return=has_ret,
|
||||
raises=raises,
|
||||
decorators=decos,
|
||||
calls=sorted(set(calls)),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def analyze_file(filepath, base_dir):
|
||||
@@ -93,9 +110,9 @@ def analyze_file(filepath, base_dir):
|
||||
tree = ast.parse(f.read(), filename=filepath)
|
||||
except (SyntaxError, UnicodeDecodeError):
|
||||
return []
|
||||
a = SourceAnalyzer(module_path)
|
||||
a.visit(tree)
|
||||
return a.functions
|
||||
analyzer = SourceAnalyzer(module_path)
|
||||
analyzer.visit(tree)
|
||||
return analyzer.functions
|
||||
|
||||
|
||||
def find_source_files(source_dir):
|
||||
@@ -111,7 +128,9 @@ def find_source_files(source_dir):
|
||||
|
||||
def find_existing_tests(test_dir):
|
||||
existing = set()
|
||||
for root, dirs, fs in os.walk(test_dir):
|
||||
if not os.path.isdir(test_dir):
|
||||
return existing
|
||||
for root, _, fs in os.walk(test_dir):
|
||||
for f in fs:
|
||||
if f.startswith("test_") and f.endswith(".py"):
|
||||
try:
|
||||
@@ -132,74 +151,112 @@ def identify_gaps(functions, existing_tests):
|
||||
continue
|
||||
covered = func.name in str(existing_tests)
|
||||
if not covered:
|
||||
pri = 3 if func.is_private else (1 if (func.raises or func.has_return) else 2)
|
||||
gaps.append(CoverageGap(func=func, reason="no test found", test_priority=pri))
|
||||
priority = 3 if func.is_private else (1 if (func.raises or func.has_return) else 2)
|
||||
gaps.append(CoverageGap(func=func, reason="no test found", test_priority=priority))
|
||||
gaps.sort(key=lambda g: (g.test_priority, g.func.module_path, g.func.name))
|
||||
return gaps
|
||||
|
||||
|
||||
def _format_arg_value(arg: str) -> str:
|
||||
lower = arg.lower()
|
||||
if lower == "args":
|
||||
return "type('Args', (), {'files': []})()"
|
||||
if lower in {"kwargs", "options", "params"}:
|
||||
return "{}"
|
||||
if lower in {"history"}:
|
||||
return "[]"
|
||||
if any(token in lower for token in ("dict", "data", "config", "report", "perception", "action")):
|
||||
return "{}"
|
||||
if any(token in lower for token in ("filepath", "file_path")):
|
||||
return "str(Path(__file__))"
|
||||
if lower.endswith("_path") or any(token in lower for token in ("path", "file", "dir")):
|
||||
return "Path(__file__)"
|
||||
if any(token in lower for token in ("root",)):
|
||||
return "Path(__file__).resolve().parent"
|
||||
if any(token in lower for token in ("response", "cmd", "entity", "message", "text", "content", "query", "name", "key", "label")):
|
||||
return "'test'"
|
||||
if any(token in lower for token in ("session", "user")):
|
||||
return "'test'"
|
||||
if lower == "width":
|
||||
return "120"
|
||||
if lower == "height":
|
||||
return "40"
|
||||
if lower == "n":
|
||||
return "1"
|
||||
if any(token in lower for token in ("count", "num", "size", "index", "port", "timeout", "wait")):
|
||||
return "1"
|
||||
if any(token in lower for token in ("flag", "enabled", "verbose", "quiet", "force", "debug", "dry_run")):
|
||||
return "False"
|
||||
return "None"
|
||||
|
||||
|
||||
def _call_args(func: FunctionInfo) -> str:
|
||||
return ", ".join(f"{arg}={_format_arg_value(arg)}" for arg in func.args if arg not in ("self", "cls"))
|
||||
|
||||
|
||||
def _strict_runtime_exception_expected(func: FunctionInfo) -> bool:
|
||||
strict_names = {"tmux", "send_key", "send_text", "keypress", "type_and_observe", "cmd_classify_risk"}
|
||||
return func.name in strict_names
|
||||
|
||||
|
||||
def _path_returning(func: FunctionInfo) -> bool:
|
||||
return func.name.endswith("_path")
|
||||
|
||||
|
||||
def generate_test(gap):
|
||||
func = gap.func
|
||||
lines = []
|
||||
lines.append(f" # AUTO-GENERATED -- review before merging")
|
||||
lines.append(" # AUTO-GENERATED -- review before merging")
|
||||
lines.append(f" # Source: {func.module_path}:{func.lineno}")
|
||||
lines.append(f" # Function: {func.qualified_name}")
|
||||
lines.append("")
|
||||
mod_imp = func.module_path.replace("/", ".").replace("-", "_").replace(".py", "")
|
||||
|
||||
call_args = []
|
||||
for a in func.args:
|
||||
if a in ("self", "cls"): continue
|
||||
if "path" in a or "file" in a or "dir" in a: call_args.append(f"{a}='/tmp/test'")
|
||||
elif "name" in a: call_args.append(f"{a}='test'")
|
||||
elif "id" in a or "key" in a: call_args.append(f"{a}='test_id'")
|
||||
elif "message" in a or "text" in a: call_args.append(f"{a}='test msg'")
|
||||
elif "count" in a or "num" in a or "size" in a: call_args.append(f"{a}=1")
|
||||
elif "flag" in a or "enabled" in a or "verbose" in a: call_args.append(f"{a}=False")
|
||||
else: call_args.append(f"{a}=None")
|
||||
args_str = ", ".join(call_args)
|
||||
|
||||
signature = "async def" if func.is_async else "def"
|
||||
if func.is_async:
|
||||
lines.append(" @pytest.mark.asyncio")
|
||||
lines.append(f" def {func.test_name}(self):")
|
||||
lines.append(f" {signature} {func.test_name}(self):")
|
||||
lines.append(f' """Test {func.qualified_name} -- auto-generated."""')
|
||||
|
||||
lines.append(" try:")
|
||||
lines.append(" try:")
|
||||
if func.class_name:
|
||||
lines.append(f" try:")
|
||||
lines.append(f" from {mod_imp} import {func.class_name}")
|
||||
if func.is_private:
|
||||
lines.append(f" pytest.skip('Private method')")
|
||||
elif func.is_property:
|
||||
lines.append(f" obj = {func.class_name}()")
|
||||
lines.append(f" _ = obj.{func.name}")
|
||||
lines.append(f" owner = _load_symbol({func.module_path!r}, {func.class_name!r})")
|
||||
lines.append(" target = owner()")
|
||||
if func.is_property:
|
||||
lines.append(f" result = target.{func.name}")
|
||||
else:
|
||||
if func.raises:
|
||||
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
|
||||
lines.append(f" {func.class_name}().{func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" obj = {func.class_name}()")
|
||||
lines.append(f" result = obj.{func.name}({args_str})")
|
||||
if func.has_return:
|
||||
lines.append(f" assert result is not None or result is None # Placeholder")
|
||||
lines.append(f" except ImportError:")
|
||||
lines.append(f" pytest.skip('Module not importable')")
|
||||
lines.append(f" target = target.{func.name}")
|
||||
else:
|
||||
lines.append(f" try:")
|
||||
lines.append(f" from {mod_imp} import {func.name}")
|
||||
if func.is_private:
|
||||
lines.append(f" pytest.skip('Private function')")
|
||||
else:
|
||||
if func.raises:
|
||||
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
|
||||
lines.append(f" {func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" result = {func.name}({args_str})")
|
||||
if func.has_return:
|
||||
lines.append(f" assert result is not None or result is None # Placeholder")
|
||||
lines.append(f" except ImportError:")
|
||||
lines.append(f" pytest.skip('Module not importable')")
|
||||
lines.append(f" target = _load_symbol({func.module_path!r}, {func.name!r})")
|
||||
|
||||
return chr(10).join(lines)
|
||||
args_str = _call_args(func)
|
||||
call_expr = f"target({args_str})" if not func.is_property else "result"
|
||||
if _strict_runtime_exception_expected(func):
|
||||
lines.append(" with pytest.raises((RuntimeError, ValueError, TypeError)):")
|
||||
if func.is_async:
|
||||
lines.append(f" await {call_expr}")
|
||||
else:
|
||||
lines.append(f" {call_expr}")
|
||||
else:
|
||||
if not func.is_property:
|
||||
if func.is_async:
|
||||
lines.append(f" result = await {call_expr}")
|
||||
else:
|
||||
lines.append(f" result = {call_expr}")
|
||||
if _path_returning(func):
|
||||
lines.append(" assert isinstance(result, Path)")
|
||||
elif func.name.startswith(("has_", "is_")):
|
||||
lines.append(" assert isinstance(result, bool)")
|
||||
elif func.name.startswith("list_"):
|
||||
lines.append(" assert isinstance(result, (list, tuple, set, dict, str))")
|
||||
elif func.has_return:
|
||||
lines.append(" assert result is not NotImplemented")
|
||||
else:
|
||||
lines.append(" assert True # smoke: reached without exception")
|
||||
lines.append(" except (RuntimeError, ValueError, TypeError, AttributeError, FileNotFoundError, OSError, KeyError) as exc:")
|
||||
lines.append(" pytest.skip(f'Auto-generated stub needs richer fixture: {exc}')")
|
||||
lines.append(" except (ImportError, ModuleNotFoundError) as exc:")
|
||||
lines.append(" pytest.skip(f'Module not importable: {exc}')")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def generate_test_suite(gaps, max_tests=50):
|
||||
@@ -216,10 +273,26 @@ def generate_test_suite(gaps, max_tests=50):
|
||||
lines.append("These tests are starting points. Review before merging.")
|
||||
lines.append('"""')
|
||||
lines.append("")
|
||||
lines.append("import importlib.util")
|
||||
lines.append("from pathlib import Path")
|
||||
lines.append("import pytest")
|
||||
lines.append("from unittest.mock import MagicMock, patch")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
lines.append("def _load_symbol(relative_path, symbol):")
|
||||
lines.append(" module_path = Path(__file__).resolve().parents[1] / relative_path")
|
||||
lines.append(" if not module_path.exists():")
|
||||
lines.append(" pytest.skip(f'Module file not found: {module_path}')")
|
||||
lines.append(" spec_name = 'autogen_' + str(relative_path).replace('/', '_').replace('-', '_').replace('.', '_')")
|
||||
lines.append(" spec = importlib.util.spec_from_file_location(spec_name, module_path)")
|
||||
lines.append(" module = importlib.util.module_from_spec(spec)")
|
||||
lines.append(" try:")
|
||||
lines.append(" spec.loader.exec_module(module)")
|
||||
lines.append(" except Exception as exc:")
|
||||
lines.append(" pytest.skip(f'Module not importable: {exc}')")
|
||||
lines.append(" return getattr(module, symbol)")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
lines.append("# AUTO-GENERATED -- DO NOT EDIT WITHOUT REVIEW")
|
||||
|
||||
for module, mgaps in sorted(by_module.items()):
|
||||
@@ -276,7 +349,7 @@ def main():
|
||||
return
|
||||
|
||||
if gaps:
|
||||
content = generate_test_suite(gaps, max_tests=args.max-tests if hasattr(args, 'max-tests') else args.max_tests)
|
||||
content = generate_test_suite(gaps, max_tests=args.max_tests)
|
||||
out = os.path.join(source_dir, args.output)
|
||||
os.makedirs(os.path.dirname(out), exist_ok=True)
|
||||
with open(out, "w") as f:
|
||||
|
||||
@@ -1,292 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Plan atomic fleet config sync releases.
|
||||
|
||||
Refs: timmy-home #550
|
||||
|
||||
Phase-3 orchestration slice:
|
||||
- define a shared config-sync manifest for fleet hosts
|
||||
- fingerprint the exact config payload into one release id
|
||||
- generate per-host staging paths and atomic symlink-swap promotion metadata
|
||||
- stay dry-run by default so rollout planning is safe to verify locally
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
DEFAULT_INVENTORY_FILE = Path(__file__).resolve().parents[1] / "ansible" / "inventory" / "hosts.ini"
|
||||
|
||||
|
||||
def load_inventory_hosts(path: str | Path) -> dict[str, dict[str, str]]:
|
||||
hosts: dict[str, dict[str, str]] = {}
|
||||
section = None
|
||||
for raw_line in Path(path).read_text(encoding="utf-8").splitlines():
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#") or line.startswith(";"):
|
||||
continue
|
||||
if line.startswith("[") and line.endswith("]"):
|
||||
section = line[1:-1].strip().lower()
|
||||
continue
|
||||
if section != "fleet":
|
||||
continue
|
||||
|
||||
parts = line.split()
|
||||
host = parts[0]
|
||||
metadata = {"host": host}
|
||||
for token in parts[1:]:
|
||||
if "=" not in token:
|
||||
continue
|
||||
key, value = token.split("=", 1)
|
||||
metadata[key] = value
|
||||
hosts[host] = metadata
|
||||
|
||||
if not hosts:
|
||||
raise ValueError("inventory defines no [fleet] hosts")
|
||||
return hosts
|
||||
|
||||
|
||||
|
||||
def load_manifest(path: str | Path) -> dict[str, Any]:
|
||||
data = yaml.safe_load(Path(path).read_text(encoding="utf-8")) or {}
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("manifest must contain a YAML object")
|
||||
data.setdefault("fleet_name", "timmy-fleet-config")
|
||||
data.setdefault("targets", [])
|
||||
if not isinstance(data["targets"], list):
|
||||
raise ValueError("targets must be a list")
|
||||
return data
|
||||
|
||||
|
||||
|
||||
def _normalize_relative_path(value: str) -> str:
|
||||
path = Path(value)
|
||||
if path.is_absolute():
|
||||
raise ValueError(f"sync file path must be relative: {value}")
|
||||
if any(part == ".." for part in path.parts):
|
||||
raise ValueError(f"sync file path may not escape source root: {value}")
|
||||
normalized = path.as_posix()
|
||||
if normalized in {"", "."}:
|
||||
raise ValueError("sync file path may not be empty")
|
||||
return normalized
|
||||
|
||||
|
||||
|
||||
def validate_manifest(manifest: dict[str, Any], inventory_hosts: dict[str, dict[str, str]]) -> None:
|
||||
targets = manifest.get("targets", [])
|
||||
if not targets:
|
||||
raise ValueError("manifest must define at least one sync target")
|
||||
|
||||
seen_hosts: set[str] = set()
|
||||
for target in targets:
|
||||
if not isinstance(target, dict):
|
||||
raise ValueError("each target must be a mapping")
|
||||
|
||||
host = str(target.get("host", "")).strip()
|
||||
if not host:
|
||||
raise ValueError("each target must declare a host")
|
||||
if host in seen_hosts:
|
||||
raise ValueError(f"duplicate target host: {host}")
|
||||
if host not in inventory_hosts:
|
||||
raise ValueError(f"unknown inventory host: {host}")
|
||||
seen_hosts.add(host)
|
||||
|
||||
config_root = str(target.get("config_root", "")).strip()
|
||||
if not config_root:
|
||||
raise ValueError(f"target {host} missing config_root")
|
||||
|
||||
files = target.get("files")
|
||||
if not isinstance(files, list) or not files:
|
||||
raise ValueError(f"target {host} must declare at least one file")
|
||||
|
||||
normalized: list[str] = []
|
||||
for entry in files:
|
||||
normalized.append(_normalize_relative_path(str(entry)))
|
||||
if len(set(normalized)) != len(normalized):
|
||||
raise ValueError(f"target {host} declares duplicate file paths")
|
||||
|
||||
|
||||
|
||||
def _hash_file(path: Path) -> str:
|
||||
return hashlib.sha256(path.read_bytes()).hexdigest()
|
||||
|
||||
|
||||
|
||||
def _collect_target_files(source_root: Path, rel_paths: list[str]) -> list[dict[str, Any]]:
|
||||
items: list[dict[str, Any]] = []
|
||||
for rel_path in sorted(_normalize_relative_path(path) for path in rel_paths):
|
||||
source_path = source_root / rel_path
|
||||
if not source_path.exists():
|
||||
raise FileNotFoundError(f"missing source file: {rel_path}")
|
||||
if not source_path.is_file():
|
||||
raise ValueError(f"sync source must be a file: {rel_path}")
|
||||
items.append(
|
||||
{
|
||||
"relative_path": rel_path,
|
||||
"source": str(source_path),
|
||||
"sha256": _hash_file(source_path),
|
||||
"size": source_path.stat().st_size,
|
||||
}
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
|
||||
def compute_release_id(target_payloads: list[dict[str, Any]]) -> str:
|
||||
digest = hashlib.sha256()
|
||||
for target in sorted(target_payloads, key=lambda item: item["host"]):
|
||||
digest.update(target["host"].encode("utf-8"))
|
||||
digest.update(b"\0")
|
||||
for file_item in sorted(target["files"], key=lambda item: item["relative_path"]):
|
||||
digest.update(file_item["relative_path"].encode("utf-8"))
|
||||
digest.update(b"\0")
|
||||
digest.update(file_item["sha256"].encode("utf-8"))
|
||||
digest.update(b"\0")
|
||||
digest.update(str(file_item["size"]).encode("utf-8"))
|
||||
digest.update(b"\0")
|
||||
return digest.hexdigest()[:12]
|
||||
|
||||
|
||||
|
||||
def build_rollout_plan(
|
||||
manifest: dict[str, Any],
|
||||
inventory_hosts: dict[str, dict[str, str]],
|
||||
*,
|
||||
source_root: str | Path,
|
||||
) -> dict[str, Any]:
|
||||
validate_manifest(manifest, inventory_hosts)
|
||||
|
||||
source_root = Path(source_root)
|
||||
if not source_root.exists():
|
||||
raise FileNotFoundError(f"source root not found: {source_root}")
|
||||
if not source_root.is_dir():
|
||||
raise ValueError(f"source root must be a directory: {source_root}")
|
||||
|
||||
staged_targets: list[dict[str, Any]] = []
|
||||
for target in sorted(manifest["targets"], key=lambda item: item["host"]):
|
||||
host = target["host"]
|
||||
files = _collect_target_files(source_root, target["files"])
|
||||
staged_targets.append(
|
||||
{
|
||||
"host": host,
|
||||
"inventory": inventory_hosts[host],
|
||||
"config_root": str(target["config_root"]),
|
||||
"files": files,
|
||||
}
|
||||
)
|
||||
|
||||
release_id = compute_release_id(staged_targets)
|
||||
total_bytes = 0
|
||||
file_count = 0
|
||||
rendered_targets: list[dict[str, Any]] = []
|
||||
for target in staged_targets:
|
||||
config_root = target["config_root"].rstrip("/")
|
||||
stage_root = f"{config_root}/.releases/{release_id}"
|
||||
live_symlink = f"{config_root}/current"
|
||||
previous_symlink = f"{config_root}/previous"
|
||||
file_count += len(target["files"])
|
||||
total_bytes += sum(item["size"] for item in target["files"])
|
||||
rendered_targets.append(
|
||||
{
|
||||
"host": target["host"],
|
||||
"ansible_host": target["inventory"].get("ansible_host", ""),
|
||||
"ansible_user": target["inventory"].get("ansible_user", ""),
|
||||
"config_root": config_root,
|
||||
"stage_root": stage_root,
|
||||
"live_symlink": live_symlink,
|
||||
"previous_symlink": previous_symlink,
|
||||
"files": target["files"],
|
||||
"promote": {
|
||||
"mode": "symlink_swap",
|
||||
"release_id": release_id,
|
||||
"from": stage_root,
|
||||
"to": live_symlink,
|
||||
"backup_link": previous_symlink,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
"fleet_name": manifest.get("fleet_name", "timmy-fleet-config"),
|
||||
"source_root": str(source_root),
|
||||
"release_id": release_id,
|
||||
"target_count": len(rendered_targets),
|
||||
"file_count": file_count,
|
||||
"total_bytes": total_bytes,
|
||||
"targets": rendered_targets,
|
||||
}
|
||||
|
||||
|
||||
|
||||
def render_markdown(plan: dict[str, Any]) -> str:
|
||||
lines = [
|
||||
"# Fleet Config Sync Plan",
|
||||
"",
|
||||
f"Fleet: {plan['fleet_name']}",
|
||||
f"Release ID: `{plan['release_id']}`",
|
||||
f"Source root: `{plan['source_root']}`",
|
||||
f"Target count: {plan['target_count']}",
|
||||
f"File count: {plan['file_count']}",
|
||||
f"Total bytes: {plan['total_bytes']}",
|
||||
"",
|
||||
"Atomic promote via symlink swap keeps every host on one named release boundary.",
|
||||
"",
|
||||
"| Host | Address | Stage root | Live symlink | Files |",
|
||||
"|---|---|---|---|---:|",
|
||||
]
|
||||
|
||||
for target in plan["targets"]:
|
||||
lines.append(
|
||||
f"| {target['host']} | {target['ansible_host'] or 'n/a'} | `{target['stage_root']}` | `{target['live_symlink']}` | {len(target['files'])} |"
|
||||
)
|
||||
|
||||
lines.extend(["", "## Target file manifests", ""])
|
||||
for target in plan["targets"]:
|
||||
lines.extend(
|
||||
[
|
||||
f"### {target['host']}",
|
||||
"",
|
||||
f"- Promote: `{target['promote']['from']}` -> `{target['promote']['to']}`",
|
||||
f"- Backup link: `{target['promote']['backup_link']}`",
|
||||
"",
|
||||
"| Relative path | Bytes | SHA256 |",
|
||||
"|---|---:|---|",
|
||||
]
|
||||
)
|
||||
for file_item in target["files"]:
|
||||
lines.append(
|
||||
f"| `{file_item['relative_path']}` | {file_item['size']} | `{file_item['sha256'][:16]}…` |"
|
||||
)
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines).rstrip() + "\n"
|
||||
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Plan a dry-run atomic config sync release across fleet hosts")
|
||||
parser.add_argument("manifest", help="Path to fleet config sync manifest YAML")
|
||||
parser.add_argument("--inventory", default=str(DEFAULT_INVENTORY_FILE), help="Path to Ansible fleet inventory")
|
||||
parser.add_argument("--source-root", default=".", help="Local source root containing files listed in the manifest")
|
||||
parser.add_argument("--markdown", action="store_true", help="Render markdown instead of JSON")
|
||||
args = parser.parse_args()
|
||||
|
||||
inventory = load_inventory_hosts(args.inventory)
|
||||
manifest = load_manifest(args.manifest)
|
||||
plan = build_rollout_plan(manifest, inventory, source_root=args.source_root)
|
||||
|
||||
if args.markdown:
|
||||
print(render_markdown(plan))
|
||||
else:
|
||||
print(json.dumps(plan, indent=2))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
55
tests/test_codebase_test_generator.py
Normal file
55
tests/test_codebase_test_generator.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
SCRIPT = ROOT / "scripts" / "codebase_test_generator.py"
|
||||
|
||||
|
||||
def load_module():
|
||||
spec = importlib.util.spec_from_file_location("codebase_test_generator", str(SCRIPT))
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
def test_generate_test_suite_uses_dynamic_loader_for_numbered_paths():
|
||||
mod = load_module()
|
||||
func = mod.FunctionInfo(
|
||||
name="linkify",
|
||||
module_path="reports/notebooklm/2026-03-27-hermes-openclaw/render_reports.py",
|
||||
lineno=12,
|
||||
args=["text"],
|
||||
has_return=True,
|
||||
)
|
||||
gap = mod.CoverageGap(func=func, reason="no test found", test_priority=1)
|
||||
|
||||
suite = mod.generate_test_suite([gap], max_tests=1)
|
||||
|
||||
assert "import importlib.util" in suite
|
||||
assert "_load_symbol(" in suite
|
||||
assert "from reports.notebooklm" not in suite
|
||||
assert "2026-03-27-hermes-openclaw/render_reports.py" in suite
|
||||
|
||||
|
||||
def test_generate_test_handles_async_and_runtime_args_safely():
|
||||
mod = load_module()
|
||||
func = mod.FunctionInfo(
|
||||
name="keypress",
|
||||
module_path="angband/mcp_server.py",
|
||||
lineno=200,
|
||||
args=["key", "wait_ms", "session_name"],
|
||||
is_async=True,
|
||||
has_return=True,
|
||||
calls=["send_key"],
|
||||
)
|
||||
gap = mod.CoverageGap(func=func, reason="no test found", test_priority=1)
|
||||
|
||||
test_code = mod.generate_test(gap)
|
||||
|
||||
assert "@pytest.mark.asyncio" in test_code
|
||||
assert "async def" in test_code
|
||||
assert "await target(" in test_code
|
||||
assert "key='test'" in test_code
|
||||
assert "wait_ms=1" in test_code
|
||||
assert "session_name='test'" in test_code
|
||||
assert "pytest.raises((RuntimeError, ValueError, TypeError))" in test_code
|
||||
@@ -1,122 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
SCRIPT_PATH = ROOT / "scripts" / "fleet_config_sync.py"
|
||||
HOSTS_FILE = ROOT / "ansible" / "inventory" / "hosts.ini"
|
||||
EXAMPLE_MANIFEST = ROOT / "docs" / "fleet-config-sync.example.yaml"
|
||||
|
||||
|
||||
def _load_module(path: Path, name: str):
|
||||
assert path.exists(), f"missing {path.relative_to(ROOT)}"
|
||||
spec = importlib.util.spec_from_file_location(name, path)
|
||||
assert spec and spec.loader
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def _write_source_tree(tmp_path: Path) -> Path:
|
||||
source_root = tmp_path / "source"
|
||||
(source_root / "dispatch").mkdir(parents=True)
|
||||
(source_root / "config.yaml").write_text("model: local\nroute: hybrid\n", encoding="utf-8")
|
||||
(source_root / "dispatch" / "rules.json").write_text('{"lane":"allegro"}\n', encoding="utf-8")
|
||||
return source_root
|
||||
|
||||
|
||||
def test_example_manifest_targets_known_fleet_hosts() -> None:
|
||||
mod = _load_module(SCRIPT_PATH, "fleet_config_sync")
|
||||
assert EXAMPLE_MANIFEST.exists(), "missing docs/fleet-config-sync.example.yaml"
|
||||
|
||||
inventory = mod.load_inventory_hosts(HOSTS_FILE)
|
||||
manifest = mod.load_manifest(EXAMPLE_MANIFEST)
|
||||
mod.validate_manifest(manifest, inventory)
|
||||
|
||||
assert [target["host"] for target in manifest["targets"]] == ["ezra", "bezalel"]
|
||||
|
||||
|
||||
def test_build_rollout_plan_stages_one_release_for_all_hosts(tmp_path: Path) -> None:
|
||||
mod = _load_module(SCRIPT_PATH, "fleet_config_sync")
|
||||
source_root = _write_source_tree(tmp_path)
|
||||
inventory = {
|
||||
"ezra": {"host": "ezra", "ansible_host": "143.198.27.163"},
|
||||
"bezalel": {"host": "bezalel", "ansible_host": "67.205.155.108"},
|
||||
}
|
||||
manifest = {
|
||||
"fleet_name": "phase-3-config-sync",
|
||||
"targets": [
|
||||
{
|
||||
"host": "ezra",
|
||||
"config_root": "/root/wizards/ezra/home/.hermes",
|
||||
"files": ["config.yaml", "dispatch/rules.json"],
|
||||
},
|
||||
{
|
||||
"host": "bezalel",
|
||||
"config_root": "/root/wizards/bezalel/home/.hermes",
|
||||
"files": ["config.yaml", "dispatch/rules.json"],
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
plan = mod.build_rollout_plan(manifest, inventory, source_root=source_root)
|
||||
|
||||
assert plan["fleet_name"] == "phase-3-config-sync"
|
||||
assert len(plan["release_id"]) == 12
|
||||
assert plan["target_count"] == 2
|
||||
assert plan["file_count"] == 4
|
||||
assert plan["total_bytes"] > 0
|
||||
|
||||
for target in plan["targets"]:
|
||||
assert target["stage_root"].endswith(f"/.releases/{plan['release_id']}")
|
||||
assert target["live_symlink"].endswith("/current")
|
||||
assert target["promote"]["release_id"] == plan["release_id"]
|
||||
assert {item["relative_path"] for item in target["files"]} == {"config.yaml", "dispatch/rules.json"}
|
||||
|
||||
|
||||
def test_validate_manifest_rejects_unknown_inventory_host(tmp_path: Path) -> None:
|
||||
mod = _load_module(SCRIPT_PATH, "fleet_config_sync")
|
||||
source_root = _write_source_tree(tmp_path)
|
||||
manifest = {
|
||||
"targets": [
|
||||
{
|
||||
"host": "unknown-wizard",
|
||||
"config_root": "/srv/wizard/config",
|
||||
"files": ["config.yaml"],
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
try:
|
||||
mod.build_rollout_plan(manifest, {"ezra": {"host": "ezra"}}, source_root=source_root)
|
||||
except ValueError as exc:
|
||||
assert "unknown inventory host" in str(exc)
|
||||
assert "unknown-wizard" in str(exc)
|
||||
else:
|
||||
raise AssertionError("build_rollout_plan should reject unknown inventory hosts")
|
||||
|
||||
|
||||
def test_render_markdown_mentions_atomic_promote_and_targets(tmp_path: Path) -> None:
|
||||
mod = _load_module(SCRIPT_PATH, "fleet_config_sync")
|
||||
source_root = _write_source_tree(tmp_path)
|
||||
manifest = {
|
||||
"fleet_name": "phase-3-config-sync",
|
||||
"targets": [
|
||||
{
|
||||
"host": "ezra",
|
||||
"config_root": "/root/wizards/ezra/home/.hermes",
|
||||
"files": ["config.yaml"],
|
||||
}
|
||||
],
|
||||
}
|
||||
inventory = {"ezra": {"host": "ezra", "ansible_host": "143.198.27.163"}}
|
||||
|
||||
plan = mod.build_rollout_plan(manifest, inventory, source_root=source_root)
|
||||
report = mod.render_markdown(plan)
|
||||
|
||||
assert plan["release_id"] in report
|
||||
assert "Atomic promote via symlink swap" in report
|
||||
assert "ezra" in report
|
||||
assert "/root/wizards/ezra/home/.hermes/current" in report
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user