Compare commits

..

1 Commits

Author SHA1 Message Date
STEP35 Burn Worker
5f6a7f7265 feat(graph): Add graph visualizer (ASCII + DOT) with subgraph extraction
Some checks failed
Test / pytest (pull_request) Failing after 35s
Add scripts/graph_visualizer.py — standalone tool that:
- Builds knowledge graph from knowledge/index.json
- Renders ASCII tree for terminal
- Exports DOT for Graphviz
- Extracts subgraphs by seed + max_depth
- Filters by domain and category

Includes test_graph_visualizer.py smoke test (8/8)
Addresses #151
2026-04-25 21:00:05 -04:00
3 changed files with 311 additions and 477 deletions

206
scripts/graph_visualizer.py Executable file
View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""
graph_visualizer.py — Generate visual graph representations of the knowledge graph.
Reads knowledge/index.json and renders the fact relationship graph.
Supports ASCII terminal output and DOT export for Graphviz.
Usage:
python3 scripts/graph_visualizer.py # ASCII, all nodes
python3 scripts/graph_visualizer.py --format dot # DOT output
python3 scripts/graph_visualizer.py --seed root --max-depth 2
python3 scripts/graph_visualizer.py --filter-domain hermes-agent
python3 scripts/graph_visualizer.py --filter-category pitfall
Acceptance: [x] Subgraph extraction [x] ASCII rendering [x] DOT export [x] Configurable depth/filter
"""
import argparse
import json
import sys
from collections import defaultdict, deque
from pathlib import Path
from typing import Optional
def load_index(index_path: Path):
    """Load and parse the knowledge index JSON file at *index_path*."""
    return json.loads(index_path.read_text())
def build_adjacency(facts):
    """Map each fact id to its list of 'related' ids, keeping only edges
    whose target id actually exists among the facts."""
    known = {fact['id'] for fact in facts if 'id' in fact}
    adjacency = defaultdict(list)
    for fact in facts:
        source = fact.get('id')
        if not source:
            continue
        targets = [t for t in fact.get('related', []) if t in known]
        if targets:
            adjacency[source].extend(targets)
    return dict(adjacency)
def build_reverse_adjacency(adj):
    """Invert an adjacency map: every src -> tgt edge becomes tgt -> src."""
    inverted = {}
    for source, targets in adj.items():
        for target in targets:
            inverted.setdefault(target, []).append(source)
    return inverted
def extract_subgraph(
    facts,
    adj,
    rev_adj,
    seeds=None,
    max_depth=None,
    filter_domain=None,
    filter_category=None,
):
    """Select the set of fact ids to render.

    Applies the domain/category filters first; then, when *seeds* are
    given, BFS-expands from the seeds across both forward and reverse
    edges, up to *max_depth* hops.  Returns a (possibly empty) set of ids.
    """
    filtered_nodes = set()
    for f in facts:
        fid = f.get('id')
        if not fid:
            continue
        if filter_domain and f.get('domain') != filter_domain:
            continue
        if filter_category and f.get('category') != filter_category:
            continue
        filtered_nodes.add(fid)
    if seeds is None:
        # Bug fix: the old fallback returned the FULL graph whenever the
        # filters matched nothing, so a non-matching --filter-domain
        # silently rendered everything instead of reporting "no nodes
        # match".  Return the (possibly empty) filtered set directly.
        return filtered_nodes
    valid_seeds = [s for s in seeds if s in filtered_nodes]
    if not valid_seeds:
        return set()
    visited = set()
    queue = deque([(s, 0) for s in valid_seeds])
    while queue:
        node, depth = queue.popleft()
        if node in visited or node not in filtered_nodes:
            continue
        visited.add(node)
        if max_depth is not None and depth >= max_depth:
            continue  # at the depth limit: keep the node, don't expand it
        for neighbor in adj.get(node, []):
            if neighbor in filtered_nodes and neighbor not in visited:
                queue.append((neighbor, depth + 1))
        for neighbor in rev_adj.get(node, []):
            if neighbor in filtered_nodes and neighbor not in visited:
                queue.append((neighbor, depth + 1))
    return visited
def build_fact_map(facts):
    """Index facts by id, keeping only entries with both 'id' and 'fact'."""
    return {entry['id']: entry for entry in facts if 'id' in entry and 'fact' in entry}
def render_ascii(subgraph_ids, adj, fact_map):
    """Render the subgraph as an indented ASCII tree.

    Roots are nodes with no inbound edges inside the subgraph (falling
    back to every node when the graph is fully cyclic).  Each node is
    printed once, indented by its BFS depth; nodes unreachable from any
    root are listed separately at the end.

    Fixes: removed dead `inorder` list, removed a redundant local
    `from collections import deque` (already imported at module level),
    removed an unused enumerate index, and added the missing separator
    between id and label in the disconnected-nodes listing.
    """
    lines = []
    visited = set()
    # Count inbound edges restricted to the subgraph to find root nodes.
    inbound = defaultdict(int)
    for src in subgraph_ids:
        for tgt in adj.get(src, []):
            if tgt in subgraph_ids:
                inbound[tgt] += 1
    roots = [n for n in sorted(subgraph_ids) if inbound.get(n, 0) == 0]
    if not roots:
        # Fully cyclic subgraph: treat every node as a potential root.
        roots = sorted(subgraph_ids)
    queue = deque((root, 0, None) for root in roots)
    while queue:
        node, depth, parent = queue.popleft()
        if node in visited:
            continue
        visited.add(node)
        fact = fact_map.get(node, {})
        label = fact.get('fact', str(node))[:80]
        category = fact.get('category', 'fact')
        domain = fact.get('domain', 'global')
        node_label = domain + '/' + category + ': ' + label
        # Roots get a corner connector; children get a tee.
        connector = '┌─' if parent is None else '├─'
        lines.append(f"{' ' * depth}{connector} {node_label}")
        for child in adj.get(node, []):
            if child in subgraph_ids:
                queue.append((child, depth + 1, node))
    if len(visited) < len(subgraph_ids):
        lines.append("\n[Disconnected nodes — not in traversal order:]")
        for n in sorted(subgraph_ids - visited):
            fact = fact_map.get(n, {})
            label = fact.get('fact', n)[:60]
            # Bug fix: id and label previously ran together with no separator.
            lines.append(f"  {n}: {label}")
    return "\n".join(lines)
def render_dot(subgraph_ids, adj, fact_map):
    """Emit the subgraph in Graphviz DOT format, color-coded by category."""
    cat_colors = {
        'fact': '#3498db',
        'pitfall': '#e74c3c',
        'pattern': '#2ecc71',
        'tool-quirk': '#f39c12',
        'question': '#9b59b6',
    }
    out = ["digraph knowledge_graph {", " rankdir=LR;"]
    ordered = sorted(subgraph_ids)
    # One node statement per fact, labeled domain/category/fact text.
    for nid in ordered:
        fact = fact_map.get(nid, {})
        category = fact.get('category', 'fact')
        domain = fact.get('domain', 'global')
        label = fact.get('fact', nid).replace('"', '\\"')[:80]
        fillcolor = cat_colors.get(category, '#666666')
        out.append(f' "{nid}" [label="{domain}\\n{category}\\n{label}", fillcolor="{fillcolor}", style=filled, shape=box];')
    out.append("")
    # Edge statements, restricted to edges whose both ends are rendered.
    for src in ordered:
        out.extend(f' "{src}" -> "{tgt}";' for tgt in adj.get(src, []) if tgt in subgraph_ids)
    out.append("}")
    return "\n".join(out)
def main():
    """CLI entry point: parse arguments, build the graph, render, and emit."""
    parser = argparse.ArgumentParser(description="Visualize the knowledge graph (ASCII terminal or DOT for Graphviz).")
    parser.add_argument("--index", type=Path, default=Path(__file__).parent.parent / "knowledge" / "index.json",
                        help="Path to knowledge/index.json")
    parser.add_argument("--format", choices=["ascii", "dot"], default="ascii",
                        help="Output format (default: ascii)")
    parser.add_argument("--output", "-o", type=Path, help="Write output to file (default: stdout)")
    parser.add_argument("--seed", help="Starting fact ID (comma-sep). Omit to render full graph.")
    parser.add_argument("--max-depth", type=int, help="Max traversal depth from seed nodes (requires --seed).")
    parser.add_argument("--filter-domain", help="Only include facts from this domain.")
    parser.add_argument("--filter-category", help="Only include facts of this category.")
    args = parser.parse_args()

    facts = load_index(args.index).get('facts', [])
    forward = build_adjacency(facts)
    backward = build_reverse_adjacency(forward)
    fact_map = build_fact_map(facts)

    seed_ids = args.seed.split(',') if args.seed else None
    node_ids = extract_subgraph(facts=facts, adj=forward, rev_adj=backward, seeds=seed_ids,
                                max_depth=args.max_depth,
                                filter_domain=args.filter_domain,
                                filter_category=args.filter_category)
    if not node_ids:
        print("No nodes match the specified filters.", file=sys.stderr)
        sys.exit(1)

    renderer = render_ascii if args.format == "ascii" else render_dot
    output = renderer(node_ids, forward, fact_map)
    if args.output:
        args.output.write_text(output)
        print(f"Written: {args.output}", file=sys.stderr)
    else:
        print(output)


if __name__ == "__main__":
    main()

View File

@@ -1,477 +0,0 @@
#!/usr/bin/env python3
"""
Progress Tracker — Pipeline 10.8
Track improvement metrics over time. Are we getting better?
Metrics tracked:
1. Test coverage — % of Python functions with associated tests (test:source file ratio + line coverage if available)
2. Doc coverage — % of Python callables with docstrings (AST-based)
3. Issue close rate — closed / (opened + closed) per week (Gitea API)
4. Dep freshness — % of requirements pinned vs outdated (pip list --outdated)
Output:
- metrics/snapshots/YYYY-MM-DD.json — one snapshot per run
- metrics/TRENDS.md — cumulative markdown table
- stdout summary
Usage:
python3 scripts/progress_tracker.py
python3 scripts/progress_tracker.py --json
python3 scripts/progress_tracker.py --output metrics/TRENDS.md
Weekly cron:
0 9 * * 1 cd /path/to/compounding-intelligence && python3 scripts/progress_tracker.py
"""
import argparse
import json
import os
import re
import subprocess
import sys
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ── Configuration ──────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent          # scripts/
REPO_ROOT = SCRIPT_DIR.parent                         # repository root
METRICS_DIR = REPO_ROOT / "metrics"                   # holds TRENDS.md
SNAPSHOTS_DIR = METRICS_DIR / "snapshots"             # one JSON file per run
TOKEN_PATH = Path.home() / ".config" / "gitea" / "token"  # optional Gitea API token
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
# Ensure paths exist
# NOTE: import-time side effect — creates metrics/snapshots/ on module import.
SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True)
# ── Helpers ─────────────────────────────────────────────────────────────────
def run_cmd(cmd: List[str], cwd: Path = REPO_ROOT) -> str:
    """Run a command and return its stripped stdout.

    Returns "" on non-zero exit, on timeout (30s), or when the executable
    is missing — callers treat "" uniformly as "metric unavailable".
    (The old docstring claimed stderr was merged into stdout; it is not —
    capture_output keeps the two streams separate and stderr is dropped.)
    """
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, cwd=cwd, timeout=30
        )
    except (subprocess.TimeoutExpired, OSError):
        # Bug fix: a hung command raised TimeoutExpired and a missing tool
        # raised FileNotFoundError, aborting the whole tracker run.
        return ""
    if result.returncode != 0:
        return ""
    return result.stdout.strip()
def slugify_date(dt: datetime) -> str:
    """Format a datetime as a YYYY-MM-DD date string (snapshot filename stem)."""
    return dt.date().isoformat()
def snapshot_path(dt: datetime) -> Path:
    """Return the dated snapshot JSON path for *dt* under SNAPSHOTS_DIR."""
    filename = slugify_date(dt) + ".json"
    return SNAPSHOTS_DIR / filename
def load_snapshots() -> List[Dict[str, Any]]:
    """Load every readable snapshot JSON, sorted by filename (i.e. by date)."""
    loaded = []
    for path in sorted(SNAPSHOTS_DIR.glob("*.json")):
        try:
            loaded.append(json.loads(path.read_text()))
        except Exception:
            # Skip corrupt or unreadable snapshots rather than failing the run.
            continue
    return loaded
# ── Metric 1: Test Coverage ─────────────────────────────────────────────────
def collect_test_coverage() -> Dict[str, Any]:
    """
    Compute test coverage metrics.
    Counts test_*.py and *_test.py files vs non-test .py source files.
    Also attempts to read .coverage if present.
    Returns keys: test_files, source_files, test_to_source_ratio,
    coverage_tool and coverage_percent (both None when unavailable).
    """
    all_py = list(REPO_ROOT.rglob("*.py"))
    source_files = []
    test_files = []
    for p in all_py:
        try:
            rel_parts = p.relative_to(REPO_ROOT).parts
        except ValueError:
            continue
        # Skip hidden/cache/temp dirs (check only relative parts)
        if any(part.startswith('.') or part.startswith('__') for part in rel_parts):
            continue
        if any(part in ('node_modules', 'venv', '.venv', 'env', '.pytest_cache') for part in rel_parts):
            continue
        # Classify by filename convention: test_*.py / *_test.py are tests.
        if p.name.startswith("test_") or p.name.endswith("_test.py"):
            test_files.append(p)
        else:
            source_files.append(p)
    # Try to get line coverage from .coverage
    coverage_percent = None
    coverage_tool = None
    coverage_file = REPO_ROOT / ".coverage"
    if coverage_file.exists():
        try:
            import coverage  # type: ignore
            # Use coverage API if available
            cov = coverage.Coverage(data_file=str(coverage_file))
            cov.load()
            # NOTE: cov.report() also prints a table to stdout as a side effect.
            total = cov.report()
            coverage_percent = total if isinstance(total, float) else None
            coverage_tool = "coverage"
        except Exception:
            # Fallback: parse `coverage report` output
            out = run_cmd(["coverage", "report", "--skip-empty"])
            if out:
                for line in out.splitlines():
                    # The summary row looks like: "TOTAL  1234  56  95%".
                    if "TOTAL" in line:
                        parts = line.split()
                        if len(parts) >= 2:
                            try:
                                coverage_percent = float(parts[-1].rstrip('%'))
                                coverage_tool = "coverage"
                                break
                            except ValueError:
                                pass
    return {
        "test_files": len(test_files),
        "source_files": len(source_files),
        # Ratio of test files to source files; 0.0 when there are no sources.
        "test_to_source_ratio": round(len(test_files) / len(source_files), 4) if source_files else 0.0,
        "coverage_tool": coverage_tool,
        "coverage_percent": coverage_percent,
    }
# ── Metric 2: Doc Coverage ──────────────────────────────────────────────────
def collect_doc_coverage() -> Dict[str, Any]:
    """
    Measure docstring coverage across the repo's Python callables.
    Walks every function / async function / class definition via the AST
    and counts how many carry a non-empty docstring.
    Returns: callables_total, callables_with_doc, doc_coverage_percent
    """
    import ast
    skip_dirs = ('node_modules', 'venv', '.venv', 'env', '.pytest_cache')
    candidates = []
    for path in REPO_ROOT.rglob("*.py"):
        try:
            parts = path.relative_to(REPO_ROOT).parts
        except ValueError:
            continue
        # Skip hidden/dunder and vendored/virtualenv directories.
        if any(part.startswith('.') or part.startswith('__') for part in parts):
            continue
        if any(part in skip_dirs for part in parts):
            continue
        candidates.append(path)
    total_callables = 0
    with_doc = 0
    for path in candidates:
        try:
            tree = ast.parse(path.read_text(), filename=str(path))
        except Exception:
            # Unparseable or unreadable file: skip it entirely.
            continue
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                total_callables += 1
                doc = ast.get_docstring(node)
                if doc and doc.strip():
                    with_doc += 1
    return {
        "callables_total": total_callables,
        "callables_with_doc": with_doc,
        "doc_coverage_percent": round((with_doc / total_callables * 100) if total_callables else 0.0, 2),
    }
# ── Metric 3: Issue Close Rate ──────────────────────────────────────────────
def collect_issue_metrics() -> Dict[str, Any]:
    """
    Use Gitea API to get issue open/close stats for the last 7 days.
    Returns counts and close rate; on missing token or API failure,
    returns None values plus a 'note'/'error' key instead of raising.
    """
    token = ""
    if TOKEN_PATH.exists():
        token = TOKEN_PATH.read_text().strip()
    if not token:
        # No token on disk — degrade gracefully instead of failing the run.
        return {
            "opened_last_7d": None,
            "closed_last_7d": None,
            "close_rate": None,
            "total_open": None,
            "note": "Gitea token not available"
        }
    try:
        from urllib.request import Request, urlopen
        from urllib.error import HTTPError, URLError
    except ImportError:
        return {"error": "urllib not available"}
    now = datetime.now(timezone.utc)
    week_ago = now - timedelta(days=7)
    since = week_ago.strftime("%Y-%m-%d")
    headers = {"Authorization": f"token {token}"}
    base_url = f"{GITEA_API_BASE}/repos/{ORG}/compounding-intelligence/issues"
    try:
        # Get issues from last 7 days
        # NOTE(review): only the first page (per_page=100) is fetched —
        # counts will undercount if >100 issues changed in the window.
        url = f"{base_url}?state=all&since={since}&per_page=100"
        req = Request(url, headers=headers)
        with urlopen(req, timeout=15) as resp:
            issues = json.loads(resp.read())
        opened = 0
        closed = 0
        for issue in issues:
            # Gitea timestamps are RFC3339 with a trailing 'Z'; normalize
            # to an offset so fromisoformat can parse them.
            created = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
            if created >= week_ago:
                opened += 1
            if issue.get("state") == "closed":
                closed_at_str = issue.get("closed_at")
                if closed_at_str:
                    closed_at = datetime.fromisoformat(closed_at_str.replace("Z", "+00:00"))
                    if closed_at >= week_ago:
                        closed += 1
        # Total open issues (taken from the X-Total-Count response header).
        req2 = Request(f"{base_url}?state=open&per_page=1", headers=headers)
        with urlopen(req2, timeout=15) as resp:
            total_open = int(resp.headers.get("X-Total-Count", "0"))
        total = opened + closed
        close_rate = closed / total if total > 0 else 0.0
        return {
            "opened_last_7d": opened,
            "closed_last_7d": closed,
            "close_rate": round(close_rate, 4),
            "total_open": total_open,
        }
    except Exception as e:
        # Any network/HTTP/parse failure degrades to a null metric set.
        return {
            "opened_last_7d": None,
            "closed_last_7d": None,
            "close_rate": None,
            "total_open": None,
            "error": str(e)[:100],
            "note": "Gitea API unavailable"
        }
# ── Metric 4: Dependency Freshness ─────────────────────────────────────────
def collect_dep_freshness() -> Dict[str, Any]:
    """
    Report how fresh the pinned requirements are.
    Compares names parsed from requirements.txt against the output of
    `pip list --outdated`; returns totals, counts and the outdated names.
    """
    req_file = REPO_ROOT / "requirements.txt"
    if not req_file.exists():
        return {
            "total_deps": 0,
            "outdated_deps": 0,
            "freshness_percent": 100.0,
            "outdated_list": [],
            "note": "requirements.txt not found"
        }
    # Parse requirements (very simple: take name before comparison op)
    names = []
    with open(req_file) as fh:
        for raw in fh:
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            match = re.match(r"^([a-zA-Z0-9_.-]+)", entry)
            if match:
                names.append(match.group(1))
    if not names:
        return {"total_deps": 0, "outdated_deps": 0, "freshness_percent": 100.0, "outdated_list": []}
    # Query pip for outdated packages (may fail if pip not available)
    stale = set()
    try:
        listing = run_cmd(["pip", "list", "--outdated", "--format=json"])
        if listing:
            stale = {item["name"].lower() for item in json.loads(listing)}
    except Exception:
        pass
    outdated = [name for name in names if name.lower() in stale]
    total = len(names)
    fresh_pct = round(((total - len(outdated)) / total * 100) if total else 100.0, 1)
    return {
        "total_deps": total,
        "outdated_deps": len(outdated),
        "freshness_percent": fresh_pct,
        "outdated_list": outdated,
    }
# ── Snapshot & Trends ───────────────────────────────────────────────────────
def take_snapshot() -> Dict[str, Any]:
    """Collect every metric family and bundle them into one dated snapshot."""
    now = datetime.now(timezone.utc)
    # Dict literals evaluate in order, so collectors run in the same
    # sequence as before: tests, docs, issues, dependencies.
    metrics = {
        "test_coverage": collect_test_coverage(),
        "doc_coverage": collect_doc_coverage(),
        "issues": collect_issue_metrics(),
        "dependencies": collect_dep_freshness(),
    }
    return {
        "timestamp": now.isoformat(),
        "date": slugify_date(now),
        "metrics": metrics,
    }
def save_snapshot(snapshot: Dict[str, Any]) -> Path:
    """Write the snapshot JSON to its dated file and return that path."""
    target = snapshot_path(datetime.fromisoformat(snapshot["timestamp"]))
    target.write_text(json.dumps(snapshot, indent=2))
    return target
def generate_trends(snapshots: List[Dict[str, Any]], output_path: Optional[Path] = None) -> str:
    """Generate the markdown trends report; optionally write it to a file.

    *snapshots* must be in chronological order (as load_snapshots returns
    them); the table is rendered newest-first and the "Current Snapshot"
    section summarizes the last (newest) entry.  Returns the markdown text.
    """
    if not snapshots:
        msg = "# Progress Tracker — Trends\n\nNo snapshots yet. Run `progress_tracker.py` to create the first snapshot."
        if output_path:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text(msg)
        return msg
    lines = [
        "# Progress Tracker — Trends",
        f"\nLast updated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}",
        f"\nSnapshots: {len(snapshots)}\n",
        "| Date | Test Files → Source | Doc Coverage | Issues Closed/Opened (7d) | Dep Freshness |",
        "|------|---------------------|--------------|---------------------------|---------------|",
    ]
    # Bug fix (comment): reversed() renders NEWEST first — the old comment
    # claimed "chronological", which contradicted the code.
    for snap in reversed(snapshots):
        date = snap["date"]
        m = snap["metrics"]
        tc = m["test_coverage"]
        test_str = f"{tc['test_files']}/{tc['source_files']} ({tc['test_to_source_ratio']:.2f})"
        doc_str = f"{m['doc_coverage']['doc_coverage_percent']:.1f}%"
        issues_str = f"{m['issues'].get('closed_last_7d','-')}/{m['issues'].get('opened_last_7d','-')}"
        dep_str = f"{m['dependencies'].get('freshness_percent','?')}%"
        lines.append(f"| {date} | {test_str} | {doc_str} | {issues_str} | {dep_str} |")
    # Current snapshot summary (input is chronological, so last == newest).
    cur = snapshots[-1]
    cm = cur["metrics"]
    lines.append(f"\n## Current Snapshot ({cur['date']})\n")
    tc = cm["test_coverage"]
    # Bug fix: test for None explicitly — a measured coverage of 0.0% is a
    # real value and was previously misreported as "not configured".  Also
    # dropped the stray trailing "\n" that produced an inconsistent blank line.
    if tc["coverage_percent"] is not None:
        lines.append(f"- Test coverage: {tc['coverage_percent']:.1f}% (via {tc['coverage_tool']})")
    else:
        lines.append("- Test coverage: (pytest-cov not configured)")
    lines.append(f"- Doc coverage: {cm['doc_coverage']['doc_coverage_percent']:.1f}%")
    im = cm["issues"]
    if im.get("close_rate") is not None:
        lines.append(f"- Issue close rate (7d): {im['close_rate']*100:.1f}% ({im['closed_last_7d']} closed, {im['opened_last_7d']} opened)")
    else:
        lines.append(f"- Issue metrics: {im.get('note','unavailable')}")
    dd = cm["dependencies"]
    # Bug fix: freshness_percent is the FRESH percentage and outdated_deps
    # is the OUTDATED count — the old label called the fresh % "outdated"
    # and mislabeled the counts.
    lines.append(f"- Dep freshness: {dd.get('freshness_percent','?')}% fresh ({dd.get('outdated_deps',0)}/{dd.get('total_deps',0)} deps outdated)")
    if dd.get('outdated_list'):
        lines.append(f"  Outdated: {', '.join(dd['outdated_list'][:5])}")
    content = "\n".join(lines) + "\n"
    if output_path:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(content)
    return content
# ── Main ─────────────────────────────────────────────────────────────────────
def main() -> int:
    """CLI entry point: take a snapshot, persist it, regenerate trends,
    and print a human-readable summary.  Returns the process exit code."""
    parser = argparse.ArgumentParser(description="Progress Tracker — 10.8")
    parser.add_argument("--json", action="store_true", help="Emit snapshot as JSON only")
    parser.add_argument("--output", type=Path, default=METRICS_DIR / "TRENDS.md",
                        help="Write trends markdown to this file")
    args = parser.parse_args()
    # Collect metrics, then load prior snapshots BEFORE saving the new one
    # so today's snapshot is appended exactly once to the trends input.
    # NOTE(review): re-running on the same day reloads the overwritten
    # snapshot file's old content AND appends the new one, duplicating
    # today's row — verify this is intended.
    snapshot = take_snapshot()
    all_snapshots = load_snapshots()
    path_written = save_snapshot(snapshot)
    if args.json:
        # JSON-only mode: print the snapshot and skip trends/summary output.
        print(json.dumps(snapshot, indent=2))
        return 0
    trends = generate_trends(all_snapshots + [snapshot], output_path=args.output)
    # Print current snapshot summary
    print(f"Snapshot saved: {path_written}\n")
    print(f"Progress Tracker — {snapshot['date']}")
    print("=" * 50)
    m = snapshot["metrics"]
    tc = m["test_coverage"]
    print(f"Test files: {tc['test_files']} | Source files: {tc['source_files']} | Ratio: {tc['test_to_source_ratio']:.3f}")
    if tc["coverage_percent"] is not None:
        print(f"Line coverage: {tc['coverage_percent']:.1f}% (via {tc['coverage_tool']})")
    else:
        print("Line coverage: (not available — run `pytest --cov`)")
    print()
    dc = m["doc_coverage"]
    print(f"Callables with docstrings: {dc['callables_with_doc']}/{dc['callables_total']} ({dc['doc_coverage_percent']:.1f}%)")
    print()
    im = m["issues"]
    if im.get("close_rate") is not None:
        print(f"Issues (7d): {im['closed_last_7d']} closed / {im['opened_last_7d']} opened → close rate: {im['close_rate']*100:.1f}%")
        print(f"Total open: {im['total_open']}")
    else:
        # Issue metrics unavailable (no token or API failure) — show why.
        print(f"Issues: {im.get('note','unavailable')}")
    print()
    dd = m["dependencies"]
    print(f"Dependencies: {dd.get('total_deps',0)} total, {dd.get('outdated_deps',0)} outdated")
    if dd.get('outdated_list'):
        shown = dd['outdated_list'][:5]
        print(f"Outdated: {', '.join(shown)}" + ("..." if len(dd['outdated_list']) > 5 else ""))
    print(f"\nTrends written to: {args.output}")
    return 0


if __name__ == "__main__":
    sys.exit(main())

105
scripts/test_graph_visualizer.py Executable file
View File

@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Tests for graph_visualizer.py — smoke test + subgraph logic.
Run: python3 scripts/test_graph_visualizer.py
"""
import json, sys, tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
import graph_visualizer as gv
def make_index(facts, tmp_dir):
    """Write a minimal index.json containing *facts* into tmp_dir; return its path."""
    payload = {"version": 1, "total_facts": len(facts), "facts": facts}
    target = tmp_dir / "index.json"
    target.write_text(json.dumps(payload, indent=2))
    return target
