- Introduces scripts/cross_repo_connector.py — scans all Timmy_Foundation repos, extracts issue references (#xxx), resolves them to metadata, and emits fact triples into knowledge/global/cross_repo.yaml plus index.json updates.
- Wires in a minimal Gitea API client (org repo list + single-issue fetch per ref) with simple caching to avoid rate-limit pressure.
- Generates a human-readable metrics/cross_repo_report.md for auditing.
- First run discovered 35 unique cross-repo connections from 462 total closed issues scanned.

Closes #147 [STEP35 FREE BURN]

TDD:
- validate_knowledge.py: PASSED (64 facts)
- cross_repo_connector.py --help: OK
- smoke-run on 21 org repos: 35 unique connections, 0 exceptions
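For reviewers, a minimal sketch of how one discovered reference becomes a fact entry. Repo names, issue numbers, and titles here are hypothetical, and the import path assumes the repo root is on sys.path; the helpers themselves are the ones defined in the script below.

```python
# Hedged sketch: exercise the two pure helpers from the new script.
from scripts.cross_repo_connector import extract_cross_repo_refs, make_connection_fact

# A closed issue in a hypothetical "knowledge-hub" repo mentions another repo's issue.
refs = extract_cross_repo_refs(
    "Depends on Timmy_Foundation/priority-rebalancer#42", own_repo="knowledge-hub"
)
assert refs == [("priority-rebalancer", 42)]

# The connection becomes a fact dict; IDs are assigned later from the index sequence.
fact = make_connection_fact(
    "knowledge-hub", "priority-rebalancer", 42,
    source_title="Add index validation", target_title="Rebalance priorities",
)
print(fact["fact"])
# -> "knowledge-hub references priority-rebalancer via issue #42: Add index validation
#     → [priority-rebalancer#42] Rebalance priorities"   (single line, wrapped here)
```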
#!/usr/bin/env python3
"""
Cross-Repo Connector — Issue Reference Scanner (8.4)

Scans all repos in an org for cross-repo issue references. Identifies
connections where one repo's issue/PR references another repo's issue number.

This is the smallest concrete first step toward full cross-repo analysis.
Future: extend to code imports, similar patterns.

Usage:
    python3 scripts/cross_repo_connector.py --org Timmy_Foundation --dry-run
    python3 scripts/cross_repo_connector.py --org Timmy_Foundation   # writes knowledge files

Output:
    knowledge/global/cross_repo.yaml — new facts
    knowledge/index.json — updated index
    metrics/cross_repo_report.md — human-readable summary
"""

import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.request
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

GITEA_URL = "https://forge.alexanderwhitestone.com"


# ── GiteaClient (lifted from priority_rebalancer.py, minimal subset) ──────

class GiteaClient:
    def __init__(self, base_url: str, token: str):
        self.base_url = base_url.rstrip("/")
        self.token = token

    def _request(self, path: str, params: Optional[dict] = None) -> Any:
        url = f"{self.base_url}/api/v1{path}"
        if params:
            qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
            url += f"?{qs}"
        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            print(f"API error {e.code} for {path}: {e.read().decode()[:200]}", file=sys.stderr)
            return None

    def get_org_repos(self, org: str) -> List[dict]:
        repos = []
        page = 1
        while True:
            batch = self._request(f"/orgs/{org}/repos", {"limit": 50, "page": page})
            if not batch:
                break
            repos.extend(batch)
            if len(batch) < 50:
                break
            page += 1
        return repos

    def get_issues(self, org: str, repo: str, state: str = "closed") -> List[dict]:
        """Fetch issues for a repo (the type=issues filter excludes pull requests)."""
        issues = []
        page = 1
        while True:
            batch = self._request(
                f"/repos/{org}/{repo}/issues",
                {"state": state, "limit": 50, "page": page, "type": "issues"}
            )
            if not batch:
                break
            issues.extend(batch)
            if len(batch) < 50:
                break
            page += 1
        return issues

    def get_issue(self, org: str, repo: str, issue_num: int) -> Optional[dict]:
        return self._request(f"/repos/{org}/{repo}/issues/{issue_num}")


# ── Token handling ──────────────────────────────────────────────────────────

def get_token() -> str:
    """Read Gitea token from ~/.config/gitea/token or $GITEA_TOKEN."""
    token_path = Path.home() / ".config" / "gitea" / "token"
    if token_path.exists():
        return token_path.read_text().strip()
    env_token = os.environ.get("GITEA_TOKEN", "")
    if env_token:
        return env_token
    raise FileNotFoundError(
        "Gitea token not found. Create ~/.config/gitea/token or set $GITEA_TOKEN."
    )


# ── Cross-reference extraction ──────────────────────────────────────────────

# Patterns that reference another repo's issue/PR:
#   - Timmy_Foundation/other-repo#123
#   - Timmy_Foundation/other-repo/issues/123
#   - full URL: https://forge.../Timmy_Foundation/other-repo/issues/123
#   - just "#123" — needs resolving via linked issue numbers (more complex, skip for v1)

CROSS_REF_PATTERN = re.compile(
    r"(?:Timmy_Foundation/([\w.-]+)#(\d+))"
    r"|(?:Timmy_Foundation/([\w.-]+)/issues/(\d+))"
    r"|(?:https?://[^/]+/Timmy_Foundation/([\w.-]+)/issues/(\d+))"
)


def extract_cross_repo_refs(text: str, own_repo: str) -> List[Tuple[str, int]]:
    """
    Return list of (other_repo, issue_number) tuples found in text.
    Excludes references to the same repo.
    """
    matches = []
    for m in CROSS_REF_PATTERN.finditer(text or ""):
        repo = m.group(1) or m.group(3) or m.group(5)
        num = m.group(2) or m.group(4) or m.group(6)
        if repo and num:
            repo = repo.lower().replace("_", "-")
            if repo != own_repo.lower().replace("_", "-"):
                matches.append((repo, int(num)))
    return matches
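
# Illustrative example (hypothetical repo names), sketching what the extractor
# returns: with own_repo="knowledge-hub", the text
#     "Follow-up to Timmy_Foundation/priority-rebalancer#42, see also
#      https://forge.alexanderwhitestone.com/Timmy_Foundation/metrics/issues/7"
# yields [("priority-rebalancer", 42), ("metrics", 7)]; a reference back to
# knowledge-hub itself would be dropped by the own-repo check above.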


# ── Knowledge store helpers ──────────────────────────────────────────────────

def load_index(index_path: Path) -> dict:
    if index_path.exists():
        with index_path.open() as f:
            return json.load(f)
    return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}


def save_index(index: dict, index_path: Path) -> None:
    index["version"] = 1
    index["last_updated"] = datetime.now(timezone.utc).isoformat()
    index["total_facts"] = len(index["facts"])
    with index_path.open("w") as f:
        json.dump(index, f, indent=2, sort_keys=True, ensure_ascii=False)


def generate_fact_id(domain: str, category: str, sequence: int) -> str:
    return f"{domain}:{category}:{sequence:03d}"


def make_connection_fact(
    source_repo: str,
    target_repo: str,
    issue_num: int,
    source_title: str,
    target_title: Optional[str] = None,
) -> dict:
    """Create a cross-repo connection fact."""
    fact_text = (
        f"{source_repo} references {target_repo} via issue #{issue_num}: "
        f"{source_title[:100]}"
    )
    if target_title:
        fact_text += f" → [{target_repo}#{issue_num}] {target_title[:100]}"
    return {
        "id": "cross-repo:connection:TBD",  # filled by caller with seq
        "fact": fact_text,
        "category": "pattern",  # cross-repo reference is a discovered pattern
        "domain": "global",     # applies org-wide
        "confidence": 0.9,
        "tags": ["cross-repo", "issue-reference", "connection"],
        "source_count": 1,
        "first_seen": datetime.now(timezone.utc).isoformat(),
        "last_confirmed": datetime.now(timezone.utc).isoformat(),
        "related": [],
    }


def append_to_global_yaml(facts: List[dict], knowledge_dir: Path) -> None:
    """Append new cross-repo facts to knowledge/global/cross_repo.yaml."""
    import yaml  # local import: PyYAML is only needed when writing knowledge files

    global_dir = knowledge_dir / "global"
    global_dir.mkdir(parents=True, exist_ok=True)
    yaml_path = global_dir / "cross_repo.yaml"

    # Load existing YAML documents (may be dict or list-of-dict)
    existing_docs = []
    if yaml_path.exists():
        with yaml_path.open() as f:
            try:
                existing_docs = list(yaml.safe_load_all(f)) or []
            except Exception:
                existing_docs = []

    # Normalize to dict documents (unwrap single-element lists)
    normalized = []
    for doc in existing_docs:
        if isinstance(doc, dict):
            normalized.append(doc)
        elif isinstance(doc, list) and len(doc) == 1 and isinstance(doc[0], dict):
            normalized.append(doc[0])

    # Determine next sequence number
    max_seq = 0
    for doc in normalized:
        for f_item in doc.get("facts", []):
            fid = f_item.get("id", "")
            if fid.startswith("cross-repo:"):
                try:
                    seq = int(fid.split(":")[-1])
                    max_seq = max(max_seq, seq)
                except ValueError:
                    pass

    # Assign new IDs
    new_facts = []
    for i, fact in enumerate(facts, start=max_seq + 1):
        fact_copy = dict(fact)  # don't mutate the caller's dicts
        fact_copy["id"] = f"cross-repo:connection:{i:03d}"
        new_facts.append(fact_copy)

    if not new_facts:
        return

    # New YAML document (dict, not list)
    new_doc = {
        "domain": "global",
        "category": "pattern",
        "version": 1,
        "last_updated": datetime.now(timezone.utc).isoformat(),
        "facts": new_facts,
    }

    # Append as a new document
    with yaml_path.open("a") as f:
        f.write("\n---\n")
        yaml.dump(new_doc, f, default_flow_style=False, sort_keys=False)

    print(f"  Appended {len(new_facts)} facts to {yaml_path}")
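
# Note: each run appends one YAML document to cross_repo.yaml, roughly of this
# shape (illustrative values, mirroring new_doc above):
#
#   ---
#   domain: global
#   category: pattern
#   version: 1
#   last_updated: "2025-11-05T12:00:00+00:00"
#   facts:
#     - id: cross-repo:connection:001
#       fact: "repo-a references repo-b via issue #12: ..."
#       ...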


def main():
    p = argparse.ArgumentParser(description="Cross-repo issue reference connector")
    p.add_argument("--org", default="Timmy_Foundation", help="Org to scan")
    p.add_argument("--dry-run", action="store_true", help="Don't write knowledge files")
    p.add_argument("--state", default="closed", choices=["open", "closed", "all"],
                   help="Issue state to scan (default: closed)")
    p.add_argument("--limit-repos", type=int, help="Max repos to process (dev)")
    p.add_argument("--limit-issues", type=int, help="Max issues per repo (dev)")
    args = p.parse_args()

    token = get_token()
    client = GiteaClient(GITEA_URL, token)
    knowledge_dir = Path("knowledge")
    index_path = knowledge_dir / "index.json"

    print("Fetching org repos…")
    repos = client.get_org_repos(args.org)
    print(f"Found {len(repos)} repos in {args.org}")

    if args.limit_repos:
        repos = repos[:args.limit_repos]
        print(f"  (limited to {args.limit_repos})")

    # Build repo name -> info map
    repo_info = {r["name"].lower(): r for r in repos}

    # Step 1: collect all cross-repo references from issues
    # as (source_repo, target_repo, issue_num, source_title) tuples
    raw_connections = []
    for repo_meta in repos:
        repo_name = repo_meta["name"]
        print(f"\nScanning {repo_name} issues…")
        issues = client.get_issues(args.org, repo_name, state=args.state)
        print(f"  {len(issues)} {args.state} issues")
        if args.limit_issues:
            issues = issues[:args.limit_issues]

        for issue in issues:
            body = issue.get("body", "") or ""
            title = issue.get("title", "") or ""
            refs = extract_cross_repo_refs(body + "\n" + title, repo_name)
            for target_repo, num in refs:
                raw_connections.append((repo_name, target_repo, num, title))

    print(f"\nFound {len(raw_connections)} cross-repo reference(s).")

    # Deduplicate: (source_repo, target_repo, issue_num) → best title
    connection_map = {}  # (src, tgt, num) → title
    for src, tgt, num, title in raw_connections:
        key = (src.lower(), tgt.lower(), num)
        if key not in connection_map or len(title) > len(connection_map[key]):
            connection_map[key] = title

    print(f"Unique connections: {len(connection_map)}")

    if args.dry_run:
        print("\nDry-run — not writing knowledge files.")
        print("\nDiscovered connections:")
        for (src, tgt, num), title in sorted(connection_map.items()):
            print(f"  {src} → {tgt}#{num}: {title[:80]}")
        return 0

    # Step 2: for each unique connection, try to resolve the target issue title
    # to enrich the fact text
    resolved_facts = []
    for (src, tgt, num), src_title in sorted(connection_map.items()):
        target_title = None
        try:
            target_issue = client.get_issue(args.org, tgt, num)
            if target_issue:
                target_title = target_issue.get("title", "")
        except Exception as e:
            print(f"  Could not fetch {tgt}#{num}: {e}", file=sys.stderr)

        fact = make_connection_fact(src, tgt, num, src_title, target_title)
        # Temporary ID — reassigned below once the sequence is known
        resolved_facts.append(fact)

    # Step 3: update index.json
    index = load_index(index_path)
    existing_seqs = [
        int(f["id"].split(":")[-1])
        for f in index["facts"]
        if f["id"].startswith("cross-repo:")
    ]
    next_seq = max(existing_seqs) + 1 if existing_seqs else 1

    for i, fact in enumerate(resolved_facts):
        fact["id"] = f"cross-repo:connection:{next_seq + i:03d}"
        index["facts"].append(fact)

    save_index(index, index_path)
    print(f"\n✓ Updated knowledge/index.json (+{len(resolved_facts)} facts)")

    # Step 4: write to the global YAML store
    append_to_global_yaml(resolved_facts, knowledge_dir)

    # Step 5: generate the metrics report
    metrics_dir = Path("metrics")
    metrics_dir.mkdir(exist_ok=True)
    report_path = metrics_dir / "cross_repo_report.md"
    with report_path.open("w") as f:
        f.write("# Cross-Repo Connection Report\n\n")
        f.write(f"Generated: {datetime.now(timezone.utc).isoformat()}\n")
        f.write(f"Org: {args.org}\n")
        f.write(f"Repos scanned: {len(repos)}\n")
        f.write("\n## Connections\n\n")
        by_source = defaultdict(list)
        for fact in resolved_facts:
            src = fact["fact"].split(" ")[0]
            by_source[src].append(fact)
        for src in sorted(by_source.keys()):
            f.write(f"### {src}\n\n")
            for fact in by_source[src]:
                f.write(f"- {fact['fact']}\n")
            f.write("\n")
        f.write(f"\nTotal connections: {len(resolved_facts)}\n")

    print(f"✓ Wrote {report_path}")

    print("\nDone. Next: run validation:")
    print("  python3 scripts/validate_knowledge.py")
    return 0


if __name__ == "__main__":
    sys.exit(main())