Compare commits

..

1 Commits

Author SHA1 Message Date
20a59d0cb7 feat: knowledge store staleness detector (#92) 2026-04-15 03:42:12 +00:00
3 changed files with 221 additions and 273 deletions

View File

@@ -1,162 +0,0 @@
#!/usr/bin/env python3
"""
Gitea Issue Body Parser
Extracts structured data from Gitea issue markdown bodies:
- Title
- Context section
- Acceptance criteria (checkboxes)
- Labels
- Epic/parent references
Usage:
python3 scripts/gitea_issue_parser.py <issue_body.txt
python3 scripts/gitea_issue_parser.py --url https://forge.../api/v1/repos/.../issues/123
echo "issue body" | python3 scripts/gitea_issue_parser.py --stdin
Output: JSON with {title, context, criteria[], labels[], epic_ref}
"""
import argparse
import json
import re
import sys
from typing import Optional
def parse_issue_body(body: str, title: str = "", labels: list = None) -> dict:
"""Parse a Gitea issue body into structured JSON."""
result = {
"title": title,
"context": "",
"criteria": [],
"labels": labels or [],
"epic_ref": None,
"sections": {},
}
if not body:
return result
# Extract epic/parent reference from title or body
epic_pattern = r"#(\d+)"
title_refs = re.findall(epic_pattern, title)
body_refs = re.findall(epic_pattern, body[:200]) # Check early body refs
# Look for "Closes #N" or "Part of #N" or "Epic: #N"
close_match = re.search(r"(?:Closes?|Fixes?|Resolves?)\s+#(\d+)", body, re.IGNORECASE)
part_match = re.search(r"(?:Part of|Epic|Parent|Blocks?)\s+#(\d+)", body, re.IGNORECASE)
if close_match:
result["epic_ref"] = f"#{close_match.group(1)}"
elif part_match:
result["epic_ref"] = f"#{part_match.group(1)}"
elif title_refs:
result["epic_ref"] = f"#{title_refs[0]}"
elif body_refs:
result["epic_ref"] = f"#{body_refs[0]}"
# Split into sections by ## headers
section_pattern = r"^##\s+(.+)$"
lines = body.split("\n")
current_section = None
current_content = []
for line in lines:
header_match = re.match(section_pattern, line)
if header_match:
# Save previous section
if current_section:
result["sections"][current_section] = "\n".join(current_content).strip()
current_section = header_match.group(1).strip().lower()
current_content = []
else:
current_content.append(line)
# Save last section
if current_section:
result["sections"][current_section] = "\n".join(current_content).strip()
# Extract context
for key in ["context", "background", "description", "problem"]:
if key in result["sections"]:
result["context"] = result["sections"][key]
break
# Extract acceptance criteria (checkboxes)
criteria_section = None
for key in ["acceptance criteria", "acceptance_criteria", "criteria", "requirements", "definition of done"]:
if key in result["sections"]:
criteria_section = result["sections"][key]
break
if criteria_section:
checkbox_pattern = r"-\s*\[[ xX]?\]\s*(.+)"
for match in re.finditer(checkbox_pattern, criteria_section):
result["criteria"].append(match.group(1).strip())
# Also try plain numbered/bulleted lists if no checkboxes found
if not result["criteria"]:
list_pattern = r"^\s*(?:\d+\.|-|\*)\s+(.+)"
for match in re.finditer(list_pattern, criteria_section, re.MULTILINE):
result["criteria"].append(match.group(1).strip())
# If no sectioned criteria found, scan whole body for checkboxes
if not result["criteria"]:
for match in re.finditer(r"-\s*\[[ xX]?\]\s*(.+)", body):
result["criteria"].append(match.group(1).strip())
return result
def parse_from_url(api_url: str, token: str = None) -> dict:
"""Parse an issue from a Gitea API URL."""
import urllib.request
headers = {}
if token:
headers["Authorization"] = f"token {token}"
req = urllib.request.Request(api_url, headers=headers)
resp = json.loads(urllib.request.urlopen(req, timeout=30).read())
title = resp.get("title", "")
body = resp.get("body", "")
labels = [l["name"] for l in resp.get("labels", [])]
return parse_issue_body(body, title, labels)
def main():
parser = argparse.ArgumentParser(description="Parse Gitea issue body into structured JSON")
parser.add_argument("input", nargs="?", help="Issue body file (or - for stdin)")
parser.add_argument("--url", help="Gitea API URL for the issue")
parser.add_argument("--stdin", action="store_true", help="Read from stdin")
parser.add_argument("--token", help="Gitea API token (or set GITEA_TOKEN env var)")
parser.add_argument("--title", default="", help="Issue title (for epic ref extraction)")
parser.add_argument("--labels", nargs="*", default=[], help="Issue labels")
parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON output")
args = parser.parse_args()
import os
token = args.token or os.environ.get("GITEA_TOKEN")
if args.url:
result = parse_from_url(args.url, token)
elif args.stdin or (args.input and args.input == "-"):
body = sys.stdin.read()
result = parse_issue_body(body, args.title, args.labels)
elif args.input:
with open(args.input) as f:
body = f.read()
result = parse_issue_body(body, args.title, args.labels)
else:
parser.print_help()
sys.exit(1)
indent = 2 if args.pretty else None
print(json.dumps(result, indent=indent))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector
Checks knowledge entries against their source files to detect staleness.
An entry is stale when its source file has been modified since extraction.
Usage:
python3 scripts/knowledge_staleness_check.py knowledge/index.json
python3 scripts/knowledge_staleness_check.py --repo /path/to/repo --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
Expected index.json format:
{
"version": 1,
"facts": [
{
"fact": "...",
"category": "fact|pitfall|pattern|tool-quirk",
"repo": "repo-name",
"confidence": 0.8,
"source_file": "path/to/file.py",
"source_hash": "sha256:abcdef...",
"extracted_at": "2026-04-13T20:00:00Z"
}
]
}
"""
import argparse
import hashlib
import json
import sys
from pathlib import Path
from typing import Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file not found."""
path = Path(filepath)
if not path.exists():
return None
content = path.read_bytes()
return hashlib.sha256(content).hexdigest()[:16]
def check_staleness(index_path: str, repo_root: str = None) -> dict:
"""Check all entries in the knowledge index for staleness."""
index = Path(index_path)
if not index.exists():
return {"error": f"Index not found: {index_path}"}
data = json.loads(index.read_text())
facts = data.get("facts", [])
if not facts:
return {
"total": 0,
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
# Determine repo root
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent # knowledge/index.json -> repo root
results = {
"total": len(facts),
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
for i, entry in enumerate(facts):
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results["no_source"] += 1
continue
if not stored_hash:
# Entry has source file but no hash — consider stale
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "no_hash",
})
continue
# Compute current hash
full_path = root / source_file
current_hash = compute_file_hash(str(full_path))
if current_hash is None:
results["missing_files"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "file_missing",
})
elif current_hash != stored_hash:
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"stored_hash": stored_hash,
"current_hash": current_hash,
"reason": "hash_mismatch",
})
else:
results["fresh"] += 1
return results
def add_hashes_to_index(index_path: str, repo_root: str = None) -> dict:
"""Add source hashes to entries that are missing them."""
index = Path(index_path)
data = json.loads(index.read_text())
facts = data.get("facts", [])
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent
updated = 0
skipped = 0
for entry in facts:
source_file = entry.get("source_file")
if not source_file or entry.get("source_hash"):
skipped += 1
continue
full_path = root / source_file
file_hash = compute_file_hash(str(full_path))
if file_hash:
entry["source_hash"] = file_hash
updated += 1
if updated > 0:
index.write_text(json.dumps(data, indent=2) + "\n")
return {"updated": updated, "skipped": skipped, "total": len(facts)}
def report_staleness(results: dict) -> str:
"""Format staleness check results as a report."""
lines = []
lines.append("=" * 50)
lines.append("KNOWLEDGE STORE STALENESS REPORT")
lines.append("=" * 50)
lines.append(f"Total entries: {results['total']}")
lines.append(f"Fresh: {results['fresh']}")
lines.append(f"Stale: {results['stale']}")
lines.append(f"No source: {results['no_source']}")
lines.append(f"Missing files: {results['missing_files']}")
lines.append("")
if results["stale_entries"]:
lines.append("STALE ENTRIES:")
lines.append("-" * 50)
for entry in results["stale_entries"]:
lines.append(f" [{entry['reason']}] {entry['source_file']}")
lines.append(f" {entry['fact']}")
if entry.get("stored_hash") and entry.get("current_hash"):
lines.append(f" stored: {entry['stored_hash']}")
lines.append(f" current: {entry['current_hash']}")
lines.append("")
if results["total"] > 0:
staleness_pct = results["stale"] / results["total"] * 100
lines.append(f"Staleness rate: {staleness_pct:.1f}%")
else:
lines.append("No entries to check.")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Check knowledge store for stale entries")
parser.add_argument("--index", default="knowledge/index.json", help="Path to index.json")
parser.add_argument("--repo", help="Repository root (default: auto-detect from index path)")
parser.add_argument("--fix", action="store_true", help="Add missing hashes to index")
parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
args = parser.parse_args()
if args.fix:
result = add_hashes_to_index(args.index, args.repo)
if args.json:
print(json.dumps(result, indent=2))
else:
print(f"Updated {result['updated']} entries with source hashes.")
print(f"Skipped {result['skipped']} (already had hashes or no source file).")
else:
results = check_staleness(args.index, args.repo)
if "error" in results:
print(f"Error: {results['error']}", file=sys.stderr)
sys.exit(1)
if args.json:
print(json.dumps(results, indent=2))
else:
print(report_staleness(results))
if __name__ == "__main__":
main()

