#!/usr/bin/env python3
"""Fleet Context Grounding — Phase 0 for Deep Dive.

Fetches live world-state from Gitea to inject into synthesis, ensuring
briefings are grounded in actual fleet motion rather than static assumptions.
"""

import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Union

try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False
    httpx = None

logger = logging.getLogger("deepdive.fleet_context")

# Sort position for items whose timestamp is missing or unparseable: they are
# treated as older than anything with a real timestamp.
_OLDEST = datetime.min.replace(tzinfo=timezone.utc)


def _timestamp_key(value: Any) -> datetime:
    """Turn an ISO-8601 timestamp string into a timezone-aware sort key.

    Comparing the raw strings misorders timestamps that carry different UTC
    offsets, so the value is parsed instead. Anything that is not a parseable
    string (including ``None``) maps to ``_OLDEST`` rather than raising.
    """
    if not isinstance(value, str) or not value:
        return _OLDEST
    text = value.strip()
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return _OLDEST
    if parsed.tzinfo is None:
        # Naive values cannot be compared with aware ones; assume UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _resolve_env(value: Any) -> Any:
    """Expand a ``${VAR}`` placeholder from the environment.

    A string of exactly that form is replaced by the variable's value, or by
    ``""`` when the variable is unset. Every other value is returned unchanged.
    """
    if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
        return os.environ.get(value[2:-1], "")
    return value


@dataclass
class FleetContext:
    """Compact snapshot of fleet world-state."""

    generated_at: str  # ISO-8601 timestamp of when the snapshot was built
    repos: List[Dict]  # per-repo summaries (name, open issue / PR counts)
    open_issues: List[Dict]  # entries rendered under "Open Issues / PRs"
    recent_commits: List[Dict]  # repo, sha, message, author, when
    open_prs: List[Dict]  # open pull requests; not rendered by to_markdown

    def to_markdown(self, max_items_per_section: int = 5) -> str:
        """Render the snapshot as a Markdown document.

        Args:
            max_items_per_section: Maximum number of entries listed in each of
                the three sections.

        Returns:
            The Markdown text, ending with a newline.
        """
        lines = [
            "## Fleet Context Snapshot",
            f"*Generated: {self.generated_at}*",
            "",
            "### Active Repositories",
        ]
        for repo in self.repos[:max_items_per_section]:
            lines.append(
                f"- **{repo['name']}** — {repo.get('open_issues_count', 0)} open issues, "
                f"{repo.get('open_prs_count', 0)} open PRs"
            )
        lines.append("")
        lines.append("### Recent Commits")
        for commit in self.recent_commits[:max_items_per_section]:
            lines.append(
                f"- `{commit['repo']}`: {commit['message']} — {commit['author']} ({commit['when']})"
            )
        lines.append("")
        lines.append("### Open Issues / PRs")
        for issue in self.open_issues[:max_items_per_section]:
            lines.append(
                f"- `{issue['repo']} #{issue['number']}`: {issue['title']} ({issue['state']})"
            )
        lines.append("")
        return "\n".join(lines)

    def to_prompt_text(self, max_items_per_section: int = 5) -> str:
        """Return the text injected into prompts; same output as to_markdown."""
        return self.to_markdown(max_items_per_section)


