746 lines
24 KiB
Python
746 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""Weekly narrative summary generator — human-readable loop analysis.
|
|
|
|
Analyzes the past week's activity across the development loop to produce
|
|
a narrative summary of:
|
|
- What changed (themes, areas of focus)
|
|
- How agents and Timmy contributed
|
|
- Any shifts in tests, triage, or token economy
|
|
|
|
The output is designed to be skimmable — a quick read that gives context
|
|
on the week's progress without drowning in metrics.
|
|
|
|
Run: python3 timmy_automations/daily_run/weekly_narrative.py [--json]
|
|
Env: See timmy_automations/config/automations.json for configuration
|
|
|
|
Refs: #719
|
|
"""
|
|
|
|
from __future__ import annotations

import argparse
import json
import os
import sys
from collections import Counter
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────────────

# Repository root: three levels up from this file
# (timmy_automations/daily_run/weekly_narrative.py → repo root).
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# Shared automations manifest that may contain a "weekly_narrative" entry.
CONFIG_PATH = Path(__file__).parent.parent / "config" / "automations.json"

# Fallbacks used when the manifest is missing or has no overrides.
DEFAULT_CONFIG = {
    "gitea_api": "http://localhost:3000/api/v1",
    "repo_slug": "rockachopa/Timmy-time-dashboard",
    "token_file": "~/.hermes/gitea_token",
    "lookback_days": 7,
    "output_file": ".loop/weekly_narrative.json",
    "enabled": True,
}
|
|
|
|
|
|
# ── Data Loading ───────────────────────────────────────────────────────────
|
|
|
|
|
|
def load_automation_config() -> dict:
    """Load configuration for weekly_narrative from automations manifest.

    Starts from DEFAULT_CONFIG, layers on the manifest's
    "weekly_narrative" entry (if present), then applies environment
    variable overrides.
    """
    config = dict(DEFAULT_CONFIG)

    if CONFIG_PATH.exists():
        try:
            manifest = json.loads(CONFIG_PATH.read_text())
        except (json.JSONDecodeError, OSError) as exc:
            print(f"[weekly_narrative] Warning: Could not load config: {exc}", file=sys.stderr)
        else:
            for entry in manifest.get("automations", []):
                if entry.get("id") != "weekly_narrative":
                    continue
                config.update(entry.get("config", {}))
                config["enabled"] = entry.get("enabled", True)
                break

    # Env vars win over both defaults and the manifest; empty strings
    # count as unset (truthiness check).
    env = os.environ
    if api := env.get("TIMMY_GITEA_API"):
        config["gitea_api"] = api
    if slug := env.get("TIMMY_REPO_SLUG"):
        config["repo_slug"] = slug
    if tok := env.get("TIMMY_GITEA_TOKEN"):
        config["token"] = tok
    if flag := env.get("TIMMY_WEEKLY_NARRATIVE_ENABLED"):
        config["enabled"] = flag.lower() == "true"

    return config
|
|
|
|
|
|
def get_token(config: dict) -> str | None:
|
|
"""Get Gitea token from environment or file."""
|
|
if "token" in config:
|
|
return config["token"]
|
|
|
|
token_file = Path(config["token_file"]).expanduser()
|
|
if token_file.exists():
|
|
return token_file.read_text().strip()
|
|
|
|
return None
|
|
|
|
|
|
def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file, skipping bad lines.

    Missing files yield an empty list; unparseable lines are ignored.
    """
    if not path.exists():
        return []

    records: list[dict] = []
    for raw in path.read_text().strip().splitlines():
        try:
            records.append(json.loads(raw))
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass — same catch set.
            continue
    return records
