#!/usr/bin/env python3
"""
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)

Automatically detects when knowledge entries become stale due to code changes.

Detection Method:
1. Track source file hash alongside knowledge entry
2. Compare current file hashes vs stored
3. Mismatch → flag entry as potentially stale
4. Report stale entries and optionally re-extract

Usage:
    python3 scripts/freshness.py --knowledge-dir knowledge/
    python3 scripts/freshness.py --knowledge-dir knowledge/ --json
    python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
    python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
"""

import argparse
import hashlib
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

try:
    # PyYAML is third-party; make it optional so the script still imports
    # (and the index.json path still works) where it is not installed.
    import yaml
except ImportError:  # pragma: no cover
    yaml = None


def compute_file_hash(filepath: str) -> Optional[str]:
    """Compute the SHA-256 hash of a file.

    Returns a "sha256:<hexdigest>" string, or None when the file cannot be
    read (missing, a directory, or permission denied).
    """
    try:
        with open(filepath, "rb") as f:
            return "sha256:" + hashlib.sha256(f.read()).hexdigest()
    except (FileNotFoundError, IsADirectoryError, PermissionError):
        return None


def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
    """
    Get files changed in git in the last N days.

    Returns dict with 'modified', 'added', 'deleted' lists of file paths.
    All three lists are empty when the path is not a git checkout, git is
    not installed, or the command times out (best-effort semantics).
    """
    changes: Dict[str, List[str]] = {"modified": [], "added": [], "deleted": []}
    status_to_key = {"M": "modified", "A": "added", "D": "deleted"}
    try:
        cmd = [
            "git", "-C", repo_path, "log",
            f"--since={days} days ago",
            "--name-status",
            "--pretty=format:",
            "--diff-filter=MAD",
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            return changes
        for line in result.stdout.splitlines():
            line = line.strip()
            if not line:
                continue
            # --name-status lines look like "M\tpath/to/file".
            parts = line.split("\t", 1)
            if len(parts) != 2:
                continue
            status, filepath = parts
            key = status_to_key.get(status)
            if key:
                changes[key].append(filepath)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        pass  # best-effort: treat git problems as "no changes detected"
    # Deduplicate while preserving first-seen order; list(set(...)) gave a
    # nondeterministic ordering across runs.
    for key in changes:
        changes[key] = list(dict.fromkeys(changes[key]))
    return changes


def _normalize_entry(source: str, item: Dict[str, Any]) -> Dict[str, Any]:
    """Map a raw YAML entry dict into the canonical entry shape."""
    return {
        "source": source,
        "fact": item.get("description", item.get("fact", "")),
        "source_file": item.get("source_file"),
        "source_hash": item.get("source_hash"),
        "category": item.get("category", "unknown"),
        "confidence": item.get("confidence", 0.5),
    }


def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
    """
    Load knowledge entries from YAML files in the knowledge directory.

    Supports:
    - knowledge/index.json (legacy format)
    - knowledge/global/*.yaml
    - knowledge/repos/*.yaml
    - knowledge/agents/*.yaml

    Unreadable or malformed files are skipped silently (best-effort).
    """
    entries: List[Dict[str, Any]] = []

    # Legacy flat index.
    index_path = os.path.join(knowledge_dir, "index.json")
    if os.path.exists(index_path):
        try:
            with open(index_path) as f:
                data = json.load(f)
            for fact in data.get("facts", []):
                entries.append({
                    "source": "index.json",
                    "fact": fact.get("fact", ""),
                    "source_file": fact.get("source_file"),
                    "source_hash": fact.get("source_hash"),
                    "category": fact.get("category", "unknown"),
                    "confidence": fact.get("confidence", 0.5),
                })
        except (json.JSONDecodeError, KeyError):
            pass  # a corrupt index is skipped, not fatal

    if yaml is None:
        # Without PyYAML only the legacy index can be served.
        return entries

    for subdir in ["global", "repos", "agents"]:
        subdir_path = os.path.join(knowledge_dir, subdir)
        if not os.path.isdir(subdir_path):
            continue
        for filename in os.listdir(subdir_path):
            if not filename.endswith((".yaml", ".yml")):
                continue
            filepath = os.path.join(subdir_path, filename)
            # Record which file each entry came from (was a literal
            # placeholder string before this fix).
            source = f"{subdir}/{filename}"
            try:
                with open(filepath) as f:
                    data = yaml.safe_load(f)
                if not data or not isinstance(data, dict):
                    continue
                # Entries may appear as lists of dicts or as single dicts
                # under arbitrary top-level keys; the keys themselves are
                # not part of the entry.
                for value in data.values():
                    if isinstance(value, list):
                        for item in value:
                            if isinstance(item, dict):
                                entries.append(_normalize_entry(source, item))
                    elif isinstance(value, dict):
                        entries.append(_normalize_entry(source, value))
            except (yaml.YAMLError, IOError):
                pass  # skip unreadable or invalid YAML files

    return entries


def check_freshness(knowledge_dir: str, repo_root: str = ".",
                    days: int = 1) -> Dict[str, Any]:
    """
    Check freshness of knowledge entries against recent code changes.

    Returns:
        {
            "timestamp": ISO timestamp,
            "total_entries": int,
            "stale_entries": [...],
            "fresh_entries": [...],
            "git_changes": {...},
            "summary": {...}
        }
    """
    entries = load_knowledge_entries(knowledge_dir)
    git_changes = get_git_file_changes(repo_root, days)

    stale_entries: List[Dict[str, Any]] = []
    fresh_entries: List[Dict[str, Any]] = []

    for entry in entries:
        source_file = entry.get("source_file")
        if not source_file:
            # Entry without source file reference — cannot go stale.
            fresh_entries.append({**entry, "status": "no_source"})
            continue

        # Signal 1: source file appears in recent git history.
        is_stale = False
        reason = ""
        if source_file in git_changes["modified"]:
            is_stale, reason = True, "source_modified"
        elif source_file in git_changes["deleted"]:
            is_stale, reason = True, "source_deleted"
        elif source_file in git_changes["added"]:
            is_stale, reason = True, "source_added"

        # Signal 2: stored content hash no longer matches the file on disk.
        # A hash result is more specific, so it overrides the git reason.
        stored_hash = entry.get("source_hash")
        if stored_hash:
            full_path = os.path.join(repo_root, source_file)
            current_hash = compute_file_hash(full_path)
            if current_hash is None:
                is_stale, reason = True, "source_missing"
            elif current_hash != stored_hash:
                is_stale, reason = True, "hash_mismatch"

        if is_stale:
            stale_entries.append({**entry, "status": "stale", "reason": reason})
        else:
            fresh_entries.append({**entry, "status": "fresh"})

    total = len(entries)
    stale_count = len(stale_entries)
    fresh_count = len(fresh_entries)

    # Group stale entries by reason for the summary.
    stale_by_reason: Dict[str, int] = {}
    for entry in stale_entries:
        reason = entry.get("reason", "unknown")
        stale_by_reason[reason] = stale_by_reason.get(reason, 0) + 1

    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "total_entries": total,
        "stale_entries": stale_entries,
        "fresh_entries": fresh_entries,
        "git_changes": git_changes,
        "summary": {
            "total": total,
            "stale": stale_count,
            "fresh": fresh_count,
            "stale_percentage": round(stale_count / total * 100, 1) if total > 0 else 0,
            "stale_by_reason": stale_by_reason,
            "git_changes_summary": {
                "modified": len(git_changes["modified"]),
                "added": len(git_changes["added"]),
                "deleted": len(git_changes["deleted"]),
            },
        },
    }


