Files
timmy-config/tasks.py
Alexander Whitestone ef4c94b9b5 fix: gitea_client import path — point to sovereign-orchestration/src
The hermes-agent tools/__init__.py pulls in firecrawl which isn't installed.
Direct import from sovereign-orchestration's zero-dependency client works.

TODO: gitea_client should be pip-installable or vendored, not a cross-repo path reach.
2026-03-25 19:05:29 -04:00

81 lines
2.6 KiB
Python

"""Timmy's scheduled work — triage, PR review, dispatch."""
import sys
from pathlib import Path
# Gitea client lives in sovereign-orchestration
sys.path.insert(0, str(Path.home() / ".timmy" / "sovereign-orchestration" / "src"))
from orchestration import huey
from huey import crontab
from gitea_client import GiteaClient
REPOS = [
"Timmy_Foundation/the-nexus",
"Timmy_Foundation/autolora",
"Timmy_Foundation/timmy-config",
]
NET_LINE_LIMIT = 10
@huey.periodic_task(crontab(minute="*/15"))
def triage_issues():
"""Score and assign unassigned issues across all repos."""
g = GiteaClient()
found = 0
for repo in REPOS:
for issue in g.find_unassigned_issues(repo, limit=10):
found += 1
g.create_comment(
repo, issue.number,
"🔍 Triaged by Huey — needs assignment."
)
return {"triaged": found}
@huey.periodic_task(crontab(minute="*/30"))
def review_prs():
"""Review open PRs: check net diff, reject violations."""
g = GiteaClient()
reviewed, rejected = 0, 0
for repo in REPOS:
for pr in g.list_pulls(repo, state="open", limit=20):
reviewed += 1
files = g.get_pull_files(repo, pr.number)
net = sum(f.additions - f.deletions for f in files)
if net > NET_LINE_LIMIT:
rejected += 1
g.create_comment(
repo, pr.number,
f"❌ Net +{net} lines exceeds the {NET_LINE_LIMIT}-line limit. "
f"Find {net - NET_LINE_LIMIT} lines to cut. See CONTRIBUTING.md."
)
return {"reviewed": reviewed, "rejected": rejected}
@huey.periodic_task(crontab(minute="*/10"))
def dispatch_assigned():
"""Pick up issues assigned to agents and kick off work."""
g = GiteaClient()
agents = ["claude", "gemini", "kimi", "grok", "perplexity"]
dispatched = 0
for repo in REPOS:
for agent in agents:
for issue in g.find_agent_issues(repo, agent, limit=5):
comments = g.list_comments(repo, issue.number, limit=5)
if any(c.body and "dispatched" in c.body.lower() for c in comments):
continue
dispatch_work(repo, issue.number, agent)
dispatched += 1
return {"dispatched": dispatched}
@huey.task(retries=3, retry_delay=60)
def dispatch_work(repo, issue_number, agent):
"""Dispatch a single issue to an agent. Huey handles retry."""
g = GiteaClient()
g.create_comment(
repo, issue_number,
f"⚡ Dispatched to `{agent}`. Huey task queued."
)