|
|
|
|
|
|
def parse_ts(ts_str: str) -> datetime | None:
|
|
"""Parse an ISO timestamp, tolerating missing tz."""
|
|
if not ts_str:
|
|
return None
|
|
try:
|
|
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=UTC)
|
|
return dt
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
# ── Gitea API Client ───────────────────────────────────────────────────────
|
|
|
|
|
|
class GiteaClient:
|
|
"""Simple Gitea API client with graceful degradation."""
|
|
|
|
def __init__(self, config: dict, token: str | None):
|
|
self.api_base = config["gitea_api"].rstrip("/")
|
|
self.repo_slug = config["repo_slug"]
|
|
self.token = token
|
|
self._available: bool | None = None
|
|
|
|
def _headers(self) -> dict:
|
|
headers = {"Accept": "application/json"}
|
|
if self.token:
|
|
headers["Authorization"] = f"token {self.token}"
|
|
return headers
|
|
|
|
def _api_url(self, path: str) -> str:
|
|
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
|
|
|
|
def is_available(self) -> bool:
|
|
"""Check if Gitea API is reachable."""
|
|
if self._available is not None:
|
|
return self._available
|
|
|
|
try:
|
|
req = Request(
|
|
f"{self.api_base}/version",
|
|
headers=self._headers(),
|
|
method="GET",
|
|
)
|
|
with urlopen(req, timeout=5) as resp:
|
|
self._available = resp.status == 200
|
|
return self._available
|
|
except (HTTPError, URLError, TimeoutError):
|
|
self._available = False
|
|
return False
|
|
|
|
def get_paginated(self, path: str, params: dict | None = None) -> list:
|
|
"""Fetch all pages of a paginated endpoint."""
|
|
all_items = []
|
|
page = 1
|
|
limit = 50
|
|
|
|
while True:
|
|
url = self._api_url(path)
|
|
query_parts = [f"limit={limit}", f"page={page}"]
|
|
if params:
|
|
for key, val in params.items():
|
|
query_parts.append(f"{key}={val}")
|
|
url = f"{url}?{'&'.join(query_parts)}"
|
|
|
|
req = Request(url, headers=self._headers(), method="GET")
|
|
with urlopen(req, timeout=15) as resp:
|
|
batch = json.loads(resp.read())
|
|
|
|
if not batch:
|
|
break
|
|
|
|
all_items.extend(batch)
|
|
if len(batch) < limit:
|
|
break
|
|
page += 1
|
|
|
|
return all_items
|
|
|
|
|
|
# ── Data Collection ────────────────────────────────────────────────────────
|
|
|
|
|
|
def collect_cycles_data(since: datetime) -> dict:
    """Load cycle retrospective data from the lookback period.

    Reads .loop/retro/cycles.jsonl and keeps entries timestamped at or
    after *since*; returns counts plus the raw recent entries.
    """
    cycles_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
    if not cycles_file.exists():
        return {"cycles": [], "total": 0, "successes": 0, "failures": 0}

    recent = [
        entry
        for entry in load_jsonl(cycles_file)
        if (ts := parse_ts(entry.get("timestamp", ""))) and ts >= since
    ]
    win_count = sum(1 for entry in recent if entry.get("success"))
    loss_count = len(recent) - win_count

    return {
        "cycles": recent,
        "total": len(recent),
        "successes": win_count,
        "failures": loss_count,
        "success_rate": round(win_count / len(recent), 2) if recent else 0,
    }
|
|
|
|
|
|
def collect_issues_data(client: GiteaClient, since: datetime) -> dict:
    """Collect issue activity from Gitea.

    Buckets issues into touched (updated in window), closed (closed in
    window), and opened (created in window and still open). Degrades to
    an ``error`` payload when Gitea is unreachable.
    """
    if not client.is_available():
        return {"error": "Gitea unavailable", "issues": [], "closed": [], "opened": []}

    try:
        issues = client.get_paginated("issues", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "issues": [], "closed": [], "opened": []}

    def in_window(raw: str) -> bool:
        ts = parse_ts(raw)
        return ts is not None and ts >= since

    touched: list[dict] = []
    closed: list[dict] = []
    opened: list[dict] = []

    for issue in issues:
        if in_window(issue.get("updated_at", "")):
            touched.append(issue)

        if issue.get("state") == "closed":
            if in_window(issue.get("closed_at", "")):
                closed.append(issue)
        elif in_window(issue.get("created_at", "")):
            opened.append(issue)

    return {
        "issues": touched,
        "closed": closed,
        "opened": opened,
        "touched_count": len(touched),
        "closed_count": len(closed),
        "opened_count": len(opened),
    }