def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
    """
    Update hashes for stale entries. Returns count of updated entries.

    NOTE: this is a simplified version — it only counts entries whose
    stored hash differs from the file on disk; writing the new hashes
    back to the specific YAML files is not implemented yet.
    """
    entries = load_knowledge_entries(knowledge_dir)
    updated = 0
    for entry in entries:
        source_file = entry.get("source_file")
        if not source_file:
            continue
        full_path = os.path.join(repo_root, source_file)
        current_hash = compute_file_hash(full_path)
        if current_hash and entry.get("source_hash") != current_hash:
            # Mark for update (in practice, you'd write back to the file).
            updated += 1
    return updated


def format_report(result: Dict[str, Any], max_items: int = 20,
                  days: int = 1) -> str:
    """Format freshness check results as a human-readable report.

    `days` is the git look-back window used for the check; it only affects
    the report header (days=1 renders the original "last 24h" text).
    """
    timestamp = result["timestamp"]
    summary = result["summary"]
    stale_entries = result["stale_entries"]
    git_changes = result["git_changes"]

    lines = [
        "Knowledge Freshness Report",
        "=" * 50,
        f"Generated: {timestamp}",
        f"Total entries: {summary['total']}",
        f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)",
        f"Fresh entries: {summary['fresh']}",
        "",
    ]

    # Git changes summary (window was previously hardcoded to 24h).
    lines.extend([
        f"Git Changes (last {days * 24}h):",
        f"  Modified: {len(git_changes['modified'])} files",
        f"  Added: {len(git_changes['added'])} files",
        f"  Deleted: {len(git_changes['deleted'])} files",
        "",
    ])

    if summary.get("stale_by_reason"):
        lines.extend(["Stale Entries by Reason:", ""])
        for reason, count in summary["stale_by_reason"].items():
            lines.append(f"  {reason}: {count}")
        lines.append("")

    if stale_entries:
        lines.extend(["Stale Entries:", ""])
        for i, entry in enumerate(stale_entries[:max_items], 1):
            source = entry.get("source_file", "?")
            reason = entry.get("reason", "unknown")
            fact = entry.get("fact", "")[:60]
            lines.append(f"{i:2d}. [{reason}] {source}")
            if fact:
                lines.append(f"    {fact}")
        if len(stale_entries) > max_items:
            lines.append(f"\n... and {len(stale_entries) - max_items} more")
    else:
        lines.append("No stale entries found. All knowledge is fresh!")

    return "\n".join(lines)


