#!/usr/bin/env python3
"""
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)

Automatically detects when knowledge entries become stale due to code changes.

Detection Method:
1. Track source file hash alongside knowledge entry
2. Compare current file hashes vs stored
3. Mismatch → flag entry as potentially stale
4. Report stale entries and optionally re-extract

Usage:
    python3 scripts/freshness.py --knowledge-dir knowledge/
    python3 scripts/freshness.py --knowledge-dir knowledge/ --json
    python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
    python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
"""

import argparse
import hashlib
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple


def compute_file_hash(filepath: str) -> Optional[str]:
    """Compute SHA-256 hash of a file.

    Returns the digest prefixed with "sha256:", or None if the path does
    not exist, is a directory, or is unreadable.
    """
    try:
        with open(filepath, "rb") as f:
            return "sha256:" + hashlib.sha256(f.read()).hexdigest()
    except (FileNotFoundError, IsADirectoryError, PermissionError):
        return None


def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
    """
    Get files changed in git in the last N days.

    Returns dict with 'modified', 'added', 'deleted' lists of file paths.
    On any git failure (not a repo, git missing, timeout) the lists are
    simply left empty — callers fall back to hash comparison.
    """
    changes: Dict[str, List[str]] = {"modified": [], "added": [], "deleted": []}

    try:
        # --name-status with an empty pretty format yields "<status>\t<path>"
        # lines; --diff-filter restricts output to Modified/Added/Deleted.
        cmd = [
            "git", "-C", repo_path, "log",
            f"--since={days} days ago",
            "--name-status",
            "--pretty=format:",
            "--diff-filter=MAD"
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)

        if result.returncode != 0:
            return changes

        status_map = {"M": "modified", "A": "added", "D": "deleted"}
        for line in result.stdout.splitlines():
            line = line.strip()
            if not line:
                continue

            parts = line.split('\t', 1)
            if len(parts) != 2:
                continue

            status, filepath = parts
            bucket = status_map.get(status)
            if bucket:
                changes[bucket].append(filepath)

    except (subprocess.TimeoutExpired, FileNotFoundError):
        pass

    # Deduplicate (a file may appear in several commits); sort so output is
    # deterministic (bare set iteration order is not stable across runs).
    for key in changes:
        changes[key] = sorted(set(changes[key]))

    return changes


def _normalize_entry(source: str, item: Dict[str, Any]) -> Dict[str, Any]:
    """Build the uniform entry record from one raw fact/knowledge mapping.

    "description" is preferred over "fact" for the text (YAML files use the
    former, index.json the latter); missing metadata gets neutral defaults.
    """
    return {
        "source": source,
        "fact": item.get("description", item.get("fact", "")),
        "source_file": item.get("source_file"),
        "source_hash": item.get("source_hash"),
        "category": item.get("category", "unknown"),
        "confidence": item.get("confidence", 0.5),
    }


def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
    """
    Load knowledge entries from the knowledge directory.

    Supports:
    - knowledge/index.json (legacy format)
    - knowledge/global/*.yaml
    - knowledge/repos/*.yaml
    - knowledge/agents/*.yaml

    YAML support requires PyYAML; when it is not installed, YAML files are
    skipped and only index.json is loaded.
    """
    entries: List[Dict[str, Any]] = []

    # Legacy JSON index.
    index_path = os.path.join(knowledge_dir, "index.json")
    if os.path.exists(index_path):
        try:
            with open(index_path) as f:
                data = json.load(f)
            for fact in data.get("facts", []):
                entries.append(_normalize_entry("index.json", fact))
        except (json.JSONDecodeError, KeyError):
            pass

    # PyYAML is only needed for the YAML files below: import lazily so an
    # index.json-only installation works without the dependency.
    try:
        import yaml
    except ImportError:
        return entries

    for subdir in ["global", "repos", "agents"]:
        subdir_path = os.path.join(knowledge_dir, subdir)
        if not os.path.isdir(subdir_path):
            continue

        for filename in os.listdir(subdir_path):
            if not filename.endswith((".yaml", ".yml")):
                continue

            filepath = os.path.join(subdir_path, filename)
            try:
                with open(filepath) as f:
                    data = yaml.safe_load(f)

                if not data or not isinstance(data, dict):
                    continue

                # A YAML file maps section names to either a list of entry
                # mappings or a single entry mapping.
                for value in data.values():
                    if isinstance(value, list):
                        for item in value:
                            if isinstance(item, dict):
                                # BUG FIX: record the actual originating file
                                # instead of a "(unknown)" placeholder so
                                # stale reports are traceable.
                                entries.append(
                                    _normalize_entry(f"{subdir}/{filename}", item))
                    elif isinstance(value, dict):
                        entries.append(
                            _normalize_entry(f"{subdir}/{filename}", value))
            except (yaml.YAMLError, IOError):
                pass

    return entries
