#!/usr/bin/env python3
"""
Graph Query Engine — traverse the knowledge graph.

Usage:
    python3 scripts/graph_query.py neighbors <fact_id> [--knowledge-dir knowledge/]
    python3 scripts/graph_query.py path <from_id> <to_id> [--max-hops 10]
    python3 scripts/graph_query.py subgraph <fact_id> [--depth 2]
    python3 scripts/graph_query.py stats    # Graph statistics

Outputs JSON to stdout.
"""

import argparse
import json
import sys
import time
from pathlib import Path
from collections import defaultdict, deque
from typing import Optional


# --- Graph building ---

def load_index(knowledge_dir: Path) -> dict:
    """Load ``index.json`` from *knowledge_dir*.

    Returns an empty index skeleton when the file does not exist. The file
    is always decoded as UTF-8 so the result does not depend on the locale.
    """
    index_path = knowledge_dir / "index.json"
    try:
        with open(index_path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {"version": 1, "total_facts": 0, "facts": []}


def build_adjacency(facts: list[dict]) -> tuple[dict, dict]:
    """Build an undirected adjacency map from the facts' 'related' fields.

    Returns:
        ``(adj, id_to_fact)`` where ``adj`` maps each node ID to the set of
        its neighbours and ``id_to_fact`` maps each fact ID to its record.

    Every fact with an ID gets an entry in ``adj`` even when it has no
    relations, so isolated facts are still recognised as graph nodes.
    IDs that are only referenced via 'related' (no fact record of their
    own) also appear as nodes in ``adj`` but not in ``id_to_fact``.
    """
    adj = defaultdict(set)
    id_to_fact = {}
    for fact in facts:
        fid = fact.get("id")
        if not fid:
            continue
        id_to_fact[fid] = fact
        adj[fid]  # touch the key so isolated facts are present in the graph
        # 'related' may be missing or explicitly null in the index.
        for related_id in fact.get("related") or []:
            if not related_id:
                continue
            adj[fid].add(related_id)
            adj[related_id].add(fid)  # undirected
    return dict(adj), id_to_fact


def _describe(nid, id_to_fact: dict) -> dict:
    """Return the JSON summary (id, fact text, category) for one node."""
    record = id_to_fact.get(nid, {})
    return {
        "id": nid,
        "fact": record.get("fact", ""),
        "category": record.get("category", ""),
    }


# --- Queries ---

def query_neighbors(fact_id: str, adj: dict, id_to_fact: dict) -> dict:
    """Return the facts directly connected to *fact_id*.

    Only neighbours that have a fact record are listed; 'count' is the
    number of listed neighbours. Results are sorted by ID so the output is
    stable between runs.
    """
    known = sorted(nid for nid in adj.get(fact_id, ()) if nid in id_to_fact)
    return {
        "query": "neighbors",
        "fact_id": fact_id,
        "neighbors": [_describe(nid, id_to_fact) for nid in known],
        "count": len(known),
    }


def query_path(from_id: str, to_id: str, adj: dict, max_hops: int = 10) -> dict:
    """Find a shortest path between two facts using BFS.

    Returns a dict with 'path' (list of node IDs, endpoints included) and
    'length' (number of hops), or 'path': None plus an 'error' message when
    either endpoint is unknown or no path of at most *max_hops* hops exists.
    """
    if from_id not in adj or to_id not in adj:
        return {"query": "path", "from": from_id, "to": to_id,
                "path": None, "error": "Fact not found in graph"}

    if from_id == to_id:
        return {"query": "path", "from": from_id, "to": to_id,
                "path": [from_id], "length": 0}

    # Parent pointers let us rebuild the path once, instead of copying a
    # path list for every queued node.
    parents = {from_id: None}
    queue = deque([(from_id, 0)])
    while queue:
        current, hops = queue.popleft()
        if hops >= max_hops:
            continue  # expanding further would exceed the hop budget
        for neighbor in adj.get(current, ()):
            if neighbor in parents:
                continue
            parents[neighbor] = current
            if neighbor == to_id:
                path = []
                node = to_id
                while node is not None:
                    path.append(node)
                    node = parents[node]
                path.reverse()
                return {"query": "path", "from": from_id, "to": to_id,
                        "path": path, "length": len(path) - 1}
            queue.append((neighbor, hops + 1))

    return {"query": "path", "from": from_id, "to": to_id,
            "path": None, "error": f"No path found within {max_hops} hops"}


def query_subgraph(fact_id: str, adj: dict, id_to_fact: dict, depth: int = 2) -> dict:
    """Extract the subgraph within *depth* hops of *fact_id*.

    'nodes' holds every node reachable in at most *depth* hops. 'edges'
    holds each undirected edge between two of those nodes exactly once, so
    every edge endpoint is guaranteed to be present in 'nodes'.
    """
    if fact_id not in adj:
        return {"query": "subgraph", "fact_id": fact_id,
                "nodes": [], "edges": [], "error": "Fact not found"}

    # BFS, marking nodes when they are queued so each is processed once.
    reached = set()
    queue = deque()
    if depth >= 0:
        reached.add(fact_id)
        queue.append((fact_id, 0))
    while queue:
        node, d = queue.popleft()
        if d >= depth:
            continue
        for neighbor in adj.get(node, ()):
            if neighbor not in reached:
                reached.add(neighbor)
                queue.append((neighbor, d + 1))

    ordered = sorted(reached)
    edges = []
    seen_pairs = set()
    for node in ordered:
        for neighbor in sorted(adj.get(node, ())):
            if neighbor not in reached:
                continue  # edge leaves the subgraph
            pair = frozenset((node, neighbor))
            if pair in seen_pairs:
                continue  # already emitted from the other endpoint
            seen_pairs.add(pair)
            edges.append({"source": node, "target": neighbor})

    return {
        "query": "subgraph",
        "fact_id": fact_id,
        "depth": depth,
        "nodes": [_describe(nid, id_to_fact) for nid in ordered],
        "edges": edges,
        "node_count": len(ordered),
        "edge_count": len(edges),
    }


def query_stats(adj: dict, id_to_fact: dict) -> dict:
    """Compute graph statistics.

    Nodes are the union of adjacency keys and known fact IDs, so isolated
    facts count towards the component total and the average degree.
    *adj* is expected to be symmetric, as produced by build_adjacency.
    """
    nodes = set(adj) | set(id_to_fact)
    degree_sum = sum(len(neighbors) for neighbors in adj.values())
    # A self-loop contributes 1 to the degree sum, a normal edge 2.
    self_loops = sum(1 for node, neighbors in adj.items() if node in neighbors)
    total_edges = (degree_sum - self_loops) // 2 + self_loops

    seen = set()
    components = 0
    for start in nodes:
        if start in seen:
            continue
        components += 1
        seen.add(start)
        stack = [start]
        while stack:
            node = stack.pop()
            for neighbor in adj.get(node, ()):
                if neighbor not in seen:
                    seen.add(neighbor)
                    stack.append(neighbor)

    return {
        "statistics": {
            "total_facts": len(id_to_fact),
            "total_edges": total_edges,
            "connected_components": components,
            "average_degree": degree_sum / len(nodes) if nodes else 0,
        }
    }


# --- CLI ---

def main():
    """Parse the command line, run the requested query, print JSON."""
    parser = argparse.ArgumentParser(description="Graph query engine for knowledge store")
    parser.add_argument("command", choices=["neighbors", "path", "subgraph", "stats"])
    parser.add_argument("from_id", nargs="?", help="Starting fact ID")
    parser.add_argument("to_id", nargs="?", help="Target fact ID (for path query)")
    parser.add_argument("--knowledge-dir", default="knowledge", help="Knowledge directory")
    parser.add_argument("--depth", type=int, default=2, help="Depth for subgraph query")
    parser.add_argument("--max-hops", type=int, default=10, help="Max hops for path query")
    args = parser.parse_args()

    # perf_counter is monotonic, unlike wall-clock time.time().
    start = time.perf_counter()

    knowledge_dir = Path(args.knowledge_dir)
    index = load_index(knowledge_dir)
    facts = index.get("facts") or []
    adj, id_to_fact = build_adjacency(facts)

    result = None

    if args.command == "neighbors":
        if not args.from_id:
            print("ERROR: neighbors requires <fact_id>", file=sys.stderr)
            sys.exit(1)
        result = query_neighbors(args.from_id, adj, id_to_fact)

    elif args.command == "path":
        if not args.from_id or not args.to_id:
            print("ERROR: path requires <from_id> <to_id>", file=sys.stderr)
            sys.exit(1)
        result = query_path(args.from_id, args.to_id, adj, max_hops=args.max_hops)

    elif args.command == "subgraph":
        if not args.from_id:
            print("ERROR: subgraph requires <fact_id>", file=sys.stderr)
            sys.exit(1)
        result = query_subgraph(args.from_id, adj, id_to_fact, depth=args.depth)

    elif args.command == "stats":
        result = query_stats(adj, id_to_fact)

    result["elapsed_ms"] = round((time.perf_counter() - start) * 1000, 2)
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()