#!/usr/bin/env python3
"""
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)

Automatically detects when knowledge entries become stale due to code changes.

Detection Method:
1. Track source file hash alongside knowledge entry
2. Compare current file hashes vs stored
3. Mismatch → flag entry as potentially stale
4. Report stale entries and optionally re-extract

Usage:
    python3 scripts/freshness.py --knowledge-dir knowledge/
    python3 scripts/freshness.py --knowledge-dir knowledge/ --json
    python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
    python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
"""

import argparse
import hashlib
import json
import os
import subprocess
import sys
from collections import Counter
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import yaml  # third-party (PyYAML) — kept in its own import group


def compute_file_hash(filepath: str) -> Optional[str]:
    """Compute the SHA-256 hash of a file.

    Returns a "sha256:<hexdigest>" string, or None if the path does not
    exist, is a directory, or is otherwise unreadable (OSError covers
    FileNotFoundError, IsADirectoryError, PermissionError, and friends).
    """
    try:
        with open(filepath, "rb") as f:
            return "sha256:" + hashlib.sha256(f.read()).hexdigest()
    except OSError:
        return None


def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
    """Get files changed in git in the last *days* days.

    Returns a dict with 'modified', 'added', 'deleted' lists of repo-relative
    file paths. Renames (status ``R<score>``) are reported as a delete of the
    old path plus an add of the new path, so knowledge pointing at a moved
    file is flagged stale. On any git failure (missing binary, non-repo,
    timeout) the lists are simply left empty — freshness then falls back to
    hash comparison alone.
    """
    changes: Dict[str, List[str]] = {"modified": [], "added": [], "deleted": []}

    cmd = [
        "git", "-C", repo_path, "log",
        f"--since={days} days ago",
        "--name-status",
        "--pretty=format:",        # suppress commit headers; keep only file lines
        "--diff-filter=MADR",      # include renames, unlike plain MAD
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return changes

    if result.returncode != 0:
        return changes

    for raw in result.stdout.splitlines():
        line = raw.strip()
        if not line:
            continue

        parts = line.split("\t")
        status = parts[0]

        if status.startswith("R") and len(parts) == 3:
            # Rename line: "R100\told/path\tnew/path"
            changes["deleted"].append(parts[1])
            changes["added"].append(parts[2])
        elif len(parts) == 2:
            filepath = parts[1]
            if status == "M":
                changes["modified"].append(filepath)
            elif status == "A":
                changes["added"].append(filepath)
            elif status == "D":
                changes["deleted"].append(filepath)
        # anything else (malformed line) is ignored

    # Deduplicate while preserving first-seen order.
    for key in changes:
        changes[key] = list(dict.fromkeys(changes[key]))

    return changes


def _entry_from_item(source: str, item: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize one raw mapping from a YAML file into the common entry schema."""
    return {
        "source": source,
        "fact": item.get("description", item.get("fact", "")),
        "source_file": item.get("source_file"),
        "source_hash": item.get("source_hash"),
        "category": item.get("category", "unknown"),
        "confidence": item.get("confidence", 0.5),
    }


def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
    """Load knowledge entries from the knowledge directory.

    Supports:
    - knowledge/index.json (legacy format)
    - knowledge/global/*.yaml
    - knowledge/repos/*.yaml
    - knowledge/agents/*.yaml

    Unreadable or malformed files are skipped silently (best-effort scan);
    each returned entry carries source, fact, source_file, source_hash,
    category, and confidence keys.
    """
    entries: List[Dict[str, Any]] = []

    # Legacy JSON index, if present.
    index_path = os.path.join(knowledge_dir, "index.json")
    if os.path.exists(index_path):
        try:
            with open(index_path) as f:
                data = json.load(f)
            for fact in data.get("facts", []):
                entries.append({
                    "source": "index.json",
                    "fact": fact.get("fact", ""),
                    "source_file": fact.get("source_file"),
                    "source_hash": fact.get("source_hash"),
                    "category": fact.get("category", "unknown"),
                    "confidence": fact.get("confidence", 0.5),
                })
        except (json.JSONDecodeError, KeyError, OSError):
            pass

    # YAML files under the known subdirectories.
    for subdir in ["global", "repos", "agents"]:
        subdir_path = os.path.join(knowledge_dir, subdir)
        if not os.path.isdir(subdir_path):
            continue

        for filename in os.listdir(subdir_path):
            if not filename.endswith((".yaml", ".yml")):
                continue

            filepath = os.path.join(subdir_path, filename)
            # Record which file each entry came from, e.g. "repos/myrepo.yaml".
            source = f"{subdir}/{filename}"
            try:
                with open(filepath) as f:
                    data = yaml.safe_load(f)

                if not data or not isinstance(data, dict):
                    continue

                # Top-level values may be lists of entry dicts or single dicts.
                for value in data.values():
                    if isinstance(value, list):
                        entries.extend(
                            _entry_from_item(source, item)
                            for item in value
                            if isinstance(item, dict)
                        )
                    elif isinstance(value, dict):
                        entries.append(_entry_from_item(source, value))
            except (yaml.YAMLError, IOError):
                pass

    return entries


def check_freshness(knowledge_dir: str, repo_root: str = ".",
                    days: int = 1) -> Dict[str, Any]:
    """Check freshness of knowledge entries against recent code changes.

    An entry is stale when its source file appears in recent git changes
    (modified/added/deleted) or, if a stored hash exists, when the current
    file hash differs or the file is missing. The hash check runs last so
    its verdict (source_missing / hash_mismatch) overrides the git reason.

    Returns:
        {
            "timestamp": ISO timestamp,
            "days_checked": int,
            "total_entries": int,
            "stale_entries": [...],
            "fresh_entries": [...],
            "git_changes": {...},
            "summary": {...}
        }
    """
    entries = load_knowledge_entries(knowledge_dir)
    git_changes = get_git_file_changes(repo_root, days)

    stale_entries: List[Dict[str, Any]] = []
    fresh_entries: List[Dict[str, Any]] = []

    for entry in entries:
        source_file = entry.get("source_file")
        if not source_file:
            # No source reference — can never go stale by this mechanism.
            fresh_entries.append({**entry, "status": "no_source"})
            continue

        is_stale = False
        reason = ""

        if source_file in git_changes["modified"]:
            is_stale, reason = True, "source_modified"
        elif source_file in git_changes["deleted"]:
            is_stale, reason = True, "source_deleted"
        elif source_file in git_changes["added"]:
            is_stale, reason = True, "source_added"

        # Hash comparison is authoritative when a stored hash is available.
        stored_hash = entry.get("source_hash")
        if stored_hash:
            current_hash = compute_file_hash(os.path.join(repo_root, source_file))
            if current_hash is None:
                is_stale, reason = True, "source_missing"
            elif current_hash != stored_hash:
                is_stale, reason = True, "hash_mismatch"

        if is_stale:
            stale_entries.append({**entry, "status": "stale", "reason": reason})
        else:
            fresh_entries.append({**entry, "status": "fresh"})

    total = len(entries)
    stale_count = len(stale_entries)
    fresh_count = len(fresh_entries)

    stale_by_reason = dict(
        Counter(entry.get("reason", "unknown") for entry in stale_entries)
    )

    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "days_checked": days,
        "total_entries": total,
        "stale_entries": stale_entries,
        "fresh_entries": fresh_entries,
        "git_changes": git_changes,
        "summary": {
            "total": total,
            "stale": stale_count,
            "fresh": fresh_count,
            "stale_percentage": round(stale_count / total * 100, 1) if total > 0 else 0,
            "stale_by_reason": stale_by_reason,
            "git_changes_summary": {
                "modified": len(git_changes["modified"]),
                "added": len(git_changes["added"]),
                "deleted": len(git_changes["deleted"]),
            },
        },
    }


def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
    """Count entries whose stored hash no longer matches the source file.

    NOTE: this is a simplified stub — it only counts entries that WOULD be
    updated; a complete implementation must write the new hashes back to
    the specific YAML files each entry came from.
    """
    entries = load_knowledge_entries(knowledge_dir)
    updated = 0

    for entry in entries:
        source_file = entry.get("source_file")
        if not source_file:
            continue

        current_hash = compute_file_hash(os.path.join(repo_root, source_file))
        if current_hash and entry.get("source_hash") != current_hash:
            # Marked for update only; no write-back yet (see docstring).
            updated += 1

    return updated


def format_report(result: Dict[str, Any], max_items: int = 20) -> str:
    """Format freshness check results as a human-readable report."""
    timestamp = result["timestamp"]
    summary = result["summary"]
    stale_entries = result["stale_entries"]
    git_changes = result["git_changes"]
    # Older result payloads may lack days_checked; default matches --days default.
    days = result.get("days_checked", 1)

    lines = [
        "Knowledge Freshness Report",
        "=" * 50,
        f"Generated: {timestamp}",
        f"Total entries: {summary['total']}",
        f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)",
        f"Fresh entries: {summary['fresh']}",
        "",
        # Label reflects the actual --days window instead of a fixed "24h".
        f"Git Changes (last {days} day{'s' if days != 1 else ''}):",
        f"  Modified: {len(git_changes['modified'])} files",
        f"  Added: {len(git_changes['added'])} files",
        f"  Deleted: {len(git_changes['deleted'])} files",
        "",
    ]

    if summary.get("stale_by_reason"):
        lines.extend(["Stale Entries by Reason:", ""])
        for reason, count in summary["stale_by_reason"].items():
            lines.append(f"  {reason}: {count}")
        lines.append("")

    if stale_entries:
        lines.extend(["Stale Entries:", ""])
        for i, entry in enumerate(stale_entries[:max_items], 1):
            source = entry.get("source_file", "?")
            reason = entry.get("reason", "unknown")
            fact = entry.get("fact", "")[:60]
            lines.append(f"{i:2d}. [{reason}] {source}")
            if fact:
                lines.append(f"    {fact}")

        if len(stale_entries) > max_items:
            lines.append(f"\n... and {len(stale_entries) - max_items} more")
    else:
        lines.append("No stale entries found. All knowledge is fresh!")

    return "\n".join(lines)


def main():
    """CLI entry point: parse args, run the check, print the report."""
    parser = argparse.ArgumentParser(
        description="Knowledge Freshness Cron — detect stale entries from code changes")
    parser.add_argument("--knowledge-dir", required=True,
                        help="Path to knowledge directory")
    parser.add_argument("--repo", default=".",
                        help="Path to repository for git change detection")
    parser.add_argument("--days", type=int, default=1,
                        help="Number of days to check for git changes (default: 1)")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON instead of human-readable")
    parser.add_argument("--max", type=int, default=20,
                        help="Maximum stale entries to show (default: 20)")
    parser.add_argument("--auto-reextract", action="store_true",
                        help="Auto-re-extract knowledge for stale entries")

    args = parser.parse_args()

    if not os.path.isdir(args.knowledge_dir):
        print(f"Error: {args.knowledge_dir} is not a directory", file=sys.stderr)
        sys.exit(1)

    if not os.path.isdir(args.repo):
        print(f"Error: {args.repo} is not a directory", file=sys.stderr)
        sys.exit(1)

    result = check_freshness(args.knowledge_dir, args.repo, args.days)

    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(format_report(result, args.max))

    # Auto-re-extract if requested
    if args.auto_reextract and result["stale_entries"]:
        print(f"\nAuto-re-extracting {len(result['stale_entries'])} stale entries...")
        # In a real implementation, this would call the harvester
        print("(Auto-re-extraction not yet implemented)")


if __name__ == "__main__":
    main()