def check_freshness(knowledge_dir: str, repo_root: str = ".",
                    days: int = 1) -> Dict[str, Any]:
    """
    Check freshness of knowledge entries against recent code changes.

    An entry is stale when git reports its source file as modified/added/
    deleted within the last `days` days, or when the stored content hash no
    longer matches the file on disk.  A matching stored hash overrides the
    git signal: if knowledge was re-extracted after the code change, the
    entry is still in sync.

    Returns:
        {
            "timestamp": ISO timestamp,
            "total_entries": int,
            "stale_entries": [...],
            "fresh_entries": [...],
            "git_changes": {...},
            "summary": {...}
        }
    """
    entries = load_knowledge_entries(knowledge_dir)
    git_changes = get_git_file_changes(repo_root, days)

    stale_entries = []
    fresh_entries = []

    for entry in entries:
        source_file = entry.get("source_file")
        if not source_file:
            # No source reference: nothing to compare against; count as fresh.
            fresh_entries.append({**entry, "status": "no_source"})
            continue

        is_stale, reason = False, ""

        # Git-based signal: was the source file touched recently?
        if source_file in git_changes["modified"]:
            is_stale, reason = True, "source_modified"
        elif source_file in git_changes["deleted"]:
            is_stale, reason = True, "source_deleted"
        elif source_file in git_changes["added"]:
            is_stale, reason = True, "source_added"

        # Hash-based signal — authoritative when a hash was stored.
        stored_hash = entry.get("source_hash")
        if stored_hash:
            current_hash = compute_file_hash(os.path.join(repo_root, source_file))
            if current_hash is None:
                is_stale, reason = True, "source_missing"
            elif current_hash != stored_hash:
                is_stale, reason = True, "hash_mismatch"
            else:
                # BUG FIX: the stored hash matches the file's current
                # content, so the entry is up to date even if git recently
                # touched the file (e.g. knowledge re-extracted afterwards).
                is_stale, reason = False, ""

        if is_stale:
            stale_entries.append({**entry, "status": "stale", "reason": reason})
        else:
            fresh_entries.append({**entry, "status": "fresh"})

    total = len(entries)
    stale_count = len(stale_entries)

    # Histogram of staleness reasons for the summary section.
    stale_by_reason: Dict[str, int] = {}
    for entry in stale_entries:
        reason = entry.get("reason", "unknown")
        stale_by_reason[reason] = stale_by_reason.get(reason, 0) + 1

    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "total_entries": total,
        "stale_entries": stale_entries,
        "fresh_entries": fresh_entries,
        "git_changes": git_changes,
        "summary": {
            "total": total,
            "stale": stale_count,
            "fresh": len(fresh_entries),
            "stale_percentage": round(stale_count / total * 100, 1) if total > 0 else 0,
            "stale_by_reason": stale_by_reason,
            "git_changes_summary": {
                "modified": len(git_changes["modified"]),
                "added": len(git_changes["added"]),
                "deleted": len(git_changes["deleted"])
            }
        }
    }


