Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Payne
d218ac79d9 feat: add zero-shot knowledge synthesizer (#205)
Some checks failed
Test / pytest (pull_request) Failing after 10s
Implement knowledge_synthesizer.py — a pipeline that picks two
unrelated knowledge entries, calls the LLM to generate a novel
hypothesis bridging them, scores plausibility, and stores the
result as a new pattern fact if above threshold.

- scripts/knowledge_synthesizer.py: main pipeline
- templates/synthesis-prompt.md: LLM prompt
- scripts/test_knowledge_synthesizer.py: 11 tests, all passing
- Supports both LLM synthesis and heuristic fallback
- Respects existing knowledge deduplication
- Integration test demonstrates end-to-end storage
2026-04-26 05:27:29 -04:00
5 changed files with 700 additions and 269 deletions

View File

@@ -0,0 +1,418 @@
#!/usr/bin/env python3
"""
knowledge_synthesizer.py — Zero-shot knowledge synthesis for compounding intelligence.
Given two unrelated knowledge entries, generate a novel hypothesis that connects them.
Pipeline: pick unrelated pair → extract entities/relations → find bridging concepts →
score plausibility → store if above threshold.
Usage:
python3 scripts/knowledge_synthesizer.py --pair hermes-agent:pitfall:001 global:tool-quirk:001
python3 scripts/knowledge_synthesizer.py --auto --threshold 0.75
python3 scripts/knowledge_synthesizer.py --dry-run # show candidate pair without synthesizing
"""
import argparse
import json
import os
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple, List, Dict
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
TEMPLATE_PATH = SCRIPT_DIR.parent / "templates" / "synthesis-prompt.md"
# Default API configuration
DEFAULT_API_BASE = os.environ.get(
"SYNTHESIS_API_BASE",
os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
)
DEFAULT_API_KEY = os.environ.get("SYNTHESIS_API_KEY", "")
DEFAULT_MODEL = os.environ.get(
"SYNTHESIS_MODEL",
os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
)
# Places to look for API keys if not in env
API_KEY_PATHS = [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
]
def find_api_key() -> str:
for path in API_KEY_PATHS:
if os.path.exists(path):
with open(path) as f:
key = f.read().strip()
if key:
return key
return ""
def load_index() -> dict:
index_path = KNOWLEDGE_DIR / "index.json"
if not index_path.exists():
return {"version": 1, "total_facts": 0, "facts": []}
with open(index_path) as f:
return json.load(f)
def save_index(index: dict) -> None:
KNOWLEDGE_DIR.mkdir(parents=True, exist_ok=True)
index_path = KNOWLEDGE_DIR / "index.json"
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(index, f, indent=2, ensure_ascii=False)
def next_sequence(facts: List[dict], domain: str, category: str) -> int:
"""Find next sequence number for given domain:category."""
prefix = f"{domain}:{category}:"
max_seq = 0
for fact in facts:
fid = fact.get('id', '')
if fid.startswith(prefix):
try:
seq = int(fid.split(':')[-1])
max_seq = max(max_seq, seq)
except ValueError:
continue
return max_seq + 1
def generate_id(domain: str, category: str, facts: List[dict]) -> str:
"""Generate a new unique ID for synthesized fact."""
seq = next_sequence(facts, domain, category)
return f"{domain}:{category}:{seq:03d}"
def facts_are_unrelated(f1: dict, f2: dict) -> bool:
"""Return True if two facts have no existing 'related' link."""
id1, id2 = f1['id'], f2['id']
rel1 = set(f1.get('related', []))
rel2 = set(f2.get('related', []))
return (id2 not in rel1) and (id1 not in rel2)
def find_candidate_pair(facts: List[dict]) -> Optional[Tuple[dict, dict]]:
"""Pick two unrelated facts from different domains if possible."""
# Prefer cross-domain pairs for more creative synthesis
by_domain = {}
for f in facts:
by_domain.setdefault(f['domain'], []).append(f)
domains = list(by_domain.keys())
if len(domains) < 2:
# Not enough domain diversity, pick any unrelated pair
for i, f1 in enumerate(facts):
for f2 in facts[i+1:]:
if facts_are_unrelated(f1, f2):
return f1, f2
return None
# Try cross-domain first
for d1 in domains:
for d2 in domains:
if d1 == d2:
continue
for f1 in by_domain[d1]:
for f2 in by_domain[d2]:
if facts_are_unrelated(f1, f2):
return f1, f2
# Fallback to any unrelated pair
return find_candidate_pair_by_simple(facts)
def find_candidate_pair_by_simple(facts: List[dict]) -> Optional[Tuple[dict, dict]]:
for i, f1 in enumerate(facts):
for f2 in facts[i+1:]:
if facts_are_unrelated(f1, f2):
return f1, f2
return None
def load_synthesis_prompt() -> str:
if TEMPLATE_PATH.exists():
return TEMPLATE_PATH.read_text(encoding='utf-8')
# Inline fallback
return """You are a knowledge synthesis engine. Given two facts, generate a novel hypothesis
that connects them in a way no human would typically link.
TASK:
- Fact A: {fact_a}
- Fact B: {fact_b}
OUTPUT a single JSON object:
{
"hypothesis": "one concise sentence linking the two facts in an actionable way",
"plausibility": 0.0-1.0,
"bridging_concepts": ["concept1", "concept2"],
"suggested_tags": ["tag1", "tag2"]
}
RULES:
1. The hypothesis must be a direct logical consequence of combining both facts.
2. Do NOT restate either fact — produce a new insight.
3. Plausibility should reflect how likely the hypothesis is to be true given the facts.
4. If no meaningful connection exists, return {"hypothesis":"","plausibility":0.0}.
5. Output ONLY valid JSON, no markdown.
"""
def call_synthesis_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[dict]:
"""Call LLM to synthesize a hypothesis from two facts."""
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": transcript}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.7, # More creative for synthesis
"max_tokens": 512
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode('utf-8'))
content = result["choices"][0]["message"]["content"]
return parse_synthesis_response(content)
except Exception as e:
print(f"ERROR: LLM call failed: {e}", file=sys.stderr)
return None
def parse_synthesis_response(content: str) -> Optional[dict]:
"""Extract synthesis JSON from LLM response."""
try:
data = json.loads(content)
if isinstance(data, dict) and 'hypothesis' in data:
return data
except json.JSONDecodeError:
pass
import re
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(1))
if isinstance(data, dict) and 'hypothesis' in data:
return data
except json.JSONDecodeError:
pass
# Try finding any JSON object
json_match = re.search(r'(\{.*"hypothesis".*\})', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
return None
def heuristic_synthesis(f1: dict, f2: dict) -> dict:
"""Rule-based fallback synthesis when no LLM available."""
# Simple bridging: combine tags and domains
tags = list(set(f1.get('tags', []) + f2.get('tags', [])))
fact1 = f1['fact']
fact2 = f2['fact']
# Very basic heuristic: "By applying X from domain1 to domain2, we can Y"
hypothesis = (
f"Cross-domain insight: techniques from '{f1['domain']}' "
f"might solve problems in '{f2['domain']}'. "
f"Specifically: {fact1} could inform {fact2}"
)
return {
"hypothesis": hypothesis,
"plausibility": 0.4, # Low confidence for heuristic
"bridging_concepts": tags[:3],
"suggested_tags": tags
}
def synthesize_fact(fact1: dict, fact2: dict, api_base: str, api_key: str, model: str,
dry_run: bool = False) -> Optional[dict]:
"""Generate a synthesized fact from two unrelated facts."""
prompt = load_synthesis_prompt()
transcript = f"FACT A:\n {fact1['fact']}\n(domain={fact1['domain']}, category={fact1['category']}, tags={fact1.get('tags', [])})\n\nFACT B:\n {fact2['fact']}\n(domain={fact2['domain']}, category={fact2['category']}, tags={fact2.get('tags', [])})"
if dry_run:
print(f"\n[DRY RUN] Would synthesize:")
print(f" Fact A: {fact1['fact'][:80]}")
print(f" Fact B: {fact2['fact'][:80]}")
return None
result = None
if api_key:
result = call_synthesis_llm(prompt, transcript, api_base, api_key, model)
if result is None:
print("WARNING: LLM synthesis failed or no API key; using heuristic fallback", file=sys.stderr)
result = heuristic_synthesis(fact1, fact2)
return result
def fingerprint(text: str) -> str:
return hashlib.md5(text.lower().strip().encode('utf-8')).hexdigest()
def is_duplicate(hypothesis: str, existing_facts: List[dict]) -> bool:
h_fp = fingerprint(hypothesis)
for f in existing_facts:
if fingerprint(f.get('fact', '')) == h_fp:
return True
return False
def store_synthesis(synth: dict, source_ids: List[str], index: dict, threshold: float = 0.5) -> bool:
"""Store synthesized fact if plausibility exceeds threshold."""
plaus = synth.get('plausibility', 0.0)
if plaus < threshold:
print(f"Skipped: plausibility {plaus:.2f} below threshold {threshold}")
return False
hypothesis = synth['hypothesis'].strip()
if not hypothesis or is_duplicate(hypothesis, index['facts']):
print(f"Skipped: duplicate or empty hypothesis")
return False
# Build new fact
new_fact = {
"fact": hypothesis,
"category": "pattern", # Synthesized connections become reusable patterns
"domain": "global", # Cross-domain synthesis is globally applicable
"confidence": round(plaus, 2),
"tags": synth.get('suggested_tags', []),
"related": source_ids,
"first_seen": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"last_confirmed": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"source_count": 1,
}
# Generate ID
new_fact['id'] = generate_id("global", "pattern", index['facts'])
# Update index
index['facts'].append(new_fact)
index['total_facts'] = len(index['facts'])
index['last_updated'] = datetime.now(timezone.utc).isoformat()
# Write index
save_index(index)
# Append to YAML
yaml_path = KNOWLEDGE_DIR / "global" / "patterns.yaml"
yaml_path.parent.mkdir(parents=True, exist_ok=True)
mode = 'a' if yaml_path.exists() else 'w'
with open(yaml_path, mode, encoding='utf-8') as f:
if mode == 'w':
f.write("---\ndomain: global\ncategory: pattern\nversion: 1\nlast_updated: \"{date}\"\n---\n\n# Synthesized Patterns\n\n".format(date=datetime.now(timezone.utc).strftime("%Y-%m-%d")))
f.write(f"\n- id: {new_fact['id']}\n")
f.write(f" fact: \"{hypothesis}\"\n")
f.write(f" confidence: {plaus}\n")
if new_fact['tags']:
f.write(f" tags: {json.dumps(new_fact['tags'])}\n")
f.write(f" related: {json.dumps(source_ids)}\n")
f.write(f" first_seen: \"{new_fact['first_seen']}\"\n")
f.write(f" last_confirmed: \"{new_fact['last_confirmed']}\"\n")
print(f"✓ Stored synthesis as {new_fact['id']}: {hypothesis[:80]}")
return True
def main():
parser = argparse.ArgumentParser(description="Zero-shot knowledge synthesis")
parser.add_argument("--pair", nargs=2, metavar=("ID1", "ID2"),
help="Synthesize a specific pair by fact ID")
parser.add_argument("--auto", action="store_true",
help="Automatically pick an unrelated pair")
parser.add_argument("--threshold", type=float, default=0.6,
help="Plausibility threshold for storage (default: 0.6)")
parser.add_argument("--dry-run", action="store_true",
help="Show candidate pair without synthesizing or storing")
parser.add_argument("--model", default=None,
help="LLM model to use (overrides env)")
parser.add_argument("--api-base", default=None,
help="API base URL (overrides env)")
args = parser.parse_args()
# Resolve API credentials
api_base = args.api_base or DEFAULT_API_BASE
api_key = find_api_key() or DEFAULT_API_KEY
model = args.model or DEFAULT_MODEL
if not args.dry_run and not args.pair and not args.auto:
print("ERROR: Must specify either --pair ID1 ID2 or --auto", file=sys.stderr)
parser.print_help()
sys.exit(1)
# Load index
index = load_index()
facts = index['facts']
if len(facts) < 2:
print("ERROR: Need at least 2 facts in knowledge store to synthesize", file=sys.stderr)
sys.exit(1)
# Select facts
f1, f2 = None, None
if args.pair:
id1, id2 = args.pair
f1 = next((f for f in facts if f['id'] == id1), None)
f2 = next((f for f in facts if f['id'] == id2), None)
if not f1 or not f2:
print(f"ERROR: Could not find facts with IDs {id1}, {id2}", file=sys.stderr)
sys.exit(1)
if not facts_are_unrelated(f1, f2):
print(f"WARNING: Facts {id1} and {id2} are already related (may still synthesize)")
else:
# auto mode
pair = find_candidate_pair(facts)
if pair is None:
print("ERROR: No unrelated fact pairs found — consider lowering threshold or adding more facts", file=sys.stderr)
sys.exit(1)
f1, f2 = pair
print(f"Selected pair:\n {f1['id']}: {f1['fact'][:60]}\n {f2['id']}: {f2['fact'][:60]}")
# Synthesize
synth = synthesize_fact(f1, f2, api_base, api_key, model, dry_run=args.dry_run)
if synth is None:
sys.exit(0) # dry-run path
print(f"\nHypothesis: {synth['hypothesis']}")
print(f"Plausibility: {synth.get('plausibility', 0.0):.2f}")
print(f"Bridging concepts: {synth.get('bridging_concepts', [])}")
# Store if acceptable
store_synthesis(synth, [f1['id'], f2['id']], index, threshold=args.threshold)
if __name__ == '__main__':
main()

View File

@@ -1,174 +0,0 @@
#!/usr/bin/env python3
"""
security_linter.py — Scan code for security vulnerabilities.
Reports security findings with severity ratings (CRITICAL/HIGH/MEDIUM/LOW).
Outputs a JSON security lint report.
Usage:
python3 security_linter.py --path .
python3 security_linter.py --path . --output security_report.json
python3 security_linter.py --path . --format json # default
python3 security_linter.py --path . --format markdown
"""
import argparse
import json
import re
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional
SEVERITY_CRITICAL = "CRITICAL"
SEVERITY_HIGH = "HIGH"
SEVERITY_MEDIUM = "MEDIUM"
SEVERITY_LOW = "LOW"
class SecurityFinding:
"""Represents a security finding."""
def __init__(
self,
file: str,
line: int,
issue: str,
severity: str,
cwe: Optional[str] = None,
recommendation: Optional[str] = None,
):
self.file = file
self.line = line
self.issue = issue
self.severity = severity
self.cwe = cwe
self.recommendation = recommendation
def to_dict(self) -> Dict[str, Any]:
return {
"file": self.file,
"line": self.line,
"issue": self.issue,
"severity": self.severity,
"cwe": self.cwe,
"recommendation": self.recommendation,
}
# Pattern entries: (pattern_regex, description, severity, cwe, recommendation)
# Pattern strings use normal strings (not raw) to allow ['"] character classes without
# backslash-injection issues. \s and \b are escaped to give \s and \b in the actual regex.
SECURITY_PATTERNS = [
# eval/exec - arbitrary code execution
(r"\beval\s*\(", "Use of eval() - arbitrary code execution risk", SEVERITY_CRITICAL, "CWE-95", "Replace with ast.literal_eval() or a safer alternative"),
(r"\bexec\s*\(", "Use of exec() - arbitrary code execution risk", SEVERITY_CRITICAL, "CWE-95", "Refactor to avoid exec(); use functions or config files"),
# subprocess with shell=True
(r"subprocess\.(?:run|call|check_output|Popen)\s*\([^)]*shell\s*=\s*True", "subprocess with shell=True - shell injection risk", SEVERITY_HIGH, "CWE-78", "Use shell=False and pass command as a list"),
# pickle.loads - arbitrary code execution
(r"pickle\.loads?\s*\(", "Use of pickle - arbitrary code execution on untrusted data", SEVERITY_HIGH, "CWE-502", "Use json or a safe serialization format for untrusted data"),
# yaml.load without Loader
(r"yaml\.load\s*\(", "yaml.load() - unsafe deserialization", SEVERITY_HIGH, "CWE-502", "Use yaml.safe_load()"),
# tempfile.mktemp - insecure temp file creation
(r"tempfile\.mktemp\s*\(", "tempfile.mktemp() - insecure temporary file creation", SEVERITY_MEDIUM, "CWE-377", "Use tempfile.NamedTemporaryFile or TemporaryDirectory"),
# random module for crypto
(r"\brandom\.(?:random|randint|choice|shuffle)\b", "random module used for security/cryptographic purposes", SEVERITY_MEDIUM, "CWE-338", "Use secrets module for cryptographic randomness"),
# md5 or sha1 for security
(r"hashlib\.(?:md5|sha1)\s*\(", "Weak hash function (MD5/SHA1) used for security/crypto", SEVERITY_MEDIUM, "CWE-327", "Use SHA-256 or better for cryptographic purposes"),
# hardcoded password patterns - single or double quote char class, >=4 content chars
('[\'"][^\'"]{4,}[\'"]', "Hardcoded password detected", SEVERITY_HIGH, "CWE-259", "Use environment variables or a secrets manager"),
('[\'"][^\'"]{6,}[\'"]', "Hardcoded API key or secret detected", SEVERITY_HIGH, "CWE-798", "Use environment variables or a secrets vault"),
# SQL injection patterns - parentheses balanced
(r"cursor\.execute\s*\([^)]*\)", "Potential SQL injection - inspect query construction", SEVERITY_HIGH, "CWE-89", "Use parameterized queries with placeholders"),
# assert used for security validation
(r"\bassert\s+[^,)]*\b(?:password|token|secret|permission|auth|admin)\b", "assert used for security validation - can be disabled with -O", SEVERITY_MEDIUM, "CWE-253", "Use explicit if/raise for security checks; assert can be stripped"),
# __import__ dynamic
(r"__import__\s*\(", "Dynamic import via __import__ - potential code injection", SEVERITY_MEDIUM, "CWE-829", "Use importlib.import_module with validated module names"),
]
def scan_file(path: Path) -> List[SecurityFinding]:
findings = []
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()
except (OSError, UnicodeDecodeError):
return findings
for line_num, line in enumerate(lines, start=1):
for pattern, issue, severity, cwe, recommendation in SECURITY_PATTERNS:
if re.search(pattern, line):
findings.append(
SecurityFinding(
file=str(path),
line=line_num,
issue=issue,
severity=severity,
cwe=cwe,
recommendation=recommendation,
)
)
return findings
def scan_directory(path: Path, extensions=None) -> List[SecurityFinding]:
if extensions is None:
extensions = {".py"}
findings = []
if not path.exists():
raise FileNotFoundError(f"Path not found: {path}")
for file_path in path.rglob("*"):
if file_path.is_file() and file_path.suffix in extensions:
findings.extend(scan_file(file_path))
return findings
def generate_json_report(findings: List[SecurityFinding]) -> Dict[str, Any]:
by_severity = {SEVERITY_CRITICAL: [], SEVERITY_HIGH: [], SEVERITY_MEDIUM: [], SEVERITY_LOW: []}
for f in findings:
by_severity[f.severity].append(f.to_dict())
severity_counts = {s: len(v) for s, v in by_severity.items()}
total = sum(severity_counts.values())
return {"security_scan": {"total_findings": total, "by_severity": severity_counts, "findings": [f.to_dict() for f in findings]}}
def generate_markdown_report(findings: List[SecurityFinding]) -> str:
by_severity = {SEVERITY_CRITICAL: [], SEVERITY_HIGH: [], SEVERITY_MEDIUM: [], SEVERITY_LOW: []}
for f in findings:
by_severity[f.severity].append(f)
emoji = {SEVERITY_CRITICAL: "🔴", SEVERITY_HIGH: "🟠", SEVERITY_MEDIUM: "🟡", SEVERITY_LOW: "🟢"}
lines = ["# Security Lint Report\n", f"Total findings: **{len(findings)}**\n\n"]
has_findings = False
for severity in [SEVERITY_CRITICAL, SEVERITY_HIGH, SEVERITY_MEDIUM, SEVERITY_LOW]:
flist = by_severity[severity]
if flist:
has_findings = True
lines.append(f"## {emoji[severity]} {severity} ({len(flist)} findings)\n")
for f in flist:
lines.append(f"- **{f.file}:{f.line}** — {f.issue}")
lines.append("")
if not has_findings:
lines.append("✅ No security issues found.\n")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Scan code for security vulnerabilities")
parser.add_argument("--path", type=Path, default=Path("."), help="Path to scan (file or directory)")
parser.add_argument("--output", "-o", type=Path, default=None, help="Output file")
parser.add_argument("--format", choices=["json", "markdown"], default="json", help="Output format (default: json)")
parser.add_argument("--extensions", type=str, default=".py", help="Comma-separated file extensions (default: .py)")
args = parser.parse_args()
exts = {e.strip() for e in args.extensions.split(",")}
findings = scan_directory(args.path, extensions=exts)
output = json.dumps(generate_json_report(findings), indent=2) if args.format == "json" else generate_markdown_report(findings)
if args.output:
args.output.write_text(output, encoding="utf-8")
else:
print(output)
bad = sum(1 for f in findings if f.severity in (SEVERITY_CRITICAL, SEVERITY_HIGH))
sys.exit(1 if bad > 0 else 0)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,235 @@
#!/usr/bin/env python3
"""
Tests for knowledge_synthesizer.py — zero-shot knowledge synthesis pipeline.
Run with: python3 scripts/test_knowledge_synthesizer.py
Or via pytest: pytest scripts/test_knowledge_synthesizer.py
"""
import json
import os
import sys
import os
import tempfile
from pathlib import Path
# Add scripts dir to path for importing sibling module
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))
import importlib.util
spec = importlib.util.spec_from_file_location(
"ks", os.path.join(str(SCRIPT_DIR), "knowledge_synthesizer.py")
)
ks = importlib.util.module_from_spec(spec)
spec.loader.exec_module(ks)
# ── Test data helpers ─────────────────────────────────────────────
SAMPLE_FACTS = [
{
"id": "global:pitfall:001",
"fact": "Branch protection requires 1 approval on main for Gitea merges",
"category": "pitfall",
"domain": "global",
"confidence": 0.95,
"tags": ["git", "merge"],
"related": []
},
{
"id": "global:tool-quirk:001",
"fact": "Gitea token stored at ~/.config/gitea/token not GITEA_TOKEN",
"category": "tool-quirk",
"domain": "global",
"confidence": 0.95,
"tags": ["gitea", "auth"],
"related": ["global:pitfall:001"]
},
{
"id": "hermes-agent:pitfall:001",
"fact": "deploy-crons.py leaves jobs in mixed model format",
"category": "pitfall",
"domain": "hermes-agent",
"confidence": 0.95,
"tags": ["cron"],
"related": []
},
]
def make_index(facts, tmp_dir: Path) -> Path:
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": len(facts),
"facts": facts,
}
path = tmp_dir / "index.json"
with open(path, "w") as f:
json.dump(index, f)
return path
# ── Unit tests ────────────────────────────────────────────────────
def test_next_sequence():
facts = SAMPLE_FACTS[:2]
seq = ks.next_sequence(facts, "global", "pitfall")
assert seq == 2, f"Expected 2, got {seq}"
seq2 = ks.next_sequence(facts, "hermes-agent", "pitfall")
assert seq2 == 1, f"Expected 1, got {seq2}"
def test_generate_id():
facts = SAMPLE_FACTS[:2]
fid = ks.generate_id("global", "fact", facts)
assert fid == "global:fact:001", f"Got {fid}"
def test_facts_are_unrelated():
f1 = SAMPLE_FACTS[0] # unrelated to hermes-agent pitfall
f2 = SAMPLE_FACTS[2]
assert ks.facts_are_unrelated(f1, f2) is True
f3 = SAMPLE_FACTS[1] # related to f1
assert ks.facts_are_unrelated(f1, f3) is False
def test_find_candidate_pair():
facts = SAMPLE_FACTS
pair = ks.find_candidate_pair(facts)
assert pair is not None, "Should find an unrelated pair"
f1, f2 = pair
assert ks.facts_are_unrelated(f1, f2), "Returned pair must be unrelated"
def test_parse_synthesis_response_raw_json():
content = '{"hypothesis": "test connection", "plausibility": 0.8, "bridging_concepts": ["x"], "suggested_tags": ["a"]}'
result = ks.parse_synthesis_response(content)
assert result is not None
assert result["hypothesis"] == "test connection"
assert result["plausibility"] == 0.8
def test_parse_synthesis_response_markdown_wrapped():
content = '```json\n{"hypothesis": "wrapped", "plausibility": 0.5}\n```'
result = ks.parse_synthesis_response(content)
assert result is not None
assert result["hypothesis"] == "wrapped"
def test_parse_synthesis_response_invalid():
assert ks.parse_synthesis_response("not json") is None
assert ks.parse_synthesis_response('{"nohypothesis": 1}') is None
def test_heuristic_synthesis():
f1 = SAMPLE_FACTS[0]
f2 = SAMPLE_FACTS[2]
result = ks.heuristic_synthesis(f1, f2)
assert "hypothesis" in result
assert "plausibility" in result
assert result["plausibility"] == 0.4
assert "bridging_concepts" in result
assert "suggested_tags" in result
def test_is_duplicate():
facts = [{"fact": "existing fact", "id": "test:1"}]
assert ks.is_duplicate("existing fact", facts) is True
assert ks.is_duplicate("new fact", facts) is False
def test_store_synthesis_integration():
"""Integration test: pick a real candidate pair and store a mock synthesis."""
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
# Create fake knowledge dir with index
kdir = tmp_path / "knowledge"
kdir.mkdir()
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": 3,
"facts": SAMPLE_FACTS
}
with open(kdir / "index.json", "w") as f:
json.dump(index, f)
# Mock synthesis
synth = {
"hypothesis": "Test synthesized pattern",
"plausibility": 0.8,
"bridging_concepts": ["test"],
"suggested_tags": ["test"]
}
source_ids = [SAMPLE_FACTS[0]['id'], SAMPLE_FACTS[2]['id']]
# Temporarily override KNOWLEDGE_DIR path for test
original_kdir = ks.KNOWLEDGE_DIR
ks.KNOWLEDGE_DIR = kdir
try:
stored = ks.store_synthesis(synth, source_ids, index, threshold=0.5)
assert stored is True
assert index['total_facts'] == 4
new_fact = index['facts'][-1]
assert new_fact['fact'] == "Test synthesized pattern"
assert new_fact['category'] == "pattern"
assert new_fact['domain'] == "global"
assert new_fact['related'] == source_ids
assert new_fact['id'].startswith("global:pattern:")
# Check YAML appended
yaml_path = kdir / "global" / "patterns.yaml"
assert yaml_path.exists()
content = yaml_path.read_text()
assert "Test synthesized pattern" in content
finally:
ks.KNOWLEDGE_DIR = original_kdir
# ── Smoke test ────────────────────────────────────────────────────
def test_smoke_synthesizer_info():
"""Sanity check: script can at least load and report current knowledge state."""
index = ks.load_index()
total = index.get('total_facts', 0)
facts = index.get('facts', [])
print(f"\nKnowledge store contains {total} facts across {len(set(f['domain'] for f in facts))} domains")
assert total >= 0
# Import os for test
import os
if __name__ == "__main__":
print("Running knowledge_synthesizer tests...\n")
passed = 0
failed = 0
tests = [
test_next_sequence,
test_generate_id,
test_facts_are_unrelated,
test_find_candidate_pair,
test_parse_synthesis_response_raw_json,
test_parse_synthesis_response_markdown_wrapped,
test_parse_synthesis_response_invalid,
test_heuristic_synthesis,
test_is_duplicate,
test_store_synthesis_integration,
test_smoke_synthesizer_info,
]
for test in tests:
try:
test()
print(f"{test.__name__}")
passed += 1
except Exception as e:
import traceback; traceback.print_exc(); print(f"{test.__name__}: {e}")
failed += 1
print(f"\n{passed} passed, {failed} failed")
sys.exit(0 if failed == 0 else 1)