|
|
|
|
|
|
def collect_prs_data(client: GiteaClient, since: datetime) -> dict:
    """Collect PR activity from Gitea.

    Buckets PRs into touched (updated in window), merged (merged in
    window), and opened (created in window, not merged in window).
    Degrades to an ``error`` payload when Gitea is unreachable.
    """
    if not client.is_available():
        return {"error": "Gitea unavailable", "prs": [], "merged": [], "opened": []}

    try:
        pulls = client.get_paginated("pulls", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "prs": [], "merged": [], "opened": []}

    def in_window(raw: str | None) -> bool:
        # parse_ts already maps empty/None input to None.
        ts = parse_ts(raw) if raw else None
        return ts is not None and ts >= since

    touched: list[dict] = []
    merged: list[dict] = []
    opened: list[dict] = []

    for pr in pulls:
        if in_window(pr.get("updated_at", "")):
            touched.append(pr)

        if pr.get("merged") and in_window(pr.get("merged_at", "")):
            merged.append(pr)
        elif in_window(pr.get("created_at", "")):
            opened.append(pr)

    return {
        "prs": touched,
        "merged": merged,
        "opened": opened,
        "touched_count": len(touched),
        "merged_count": len(merged),
        "opened_count": len(opened),
    }
|
|
|
|
|
|
def collect_triage_data(since: datetime) -> dict:
    """Load triage and introspection data.

    Returns the triage runs recorded at or after *since*, plus whatever
    insights snapshot is on disk (empty dict when missing or corrupt).
    """
    triage_file = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
    insights_file = REPO_ROOT / ".loop" / "retro" / "insights.json"

    # Walrus binding avoids parsing each timestamp twice (previously
    # parse_ts ran once for the truthiness check and again for the
    # comparison on every entry).
    recent_triage = [
        e for e in load_jsonl(triage_file)
        if (ts := parse_ts(e.get("timestamp", ""))) and ts >= since
    ]

    insights: dict = {}
    if insights_file.exists():
        try:
            insights = json.loads(insights_file.read_text())
        except (json.JSONDecodeError, OSError):
            # Best-effort: a corrupt insights file just means no insights.
            pass

    return {
        "triage_runs": len(recent_triage),
        "triage_entries": recent_triage,
        "latest_insights": insights,
    }
|
|
|
|
|
|
def collect_token_data(since: datetime) -> dict:
|
|
"""Load token economy data from the lightning ledger."""
|
|
# The ledger is in-memory but we can look for any persisted data
|
|
# For now, return placeholder that will be filled by the ledger module
|
|
return {
|
|
"note": "Token economy data is ephemeral — check dashboard for live metrics",
|
|
"balance_sats": 0, # Placeholder
|
|
"transactions_week": 0,
|
|
}
|
|
|
|
|
|
# ── Analysis Functions ─────────────────────────────────────────────────────
|
|
|
|
|
|
def extract_themes(issues: list[dict]) -> dict:
    """Summarize label usage across *issues*.

    Fix: the return annotation was ``list[dict]`` but the function has
    always returned a dict with three keys:

    - top_labels: up to ten most common labels, excluding ``layer:`` and
      ``size:`` prefixed ones
    - layers: counts per ``layer:*`` label, prefix stripped
    - types: counts for the known issue-type labels
    """
    label_counts: Counter[str] = Counter()
    layer_counts: Counter[str] = Counter()
    type_counts: Counter[str] = Counter()

    known_types = {"bug", "feature", "refactor", "docs", "test", "chore"}

    for issue in issues:
        for label in issue.get("labels", []):
            name = label.get("name", "")
            label_counts[name] += 1

            if name.startswith("layer:"):
                # removeprefix strips only the leading marker, unlike
                # replace() which would rewrite every occurrence.
                layer_counts[name.removeprefix("layer:")] += 1
            if name in known_types:
                type_counts[name] += 1

    return {
        # Top themes: labels excluding layer/size bookkeeping prefixes.
        "top_labels": [
            {"name": name, "count": count}
            for name, count in label_counts.most_common(10)
            if not name.startswith(("layer:", "size:"))
        ],
        "layers": [
            {"name": name, "count": count}
            for name, count in layer_counts.most_common()
        ],
        "types": [
            {"name": name, "count": count}
            for name, count in type_counts.most_common()
        ],
    }