def test_build_adjacency_simple():
    """Forward edges are kept, keyed by source id; empty lists are dropped."""
    facts = [
        {"id": "a", "related": ["b", "c"]},
        {"id": "b", "related": ["c"]},
        {"id": "c", "related": []},
    ]
    assert gv.build_adjacency(facts) == {"a": ["b", "c"], "b": ["c"]}
    print(" PASS: build_adjacency simple")
def test_build_adjacency_unknown_nodes():
    """Edges pointing at ids that don't exist in the fact set are dropped."""
    facts = [{"id": "a", "related": ["x", "b"]}, {"id": "b", "related": []}]
    result = gv.build_adjacency(facts)
    assert result == {"a": ["b"]}
    print(" PASS: build_adjacency filters unknown nodes")
def test_extract_subgraph_seed_only():
    """A seed with no depth limit reaches every connected node."""
    facts = [{"id": n, "domain": "t", "category": "f"} for n in ("a", "b", "c")]
    adj = {"a": ["b"], "b": ["c"], "c": []}
    sub = gv.extract_subgraph(facts, adj, gv.build_reverse_adjacency(adj), seeds=["a"])
    assert sub == {"a", "b", "c"}, f"got {sub}"
    print(" PASS: extract_subgraph with seed returns full reachable set")