def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
    """
    Count entries whose stored hash differs from the file on disk.

    NOTE: currently a dry run — it only counts entries that WOULD be
    updated.  Writing refreshed hashes back to the originating YAML/JSON
    files is not implemented yet.
    """
    updated = 0
    for entry in load_knowledge_entries(knowledge_dir):
        source_file = entry.get("source_file")
        if not source_file:
            continue

        current_hash = compute_file_hash(os.path.join(repo_root, source_file))
        if current_hash and entry.get("source_hash") != current_hash:
            # Would persist the new hash here once write-back exists.
            updated += 1

    return updated


def format_report(result: Dict[str, Any], max_items: int = 20,
                  days: int = 1) -> str:
    """Format freshness check results as a human-readable report.

    Args:
        result: output of check_freshness().
        max_items: maximum number of stale entries listed individually.
        days: git look-back window used for the check; only affects the
            report heading (previously hardcoded to "24h" regardless of
            the --days flag).
    """
    summary = result["summary"]
    stale_entries = result["stale_entries"]
    git_changes = result["git_changes"]

    # Keep the original "24h" wording for the default one-day window.
    window = "24h" if days == 1 else f"{days} days"

    lines = [
        "Knowledge Freshness Report",
        "=" * 50,
        f"Generated: {result['timestamp']}",
        f"Total entries: {summary['total']}",
        f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)",
        f"Fresh entries: {summary['fresh']}",
        "",
        f"Git Changes (last {window}):",
        f"  Modified: {len(git_changes['modified'])} files",
        f"  Added: {len(git_changes['added'])} files",
        f"  Deleted: {len(git_changes['deleted'])} files",
        ""
    ]

    # Stale entries grouped by reason.
    if summary.get("stale_by_reason"):
        lines.append("Stale Entries by Reason:")
        lines.append("")
        for reason, count in summary["stale_by_reason"].items():
            lines.append(f"  {reason}: {count}")
        lines.append("")

    # Individual stale entries, truncated to max_items.
    if stale_entries:
        lines.append("Stale Entries:")
        lines.append("")
        for i, entry in enumerate(stale_entries[:max_items], 1):
            source = entry.get("source_file", "?")
            reason = entry.get("reason", "unknown")
            fact = entry.get("fact", "")[:60]
            lines.append(f"{i:2d}. [{reason}] {source}")
            if fact:
                lines.append(f"    {fact}")

        if len(stale_entries) > max_items:
            lines.append(f"\n... and {len(stale_entries) - max_items} more")
    else:
        lines.append("No stale entries found. All knowledge is fresh!")

    return "\n".join(lines)


def main():
    """CLI entry point: parse args, run the check, print JSON or text."""
    parser = argparse.ArgumentParser(
        description="Knowledge Freshness Cron — detect stale entries from code changes")
    parser.add_argument("--knowledge-dir", required=True,
                        help="Path to knowledge directory")
    parser.add_argument("--repo", default=".",
                        help="Path to repository for git change detection")
    parser.add_argument("--days", type=int, default=1,
                        help="Number of days to check for git changes (default: 1)")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON instead of human-readable")
    parser.add_argument("--max", type=int, default=20,
                        help="Maximum stale entries to show (default: 20)")
    parser.add_argument("--auto-reextract", action="store_true",
                        help="Auto-re-extract knowledge for stale entries")

    args = parser.parse_args()

    if not os.path.isdir(args.knowledge_dir):
        print(f"Error: {args.knowledge_dir} is not a directory", file=sys.stderr)
        sys.exit(1)

    if not os.path.isdir(args.repo):
        print(f"Error: {args.repo} is not a directory", file=sys.stderr)
        sys.exit(1)

    result = check_freshness(args.knowledge_dir, args.repo, args.days)

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        # Pass the look-back window through so the report heading is accurate.
        print(format_report(result, args.max, days=args.days))