View File

@@ -1,95 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/security_linter.py — Issue #158: 9.4 Security Linter."""
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from security_linter import (
scan_file,
scan_directory,
generate_json_report,
generate_markdown_report,
SEVERITY_CRITICAL,
SEVERITY_HIGH,
SEVERITY_MEDIUM,
SEVERITY_LOW,
)
def test_scan_file_detects_eval():
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
f.write("result = eval(user_input)\n")
f.flush()
findings = scan_file(Path(f.name))
assert len(findings) >= 1
assert findings[0].severity == SEVERITY_CRITICAL
assert "eval" in findings[0].issue.lower()
def test_scan_file_detects_hardcoded_password():
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
f.write("password = 'supersecret123'\n")
f.flush()
findings = scan_file(Path(f.name))
assert any(f.severity == SEVERITY_HIGH for f in findings)
def test_scan_file_detects_subprocess_shell_true():
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
f.write("subprocess.run(cmd, shell=True)\n")
f.flush()
findings = scan_file(Path(f.name))
assert any(f.severity == SEVERITY_HIGH and "shell" in f.issue.lower() for f in findings)
def test_scan_file_detects_pickle():
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
f.write("data = pickle.loads(raw)\n")
f.flush()
findings = scan_file(Path(f.name))
assert any(f.severity == SEVERITY_HIGH and "pickle" in f.issue.lower() for f in findings)
def test_scan_file_detects_yaml_load():
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
f.write("config = yaml.load(stream)\n")
f.flush()
findings = scan_file(Path(f.name))
assert any("yaml.load" in f.issue.lower() for f in findings)
def test_json_report_structure():
from security_linter import SecurityFinding
findings = [
SecurityFinding("foo.py", 1, "eval() used", SEVERITY_CRITICAL, "CWE-95", "Use ast.literal_eval"),
SecurityFinding("bar.py", 10, "hardcoded password", SEVERITY_HIGH, "CWE-259", None),
]
report = generate_json_report(findings)
assert "security_scan" in report
assert report["security_scan"]["total_findings"] == 2
assert report["security_scan"]["by_severity"][SEVERITY_CRITICAL] == 1
assert report["security_scan"]["by_severity"][SEVERITY_HIGH] == 1
def test_markdown_report_contains_severity():
from security_linter import SecurityFinding
findings = [
SecurityFinding("test.py", 1, "eval() used", SEVERITY_CRITICAL, "CWE-95", "Use ast.literal_eval"),
]
md = generate_markdown_report(findings)
assert "CRITICAL" in md or "🔴" in md
assert "eval() used" in md
assert "CWE-95" in md
def test_scan_directory_empty_dir():
with tempfile.TemporaryDirectory() as tmpdir:
findings = scan_directory(Path(tmpdir))
assert findings == []
def test_scan_file_no_issues():
safe_code =