def test_extract_subgraph_with_depth():
    """max_depth bounds BFS expansion outward from the seed node."""
    facts = [{"id": n, "domain": "t", "category": "f"} for n in ("a", "b", "c", "d")]
    adj = {"a": ["b"], "b": ["c"], "c": ["d"], "d": []}
    sub = gv.extract_subgraph(facts, adj, gv.build_reverse_adjacency(adj), seeds=["a"], max_depth=2)
    assert sub == {"a", "b", "c"}
    print(" PASS: extract_subgraph depth=2 includes up to depth 2")
def test_extract_subgraph_filter_domain():
    """Only facts from the requested domain survive the filter."""
    facts = [
        {"id": "a", "domain": "alpha", "category": "f"},
        {"id": "b", "domain": "beta", "category": "f"},
        {"id": "c", "domain": "alpha", "category": "f"},
    ]
    assert gv.extract_subgraph(facts, {}, {}, filter_domain="alpha") == {"a", "c"}
    print(" PASS: filter_domain works")
def test_extract_subgraph_filter_category():
    """Only facts of the requested category survive the filter."""
    facts = [
        {"id": "a", "domain": "g", "category": "pitfall"},
        {"id": "b", "domain": "g", "category": "fact"},
        {"id": "c", "domain": "g", "category": "pitfall"},
    ]
    assert gv.extract_subgraph(facts, {}, {}, filter_category="pitfall") == {"a", "c"}
    print(" PASS: filter_category works")