requested + if args.auto_reextract and result["stale_entries"]: + print(f"\nAuto-re-extracting {len(result['stale_entries'])} stale entries...") + # In a real implementation, this would call the harvester + print("(Auto-re-extraction not yet implemented)") + + +if __name__ == "__main__": + main() \ No newline at end of file From baa2c84c3f2480b4254202bbedcc98ee1761575d Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Tue, 21 Apr 2026 11:57:54 +0000 Subject: [PATCH 2/2] feat: Add test_freshness.py (#200) --- tests/test_freshness.py | 227 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 tests/test_freshness.py diff --git a/tests/test_freshness.py b/tests/test_freshness.py new file mode 100644 index 0000000..0568664 --- /dev/null +++ b/tests/test_freshness.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +"""Tests for scripts/freshness.py — 8 tests.""" + +import json +import os +import sys +import tempfile + +sys.path.insert(0, os.path.join(os.path.dirname(__file__) or ".", "..")) +import importlib.util +spec = importlib.util.spec_from_file_location( + "freshness", os.path.join(os.path.dirname(__file__) or ".", "..", "scripts", "freshness.py")) +mod = importlib.util.module_from_spec(spec) +spec.loader.exec_module(mod) + +compute_file_hash = mod.compute_file_hash +check_freshness = mod.check_freshness +load_knowledge_entries = mod.load_knowledge_entries + + +def test_compute_file_hash(): + """File hash should be computed correctly.""" + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write("test content") + f.flush() + h = compute_file_hash(f.name) + assert h is not None + assert h.startswith("sha256:") + os.unlink(f.name) + print("PASS: test_compute_file_hash") + + +def test_compute_file_hash_nonexistent(): + """Nonexistent file should return None.""" + h = compute_file_hash("/nonexistent/file.txt") + assert h is None + print("PASS: test_compute_file_hash_nonexistent") + + +def 
def test_load_knowledge_entries_empty():
    """Empty knowledge dir should return empty list."""
    with tempfile.TemporaryDirectory() as tmpdir:
        entries = load_knowledge_entries(tmpdir)
        assert entries == []
    print("PASS: test_load_knowledge_entries_empty")


def test_load_knowledge_entries_from_index():
    """Should load entries from index.json."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create index.json
        index_path = os.path.join(tmpdir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "Test fact",
                        "source_file": "test.py",
                        "source_hash": "sha256:abc123",
                        "category": "fact",
                        "confidence": 0.9
                    }
                ]
            }, f)

        entries = load_knowledge_entries(tmpdir)
        assert len(entries) == 1
        assert entries[0]["fact"] == "Test fact"
        assert entries[0]["source_file"] == "test.py"
    print("PASS: test_load_knowledge_entries_from_index")


def test_load_knowledge_entries_from_yaml():
    """Should load entries from YAML files (requires PyYAML installed)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create global directory
        global_dir = os.path.join(tmpdir, "global")
        os.makedirs(global_dir)

        # Create YAML file
        yaml_path = os.path.join(global_dir, "test.yaml")
        with open(yaml_path, "w") as f:
            f.write("""
pitfalls:
  - description: "Test pitfall"
    source_file: "test.py"
    source_hash: "sha256:def456"
    category: "pitfall"
    confidence: 0.8
""")

        entries = load_knowledge_entries(tmpdir)
        assert len(entries) == 1
        assert entries[0]["fact"] == "Test pitfall"
        assert entries[0]["category"] == "pitfall"
    print("PASS: test_load_knowledge_entries_from_yaml")


def _git_init_commit(repo_dir):
    """Initialize a git repo in repo_dir and commit its contents.

    FIX: previously done via os.system with the temp path interpolated into
    a shell string, which breaks on paths containing spaces or shell
    metacharacters and silently ignores failures.  Uses subprocess argument
    lists instead, and supplies an inline identity so `git commit` succeeds
    on machines without global git config.  Failures are tolerated: the
    freshness checks under test fall back to hash comparison when git
    history is unavailable.
    """
    import subprocess
    commands = [
        ["git", "init"],
        ["git", "add", "."],
        ["git", "-c", "user.email=test@example.com", "-c", "user.name=Test",
         "commit", "-m", "init"],
    ]
    for cmd in commands:
        try:
            subprocess.run(cmd, cwd=repo_dir, capture_output=True, check=False)
        except OSError:
            pass  # git not installed — tests below rely on hashes anyway


