Compare commits
7 Commits
kimi/issue
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
36de0b491d | ||
| e99b09f700 | |||
| 2ab6539564 | |||
| 28b8673584 | |||
| 2f15435fed | |||
| dfe40f5fe6 | |||
| 6dd48685e7 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -73,7 +73,6 @@ morning_briefing.txt
|
||||
markdown_report.md
|
||||
data/timmy_soul.jsonl
|
||||
scripts/migrate_to_zeroclaw.py
|
||||
src/infrastructure/db_pool.py
|
||||
workspace/
|
||||
|
||||
# Loop orchestration state
|
||||
|
||||
@@ -330,6 +330,13 @@ class Settings(BaseSettings):
|
||||
autoresearch_max_iterations: int = 100
|
||||
autoresearch_metric: str = "val_bpb" # metric to optimise (lower = better)
|
||||
|
||||
# ── Weekly Narrative Summary ───────────────────────────────────────
|
||||
# Generates a human-readable weekly summary of development activity.
|
||||
# Disabling this will stop the weekly narrative generation.
|
||||
weekly_narrative_enabled: bool = True
|
||||
weekly_narrative_lookback_days: int = 7
|
||||
weekly_narrative_output_dir: str = ".loop"
|
||||
|
||||
# ── Local Hands (Shell + Git) ──────────────────────────────────────
|
||||
# Enable local shell/git execution hands.
|
||||
hands_shell_enabled: bool = True
|
||||
|
||||
@@ -275,3 +275,54 @@ async def component_status():
|
||||
},
|
||||
"timestamp": datetime.now(UTC).isoformat(),
|
||||
}
|
||||
|
||||
|
||||
@router.get("/health/snapshot")
async def health_snapshot():
    """Quick health snapshot before coding.

    Returns a concise status summary including:
    - CI pipeline status (pass/fail/unknown)
    - Critical issues count (P0/P1)
    - Test flakiness rate
    - Token economy temperature

    Fast execution (< 5 seconds) for pre-work checks.
    Refs: #710
    """
    # Imported locally so the route module has no hard dependency on the
    # snapshot tooling at import time.
    import sys
    from pathlib import Path

    # Import the health snapshot module
    # NOTE(review): this mutates the process-wide sys.path from a request
    # handler.  The membership check prevents duplicate entries, but the
    # path is never removed — confirm this is acceptable for the app's
    # lifecycle (it leaks state across requests by design here).
    snapshot_path = Path(settings.repo_root) / "timmy_automations" / "daily_run"
    if str(snapshot_path) not in sys.path:
        sys.path.insert(0, str(snapshot_path))

    try:
        # Deferred import: only resolvable after the sys.path insertion above.
        from health_snapshot import generate_snapshot, get_token, load_config

        config = load_config()
        token = get_token(config)

        # Run the health snapshot (in thread to avoid blocking)
        # generate_snapshot is synchronous, so it is pushed off the event loop.
        snapshot = await asyncio.to_thread(generate_snapshot, config, token)

        return snapshot.to_dict()
    except Exception as exc:
        # Broad catch is deliberate: any failure (missing module, bad config,
        # snapshot error) degrades to a well-formed "unknown" payload rather
        # than a 500, so pre-work checks never hard-fail.
        logger.warning("Health snapshot failed: %s", exc)
        # Return graceful fallback
        # Shape mirrors the successful snapshot.to_dict() payload — keep the
        # two in sync if the snapshot schema changes.
        return {
            "timestamp": datetime.now(UTC).isoformat(),
            "overall_status": "unknown",
            "error": str(exc),
            "ci": {"status": "unknown", "message": "Snapshot failed"},
            "issues": {"count": 0, "p0_count": 0, "p1_count": 0, "issues": []},
            "flakiness": {
                "status": "unknown",
                "recent_failures": 0,
                "recent_cycles": 0,
                "failure_rate": 0.0,
                "message": "Snapshot failed",
            },
            "tokens": {"status": "unknown", "message": "Snapshot failed"},
        }
