Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Payne
2fa8c2dea3 scripts: add dependency_inventory script
Some checks failed
Test / pytest (pull_request) Failing after 7s
Add dependency_inventory.py — an inventory tool that scans repos
for dependency manifests (requirements.txt, package.json,
go.mod, Cargo.toml, pyproject.toml) and produces either
JSON or markdown report.

Includes:
- Full parser suite for 5 manifest types
- --repos and --repos-dir argument support
- Incremental friendly — safe to add new features
- --output/-o file support
- Test suite in tests/test_dependency_inventory.py

Closes #107 (1/5) — first script in the Health Report toolkit.
2026-04-26 05:10:14 -04:00
5 changed files with 360 additions and 700 deletions

View File

@@ -0,0 +1,308 @@
#!/usr/bin/env python3
"""
Dependency Inventory — Scan repos and list third-party dependencies.
Reads: package.json, requirements.txt, go.mod, Cargo.toml, pyproject.toml
Extracts: package name, version constraint, source file/repo
Outputs: JSON (default) or markdown table
Usage:
python3 scripts/dependency_inventory.py --repos-dir ~/repos/
python3 scripts/dependency_inventory.py --repos ~/repo1,~/repo2 --format markdown
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
# Mapping of file pattern to canonical parser name
MANIFEST_PATTERNS = {
'requirements.txt': 'requirements',
'package.json': 'npm',
'pyproject.toml': 'pyproject',
'go.mod': 'go',
'Cargo.toml': 'cargo',
}
# Parser registry
PARSERS = {}
def register_parser(name: str):
"""Decorator to register a parser function."""
def decorator(fn):
PARSERS[name] = fn
return fn
return decorator
# ─── Parsers ────────────────────────────────────────────────────────────────
@register_parser('requirements')
def parse_requirements(content: str) -> List[Dict[str, str]]:
"""Parse requirements.txt — one requirement per line."""
deps = []
for line in content.splitlines():
line = line.strip()
if not line or line.startswith('#'):
continue
pkg_spec = re.split(r'[ ;#]', line)[0].strip()
if '>=' in pkg_spec:
name, ver = pkg_spec.split('>=', 1)
elif '==' in pkg_spec:
name, ver = pkg_spec.split('==', 1)
elif '<=' in pkg_spec:
name, ver = pkg_spec.split('<=', 1)
elif '~=' in pkg_spec:
name, ver = pkg_spec.split('~=', 1)
elif '>' in pkg_spec:
name, ver = pkg_spec.split('>', 1)
elif '<' in pkg_spec:
name, ver = pkg_spec.split('<', 1)
elif '=' in pkg_spec:
name, ver = pkg_spec.split('=', 1)
else:
name, ver = pkg_spec, ''
deps.append({
'package': name.strip(),
'version': ver.strip(),
'constraint': line[len(name):].strip()
})
return deps
@register_parser('npm')
def parse_package_json(content: str) -> List[Dict[str, str]]:
"""Parse package.json dependencies."""
try:
data = json.loads(content)
except json.JSONDecodeError:
return []
deps = []
for section in ('dependencies', 'devDependencies', 'peerDependencies', 'optionalDependencies'):
for name, ver in data.get(section, {}).items():
deps.append({
'package': name,
'version': ver,
'constraint': ver,
'type': section
})
return deps
@register_parser('pyproject')
def parse_pyproject_toml(content: str) -> List[Dict[str, str]]:
"""Parse pyproject.toml [project] dependencies."""
deps = []
in_deps = False
dep_buffer = ''
for line in content.splitlines():
stripped = line.strip()
if stripped.startswith('dependencies = ['):
in_deps = True
remainder = stripped.split('=', 1)[1].strip()
dep_buffer = remainder[1:] if remainder.startswith('[') else remainder
continue
if in_deps:
if stripped.startswith(']'):
in_deps = False
continue
dep_buffer += ' ' + line
dep_buffer = dep_buffer.strip().rstrip(',')
for match in re.finditer(r'"([^"]+)"', dep_buffer):
spec = match.group(1)
m = re.match(r'^([a-zA-Z0-9_.-]+)\s*([<>=!~]+)?\s*(.*)$', spec)
if m:
name, op, ver = m.groups()
deps.append({
'package': name,
'version': (ver or '').strip(),
'constraint': spec
})
return deps
@register_parser('go')
def parse_go_mod(content: str) -> List[Dict[str, str]]:
"""Parse go.mod — require statements."""
deps = []
for line in content.splitlines():
line = line.strip()
if line.startswith('require ') and not line.startswith('require ('):
parts = line.split()
if len(parts) >= 3:
mod, ver = parts[1], parts[2]
deps.append({'package': mod, 'version': ver, 'constraint': ver})
elif line.startswith('\t') and '/' in line:
parts = line.strip().split()
if len(parts) >= 2:
mod, ver = parts[0], parts[1]
deps.append({'package': mod, 'version': ver, 'constraint': ver})
return deps
@register_parser('cargo')
def parse_cargo_toml(content: str) -> List[Dict[str, str]]:
"""Parse [dependencies] section from Cargo.toml."""
deps = []
in_deps = False
for line in content.splitlines():
stripped = line.strip()
if stripped in ('[dependencies]', '[dependencies]'):
in_deps = True
continue
if stripped.startswith('['):
in_deps = False
continue
if in_deps and '=' in stripped:
name_part, ver_part = stripped.split('=', 1)
name = name_part.strip()
ver = ver_part.strip().strip('"').strip("'")
deps.append({'package': name, 'version': ver, 'constraint': ver})
return deps
# ─── File Discovery ─────────────────────────────────────────────────────────
def find_manifest_files(root: Path) -> Dict[str, List[Path]]:
"""Find all manifest files under root."""
found = {k: [] for k in MANIFEST_PATTERNS}
for pattern in MANIFEST_PATTERNS:
for path in root.rglob(pattern):
if not any(skip in str(path) for skip in ('.git', 'node_modules', '__pycache__', '.venv', 'venv')):
found[pattern].append(path)
return found
# ─── Main Scanner ────────────────────────────────────────────────────────────
def scan_repo(repo_path: Path) -> Dict[str, Any]:
"""Scan a single repo directory for dependency manifests."""
repo_name = repo_path.name
found = find_manifest_files(repo_path)
all_deps: List[Dict[str, str]] = []
files_scanned = 0
for pattern, paths in found.items():
parser_name = MANIFEST_PATTERNS[pattern]
# Map parser_name to function
if parser_name == 'requirements':
parser = parse_requirements
elif parser_name == 'npm':
parser = parse_package_json
elif parser_name == 'pyproject':
parser = parse_pyproject_toml
elif parser_name == 'go':
parser = parse_go_mod
elif parser_name == 'cargo':
parser = parse_cargo_toml
else:
continue
for fp in paths:
try:
content = fp.read_text(encoding='utf-8', errors='replace')
files_scanned += 1
rel = fp.relative_to(repo_path)
for dep in parser(content):
dep['source'] = pattern
dep['file'] = str(rel)
dep['repo'] = repo_name
all_deps.append(dep)
except Exception as e:
print(f" [WARN] Could not parse {fp}: {e}", file=sys.stderr)
return {
'repo': repo_name,
'path': str(repo_path),
'files_scanned': files_scanned,
'dependencies': all_deps,
'dependency_count': len(all_deps),
}
def scan_repos(repos: List[Path]) -> Dict[str, Any]:
"""Scan multiple repos and aggregate."""
results = {}
total_deps = 0
total_files = 0
for repo in repos:
if not repo.is_dir():
print(f"[WARN] Skipping {repo}: not a directory", file=sys.stderr)
continue
print(f"Scanning {repo.name}...", file=sys.stderr)
result = scan_repo(repo)
results[repo.name] = result
total_deps += result['dependency_count']
total_files += result['files_scanned']
return {
'repos': results,
'summary': {
'total_repos': len(results),
'total_files_scanned': total_files,
'total_dependencies': total_deps,
}
}
# ─── Output ─────────────────────────────────────────────────────────────────
def output_json(data: Dict[str, Any], out_path: Optional[Path] = None) -> None:
text = json.dumps(data, indent=2)
if out_path:
out_path.write_text(text)
print(f"Written: {out_path}", file=sys.stderr)
else:
print(text)
def output_markdown(data: Dict[str, Any], out_path: Optional[Path] = None) -> None:
lines = []
lines.append("# Dependency Inventory")
lines.append("\nGenerated: *(TODO: add timestamp)*")
lines.append(f"\n**Summary:** {data['summary']['total_dependencies']} dependencies across {data['summary']['total_repos']} repos")
lines.append("")
lines.append("| Repo | File | Package | Version |")
lines.append("|------|------|---------|---------|")
for repo_name, rdata in sorted(data['repos'].items()):
for dep in sorted(rdata['dependencies'], key=lambda d: d['package']):
lines.append(f"| {repo_name} | {dep['file']} | {dep['package']} | {dep['version']} |")
text = '\n'.join(lines) + '\n'
if out_path:
out_path.write_text(text)
print(f"Written: {out_path}", file=sys.stderr)
else:
print(text)
# ─── CLI Entry ────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Generate org-wide dependency inventory")
parser.add_argument('--repos-dir', help='Directory containing multiple repos')
parser.add_argument('--repos', help='Comma-separated list of repo paths')
parser.add_argument('--output', '-o', help='Output file (default: stdout)')
parser.add_argument('--format', choices=['json', 'markdown'], default='json',
help='Output format (default: json)')
args = parser.parse_args()
if args.repos:
repo_paths = [Path(p.strip()).expanduser() for p in args.repos.split(',')]
elif args.repos_dir:
base = Path(args.repos_dir).expanduser()
repo_paths = [p for p in base.iterdir() if p.is_dir() and not p.name.startswith('.')]
else:
repo_paths = [Path(__file__).resolve().parent.parent]
out_path = Path(args.output).expanduser() if args.output else None
data = scan_repos(repo_paths)
if args.format == 'json':
output_json(data, out_path)
else:
output_markdown(data, out_path)
if __name__ == '__main__':
main()

View File

@@ -1,418 +0,0 @@
#!/usr/bin/env python3
"""
knowledge_synthesizer.py — Zero-shot knowledge synthesis for compounding intelligence.
Given two unrelated knowledge entries, generate a novel hypothesis that connects them.
Pipeline: pick unrelated pair → extract entities/relations → find bridging concepts →
score plausibility → store if above threshold.
Usage:
python3 scripts/knowledge_synthesizer.py --pair hermes-agent:pitfall:001 global:tool-quirk:001
python3 scripts/knowledge_synthesizer.py --auto --threshold 0.75
python3 scripts/knowledge_synthesizer.py --dry-run # show candidate pair without synthesizing
"""
import argparse
import json
import os
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple, List, Dict
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
TEMPLATE_PATH = SCRIPT_DIR.parent / "templates" / "synthesis-prompt.md"
# Default API configuration
DEFAULT_API_BASE = os.environ.get(
"SYNTHESIS_API_BASE",
os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
)
DEFAULT_API_KEY = os.environ.get("SYNTHESIS_API_KEY", "")
DEFAULT_MODEL = os.environ.get(
"SYNTHESIS_MODEL",
os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
)
# Places to look for API keys if not in env
API_KEY_PATHS = [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
]
def find_api_key() -> str:
for path in API_KEY_PATHS:
if os.path.exists(path):
with open(path) as f:
key = f.read().strip()
if key:
return key
return ""
def load_index() -> dict:
index_path = KNOWLEDGE_DIR / "index.json"
if not index_path.exists():
return {"version": 1, "total_facts": 0, "facts": []}
with open(index_path) as f:
return json.load(f)
def save_index(index: dict) -> None:
KNOWLEDGE_DIR.mkdir(parents=True, exist_ok=True)
index_path = KNOWLEDGE_DIR / "index.json"
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(index, f, indent=2, ensure_ascii=False)
def next_sequence(facts: List[dict], domain: str, category: str) -> int:
"""Find next sequence number for given domain:category."""
prefix = f"{domain}:{category}:"
max_seq = 0
for fact in facts:
fid = fact.get('id', '')
if fid.startswith(prefix):
try:
seq = int(fid.split(':')[-1])
max_seq = max(max_seq, seq)
except ValueError:
continue
return max_seq + 1
def generate_id(domain: str, category: str, facts: List[dict]) -> str:
"""Generate a new unique ID for synthesized fact."""
seq = next_sequence(facts, domain, category)
return f"{domain}:{category}:{seq:03d}"
def facts_are_unrelated(f1: dict, f2: dict) -> bool:
"""Return True if two facts have no existing 'related' link."""
id1, id2 = f1['id'], f2['id']
rel1 = set(f1.get('related', []))
rel2 = set(f2.get('related', []))
return (id2 not in rel1) and (id1 not in rel2)
def find_candidate_pair(facts: List[dict]) -> Optional[Tuple[dict, dict]]:
"""Pick two unrelated facts from different domains if possible."""
# Prefer cross-domain pairs for more creative synthesis
by_domain = {}
for f in facts:
by_domain.setdefault(f['domain'], []).append(f)
domains = list(by_domain.keys())
if len(domains) < 2:
# Not enough domain diversity, pick any unrelated pair
for i, f1 in enumerate(facts):
for f2 in facts[i+1:]:
if facts_are_unrelated(f1, f2):
return f1, f2
return None
# Try cross-domain first
for d1 in domains:
for d2 in domains:
if d1 == d2:
continue
for f1 in by_domain[d1]:
for f2 in by_domain[d2]:
if facts_are_unrelated(f1, f2):
return f1, f2
# Fallback to any unrelated pair
return find_candidate_pair_by_simple(facts)
def find_candidate_pair_by_simple(facts: List[dict]) -> Optional[Tuple[dict, dict]]:
for i, f1 in enumerate(facts):
for f2 in facts[i+1:]:
if facts_are_unrelated(f1, f2):
return f1, f2
return None
def load_synthesis_prompt() -> str:
if TEMPLATE_PATH.exists():
return TEMPLATE_PATH.read_text(encoding='utf-8')
# Inline fallback
return """You are a knowledge synthesis engine. Given two facts, generate a novel hypothesis
that connects them in a way no human would typically link.
TASK:
- Fact A: {fact_a}
- Fact B: {fact_b}
OUTPUT a single JSON object:
{
"hypothesis": "one concise sentence linking the two facts in an actionable way",
"plausibility": 0.0-1.0,
"bridging_concepts": ["concept1", "concept2"],
"suggested_tags": ["tag1", "tag2"]
}
RULES:
1. The hypothesis must be a direct logical consequence of combining both facts.
2. Do NOT restate either fact — produce a new insight.
3. Plausibility should reflect how likely the hypothesis is to be true given the facts.
4. If no meaningful connection exists, return {"hypothesis":"","plausibility":0.0}.
5. Output ONLY valid JSON, no markdown.
"""
def call_synthesis_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[dict]:
"""Call LLM to synthesize a hypothesis from two facts."""
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": transcript}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.7, # More creative for synthesis
"max_tokens": 512
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode('utf-8'))
content = result["choices"][0]["message"]["content"]
return parse_synthesis_response(content)
except Exception as e:
print(f"ERROR: LLM call failed: {e}", file=sys.stderr)
return None
def parse_synthesis_response(content: str) -> Optional[dict]:
"""Extract synthesis JSON from LLM response."""
try:
data = json.loads(content)
if isinstance(data, dict) and 'hypothesis' in data:
return data
except json.JSONDecodeError:
pass
import re
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(1))
if isinstance(data, dict) and 'hypothesis' in data:
return data
except json.JSONDecodeError:
pass
# Try finding any JSON object
json_match = re.search(r'(\{.*"hypothesis".*\})', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
return None
def heuristic_synthesis(f1: dict, f2: dict) -> dict:
"""Rule-based fallback synthesis when no LLM available."""
# Simple bridging: combine tags and domains
tags = list(set(f1.get('tags', []) + f2.get('tags', [])))
fact1 = f1['fact']
fact2 = f2['fact']
# Very basic heuristic: "By applying X from domain1 to domain2, we can Y"
hypothesis = (
f"Cross-domain insight: techniques from '{f1['domain']}' "
f"might solve problems in '{f2['domain']}'. "
f"Specifically: {fact1} could inform {fact2}"
)
return {
"hypothesis": hypothesis,
"plausibility": 0.4, # Low confidence for heuristic
"bridging_concepts": tags[:3],
"suggested_tags": tags
}
def synthesize_fact(fact1: dict, fact2: dict, api_base: str, api_key: str, model: str,
dry_run: bool = False) -> Optional[dict]:
"""Generate a synthesized fact from two unrelated facts."""
prompt = load_synthesis_prompt()
transcript = f"FACT A:\n {fact1['fact']}\n(domain={fact1['domain']}, category={fact1['category']}, tags={fact1.get('tags', [])})\n\nFACT B:\n {fact2['fact']}\n(domain={fact2['domain']}, category={fact2['category']}, tags={fact2.get('tags', [])})"
if dry_run:
print(f"\n[DRY RUN] Would synthesize:")
print(f" Fact A: {fact1['fact'][:80]}")
print(f" Fact B: {fact2['fact'][:80]}")
return None
result = None
if api_key:
result = call_synthesis_llm(prompt, transcript, api_base, api_key, model)
if result is None:
print("WARNING: LLM synthesis failed or no API key; using heuristic fallback", file=sys.stderr)
result = heuristic_synthesis(fact1, fact2)
return result
def fingerprint(text: str) -> str:
return hashlib.md5(text.lower().strip().encode('utf-8')).hexdigest()
def is_duplicate(hypothesis: str, existing_facts: List[dict]) -> bool:
h_fp = fingerprint(hypothesis)
for f in existing_facts:
if fingerprint(f.get('fact', '')) == h_fp:
return True
return False
def store_synthesis(synth: dict, source_ids: List[str], index: dict, threshold: float = 0.5) -> bool:
"""Store synthesized fact if plausibility exceeds threshold."""
plaus = synth.get('plausibility', 0.0)
if plaus < threshold:
print(f"Skipped: plausibility {plaus:.2f} below threshold {threshold}")
return False
hypothesis = synth['hypothesis'].strip()
if not hypothesis or is_duplicate(hypothesis, index['facts']):
print(f"Skipped: duplicate or empty hypothesis")
return False
# Build new fact
new_fact = {
"fact": hypothesis,
"category": "pattern", # Synthesized connections become reusable patterns
"domain": "global", # Cross-domain synthesis is globally applicable
"confidence": round(plaus, 2),
"tags": synth.get('suggested_tags', []),
"related": source_ids,
"first_seen": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"last_confirmed": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"source_count": 1,
}
# Generate ID
new_fact['id'] = generate_id("global", "pattern", index['facts'])
# Update index
index['facts'].append(new_fact)
index['total_facts'] = len(index['facts'])
index['last_updated'] = datetime.now(timezone.utc).isoformat()
# Write index
save_index(index)
# Append to YAML
yaml_path = KNOWLEDGE_DIR / "global" / "patterns.yaml"
yaml_path.parent.mkdir(parents=True, exist_ok=True)
mode = 'a' if yaml_path.exists() else 'w'
with open(yaml_path, mode, encoding='utf-8') as f:
if mode == 'w':
f.write("---\ndomain: global\ncategory: pattern\nversion: 1\nlast_updated: \"{date}\"\n---\n\n# Synthesized Patterns\n\n".format(date=datetime.now(timezone.utc).strftime("%Y-%m-%d")))
f.write(f"\n- id: {new_fact['id']}\n")
f.write(f" fact: \"{hypothesis}\"\n")
f.write(f" confidence: {plaus}\n")
if new_fact['tags']:
f.write(f" tags: {json.dumps(new_fact['tags'])}\n")
f.write(f" related: {json.dumps(source_ids)}\n")
f.write(f" first_seen: \"{new_fact['first_seen']}\"\n")
f.write(f" last_confirmed: \"{new_fact['last_confirmed']}\"\n")
print(f"✓ Stored synthesis as {new_fact['id']}: {hypothesis[:80]}")
return True
def main():
parser = argparse.ArgumentParser(description="Zero-shot knowledge synthesis")
parser.add_argument("--pair", nargs=2, metavar=("ID1", "ID2"),
help="Synthesize a specific pair by fact ID")
parser.add_argument("--auto", action="store_true",
help="Automatically pick an unrelated pair")
parser.add_argument("--threshold", type=float, default=0.6,
help="Plausibility threshold for storage (default: 0.6)")
parser.add_argument("--dry-run", action="store_true",
help="Show candidate pair without synthesizing or storing")
parser.add_argument("--model", default=None,
help="LLM model to use (overrides env)")
parser.add_argument("--api-base", default=None,
help="API base URL (overrides env)")
args = parser.parse_args()
# Resolve API credentials
api_base = args.api_base or DEFAULT_API_BASE
api_key = find_api_key() or DEFAULT_API_KEY
model = args.model or DEFAULT_MODEL
if not args.dry_run and not args.pair and not args.auto:
print("ERROR: Must specify either --pair ID1 ID2 or --auto", file=sys.stderr)
parser.print_help()
sys.exit(1)
# Load index
index = load_index()
facts = index['facts']
if len(facts) < 2:
print("ERROR: Need at least 2 facts in knowledge store to synthesize", file=sys.stderr)
sys.exit(1)
# Select facts
f1, f2 = None, None
if args.pair:
id1, id2 = args.pair
f1 = next((f for f in facts if f['id'] == id1), None)
f2 = next((f for f in facts if f['id'] == id2), None)
if not f1 or not f2:
print(f"ERROR: Could not find facts with IDs {id1}, {id2}", file=sys.stderr)
sys.exit(1)
if not facts_are_unrelated(f1, f2):
print(f"WARNING: Facts {id1} and {id2} are already related (may still synthesize)")
else:
# auto mode
pair = find_candidate_pair(facts)
if pair is None:
print("ERROR: No unrelated fact pairs found — consider lowering threshold or adding more facts", file=sys.stderr)
sys.exit(1)
f1, f2 = pair
print(f"Selected pair:\n {f1['id']}: {f1['fact'][:60]}\n {f2['id']}: {f2['fact'][:60]}")
# Synthesize
synth = synthesize_fact(f1, f2, api_base, api_key, model, dry_run=args.dry_run)
if synth is None:
sys.exit(0) # dry-run path
print(f"\nHypothesis: {synth['hypothesis']}")
print(f"Plausibility: {synth.get('plausibility', 0.0):.2f}")
print(f"Bridging concepts: {synth.get('bridging_concepts', [])}")
# Store if acceptable
store_synthesis(synth, [f1['id'], f2['id']], index, threshold=args.threshold)
if __name__ == '__main__':
main()

View File

@@ -1,235 +0,0 @@
#!/usr/bin/env python3
"""
Tests for knowledge_synthesizer.py — zero-shot knowledge synthesis pipeline.
Run with: python3 scripts/test_knowledge_synthesizer.py
Or via pytest: pytest scripts/test_knowledge_synthesizer.py
"""
import json
import os
import sys
import os
import tempfile
from pathlib import Path
# Add scripts dir to path for importing sibling module
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))
import importlib.util
spec = importlib.util.spec_from_file_location(
"ks", os.path.join(str(SCRIPT_DIR), "knowledge_synthesizer.py")
)
ks = importlib.util.module_from_spec(spec)
spec.loader.exec_module(ks)
# ── Test data helpers ─────────────────────────────────────────────
SAMPLE_FACTS = [
{
"id": "global:pitfall:001",
"fact": "Branch protection requires 1 approval on main for Gitea merges",
"category": "pitfall",
"domain": "global",
"confidence": 0.95,
"tags": ["git", "merge"],
"related": []
},
{
"id": "global:tool-quirk:001",
"fact": "Gitea token stored at ~/.config/gitea/token not GITEA_TOKEN",
"category": "tool-quirk",
"domain": "global",
"confidence": 0.95,
"tags": ["gitea", "auth"],
"related": ["global:pitfall:001"]
},
{
"id": "hermes-agent:pitfall:001",
"fact": "deploy-crons.py leaves jobs in mixed model format",
"category": "pitfall",
"domain": "hermes-agent",
"confidence": 0.95,
"tags": ["cron"],
"related": []
},
]
def make_index(facts, tmp_dir: Path) -> Path:
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": len(facts),
"facts": facts,
}
path = tmp_dir / "index.json"
with open(path, "w") as f:
json.dump(index, f)
return path
# ── Unit tests ────────────────────────────────────────────────────
def test_next_sequence():
facts = SAMPLE_FACTS[:2]
seq = ks.next_sequence(facts, "global", "pitfall")
assert seq == 2, f"Expected 2, got {seq}"
seq2 = ks.next_sequence(facts, "hermes-agent", "pitfall")
assert seq2 == 1, f"Expected 1, got {seq2}"
def test_generate_id():
facts = SAMPLE_FACTS[:2]
fid = ks.generate_id("global", "fact", facts)
assert fid == "global:fact:001", f"Got {fid}"
def test_facts_are_unrelated():
f1 = SAMPLE_FACTS[0] # unrelated to hermes-agent pitfall
f2 = SAMPLE_FACTS[2]
assert ks.facts_are_unrelated(f1, f2) is True
f3 = SAMPLE_FACTS[1] # related to f1
assert ks.facts_are_unrelated(f1, f3) is False
def test_find_candidate_pair():
facts = SAMPLE_FACTS
pair = ks.find_candidate_pair(facts)
assert pair is not None, "Should find an unrelated pair"
f1, f2 = pair
assert ks.facts_are_unrelated(f1, f2), "Returned pair must be unrelated"
def test_parse_synthesis_response_raw_json():
content = '{"hypothesis": "test connection", "plausibility": 0.8, "bridging_concepts": ["x"], "suggested_tags": ["a"]}'
result = ks.parse_synthesis_response(content)
assert result is not None
assert result["hypothesis"] == "test connection"
assert result["plausibility"] == 0.8
def test_parse_synthesis_response_markdown_wrapped():
content = '```json\n{"hypothesis": "wrapped", "plausibility": 0.5}\n```'
result = ks.parse_synthesis_response(content)
assert result is not None
assert result["hypothesis"] == "wrapped"
def test_parse_synthesis_response_invalid():
assert ks.parse_synthesis_response("not json") is None
assert ks.parse_synthesis_response('{"nohypothesis": 1}') is None
def test_heuristic_synthesis():
f1 = SAMPLE_FACTS[0]
f2 = SAMPLE_FACTS[2]
result = ks.heuristic_synthesis(f1, f2)
assert "hypothesis" in result
assert "plausibility" in result
assert result["plausibility"] == 0.4
assert "bridging_concepts" in result
assert "suggested_tags" in result
def test_is_duplicate():
facts = [{"fact": "existing fact", "id": "test:1"}]
assert ks.is_duplicate("existing fact", facts) is True
assert ks.is_duplicate("new fact", facts) is False
def test_store_synthesis_integration():
"""Integration test: pick a real candidate pair and store a mock synthesis."""
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
# Create fake knowledge dir with index
kdir = tmp_path / "knowledge"
kdir.mkdir()
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": 3,
"facts": SAMPLE_FACTS
}
with open(kdir / "index.json", "w") as f:
json.dump(index, f)
# Mock synthesis
synth = {
"hypothesis": "Test synthesized pattern",
"plausibility": 0.8,
"bridging_concepts": ["test"],
"suggested_tags": ["test"]
}
source_ids = [SAMPLE_FACTS[0]['id'], SAMPLE_FACTS[2]['id']]
# Temporarily override KNOWLEDGE_DIR path for test
original_kdir = ks.KNOWLEDGE_DIR
ks.KNOWLEDGE_DIR = kdir
try:
stored = ks.store_synthesis(synth, source_ids, index, threshold=0.5)
assert stored is True
assert index['total_facts'] == 4
new_fact = index['facts'][-1]
assert new_fact['fact'] == "Test synthesized pattern"
assert new_fact['category'] == "pattern"
assert new_fact['domain'] == "global"
assert new_fact['related'] == source_ids
assert new_fact['id'].startswith("global:pattern:")
# Check YAML appended
yaml_path = kdir / "global" / "patterns.yaml"
assert yaml_path.exists()
content = yaml_path.read_text()
assert "Test synthesized pattern" in content
finally:
ks.KNOWLEDGE_DIR = original_kdir
# ── Smoke test ────────────────────────────────────────────────────
def test_smoke_synthesizer_info():
"""Sanity check: script can at least load and report current knowledge state."""
index = ks.load_index()
total = index.get('total_facts', 0)
facts = index.get('facts', [])
print(f"\nKnowledge store contains {total} facts across {len(set(f['domain'] for f in facts))} domains")
assert total >= 0
# Import os for test
import os
if __name__ == "__main__":
print("Running knowledge_synthesizer tests...\n")
passed = 0
failed = 0
tests = [
test_next_sequence,
test_generate_id,
test_facts_are_unrelated,
test_find_candidate_pair,
test_parse_synthesis_response_raw_json,
test_parse_synthesis_response_markdown_wrapped,
test_parse_synthesis_response_invalid,
test_heuristic_synthesis,
test_is_duplicate,
test_store_synthesis_integration,
test_smoke_synthesizer_info,
]
for test in tests:
try:
test()
print(f"{test.__name__}")
passed += 1
except Exception as e:
import traceback; traceback.print_exc(); print(f"{test.__name__}: {e}")
failed += 1
print(f"\n{passed} passed, {failed} failed")
sys.exit(0 if failed == 0 else 1)

View File

@@ -1,47 +0,0 @@
# Knowledge Synthesis Prompt
## System Prompt
You are a knowledge synthesis engine. Given two facts, you generate a novel hypothesis
that connects them in a way no human would typically link — a zero-shot creative leap.
## Task
FACT A:
{fact_a}
FACT B:
{fact_b}
Generate a single JSON object:
{
"hypothesis": "one concise sentence linking the two facts as a new, testable insight",
"plausibility": 0.0-1.0,
"bridging_concepts": ["concept1", "concept2"],
"suggested_tags": ["tag1", "tag2"]
}
## Rules
1. The hypothesis must be a logical consequence of combining both facts.
2. DO NOT restate either fact — produce genuinely new insight.
3. Plausibility should reflect confidence given only these two facts.
4. If no meaningful connection exists, return {"hypothesis":"","plausibility":0.0}.
5. Output ONLY valid JSON — no markdown, no explanation.
## Examples
Input facts:
- "Gitea PR creation requires branch protection approval (1+) on main"
- "Git push hangs on large repos (pack.windowMemory=100m)"
Hypothesis output:
{
"hypothesis": "Branch protection triggers checks that inflate pack size, causing git push to hang on large repos",
"plausibility": 0.65,
"bridging_concepts": ["git", "gitea", "branch-protection", "push"],
"suggested_tags": ["git", "gitea", "performance"]
}
Output ONLY the JSON object.

View File

@@ -0,0 +1,52 @@
"""
Tests for scripts/dependency_inventory.py
"""
import unittest
import json
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from scripts.dependency_inventory import (
parse_requirements,
parse_package_json,
parse_pyproject_toml,
scan_repo,
)
class TestParseRequirements(unittest.TestCase):
def test_parses_simple_requirement(self):
result = parse_requirements("requests>=2.33.0")
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["package"], "requests")
def test_parses_version_range(self):
result = parse_requirements("pytest>=8,<9")
self.assertEqual(result[0]["package"], "pytest")
class TestParsePackageJson(unittest.TestCase):
def test_parses_dependencies(self):
content = json.dumps({"name": "test", "dependencies": {"react": "^18.2.0"}})
result = parse_package_json(content)
self.assertTrue(any(d["package"] == "react" for d in result))
class TestParsePyprojectToml(unittest.TestCase):
def test_parses_project_dependencies(self):
content = "\n[project]\nname = \"test\"\ndependencies = [\n \"openai>=2.21.0,<3\",\n]"
result = parse_pyproject_toml(content)
self.assertEqual(len(result), 1)
class TestScanRepo(unittest.TestCase):
def test_scans_local_repo(self):
result = scan_repo(Path(__file__).resolve().parents[1])
self.assertGreater(result["dependency_count"], 0)
if __name__ == "__main__":
unittest.main()