# ---------------------------------------------------------------------------
# Extracted helpers
# ---------------------------------------------------------------------------


def _extract_content(run_result) -> str:
    """Return the text carried by an agent run result, always as a ``str``.

    Args:
        run_result: The value returned by ``agent.run``. When it exposes a
            ``content`` attribute that attribute is used; otherwise the whole
            object is stringified.

    Returns:
        The result text. A ``content`` of ``None`` yields ``""`` and
        non-string content is passed through ``str()``, so callers can
        safely ``.strip()`` or slice the returned value.
    """
    if not hasattr(run_result, "content"):
        return str(run_result)

    content = run_result.content
    if content is None:
        # Honour the ``-> str`` contract: downstream code calls string
        # methods on this value and would otherwise fail on ``None``.
        return ""
    return content if isinstance(content, str) else str(content)


def _clean(text: str) -> str:
    """Clean a model response using the session module's response cleaner.

    Args:
        text: Raw model output.

    Returns:
        Whatever ``timmy.session._clean_response`` returns for ``text``.
    """
    # Imported at call time rather than at module import time.
    from timmy.session import _clean_response

    return _clean_response(text)


async def _plan_task(
    agent, task: str, session_id: str, max_steps: int
) -> tuple[list[str], bool] | str:
    """Run the planning phase for ``task``.

    Args:
        agent: Object whose blocking ``run`` method is invoked in a worker
            thread.
        task: Task description to break into steps.
        session_id: Base session id; the planning call uses
            ``f"{session_id}_plan"``.
        max_steps: Maximum number of steps to keep from the plan.

    Returns:
        ``(steps, was_truncated)`` on success, where ``was_truncated`` is
        true when the model proposed more steps than were kept. On failure,
        a human-readable error string instead of a tuple; callers tell the
        two cases apart with ``isinstance(plan, str)``.
    """
    plan_prompt = (
        f"Break this task into numbered steps (max {max_steps}). "
        f"Return ONLY a numbered list, nothing else.\n\n"
        f"Task: {task}"
    )
    try:
        plan_run = await asyncio.to_thread(
            agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
        )
        plan_text = _extract_content(plan_run)
    except Exception as exc:  # broad catch intentional: agent.run can raise any error
        logger.error("Agentic loop: planning failed: %s", exc)
        return f"Planning failed: {exc}"

    steps = _parse_steps(plan_text)
    if not steps:
        return "Planning produced no steps."

    planned_count = len(steps)
    steps = steps[:max_steps]
    return steps, planned_count > len(steps)


async def _execute_step(
    agent,
    task: str,
    step_desc: str,
    step_num: int,
    total_steps: int,
    recent_results: list[str],
    session_id: str,
) -> AgenticStep:
    """Execute a single plan step and wrap the outcome in an ``AgenticStep``.

    Args:
        agent: Object whose blocking ``run`` method is invoked in a worker
            thread.
        task: The overall task description.
        step_desc: Description of the step to execute.
        step_num: 1-based position of this step.
        total_steps: Number of steps in the plan.
        recent_results: Summaries of earlier steps; only the last two are
            included in the prompt.
        session_id: Base session id; the call uses
            ``f"{session_id}_step{step_num}"``.

    Returns:
        An ``AgenticStep`` with status ``"completed"`` and the cleaned
        model response as its result.

    Raises:
        Exception: Anything raised by ``agent.run`` or the response cleaner
            propagates to the caller, which decides whether to adapt.
    """
    step_start = time.monotonic()
    recent = recent_results[-2:] if recent_results else []
    context = (
        f"Task: {task}\n"
        f"Step {step_num}/{total_steps}: {step_desc}\n"
        f"Recent progress: {recent}\n\n"
        f"Execute this step and report what you did."
    )
    step_run = await asyncio.to_thread(
        agent.run, context, stream=False, session_id=f"{session_id}_step{step_num}"
    )
    step_result = _clean(_extract_content(step_run))
    return AgenticStep(
        step_num=step_num,
        description=step_desc,
        result=step_result,
        status="completed",
        duration_ms=int((time.monotonic() - step_start) * 1000),
    )