|
||||
|
||||
84
src/infrastructure/db_pool.py
Normal file
84
src/infrastructure/db_pool.py
Normal file
@@ -0,0 +1,84 @@
|
||||
"""Thread-local SQLite connection pool.
|
||||
|
||||
Provides a ConnectionPool class that manages SQLite connections per thread,
|
||||
with support for context managers and automatic cleanup.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import threading
|
||||
from collections.abc import Generator
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class ConnectionPool:
|
||||
"""Thread-local SQLite connection pool.
|
||||
|
||||
Each thread gets its own connection, which is reused for subsequent
|
||||
requests from the same thread. Connections are automatically cleaned
|
||||
up when close_connection() is called or the context manager exits.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: Path | str) -> None:
|
||||
"""Initialize the connection pool.
|
||||
|
||||
Args:
|
||||
db_path: Path to the SQLite database file.
|
||||
"""
|
||||
self._db_path = Path(db_path)
|
||||
self._local = threading.local()
|
||||
|
||||
def _ensure_db_exists(self) -> None:
|
||||
"""Ensure the database directory exists."""
|
||||
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def get_connection(self) -> sqlite3.Connection:
|
||||
"""Get a connection for the current thread.
|
||||
|
||||
Creates a new connection if one doesn't exist for this thread,
|
||||
otherwise returns the existing connection.
|
||||
|
||||
Returns:
|
||||
A sqlite3 Connection object.
|
||||
"""
|
||||
if not hasattr(self._local, "conn") or self._local.conn is None:
|
||||
self._ensure_db_exists()
|
||||
self._local.conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
|
||||
self._local.conn.row_factory = sqlite3.Row
|
||||
return self._local.conn
|
||||
|
||||
def close_connection(self) -> None:
|
||||
"""Close the connection for the current thread.
|
||||
|
||||
Cleans up the thread-local storage. Safe to call even if
|
||||
no connection exists for this thread.
|
||||
"""
|
||||
if hasattr(self._local, "conn") and self._local.conn is not None:
|
||||
self._local.conn.close()
|
||||
self._local.conn = None
|
||||
|
||||
@contextmanager
|
||||
def connection(self) -> Generator[sqlite3.Connection, None, None]:
|
||||
"""Context manager for getting and automatically closing a connection.
|
||||
|
||||
Yields:
|
||||
A sqlite3 Connection object.
|
||||
|
||||
Example:
|
||||
with pool.connection() as conn:
|
||||
cursor = conn.execute("SELECT 1")
|
||||
result = cursor.fetchone()
|
||||
"""
|
||||
conn = self.get_connection()
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
self.close_connection()
|
||||
|
||||
def close_all(self) -> None:
|
||||
"""Close all connections (useful for testing).
|
||||
|
||||
Note: This only closes the connection for the current thread.
|
||||
In a multi-threaded environment, each thread must close its own.
|
||||
"""
|
||||
self.close_connection()
|
||||
@@ -489,5 +489,43 @@ def focus(
|
||||
typer.echo("No active focus (broad mode).")
|
||||
|
||||
|
||||
@app.command(name="healthcheck")
def healthcheck(
    json_output: bool = typer.Option(False, "--json", "-j", help="Output as JSON"),
    verbose: bool = typer.Option(
        False, "--verbose", "-v", help="Show verbose output including issue details"
    ),
    quiet: bool = typer.Option(False, "--quiet", "-q", help="Only show status line (no details)"),
):
    """Quick health snapshot before coding.

    Shows CI status, critical issues (P0/P1), test flakiness, and token economy.
    Fast execution (< 5 seconds) for pre-work checks.

    Refs: #710
    """
    # Local imports keep CLI startup lean.
    import subprocess
    import sys
    from pathlib import Path

    # Resolve the standalone snapshot script relative to this file
    # (three levels up to the repo root, then into the automations tree).
    repo_root = Path(__file__).resolve().parent.parent.parent
    script_path = repo_root / "timmy_automations" / "daily_run" / "health_snapshot.py"

    # Forward the CLI flags straight through to the script.
    args = [sys.executable, str(script_path)]
    for enabled, flag in ((json_output, "--json"), (verbose, "--verbose"), (quiet, "--quiet")):
        if enabled:
            args.append(flag)

    # Delegate to the script and propagate its exit code to the caller.
    raise typer.Exit(subprocess.run(args).returncode)
|
||||
|
||||
|
||||
def main() -> None:
    """Console entry point — run the module-level CLI application `app`."""
    app()
|
||||
|
||||
@@ -13,11 +13,121 @@
|
||||
<div class="mood" id="mood-text">focused</div>
|
||||
</div>
|
||||
<div id="connection-dot"></div>
|
||||
<button id="info-btn" class="info-button" aria-label="About The Matrix" title="About The Matrix">
|
||||
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
|
||||
<circle cx="12" cy="12" r="10"></circle>
|
||||
<line x1="12" y1="16" x2="12" y2="12"></line>
|
||||
<line x1="12" y1="8" x2="12.01" y2="8"></line>
|
||||
</svg>
|
||||
</button>
|
||||
<button id="fund-btn" class="fund-button" aria-label="Fund Session" title="Fund Session">
|
||||
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
|
||||
<path d="M12 2v20M2 12h20"></path>
|
||||
</svg>
|
||||
</button>
|
||||
<div id="speech-area">
|
||||
<div class="bubble" id="speech-bubble"></div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Fund Session Modal -->
|
||||
<div id="fund-modal" class="fund-modal">
|
||||
<div class="fund-modal-content">
|
||||
<button id="fund-close" class="fund-close" aria-label="Close">
|
||||
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
|
||||
<line x1="18" y1="6" x2="6" y2="18"></line>
|
||||
<line x1="6" y1="6" x2="18" y2="18"></line>
|
||||
</svg>
|
||||
</button>
|
||||
<h2>Fund Session</h2>
|
||||
|
||||
<section class="fund-info">
|
||||
<h3>⚡ What are Sats?</h3>
|
||||
<p><strong>Sats</strong> (satoshis) are the smallest unit of Bitcoin—like cents to a dollar. There are 100 million sats in 1 bitcoin. They enable tiny payments perfect for AI interactions.</p>
|
||||
</section>
|
||||
|
||||
<section class="fund-info">
|
||||
<h3>🛠️ Why Fund Your Session?</h3>
|
||||
<p>Your sats power the Workshop AI agents. When you fund a session:</p>
|
||||
<ul>
|
||||
<li>Timmy and the agent swarm can process your requests</li>
|
||||
<li>You get priority access to compute resources</li>
|
||||
<li>Agents are compensated for their work</li>
|
||||
</ul>
|
||||
</section>
|
||||
|
||||
<section class="fund-info">
|
||||
<h3>💰 Approximate Costs</h3>
|
||||
<div class="cost-table">
|
||||
<div class="cost-row">
|
||||
<span>Simple chat message</span>
|
||||
<span class="cost-value">~10-50 sats</span>
|
||||
</div>
|
||||
<div class="cost-row">
|
||||
<span>Code generation task</span>
|
||||
<span class="cost-value">~100-500 sats</span>
|
||||
</div>
|
||||
<div class="cost-row">
|
||||
<span>Complex multi-agent job</span>
|
||||
<span class="cost-value">~1,000-5,000 sats</span>
|
||||
</div>
|
||||
</div>
|
||||
<p class="cost-note">Costs vary based on model and complexity. Unused sats remain in your balance.</p>
|
||||
</section>
|
||||
|
||||
<div class="fund-actions">
|
||||
<div class="fund-input-group">
|
||||
<label for="fund-amount">Amount (sats)</label>
|
||||
<input type="number" id="fund-amount" class="fund-input" placeholder="1000" min="100" step="100">
|
||||
</div>
|
||||
<button id="fund-submit" class="fund-submit-btn">Fund Session</button>
|
||||
</div>
|
||||
|
||||
<div class="fund-footer">
|
||||
<span>⚡ Lightning Network · No subscriptions · Pay as you go</span>
|
||||
</div>
|
||||
</div>
|
||||
<div id="fund-backdrop" class="fund-backdrop"></div>
|
||||
</div>
|
||||
|
||||
<!-- About Panel -->
|
||||
<div id="about-panel" class="about-panel">
|
||||
<div class="about-panel-content">
|
||||
<button id="about-close" class="about-close" aria-label="Close">
|
||||
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
|
||||
<line x1="18" y1="6" x2="6" y2="18"></line>
|
||||
<line x1="6" y1="6" x2="18" y2="18"></line>
|
||||
</svg>
|
||||
</button>
|
||||
<h2>Welcome to The Matrix</h2>
|
||||
|
||||
<section>
|
||||
<h3>🌌 The Matrix</h3>
|
||||
<p>The Matrix is a 3D visualization of Timmy's AI agent workspace. Enter the workshop to see Timmy at work—pondering the arcane arts of code, managing tasks, and orchestrating autonomous agents in real-time.</p>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h3>🛠️ The Workshop</h3>
|
||||
<p>The Workshop is where you interact directly with Timmy:</p>
|
||||
<ul>
|
||||
<li><strong>Submit Jobs</strong> — Create tasks, delegate work, and track progress</li>
|
||||
<li><strong>Chat with Agents</strong> — Converse with Timmy and his swarm of specialized agents</li>
|
||||
<li><strong>Fund Sessions</strong> — Power your work with satoshis via Lightning Network</li>
|
||||
</ul>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<h3>⚡ Lightning & Sats</h3>
|
||||
<p>The Matrix runs on Bitcoin. Sessions are funded with satoshis (sats) over the Lightning Network—enabling fast, cheap micropayments that keep Timmy energized and working for you. No subscriptions, no limits—pay as you go.</p>
|
||||
</section>
|
||||
|
||||
<div class="about-footer">
|
||||
<span>Sovereign AI · Soul on Bitcoin</span>
|
||||
</div>
|
||||
</div>
|
||||
<div id="about-backdrop" class="about-backdrop"></div>
|
||||
</div>
|
||||
|
||||
<script type="importmap">
|
||||
{
|
||||
"imports": {
|
||||
@@ -74,6 +184,81 @@
|
||||
});
|
||||
stateReader.connect();
|
||||
|
||||
// --- Fund Session Modal ---
|
||||
const fundBtn = document.getElementById("fund-btn");
|
||||
const fundModal = document.getElementById("fund-modal");
|
||||
const fundClose = document.getElementById("fund-close");
|
||||
const fundBackdrop = document.getElementById("fund-backdrop");
|
||||
const fundSubmit = document.getElementById("fund-submit");
|
||||
const fundAmount = document.getElementById("fund-amount");
|
||||
|
||||
function openFundModal() {
|
||||
fundModal.classList.add("open");
|
||||
document.body.style.overflow = "hidden";
|
||||
// Focus the input when opening
|
||||
setTimeout(() => fundAmount.focus(), 100);
|
||||
}
|
||||
|
||||
function closeFundModal() {
|
||||
fundModal.classList.remove("open");
|
||||
document.body.style.overflow = "";
|
||||
}
|
||||
|
||||
function handleFundSubmit() {
|
||||
const amount = parseInt(fundAmount.value, 10);
|
||||
if (!amount || amount < 100) {
|
||||
alert("Please enter a valid amount (minimum 100 sats)");
|
||||
return;
|
||||
}
|
||||
// TODO: Integrate with Lightning payment API
|
||||
console.log("Funding session with", amount, "sats");
|
||||
alert("Lightning payment integration coming soon! Amount: " + amount + " sats");
|
||||
closeFundModal();
|
||||
}
|
||||
|
||||
fundBtn.addEventListener("click", openFundModal);
|
||||
fundClose.addEventListener("click", closeFundModal);
|
||||
fundBackdrop.addEventListener("click", closeFundModal);
|
||||
fundSubmit.addEventListener("click", handleFundSubmit);
|
||||
|
||||
// Allow Enter key to submit
|
||||
fundAmount.addEventListener("keypress", (e) => {
|
||||
if (e.key === "Enter") {
|
||||
handleFundSubmit();
|
||||
}
|
||||
});
|
||||
|
||||
// --- About Panel ---
|
||||
const infoBtn = document.getElementById("info-btn");
|
||||
const aboutPanel = document.getElementById("about-panel");
|
||||
const aboutClose = document.getElementById("about-close");
|
||||
const aboutBackdrop = document.getElementById("about-backdrop");
|
||||
|
||||
function openAboutPanel() {
|
||||
aboutPanel.classList.add("open");
|
||||
document.body.style.overflow = "hidden";
|
||||
}
|
||||
|
||||
function closeAboutPanel() {
|
||||
aboutPanel.classList.remove("open");
|
||||
document.body.style.overflow = "";
|
||||
}
|
||||
|
||||
infoBtn.addEventListener("click", openAboutPanel);
|
||||
aboutClose.addEventListener("click", closeAboutPanel);
|
||||
aboutBackdrop.addEventListener("click", closeAboutPanel);
|
||||
|
||||
// Close on Escape key
|
||||
document.addEventListener("keydown", (e) => {
|
||||
if (e.key === "Escape") {
|
||||
if (fundModal.classList.contains("open")) {
|
||||
closeFundModal();
|
||||
} else if (aboutPanel.classList.contains("open")) {
|
||||
closeAboutPanel();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// --- Resize ---
|
||||
window.addEventListener("resize", () => {
|
||||
camera.aspect = window.innerWidth / window.innerHeight;
|
||||
|
||||
@@ -87,3 +87,490 @@ canvas {
|
||||
#connection-dot.connected {
|
||||
background: #00b450;
|
||||
}
|
||||
|
||||
/* Info button */
|
||||
.info-button {
|
||||
position: absolute;
|
||||
top: 14px;
|
||||
right: 70px;
|
||||
width: 28px;
|
||||
height: 28px;
|
||||
padding: 0;
|
||||
background: rgba(10, 10, 20, 0.7);
|
||||
border: 1px solid rgba(218, 165, 32, 0.4);
|
||||
border-radius: 50%;
|
||||
color: #daa520;
|
||||
cursor: pointer;
|
||||
pointer-events: auto;
|
||||
transition: all 0.2s ease;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
}
|
||||
|
||||
.info-button:hover {
|
||||
background: rgba(218, 165, 32, 0.15);
|
||||
border-color: rgba(218, 165, 32, 0.7);
|
||||
transform: scale(1.05);
|
||||
}
|
||||
|
||||
.info-button svg {
|
||||
width: 16px;
|
||||
height: 16px;
|
||||
}
|
||||
|
||||
/* Fund Session button */
|
||||
.fund-button {
|
||||
position: absolute;
|
||||
top: 14px;
|
||||
right: 36px;
|
||||
width: 28px;
|
||||
height: 28px;
|
||||
padding: 0;
|
||||
background: rgba(10, 10, 20, 0.7);
|
||||
border: 1px solid rgba(0, 180, 80, 0.4);
|
||||
border-radius: 50%;
|
||||
color: #00b450;
|
||||
cursor: pointer;
|
||||
pointer-events: auto;
|
||||
transition: all 0.2s ease;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
}
|
||||
|
||||
.fund-button:hover {
|
||||
background: rgba(0, 180, 80, 0.15);
|
||||
border-color: rgba(0, 180, 80, 0.7);
|
||||
transform: scale(1.05);
|
||||
}
|
||||
|
||||
.fund-button svg {
|
||||
width: 16px;
|
||||
height: 16px;
|
||||
}
|
||||
|
||||
/* Fund Session Modal */
|
||||
.fund-modal {
|
||||
position: fixed;
|
||||
top: 0;
|
||||
left: 0;
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
z-index: 100;
|
||||
pointer-events: none;
|
||||
visibility: hidden;
|
||||
opacity: 0;
|
||||
transition: opacity 0.3s ease, visibility 0.3s ease;
|
||||
}
|
||||
|
||||
.fund-modal.open {
|
||||
pointer-events: auto;
|
||||
visibility: visible;
|
||||
opacity: 1;
|
||||
}
|
||||
|
||||
.fund-modal-content {
|
||||
position: absolute;
|
||||
top: 0;
|
||||
right: 0;
|
||||
width: 420px;
|
||||
max-width: 90%;
|
||||
height: 100%;
|
||||
background: rgba(10, 10, 20, 0.97);
|
||||
border-left: 1px solid rgba(0, 180, 80, 0.3);
|
||||
padding: 60px 24px 24px 24px;
|
||||
overflow-y: auto;
|
||||
transform: translateX(100%);
|
||||
transition: transform 0.3s ease;
|
||||
box-shadow: -4px 0 20px rgba(0, 0, 0, 0.5);
|
||||
}
|
||||
|
||||
.fund-modal.open .fund-modal-content {
|
||||
transform: translateX(0);
|
||||
}
|
||||
|
||||
.fund-close {
|
||||
position: absolute;
|
||||
top: 16px;
|
||||
right: 16px;
|
||||
width: 32px;
|
||||
height: 32px;
|
||||
padding: 0;
|
||||
background: transparent;
|
||||
border: 1px solid rgba(160, 160, 160, 0.3);
|
||||
border-radius: 50%;
|
||||
color: #aaa;
|
||||
cursor: pointer;
|
||||
transition: all 0.2s ease;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
}
|
||||
|
||||
.fund-close:hover {
|
||||
background: rgba(255, 255, 255, 0.1);
|
||||
border-color: rgba(0, 180, 80, 0.5);
|
||||
color: #00b450;
|
||||
}
|
||||
|
||||
.fund-close svg {
|
||||
width: 18px;
|
||||
height: 18px;
|
||||
}
|
||||
|
||||
.fund-modal-content h2 {
|
||||
font-size: 20px;
|
||||
color: #00b450;
|
||||
margin-bottom: 24px;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
.fund-info {
|
||||
margin-bottom: 24px;
|
||||
}
|
||||
|
||||
.fund-info h3 {
|
||||
font-size: 14px;
|
||||
color: #e0e0e0;
|
||||
margin-bottom: 10px;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
.fund-info p {
|
||||
font-size: 13px;
|
||||
line-height: 1.6;
|
||||
color: #aaa;
|
||||
margin-bottom: 10px;
|
||||
}
|
||||
|
||||
.fund-info ul {
|
||||
list-style: none;
|
||||
padding: 0;
|
||||
margin: 0;
|
||||
}
|
||||
|
||||
.fund-info li {
|
||||
font-size: 13px;
|
||||
line-height: 1.6;
|
||||
color: #aaa;
|
||||
margin-bottom: 8px;
|
||||
padding-left: 16px;
|
||||
position: relative;
|
||||
}
|
||||
|
||||
.fund-info li::before {
|
||||
content: "•";
|
||||
position: absolute;
|
||||
left: 0;
|
||||
color: #00b450;
|
||||
}
|
||||
|
||||
.fund-info li strong {
|
||||
color: #ccc;
|
||||
}
|
||||
|
||||
/* Cost table */
|
||||
.cost-table {
|
||||
background: rgba(0, 0, 0, 0.3);
|
||||
border: 1px solid rgba(0, 180, 80, 0.2);
|
||||
border-radius: 8px;
|
||||
padding: 12px;
|
||||
margin-bottom: 10px;
|
||||
}
|
||||
|
||||
.cost-row {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
padding: 8px 0;
|
||||
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
|
||||
font-size: 13px;
|
||||
}
|
||||
|
||||
.cost-row:last-child {
|
||||
border-bottom: none;
|
||||
}
|
||||
|
||||
.cost-row span:first-child {
|
||||
color: #aaa;
|
||||
}
|
||||
|
||||
.cost-value {
|
||||
color: #00b450;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
.cost-note {
|
||||
font-size: 12px;
|
||||
color: #666;
|
||||
font-style: italic;
|
||||
margin-top: 8px;
|
||||
}
|
||||
|
||||
/* Fund actions */
|
||||
.fund-actions {
|
||||
margin-top: 32px;
|
||||
padding: 20px;
|
||||
background: rgba(0, 0, 0, 0.3);
|
||||
border: 1px solid rgba(0, 180, 80, 0.2);
|
||||
border-radius: 8px;
|
||||
}
|
||||
|
||||
.fund-input-group {
|
||||
margin-bottom: 16px;
|
||||
}
|
||||
|
||||
.fund-input-group label {
|
||||
display: block;
|
||||
font-size: 13px;
|
||||
color: #ccc;
|
||||
margin-bottom: 6px;
|
||||
}
|
||||
|
||||
.fund-input {
|
||||
width: 100%;
|
||||
padding: 10px 12px;
|
||||
background: rgba(255, 255, 255, 0.05);
|
||||
border: 1px solid rgba(0, 180, 80, 0.3);
|
||||
border-radius: 6px;
|
||||
color: #e0e0e0;
|
||||
font-family: "Courier New", monospace;
|
||||
font-size: 16px;
|
||||
transition: all 0.2s ease;
|
||||
}
|
||||
|
||||
.fund-input:focus {
|
||||
outline: none;
|
||||
border-color: rgba(0, 180, 80, 0.6);
|
||||
background: rgba(255, 255, 255, 0.08);
|
||||
}
|
||||
|
||||
.fund-input::placeholder {
|
||||
color: #666;
|
||||
}
|
||||
|
||||
.fund-submit-btn {
|
||||
width: 100%;
|
||||
padding: 12px;
|
||||
background: linear-gradient(135deg, rgba(0, 180, 80, 0.8), rgba(0, 140, 60, 0.9));
|
||||
border: none;
|
||||
border-radius: 6px;
|
||||
color: #fff;
|
||||
font-family: "Courier New", monospace;
|
||||
font-size: 14px;
|
||||
font-weight: 600;
|
||||
cursor: pointer;
|
||||
transition: all 0.2s ease;
|
||||
text-transform: uppercase;
|
||||
letter-spacing: 0.5px;
|
||||
}
|
||||
|
||||
.fund-submit-btn:hover {
|
||||
background: linear-gradient(135deg, rgba(0, 200, 90, 0.9), rgba(0, 160, 70, 1));
|
||||
transform: translateY(-1px);
|
||||
box-shadow: 0 4px 12px rgba(0, 180, 80, 0.3);
|
||||
}
|
||||
|
||||
.fund-submit-btn:active {
|
||||
transform: translateY(0);
|
||||
}
|
||||
|
||||
.fund-footer {
|
||||
margin-top: 24px;
|
||||
padding-top: 16px;
|
||||
border-top: 1px solid rgba(160, 160, 160, 0.2);
|
||||
font-size: 12px;
|
||||
color: #666;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
.fund-backdrop {
|
||||
position: absolute;
|
||||
top: 0;
|
||||
left: 0;
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
background: rgba(0, 0, 0, 0.5);
|
||||
opacity: 0;
|
||||
transition: opacity 0.3s ease;
|
||||
}
|
||||
|
||||
.fund-modal.open .fund-backdrop {
|
||||
opacity: 1;
|
||||
}
|
||||
|
||||
/* About Panel */
|
||||
.about-panel {
|
||||
position: fixed;
|
||||
top: 0;
|
||||
right: 0;
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
z-index: 100;
|
||||
pointer-events: none;
|
||||
visibility: hidden;
|
||||
opacity: 0;
|
||||
transition: opacity 0.3s ease, visibility 0.3s ease;
|
||||
}
|
||||
|
||||
.about-panel.open {
|
||||
pointer-events: auto;
|
||||
visibility: visible;
|
||||
opacity: 1;
|
||||
}
|
||||
|
||||
.about-panel-content {
|
||||
position: absolute;
|
||||
top: 0;
|
||||
right: 0;
|
||||
width: 380px;
|
||||
max-width: 90%;
|
||||
height: 100%;
|
||||
background: rgba(10, 10, 20, 0.97);
|
||||
border-left: 1px solid rgba(218, 165, 32, 0.3);
|
||||
padding: 60px 24px 24px 24px;
|
||||
overflow-y: auto;
|
||||
transform: translateX(100%);
|
||||
transition: transform 0.3s ease;
|
||||
box-shadow: -4px 0 20px rgba(0, 0, 0, 0.5);
|
||||
}
|
||||
|
||||
.about-panel.open .about-panel-content {
|
||||
transform: translateX(0);
|
||||
}
|
||||
|
||||
.about-close {
|
||||
position: absolute;
|
||||
top: 16px;
|
||||
right: 16px;
|
||||
width: 32px;
|
||||
height: 32px;
|
||||
padding: 0;
|
||||
background: transparent;
|
||||
border: 1px solid rgba(160, 160, 160, 0.3);
|
||||
border-radius: 50%;
|
||||
color: #aaa;
|
||||
cursor: pointer;
|
||||
transition: all 0.2s ease;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
}
|
||||
|
||||
.about-close:hover {
|
||||
background: rgba(255, 255, 255, 0.1);
|
||||
border-color: rgba(218, 165, 32, 0.5);
|
||||
color: #daa520;
|
||||
}
|
||||
|
||||
.about-close svg {
|
||||
width: 18px;
|
||||
height: 18px;
|
||||
}
|
||||
|
||||
.about-panel-content h2 {
|
||||
font-size: 20px;
|
||||
color: #daa520;
|
||||
margin-bottom: 24px;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
.about-panel-content section {
|
||||
margin-bottom: 24px;
|
||||
}
|
||||
|
||||
.about-panel-content h3 {
|
||||
font-size: 14px;
|
||||
color: #e0e0e0;
|
||||
margin-bottom: 10px;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
.about-panel-content p {
|
||||
font-size: 13px;
|
||||
line-height: 1.6;
|
||||
color: #aaa;
|
||||
margin-bottom: 10px;
|
||||
}
|
||||
|
||||
.about-panel-content ul {
|
||||
list-style: none;
|
||||
padding: 0;
|
||||
margin: 0;
|
||||
}
|
||||
|
||||
.about-panel-content li {
|
||||
font-size: 13px;
|
||||
line-height: 1.6;
|
||||
color: #aaa;
|
||||
margin-bottom: 8px;
|
||||
padding-left: 16px;
|
||||
position: relative;
|
||||
}
|
||||
|
||||
.about-panel-content li::before {
|
||||
content: "•";
|
||||
position: absolute;
|
||||
left: 0;
|
||||
color: #daa520;
|
||||
}
|
||||
|
||||
.about-panel-content li strong {
|
||||
color: #ccc;
|
||||
}
|
||||
|
||||
.about-footer {
|
||||
margin-top: 32px;
|
||||
padding-top: 16px;
|
||||
border-top: 1px solid rgba(160, 160, 160, 0.2);
|
||||
font-size: 12px;
|
||||
color: #666;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
.about-backdrop {
|
||||
position: absolute;
|
||||
top: 0;
|
||||
left: 0;
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
background: rgba(0, 0, 0, 0.5);
|
||||
opacity: 0;
|
||||
transition: opacity 0.3s ease;
|
||||
}
|
||||
|
||||
.about-panel.open .about-backdrop {
|
||||
opacity: 1;
|
||||
}
|
||||
|
||||
/* Mobile adjustments */
|
||||
@media (max-width: 480px) {
|
||||
.about-panel-content,
|
||||
.fund-modal-content {
|
||||
width: 100%;
|
||||
max-width: 100%;
|
||||
padding: 56px 20px 20px 20px;
|
||||
}
|
||||
|
||||
.info-button {
|
||||
right: 66px;
|
||||
width: 26px;
|
||||
height: 26px;
|
||||
}
|
||||
|
||||
.info-button svg {
|
||||
width: 14px;
|
||||
height: 14px;
|
||||
}
|
||||
|
||||
.fund-button {
|
||||
right: 32px;
|
||||
width: 26px;
|
||||
height: 26px;
|
||||
}
|
||||
|
||||
.fund-button svg {
|
||||
width: 14px;
|
||||
height: 14px;
|
||||
}
|
||||
}
|
||||
|
||||
288
tests/infrastructure/test_db_pool.py
Normal file
288
tests/infrastructure/test_db_pool.py
Normal file
@@ -0,0 +1,288 @@
|
||||
"""Tests for infrastructure.db_pool module."""
|
||||
|
||||
import sqlite3
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.db_pool import ConnectionPool
|
||||
|
||||
|
||||
class TestConnectionPoolInit:
    """Construction-time behaviour of ConnectionPool."""

    def test_init_with_string_path(self, tmp_path):
        """A plain string path is accepted and normalised to Path."""
        raw = str(tmp_path / "test.db")
        assert ConnectionPool(raw)._db_path == Path(raw)

    def test_init_with_path_object(self, tmp_path):
        """A pathlib.Path is accepted as-is."""
        target = tmp_path / "test.db"
        assert ConnectionPool(target)._db_path == target

    def test_init_creates_thread_local(self, tmp_path):
        """The pool sets up thread-local storage on construction."""
        pool = ConnectionPool(tmp_path / "test.db")
        assert isinstance(getattr(pool, "_local", None), threading.local)
|
||||
|
||||
|
||||
class TestGetConnection:
    """Behaviour of get_connection()."""

    def test_get_connection_returns_valid_sqlite3_connection(self, tmp_path):
        """The returned object is a live sqlite3 connection."""
        conn = ConnectionPool(tmp_path / "test.db").get_connection()
        assert isinstance(conn, sqlite3.Connection)
        # Prove the handle is usable, not merely the right type.
        assert conn.execute("SELECT 1").fetchone()[0] == 1

    def test_get_connection_creates_db_file(self, tmp_path):
        """The database file (and parent directory) is created on demand."""
        target = tmp_path / "subdir" / "test.db"
        assert not target.exists()
        ConnectionPool(target).get_connection()
        assert target.exists()

    def test_get_connection_sets_row_factory(self, tmp_path):
        """Connections come configured with sqlite3.Row."""
        pool = ConnectionPool(tmp_path / "test.db")
        assert pool.get_connection().row_factory is sqlite3.Row

    def test_multiple_calls_same_thread_reuse_connection(self, tmp_path):
        """Repeated calls from one thread hand back the same handle."""
        pool = ConnectionPool(tmp_path / "test.db")
        assert pool.get_connection() is pool.get_connection()

    def test_different_threads_get_different_connections(self, tmp_path):
        """Each thread receives its own dedicated connection."""
        pool = ConnectionPool(tmp_path / "test.db")
        seen = []

        workers = [
            threading.Thread(target=lambda: seen.append(pool.get_connection()))
            for _ in range(2)
        ]
        for t in workers:
            t.start()
        for t in workers:
            t.join()

        assert len(seen) == 2
        assert seen[0] is not seen[1]
|
||||
|
||||
|
||||
class TestCloseConnection:
    """Behaviour of close_connection()."""

    def test_close_connection_closes_sqlite_connection(self, tmp_path):
        """The underlying sqlite handle is actually closed."""
        pool = ConnectionPool(tmp_path / "test.db")
        handle = pool.get_connection()
        pool.close_connection()
        # A closed handle must refuse further statements.
        with pytest.raises(sqlite3.ProgrammingError):
            handle.execute("SELECT 1")

    def test_close_connection_cleans_up_thread_local(self, tmp_path):
        """Thread-local storage is reset after closing."""
        pool = ConnectionPool(tmp_path / "test.db")
        pool.get_connection()
        assert getattr(pool._local, "conn", None) is not None

        pool.close_connection()

        # Either the attribute is gone or it has been reset to None.
        assert getattr(pool._local, "conn", None) is None

    def test_close_connection_without_getting_connection_is_safe(self, tmp_path):
        """Closing before ever opening is a harmless no-op."""
        ConnectionPool(tmp_path / "test.db").close_connection()  # must not raise

    def test_close_connection_multiple_calls_is_safe(self, tmp_path):
        """Closing twice in a row raises nothing."""
        pool = ConnectionPool(tmp_path / "test.db")
        pool.get_connection()
        pool.close_connection()
        pool.close_connection()  # second call must not raise
|
||||
|
||||
|
||||
class TestContextManager:
    """Behaviour of the connection() context manager."""

    def test_connection_yields_valid_connection(self, tmp_path):
        """The manager yields a live sqlite3 connection."""
        pool = ConnectionPool(tmp_path / "test.db")
        with pool.connection() as conn:
            assert isinstance(conn, sqlite3.Connection)
            assert conn.execute("SELECT 42").fetchone()[0] == 42

    def test_connection_closes_on_exit(self, tmp_path):
        """Leaving the with-block closes the connection."""
        pool = ConnectionPool(tmp_path / "test.db")
        with pool.connection() as conn:
            pass
        with pytest.raises(sqlite3.ProgrammingError):
            conn.execute("SELECT 1")

    def test_connection_closes_on_exception(self, tmp_path):
        """The connection is closed even when the body raises."""
        pool = ConnectionPool(tmp_path / "test.db")
        escaped = None
        with pytest.raises(ValueError):
            with pool.connection() as conn:
                escaped = conn
                raise ValueError("Test exception")
        # The handle captured inside the block must now be closed.
        with pytest.raises(sqlite3.ProgrammingError):
            escaped.execute("SELECT 1")

    def test_connection_context_manager_is_reusable(self, tmp_path):
        """The manager can be entered repeatedly on the same pool."""
        pool = ConnectionPool(tmp_path / "test.db")
        values = []
        for query in ("SELECT 1", "SELECT 2"):
            with pool.connection() as conn:
                values.append(conn.execute(query).fetchone()[0])
        assert values == [1, 2]
|
||||
|
||||
|
||||
class TestThreadSafety:
    """Test thread-safety of the connection pool."""

    def test_concurrent_access(self, tmp_path):
        """Multiple threads can use the pool concurrently.

        Five workers insert into a shared table; any exception in a worker
        is collected and fails the test.
        """
        pool = ConnectionPool(tmp_path / "test.db")
        results = []
        errors = []

        def worker(worker_id):
            try:
                with pool.connection() as conn:
                    conn.execute("CREATE TABLE IF NOT EXISTS test (id INTEGER)")
                    conn.execute("INSERT INTO test VALUES (?)", (worker_id,))
                    conn.commit()
                    time.sleep(0.01)  # Small delay to increase contention
                results.append(worker_id)
            except Exception as e:
                errors.append(e)

        threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert len(errors) == 0, f"Errors occurred: {errors}"
        assert len(results) == 5

    def test_thread_isolation(self, tmp_path):
        """Each thread has isolated connections (verified by thread-local data).

        Fix: each worker now writes to its own table. The previous version
        had all workers share one table in the same database file and
        interleave DELETE/INSERT/SELECT, so a worker could read another
        worker's row — or no row at all, crashing on ``fetchone()[0]`` —
        making the test flaky. Per-worker tables keep the intent (every
        thread's connection round-trips its own data) while being
        deterministic.
        """
        pool = ConnectionPool(tmp_path / "test.db")
        results = []

        def worker(worker_id):
            # Table name is derived from a trusted loop index, not user input.
            table = f"isolation_test_{worker_id}"
            conn = pool.get_connection()
            conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (thread_id INTEGER)")
            conn.execute(f"INSERT INTO {table} VALUES (?)", (worker_id,))
            conn.commit()
            # Read back the data this thread just wrote.
            read_back = conn.execute(f"SELECT thread_id FROM {table}").fetchone()[0]
            results.append((worker_id, read_back))
            pool.close_connection()

        threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Each thread should have written and read its own ID.
        assert len(results) == 3
        for worker_id, read_id in results:
            assert worker_id == read_id, f"Thread {worker_id} read {read_id} instead"
|
||||
|
||||
|
||||
class TestCloseAll:
    """Test close_all() method."""

    def test_close_all_closes_current_thread_connection(self, tmp_path):
        """close_all() closes the connection for the current thread."""
        pool = ConnectionPool(tmp_path / "test.db")
        handle = pool.get_connection()
        pool.close_all()
        # The handle must be unusable once close_all() has run.
        with pytest.raises(sqlite3.ProgrammingError):
            handle.execute("SELECT 1")
|
||||
|
||||
|
||||
class TestIntegration:
    """Integration tests for real-world usage patterns."""

    def test_basic_crud_operations(self, tmp_path):
        """Can perform basic CRUD operations through the pool."""
        pool = ConnectionPool(tmp_path / "test.db")

        with pool.connection() as conn:
            conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
            for name in ("Alice", "Bob"):
                conn.execute("INSERT INTO users (name) VALUES (?)", (name,))
            conn.commit()
            rows = conn.execute("SELECT * FROM users ORDER BY id").fetchall()
            # Rows support name-based access (row_factory set by the pool —
            # same access pattern the original test relied on).
            assert [row["name"] for row in rows] == ["Alice", "Bob"]

    def test_multiple_pools_different_databases(self, tmp_path):
        """Multiple pools can manage different databases independently."""
        pool1 = ConnectionPool(tmp_path / "db1.db")
        pool2 = ConnectionPool(tmp_path / "db2.db")

        for value, pool in ((1, pool1), (2, pool2)):
            with pool.connection() as conn:
                conn.execute("CREATE TABLE test (val INTEGER)")
                conn.execute("INSERT INTO test VALUES (?)", (value,))
                conn.commit()

        # Verify isolation: each database kept only its own value.
        for expected, pool in ((1, pool1), (2, pool2)):
            with pool.connection() as conn:
                assert conn.execute("SELECT val FROM test").fetchone()[0] == expected
|
||||
@@ -130,6 +130,13 @@ class TestAPIEndpoints:
|
||||
r = client.get("/health/sovereignty")
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_health_snapshot(self, client):
    """/health/snapshot returns 200 with a recognised overall_status."""
    response = client.get("/health/snapshot")
    assert response.status_code == 200
    payload = response.json()
    assert "overall_status" in payload
    assert payload["overall_status"] in ("green", "yellow", "red", "unknown")
|
||||
|
||||
def test_queue_status(self, client):
|
||||
r = client.get("/api/queue/status")
|
||||
assert r.status_code == 200
|
||||
@@ -186,6 +193,7 @@ class TestNo500:
|
||||
"/health",
|
||||
"/health/status",
|
||||
"/health/sovereignty",
|
||||
"/health/snapshot",
|
||||
"/health/components",
|
||||
"/agents/default/panel",
|
||||
"/agents/default/history",
|
||||
|
||||
280
tests/timmy/test_voice_tts_unit.py
Normal file
280
tests/timmy/test_voice_tts_unit.py
Normal file
@@ -0,0 +1,280 @@
|
||||
"""Unit tests for timmy_serve.voice_tts.
|
||||
|
||||
Mocks pyttsx3 so tests run without audio hardware.
|
||||
"""
|
||||
|
||||
import threading
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
class TestVoiceTTSInit:
    """Test VoiceTTS initialization with/without pyttsx3."""

    def test_init_success(self):
        """When pyttsx3 is available, engine initializes with given rate/volume."""
        fake_module = MagicMock()
        fake_engine = MagicMock()
        fake_module.init.return_value = fake_engine

        with patch.dict("sys.modules", {"pyttsx3": fake_module}):
            from timmy_serve.voice_tts import VoiceTTS

            tts = VoiceTTS(rate=200, volume=0.8)
            assert tts.available is True
            assert tts._rate == 200
            assert tts._volume == 0.8
            fake_engine.setProperty.assert_any_call("rate", 200)
            fake_engine.setProperty.assert_any_call("volume", 0.8)

    def test_init_import_failure(self):
        """When pyttsx3 import fails, VoiceTTS degrades gracefully."""
        with patch.dict("sys.modules", {"pyttsx3": None}):
            import sys

            # Drop any cached voice_tts module so the import re-executes
            # under the patched (failing) pyttsx3.
            for name in [m for m in sys.modules if "voice_tts" in m]:
                del sys.modules[name]

            from timmy_serve.voice_tts import VoiceTTS

            tts = VoiceTTS()
            assert tts.available is False
            assert tts._engine is None
|
||||
|
||||
|
||||
class TestVoiceTTSSpeak:
    """Test VoiceTTS speak methods."""

    @staticmethod
    def _bare_tts(available):
        """Build a VoiceTTS without running __init__ (no audio hardware)."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = MagicMock() if available else None
        tts._available = available
        tts._lock = threading.Lock()
        return tts

    def test_speak_skips_when_not_available(self):
        """speak() should skip gracefully when TTS is not available."""
        tts = self._bare_tts(available=False)
        tts.speak("hello world")  # must not raise

    def test_speak_sync_skips_when_not_available(self):
        """speak_sync() should skip gracefully when TTS is not available."""
        tts = self._bare_tts(available=False)
        tts.speak_sync("hello world")  # must not raise

    def test_speak_runs_in_background_thread(self):
        """speak() should run speech in a background thread."""
        tts = self._bare_tts(available=True)

        spawned = []
        real_thread = threading.Thread

        def tracking_thread(*args, **kwargs):
            # Record every thread speak() creates so we can join them.
            worker = real_thread(*args, **kwargs)
            spawned.append(worker)
            return worker

        with patch.object(threading, "Thread", side_effect=tracking_thread):
            tts.speak("test message")
            for worker in spawned:
                worker.join(timeout=1)

        tts._engine.say.assert_called_with("test message")
        tts._engine.runAndWait.assert_called_once()
|
||||
|
||||
|
||||
class TestVoiceTTSProperties:
    """Test VoiceTTS property setters."""

    @staticmethod
    def _bare_tts(engine):
        """Build a VoiceTTS (skipping __init__) with the given engine."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = engine
        tts._rate = 175
        tts._volume = 0.9
        return tts

    def test_set_rate_updates_property(self):
        """set_rate() updates internal rate and engine property."""
        tts = self._bare_tts(MagicMock())
        tts.set_rate(220)
        assert tts._rate == 220
        tts._engine.setProperty.assert_called_with("rate", 220)

    def test_set_rate_without_engine(self):
        """set_rate() updates internal rate even when engine is None."""
        tts = self._bare_tts(None)
        tts.set_rate(220)
        assert tts._rate == 220

    def test_set_volume_clamped_to_max(self):
        """set_volume() clamps volume to maximum of 1.0."""
        tts = self._bare_tts(MagicMock())
        tts.set_volume(1.5)
        assert tts._volume == 1.0
        tts._engine.setProperty.assert_called_with("volume", 1.0)

    def test_set_volume_clamped_to_min(self):
        """set_volume() clamps volume to minimum of 0.0."""
        tts = self._bare_tts(MagicMock())
        tts.set_volume(-0.5)
        assert tts._volume == 0.0
        tts._engine.setProperty.assert_called_with("volume", 0.0)

    def test_set_volume_within_range(self):
        """set_volume() accepts values within 0.0-1.0 range."""
        tts = self._bare_tts(MagicMock())
        tts.set_volume(0.5)
        assert tts._volume == 0.5
        tts._engine.setProperty.assert_called_with("volume", 0.5)
|
||||
|
||||
|
||||
class TestVoiceTTSGetVoices:
    """Test VoiceTTS get_voices() method."""

    @staticmethod
    def _bare_tts(engine):
        """Build a VoiceTTS (skipping __init__) with the given engine."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = engine
        return tts

    @staticmethod
    def _voice(voice_id, name, languages=None):
        """Build a mock pyttsx3 voice; languages=None drops the attribute."""
        voice = MagicMock()
        voice.id = voice_id
        voice.name = name
        if languages is None:
            del voice.languages  # simulate a voice lacking 'languages'
        else:
            voice.languages = languages
        return voice

    def test_get_voices_returns_empty_list_when_no_engine(self):
        """get_voices() returns empty list when engine is None."""
        assert self._bare_tts(None).get_voices() == []

    def test_get_voices_returns_formatted_voice_list(self):
        """get_voices() returns list of voice dicts with id, name, languages."""
        engine = MagicMock()
        engine.getProperty.return_value = [
            self._voice("com.apple.voice.compact.en-US.Samantha", "Samantha", ["en-US"]),
            self._voice("com.apple.voice.compact.en-GB.Daniel", "Daniel", ["en-GB"]),
        ]

        voices = self._bare_tts(engine).get_voices()

        assert [(v["id"], v["name"], v["languages"]) for v in voices] == [
            ("com.apple.voice.compact.en-US.Samantha", "Samantha", ["en-US"]),
            ("com.apple.voice.compact.en-GB.Daniel", "Daniel", ["en-GB"]),
        ]

    def test_get_voices_handles_missing_languages_attr(self):
        """get_voices() handles voices without languages attribute."""
        engine = MagicMock()
        engine.getProperty.return_value = [self._voice("voice1", "Default Voice")]

        voices = self._bare_tts(engine).get_voices()

        assert len(voices) == 1
        assert voices[0]["languages"] == []

    def test_get_voices_handles_exception(self):
        """get_voices() returns empty list on exception."""
        engine = MagicMock()
        engine.getProperty.side_effect = RuntimeError("engine error")
        assert self._bare_tts(engine).get_voices() == []
|
||||
|
||||
|
||||
class TestVoiceTTSSetVoice:
    """Test VoiceTTS set_voice() method."""

    def test_set_voice_updates_property(self):
        """set_voice() updates engine voice property when engine exists."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = MagicMock()

        voice_id = "com.apple.voice.compact.en-US.Samantha"
        tts.set_voice(voice_id)

        tts._engine.setProperty.assert_called_with("voice", voice_id)

    def test_set_voice_skips_when_no_engine(self):
        """set_voice() does nothing when engine is None."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._engine = None
        tts.set_voice("some_voice_id")  # must not raise
|
||||
|
||||
|
||||
class TestVoiceTTSAvailableProperty:
    """Test VoiceTTS available property."""

    @staticmethod
    def _tts_with_flag(flag):
        """Build a VoiceTTS (skipping __init__) with _available preset."""
        from timmy_serve.voice_tts import VoiceTTS

        tts = VoiceTTS.__new__(VoiceTTS)
        tts._available = flag
        return tts

    def test_available_returns_true_when_initialized(self):
        """available property returns True when engine initialized."""
        assert self._tts_with_flag(True).available is True

    def test_available_returns_false_when_not_initialized(self):
        """available property returns False when engine not initialized."""
        assert self._tts_with_flag(False).available is False
|
||||
401
tests/timmy_automations/test_health_snapshot.py
Normal file
401
tests/timmy_automations/test_health_snapshot.py
Normal file
@@ -0,0 +1,401 @@
|
||||
"""Tests for health_snapshot module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
# Add timmy_automations to path for imports
|
||||
sys.path.insert(
|
||||
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
|
||||
)
|
||||
|
||||
from datetime import UTC
|
||||
|
||||
import health_snapshot as hs
|
||||
|
||||
|
||||
class TestLoadConfig:
    """Test configuration loading."""

    def test_loads_default_config(self):
        """Load default configuration."""
        config = hs.load_config()
        for key in ("gitea_api", "repo_slug", "critical_labels", "flakiness_lookback_cycles"):
            assert key in config

    def test_environment_overrides(self, monkeypatch):
        """Environment variables override defaults."""
        overrides = {
            "TIMMY_GITEA_API": "http://test:3000/api/v1",
            "TIMMY_REPO_SLUG": "test/repo",
        }
        for var, value in overrides.items():
            monkeypatch.setenv(var, value)

        config = hs.load_config()

        assert config["gitea_api"] == overrides["TIMMY_GITEA_API"]
        assert config["repo_slug"] == overrides["TIMMY_REPO_SLUG"]
|
||||
|
||||
|
||||
class TestGetToken:
    """Test token retrieval."""

    def test_returns_config_token(self):
        """Return token from config if present."""
        assert hs.get_token({"token": "test-token-123"}) == "test-token-123"

    def test_reads_from_file(self, tmp_path, monkeypatch):
        """Read token from file if no config token."""
        token_path = tmp_path / "gitea_token"
        token_path.write_text("file-token-456")

        assert hs.get_token({"token_file": str(token_path)}) == "file-token-456"

    def test_returns_none_when_no_token(self):
        """Return None when no token available."""
        assert hs.get_token({"token_file": "/nonexistent/path"}) is None
|
||||
|
||||
|
||||
class TestCISignal:
    """Test CISignal dataclass."""

    def test_default_details(self):
        """Details defaults to empty dict."""
        assert hs.CISignal(status="pass", message="CI passing").details == {}

    def test_with_details(self):
        """Can include details."""
        sig = hs.CISignal(status="pass", message="CI passing", details={"sha": "abc123"})
        assert sig.details["sha"] == "abc123"
|
||||
|
||||
|
||||
class TestIssueSignal:
    """Test IssueSignal dataclass."""

    def test_default_issues_list(self):
        """Issues defaults to empty list."""
        assert hs.IssueSignal(count=0, p0_count=0, p1_count=0).issues == []

    def test_with_issues(self):
        """Can include issues."""
        payload = [{"number": 1, "title": "Test"}]
        sig = hs.IssueSignal(count=1, p0_count=1, p1_count=0, issues=payload)
        assert len(sig.issues) == 1
|
||||
|
||||
|
||||
class TestFlakinessSignal:
    """Test FlakinessSignal dataclass."""

    def test_calculated_fields(self):
        """All fields set correctly."""
        sig = hs.FlakinessSignal(
            status="healthy",
            recent_failures=2,
            recent_cycles=20,
            failure_rate=0.1,
            message="Low flakiness",
        )
        assert sig.status == "healthy"
        assert sig.recent_failures == 2
        assert sig.failure_rate == 0.1
|
||||
|
||||
|
||||
class TestHealthSnapshot:
    """Test HealthSnapshot dataclass."""

    @staticmethod
    def _snapshot(issue_signal):
        """Build a snapshot with healthy CI/flakiness/tokens around *issue_signal*."""
        return hs.HealthSnapshot(
            timestamp="2026-01-01T00:00:00+00:00",
            overall_status="green",
            ci=hs.CISignal(status="pass", message="CI passing"),
            issues=issue_signal,
            flakiness=hs.FlakinessSignal(
                status="healthy",
                recent_failures=0,
                recent_cycles=10,
                failure_rate=0.0,
                message="All good",
            ),
            tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"),
        )

    def test_to_dict_structure(self):
        """to_dict produces expected structure."""
        data = self._snapshot(hs.IssueSignal(count=0, p0_count=0, p1_count=0)).to_dict()

        assert data["timestamp"] == "2026-01-01T00:00:00+00:00"
        assert data["overall_status"] == "green"
        for section in ("ci", "issues", "flakiness", "tokens"):
            assert section in data

    def test_to_dict_limits_issues(self):
        """to_dict limits issues to 5."""
        backlog = [{"number": i, "title": f"Issue {i}"} for i in range(10)]
        snapshot = self._snapshot(
            hs.IssueSignal(count=10, p0_count=5, p1_count=5, issues=backlog)
        )

        assert len(snapshot.to_dict()["issues"]["issues"]) == 5
|
||||
|
||||
|
||||
class TestCalculateOverallStatus:
    """Test overall status calculation."""

    @staticmethod
    def _flakiness(status="healthy", failures=0, cycles=10, rate=0.0, message="All good"):
        """Shorthand for building a FlakinessSignal."""
        return hs.FlakinessSignal(
            status=status,
            recent_failures=failures,
            recent_cycles=cycles,
            failure_rate=rate,
            message=message,
        )

    @classmethod
    def _status(cls, ci_status="pass", p0=0, p1=0, flakiness=None):
        """Run calculate_overall_status with compact inputs."""
        ci_message = "CI passing" if ci_status == "pass" else "CI failed"
        ci = hs.CISignal(status=ci_status, message=ci_message)
        issues = hs.IssueSignal(count=p0 + p1, p0_count=p0, p1_count=p1)
        flaky = flakiness if flakiness is not None else cls._flakiness()
        return hs.calculate_overall_status(ci, issues, flaky)

    def test_green_when_all_healthy(self):
        """Status is green when all signals healthy."""
        assert self._status() == "green"

    def test_red_when_ci_fails(self):
        """Status is red when CI fails."""
        assert self._status(ci_status="fail") == "red"

    def test_red_when_p0_issues(self):
        """Status is red when P0 issues exist."""
        assert self._status(p0=1) == "red"

    def test_yellow_when_p1_issues(self):
        """Status is yellow when P1 issues exist."""
        assert self._status(p1=1) == "yellow"

    def test_yellow_when_flakiness_degraded(self):
        """Status is yellow when flakiness degraded."""
        degraded = self._flakiness("degraded", 5, 20, 0.25, "Moderate flakiness")
        assert self._status(flakiness=degraded) == "yellow"

    def test_red_when_flakiness_critical(self):
        """Status is red when flakiness critical."""
        critical = self._flakiness("critical", 10, 20, 0.5, "High flakiness")
        assert self._status(flakiness=critical) == "red"
|
||||
|
||||
|
||||
class TestCheckFlakiness:
    """Test flakiness checking."""

    def test_no_data_returns_unknown(self, tmp_path, monkeypatch):
        """Return unknown when no cycle data exists."""
        monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)

        signal = hs.check_flakiness({"flakiness_lookback_cycles": 20})

        assert signal.status == "unknown"
        assert signal.message == "No cycle data available"

    def test_calculates_failure_rate(self, tmp_path, monkeypatch):
        """Calculate failure rate from cycle data."""
        monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)

        retro_dir = tmp_path / ".loop" / "retro"
        retro_dir.mkdir(parents=True)
        # 2 failures out of 5 cycles -> 40% failure rate.
        outcomes = [True, True, False, True, False]
        lines = [
            json.dumps({"success": ok, "cycle": idx})
            for idx, ok in enumerate(outcomes, start=1)
        ]
        (retro_dir / "cycles.jsonl").write_text("\n".join(lines))

        signal = hs.check_flakiness({"flakiness_lookback_cycles": 20})

        assert signal.recent_cycles == 5
        assert signal.recent_failures == 2
        assert signal.failure_rate == 0.4
        assert signal.status == "critical"  # 40% > 30%
|
||||
|
||||
|
||||
class TestCheckTokenEconomy:
    """Test token economy checking."""

    def test_no_data_returns_unknown(self, tmp_path, monkeypatch):
        """Return unknown when no token data exists."""
        monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)
        assert hs.check_token_economy({}).status == "unknown"

    def test_calculates_balanced(self, tmp_path, monkeypatch):
        """Detect balanced token economy."""
        monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)

        loop_dir = tmp_path / ".loop"
        loop_dir.mkdir(parents=True)

        from datetime import datetime

        now = datetime.now(UTC).isoformat()
        ledger_lines = [
            json.dumps({"timestamp": now, "delta": delta}) for delta in (10, -5)
        ]
        (loop_dir / "token_economy.jsonl").write_text("\n".join(ledger_lines))

        signal = hs.check_token_economy({})

        assert signal.status == "balanced"
        assert signal.recent_mint == 10
        assert signal.recent_burn == 5
|
||||
|
||||
|
||||
class TestGiteaClient:
    """Test Gitea API client."""

    # Shared, read-only config used by every test in this class.
    _CONFIG = {"gitea_api": "http://test:3000/api/v1", "repo_slug": "test/repo"}

    def test_initialization(self):
        """Initialize with config and token."""
        client = hs.GiteaClient(self._CONFIG, "token123")

        assert client.api_base == "http://test:3000/api/v1"
        assert client.repo_slug == "test/repo"
        assert client.token == "token123"

    def test_headers_with_token(self):
        """Include authorization header with token."""
        headers = hs.GiteaClient(self._CONFIG, "token123")._headers()

        assert headers["Authorization"] == "token token123"
        assert headers["Accept"] == "application/json"

    def test_headers_without_token(self):
        """No authorization header without token."""
        headers = hs.GiteaClient(self._CONFIG, None)._headers()

        assert "Authorization" not in headers
        assert headers["Accept"] == "application/json"
|
||||
|
||||
|
||||
class TestGenerateSnapshot:
    """Test snapshot generation."""

    def test_returns_snapshot(self):
        """Generate a complete snapshot."""
        config = hs.load_config()

        # Neutralize the Gitea client so no network access happens.
        with (
            patch.object(hs.GiteaClient, "is_available", return_value=False),
            patch.object(hs.GiteaClient, "__init__", return_value=None),
        ):
            snapshot = hs.generate_snapshot(config, None)

        assert isinstance(snapshot, hs.HealthSnapshot)
        assert snapshot.overall_status in ("green", "yellow", "red", "unknown")
        for signal in (snapshot.ci, snapshot.issues, snapshot.flakiness, snapshot.tokens):
            assert signal is not None
|
||||
524
tests/timmy_automations/test_token_rules.py
Normal file
524
tests/timmy_automations/test_token_rules.py
Normal file
@@ -0,0 +1,524 @@
|
||||
"""Tests for token_rules module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Add timmy_automations to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations"))
|
||||
|
||||
from utils import token_rules as tr
|
||||
|
||||
|
||||
class TestTokenEvent:
    """Test TokenEvent dataclass."""

    @staticmethod
    def _event(reward, penalty):
        """Build a minimal TokenEvent with the given reward/penalty."""
        return tr.TokenEvent(
            name="test",
            description="Test event",
            reward=reward,
            penalty=penalty,
            category="test",
        )

    def test_delta_calculation_reward(self):
        """Delta is positive for rewards."""
        assert self._event(10, 0).delta == 10

    def test_delta_calculation_penalty(self):
        """Delta is negative for penalties."""
        assert self._event(0, -5).delta == -5

    def test_delta_calculation_mixed(self):
        """Delta is net of reward and penalty."""
        assert self._event(10, -3).delta == 7
|
||||
|
||||
|
||||
class TestTokenRulesLoading:
    """Test TokenRules configuration loading."""

    def test_loads_from_yaml_file(self, tmp_path):
        """Load configuration from YAML file."""
        yaml = pytest.importorskip("yaml")

        config_path = tmp_path / "token_rules.yaml"
        config_path.write_text(
            yaml.dump(
                {
                    "version": "1.0.0-test",
                    "events": {
                        "test_event": {
                            "description": "A test event",
                            "reward": 15,
                            "category": "test",
                        }
                    },
                    "gating_thresholds": {"test_op": 50},
                    "daily_limits": {"test": {"max_earn": 100, "max_spend": 10}},
                    "audit": {"log_all_transactions": False},
                }
            )
        )

        rules = tr.TokenRules(config_path=config_path)

        assert rules.get_config_version() == "1.0.0-test"
        assert rules.get_delta("test_event") == 15
        assert rules.get_gate_threshold("test_op") == 50

    def test_fallback_when_yaml_missing(self, tmp_path):
        """Use fallback defaults when YAML file doesn't exist."""
        rules = tr.TokenRules(config_path=tmp_path / "nonexistent.yaml")

        assert rules.get_config_version() == "fallback"
        # The fallback table still carries the basic event set.
        for event_name, delta in {
            "pr_merged": 10,
            "test_fixed": 8,
            "automation_failure": -2,
        }.items():
            assert rules.get_delta(event_name) == delta

    def test_fallback_when_yaml_not_installed(self, tmp_path):
        """Use fallback when PyYAML is not installed."""
        with patch.dict(sys.modules, {"yaml": None}):
            config_path = tmp_path / "token_rules.yaml"
            config_path.write_text("version: '1.0.0'")

            rules = tr.TokenRules(config_path=config_path)

            assert rules.get_config_version() == "fallback"
|
||||
|
||||
|
||||
class TestTokenRulesGetDelta:
|
||||
"""Test get_delta method."""
|
||||
|
||||
def test_get_delta_existing_event(self, tmp_path):
|
||||
"""Get delta for configured event."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"},
|
||||
"automation_failure": {"description": "Failure", "penalty": -2, "category": "ops"},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_delta("pr_merged") == 10
|
||||
assert rules.get_delta("automation_failure") == -2
|
||||
|
||||
def test_get_delta_unknown_event(self, tmp_path):
|
||||
"""Return 0 for unknown events."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_delta("unknown_event") == 0
|
||||
|
||||
|
||||
class TestTokenRulesGetEvent:
|
||||
"""Test get_event method."""
|
||||
|
||||
def test_get_event_returns_full_config(self, tmp_path):
|
||||
"""Get full event configuration."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"pr_merged": {
|
||||
"description": "PR merged successfully",
|
||||
"reward": 10,
|
||||
"category": "merge",
|
||||
"gate_threshold": 0,
|
||||
}
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
event = rules.get_event("pr_merged")
|
||||
|
||||
assert event is not None
|
||||
assert event.name == "pr_merged"
|
||||
assert event.description == "PR merged successfully"
|
||||
assert event.reward == 10
|
||||
assert event.category == "merge"
|
||||
assert event.gate_threshold == 0
|
||||
|
||||
def test_get_event_unknown_returns_none(self, tmp_path):
|
||||
"""Return None for unknown event."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_event("unknown") is None
|
||||
|
||||
|
||||
class TestTokenRulesListEvents:
|
||||
"""Test list_events method."""
|
||||
|
||||
def test_list_all_events(self, tmp_path):
|
||||
"""List all configured events."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"event_a": {"description": "A", "reward": 5, "category": "cat1"},
|
||||
"event_b": {"description": "B", "reward": 10, "category": "cat2"},
|
||||
"event_c": {"description": "C", "reward": 15, "category": "cat1"},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
events = rules.list_events()
|
||||
|
||||
assert len(events) == 3
|
||||
event_names = {e.name for e in events}
|
||||
assert "event_a" in event_names
|
||||
assert "event_b" in event_names
|
||||
assert "event_c" in event_names
|
||||
|
||||
def test_list_events_by_category(self, tmp_path):
|
||||
"""Filter events by category."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"event_a": {"description": "A", "reward": 5, "category": "cat1"},
|
||||
"event_b": {"description": "B", "reward": 10, "category": "cat2"},
|
||||
"event_c": {"description": "C", "reward": 15, "category": "cat1"},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
events = rules.list_events(category="cat1")
|
||||
|
||||
assert len(events) == 2
|
||||
for event in events:
|
||||
assert event.category == "cat1"
|
||||
|
||||
|
||||
class TestTokenRulesGating:
|
||||
"""Test gating threshold methods."""
|
||||
|
||||
def test_check_gate_with_threshold(self, tmp_path):
|
||||
"""Check gate when threshold is defined."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {},
|
||||
"gating_thresholds": {"pr_merge": 50},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.check_gate("pr_merge", current_tokens=100) is True
|
||||
assert rules.check_gate("pr_merge", current_tokens=50) is True
|
||||
assert rules.check_gate("pr_merge", current_tokens=49) is False
|
||||
assert rules.check_gate("pr_merge", current_tokens=0) is False
|
||||
|
||||
def test_check_gate_no_threshold(self, tmp_path):
|
||||
"""Check gate when no threshold is defined (always allowed)."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {},
|
||||
"gating_thresholds": {},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
# No threshold defined, should always be allowed
|
||||
assert rules.check_gate("unknown_op", current_tokens=0) is True
|
||||
assert rules.check_gate("unknown_op", current_tokens=-100) is True
|
||||
|
||||
def test_get_gate_threshold(self, tmp_path):
|
||||
"""Get threshold value."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"gating_thresholds": {"pr_merge": 50, "sensitive_op": 100},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_gate_threshold("pr_merge") == 50
|
||||
assert rules.get_gate_threshold("sensitive_op") == 100
|
||||
assert rules.get_gate_threshold("unknown") is None
|
||||
|
||||
|
||||
class TestTokenRulesDailyLimits:
|
||||
"""Test daily limits methods."""
|
||||
|
||||
def test_get_daily_limits(self, tmp_path):
|
||||
"""Get daily limits for a category."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"daily_limits": {
|
||||
"triage": {"max_earn": 100, "max_spend": 0},
|
||||
"merge": {"max_earn": 50, "max_spend": 10},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
triage_limits = rules.get_daily_limits("triage")
|
||||
assert triage_limits is not None
|
||||
assert triage_limits.max_earn == 100
|
||||
assert triage_limits.max_spend == 0
|
||||
|
||||
merge_limits = rules.get_daily_limits("merge")
|
||||
assert merge_limits is not None
|
||||
assert merge_limits.max_earn == 50
|
||||
assert merge_limits.max_spend == 10
|
||||
|
||||
def test_get_daily_limits_unknown(self, tmp_path):
|
||||
"""Return None for unknown category."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {"version": "1.0.0", "daily_limits": {}}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
assert rules.get_daily_limits("unknown") is None
|
||||
|
||||
|
||||
class TestTokenRulesComputeTransaction:
|
||||
"""Test compute_transaction method."""
|
||||
|
||||
def test_compute_successful_transaction(self, tmp_path):
|
||||
"""Compute transaction for valid event."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"pr_merged": {"description": "PR merged", "reward": 10, "category": "merge"}
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
result = rules.compute_transaction("pr_merged", current_tokens=100)
|
||||
|
||||
assert result["event"] == "pr_merged"
|
||||
assert result["delta"] == 10
|
||||
assert result["category"] == "merge"
|
||||
assert result["allowed"] is True
|
||||
assert result["new_balance"] == 110
|
||||
assert result["limit_reached"] is False
|
||||
|
||||
def test_compute_unknown_event(self, tmp_path):
|
||||
"""Compute transaction for unknown event."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
result = rules.compute_transaction("unknown_event", current_tokens=50)
|
||||
|
||||
assert result["event"] == "unknown_event"
|
||||
assert result["delta"] == 0
|
||||
assert result["allowed"] is False
|
||||
assert result["reason"] == "unknown_event"
|
||||
assert result["new_balance"] == 50
|
||||
|
||||
def test_compute_with_gate_check(self, tmp_path):
|
||||
"""Compute transaction respects gating."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"sensitive_op": {
|
||||
"description": "Sensitive",
|
||||
"reward": 50,
|
||||
"category": "sensitive",
|
||||
"gate_threshold": 100,
|
||||
}
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
# With enough tokens
|
||||
result = rules.compute_transaction("sensitive_op", current_tokens=150)
|
||||
assert result["allowed"] is True
|
||||
|
||||
# Without enough tokens
|
||||
result = rules.compute_transaction("sensitive_op", current_tokens=50)
|
||||
assert result["allowed"] is False
|
||||
assert "gate_reason" in result
|
||||
|
||||
def test_compute_with_daily_limits(self, tmp_path):
|
||||
"""Compute transaction respects daily limits."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"triage_action": {
|
||||
"description": "Triage",
|
||||
"reward": 20,
|
||||
"category": "triage",
|
||||
}
|
||||
},
|
||||
"daily_limits": {"triage": {"max_earn": 50, "max_spend": 0}},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
|
||||
# Within limit
|
||||
daily_earned = {"triage": 20}
|
||||
result = rules.compute_transaction(
|
||||
"triage_action", current_tokens=100, current_daily_earned=daily_earned
|
||||
)
|
||||
assert result["allowed"] is True
|
||||
assert result["limit_reached"] is False
|
||||
|
||||
# Would exceed limit (20 + 20 > 50 is false, so this should be fine)
|
||||
# Let's test with higher current earned
|
||||
daily_earned = {"triage": 40}
|
||||
result = rules.compute_transaction(
|
||||
"triage_action", current_tokens=100, current_daily_earned=daily_earned
|
||||
)
|
||||
assert result["allowed"] is False
|
||||
assert result["limit_reached"] is True
|
||||
assert "limit_reason" in result
|
||||
|
||||
|
||||
class TestTokenRulesCategories:
|
||||
"""Test category methods."""
|
||||
|
||||
def test_get_categories(self, tmp_path):
|
||||
"""Get all unique categories."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {
|
||||
"version": "1.0.0",
|
||||
"events": {
|
||||
"event_a": {"description": "A", "reward": 5, "category": "cat1"},
|
||||
"event_b": {"description": "B", "reward": 10, "category": "cat2"},
|
||||
"event_c": {"description": "C", "reward": 15, "category": "cat1"},
|
||||
},
|
||||
}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
categories = rules.get_categories()
|
||||
|
||||
assert sorted(categories) == ["cat1", "cat2"]
|
||||
|
||||
|
||||
class TestTokenRulesAudit:
|
||||
"""Test audit methods."""
|
||||
|
||||
def test_is_auditable_true(self, tmp_path):
|
||||
"""Check if auditable when enabled."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {"version": "1.0.0", "audit": {"log_all_transactions": True}}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
assert rules.is_auditable() is True
|
||||
|
||||
def test_is_auditable_false(self, tmp_path):
|
||||
"""Check if auditable when disabled."""
|
||||
yaml = pytest.importorskip("yaml")
|
||||
|
||||
config_file = tmp_path / "token_rules.yaml"
|
||||
config_data = {"version": "1.0.0", "audit": {"log_all_transactions": False}}
|
||||
config_file.write_text(yaml.dump(config_data))
|
||||
|
||||
rules = tr.TokenRules(config_path=config_file)
|
||||
assert rules.is_auditable() is False
|
||||
|
||||
|
||||
class TestConvenienceFunctions:
|
||||
"""Test module-level convenience functions."""
|
||||
|
||||
def test_get_token_delta(self, tmp_path):
|
||||
"""Convenience function returns delta."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
|
||||
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||
delta = tr.get_token_delta("pr_merged")
|
||||
assert delta == 10 # From fallback
|
||||
|
||||
def test_check_operation_gate(self, tmp_path):
|
||||
"""Convenience function checks gate."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
|
||||
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||
# Fallback has pr_merge gate at 0
|
||||
assert tr.check_operation_gate("pr_merge", current_tokens=0) is True
|
||||
assert tr.check_operation_gate("pr_merge", current_tokens=100) is True
|
||||
|
||||
def test_compute_token_reward(self, tmp_path):
|
||||
"""Convenience function computes reward."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
|
||||
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||
result = tr.compute_token_reward("pr_merged", current_tokens=50)
|
||||
assert result["event"] == "pr_merged"
|
||||
assert result["delta"] == 10
|
||||
assert result["new_balance"] == 60
|
||||
|
||||
def test_list_token_events(self, tmp_path):
|
||||
"""Convenience function lists events."""
|
||||
config_file = tmp_path / "nonexistent.yaml"
|
||||
|
||||
with patch.object(tr.TokenRules, "CONFIG_PATH", config_file):
|
||||
events = tr.list_token_events()
|
||||
assert len(events) >= 3 # Fallback has at least 3 events
|
||||
|
||||
# Check structure
|
||||
for event in events:
|
||||
assert "name" in event
|
||||
assert "description" in event
|
||||
assert "delta" in event
|
||||
assert "category" in event
|
||||
343
tests/timmy_automations/test_weekly_narrative.py
Normal file
343
tests/timmy_automations/test_weekly_narrative.py
Normal file
@@ -0,0 +1,343 @@
|
||||
"""Tests for weekly_narrative.py script."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# Add timmy_automations to path for imports
|
||||
sys.path.insert(
|
||||
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
|
||||
)
|
||||
|
||||
import weekly_narrative as wn
|
||||
|
||||
|
||||
class TestParseTimestamp:
|
||||
"""Test timestamp parsing."""
|
||||
|
||||
def test_parse_iso_with_z(self):
|
||||
"""Parse ISO timestamp with Z suffix."""
|
||||
result = wn.parse_ts("2026-03-21T12:00:00Z")
|
||||
assert result is not None
|
||||
assert result.year == 2026
|
||||
assert result.month == 3
|
||||
assert result.day == 21
|
||||
|
||||
def test_parse_iso_with_offset(self):
|
||||
"""Parse ISO timestamp with timezone offset."""
|
||||
result = wn.parse_ts("2026-03-21T12:00:00+00:00")
|
||||
assert result is not None
|
||||
assert result.year == 2026
|
||||
|
||||
def test_parse_empty_string(self):
|
||||
"""Empty string returns None."""
|
||||
result = wn.parse_ts("")
|
||||
assert result is None
|
||||
|
||||
def test_parse_invalid_string(self):
|
||||
"""Invalid string returns None."""
|
||||
result = wn.parse_ts("not-a-timestamp")
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestCollectCyclesData:
|
||||
"""Test cycle data collection."""
|
||||
|
||||
def test_no_cycles_file(self, tmp_path):
|
||||
"""Handle missing cycles file gracefully."""
|
||||
with patch.object(wn, "REPO_ROOT", tmp_path):
|
||||
since = datetime.now(UTC) - timedelta(days=7)
|
||||
result = wn.collect_cycles_data(since)
|
||||
assert result["total"] == 0
|
||||
assert result["successes"] == 0
|
||||
assert result["failures"] == 0
|
||||
|
||||
def test_collect_recent_cycles(self, tmp_path):
|
||||
"""Collect cycles within lookback period."""
|
||||
retro_dir = tmp_path / ".loop" / "retro"
|
||||
retro_dir.mkdir(parents=True)
|
||||
|
||||
now = datetime.now(UTC)
|
||||
cycles = [
|
||||
{"timestamp": now.isoformat(), "success": True, "cycle": 1},
|
||||
{"timestamp": now.isoformat(), "success": False, "cycle": 2},
|
||||
{"timestamp": (now - timedelta(days=10)).isoformat(), "success": True, "cycle": 3},
|
||||
]
|
||||
|
||||
with open(retro_dir / "cycles.jsonl", "w") as f:
|
||||
for c in cycles:
|
||||
f.write(json.dumps(c) + "\n")
|
||||
|
||||
with patch.object(wn, "REPO_ROOT", tmp_path):
|
||||
since = now - timedelta(days=7)
|
||||
result = wn.collect_cycles_data(since)
|
||||
assert result["total"] == 2 # Only recent 2
|
||||
assert result["successes"] == 1
|
||||
assert result["failures"] == 1
|
||||
|
||||
|
||||
class TestExtractThemes:
|
||||
"""Test theme extraction from issues."""
|
||||
|
||||
def test_extract_layer_labels(self):
|
||||
"""Extract layer labels from issues."""
|
||||
issues = [
|
||||
{"labels": [{"name": "layer:triage"}, {"name": "bug"}]},
|
||||
{"labels": [{"name": "layer:tests"}, {"name": "bug"}]},
|
||||
{"labels": [{"name": "layer:triage"}, {"name": "feature"}]},
|
||||
]
|
||||
|
||||
result = wn.extract_themes(issues)
|
||||
|
||||
assert len(result["layers"]) == 2
|
||||
layer_names = {layer["name"] for layer in result["layers"]}
|
||||
assert "triage" in layer_names
|
||||
assert "tests" in layer_names
|
||||
|
||||
def test_extract_type_labels(self):
|
||||
"""Extract type labels (bug/feature/etc)."""
|
||||
issues = [
|
||||
{"labels": [{"name": "bug"}]},
|
||||
{"labels": [{"name": "feature"}]},
|
||||
{"labels": [{"name": "bug"}]},
|
||||
]
|
||||
|
||||
result = wn.extract_themes(issues)
|
||||
|
||||
type_names = {t_type["name"] for t_type in result["types"]}
|
||||
assert "bug" in type_names
|
||||
assert "feature" in type_names
|
||||
|
||||
def test_empty_issues(self):
|
||||
"""Handle empty issue list."""
|
||||
result = wn.extract_themes([])
|
||||
assert result["layers"] == []
|
||||
assert result["types"] == []
|
||||
assert result["top_labels"] == []
|
||||
|
||||
|
||||
class TestExtractAgentContributions:
|
||||
"""Test agent contribution extraction."""
|
||||
|
||||
def test_extract_assignees(self):
|
||||
"""Extract assignee counts."""
|
||||
issues = [
|
||||
{"assignee": {"login": "kimi"}},
|
||||
{"assignee": {"login": "hermes"}},
|
||||
{"assignee": {"login": "kimi"}},
|
||||
]
|
||||
|
||||
result = wn.extract_agent_contributions(issues, [], [])
|
||||
|
||||
assert len(result["active_assignees"]) == 2
|
||||
assignee_logins = {a["login"] for a in result["active_assignees"]} # noqa: E741
|
||||
assert "kimi" in assignee_logins
|
||||
assert "hermes" in assignee_logins
|
||||
|
||||
def test_extract_pr_authors(self):
|
||||
"""Extract PR author counts."""
|
||||
prs = [
|
||||
{"user": {"login": "kimi"}},
|
||||
{"user": {"login": "claude"}},
|
||||
{"user": {"login": "kimi"}},
|
||||
]
|
||||
|
||||
result = wn.extract_agent_contributions([], prs, [])
|
||||
|
||||
assert len(result["pr_authors"]) == 2
|
||||
|
||||
def test_kimi_mentions_in_cycles(self):
|
||||
"""Count Kimi mentions in cycle notes."""
|
||||
cycles = [
|
||||
{"notes": "Kimi did great work", "reason": ""},
|
||||
{"notes": "", "reason": "Kimi timeout"},
|
||||
{"notes": "All good", "reason": ""},
|
||||
]
|
||||
|
||||
result = wn.extract_agent_contributions([], [], cycles)
|
||||
assert result["kimi_mentioned_cycles"] == 2
|
||||
|
||||
|
||||
class TestAnalyzeTestShifts:
|
||||
"""Test test pattern analysis."""
|
||||
|
||||
def test_no_cycles(self):
|
||||
"""Handle no cycle data."""
|
||||
result = wn.analyze_test_shifts([])
|
||||
assert "note" in result
|
||||
|
||||
def test_test_metrics(self):
|
||||
"""Calculate test metrics from cycles."""
|
||||
cycles = [
|
||||
{"tests_passed": 100, "tests_added": 5},
|
||||
{"tests_passed": 150, "tests_added": 3},
|
||||
]
|
||||
|
||||
result = wn.analyze_test_shifts(cycles)
|
||||
|
||||
assert result["total_tests_passed"] == 250
|
||||
assert result["total_tests_added"] == 8
|
||||
|
||||
|
||||
class TestGenerateVibeSummary:
|
||||
"""Test vibe summary generation."""
|
||||
|
||||
def test_productive_vibe(self):
|
||||
"""High success rate and activity = productive vibe."""
|
||||
cycles_data = {"success_rate": 0.95, "successes": 10, "failures": 1}
|
||||
issues_data = {"closed_count": 5}
|
||||
|
||||
result = wn.generate_vibe_summary(cycles_data, issues_data, {}, {"layers": []}, {}, {}, {})
|
||||
|
||||
assert result["overall"] == "productive"
|
||||
assert "strong week" in result["description"].lower()
|
||||
|
||||
def test_struggling_vibe(self):
|
||||
"""More failures than successes = struggling vibe."""
|
||||
cycles_data = {"success_rate": 0.3, "successes": 3, "failures": 7}
|
||||
issues_data = {"closed_count": 0}
|
||||
|
||||
result = wn.generate_vibe_summary(cycles_data, issues_data, {}, {"layers": []}, {}, {}, {})
|
||||
|
||||
assert result["overall"] == "struggling"
|
||||
|
||||
def test_quiet_vibe(self):
|
||||
"""Low activity = quiet vibe."""
|
||||
cycles_data = {"success_rate": 0.0, "successes": 0, "failures": 0}
|
||||
issues_data = {"closed_count": 0}
|
||||
|
||||
result = wn.generate_vibe_summary(cycles_data, issues_data, {}, {"layers": []}, {}, {}, {})
|
||||
|
||||
assert result["overall"] == "quiet"
|
||||
|
||||
|
||||
class TestGenerateMarkdownSummary:
|
||||
"""Test markdown summary generation."""
|
||||
|
||||
def test_includes_header(self):
|
||||
"""Markdown includes header."""
|
||||
narrative = {
|
||||
"period": {"start": "2026-03-14T00:00:00", "end": "2026-03-21T00:00:00"},
|
||||
"vibe": {"overall": "productive", "description": "Good week"},
|
||||
"activity": {
|
||||
"cycles": {"total": 10, "successes": 9, "failures": 1},
|
||||
"issues": {"closed": 5, "opened": 3},
|
||||
"pull_requests": {"merged": 4, "opened": 2},
|
||||
},
|
||||
}
|
||||
|
||||
result = wn.generate_markdown_summary(narrative)
|
||||
|
||||
assert "# Weekly Narrative Summary" in result
|
||||
assert "productive" in result.lower()
|
||||
assert "10 total" in result or "10" in result
|
||||
|
||||
def test_includes_focus_areas(self):
|
||||
"""Markdown includes focus areas when present."""
|
||||
narrative = {
|
||||
"period": {"start": "2026-03-14", "end": "2026-03-21"},
|
||||
"vibe": {
|
||||
"overall": "productive",
|
||||
"description": "Good week",
|
||||
"focus_areas": ["triage (5 items)", "tests (3 items)"],
|
||||
},
|
||||
"activity": {
|
||||
"cycles": {"total": 0, "successes": 0, "failures": 0},
|
||||
"issues": {"closed": 0, "opened": 0},
|
||||
"pull_requests": {"merged": 0, "opened": 0},
|
||||
},
|
||||
}
|
||||
|
||||
result = wn.generate_markdown_summary(narrative)
|
||||
|
||||
assert "Focus Areas" in result
|
||||
assert "triage" in result
|
||||
|
||||
|
||||
class TestConfigLoading:
|
||||
"""Test configuration loading."""
|
||||
|
||||
def test_default_config(self, tmp_path):
|
||||
"""Default config when manifest missing."""
|
||||
with patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
|
||||
config = wn.load_automation_config()
|
||||
assert config["lookback_days"] == 7
|
||||
assert config["enabled"] is True
|
||||
|
||||
def test_environment_override(self, tmp_path):
|
||||
"""Environment variables override config."""
|
||||
with patch.dict("os.environ", {"TIMMY_WEEKLY_NARRATIVE_ENABLED": "false"}):
|
||||
with patch.object(wn, "CONFIG_PATH", tmp_path / "nonexistent.json"):
|
||||
config = wn.load_automation_config()
|
||||
assert config["enabled"] is False
|
||||
|
||||
|
||||
class TestMain:
|
||||
"""Test main function."""
|
||||
|
||||
def test_disabled_exits_cleanly(self, tmp_path):
|
||||
"""When disabled and no --force, exits cleanly."""
|
||||
with patch.object(wn, "REPO_ROOT", tmp_path):
|
||||
with patch.object(wn, "load_automation_config", return_value={"enabled": False}):
|
||||
with patch("sys.argv", ["weekly_narrative"]):
|
||||
result = wn.main()
|
||||
assert result == 0
|
||||
|
||||
def test_force_runs_when_disabled(self, tmp_path):
|
||||
"""--force runs even when disabled."""
|
||||
# Setup minimal structure
|
||||
(tmp_path / ".loop" / "retro").mkdir(parents=True)
|
||||
|
||||
with patch.object(wn, "REPO_ROOT", tmp_path):
|
||||
with patch.object(
|
||||
wn,
|
||||
"load_automation_config",
|
||||
return_value={
|
||||
"enabled": False,
|
||||
"lookback_days": 7,
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "test/repo",
|
||||
"token_file": "~/.hermes/gitea_token",
|
||||
},
|
||||
):
|
||||
with patch.object(wn, "GiteaClient") as mock_client:
|
||||
mock_instance = MagicMock()
|
||||
mock_instance.is_available.return_value = False
|
||||
mock_client.return_value = mock_instance
|
||||
|
||||
with patch("sys.argv", ["weekly_narrative", "--force"]):
|
||||
result = wn.main()
|
||||
# Should complete without error even though Gitea unavailable
|
||||
assert result == 0
|
||||
|
||||
|
||||
class TestGiteaClient:
|
||||
"""Test Gitea API client."""
|
||||
|
||||
def test_is_available_when_unavailable(self):
|
||||
"""is_available returns False when server down."""
|
||||
config = {"gitea_api": "http://localhost:99999", "repo_slug": "test/repo"}
|
||||
client = wn.GiteaClient(config, None)
|
||||
|
||||
# Should return False without raising
|
||||
assert client.is_available() is False
|
||||
|
||||
def test_headers_with_token(self):
|
||||
"""Headers include Authorization when token provided."""
|
||||
config = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
|
||||
client = wn.GiteaClient(config, "test-token")
|
||||
|
||||
headers = client._headers()
|
||||
assert headers["Authorization"] == "token test-token"
|
||||
|
||||
def test_headers_without_token(self):
|
||||
"""Headers don't include Authorization when no token."""
|
||||
config = {"gitea_api": "http://localhost:3000", "repo_slug": "test/repo"}
|
||||
client = wn.GiteaClient(config, None)
|
||||
|
||||
headers = client._headers()
|
||||
assert "Authorization" not in headers
|
||||
@@ -1,6 +1,9 @@
|
||||
{
|
||||
"version": "1.0.0",
|
||||
"description": "Master manifest of all Timmy automations",
|
||||
"_health_snapshot": {
|
||||
"note": "Quick health check before coding — CI, P0/P1 issues, flakiness"
|
||||
},
|
||||
"last_updated": "2026-03-21",
|
||||
"automations": [
|
||||
{
|
||||
@@ -228,6 +231,43 @@
|
||||
"max_items": 5
|
||||
},
|
||||
"outputs": []
|
||||
},
|
||||
{
|
||||
"id": "weekly_narrative",
|
||||
"name": "Weekly Narrative Summary",
|
||||
"description": "Generates a human-readable weekly summary of work themes, agent contributions, and token economy shifts",
|
||||
"script": "timmy_automations/daily_run/weekly_narrative.py",
|
||||
"category": "daily_run",
|
||||
"enabled": true,
|
||||
"trigger": "scheduled",
|
||||
"schedule": "weekly",
|
||||
"executable": "python3",
|
||||
"config": {
|
||||
"lookback_days": 7,
|
||||
"output_file": ".loop/weekly_narrative.json",
|
||||
"gitea_api": "http://localhost:3000/api/v1",
|
||||
"repo_slug": "rockachopa/Timmy-time-dashboard"
|
||||
},
|
||||
"outputs": [
|
||||
".loop/weekly_narrative.json",
|
||||
".loop/weekly_narrative.md"
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "health_snapshot",
|
||||
"name": "Health Snapshot",
|
||||
"description": "Quick health check before coding — CI status, P0/P1 issues, test flakiness, token economy",
|
||||
"script": "timmy_automations/daily_run/health_snapshot.py",
|
||||
"category": "daily_run",
|
||||
"enabled": true,
|
||||
"trigger": "pre_cycle",
|
||||
"executable": "python3",
|
||||
"config": {
|
||||
"critical_labels": ["P0", "P1", "priority/critical", "priority/high"],
|
||||
"flakiness_lookback_cycles": 20,
|
||||
"ci_timeout_seconds": 5
|
||||
},
|
||||
"outputs": []
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -17,6 +17,10 @@
|
||||
"manual": {
|
||||
"description": "Run on-demand only",
|
||||
"automations": ["agent_workspace", "kimi_bootstrap", "kimi_resume", "backfill_retro"]
|
||||
},
|
||||
"weekly": {
|
||||
"description": "Run once per week (Sundays)",
|
||||
"automations": ["weekly_narrative"]
|
||||
}
|
||||
},
|
||||
"triggers": {
|
||||
|
||||
138
timmy_automations/config/token_rules.yaml
Normal file
138
timmy_automations/config/token_rules.yaml
Normal file
@@ -0,0 +1,138 @@
|
||||
# Token Rules — Agent reward/penalty configuration for automations
|
||||
#
|
||||
# This file defines the token economy for agent actions.
|
||||
# Modify values here to adjust incentives without code changes.
|
||||
#
|
||||
# Used by: timmy_automations.utils.token_rules
|
||||
|
||||
version: "1.0.0"
|
||||
description: "Token economy rules for agent automations"
|
||||
|
||||
# ── Events ─────────────────────────────────────────────────────────────────
|
||||
# Each event type defines rewards/penalties and optional gating thresholds
|
||||
|
||||
events:
|
||||
# Triage actions
|
||||
triage_success:
|
||||
description: "Successfully triaged an issue (scored and categorized)"
|
||||
reward: 5
|
||||
category: "triage"
|
||||
|
||||
deep_triage_refinement:
|
||||
description: "LLM-driven issue refinement with acceptance criteria added"
|
||||
reward: 20
|
||||
category: "triage"
|
||||
|
||||
quarantine_candidate_found:
|
||||
description: "Identified a repeat failure issue for quarantine"
|
||||
reward: 10
|
||||
category: "triage"
|
||||
|
||||
# Daily Run completions
|
||||
daily_run_completed:
|
||||
description: "Completed a daily run cycle successfully"
|
||||
reward: 5
|
||||
category: "daily_run"
|
||||
|
||||
golden_path_generated:
|
||||
description: "Generated a coherent mini-session plan"
|
||||
reward: 3
|
||||
category: "daily_run"
|
||||
|
||||
weekly_narrative_created:
|
||||
description: "Generated weekly summary of work themes"
|
||||
reward: 15
|
||||
category: "daily_run"
|
||||
|
||||
# PR merges
|
||||
pr_merged:
|
||||
description: "Successfully merged a pull request"
|
||||
reward: 10
|
||||
category: "merge"
|
||||
# Gating: requires minimum tokens to perform
|
||||
gate_threshold: 0
|
||||
|
||||
pr_merged_with_tests:
|
||||
description: "Merged PR with all tests passing"
|
||||
reward: 15
|
||||
category: "merge"
|
||||
gate_threshold: 0
|
||||
|
||||
# Test fixes
|
||||
test_fixed:
|
||||
description: "Fixed a failing test"
|
||||
reward: 8
|
||||
category: "test"
|
||||
|
||||
test_added:
|
||||
description: "Added new test coverage"
|
||||
reward: 5
|
||||
category: "test"
|
||||
|
||||
critical_bug_fixed:
|
||||
description: "Fixed a critical bug on main"
|
||||
reward: 25
|
||||
category: "test"
|
||||
|
||||
# General operations
|
||||
automation_run:
|
||||
description: "Ran any automation (resource usage)"
|
||||
penalty: -1
|
||||
category: "operation"
|
||||
|
||||
automation_failure:
|
||||
description: "Automation failed or produced error"
|
||||
penalty: -2
|
||||
category: "operation"
|
||||
|
||||
cycle_retro_logged:
|
||||
description: "Logged structured retrospective data"
|
||||
reward: 5
|
||||
category: "operation"
|
||||
|
||||
pre_commit_passed:
|
||||
description: "Pre-commit checks passed"
|
||||
reward: 2
|
||||
category: "operation"
|
||||
|
||||
pre_commit_failed:
|
||||
description: "Pre-commit checks failed"
|
||||
penalty: -1
|
||||
category: "operation"
|
||||
|
||||
# ── Gating Thresholds ──────────────────────────────────────────────────────
|
||||
# Minimum token balances required for sensitive operations
|
||||
|
||||
gating_thresholds:
|
||||
pr_merge: 0
|
||||
sensitive_config_change: 50
|
||||
agent_workspace_create: 10
|
||||
deep_triage_run: 0
|
||||
|
||||
# ── Daily Limits ───────────────────────────────────────────────────────────
|
||||
# Maximum tokens that can be earned/spent per category per day
|
||||
|
||||
daily_limits:
|
||||
triage:
|
||||
max_earn: 100
|
||||
max_spend: 0
|
||||
daily_run:
|
||||
max_earn: 50
|
||||
max_spend: 0
|
||||
merge:
|
||||
max_earn: 100
|
||||
max_spend: 0
|
||||
test:
|
||||
max_earn: 100
|
||||
max_spend: 0
|
||||
operation:
|
||||
max_earn: 50
|
||||
max_spend: 50
|
||||
|
||||
# ── Audit Settings ─────────────────────────────────────────────────────────
|
||||
# Settings for token audit and inspection
|
||||
|
||||
audit:
|
||||
log_all_transactions: true
|
||||
log_retention_days: 30
|
||||
inspectable_by: ["orchestrator", "auditor", "timmy"]
|
||||
619
timmy_automations/daily_run/health_snapshot.py
Executable file
619
timmy_automations/daily_run/health_snapshot.py
Executable file
@@ -0,0 +1,619 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Quick health snapshot before coding — checks CI, issues, flakiness.
|
||||
|
||||
A fast status check that shows major red/green signals before deeper work.
|
||||
Runs in a few seconds and produces a concise summary.
|
||||
|
||||
Run: python3 timmy_automations/daily_run/health_snapshot.py
|
||||
Env: GITEA_API, GITEA_TOKEN, REPO_SLUG
|
||||
|
||||
Refs: #710
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import HTTPError, URLError
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────
|
||||
|
||||
# Repository checkout root: three levels up from this file
# (daily_run/ -> timmy_automations/ -> repo root).
REPO_ROOT = Path(__file__).resolve().parent.parent.parent

DEFAULT_CONFIG = {
    "gitea_api": "http://localhost:3000/api/v1",
    "repo_slug": "rockachopa/Timmy-time-dashboard",
    "token_file": "~/.hermes/gitea_token",
    "critical_labels": ["P0", "P1", "priority/critical", "priority/high"],
    "flakiness_lookback_cycles": 20,
    "ci_timeout_seconds": 5,
}


def load_config() -> dict:
    """Load configuration with fallback to defaults.

    Starts from DEFAULT_CONFIG and applies environment-variable
    overrides (TIMMY_GITEA_API, TIMMY_REPO_SLUG, TIMMY_GITEA_TOKEN)
    when they are set to non-empty values.

    Returns:
        A fresh config dict; the "token" key is present only when
        TIMMY_GITEA_TOKEN was supplied.
    """
    config = DEFAULT_CONFIG.copy()

    # Map of config key -> environment variable that overrides it.
    env_overrides = {
        "gitea_api": "TIMMY_GITEA_API",
        "repo_slug": "TIMMY_REPO_SLUG",
        "token": "TIMMY_GITEA_TOKEN",
    }
    for key, env_name in env_overrides.items():
        value = os.environ.get(env_name)
        if value:
            config[key] = value

    return config
|
||||
|
||||
|
||||
def get_token(config: dict) -> str | None:
|
||||
"""Get Gitea token from environment or file."""
|
||||
if "token" in config:
|
||||
return config["token"]
|
||||
|
||||
# Try timmy's token file
|
||||
repo_root = Path(__file__).resolve().parent.parent.parent
|
||||
timmy_token_path = repo_root / ".timmy_gitea_token"
|
||||
if timmy_token_path.exists():
|
||||
return timmy_token_path.read_text().strip()
|
||||
|
||||
# Fallback to legacy token file
|
||||
token_file = Path(config["token_file"]).expanduser()
|
||||
if token_file.exists():
|
||||
return token_file.read_text().strip()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ── Gitea API Client ──────────────────────────────────────────────────────
|
||||
|
||||
class GiteaClient:
    """Thin Gitea REST client that degrades gracefully when offline."""

    def __init__(self, config: dict, token: str | None):
        self.api_base = config["gitea_api"].rstrip("/")
        self.repo_slug = config["repo_slug"]
        self.token = token
        # Memoized result of the reachability probe; None = not probed yet.
        self._available: bool | None = None

    def _headers(self) -> dict:
        """Request headers; includes auth only when a token is known."""
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    def _api_url(self, path: str) -> str:
        """Absolute URL for a repo-scoped API path."""
        return f"{self.api_base}/repos/{self.repo_slug}/{path}"

    def is_available(self) -> bool:
        """Probe the /version endpoint once and cache the outcome."""
        if self._available is None:
            try:
                probe = Request(
                    f"{self.api_base}/version",
                    headers=self._headers(),
                    method="GET",
                )
                with urlopen(probe, timeout=3) as resp:
                    self._available = resp.status == 200
            except (HTTPError, URLError, TimeoutError):
                self._available = False
        return self._available

    def get(self, path: str, params: dict | None = None) -> list | dict:
        """GET a repo endpoint and return the decoded JSON payload."""
        url = self._api_url(path)
        if params:
            url = url + "?" + "&".join(f"{k}={v}" for k, v in params.items())

        req = Request(url, headers=self._headers(), method="GET")
        with urlopen(req, timeout=10) as resp:
            return json.loads(resp.read())

    def get_paginated(self, path: str, params: dict | None = None) -> list:
        """Accumulate every page of a paginated endpoint into one list."""
        collected: list = []
        limit = 50
        page = 1

        while True:
            query = {"limit": limit, "page": page}
            if params:
                query.update(params)

            batch = self.get(path, query)
            if not batch:
                break
            collected.extend(batch)
            # A short page means we have reached the end.
            if len(batch) < limit:
                break
            page += 1

        return collected
|
||||
|
||||
|
||||
# ── Data Models ───────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
class CISignal:
    """CI pipeline status signal.

    Produced by check_ci_status(); consumed by calculate_overall_status()
    and HealthSnapshot.to_dict().
    """

    status: str  # "pass", "fail", "unknown", "unavailable"
    message: str  # human-readable one-liner for the console summary
    # Free-form context (e.g. short commit sha and raw CI state).
    details: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
class IssueSignal:
    """Critical issues signal.

    Counts of open high-severity issues, produced by check_critical_issues().
    """

    count: int  # total critical issues (P0 + P1 + other configured labels)
    p0_count: int
    p1_count: int
    # Compact summaries ({number, title, url}); capped upstream at 10.
    issues: list[dict[str, Any]] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
class FlakinessSignal:
    """Test flakiness/error rate signal.

    Derived from .loop/retro/cycles.jsonl by check_flakiness().
    """

    status: str  # "healthy", "degraded", "critical", "unknown"
    recent_failures: int  # failed cycles within the lookback window
    recent_cycles: int  # total cycles inspected
    failure_rate: float  # recent_failures / recent_cycles (0.0 when unknown)
    message: str  # human-readable one-liner for the console summary
|
||||
|
||||
|
||||
@dataclass
class TokenEconomySignal:
    """Token economy temperature indicator.

    Summarizes the last 24h of ledger activity (see check_token_economy()).
    """

    status: str  # "balanced", "inflationary", "deflationary", "unknown"
    message: str  # human-readable one-liner for the console summary
    recent_mint: int = 0  # sum of positive deltas in the window
    recent_burn: int = 0  # absolute sum of negative deltas in the window
|
||||
|
||||
|
||||
@dataclass
class HealthSnapshot:
    """Complete health snapshot aggregating all individual signals."""

    timestamp: str  # ISO timestamp of snapshot generation
    overall_status: str  # "green", "yellow", "red"
    ci: CISignal
    issues: IssueSignal
    flakiness: FlakinessSignal
    tokens: TokenEconomySignal

    def to_dict(self) -> dict[str, Any]:
        """Serialize the snapshot to a plain dict suitable for JSON output."""
        ci, issues, flakiness, tokens = self.ci, self.issues, self.flakiness, self.tokens
        return {
            "timestamp": self.timestamp,
            "overall_status": self.overall_status,
            "ci": {
                "status": ci.status,
                "message": ci.message,
                "details": ci.details,
            },
            "issues": {
                "count": issues.count,
                "p0_count": issues.p0_count,
                "p1_count": issues.p1_count,
                # Cap the serialized payload at five issue summaries.
                "issues": issues.issues[:5],
            },
            "flakiness": {
                "status": flakiness.status,
                "recent_failures": flakiness.recent_failures,
                "recent_cycles": flakiness.recent_cycles,
                "failure_rate": round(flakiness.failure_rate, 2),
                "message": flakiness.message,
            },
            "tokens": {
                "status": tokens.status,
                "message": tokens.message,
                "recent_mint": tokens.recent_mint,
                "recent_burn": tokens.recent_burn,
            },
        }
|
||||
|
||||
|
||||
# ── Health Check Functions ────────────────────────────────────────────────
|
||||
|
||||
def check_ci_status(client: GiteaClient, config: dict) -> CISignal:
    """Check CI pipeline status from recent commits.

    Fetches recent commits and inspects the combined status of the
    newest one. Network failures degrade to "unavailable"/"unknown"
    rather than raising.
    """
    try:
        commits = client.get_paginated("commits", {"limit": 5})
    except (HTTPError, URLError) as exc:
        return CISignal(
            status="unavailable",
            message=f"CI check failed: {exc}",
        )

    if not commits:
        return CISignal(
            status="unknown",
            message="No recent commits found",
        )

    # Inspect the combined status of the most recent commit only.
    sha = commits[0].get("sha", "")
    try:
        combined = client.get(f"commits/{sha}/status")
    except (HTTPError, URLError) as exc:
        return CISignal(
            status="unknown",
            message=f"Could not fetch CI status: {exc}",
        )

    state = combined.get("state", "unknown")
    detail = {"sha": sha[:8], "state": state}

    if state == "success":
        return CISignal(status="pass", message="CI passing", details=detail)
    if state in ("failure", "error"):
        return CISignal(status="fail", message=f"CI failed ({state})", details=detail)
    if state == "pending":
        return CISignal(status="unknown", message="CI pending", details=detail)
    return CISignal(status="unknown", message=f"CI status: {state}", details=detail)
|
||||
|
||||
|
||||
def check_critical_issues(client: GiteaClient, config: dict) -> IssueSignal:
    """Check for open P0/P1 issues.

    Scans all open issues and buckets them by label severity: P0 via
    "p0"/"critical" label substrings, P1 via "p1"/"high", plus any
    other label listed in ``config["critical_labels"]``.

    Args:
        client: Gitea API client.
        config: Automation config; reads ``critical_labels``.

    Returns:
        IssueSignal with counts and up to 10 issue summaries. On API
        failure a zero-count signal is returned, but a warning is
        printed to stderr so the outage is not silently reported as
        "no critical issues".
    """
    critical_labels = config.get("critical_labels", ["P0", "P1"])

    # Keep the try narrow: only the API call can raise these.
    try:
        issues = client.get_paginated("issues", {"state": "open", "limit": 100})
    except (HTTPError, URLError) as exc:
        # Bug fix: previously the exception was captured and discarded,
        # making an API outage indistinguishable from a clean repo
        # (which reads as "green" upstream). Surface it to stderr.
        print(
            f"[health_snapshot] Warning: could not fetch issues: {exc}",
            file=sys.stderr,
        )
        return IssueSignal(count=0, p0_count=0, p1_count=0, issues=[])

    p0_issues: list[dict[str, Any]] = []
    p1_issues: list[dict[str, Any]] = []
    other_critical: list[dict[str, Any]] = []

    for issue in issues:
        labels = [label.get("name", "").lower() for label in issue.get("labels", [])]

        # Substring match so e.g. "priority/critical" counts as P0.
        is_p0 = any("p0" in name or "critical" in name for name in labels)
        is_p1 = any("p1" in name or "high" in name for name in labels)

        issue_summary = {
            "number": issue.get("number"),
            "title": issue.get("title", "Untitled")[:60],
            "url": issue.get("html_url", ""),
        }

        if is_p0:
            p0_issues.append(issue_summary)
        elif is_p1:
            p1_issues.append(issue_summary)
        elif any(cl.lower() in labels for cl in critical_labels):
            other_critical.append(issue_summary)

    all_critical = p0_issues + p1_issues + other_critical

    return IssueSignal(
        count=len(all_critical),
        p0_count=len(p0_issues),
        p1_count=len(p1_issues),
        issues=all_critical[:10],  # Limit stored issues
    )
|
||||
|
||||
|
||||
def check_flakiness(config: dict) -> FlakinessSignal:
    """Check test flakiness from cycle retrospective data.

    Reads .loop/retro/cycles.jsonl, looks at the most recent
    ``flakiness_lookback_cycles`` entries, and classifies the failure
    rate as healthy (<10%), degraded (<30%), or critical.
    """
    lookback = config.get("flakiness_lookback_cycles", 20)
    retro_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"

    if not retro_file.exists():
        return FlakinessSignal(
            status="unknown",
            recent_failures=0,
            recent_cycles=0,
            failure_rate=0.0,
            message="No cycle data available",
        )

    try:
        records = []
        for raw in retro_file.read_text().strip().splitlines():
            try:
                records.append(json.loads(raw))
            except json.JSONDecodeError:
                continue  # tolerate corrupt lines

        # Restrict to the most recent window.
        window = records[-lookback:] if len(records) > lookback else records
        total = len(window)

        if total == 0:
            return FlakinessSignal(
                status="unknown",
                recent_failures=0,
                recent_cycles=0,
                failure_rate=0.0,
                message="No recent cycle data",
            )

        failed = sum(1 for entry in window if not entry.get("success", True))
        rate = failed / total

        if rate < 0.1:
            status, message = "healthy", f"Low flakiness ({rate:.0%})"
        elif rate < 0.3:
            status, message = "degraded", f"Moderate flakiness ({rate:.0%})"
        else:
            status, message = "critical", f"High flakiness ({rate:.0%})"

        return FlakinessSignal(
            status=status,
            recent_failures=failed,
            recent_cycles=total,
            failure_rate=rate,
            message=message,
        )

    except (OSError, ValueError) as exc:
        return FlakinessSignal(
            status="unknown",
            recent_failures=0,
            recent_cycles=0,
            failure_rate=0.0,
            message=f"Could not read cycle data: {exc}",
        )
|
||||
|
||||
|
||||
def check_token_economy(config: dict) -> TokenEconomySignal:
    """Check token economy temperature from recent transactions.

    Reads the local ledger (.loop/token_economy.jsonl), sums positive
    (mint) and negative (burn) deltas over the last 24 hours, and
    classifies the flow as balanced, inflationary (mint > 2x burn),
    or deflationary (burn > 2x mint).

    Args:
        config: Automation config (currently unused; kept for
            signature parity with the other check functions).

    Returns:
        TokenEconomySignal; status "unknown" when no data is readable.
    """
    # This is a simplified check - in a full implementation,
    # this would query the token ledger
    ledger_file = REPO_ROOT / ".loop" / "token_economy.jsonl"

    if not ledger_file.exists():
        return TokenEconomySignal(
            status="unknown",
            message="No token economy data",
        )

    try:
        # Read last 24 hours of transactions
        since = datetime.now(timezone.utc) - timedelta(hours=24)

        recent_mint = 0
        recent_burn = 0

        for line in ledger_file.read_text().strip().splitlines():
            try:
                tx = json.loads(line)
                tx_time = datetime.fromisoformat(
                    tx.get("timestamp", "1970-01-01").replace("Z", "+00:00")
                )
                # Bug fix: entries without an explicit UTC offset (including
                # the "1970-01-01" default for a missing timestamp) parse as
                # naive datetimes, and comparing naive vs aware raises
                # TypeError — which neither except clause caught, crashing
                # the whole snapshot. Treat naive stamps as UTC.
                if tx_time.tzinfo is None:
                    tx_time = tx_time.replace(tzinfo=timezone.utc)
                if tx_time >= since:
                    delta = tx.get("delta", 0)
                    if delta > 0:
                        recent_mint += delta
                    else:
                        recent_burn += abs(delta)
            except (json.JSONDecodeError, ValueError):
                continue  # tolerate corrupt ledger lines

        # Simple temperature check
        if recent_mint > recent_burn * 2:
            status = "inflationary"
            message = f"High mint activity (+{recent_mint}/-{recent_burn})"
        elif recent_burn > recent_mint * 2:
            status = "deflationary"
            message = f"High burn activity (+{recent_mint}/-{recent_burn})"
        else:
            status = "balanced"
            message = f"Balanced flow (+{recent_mint}/-{recent_burn})"

        return TokenEconomySignal(
            status=status,
            message=message,
            recent_mint=recent_mint,
            recent_burn=recent_burn,
        )

    except (OSError, ValueError) as exc:
        return TokenEconomySignal(
            status="unknown",
            message=f"Could not read token data: {exc}",
        )
|
||||
|
||||
|
||||
def calculate_overall_status(
    ci: CISignal,
    issues: IssueSignal,
    flakiness: FlakinessSignal,
) -> str:
    """Roll up individual signals into one green/yellow/red status.

    Red: CI failed, any P0 open, or critical flakiness.
    Yellow: CI status unknown, any P1 open, or degraded flakiness.
    Green: everything else (including CI "unavailable").
    """
    is_red = (
        ci.status == "fail"
        or issues.p0_count > 0
        or flakiness.status == "critical"
    )
    if is_red:
        return "red"

    is_yellow = (
        ci.status == "unknown"
        or issues.p1_count > 0
        or flakiness.status == "degraded"
    )
    return "yellow" if is_yellow else "green"
|
||||
|
||||
|
||||
# ── Main Functions ────────────────────────────────────────────────────────
|
||||
|
||||
def generate_snapshot(config: dict, token: str | None) -> HealthSnapshot:
    """Generate a complete health snapshot.

    Runs the four signal checks (CI, critical issues, flakiness, token
    economy) and rolls them into a HealthSnapshot with an overall
    green/yellow/red status.

    Args:
        config: Automation config (API base, repo slug, thresholds).
        token: Optional Gitea API token.

    Returns:
        A fully-populated HealthSnapshot with a UTC ISO timestamp.
    """
    client = GiteaClient(config, token)

    # Always run all checks (don't short-circuit)
    if client.is_available():
        ci = check_ci_status(client, config)
        issues = check_critical_issues(client, config)
    else:
        # Gitea unreachable: degrade the network-backed signals but still
        # run the local file-based checks below.
        ci = CISignal(
            status="unavailable",
            message="Gitea unavailable",
        )
        issues = IssueSignal(count=0, p0_count=0, p1_count=0, issues=[])

    # Local checks read files under REPO_ROOT and need no network.
    flakiness = check_flakiness(config)
    tokens = check_token_economy(config)

    # Token economy does not feed the overall status roll-up.
    overall = calculate_overall_status(ci, issues, flakiness)

    return HealthSnapshot(
        timestamp=datetime.now(timezone.utc).isoformat(),
        overall_status=overall,
        ci=ci,
        issues=issues,
        flakiness=flakiness,
        tokens=tokens,
    )
|
||||
|
||||
|
||||
def print_snapshot(snapshot: HealthSnapshot, verbose: bool = False) -> None:
    """Print a formatted health snapshot to stdout.

    Args:
        snapshot: The snapshot to render.
        verbose: When True, also list up to five critical issues.
    """
    def badge(value: str, table: dict) -> str:
        # Neutral dot for any unexpected status value.
        return table.get(value, "⚪")

    overall = badge(snapshot.overall_status, {"green": "🟢", "yellow": "🟡", "red": "🔴"})

    print("=" * 60)
    print(f"{overall} HEALTH SNAPSHOT")
    print("=" * 60)
    print(f"Generated: {snapshot.timestamp}")
    print(f"Overall: {snapshot.overall_status.upper()}")
    print()

    # CI Status
    ci_badge = badge(
        snapshot.ci.status,
        {"pass": "✅", "fail": "❌", "unknown": "⚠️", "unavailable": "⚪"},
    )
    print(f"{ci_badge} CI: {snapshot.ci.message}")

    # Issues: severity of the worst open bucket drives the badge.
    if snapshot.issues.p0_count > 0:
        issue_badge = "🔴"
    elif snapshot.issues.p1_count > 0:
        issue_badge = "🟡"
    else:
        issue_badge = "✅"
    print(f"{issue_badge} Issues: {snapshot.issues.count} critical")
    if snapshot.issues.p0_count > 0:
        print(f"   🔴 P0: {snapshot.issues.p0_count}")
    if snapshot.issues.p1_count > 0:
        print(f"   🟡 P1: {snapshot.issues.p1_count}")

    # Flakiness
    flak_badge = badge(
        snapshot.flakiness.status,
        {"healthy": "✅", "degraded": "🟡", "critical": "🔴", "unknown": "⚪"},
    )
    print(f"{flak_badge} Flakiness: {snapshot.flakiness.message}")

    # Token Economy
    token_badge = badge(
        snapshot.tokens.status,
        {"balanced": "✅", "inflationary": "🟡", "deflationary": "🔵", "unknown": "⚪"},
    )
    print(f"{token_badge} Tokens: {snapshot.tokens.message}")

    # Verbose: show issue details
    if verbose and snapshot.issues.issues:
        print()
        print("Critical Issues:")
        for entry in snapshot.issues.issues[:5]:
            print(f"   #{entry['number']}: {entry['title'][:50]}")

    print()
    print("─" * 60)
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Parse command-line flags for the snapshot tool."""
    parser = argparse.ArgumentParser(
        description="Quick health snapshot before coding",
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output as JSON",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show verbose output including issue details",
    )
    parser.add_argument(
        "--quiet", "-q",
        action="store_true",
        help="Only show status line (no details)",
    )
    return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
    """Main entry point for CLI.

    Generates a snapshot and renders it in one of three modes:
    --json (machine-readable), --quiet (single status line), or the
    default full human-readable report.

    Returns:
        0 when overall status is green/yellow, 1 when red — so shell
        scripts can gate work on the exit code.
    """
    args = parse_args()
    config = load_config()
    token = get_token(config)

    snapshot = generate_snapshot(config, token)

    if args.json:
        print(json.dumps(snapshot.to_dict(), indent=2))
    elif args.quiet:
        # Single-line mode: just the colored overall status.
        status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get(
            snapshot.overall_status, "⚪"
        )
        print(f"{status_emoji} {snapshot.overall_status.upper()}")
    else:
        print_snapshot(snapshot, verbose=args.verbose)

    # Exit with non-zero if red status
    return 0 if snapshot.overall_status != "red" else 1


if __name__ == "__main__":
    sys.exit(main())
|
||||
@@ -22,6 +22,14 @@ from typing import Any
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import HTTPError, URLError
|
||||
|
||||
# ── Token Economy Integration ──────────────────────────────────────────────
|
||||
# Import token rules helpers for tracking Daily Run rewards
|
||||
|
||||
sys.path.insert(
|
||||
0, str(Path(__file__).resolve().parent.parent)
|
||||
)
|
||||
from utils.token_rules import TokenRules, compute_token_reward
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
|
||||
@@ -490,6 +498,43 @@ def parse_args() -> argparse.Namespace:
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def compute_daily_run_tokens(success: bool = True) -> dict[str, Any]:
    """Compute token rewards for Daily Run completion.

    Uses the centralized token_rules configuration to calculate
    rewards/penalties for automation actions.

    Args:
        success: Whether the Daily Run completed successfully

    Returns:
        Token transaction details: on success, "daily_run" and
        "golden_path" transactions plus their combined "total_delta";
        on failure, a single "automation_failure" transaction. Both
        shapes include "config_version" from the loaded rules.
    """
    rules = TokenRules()

    if success:
        # Daily run completed successfully
        transaction = compute_token_reward("daily_run_completed", current_tokens=0)

        # Also compute golden path generation if agenda was created
        # NOTE(review): the golden_path reward is granted unconditionally
        # on success — there is no check that an agenda actually exists,
        # despite the comment above. Confirm intent with the caller.
        agenda_transaction = compute_token_reward("golden_path_generated", current_tokens=0)

        return {
            "daily_run": transaction,
            "golden_path": agenda_transaction,
            "total_delta": transaction.get("delta", 0) + agenda_transaction.get("delta", 0),
            "config_version": rules.get_config_version(),
        }
    else:
        # Automation failed
        transaction = compute_token_reward("automation_failure", current_tokens=0)
        return {
            "automation_failure": transaction,
            "total_delta": transaction.get("delta", 0),
            "config_version": rules.get_config_version(),
        }
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
config = load_config()
|
||||
@@ -503,10 +548,13 @@ def main() -> int:
|
||||
# Check Gitea availability
|
||||
if not client.is_available():
|
||||
error_msg = "[orchestrator] Error: Gitea API is not available"
|
||||
# Compute failure tokens even when unavailable
|
||||
tokens = compute_daily_run_tokens(success=False)
|
||||
if args.json:
|
||||
print(json.dumps({"error": error_msg}))
|
||||
print(json.dumps({"error": error_msg, "tokens": tokens}))
|
||||
else:
|
||||
print(error_msg, file=sys.stderr)
|
||||
print(f"[tokens] Failure penalty: {tokens['total_delta']}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
# Fetch candidates and generate agenda
|
||||
@@ -521,9 +569,12 @@ def main() -> int:
|
||||
cycles = load_cycle_data()
|
||||
day_summary = generate_day_summary(activity, cycles)
|
||||
|
||||
# Compute token rewards for successful completion
|
||||
tokens = compute_daily_run_tokens(success=True)
|
||||
|
||||
# Output
|
||||
if args.json:
|
||||
output = {"agenda": agenda}
|
||||
output = {"agenda": agenda, "tokens": tokens}
|
||||
if day_summary:
|
||||
output["day_summary"] = day_summary
|
||||
print(json.dumps(output, indent=2))
|
||||
@@ -531,6 +582,15 @@ def main() -> int:
|
||||
print_agenda(agenda)
|
||||
if day_summary and activity:
|
||||
print_day_summary(day_summary, activity)
|
||||
# Show token rewards
|
||||
print("─" * 60)
|
||||
print("🪙 Token Rewards")
|
||||
print("─" * 60)
|
||||
print(f"Daily Run completed: +{tokens['daily_run']['delta']} tokens")
|
||||
if candidates:
|
||||
print(f"Golden path generated: +{tokens['golden_path']['delta']} tokens")
|
||||
print(f"Total: +{tokens['total_delta']} tokens")
|
||||
print(f"Config version: {tokens['config_version']}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
745
timmy_automations/daily_run/weekly_narrative.py
Normal file
745
timmy_automations/daily_run/weekly_narrative.py
Normal file
@@ -0,0 +1,745 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Weekly narrative summary generator — human-readable loop analysis.
|
||||
|
||||
Analyzes the past week's activity across the development loop to produce
|
||||
a narrative summary of:
|
||||
- What changed (themes, areas of focus)
|
||||
- How agents and Timmy contributed
|
||||
- Any shifts in tests, triage, or token economy
|
||||
|
||||
The output is designed to be skimmable — a quick read that gives context
|
||||
on the week's progress without drowning in metrics.
|
||||
|
||||
Run: python3 timmy_automations/daily_run/weekly_narrative.py [--json]
|
||||
Env: See timmy_automations/config/automations.json for configuration
|
||||
|
||||
Refs: #719
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import Counter
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────
|
||||
|
||||
# Repo checkout root (daily_run/ -> timmy_automations/ -> repo root).
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
# Shared automations manifest that carries per-automation config blocks.
CONFIG_PATH = Path(__file__).parent.parent / "config" / "automations.json"

DEFAULT_CONFIG = {
    "gitea_api": "http://localhost:3000/api/v1",
    "repo_slug": "rockachopa/Timmy-time-dashboard",
    "token_file": "~/.hermes/gitea_token",
    "lookback_days": 7,
    "output_file": ".loop/weekly_narrative.json",
    "enabled": True,
}


# ── Data Loading ───────────────────────────────────────────────────────────


def load_automation_config() -> dict:
    """Load configuration for weekly_narrative from automations manifest.

    Precedence (lowest to highest): built-in defaults, the
    "weekly_narrative" entry in automations.json, then environment
    variables (TIMMY_GITEA_API, TIMMY_REPO_SLUG, TIMMY_GITEA_TOKEN,
    TIMMY_WEEKLY_NARRATIVE_ENABLED).

    Returns:
        Merged config dict; a broken manifest only produces a warning.
    """
    config = DEFAULT_CONFIG.copy()
    if CONFIG_PATH.exists():
        try:
            manifest = json.loads(CONFIG_PATH.read_text())
            # Find this automation's entry and overlay its config block.
            for auto in manifest.get("automations", []):
                if auto.get("id") == "weekly_narrative":
                    config.update(auto.get("config", {}))
                    config["enabled"] = auto.get("enabled", True)
                    break
        except (json.JSONDecodeError, OSError) as exc:
            print(f"[weekly_narrative] Warning: Could not load config: {exc}", file=sys.stderr)

    # Environment variable overrides
    if os.environ.get("TIMMY_GITEA_API"):
        config["gitea_api"] = os.environ.get("TIMMY_GITEA_API")
    if os.environ.get("TIMMY_REPO_SLUG"):
        config["repo_slug"] = os.environ.get("TIMMY_REPO_SLUG")
    if os.environ.get("TIMMY_GITEA_TOKEN"):
        config["token"] = os.environ.get("TIMMY_GITEA_TOKEN")
    if os.environ.get("TIMMY_WEEKLY_NARRATIVE_ENABLED"):
        # Any value other than (case-insensitive) "true" disables the job.
        config["enabled"] = os.environ.get("TIMMY_WEEKLY_NARRATIVE_ENABLED", "true").lower() == "true"

    return config
|
||||
|
||||
|
||||
def get_token(config: dict) -> str | None:
|
||||
"""Get Gitea token from environment or file."""
|
||||
if "token" in config:
|
||||
return config["token"]
|
||||
|
||||
token_file = Path(config["token_file"]).expanduser()
|
||||
if token_file.exists():
|
||||
return token_file.read_text().strip()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file, skipping bad lines.

    Returns an empty list when the file does not exist; malformed
    lines are silently ignored.
    """
    if not path.exists():
        return []

    records: list[dict] = []
    for raw in path.read_text().strip().splitlines():
        try:
            records.append(json.loads(raw))
        except (json.JSONDecodeError, ValueError):
            continue  # tolerate corrupt lines
    return records
|
||||
|
||||
|
||||
def parse_ts(ts_str: str) -> datetime | None:
|
||||
"""Parse an ISO timestamp, tolerating missing tz."""
|
||||
if not ts_str:
|
||||
return None
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=UTC)
|
||||
return dt
|
||||
except (ValueError, TypeError):
|
||||
return None
|
||||
|
||||
|
||||
# ── Gitea API Client ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class GiteaClient:
    """Minimal Gitea REST client used by the weekly narrative job."""

    def __init__(self, config: dict, token: str | None):
        self.api_base = config["gitea_api"].rstrip("/")
        self.repo_slug = config["repo_slug"]
        self.token = token
        # Cached reachability result; None until the first probe.
        self._available: bool | None = None

    def _headers(self) -> dict:
        """HTTP headers for every request (auth only when a token is set)."""
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    def _api_url(self, path: str) -> str:
        """Absolute URL for a repo-scoped API path."""
        return f"{self.api_base}/repos/{self.repo_slug}/{path}"

    def is_available(self) -> bool:
        """Check (once) whether the Gitea /version endpoint answers."""
        if self._available is None:
            try:
                req = Request(
                    f"{self.api_base}/version",
                    headers=self._headers(),
                    method="GET",
                )
                with urlopen(req, timeout=5) as resp:
                    self._available = resp.status == 200
            except (HTTPError, URLError, TimeoutError):
                self._available = False
        return self._available

    def get_paginated(self, path: str, params: dict | None = None) -> list:
        """Fetch every page of a paginated endpoint into one list."""
        collected: list = []
        limit = 50
        page = 1

        while True:
            query = [f"limit={limit}", f"page={page}"]
            if params:
                query.extend(f"{key}={val}" for key, val in params.items())
            url = f"{self._api_url(path)}?{'&'.join(query)}"

            req = Request(url, headers=self._headers(), method="GET")
            with urlopen(req, timeout=15) as resp:
                batch = json.loads(resp.read())

            if not batch:
                break
            collected.extend(batch)
            # A short page is the last page.
            if len(batch) < limit:
                break
            page += 1

        return collected
|
||||
|
||||
|
||||
# ── Data Collection ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def collect_cycles_data(since: datetime) -> dict:
    """Load cycle retrospective data from the lookback period."""
    cycles_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
    if not cycles_file.exists():
        return {"cycles": [], "total": 0, "successes": 0, "failures": 0}

    # Keep only entries whose timestamp falls inside the lookback window.
    recent = []
    for entry in load_jsonl(cycles_file):
        ts = parse_ts(entry.get("timestamp", ""))
        if ts and ts >= since:
            recent.append(entry)

    success_count = sum(1 for e in recent if e.get("success"))
    failure_count = len(recent) - success_count

    return {
        "cycles": recent,
        "total": len(recent),
        "successes": success_count,
        "failures": failure_count,
        "success_rate": round(success_count / len(recent), 2) if recent else 0,
    }
|
||||
|
||||
|
||||
def collect_issues_data(client: GiteaClient, since: datetime) -> dict:
    """Collect issue activity from Gitea."""
    if not client.is_available():
        return {"error": "Gitea unavailable", "issues": [], "closed": [], "opened": []}

    try:
        issues = client.get_paginated("issues", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "issues": [], "closed": [], "opened": []}

    touched, closed, opened = [], [], []

    for issue in issues:
        updated = parse_ts(issue.get("updated_at", ""))
        created = parse_ts(issue.get("created_at", ""))

        if updated and updated >= since:
            touched.append(issue)

        # Closed-in-window vs opened-in-window are mutually exclusive buckets.
        if issue.get("state") == "closed":
            closed_dt = parse_ts(issue.get("closed_at", ""))
            if closed_dt and closed_dt >= since:
                closed.append(issue)
        elif created and created >= since:
            opened.append(issue)

    return {
        "issues": touched,
        "closed": closed,
        "opened": opened,
        "touched_count": len(touched),
        "closed_count": len(closed),
        "opened_count": len(opened),
    }
|
||||
|
||||
|
||||
def collect_prs_data(client: GiteaClient, since: datetime) -> dict:
    """Collect PR activity from Gitea."""
    if not client.is_available():
        return {"error": "Gitea unavailable", "prs": [], "merged": [], "opened": []}

    try:
        prs = client.get_paginated("pulls", {"state": "all", "sort": "updated", "limit": 100})
    except (HTTPError, URLError) as exc:
        return {"error": str(exc), "prs": [], "merged": [], "opened": []}

    touched, merged, opened = [], [], []

    for pr in prs:
        updated = parse_ts(pr.get("updated_at", ""))
        created = parse_ts(pr.get("created_at", ""))
        merged_raw = pr.get("merged_at", "")
        merged_dt = parse_ts(merged_raw) if merged_raw else None

        if updated and updated >= since:
            touched.append(pr)

        # Merged-in-window vs opened-in-window are mutually exclusive buckets.
        if pr.get("merged") and merged_dt and merged_dt >= since:
            merged.append(pr)
        elif created and created >= since:
            opened.append(pr)

    return {
        "prs": touched,
        "merged": merged,
        "opened": opened,
        "touched_count": len(touched),
        "merged_count": len(merged),
        "opened_count": len(opened),
    }
|
||||
|
||||
|
||||
def collect_triage_data(since: datetime) -> dict:
    """Load triage and introspection data.

    Reads the triage run log (JSONL) and the latest insights snapshot from
    the repo's .loop/retro directory; both are optional and missing or
    corrupt files degrade to empty results.
    """
    triage_file = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
    insights_file = REPO_ROOT / ".loop" / "retro" / "insights.json"

    # Fix: each entry's timestamp was previously parsed twice (once per
    # side of the filter condition); parse once and filter.
    recent_triage = []
    for entry in load_jsonl(triage_file):
        ts = parse_ts(entry.get("timestamp", ""))
        if ts and ts >= since:
            recent_triage.append(entry)

    insights = {}
    if insights_file.exists():
        try:
            insights = json.loads(insights_file.read_text())
        except (json.JSONDecodeError, OSError):
            # Corrupt/unreadable insights are non-fatal; report empty.
            pass

    return {
        "triage_runs": len(recent_triage),
        "triage_entries": recent_triage,
        "latest_insights": insights,
    }
|
||||
|
||||
|
||||
def collect_token_data(since: datetime) -> dict:
    """Load token economy data from the lightning ledger.

    The ledger is in-memory only, so this returns a static placeholder;
    live metrics are served by the dashboard. ``since`` is accepted for
    signature symmetry with the other collectors but is currently unused.
    """
    placeholder = {
        "note": "Token economy data is ephemeral — check dashboard for live metrics",
        "balance_sats": 0,  # Placeholder
        "transactions_week": 0,
    }
    return placeholder
|
||||
|
||||
|
||||
# ── Analysis Functions ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def extract_themes(issues: list[dict]) -> dict:
    """Extract label-based themes from a list of issues.

    Fix: the return annotation previously said ``list[dict]`` but the
    function has always returned a dict with "top_labels"/"layers"/"types".

    Returns:
        Dict with:
          - top_labels: ten most common labels (layer:/size: prefixes excluded)
          - layers: counts per ``layer:`` label (prefix stripped)
          - types: counts per recognised issue-type label
    """
    label_counts = Counter()
    layer_counts = Counter()
    type_counts = Counter()

    for issue in issues:
        for label in issue.get("labels", []):
            name = label.get("name", "")
            label_counts[name] += 1

            if name.startswith("layer:"):
                layer_counts[name.replace("layer:", "")] += 1
            if name in ("bug", "feature", "refactor", "docs", "test", "chore"):
                type_counts[name] += 1

    # Top themes (labels excluding layer/size prefixes)
    themes = [
        {"name": name, "count": count}
        for name, count in label_counts.most_common(10)
        if not name.startswith(("layer:", "size:"))
    ]

    layers = [{"name": name, "count": count} for name, count in layer_counts.most_common()]
    types = [{"name": name, "count": count} for name, count in type_counts.most_common()]

    return {
        "top_labels": themes,
        "layers": layers,
        "types": types,
    }
|
||||
|
||||
|
||||
def extract_agent_contributions(issues: list[dict], prs: list[dict], cycles: list[dict]) -> dict:
    """Extract agent contribution patterns."""
    # Issues per assignee login (only non-empty dict assignees count).
    assignee_counts = Counter()
    for issue in issues:
        assignee = issue.get("assignee")
        if isinstance(assignee, dict) and assignee:
            assignee_counts[assignee.get("login", "unknown")] += 1

    # PRs per author login.
    pr_authors = Counter()
    for pr in prs:
        author = pr.get("user")
        if isinstance(author, dict) and author:
            pr_authors[author.get("login", "unknown")] += 1

    # Cycles whose notes or reason mention Kimi (case-insensitive).
    kimi_mentions = 0
    for cycle in cycles:
        searchable = (cycle.get("notes", "") + " " + cycle.get("reason", "")).lower()
        if "kimi" in searchable:
            kimi_mentions += 1

    return {
        "active_assignees": [
            {"login": login, "issues_count": count}
            for login, count in assignee_counts.most_common()
        ],
        "pr_authors": [
            {"login": login, "prs_count": count}
            for login, count in pr_authors.most_common()
        ],
        "kimi_mentioned_cycles": kimi_mentions,
    }
|
||||
|
||||
|
||||
def analyze_test_shifts(cycles: list[dict]) -> dict:
    """Analyze shifts in test patterns."""
    if not cycles:
        return {"note": "No cycle data available"}

    passed_total = sum(c.get("tests_passed", 0) for c in cycles)
    added_total = sum(c.get("tests_added", 0) for c in cycles)
    # cycles is guaranteed non-empty here, so the division is safe.
    avg_per_cycle = round(passed_total / len(cycles), 1)

    # Cycles explicitly typed "test" or whose notes mention testing.
    test_focused_count = sum(
        1 for c in cycles
        if c.get("type") == "test" or "test" in c.get("notes", "").lower()
    )

    return {
        "total_tests_passed": passed_total,
        "total_tests_added": added_total,
        "avg_tests_per_cycle": avg_per_cycle,
        "test_focused_cycles": test_focused_count,
    }
|
||||
|
||||
|
||||
def analyze_triage_shifts(triage_data: dict) -> dict:
    """Analyze shifts in triage patterns."""
    insights = triage_data.get("latest_insights", {})
    recommendations = insights.get("recommendations", [])

    high_count = sum(1 for rec in recommendations if rec.get("severity") == "high")

    return {
        "triage_runs": triage_data.get("triage_runs", 0),
        "insights_generated": insights.get("generated_at") is not None,
        "high_priority_recommendations": high_count,
        "recent_recommendations": recommendations[:3] if recommendations else [],
    }
|
||||
|
||||
|
||||
def generate_vibe_summary(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
) -> dict:
    """Generate the human-readable 'vibe' summary."""
    success_rate = cycles_data.get("success_rate", 0)
    failures = cycles_data.get("failures", 0)
    closed_count = issues_data.get("closed_count", 0)
    merged_count = prs_data.get("merged_count", 0)

    # Classify the week, checked best-to-worst.
    if success_rate >= 0.9 and closed_count > 0:
        vibe, vibe_description = (
            "productive",
            "A strong week with solid delivery and healthy success rates.",
        )
    elif success_rate >= 0.7:
        vibe, vibe_description = (
            "steady",
            "Steady progress with some bumps. Things are moving forward.",
        )
    elif failures > cycles_data.get("successes", 0):
        vibe, vibe_description = (
            "struggling",
            "A challenging week with more failures than successes. Time to regroup.",
        )
    else:
        vibe, vibe_description = ("quiet", "A lighter week with limited activity.")

    # Up to three layer themes become the focus areas.
    focus_areas = [
        f"{layer['name']} ({layer['count']} items)"
        for layer in themes.get("layers", [])[:3]
    ]

    # One-line summary for the busiest assignee, if any.
    agent_summary = ""
    active_assignees = agent_contrib.get("active_assignees", [])
    if active_assignees:
        top_agent = active_assignees[0]
        agent_summary = f"{top_agent['login']} led with {top_agent['issues_count']} assigned issues."

    # Notable events, with a neutral fallback when nothing stands out.
    notable = []
    if merged_count > 5:
        notable.append(f"{merged_count} PRs merged — high integration velocity")
    if triage_shifts.get("high_priority_recommendations", 0) > 0:
        notable.append("High-priority recommendations from loop introspection")
    if test_shifts.get("test_focused_cycles", 0) > 3:
        notable.append("Strong test coverage focus")
    if not notable:
        notable.append("Regular development flow")

    return {
        "overall": vibe,
        "description": vibe_description,
        "focus_areas": focus_areas,
        "agent_summary": agent_summary,
        "notable_events": notable,
    }
|
||||
|
||||
|
||||
# ── Narrative Generation ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def generate_narrative(
    cycles_data: dict,
    issues_data: dict,
    prs_data: dict,
    triage_data: dict,
    themes: dict,
    agent_contrib: dict,
    test_shifts: dict,
    triage_shifts: dict,
    token_data: dict,
    since: datetime,
    until: datetime,
) -> dict:
    """Generate the complete weekly narrative.

    Aggregates the collected/analyzed data into one JSON-serializable
    report dict. Fix: the period length was hard-coded to 7 even though
    the lookback window is configurable (--days / lookback_days); it is
    now derived from the actual since/until span.
    """
    vibe = generate_vibe_summary(
        cycles_data, issues_data, prs_data, themes, agent_contrib, test_shifts, triage_shifts
    )

    return {
        "generated_at": datetime.now(UTC).isoformat(),
        "period": {
            "start": since.isoformat(),
            "end": until.isoformat(),
            # Derived from the real window, not assumed to be weekly.
            "days": (until - since).days,
        },
        "vibe": vibe,
        "activity": {
            "cycles": {
                "total": cycles_data.get("total", 0),
                "successes": cycles_data.get("successes", 0),
                "failures": cycles_data.get("failures", 0),
                "success_rate": cycles_data.get("success_rate", 0),
            },
            "issues": {
                "touched": issues_data.get("touched_count", 0),
                "closed": issues_data.get("closed_count", 0),
                "opened": issues_data.get("opened_count", 0),
            },
            "pull_requests": {
                "touched": prs_data.get("touched_count", 0),
                "merged": prs_data.get("merged_count", 0),
                "opened": prs_data.get("opened_count", 0),
            },
        },
        "themes": themes,
        "agents": agent_contrib,
        "test_health": test_shifts,
        "triage_health": triage_shifts,
        "token_economy": token_data,
    }
|
||||
|
||||
|
||||
def generate_markdown_summary(narrative: dict) -> str:
    """Generate a human-readable markdown summary."""
    vibe = narrative.get("vibe", {})
    activity = narrative.get("activity", {})
    cycles = activity.get("cycles", {})
    issues = activity.get("issues", {})
    prs = activity.get("pull_requests", {})

    lines: list[str] = []

    # Header + headline metrics (always present).
    lines.extend([
        "# Weekly Narrative Summary",
        "",
        f"**Period:** {narrative['period']['start'][:10]} to {narrative['period']['end'][:10]}",
        f"**Vibe:** {vibe.get('overall', 'unknown').title()}",
        "",
        f"{vibe.get('description', '')}",
        "",
        "## Activity Highlights",
        "",
        f"- **Development Cycles:** {cycles.get('total', 0)} total ({cycles.get('successes', 0)} success, {cycles.get('failures', 0)} failure)",
        f"- **Issues:** {issues.get('closed', 0)} closed, {issues.get('opened', 0)} opened",
        f"- **Pull Requests:** {prs.get('merged', 0)} merged, {prs.get('opened', 0)} opened",
        "",
    ])

    # Optional sections below are emitted only when they have content.
    focus = vibe.get("focus_areas", [])
    if focus:
        lines.extend(["## Focus Areas", ""])
        lines.extend(f"- {area}" for area in focus)
        lines.append("")

    agent_summary = vibe.get("agent_summary", "")
    if agent_summary:
        lines.extend(["## Agent Activity", "", agent_summary, ""])

    notable = vibe.get("notable_events", [])
    if notable:
        lines.extend(["## Notable Events", ""])
        lines.extend(f"- {event}" for event in notable)
        lines.append("")

    triage = narrative.get("triage_health", {})
    if triage.get("high_priority_recommendations", 0) > 0:
        lines.extend([
            "## Triage Notes",
            "",
            f"⚠️ {triage['high_priority_recommendations']} high-priority recommendation(s) from loop introspection.",
            "",
        ])
        lines.extend(
            f"- **{rec.get('category', 'general')}:** {rec.get('finding', '')}"
            for rec in triage.get("recent_recommendations", [])[:2]
        )
        lines.append("")

    return "\n".join(lines)
|
||||
|
||||
|
||||
# ── Main ───────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments for the weekly narrative generator."""
    parser = argparse.ArgumentParser(
        description="Generate weekly narrative summary of work and vibes",
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        help="Output as JSON instead of markdown",
    )
    parser.add_argument(
        "--output", "-o",
        type=str,
        default=None,
        help="Output file path (default from config)",
    )
    parser.add_argument(
        "--days",
        type=int,
        default=None,
        help="Override lookback days (default 7)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Run even if disabled in config",
    )
    return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
    """Entry point: collect data, build the weekly narrative, write outputs.

    Writes a JSON report plus a markdown summary next to it, then echoes
    one of the two to stdout depending on --json.

    Returns:
        Process exit code — always 0 (a disabled config is a no-op success).
    """
    args = parse_args()
    config = load_automation_config()

    # Check if enabled
    if not config.get("enabled", True) and not args.force:
        print("[weekly_narrative] Skipped — weekly narrative is disabled in config")
        print("[weekly_narrative] Use --force to run anyway")
        return 0

    # Determine lookback period (CLI override wins over config)
    days = args.days if args.days is not None else config.get("lookback_days", 7)
    until = datetime.now(UTC)
    since = until - timedelta(days=days)

    print(f"[weekly_narrative] Generating narrative for the past {days} days...")

    # Setup Gitea client
    token = get_token(config)
    client = GiteaClient(config, token)

    if not client.is_available():
        print("[weekly_narrative] Warning: Gitea API unavailable — will use local data only")

    # Collect data (Gitea-backed collectors degrade gracefully when offline)
    cycles_data = collect_cycles_data(since)
    issues_data = collect_issues_data(client, since)
    prs_data = collect_prs_data(client, since)
    triage_data = collect_triage_data(since)
    token_data = collect_token_data(since)

    # Analyze
    themes = extract_themes(issues_data.get("issues", []))
    agent_contrib = extract_agent_contributions(
        issues_data.get("issues", []),
        prs_data.get("prs", []),
        cycles_data.get("cycles", []),
    )
    test_shifts = analyze_test_shifts(cycles_data.get("cycles", []))
    triage_shifts = analyze_triage_shifts(triage_data)

    # Generate narrative
    narrative = generate_narrative(
        cycles_data,
        issues_data,
        prs_data,
        triage_data,
        themes,
        agent_contrib,
        test_shifts,
        triage_shifts,
        token_data,
        since,
        until,
    )

    # Determine output path (CLI override wins over config)
    output_path = args.output or config.get("output_file", ".loop/weekly_narrative.json")
    output_file = REPO_ROOT / output_path
    output_file.parent.mkdir(parents=True, exist_ok=True)

    # Write JSON output
    output_file.write_text(json.dumps(narrative, indent=2) + "\n")

    # Write markdown summary alongside JSON
    md_output_file = output_file.with_suffix(".md")
    md_output_file.write_text(generate_markdown_summary(narrative))

    # Print output
    if args.json:
        print(json.dumps(narrative, indent=2))
    else:
        print()
        print(generate_markdown_summary(narrative))

    print(f"\n[weekly_narrative] Written to: {output_file}")
    print(f"[weekly_narrative] Markdown summary: {md_output_file}")

    return 0
|
||||
|
||||
|
||||
# Script entry point — main()'s return value becomes the process exit code.
if __name__ == "__main__":
    sys.exit(main())
|
||||
6
timmy_automations/utils/__init__.py
Normal file
6
timmy_automations/utils/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Timmy Automations utilities.
|
||||
|
||||
Shared helper modules for automations.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
389
timmy_automations/utils/token_rules.py
Normal file
389
timmy_automations/utils/token_rules.py
Normal file
@@ -0,0 +1,389 @@
|
||||
"""Token rules helper — Compute token deltas for agent actions.
|
||||
|
||||
This module loads token economy configuration from YAML and provides
|
||||
functions for automations to compute token rewards/penalties.
|
||||
|
||||
Usage:
|
||||
from timmy_automations.utils.token_rules import TokenRules
|
||||
|
||||
rules = TokenRules()
|
||||
delta = rules.get_delta("pr_merged")
|
||||
print(f"PR merge reward: {delta}") # 10
|
||||
|
||||
# Check if agent can perform sensitive operation
|
||||
can_merge = rules.check_gate("pr_merge", current_tokens=25)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
class TokenEvent:
    """Represents a single token event configuration."""

    name: str  # event identifier, e.g. "pr_merged"
    description: str  # human-readable explanation of the event
    reward: int  # tokens granted for the event (added into delta)
    penalty: int  # tokens deducted; stored as <= 0 in the shipped defaults
    category: str  # grouping used for daily limits, e.g. "merge", "test"
    gate_threshold: int | None = None  # min balance required; None = ungated

    @property
    def delta(self) -> int:
        """Net token delta (reward + penalty)."""
        # penalty is expected to be zero or negative, so this is a net sum.
        return self.reward + self.penalty
|
||||
|
||||
|
||||
@dataclass
class TokenCategoryLimits:
    """Daily limits for a token category."""

    max_earn: int  # max tokens that may be earned in this category per day
    max_spend: int  # max tokens that may be spent in this category per day
|
||||
|
||||
|
||||
class TokenRules:
    """Token economy rules loader and calculator.

    Loads configuration from timmy_automations/config/token_rules.yaml
    and provides methods to compute token deltas and check gating.

    Loading degrades gracefully: if PyYAML is missing, the config file is
    absent/empty, or parsing raises, a minimal hard-coded fallback rule
    set is used instead (see _load_fallback_defaults).
    """

    # Default config location relative to this module's package.
    CONFIG_PATH = Path(__file__).parent.parent / "config" / "token_rules.yaml"

    def __init__(self, config_path: Path | None = None) -> None:
        """Initialize token rules from configuration file.

        Args:
            config_path: Optional override for config file location.
        """
        self._config_path = config_path or self.CONFIG_PATH
        self._events: dict[str, TokenEvent] = {}
        self._gating: dict[str, int] = {}
        self._daily_limits: dict[str, TokenCategoryLimits] = {}
        self._audit: dict[str, Any] = {}
        self._version: str = "unknown"
        self._load_config()

    def _load_config(self) -> None:
        """Load configuration from YAML file."""
        # Graceful degradation if yaml not available or file missing
        try:
            import yaml
        except ImportError:
            # YAML not installed, use fallback defaults
            self._load_fallback_defaults()
            return

        if not self._config_path.exists():
            self._load_fallback_defaults()
            return

        try:
            config = yaml.safe_load(self._config_path.read_text())
            if not config:
                # Empty/None document — treat like a missing file.
                self._load_fallback_defaults()
                return

            self._version = config.get("version", "unknown")
            self._parse_events(config.get("events", {}))
            self._parse_gating(config.get("gating_thresholds", {}))
            self._parse_daily_limits(config.get("daily_limits", {}))
            self._audit = config.get("audit", {})

        except Exception:
            # Any error loading config, use fallbacks
            # (broad catch is deliberate: a bad rules file must never
            # break the automations that consume this module).
            self._load_fallback_defaults()

    def _load_fallback_defaults(self) -> None:
        """Load minimal fallback defaults if config unavailable."""
        self._version = "fallback"
        self._events = {
            "pr_merged": TokenEvent(
                name="pr_merged",
                description="Successfully merged a pull request",
                reward=10,
                penalty=0,
                category="merge",
                gate_threshold=0,
            ),
            "test_fixed": TokenEvent(
                name="test_fixed",
                description="Fixed a failing test",
                reward=8,
                penalty=0,
                category="test",
            ),
            "automation_failure": TokenEvent(
                name="automation_failure",
                description="Automation failed",
                reward=0,
                penalty=-2,
                category="operation",
            ),
        }
        self._gating = {"pr_merge": 0}
        self._daily_limits = {}
        self._audit = {"log_all_transactions": True}

    def _parse_events(self, events_config: dict) -> None:
        """Parse event configurations from YAML."""
        for name, config in events_config.items():
            # Ignore malformed entries (non-dict values).
            if not isinstance(config, dict):
                continue

            self._events[name] = TokenEvent(
                name=name,
                description=config.get("description", ""),
                reward=config.get("reward", 0),
                penalty=config.get("penalty", 0),
                category=config.get("category", "unknown"),
                gate_threshold=config.get("gate_threshold"),
            )

    def _parse_gating(self, gating_config: dict) -> None:
        """Parse gating thresholds from YAML."""
        for name, threshold in gating_config.items():
            # Only integer thresholds are accepted; others are dropped.
            if isinstance(threshold, int):
                self._gating[name] = threshold

    def _parse_daily_limits(self, limits_config: dict) -> None:
        """Parse daily limits from YAML."""
        for category, limits in limits_config.items():
            if isinstance(limits, dict):
                self._daily_limits[category] = TokenCategoryLimits(
                    max_earn=limits.get("max_earn", 0),
                    max_spend=limits.get("max_spend", 0),
                )

    def get_delta(self, event_name: str) -> int:
        """Get token delta for an event.

        Args:
            event_name: Name of the event (e.g., "pr_merged", "test_fixed")

        Returns:
            Net token delta (positive for reward, negative for penalty).
            Unknown events yield 0.
        """
        event = self._events.get(event_name)
        if event:
            return event.delta
        return 0

    def get_event(self, event_name: str) -> TokenEvent | None:
        """Get full event configuration.

        Args:
            event_name: Name of the event

        Returns:
            TokenEvent object or None if not found
        """
        return self._events.get(event_name)

    def list_events(self, category: str | None = None) -> list[TokenEvent]:
        """List all configured events.

        Args:
            category: Optional category filter

        Returns:
            List of TokenEvent objects
        """
        events = list(self._events.values())
        if category:
            events = [e for e in events if e.category == category]
        return events

    def check_gate(self, operation: str, current_tokens: int) -> bool:
        """Check if agent meets token threshold for an operation.

        Args:
            operation: Operation name (e.g., "pr_merge")
            current_tokens: Agent's current token balance

        Returns:
            True if agent can perform the operation
        """
        threshold = self._gating.get(operation)
        if threshold is None:
            return True  # No gate defined, allow
        return current_tokens >= threshold

    def get_gate_threshold(self, operation: str) -> int | None:
        """Get the gating threshold for an operation.

        Args:
            operation: Operation name

        Returns:
            Threshold value or None if no gate defined
        """
        return self._gating.get(operation)

    def get_daily_limits(self, category: str) -> TokenCategoryLimits | None:
        """Get daily limits for a category.

        Args:
            category: Category name

        Returns:
            TokenCategoryLimits or None if not defined
        """
        return self._daily_limits.get(category)

    def compute_transaction(
        self,
        event_name: str,
        current_tokens: int = 0,
        current_daily_earned: dict[str, int] | None = None,
    ) -> dict[str, Any]:
        """Compute a complete token transaction.

        This is the main entry point for agents to use. It returns
        a complete transaction record with delta, gating check, and limits.

        Args:
            event_name: Name of the event
            current_tokens: Agent's current token balance
            current_daily_earned: Dict of category -> tokens earned today

        Returns:
            Transaction dict with:
                - event: Event name
                - delta: Token delta
                - allowed: Whether operation is allowed (gating)
                - new_balance: Projected new balance
                - limit_reached: Whether daily limit would be exceeded
        """
        event = self._events.get(event_name)
        if not event:
            # Unknown event: zero-delta, disallowed, with an explicit reason.
            return {
                "event": event_name,
                "delta": 0,
                "allowed": False,
                "reason": "unknown_event",
                "new_balance": current_tokens,
                "limit_reached": False,
            }

        delta = event.delta
        new_balance = current_tokens + delta

        # Check gating (for penalties, we don't check gates)
        allowed = True
        gate_reason = None
        if delta > 0 and event.gate_threshold is not None:  # Only check gates for positive operations with thresholds
            allowed = current_tokens >= event.gate_threshold
            if not allowed:
                gate_reason = f"requires {event.gate_threshold} tokens"

        # Check daily limits
        # NOTE(review): the limit check only fires when the category already
        # appears in current_daily_earned — the first earn of the day is never
        # limit-checked even if its delta alone exceeds max_earn; confirm this
        # is the intended semantics.
        limit_reached = False
        limit_reason = None
        if current_daily_earned and event.category in current_daily_earned:
            limits = self._daily_limits.get(event.category)
            if limits:
                current_earned = current_daily_earned.get(event.category, 0)
                if delta > 0 and current_earned + delta > limits.max_earn:
                    limit_reached = True
                    limit_reason = f"daily earn limit ({limits.max_earn}) reached"

        result = {
            "event": event_name,
            "delta": delta,
            "category": event.category,
            "allowed": allowed and not limit_reached,
            "new_balance": new_balance,
            "limit_reached": limit_reached,
        }

        # Reason keys are only present when the corresponding check failed.
        if gate_reason:
            result["gate_reason"] = gate_reason
        if limit_reason:
            result["limit_reason"] = limit_reason

        return result

    def get_config_version(self) -> str:
        """Get the loaded configuration version."""
        return self._version

    def get_categories(self) -> list[str]:
        """Get list of all configured categories."""
        categories = {e.category for e in self._events.values()}
        return sorted(categories)

    def is_auditable(self) -> bool:
        """Check if transactions should be logged for audit."""
        return self._audit.get("log_all_transactions", True)
|
||||
|
||||
|
||||
# Convenience functions for simple use cases
|
||||
|
||||
def get_token_delta(event_name: str) -> int:
    """Get token delta for an event (convenience function).

    Args:
        event_name: Name of the event

    Returns:
        Token delta (positive for reward, negative for penalty)
    """
    rules = TokenRules()
    return rules.get_delta(event_name)
|
||||
|
||||
|
||||
def check_operation_gate(operation: str, current_tokens: int) -> bool:
    """Check if agent can perform operation (convenience function).

    Args:
        operation: Operation name
        current_tokens: Agent's current token balance

    Returns:
        True if operation is allowed
    """
    rules = TokenRules()
    return rules.check_gate(operation, current_tokens)
|
||||
|
||||
|
||||
def compute_token_reward(
    event_name: str,
    current_tokens: int = 0,
) -> dict[str, Any]:
    """Compute token reward for an event (convenience function).

    Args:
        event_name: Name of the event
        current_tokens: Agent's current token balance

    Returns:
        Transaction dict with delta, allowed status, new balance
    """
    rules = TokenRules()
    return rules.compute_transaction(event_name, current_tokens)
|
||||
|
||||
|
||||
def list_token_events(category: str | None = None) -> list[dict[str, Any]]:
    """List all token events (convenience function).

    Args:
        category: Optional category filter

    Returns:
        List of event dicts with name, description, delta, category
    """
    rules = TokenRules()
    summaries: list[dict[str, Any]] = []
    for event in rules.list_events(category):
        summaries.append({
            "name": event.name,
            "description": event.description,
            "delta": event.delta,
            "category": event.category,
            "gate_threshold": event.gate_threshold,
        })
    return summaries
|
||||
Reference in New Issue
Block a user