def main():
    """CLI entry point: parse args, run the check, print report or JSON."""
    parser = argparse.ArgumentParser(
        description="Knowledge Freshness Cron — detect stale entries from code changes")
    parser.add_argument("--knowledge-dir", required=True,
                        help="Path to knowledge directory")
    parser.add_argument("--repo", default=".",
                        help="Path to repository for git change detection")
    parser.add_argument("--days", type=int, default=1,
                        help="Number of days to check for git changes (default: 1)")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON instead of human-readable")
    parser.add_argument("--max", type=int, default=20,
                        help="Maximum stale entries to show (default: 20)")
    parser.add_argument("--auto-reextract", action="store_true",
                        help="Auto-re-extract knowledge for stale entries")
    args = parser.parse_args()

    if not os.path.isdir(args.knowledge_dir):
        print(f"Error: {args.knowledge_dir} is not a directory", file=sys.stderr)
        sys.exit(1)
    if not os.path.isdir(args.repo):
        print(f"Error: {args.repo} is not a directory", file=sys.stderr)
        sys.exit(1)

    result = check_freshness(args.knowledge_dir, args.repo, args.days)

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(format_report(result, args.max, args.days))

    # Auto-re-extract if requested.
    if args.auto_reextract and result["stale_entries"]:
        print(f"\nAuto-re-extracting {len(result['stale_entries'])} stale entries...")
        # In a real implementation, this would call the harvester
        print("(Auto-re-extraction not yet implemented)")


if __name__ == "__main__":
    main()