|
|
|
|
|
|
def extract_agent_contributions(issues: list[dict], prs: list[dict], cycles: list[dict]) -> dict:
    """Extract agent contribution patterns.

    Counts issues per assignee, PRs per author, and cycles whose notes
    or reason mention Kimi.
    """
    # Issues per assignee; skip unassigned and malformed (non-dict or
    # empty-dict) assignee fields.
    per_assignee: Counter[str] = Counter(
        issue["assignee"].get("login", "unknown")
        for issue in issues
        if issue.get("assignee") and isinstance(issue["assignee"], dict)
    )

    # PRs per author, same filtering rules.
    per_author: Counter[str] = Counter(
        pr["user"].get("login", "unknown")
        for pr in prs
        if pr.get("user") and isinstance(pr["user"], dict)
    )

    # Cycles whose free-text fields mention Kimi.
    kimi_cycles = 0
    for cycle in cycles:
        if "kimi" in cycle.get("notes", "").lower() or "kimi" in cycle.get("reason", "").lower():
            kimi_cycles += 1

    return {
        "active_assignees": [
            {"login": who, "issues_count": n}
            for who, n in per_assignee.most_common()
        ],
        "pr_authors": [
            {"login": who, "prs_count": n}
            for who, n in per_author.most_common()
        ],
        "kimi_mentioned_cycles": kimi_cycles,
    }
|
|
|
|
|
|
def analyze_test_shifts(cycles: list[dict]) -> dict:
    """Analyze shifts in test patterns.

    Aggregates test counters across cycles and counts test-focused
    cycles (typed "test" or mentioning tests in notes).
    """
    if not cycles:
        return {"note": "No cycle data available"}

    passed = sum(c.get("tests_passed", 0) for c in cycles)
    added = sum(c.get("tests_added", 0) for c in cycles)
    focused = sum(
        1 for c in cycles
        if c.get("type") == "test" or "test" in c.get("notes", "").lower()
    )

    return {
        "total_tests_passed": passed,
        "total_tests_added": added,
        # Guard above ensures cycles is non-empty, so no zero-division.
        "avg_tests_per_cycle": round(passed / len(cycles), 1),
        "test_focused_cycles": focused,
    }
|
|
|
|
|
|
def analyze_triage_shifts(triage_data: dict) -> dict:
    """Analyze shifts in triage patterns.

    Summarizes the insights snapshot collected by collect_triage_data:
    run count, whether insights exist, and high-severity findings.
    """
    insights = triage_data.get("latest_insights", {})
    recs = insights.get("recommendations", [])
    high_count = sum(1 for rec in recs if rec.get("severity") == "high")

    return {
        "triage_runs": triage_data.get("triage_runs", 0),
        "insights_generated": insights.get("generated_at") is not None,
        "high_priority_recommendations": high_count,
        "recent_recommendations": recs[:3] if recs else [],
    }