class GiteaFleetClient:
    """Fetch fleet state from Gitea API."""

    def __init__(self, base_url: str, token: Optional[str] = None):
        """Store connection settings.

        Args:
            base_url: Root URL of the Gitea instance; trailing slashes are
                stripped.
            token: Optional API token. When empty or ``None`` no
                ``Authorization`` header is sent.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.headers = {"Content-Type": "application/json"}
        if token:
            self.headers["Authorization"] = f"token {token}"

    def _get(self, path: str) -> Optional[Union[Dict, List[Dict]]]:
        """GET ``/api/v1{path}`` and return the decoded JSON body.

        This is a best-effort boundary: any failure (httpx missing, transport
        error, HTTP error status, invalid JSON) is logged and reported as
        ``None`` so that one failing request does not abort the whole snapshot.
        """
        if not HAS_HTTPX:
            logger.warning("httpx not installed — cannot fetch fleet context")
            return None
        url = f"{self.base_url}/api/v1{path}"
        try:
            resp = httpx.get(url, headers=self.headers, timeout=30.0)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error("Gitea API error (%s): %s", path, e)
            return None

    def _get_list(self, path: str) -> List[Dict]:
        """Return the JSON objects of a list endpoint, or ``[]`` on any problem."""
        data = self._get(path)
        if data is None:
            return []
        if not isinstance(data, list):
            logger.warning("Gitea API returned a non-list payload (%s)", path)
            return []
        return [item for item in data if isinstance(item, dict)]

    def fetch_repo_summary(self, owner: str, repo: str) -> Optional[Dict]:
        """Return name and open issue / PR counts for one repository.

        Returns:
            The summary dict, or ``None`` when the request failed or the
            response was not a non-empty JSON object.
        """
        data = self._get(f"/repos/{owner}/{repo}")
        if not data or not isinstance(data, dict):
            return None
        return {
            "name": data.get("name"),
            "full_name": data.get("full_name"),
            "open_issues_count": data.get("open_issues_count", 0),
            "open_prs_count": data.get("open_pr_counter", 0),
            "updated_at": data.get("updated_at"),
        }

    def fetch_open_issues(self, owner: str, repo: str, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` open issues of a repository, without PRs.

        Gitea's issue listing also contains pull requests. They are excluded
        here because fetch_open_prs reports them and the caller merges both
        lists, which would otherwise show every PR twice.
        """
        items = self._get_list(
            f"/repos/{owner}/{repo}/issues?state=open&type=issues&limit={limit}"
        )
        return [
            {
                "repo": repo,
                "number": item.get("number"),
                "title": item.get("title") or "",
                "state": item.get("state") or "",
                "url": item.get("html_url") or "",
                "updated_at": item.get("updated_at") or "",
            }
            for item in items
            # Client-side guard in case the server ignores the type filter.
            if item.get("pull_request") is None
        ]

    def fetch_recent_commits(self, owner: str, repo: str, limit: int = 5) -> List[Dict]:
        """Return up to ``limit`` commits, each reduced to its subject line."""
        commits = []
        for item in self._get_list(f"/repos/{owner}/{repo}/commits?limit={limit}"):
            # "or {}" also covers keys that are present but null.
            commit_info = item.get("commit") or {}
            author_info = commit_info.get("author") or {}
            message = commit_info.get("message") or ""
            commits.append(
                {
                    "repo": repo,
                    "sha": (item.get("sha") or "")[:7],
                    "message": message.split("\n", 1)[0],
                    "author": author_info.get("name") or "unknown",
                    "when": author_info.get("date") or "",
                }
            )
        return commits

    def fetch_open_prs(self, owner: str, repo: str, limit: int = 5) -> List[Dict]:
        """Return up to ``limit`` open pull requests of a repository.

        Each entry carries ``updated_at`` so pull requests can be ordered
        together with issues.
        """
        items = self._get_list(f"/repos/{owner}/{repo}/pulls?state=open&limit={limit}")
        return [
            {
                "repo": repo,
                "number": item.get("number"),
                "title": item.get("title") or "",
                "state": "open",
                "url": item.get("html_url") or "",
                "author": (item.get("user") or {}).get("login") or "",
                "updated_at": item.get("updated_at") or "",
            }
            for item in items
        ]


def build_fleet_context(config: Dict) -> Optional[FleetContext]:
    """Build fleet context from configuration.

    Reads the ``fleet_context`` section of ``config``: ``enabled``,
    ``gitea_url``, ``token``, ``owner`` and ``repos``. String settings may be
    ``${VAR}`` placeholders; the URL and token fall back to the ``GITEA_URL``
    and ``GITEA_TOKEN`` environment variables when the key is absent.

    Returns:
        A FleetContext, or ``None`` when the feature is disabled or no
        repositories are configured. Failed API requests yield empty sections
        rather than an error.
    """
    fleet_cfg = config.get("fleet_context", {})
    if not fleet_cfg.get("enabled", False):
        logger.info("Fleet context disabled")
        return None

    base_url = _resolve_env(
        fleet_cfg.get("gitea_url", os.environ.get("GITEA_URL", "http://localhost:3000"))
    )
    token = _resolve_env(fleet_cfg.get("token", os.environ.get("GITEA_TOKEN")))
    repos = fleet_cfg.get("repos", [])
    owner = _resolve_env(fleet_cfg.get("owner", "Timmy_Foundation"))

    if not repos:
        logger.warning("Fleet context enabled but no repos configured")
        return None

    client = GiteaFleetClient(base_url, token)
    repo_summaries = []
    all_issues = []
    all_commits = []
    all_prs = []
    for repo in repos:
        summary = client.fetch_repo_summary(owner, repo)
        if summary:
            repo_summaries.append(summary)
        all_issues.extend(client.fetch_open_issues(owner, repo, limit=5))
        all_commits.extend(client.fetch_recent_commits(owner, repo, limit=3))
        all_prs.extend(client.fetch_open_prs(owner, repo, limit=3))

    # Newest first. Keys tolerate missing or null values so one odd record
    # cannot make the sort raise.
    all_commits.sort(key=lambda c: _timestamp_key(c.get("when")), reverse=True)
    all_prs.sort(key=lambda pr: pr.get("number") or 0, reverse=True)
    combined = all_issues + all_prs
    combined.sort(key=lambda x: _timestamp_key(x.get("updated_at")), reverse=True)

    return FleetContext(
        generated_at=datetime.now(timezone.utc).isoformat(),
        repos=repo_summaries,
        open_issues=combined[:10],
        recent_commits=all_commits[:10],
        open_prs=all_prs[:5],
    )