def test_check_freshness_no_changes():
    """With no source file reference, entries should be counted correctly."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create knowledge dir
        knowledge_dir = os.path.join(tmpdir, "knowledge")
        os.makedirs(knowledge_dir)

        # Create repo dir
        repo_dir = os.path.join(tmpdir, "repo")
        os.makedirs(repo_dir)

        # Create index.json with entry that has no source_file
        index_path = os.path.join(knowledge_dir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "General knowledge",
                        "category": "fact",
                        "confidence": 0.9
                        # No source_file or source_hash
                    }
                ]
            }, f)

        result = check_freshness(knowledge_dir, repo_dir, days=1)

        # Entry without source_file should be counted as "fresh" (no_source status)
        assert result["summary"]["total"] == 1
        assert result["summary"]["stale"] == 0
        assert result["summary"]["fresh"] == 1
        assert result["fresh_entries"][0]["status"] == "no_source"
    print("PASS: test_check_freshness_no_changes")


def test_check_freshness_with_hash_mismatch():
    """Hash mismatch should mark entry as stale."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create knowledge dir
        knowledge_dir = os.path.join(tmpdir, "knowledge")
        os.makedirs(knowledge_dir)

        # Create repo dir with a file
        repo_dir = os.path.join(tmpdir, "repo")
        os.makedirs(repo_dir)

        test_file = os.path.join(repo_dir, "test.py")
        with open(test_file, "w") as f:
            f.write("print('hello')")

        # Create index.json with wrong hash
        index_path = os.path.join(knowledge_dir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "Test fact",
                        "source_file": "test.py",
                        "source_hash": "sha256:wronghash",
                        "category": "fact",
                        "confidence": 0.9
                    }
                ]
            }, f)

        # Initialize git repo (argument-list subprocess; see helper)
        _git_init_commit(repo_dir)

        result = check_freshness(knowledge_dir, repo_dir, days=1)

        assert result["summary"]["total"] == 1
        assert result["summary"]["stale"] == 1
        assert result["summary"]["fresh"] == 0
        assert result["stale_entries"][0]["reason"] == "hash_mismatch"
    print("PASS: test_check_freshness_with_hash_mismatch")


def test_check_freshness_missing_source():
    """Missing source file should mark entry as stale."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create knowledge dir
        knowledge_dir = os.path.join(tmpdir, "knowledge")
        os.makedirs(knowledge_dir)

        # Create repo dir (without the referenced file)
        repo_dir = os.path.join(tmpdir, "repo")
        os.makedirs(repo_dir)

        # Create index.json referencing nonexistent file
        index_path = os.path.join(knowledge_dir, "index.json")
        with open(index_path, "w") as f:
            json.dump({
                "facts": [
                    {
                        "fact": "Test fact",
                        "source_file": "nonexistent.py",
                        "source_hash": "sha256:abc123",
                        "category": "fact",
                        "confidence": 0.9
                    }
                ]
            }, f)

        # Initialize git repo (argument-list subprocess; see helper)
        _git_init_commit(repo_dir)

        result = check_freshness(knowledge_dir, repo_dir, days=1)

        assert result["summary"]["total"] == 1
        assert result["summary"]["stale"] == 1
        assert result["summary"]["fresh"] == 0
        assert result["stale_entries"][0]["reason"] == "source_missing"
    print("PASS: test_check_freshness_missing_source")


def run_all():
    """Run every test in order; any assertion failure aborts the suite."""
    test_compute_file_hash()
    test_compute_file_hash_nonexistent()
    test_load_knowledge_entries_empty()
    test_load_knowledge_entries_from_index()
    test_load_knowledge_entries_from_yaml()
    test_check_freshness_no_changes()
    test_check_freshness_with_hash_mismatch()
    test_check_freshness_missing_source()
    print("\nAll 8 tests passed!")


if __name__ == "__main__":
    run_all()