|
|
|
|
|
|
def generate_vibe_summary(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
) -> dict:
    """Generate the human-readable 'vibe' summary.

    Classifies the week into one of four buckets from cycle success
    rates and delivery counts, then attaches focus areas, a lead-agent
    line, and notable events.
    """
    rate = cycles_data.get("success_rate", 0)
    fail_count = cycles_data.get("failures", 0)
    win_count = cycles_data.get("successes", 0)
    issues_closed = issues_data.get("closed_count", 0)
    prs_merged = prs_data.get("merged_count", 0)

    # Bucket order matters: first matching classification wins.
    if rate >= 0.9 and issues_closed > 0:
        vibe, blurb = "productive", "A strong week with solid delivery and healthy success rates."
    elif rate >= 0.7:
        vibe, blurb = "steady", "Steady progress with some bumps. Things are moving forward."
    elif fail_count > win_count:
        vibe, blurb = "struggling", "A challenging week with more failures than successes. Time to regroup."
    else:
        vibe, blurb = "quiet", "A lighter week with limited activity."

    # Top three layers become the focus-area bullets.
    focus_areas = [
        f"{layer['name']} ({layer['count']} items)"
        for layer in themes.get("layers", [])[:3]
    ]

    # Headline the busiest assignee, if any.
    leaders = agent_contrib.get("active_assignees", [])
    agent_summary = (
        f"{leaders[0]['login']} led with {leaders[0]['issues_count']} assigned issues."
        if leaders
        else ""
    )

    notable: list[str] = []
    if prs_merged > 5:
        notable.append(f"{prs_merged} PRs merged — high integration velocity")
    if triage_shifts.get("high_priority_recommendations", 0) > 0:
        notable.append("High-priority recommendations from loop introspection")
    if test_shifts.get("test_focused_cycles", 0) > 3:
        notable.append("Strong test coverage focus")
    if not notable:
        notable.append("Regular development flow")

    return {
        "overall": vibe,
        "description": blurb,
        "focus_areas": focus_areas,
        "agent_summary": agent_summary,
        "notable_events": notable,
    }
|
|
|
|
|
|
# ── Narrative Generation ───────────────────────────────────────────────────
|
|
|
|
|
|
def generate_narrative(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    triage_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
    token_data: dict,
    since: datetime,
    until: datetime,
) -> dict:
    """Generate the complete weekly narrative.

    Assembles the vibe summary and activity counters into the JSON
    payload later written to disk. *triage_data* is accepted for
    interface stability but the summarized *triage_shifts* carries the
    reported triage state.
    """
    vibe = generate_vibe_summary(
        cycles_data, issues_data, prs_data, themes, agent_contrib, test_shifts, triage_shifts
    )

    return {
        "generated_at": datetime.now(UTC).isoformat(),
        "period": {
            "start": since.isoformat(),
            "end": until.isoformat(),
            # Fix: derive from the actual window instead of a hard-coded
            # 7 — config/--days can change the lookback.
            "days": (until - since).days,
        },
        "vibe": vibe,
        "activity": {
            "cycles": {
                "total": cycles_data.get("total", 0),
                "successes": cycles_data.get("successes", 0),
                "failures": cycles_data.get("failures", 0),
                "success_rate": cycles_data.get("success_rate", 0),
            },
            "issues": {
                "touched": issues_data.get("touched_count", 0),
                "closed": issues_data.get("closed_count", 0),
                "opened": issues_data.get("opened_count", 0),
            },
            "pull_requests": {
                "touched": prs_data.get("touched_count", 0),
                "merged": prs_data.get("merged_count", 0),
                "opened": prs_data.get("opened_count", 0),
            },
        },
        "themes": themes,
        "agents": agent_contrib,
        "test_health": test_shifts,
        "triage_health": triage_shifts,
        "token_economy": token_data,
    }
|
|
|
|
|
|
def generate_markdown_summary(narrative: dict) -> str:
    """Generate a human-readable markdown summary.

    Renders the header and activity table always; focus areas, agent
    activity, notable events, and triage notes appear only when their
    data is non-empty.
    """
    vibe = narrative.get("vibe", {})
    activity = narrative.get("activity", {})
    cycles = activity.get("cycles", {})
    issues = activity.get("issues", {})
    prs = activity.get("pull_requests", {})
    period = narrative["period"]

    out = [
        "# Weekly Narrative Summary",
        "",
        f"**Period:** {period['start'][:10]} to {period['end'][:10]}",
        f"**Vibe:** {vibe.get('overall', 'unknown').title()}",
        "",
        f"{vibe.get('description', '')}",
        "",
        "## Activity Highlights",
        "",
        f"- **Development Cycles:** {cycles.get('total', 0)} total ({cycles.get('successes', 0)} success, {cycles.get('failures', 0)} failure)",
        f"- **Issues:** {issues.get('closed', 0)} closed, {issues.get('opened', 0)} opened",
        f"- **Pull Requests:** {prs.get('merged', 0)} merged, {prs.get('opened', 0)} opened",
        "",
    ]

    if focus := vibe.get("focus_areas", []):
        out += ["## Focus Areas", ""]
        out += [f"- {area}" for area in focus]
        out.append("")

    if summary := vibe.get("agent_summary", ""):
        out += ["## Agent Activity", "", summary, ""]

    if notable := vibe.get("notable_events", []):
        out += ["## Notable Events", ""]
        out += [f"- {event}" for event in notable]
        out.append("")

    triage = narrative.get("triage_health", {})
    high = triage.get("high_priority_recommendations", 0)
    if high > 0:
        out += [
            "## Triage Notes",
            "",
            f"⚠️ {high} high-priority recommendation(s) from loop introspection.",
            "",
        ]
        out += [
            f"- **{rec.get('category', 'general')}:** {rec.get('finding', '')}"
            for rec in triage.get("recent_recommendations", [])[:2]
        ]
        out.append("")

    return "\n".join(out)
