diff --git a/intelligence/deepdive/fleet_context.py b/intelligence/deepdive/fleet_context.py new file mode 100644 index 0000000..5ac7b9b --- /dev/null +++ b/intelligence/deepdive/fleet_context.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +"""Fleet Context Grounding — Phase 0 for Deep Dive. + +Fetches live world-state from Gitea to inject into synthesis, +ensuring briefings are grounded in actual fleet motion rather than +static assumptions. +""" + +import json +import logging +import os +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Dict, List, Optional + +try: + import httpx + HAS_HTTPX = True +except ImportError: + HAS_HTTPX = False + httpx = None + +logger = logging.getLogger("deepdive.fleet_context") + + +@dataclass +class FleetContext: + """Compact snapshot of fleet world-state.""" + + generated_at: str + repos: List[Dict] + open_issues: List[Dict] + recent_commits: List[Dict] + open_prs: List[Dict] + + def to_markdown(self, max_items_per_section: int = 5) -> str: + lines = [ + "## Fleet Context Snapshot", + f"*Generated: {self.generated_at}*", + "", + "### Active Repositories", + ] + for repo in self.repos[:max_items_per_section]: + lines.append( + f"- **{repo['name']}** — {repo.get('open_issues_count', 0)} open issues, " + f"{repo.get('open_prs_count', 0)} open PRs" + ) + lines.append("") + lines.append("### Recent Commits") + for commit in self.recent_commits[:max_items_per_section]: + lines.append( + f"- `{commit['repo']}`: {commit['message']} — {commit['author']} ({commit['when']})" + ) + lines.append("") + lines.append("### Open Issues / PRs") + for issue in self.open_issues[:max_items_per_section]: + lines.append( + f"- `{issue['repo']} #{issue['number']}`: {issue['title']} ({issue['state']})" + ) + lines.append("") + return "\n".join(lines) + + def to_prompt_text(self, max_items_per_section: int = 5) -> str: + return self.to_markdown(max_items_per_section) + + +class GiteaFleetClient: + """Fetch fleet state from Gitea API.""" + + def __init__(self, base_url: str, token: Optional[str] = None): + self.base_url = base_url.rstrip("/") + self.token = token + self.headers = {"Content-Type": "application/json"} + if token: + self.headers["Authorization"] = f"token {token}" + + def _get(self, path: str) -> Optional[List[Dict]]: + if not HAS_HTTPX: + logger.warning("httpx not installed — cannot fetch fleet context") + return None + url = f"{self.base_url}/api/v1{path}" + try: + resp = httpx.get(url, headers=self.headers, timeout=30.0) + resp.raise_for_status() + return resp.json() + except Exception as e: + logger.error(f"Gitea API error ({path}): {e}") + return None + + def fetch_repo_summary(self, owner: str, repo: str) -> Optional[Dict]: + data = self._get(f"/repos/{owner}/{repo}") + if not data: + return None + return { + "name": data.get("name"), + "full_name": data.get("full_name"), + "open_issues_count": data.get("open_issues_count", 0), + "open_prs_count": data.get("open_pr_counter", 0), + "updated_at": data.get("updated_at"), + } + + def fetch_open_issues(self, owner: str, repo: str, limit: int = 10) -> List[Dict]: + data = self._get(f"/repos/{owner}/{repo}/issues?state=open&limit={limit}") + if not data: + return [] + return [ + { + "repo": repo, + "number": item.get("number"), + "title": item.get("title", ""), + "state": item.get("state", ""), + "url": item.get("html_url", ""), + "updated_at": item.get("updated_at", ""), + } + for item in data + ] + + def fetch_recent_commits(self, owner: str, repo: str, limit: int = 5) -> List[Dict]: + data = self._get(f"/repos/{owner}/{repo}/commits?limit={limit}") + if not data: + return [] + commits = [] + for item in data: + commit_info = item.get("commit", {}) + author_info = commit_info.get("author", {}) + commits.append( + { + "repo": repo, + "sha": item.get("sha", "")[:7], + "message": commit_info.get("message", "").split("\n")[0], + "author": author_info.get("name", "unknown"), + "when": author_info.get("date", ""), + } + ) + return commits + + def fetch_open_prs(self, owner: str, repo: str, limit: int = 5) -> List[Dict]: + data = self._get(f"/repos/{owner}/{repo}/pulls?state=open&limit={limit}") + if not data: + return [] + return [ + { + "repo": repo, + "number": item.get("number"), + "title": item.get("title", ""), + "state": "open", + "url": item.get("html_url", ""), + "author": item.get("user", {}).get("login", ""), + } + for item in data + ] + + +def build_fleet_context(config: Dict) -> Optional[FleetContext]: + """Build fleet context from configuration.""" + fleet_cfg = config.get("fleet_context", {}) + if not fleet_cfg.get("enabled", False): + logger.info("Fleet context disabled") + return None + + base_url = fleet_cfg.get( + "gitea_url", os.environ.get("GITEA_URL", "http://localhost:3000") + ) + token = fleet_cfg.get("token", os.environ.get("GITEA_TOKEN")) + repos = fleet_cfg.get("repos", []) + owner = fleet_cfg.get("owner", "Timmy_Foundation") + + if not repos: + logger.warning("Fleet context enabled but no repos configured") + return None + + client = GiteaFleetClient(base_url, token) + + repo_summaries = [] + all_issues = [] + all_commits = [] + all_prs = [] + + for repo in repos: + summary = client.fetch_repo_summary(owner, repo) + if summary: + repo_summaries.append(summary) + all_issues.extend(client.fetch_open_issues(owner, repo, limit=5)) + all_commits.extend(client.fetch_recent_commits(owner, repo, limit=3)) + all_prs.extend(client.fetch_open_prs(owner, repo, limit=3)) + + all_issues.sort(key=lambda x: x.get("updated_at", ""), reverse=True) + all_commits.sort(key=lambda x: x.get("when", ""), reverse=True) + all_prs.sort(key=lambda x: x.get("number", 0), reverse=True) + + combined = all_issues + all_prs + combined.sort(key=lambda x: x.get("updated_at", x.get("when", "")), reverse=True) + + return FleetContext( + generated_at=datetime.now(timezone.utc).isoformat(), + repos=repo_summaries, + open_issues=combined[:10], + recent_commits=all_commits[:10], + open_prs=all_prs[:5], + )