feat(deepdive): add Phase 0 fleet context grounding module (#830)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled

This commit is contained in:
2026-04-05 17:32:22 +00:00
parent 3158d91786
commit 7189565d4d

View File

@@ -0,0 +1,200 @@
#!/usr/bin/env python3
"""Fleet Context Grounding — Phase 0 for Deep Dive.
Fetches live world-state from Gitea to inject into synthesis,
ensuring briefings are grounded in actual fleet motion rather than
static assumptions.
"""
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Optional
try:
import httpx
HAS_HTTPX = True
except ImportError:
HAS_HTTPX = False
httpx = None
logger = logging.getLogger("deepdive.fleet_context")
@dataclass
class FleetContext:
"""Compact snapshot of fleet world-state."""
generated_at: str
repos: List[Dict]
open_issues: List[Dict]
recent_commits: List[Dict]
open_prs: List[Dict]
def to_markdown(self, max_items_per_section: int = 5) -> str:
lines = [
"## Fleet Context Snapshot",
f"*Generated: {self.generated_at}*",
"",
"### Active Repositories",
]
for repo in self.repos[:max_items_per_section]:
lines.append(
f"- **{repo['name']}** — {repo.get('open_issues_count', 0)} open issues, "
f"{repo.get('open_prs_count', 0)} open PRs"
)
lines.append("")
lines.append("### Recent Commits")
for commit in self.recent_commits[:max_items_per_section]:
lines.append(
f"- `{commit['repo']}`: {commit['message']}{commit['author']} ({commit['when']})"
)
lines.append("")
lines.append("### Open Issues / PRs")
for issue in self.open_issues[:max_items_per_section]:
lines.append(
f"- `{issue['repo']} #{issue['number']}`: {issue['title']} ({issue['state']})"
)
lines.append("")
return "\n".join(lines)
def to_prompt_text(self, max_items_per_section: int = 5) -> str:
return self.to_markdown(max_items_per_section)
class GiteaFleetClient:
"""Fetch fleet state from Gitea API."""
def __init__(self, base_url: str, token: Optional[str] = None):
self.base_url = base_url.rstrip("/")
self.token = token
self.headers = {"Content-Type": "application/json"}
if token:
self.headers["Authorization"] = f"token {token}"
def _get(self, path: str) -> Optional[List[Dict]]:
if not HAS_HTTPX:
logger.warning("httpx not installed — cannot fetch fleet context")
return None
url = f"{self.base_url}/api/v1{path}"
try:
resp = httpx.get(url, headers=self.headers, timeout=30.0)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"Gitea API error ({path}): {e}")
return None
def fetch_repo_summary(self, owner: str, repo: str) -> Optional[Dict]:
data = self._get(f"/repos/{owner}/{repo}")
if not data:
return None
return {
"name": data.get("name"),
"full_name": data.get("full_name"),
"open_issues_count": data.get("open_issues_count", 0),
"open_prs_count": data.get("open_pr_counter", 0),
"updated_at": data.get("updated_at"),
}
def fetch_open_issues(self, owner: str, repo: str, limit: int = 10) -> List[Dict]:
data = self._get(f"/repos/{owner}/{repo}/issues?state=open&limit={limit}")
if not data:
return []
return [
{
"repo": repo,
"number": item.get("number"),
"title": item.get("title", ""),
"state": item.get("state", ""),
"url": item.get("html_url", ""),
"updated_at": item.get("updated_at", ""),
}
for item in data
]
def fetch_recent_commits(self, owner: str, repo: str, limit: int = 5) -> List[Dict]:
data = self._get(f"/repos/{owner}/{repo}/commits?limit={limit}")
if not data:
return []
commits = []
for item in data:
commit_info = item.get("commit", {})
author_info = commit_info.get("author", {})
commits.append(
{
"repo": repo,
"sha": item.get("sha", "")[:7],
"message": commit_info.get("message", "").split("\n")[0],
"author": author_info.get("name", "unknown"),
"when": author_info.get("date", ""),
}
)
return commits
def fetch_open_prs(self, owner: str, repo: str, limit: int = 5) -> List[Dict]:
data = self._get(f"/repos/{owner}/{repo}/pulls?state=open&limit={limit}")
if not data:
return []
return [
{
"repo": repo,
"number": item.get("number"),
"title": item.get("title", ""),
"state": "open",
"url": item.get("html_url", ""),
"author": item.get("user", {}).get("login", ""),
}
for item in data
]
def build_fleet_context(config: Dict) -> Optional[FleetContext]:
"""Build fleet context from configuration."""
fleet_cfg = config.get("fleet_context", {})
if not fleet_cfg.get("enabled", False):
logger.info("Fleet context disabled")
return None
base_url = fleet_cfg.get(
"gitea_url", os.environ.get("GITEA_URL", "http://localhost:3000")
)
token = fleet_cfg.get("token", os.environ.get("GITEA_TOKEN"))
repos = fleet_cfg.get("repos", [])
owner = fleet_cfg.get("owner", "Timmy_Foundation")
if not repos:
logger.warning("Fleet context enabled but no repos configured")
return None
client = GiteaFleetClient(base_url, token)
repo_summaries = []
all_issues = []
all_commits = []
all_prs = []
for repo in repos:
summary = client.fetch_repo_summary(owner, repo)
if summary:
repo_summaries.append(summary)
all_issues.extend(client.fetch_open_issues(owner, repo, limit=5))
all_commits.extend(client.fetch_recent_commits(owner, repo, limit=3))
all_prs.extend(client.fetch_open_prs(owner, repo, limit=3))
all_issues.sort(key=lambda x: x.get("updated_at", ""), reverse=True)
all_commits.sort(key=lambda x: x.get("when", ""), reverse=True)
all_prs.sort(key=lambda x: x.get("number", 0), reverse=True)
combined = all_issues + all_prs
combined.sort(key=lambda x: x.get("updated_at", x.get("when", "")), reverse=True)
return FleetContext(
generated_at=datetime.now(timezone.utc).isoformat(),
repos=repo_summaries,
open_issues=combined[:10],
recent_commits=all_commits[:10],
open_prs=all_prs[:5],
)