View File

@@ -0,0 +1,47 @@
# Knowledge Synthesis Prompt
## System Prompt
You are a knowledge synthesis engine. Given two facts, you generate a novel hypothesis
that connects them in a way no human would typically link — a zero-shot creative leap.
## Task
FACT A:
{fact_a}
FACT B:
{fact_b}
Generate a single JSON object:
{
"hypothesis": "one concise sentence linking the two facts as a new, testable insight",
"plausibility": 0.0-1.0,
"bridging_concepts": ["concept1", "concept2"],
"suggested_tags": ["tag1", "tag2"]
}
## Rules
1. The hypothesis must be a logical consequence of combining both facts.
2. DO NOT restate either fact — produce genuinely new insight.
3. Plausibility should reflect confidence given only these two facts.
4. If no meaningful connection exists, return {"hypothesis":"","plausibility":0.0}.
5. Output ONLY valid JSON — no markdown, no explanation.
## Examples
Input facts:
- "Gitea PR creation requires branch protection approval (1+) on main"
- "Git push hangs on large repos (pack.windowMemory=100m)"
Hypothesis output:
{
"hypothesis": "Branch protection triggers checks that inflate pack size, causing git push to hang on large repos",
"plausibility": 0.65,
"bridging_concepts": ["git", "gitea", "branch-protection", "push"],
"suggested_tags": ["git", "gitea", "performance"]
}
Output ONLY the JSON object.