async def _adapt_step(
    agent,
    step_desc: str,
    step_num: int,
    error: Exception,
    step_start: float,
    session_id: str,
) -> AgenticStep:
    """Ask the agent for an alternative approach after a step failed.

    Args:
        agent: Object whose blocking ``run`` method is invoked in a worker
            thread.
        step_desc: Description of the step that failed.
        step_num: 1-based position of the failed step.
        error: The exception raised by the original attempt; its string
            form is embedded in the prompt.
        step_start: ``time.monotonic()`` value taken when the original
            attempt began, so the reported duration covers both attempts.
        session_id: Base session id; the call uses
            ``f"{session_id}_adapt{step_num}"``.

    Returns:
        An ``AgenticStep`` with status ``"adapted"`` and a description
        prefixed with ``"[Adapted] "``.

    Raises:
        Exception: Anything raised by ``agent.run`` or the response cleaner
            propagates to the caller.
    """
    adapt_prompt = (
        f"Step {step_num} failed with error: {error}\n"
        f"Original step was: {step_desc}\n"
        f"Adapt the plan and try an alternative approach for this step."
    )
    adapt_run = await asyncio.to_thread(
        agent.run, adapt_prompt, stream=False, session_id=f"{session_id}_adapt{step_num}"
    )
    adapt_result = _clean(_extract_content(adapt_run))
    return AgenticStep(
        step_num=step_num,
        description=f"[Adapted] {step_desc}",
        result=adapt_result,
        status="adapted",
        duration_ms=int((time.monotonic() - step_start) * 1000),
    )


def _summarize(result: AgenticResult, total_steps: int, was_truncated: bool) -> None:
    """Fill in ``summary`` and the final ``status`` on ``result`` in place.

    Args:
        result: Result object whose ``steps`` and ``task`` are read and whose
            ``summary`` and ``status`` are written.
        total_steps: Number of steps in the (possibly truncated) plan.
        was_truncated: Whether the plan was cut down to the step limit.

    The status becomes ``"partial"`` when the plan was truncated, when fewer
    steps were recorded than planned, or when any step failed; otherwise it
    becomes ``"completed"``.
    """
    # Single pass over the steps; statuses outside the three known values
    # are ignored, exactly as separate per-status counts would ignore them.
    tally = {"completed": 0, "adapted": 0, "failed": 0}
    for step in result.steps:
        if step.status in tally:
            tally[step.status] += 1
    completed = tally["completed"]
    adapted = tally["adapted"]
    failed = tally["failed"]

    parts = [f"Completed {completed}/{total_steps} steps"]
    if adapted:
        parts.append(f"{adapted} adapted")
    if failed:
        parts.append(f"{failed} failed")
    result.summary = f"{result.task}: {', '.join(parts)}."

    incomplete = was_truncated or len(result.steps) < total_steps or failed > 0
    result.status = "partial" if incomplete else "completed"