View File

@@ -1,111 +0,0 @@
#!/usr/bin/env python3
"""Tests for gitea_issue_parser."""
import json
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from gitea_issue_parser import parse_issue_body
def test_basic_structure():
body = """## Context
This is the background.
## Acceptance Criteria
- [ ] First criterion
- [x] Second criterion (already done)
- [ ] Third criterion
## Labels
`pipeline`, `extraction`
"""
result = parse_issue_body(body, "Test Issue", ["pipeline", "extraction"])
assert result["title"] == "Test Issue"
assert "background" in result["context"].lower()
assert len(result["criteria"]) == 3
assert "First criterion" in result["criteria"]
assert result["labels"] == ["pipeline", "extraction"]
print("PASS: test_basic_structure")
def test_epic_ref():
body = "Closes #645\n\nSome description."
result = parse_issue_body(body, "feat: thing (#688)")
assert result["epic_ref"] == "#645"
print("PASS: test_epic_ref")
def test_epic_ref_from_title():
body = "Some description without close ref."
result = parse_issue_body(body, "feat: scene descriptions (#645)")
assert result["epic_ref"] == "#645"
print("PASS: test_epic_ref_from_title")
def test_no_checkboxes():
body = """## Requirements
1. First thing
2. Second thing
3. Third thing
"""
result = parse_issue_body(body)
assert len(result["criteria"]) == 3
print("PASS: test_no_checkboxes")
def test_empty_body():
result = parse_issue_body("", "Empty Issue")
assert result["title"] == "Empty Issue"
assert result["criteria"] == []
assert result["context"] == ""
print("PASS: test_empty_body")
def test_real_issue_format():
body = """Closes #681
## Changes
Add `#!/usr/bin/env python3` shebang to 6 Python scripts.
## Verification
All 6 files confirmed missing shebangs before fix.
## Impact
Scripts can now be executed directly.
"""
result = parse_issue_body(body, "fix: add python3 shebangs (#685)")
assert result["epic_ref"] == "#681"
assert "shebang" in result["context"].lower()
print("PASS: test_real_issue_format")
def test_all_sections_captured():
body = """## Context
Background info.
## Acceptance Criteria
- [ ] Do thing
## Labels
`test`
"""
result = parse_issue_body(body)
assert "context" in result["sections"]
assert "acceptance criteria" in result["sections"]
print("PASS: test_all_sections_captured")
if __name__ == "__main__":
test_basic_structure()
test_epic_ref()
test_epic_ref_from_title()
test_no_checkboxes()
test_empty_body()
test_real_issue_format()
test_all_sections_captured()
print("\nAll tests passed.")