387 lines
13 KiB
Python
387 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)
|
|
|
|
Automatically detects when knowledge entries become stale due to code changes.
|
|
|
|
Detection Method:
|
|
1. Track source file hash alongside knowledge entry
|
|
2. Compare current file hashes vs stored
|
|
3. Mismatch → flag entry as potentially stale
|
|
4. Report stale entries and optionally re-extract
|
|
|
|
Usage:
|
|
python3 scripts/freshness.py --knowledge-dir knowledge/
|
|
python3 scripts/freshness.py --knowledge-dir knowledge/ --json
|
|
python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
|
|
python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import yaml
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
|
|
|
|
def compute_file_hash(filepath: str) -> Optional[str]:
    """Compute the SHA-256 hash of a file.

    Reads the file in fixed-size chunks so arbitrarily large files can be
    hashed without loading them wholly into memory.

    Args:
        filepath: Path to the file to hash.

    Returns:
        "sha256:<hexdigest>" on success, or None if the file is missing,
        is a directory, or cannot be read for any other OS-level reason.
    """
    try:
        digest = hashlib.sha256()
        with open(filepath, "rb") as f:
            # 64 KiB chunks: avoids reading huge files into memory at once.
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
        return "sha256:" + digest.hexdigest()
    except OSError:
        # Covers FileNotFoundError, IsADirectoryError, PermissionError and
        # any other unreadable-file condition (the original list missed
        # e.g. NotADirectoryError).
        return None