# ---------------------------------------------------------------------------
# Core loop
# ---------------------------------------------------------------------------
" - f"Return ONLY a numbered list, nothing else.\n\n" - f"Task: {task}" - ) - try: - plan_run = await asyncio.to_thread( - agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan" - ) - plan_text = plan_run.content if hasattr(plan_run, "content") else str(plan_run) - except Exception as exc: # broad catch intentional: agent.run can raise any error - logger.error("Agentic loop: planning failed: %s", exc) + # Phase 1: Planning + plan = await _plan_task(agent, task, session_id, max_steps) + if isinstance(plan, str): result.status = "failed" - result.summary = f"Planning failed: {exc}" + result.summary = plan result.total_duration_ms = int((time.monotonic() - start_time) * 1000) return result - steps = _parse_steps(plan_text) - if not steps: - result.status = "failed" - result.summary = "Planning produced no steps." - result.total_duration_ms = int((time.monotonic() - start_time) * 1000) - return result - - # Enforce max_steps — track if we truncated - planned_steps = len(steps) - steps = steps[:max_steps] + steps, was_truncated = plan total_steps = len(steps) - was_truncated = planned_steps > total_steps - # Broadcast plan await _broadcast_progress( "agentic.plan_ready", - { - "task_id": task_id, - "task": task, - "steps": steps, - "total": total_steps, - }, + {"task_id": task_id, "task": task, "steps": steps, "total": total_steps}, ) - # ── Phase 2: Execution ───────────────────────────────────────────────── + # Phase 2: Execution completed_results: list[str] = [] - for i, step_desc in enumerate(steps, 1): step_start = time.monotonic() - - recent = completed_results[-2:] if completed_results else [] - context = ( - f"Task: {task}\n" - f"Step {i}/{total_steps}: {step_desc}\n" - f"Recent progress: {recent}\n\n" - f"Execute this step and report what you did." 
- ) - try: - step_run = await asyncio.to_thread( - agent.run, context, stream=False, session_id=f"{session_id}_step{i}" - ) - step_result = step_run.content if hasattr(step_run, "content") else str(step_run) - - # Clean the response - from timmy.session import _clean_response - - step_result = _clean_response(step_result) - - step = AgenticStep( - step_num=i, - description=step_desc, - result=step_result, - status="completed", - duration_ms=int((time.monotonic() - step_start) * 1000), + step = await _execute_step( + agent, + task, + step_desc, + i, + total_steps, + completed_results, + session_id, ) result.steps.append(step) - completed_results.append(f"Step {i}: {step_result[:200]}") - - # Broadcast progress + completed_results.append(f"Step {i}: {step.result[:200]}") await _broadcast_progress( "agentic.step_complete", { @@ -214,46 +287,18 @@ async def run_agentic_loop( "step": i, "total": total_steps, "description": step_desc, - "result": step_result[:200], + "result": step.result[:200], }, ) - if on_progress: await on_progress(step_desc, i, total_steps) except Exception as exc: # broad catch intentional: agent.run can raise any error logger.warning("Agentic loop step %d failed: %s", i, exc) - - # ── Adaptation: ask model to adapt ───────────────────────────── - adapt_prompt = ( - f"Step {i} failed with error: {exc}\n" - f"Original step was: {step_desc}\n" - f"Adapt the plan and try an alternative approach for this step." 
- ) try: - adapt_run = await asyncio.to_thread( - agent.run, - adapt_prompt, - stream=False, - session_id=f"{session_id}_adapt{i}", - ) - adapt_result = ( - adapt_run.content if hasattr(adapt_run, "content") else str(adapt_run) - ) - from timmy.session import _clean_response - - adapt_result = _clean_response(adapt_result) - - step = AgenticStep( - step_num=i, - description=f"[Adapted] {step_desc}", - result=adapt_result, - status="adapted", - duration_ms=int((time.monotonic() - step_start) * 1000), - ) + step = await _adapt_step(agent, step_desc, i, exc, step_start, session_id) result.steps.append(step) - completed_results.append(f"Step {i} (adapted): {adapt_result[:200]}") - + completed_results.append(f"Step {i} (adapted): {step.result[:200]}") await _broadcast_progress( "agentic.step_adapted", { @@ -262,46 +307,26 @@ async def run_agentic_loop( "total": total_steps, "description": step_desc, "error": str(exc), - "adaptation": adapt_result[:200], + "adaptation": step.result[:200], }, ) - if on_progress: await on_progress(f"[Adapted] {step_desc}", i, total_steps) - - except Exception as adapt_exc: # broad catch intentional: agent.run can raise any error + except Exception as adapt_exc: # broad catch intentional logger.error("Agentic loop adaptation also failed: %s", adapt_exc) - step = AgenticStep( - step_num=i, - description=step_desc, - result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}", - status="failed", - duration_ms=int((time.monotonic() - step_start) * 1000), + result.steps.append( + AgenticStep( + step_num=i, + description=step_desc, + result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}", + status="failed", + duration_ms=int((time.monotonic() - step_start) * 1000), + ) ) - result.steps.append(step) completed_results.append(f"Step {i}: FAILED") - # ── Phase 3: Summary ─────────────────────────────────────────────────── - completed_count = sum(1 for s in result.steps if s.status == "completed") - adapted_count = sum(1 for s in result.steps 
if s.status == "adapted") - failed_count = sum(1 for s in result.steps if s.status == "failed") - parts = [f"Completed {completed_count}/{total_steps} steps"] - if adapted_count: - parts.append(f"{adapted_count} adapted") - if failed_count: - parts.append(f"{failed_count} failed") - result.summary = f"{task}: {', '.join(parts)}." - - # Determine final status - if was_truncated: - result.status = "partial" - elif len(result.steps) < total_steps: - result.status = "partial" - elif any(s.status == "failed" for s in result.steps): - result.status = "partial" - else: - result.status = "completed" - + # Phase 3: Summary + _summarize(result, total_steps, was_truncated) result.total_duration_ms = int((time.monotonic() - start_time) * 1000) await _broadcast_progress(