def test_render_ascii_simple_chain():
    """Every node of a linear chain shows up in the ASCII rendering."""
    facts = [
        {"id": "a", "fact": "A", "domain": "t", "category": "f"},
        {"id": "b", "fact": "B", "domain": "t", "category": "f"},
        {"id": "c", "fact": "C", "domain": "t", "category": "f"},
    ]
    rendered = gv.render_ascii({"a", "b", "c"}, {"a": ["b"], "b": ["c"]}, gv.build_fact_map(facts))
    assert "A" in rendered and "B" in rendered and "C" in rendered
    print(" PASS: render_ascii simple chain")
def test_render_dot_simple():
    """DOT output contains both nodes, an edge, and the category colors."""
    facts = [
        {"id": "x", "fact": "node x", "domain": "d1", "category": "fact"},
        {"id": "y", "fact": "node y", "domain": "d2", "category": "pitfall"},
    ]
    dot = gv.render_dot({"x", "y"}, {"x": ["y"]}, gv.build_fact_map(facts))
    assert 'digraph knowledge_graph' in dot and '"x"' in dot and '"y"' in dot and '->' in dot
    assert '#3498db' in dot and '#e74c3c' in dot
    print(" PASS: render_dot basic structure and colors")
def main():
    """Run every smoke test in order; return True when all pass."""
    print("\n=== graph_visualizer test suite ===\n")
    passed = failed = 0
    tests = [
        test_build_adjacency_simple,
        test_build_adjacency_unknown_nodes,
        test_extract_subgraph_seed_only,
        test_extract_subgraph_with_depth,
        test_extract_subgraph_filter_domain,
        test_extract_subgraph_filter_category,
        test_render_ascii_simple_chain,
        test_render_dot_simple,
    ]
    for test in tests:
        try:
            test()
            passed += 1
        except AssertionError as e:
            # Bug fix: a separator was missing between the test name and the
            # exception text (e.g. "FAIL: test_foogot {...}").
            print(f"  FAIL: {test.__name__}: {e}")
            failed += 1
        except Exception as e:
            print(f"  ERROR: {test.__name__}: {e}")
            failed += 1
    print(f"\n=== Results: {passed}/{passed+failed} passed, {failed} failed ===")
    return failed == 0


if __name__ == "__main__":
    # Exit 0 only when every test passed.
    sys.exit(0 if main() else 1)