|
|
|
|
|
|
def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
    """Return files touched by git commits within the last *days* days.

    Args:
        repo_path: Repository root, passed to ``git -C``.
        days: Look-back window for ``git log --since``.

    Returns:
        Dict with keys ``modified``, ``added`` and ``deleted``, each a
        deduplicated list of file paths.  All lists are empty when git is
        unavailable, times out, or exits non-zero.
    """
    status_buckets = {"M": "modified", "A": "added", "D": "deleted"}
    changes: Dict[str, List[str]] = {name: [] for name in status_buckets.values()}

    cmd = [
        "git", "-C", repo_path, "log",
        f"--since={days} days ago",
        "--name-status",
        "--pretty=format:",
        "--diff-filter=MAD",
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return changes

    if proc.returncode != 0:
        return changes

    for raw in proc.stdout.splitlines():
        stripped = raw.strip()
        if not stripped:
            continue
        fields = stripped.split("\t", 1)
        if len(fields) == 2 and fields[0] in status_buckets:
            changes[status_buckets[fields[0]]].append(fields[1])

    # Drop duplicate paths (the same file may appear in several commits).
    return {key: list(set(paths)) for key, paths in changes.items()}
|
|
|
|
|
|
def _entry_from_mapping(source_label: str, item: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize one raw YAML fact mapping into the common entry schema."""
    return {
        "source": source_label,
        "fact": item.get("description", item.get("fact", "")),
        "source_file": item.get("source_file"),
        "source_hash": item.get("source_hash"),
        "category": item.get("category", "unknown"),
        "confidence": item.get("confidence", 0.5),
    }


def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
    """
    Load knowledge entries from YAML files in the knowledge directory.

    Supports:
    - knowledge/index.json (legacy format)
    - knowledge/global/*.yaml
    - knowledge/repos/*.yaml
    - knowledge/agents/*.yaml

    Returns a list of normalized entry dicts with keys: source, fact,
    source_file, source_hash, category, confidence.  Unreadable or
    malformed files are skipped silently (best-effort loader).
    """
    entries: List[Dict[str, Any]] = []

    # Legacy flat index.
    index_path = os.path.join(knowledge_dir, "index.json")
    if os.path.exists(index_path):
        try:
            with open(index_path) as f:
                data = json.load(f)
            for fact in data.get("facts", []):
                entries.append({
                    "source": "index.json",
                    "fact": fact.get("fact", ""),
                    "source_file": fact.get("source_file"),
                    "source_hash": fact.get("source_hash"),
                    "category": fact.get("category", "unknown"),
                    "confidence": fact.get("confidence", 0.5),
                })
        except (json.JSONDecodeError, KeyError):
            pass

    # Structured YAML files.
    for subdir in ["global", "repos", "agents"]:
        subdir_path = os.path.join(knowledge_dir, subdir)
        if not os.path.isdir(subdir_path):
            continue

        # Sorted for deterministic entry order across runs.
        for filename in sorted(os.listdir(subdir_path)):
            if not filename.endswith((".yaml", ".yml")):
                continue

            filepath = os.path.join(subdir_path, filename)
            # BUG FIX: the source label previously contained the literal
            # text "(unknown)" instead of the actual filename, making it
            # impossible to trace an entry back to its YAML file.
            source_label = f"{subdir}/{filename}"
            try:
                with open(filepath) as f:
                    data = yaml.safe_load(f)

                if not data or not isinstance(data, dict):
                    continue

                # Top-level values may be lists of fact dicts or single
                # fact dicts; anything else is ignored.
                for value in data.values():
                    if isinstance(value, list):
                        for item in value:
                            if isinstance(item, dict):
                                entries.append(_entry_from_mapping(source_label, item))
                    elif isinstance(value, dict):
                        entries.append(_entry_from_mapping(source_label, value))
            except (yaml.YAMLError, IOError):
                pass

    return entries
|
|
|
|
|
|
def check_freshness(knowledge_dir: str, repo_root: str = ".",
                    days: int = 1) -> Dict[str, Any]:
    """
    Check freshness of knowledge entries against recent code changes.

    An entry is flagged stale when its source file shows up in recent git
    changes, or when its stored content hash no longer matches the file on
    disk (the hash check, when a stored hash exists, takes precedence over
    the git-derived reason).

    Returns:
        {
            "timestamp": ISO timestamp,
            "total_entries": int,
            "stale_entries": [...],
            "fresh_entries": [...],
            "git_changes": {...},
            "summary": {...}
        }
    """
    all_entries = load_knowledge_entries(knowledge_dir)
    recent = get_git_file_changes(repo_root, days)

    stale: List[Dict[str, Any]] = []
    fresh: List[Dict[str, Any]] = []

    for entry in all_entries:
        src = entry.get("source_file")
        if not src:
            # No file reference — nothing to compare, counted as fresh.
            fresh.append({**entry, "status": "no_source"})
            continue

        # A non-empty reason marks the entry stale.
        reason = ""
        if src in recent["modified"]:
            reason = "source_modified"
        elif src in recent["deleted"]:
            reason = "source_deleted"
        elif src in recent["added"]:
            reason = "source_added"

        # Hash comparison, when possible, refines/overrides the git reason.
        stored = entry.get("source_hash")
        if stored:
            actual = compute_file_hash(os.path.join(repo_root, src))
            if actual is None:
                reason = "source_missing"
            elif actual != stored:
                reason = "hash_mismatch"

        if reason:
            stale.append({**entry, "status": "stale", "reason": reason})
        else:
            fresh.append({**entry, "status": "fresh"})

    # Tally stale entries per reason for the summary section.
    reason_counts: Dict[str, int] = {}
    for item in stale:
        key = item.get("reason", "unknown")
        reason_counts[key] = reason_counts.get(key, 0) + 1

    total = len(all_entries)
    stale_n = len(stale)
    fresh_n = len(fresh)

    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "total_entries": total,
        "stale_entries": stale,
        "fresh_entries": fresh,
        "git_changes": recent,
        "summary": {
            "total": total,
            "stale": stale_n,
            "fresh": fresh_n,
            "stale_percentage": round(stale_n / total * 100, 1) if total > 0 else 0,
            "stale_by_reason": reason_counts,
            "git_changes_summary": {
                "modified": len(recent["modified"]),
                "added": len(recent["added"]),
                "deleted": len(recent["deleted"]),
            },
        },
    }
|
|
|
|
|
|
def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
    """
    Count entries whose stored source hash is out of date.

    NOTE(review): this is effectively a dry-run — it does not yet write
    refreshed hashes back to the specific YAML/JSON files, it only reports
    how many entries would change.

    Returns:
        Number of entries whose hash would be updated.
    """
    refreshed = 0
    for entry in load_knowledge_entries(knowledge_dir):
        src = entry.get("source_file")
        if not src:
            continue

        current = compute_file_hash(os.path.join(repo_root, src))
        # Only readable files whose hash actually differs are counted.
        if current and entry.get("source_hash") != current:
            refreshed += 1

    return refreshed
|
|
|
|
|
|
def format_report(result: Dict[str, Any], max_items: int = 20) -> str:
    """Render a freshness-check result as a human-readable text report.

    Args:
        result: Output dict from check_freshness().
        max_items: Cap on how many stale entries are listed individually.

    Returns:
        Multi-line report string.
    """
    summary = result["summary"]
    stale = result["stale_entries"]
    git_changes = result["git_changes"]

    out: List[str] = []

    # Header with overall counts.
    out.append("Knowledge Freshness Report")
    out.append("=" * 50)
    out.append(f"Generated: {result['timestamp']}")
    out.append(f"Total entries: {summary['total']}")
    out.append(f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)")
    out.append(f"Fresh entries: {summary['fresh']}")
    out.append("")

    # Git activity overview.
    out.append("Git Changes (last 24h):")
    out.append(f" Modified: {len(git_changes['modified'])} files")
    out.append(f" Added: {len(git_changes['added'])} files")
    out.append(f" Deleted: {len(git_changes['deleted'])} files")
    out.append("")

    # Per-reason breakdown, only when something is stale.
    if summary.get("stale_by_reason"):
        out.append("Stale Entries by Reason:")
        out.append("")
        for reason, count in summary["stale_by_reason"].items():
            out.append(f" {reason}: {count}")
        out.append("")

    if stale:
        out.append("Stale Entries:")
        out.append("")
        for idx, entry in enumerate(stale[:max_items], 1):
            out.append(f"{idx:2d}. [{entry.get('reason', 'unknown')}] {entry.get('source_file', '?')}")
            snippet = entry.get("fact", "")[:60]
            if snippet:
                out.append(f" {snippet}")

        overflow = len(stale) - max_items
        if overflow > 0:
            out.append(f"\n... and {overflow} more")
    else:
        out.append("No stale entries found. All knowledge is fresh!")

    return "\n".join(out)
|
|
|
|
|
|
def main():
    """CLI entry point: parse args, run the freshness check, print results."""
    parser = argparse.ArgumentParser(
        description="Knowledge Freshness Cron — detect stale entries from code changes")
    parser.add_argument("--knowledge-dir", required=True,
                        help="Path to knowledge directory")
    parser.add_argument("--repo", default=".",
                        help="Path to repository for git change detection")
    parser.add_argument("--days", type=int, default=1,
                        help="Number of days to check for git changes (default: 1)")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON instead of human-readable")
    parser.add_argument("--max", type=int, default=20,
                        help="Maximum stale entries to show (default: 20)")
    parser.add_argument("--auto-reextract", action="store_true",
                        help="Auto-re-extract knowledge for stale entries")
    args = parser.parse_args()

    # Fail fast on bad paths before doing any real work.
    for path in (args.knowledge_dir, args.repo):
        if not os.path.isdir(path):
            print(f"Error: {path} is not a directory", file=sys.stderr)
            sys.exit(1)

    report = check_freshness(args.knowledge_dir, args.repo, args.days)

    if args.json:
        print(json.dumps(report, indent=2))
    else:
        print(format_report(report, args.max))

    # Optional re-extraction pass for anything flagged stale.
    if args.auto_reextract and report["stale_entries"]:
        print(f"\nAuto-re-extracting {len(report['stale_entries'])} stale entries...")
        # Harvester integration pending (see Issue #200 in the module docstring).
        print("(Auto-re-extraction not yet implemented)")


if __name__ == "__main__":
    main()