|
|
|
|
|
|
# ── Main ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the narrative generator."""
    parser = argparse.ArgumentParser(
        description="Generate weekly narrative summary of work and vibes",
    )
    parser.add_argument("--json", "-j", action="store_true",
                        help="Output as JSON instead of markdown")
    parser.add_argument("--output", "-o", type=str, default=None,
                        help="Output file path (default from config)")
    parser.add_argument("--days", type=int, default=None,
                        help="Override lookback days (default 7)")
    parser.add_argument("--force", action="store_true",
                        help="Run even if disabled in config")
    return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Entry point: collect, analyze, write, and print the weekly narrative.

    Always returns 0; a disabled config short-circuits unless --force.
    """
    args = parse_args()
    config = load_automation_config()

    # Respect the manifest's enabled flag unless forced.
    if not (config.get("enabled", True) or args.force):
        print("[weekly_narrative] Skipped — weekly narrative is disabled in config")
        print("[weekly_narrative] Use --force to run anyway")
        return 0

    # CLI flag beats config for the lookback window.
    lookback = config.get("lookback_days", 7) if args.days is None else args.days
    until = datetime.now(UTC)
    since = until - timedelta(days=lookback)

    print(f"[weekly_narrative] Generating narrative for the past {lookback} days...")

    client = GiteaClient(config, get_token(config))
    if not client.is_available():
        print("[weekly_narrative] Warning: Gitea API unavailable — will use local data only")

    # Gather raw inputs from local files and (if reachable) Gitea.
    cycles_data = collect_cycles_data(since)
    issues_data = collect_issues_data(client, since)
    prs_data = collect_prs_data(client, since)
    triage_data = collect_triage_data(since)
    token_data = collect_token_data(since)

    # Analyze and assemble the narrative payload.
    narrative = generate_narrative(
        cycles_data,
        issues_data,
        prs_data,
        triage_data,
        extract_themes(issues_data.get("issues", [])),
        extract_agent_contributions(
            issues_data.get("issues", []),
            prs_data.get("prs", []),
            cycles_data.get("cycles", []),
        ),
        analyze_test_shifts(cycles_data.get("cycles", [])),
        analyze_triage_shifts(triage_data),
        token_data,
        since,
        until,
    )

    # Persist JSON plus a markdown rendering next to it.
    output_file = REPO_ROOT / (args.output or config.get("output_file", ".loop/weekly_narrative.json"))
    output_file.parent.mkdir(parents=True, exist_ok=True)
    output_file.write_text(json.dumps(narrative, indent=2) + "\n")

    markdown = generate_markdown_summary(narrative)
    md_output_file = output_file.with_suffix(".md")
    md_output_file.write_text(markdown)

    # Echo whichever format the caller asked for.
    if args.json:
        print(json.dumps(narrative, indent=2))
    else:
        print()
        print(markdown)

    print(f"\n[weekly_narrative] Written to: {output_file}")
    print(f"[weekly_narrative] Markdown summary: {md_output_file}")

    return 0
|
|
|
|
|
|
# Script entry point — main()'s return value becomes the shell exit code.
if __name__ == "__main__":
    sys.exit(main())
|