# Source: timmy-home/uni-wizard/daemons/task_router.py (Python, 223 lines, 6.8 KiB)

"""
Task Router for Uni-Wizard
Polls Gitea for assigned issues and executes them
"""
import json
import time
import sys
from pathlib import Path
from datetime import datetime
# Add parent to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from harness import get_harness
class TaskRouter:
    """
    Gitea Task Router.

    Polls Gitea (through the harness) for open issues assigned to a user,
    picks an action from each issue title, runs the matching harness tool
    and posts the tool output back on the issue as a comment.

    Flow:
    1. List open issues for ``assignee`` via the ``gitea_list_issues`` tool
    2. Derive an action from the issue title (see ``_parse_action``)
    3. Run the handler for that action
    4. Post the handler output as a comment via the ``gitea_comment`` tool

    Issues are never closed by this class; an issue is only remembered in
    ``processed_issues`` (in memory) so it is not handled twice in one run.
    """

    # Actions that may be named explicitly in a leading "[ACTION]" title tag.
    _KNOWN_ACTIONS = frozenset(
        {"health_report", "system_check", "git_operation", "generic"}
    )

    # Keyword fallbacks, checked in order; the first group with a match wins.
    _KEYWORD_ROUTES = (
        ("health_report", ("health", "status", "check")),
        ("system_check", ("system", "resource", "disk", "memory")),
        ("git_operation", ("git", "commit", "push", "pull", "branch")),
    )

    def __init__(
        self,
        gitea_url: str = "http://143.198.27.163:3000",
        repo: str = "Timmy_Foundation/timmy-home",
        assignee: str = "timmy",
        poll_interval: int = 60,
        repo_path: str = "/root/timmy/timmy-home",
    ):
        """
        Create a router and make sure its log directory exists.

        Args:
            gitea_url: Base URL of the Gitea server (only printed at startup here).
            repo: ``owner/name`` of the repository whose issues are polled.
            assignee: Only issues assigned to this user are fetched.
            poll_interval: Seconds to sleep between successful polls.
            repo_path: Local working tree passed to the git harness tools.
        """
        self.gitea_url = gitea_url
        self.repo = repo
        self.assignee = assignee
        self.poll_interval = poll_interval
        self.repo_path = repo_path
        self.running = False
        self.harness = get_harness()
        # Issue numbers already handled during this process lifetime.
        self.processed_issues = set()

        # Log file
        self.log_path = Path.home() / "timmy" / "logs"
        self.log_path.mkdir(parents=True, exist_ok=True)
        self.router_log = self.log_path / "task_router.jsonl"

    def start(self):
        """Run the polling loop until ``stop()`` clears ``self.running``."""
        self.running = True
        print("Task router started")
        print(f" Polling: {self.gitea_url}")
        print(f" Assignee: {self.assignee}")
        print(f" Interval: {self.poll_interval}s")

        while self.running:
            try:
                self._poll_and_route()
                time.sleep(self.poll_interval)
            except Exception as e:
                # Top-level boundary: record the failure and retry shortly
                # instead of letting the daemon die.
                self._log_event("error", {"message": str(e)})
                time.sleep(5)

    def stop(self):
        """Ask the polling loop to exit after its current iteration."""
        self.running = False
        print("Task router stopped")

    def _poll_and_route(self):
        """
        Fetch the assigned open issues once and process the unseen ones.

        A response that cannot be parsed as JSON is logged and skipped. A
        failure while processing one issue is logged and does not stop the
        remaining issues; the failed issue is left unmarked so that the
        next poll retries it.
        """
        result = self.harness.execute(
            "gitea_list_issues",
            repo=self.repo,
            state="open",
            assignee=self.assignee,
        )

        try:
            payload = json.loads(result)
        except (TypeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError; TypeError covers a
            # non-string result such as None.
            self._log_event(
                "poll_failed", {"message": f"unparseable issue list: {e}"}
            )
            return

        if isinstance(payload, dict):
            issues = payload.get("issues") or []
        elif isinstance(payload, list):
            # Accept a bare JSON array of issues as well.
            issues = payload
        else:
            issues = []

        for issue in issues:
            # Entries without a number cannot be tracked or commented on.
            if not isinstance(issue, dict) or "number" not in issue:
                continue
            issue_num = issue["number"]

            # Skip already processed
            if issue_num in self.processed_issues:
                continue

            try:
                self._process_issue(issue)
            except Exception as e:
                # Keep one bad issue from blocking every issue after it.
                self._log_event(
                    "issue_failed", {"number": issue_num, "message": str(e)}
                )
                continue
            self.processed_issues.add(issue_num)

    def _process_issue(self, issue: dict):
        """
        Route one issue to its handler and post the outcome as a comment.

        Args:
            issue: Issue mapping; ``number`` is required, ``title`` and
                ``body`` are optional and may be ``None``.
        """
        issue_num = issue["number"]
        title = issue.get("title") or ""

        self._log_event("issue_received", {
            "number": issue_num,
            "title": title,
        })

        # Title format: "[ACTION] Description" or just "Description"
        action = self._parse_action(title)

        # Route to appropriate handler
        if action == "system_check":
            result = self._handle_system_check(issue_num)
        elif action == "git_operation":
            result = self._handle_git_operation(issue_num, issue)
        elif action == "health_report":
            result = self._handle_health_report(issue_num)
        else:
            result = self._handle_generic(issue_num, issue)

        # Post result as comment
        self._post_comment(issue_num, result)

        self._log_event("issue_processed", {
            "number": issue_num,
            "action": action,
            "result": "success" if result else "failed",
        })

    def _parse_action(self, title: str) -> str:
        """
        Pick the action for an issue title.

        A leading ``[ACTION]`` tag wins when it names a known action
        (case-insensitive; spaces and hyphens count as underscores), e.g.
        ``"[system_check] Disk usage"``. Otherwise the title is scanned for
        keywords in the order given by ``_KEYWORD_ROUTES``.

        Returns:
            One of ``"health_report"``, ``"system_check"``,
            ``"git_operation"`` or ``"generic"``.
        """
        title_lower = (title or "").lower()

        stripped = title_lower.lstrip()
        if stripped.startswith("["):
            tag, closed, _ = stripped[1:].partition("]")
            if closed:
                candidate = "_".join(
                    tag.replace("-", " ").replace("_", " ").split()
                )
                if candidate in self._KNOWN_ACTIONS:
                    return candidate

        for action, keywords in self._KEYWORD_ROUTES:
            if any(kw in title_lower for kw in keywords):
                return action
        return "generic"

    def _handle_system_check(self, issue_num: int) -> str:
        """Run the ``system_info`` tool and wrap its output in Markdown."""
        result = self.harness.execute("system_info")
        return f"## System Check Results\n\n```json\n{result}\n```"

    def _handle_health_report(self, issue_num: int) -> str:
        """Run the ``health_check`` tool and wrap its output in Markdown."""
        result = self.harness.execute("health_check")
        return f"## Health Report\n\n```json\n{result}\n```"

    def _handle_git_operation(self, issue_num: int, issue: dict) -> str:
        """
        Run the git tools requested by keywords in the issue body.

        ``"status"`` triggers ``git_status`` and ``"pull"`` triggers
        ``git_pull``, both against ``self.repo_path``. A missing or null
        body is treated as empty.
        """
        body = (issue.get("body") or "").lower()
        results = []

        # Check for status request
        if "status" in body:
            result = self.harness.execute("git_status", repo_path=self.repo_path)
            results.append(f"**Git Status:**\n```json\n{result}\n```")

        # Check for pull request
        if "pull" in body:
            result = self.harness.execute("git_pull", repo_path=self.repo_path)
            results.append(f"**Git Pull:**\n{result}")

        if not results:
            results.append("No specific git operation detected in issue body.")

        return "\n\n".join(results)

    def _handle_generic(self, issue_num: int, issue: dict) -> str:
        """Return an acknowledgement message for an unrecognised task."""
        title = issue.get("title") or ""
        return f"Received issue #{issue_num}: {title}\n\nI'll process this and update shortly."

    def _post_comment(self, issue_num: int, body: str):
        """Post ``body`` on the issue via ``gitea_comment``; return the tool result."""
        result = self.harness.execute(
            "gitea_comment",
            repo=self.repo,
            issue_number=issue_num,
            body=body,
        )
        return result

    def _log_event(self, event_type: str, data: dict):
        """
        Append one JSON line to the router log.

        Args:
            event_type: Stored under the ``event`` key.
            data: Extra JSON-serialisable fields merged into the entry.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "event": event_type,
            **data,
        }
        with open(self.router_log, "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry) + "\n")
def main():
    """Build a TaskRouter and run it until SIGINT or SIGTERM arrives."""
    import signal

    router = TaskRouter()

    def _shutdown(signum, frame):
        # Stop the polling loop first, then end the process with status 0.
        print("\nShutting down...")
        router.stop()
        sys.exit(0)

    # Ctrl-C and a plain `kill` both go through the same shutdown path.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _shutdown)

    router.start()
# Script entry point: only start the router when this file is run directly.
if __name__ == "__main__":
    main()