Compare commits

...

4 Commits

Author SHA1 Message Date
kimi
c5e7dc09ae refactor: break up lifespan() into focused helpers
Extract _init_services(), _auto_prune(), _register_error_recorder(),
and _shutdown() from the 142-line lifespan() context manager, reducing
it to ~25 lines that read as a clear startup/shutdown sequence.

Fixes #514

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 19:30:19 -04:00
cd3dc5d989 refactor: break up CascadeRouter.complete() into focused helpers (#510)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:24:36 -04:00
e4de539bf3 fix: extract ollama_url normalization into shared utility (#508)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:18:22 -04:00
b2057f72e1 [loop-cycle] refactor: break up run_agentic_loop into testable helpers (#504) (#509) 2026-03-19 19:15:38 -04:00
7 changed files with 338 additions and 260 deletions

View File

@@ -10,6 +10,11 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
APP_START_TIME: _datetime = _datetime.now(UTC)
def normalize_ollama_url(url: str) -> str:
"""Replace localhost with 127.0.0.1 to avoid IPv6 resolution delays."""
return url.replace("localhost", "127.0.0.1")
class Settings(BaseSettings):
"""Central configuration — all env-var access goes through this class."""
@@ -19,6 +24,11 @@ class Settings(BaseSettings):
# Ollama host — override with OLLAMA_URL env var or .env file
ollama_url: str = "http://localhost:11434"
@property
def normalized_ollama_url(self) -> str:
"""Return ollama_url with localhost replaced by 127.0.0.1."""
return normalize_ollama_url(self.ollama_url)
# LLM model passed to Agno/Ollama — override with OLLAMA_MODEL
# qwen3:30b is the primary model — better reasoning and tool calling
# than llama3.1:8b-instruct while still running locally on modest hardware.
@@ -392,7 +402,7 @@ def check_ollama_model_available(model_name: str) -> bool:
import json
import urllib.request
url = settings.ollama_url.replace("localhost", "127.0.0.1")
url = settings.normalized_ollama_url
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",

View File

@@ -329,33 +329,21 @@ async def _discord_token_watcher() -> None:
logger.warning("Discord auto-start failed: %s", exc)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
# Validate security config (no-op in test mode)
def _init_services() -> None:
"""Validate config, enable event persistence, and init Spark engine."""
from config import validate_startup
from infrastructure.events.bus import init_event_bus_persistence
from spark.engine import get_spark_engine
validate_startup()
# Enable event persistence (unified EventBus + swarm event_log)
from infrastructure.events.bus import init_event_bus_persistence
init_event_bus_persistence()
# Create all background tasks without waiting for them
briefing_task = asyncio.create_task(_briefing_scheduler())
thinking_task = asyncio.create_task(_thinking_scheduler())
loop_qa_task = asyncio.create_task(_loop_qa_scheduler())
presence_task = asyncio.create_task(_presence_watcher())
# Initialize Spark Intelligence engine
from spark.engine import get_spark_engine
if get_spark_engine().enabled:
logger.info("Spark Intelligence active — event capture enabled")
# Auto-prune old vector store memories on startup
def _auto_prune() -> None:
"""Run startup housekeeping: prune memories, thoughts, events, and check vault size."""
if settings.memory_prune_days > 0:
try:
from timmy.memory_system import prune_memories
@@ -373,7 +361,6 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Memory auto-prune skipped: %s", exc)
# Auto-prune old thoughts on startup
if settings.thoughts_prune_days > 0:
try:
from timmy.thinking import thinking_engine
@@ -391,7 +378,6 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Thought auto-prune skipped: %s", exc)
# Auto-prune old system events on startup
if settings.events_prune_days > 0:
try:
from swarm.event_log import prune_old_events
@@ -409,7 +395,6 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Event auto-prune skipped: %s", exc)
# Warn if memory vault exceeds size limit
if settings.memory_vault_max_mb > 0:
try:
vault_path = Path(settings.repo_root) / "memory" / "notes"
@@ -425,17 +410,9 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Vault size check skipped: %s", exc)
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Start chat integrations in background
chat_task = asyncio.create_task(_start_chat_integrations_background())
# Register session logger with error capture (breaks infrastructure → timmy circular dep)
def _register_error_recorder() -> None:
"""Wire the session logger into the error-capture system."""
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
@@ -444,18 +421,18 @@ async def lifespan(app: FastAPI):
except Exception:
logger.debug("Failed to register error recorder")
logger.info("✓ Dashboard ready for requests")
yield
# Cleanup on shutdown
async def _shutdown(
tasks: list[asyncio.Task],
workshop_heartbeat: object,
) -> None:
"""Stop integrations, close sessions, and cancel background tasks."""
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot
await discord_bot.stop()
await telegram_bot.stop()
# Close MCP tool server sessions
try:
from timmy.mcp_tools import close_mcp_sessions
@@ -463,15 +440,46 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("MCP shutdown: %s", exc)
await workshop_heartbeat.stop()
await workshop_heartbeat.stop() # type: ignore[union-attr]
for task in [briefing_task, thinking_task, chat_task, loop_qa_task, presence_task]:
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
for task in tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
_init_services()
_auto_prune()
# Create all background tasks without waiting for them
bg_tasks = [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
]
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Start chat integrations in background
bg_tasks.append(asyncio.create_task(_start_chat_integrations_background()))
_register_error_recorder()
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown(bg_tasks, workshop_heartbeat)
app = FastAPI(

View File

@@ -65,7 +65,7 @@ def _check_ollama_sync() -> DependencyStatus:
try:
import urllib.request
url = settings.ollama_url.replace("localhost", "127.0.0.1")
url = settings.normalized_ollama_url
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",

View File

@@ -13,7 +13,7 @@ import logging
from dataclasses import dataclass, field
from enum import Enum, auto
from config import settings
from config import normalize_ollama_url, settings
logger = logging.getLogger(__name__)
@@ -307,7 +307,7 @@ class MultiModalManager:
import json
import urllib.request
url = self.ollama_url.replace("localhost", "127.0.0.1")
url = normalize_ollama_url(self.ollama_url)
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",
@@ -462,7 +462,7 @@ class MultiModalManager:
logger.info("Pulling model: %s", model_name)
url = self.ollama_url.replace("localhost", "127.0.0.1")
url = normalize_ollama_url(self.ollama_url)
req = urllib.request.Request(
f"{url}/api/pull",
method="POST",

View File

@@ -388,6 +388,101 @@ class CascadeRouter:
return None
def _select_model(
self, provider: Provider, model: str | None, content_type: ContentType
) -> tuple[str | None, bool]:
"""Select the best model for the request, with vision fallback.
Returns:
Tuple of (selected_model, is_fallback_model).
"""
selected_model = model or provider.get_default_model()
is_fallback = False
if content_type != ContentType.TEXT and selected_model:
if provider.type == "ollama" and self._mm_manager:
from infrastructure.models.multimodal import ModelCapability
if content_type == ContentType.VISION:
supports = self._mm_manager.model_supports(
selected_model, ModelCapability.VISION
)
if not supports:
fallback = self._get_fallback_model(provider, selected_model, content_type)
if fallback:
logger.info(
"Model %s doesn't support vision, falling back to %s",
selected_model,
fallback,
)
selected_model = fallback
is_fallback = True
else:
logger.warning(
"No vision-capable model found on %s, trying anyway",
provider.name,
)
return selected_model, is_fallback
async def _attempt_with_retry(
self,
provider: Provider,
messages: list[dict],
model: str | None,
temperature: float,
max_tokens: int | None,
content_type: ContentType,
) -> dict:
"""Try a provider with retries, returning the result dict.
Raises:
RuntimeError: If all retry attempts fail.
Returns error strings collected during retries via the exception message.
"""
errors: list[str] = []
for attempt in range(self.config.max_retries_per_provider):
try:
return await self._try_provider(
provider=provider,
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
except Exception as exc:
error_msg = str(exc)
logger.warning(
"Provider %s attempt %d failed: %s",
provider.name,
attempt + 1,
error_msg,
)
errors.append(f"{provider.name}: {error_msg}")
if attempt < self.config.max_retries_per_provider - 1:
await asyncio.sleep(self.config.retry_delay_seconds)
raise RuntimeError("; ".join(errors))
def _is_provider_available(self, provider: Provider) -> bool:
"""Check if a provider should be tried (enabled + circuit breaker)."""
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
return False
if provider.status == ProviderStatus.UNHEALTHY:
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
return False
return True
async def complete(
self,
messages: list[dict],
@@ -414,7 +509,6 @@ class CascadeRouter:
Raises:
RuntimeError: If all providers fail
"""
# Detect content type for multi-modal routing
content_type = self._detect_content_type(messages)
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
@@ -422,93 +516,34 @@ class CascadeRouter:
errors = []
for provider in self.providers:
# Skip disabled providers
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
if not self._is_provider_available(provider):
continue
# Skip unhealthy providers (circuit breaker)
if provider.status == ProviderStatus.UNHEALTHY:
# Check if circuit breaker can close
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
# Determine which model to use
selected_model = model or provider.get_default_model()
is_fallback_model = False
try:
result = await self._attempt_with_retry(
provider,
messages,
selected_model,
temperature,
max_tokens,
content_type,
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
continue
# For non-text content, check if model supports it
if content_type != ContentType.TEXT and selected_model:
if provider.type == "ollama" and self._mm_manager:
from infrastructure.models.multimodal import ModelCapability
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
# Check if selected model supports the required capability
if content_type == ContentType.VISION:
supports = self._mm_manager.model_supports(
selected_model, ModelCapability.VISION
)
if not supports:
# Find fallback model
fallback = self._get_fallback_model(
provider, selected_model, content_type
)
if fallback:
logger.info(
"Model %s doesn't support vision, falling back to %s",
selected_model,
fallback,
)
selected_model = fallback
is_fallback_model = True
else:
logger.warning(
"No vision-capable model found on %s, trying anyway",
provider.name,
)
# Try this provider
for attempt in range(self.config.max_retries_per_provider):
try:
result = await self._try_provider(
provider=provider,
messages=messages,
model=selected_model,
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
# Success! Update metrics and return
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get(
"model", selected_model or provider.get_default_model()
),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
except Exception as exc:
error_msg = str(exc)
logger.warning(
"Provider %s attempt %d failed: %s", provider.name, attempt + 1, error_msg
)
errors.append(f"{provider.name}: {error_msg}")
if attempt < self.config.max_retries_per_provider - 1:
await asyncio.sleep(self.config.retry_delay_seconds)
# All retries failed for this provider
self._record_failure(provider)
# All providers failed
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
async def _try_provider(

View File

@@ -63,7 +63,7 @@ def _pull_model(model_name: str) -> bool:
logger.info("Pulling model: %s", model_name)
url = settings.ollama_url.replace("localhost", "127.0.0.1")
url = settings.normalized_ollama_url
req = urllib.request.Request(
f"{url}/api/pull",
method="POST",

View File

@@ -95,6 +95,126 @@ def _parse_steps(plan_text: str) -> list[str]:
return [line.strip() for line in plan_text.strip().splitlines() if line.strip()]
# ---------------------------------------------------------------------------
# Extracted helpers
# ---------------------------------------------------------------------------
def _extract_content(run_result) -> str:
"""Extract text content from an agent run result."""
return run_result.content if hasattr(run_result, "content") else str(run_result)
def _clean(text: str) -> str:
"""Clean a model response using session's response cleaner."""
from timmy.session import _clean_response
return _clean_response(text)
async def _plan_task(
agent, task: str, session_id: str, max_steps: int
) -> tuple[list[str], bool] | str:
"""Run the planning phase — returns (steps, was_truncated) or error string."""
plan_prompt = (
f"Break this task into numbered steps (max {max_steps}). "
f"Return ONLY a numbered list, nothing else.\n\n"
f"Task: {task}"
)
try:
plan_run = await asyncio.to_thread(
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
)
plan_text = _extract_content(plan_run)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop: planning failed: %s", exc)
return f"Planning failed: {exc}"
steps = _parse_steps(plan_text)
if not steps:
return "Planning produced no steps."
planned_count = len(steps)
steps = steps[:max_steps]
return steps, planned_count > len(steps)
async def _execute_step(
agent,
task: str,
step_desc: str,
step_num: int,
total_steps: int,
recent_results: list[str],
session_id: str,
) -> AgenticStep:
"""Execute a single step, returning an AgenticStep."""
step_start = time.monotonic()
context = (
f"Task: {task}\n"
f"Step {step_num}/{total_steps}: {step_desc}\n"
f"Recent progress: {recent_results[-2:] if recent_results else []}\n\n"
f"Execute this step and report what you did."
)
step_run = await asyncio.to_thread(
agent.run, context, stream=False, session_id=f"{session_id}_step{step_num}"
)
step_result = _clean(_extract_content(step_run))
return AgenticStep(
step_num=step_num,
description=step_desc,
result=step_result,
status="completed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
async def _adapt_step(
agent,
step_desc: str,
step_num: int,
error: Exception,
step_start: float,
session_id: str,
) -> AgenticStep:
"""Attempt adaptation after a step failure."""
adapt_prompt = (
f"Step {step_num} failed with error: {error}\n"
f"Original step was: {step_desc}\n"
f"Adapt the plan and try an alternative approach for this step."
)
adapt_run = await asyncio.to_thread(
agent.run, adapt_prompt, stream=False, session_id=f"{session_id}_adapt{step_num}"
)
adapt_result = _clean(_extract_content(adapt_run))
return AgenticStep(
step_num=step_num,
description=f"[Adapted] {step_desc}",
result=adapt_result,
status="adapted",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
def _summarize(result: AgenticResult, total_steps: int, was_truncated: bool) -> None:
"""Fill in summary and final status on the result object (mutates in place)."""
completed = sum(1 for s in result.steps if s.status == "completed")
adapted = sum(1 for s in result.steps if s.status == "adapted")
failed = sum(1 for s in result.steps if s.status == "failed")
parts = [f"Completed {completed}/{total_steps} steps"]
if adapted:
parts.append(f"{adapted} adapted")
if failed:
parts.append(f"{failed} failed")
result.summary = f"{result.task}: {', '.join(parts)}."
if was_truncated or len(result.steps) < total_steps or failed:
result.status = "partial"
else:
result.status = "completed"
# ---------------------------------------------------------------------------
# Core loop
# ---------------------------------------------------------------------------
@@ -125,88 +245,41 @@ async def run_agentic_loop(
task_id = str(uuid.uuid4())[:8]
start_time = time.monotonic()
agent = _get_loop_agent()
result = AgenticResult(task_id=task_id, task=task, summary="")
# ── Phase 1: Planning ──────────────────────────────────────────────────
plan_prompt = (
f"Break this task into numbered steps (max {max_steps}). "
f"Return ONLY a numbered list, nothing else.\n\n"
f"Task: {task}"
)
try:
plan_run = await asyncio.to_thread(
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
)
plan_text = plan_run.content if hasattr(plan_run, "content") else str(plan_run)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop: planning failed: %s", exc)
# Phase 1: Planning
plan = await _plan_task(agent, task, session_id, max_steps)
if isinstance(plan, str):
result.status = "failed"
result.summary = f"Planning failed: {exc}"
result.summary = plan
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
return result
steps = _parse_steps(plan_text)
if not steps:
result.status = "failed"
result.summary = "Planning produced no steps."
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
return result
# Enforce max_steps — track if we truncated
planned_steps = len(steps)
steps = steps[:max_steps]
steps, was_truncated = plan
total_steps = len(steps)
was_truncated = planned_steps > total_steps
# Broadcast plan
await _broadcast_progress(
"agentic.plan_ready",
{
"task_id": task_id,
"task": task,
"steps": steps,
"total": total_steps,
},
{"task_id": task_id, "task": task, "steps": steps, "total": total_steps},
)
# ── Phase 2: Execution ─────────────────────────────────────────────────
# Phase 2: Execution
completed_results: list[str] = []
for i, step_desc in enumerate(steps, 1):
step_start = time.monotonic()
recent = completed_results[-2:] if completed_results else []
context = (
f"Task: {task}\n"
f"Step {i}/{total_steps}: {step_desc}\n"
f"Recent progress: {recent}\n\n"
f"Execute this step and report what you did."
)
try:
step_run = await asyncio.to_thread(
agent.run, context, stream=False, session_id=f"{session_id}_step{i}"
)
step_result = step_run.content if hasattr(step_run, "content") else str(step_run)
# Clean the response
from timmy.session import _clean_response
step_result = _clean_response(step_result)
step = AgenticStep(
step_num=i,
description=step_desc,
result=step_result,
status="completed",
duration_ms=int((time.monotonic() - step_start) * 1000),
step = await _execute_step(
agent,
task,
step_desc,
i,
total_steps,
completed_results,
session_id,
)
result.steps.append(step)
completed_results.append(f"Step {i}: {step_result[:200]}")
# Broadcast progress
completed_results.append(f"Step {i}: {step.result[:200]}")
await _broadcast_progress(
"agentic.step_complete",
{
@@ -214,46 +287,18 @@ async def run_agentic_loop(
"step": i,
"total": total_steps,
"description": step_desc,
"result": step_result[:200],
"result": step.result[:200],
},
)
if on_progress:
await on_progress(step_desc, i, total_steps)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.warning("Agentic loop step %d failed: %s", i, exc)
# ── Adaptation: ask model to adapt ─────────────────────────────
adapt_prompt = (
f"Step {i} failed with error: {exc}\n"
f"Original step was: {step_desc}\n"
f"Adapt the plan and try an alternative approach for this step."
)
try:
adapt_run = await asyncio.to_thread(
agent.run,
adapt_prompt,
stream=False,
session_id=f"{session_id}_adapt{i}",
)
adapt_result = (
adapt_run.content if hasattr(adapt_run, "content") else str(adapt_run)
)
from timmy.session import _clean_response
adapt_result = _clean_response(adapt_result)
step = AgenticStep(
step_num=i,
description=f"[Adapted] {step_desc}",
result=adapt_result,
status="adapted",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
step = await _adapt_step(agent, step_desc, i, exc, step_start, session_id)
result.steps.append(step)
completed_results.append(f"Step {i} (adapted): {adapt_result[:200]}")
completed_results.append(f"Step {i} (adapted): {step.result[:200]}")
await _broadcast_progress(
"agentic.step_adapted",
{
@@ -262,46 +307,26 @@ async def run_agentic_loop(
"total": total_steps,
"description": step_desc,
"error": str(exc),
"adaptation": adapt_result[:200],
"adaptation": step.result[:200],
},
)
if on_progress:
await on_progress(f"[Adapted] {step_desc}", i, total_steps)
except Exception as adapt_exc: # broad catch intentional: agent.run can raise any error
except Exception as adapt_exc: # broad catch intentional
logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
step = AgenticStep(
step_num=i,
description=step_desc,
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
status="failed",
duration_ms=int((time.monotonic() - step_start) * 1000),
result.steps.append(
AgenticStep(
step_num=i,
description=step_desc,
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
status="failed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
)
result.steps.append(step)
completed_results.append(f"Step {i}: FAILED")
# ── Phase 3: Summary ───────────────────────────────────────────────────
completed_count = sum(1 for s in result.steps if s.status == "completed")
adapted_count = sum(1 for s in result.steps if s.status == "adapted")
failed_count = sum(1 for s in result.steps if s.status == "failed")
parts = [f"Completed {completed_count}/{total_steps} steps"]
if adapted_count:
parts.append(f"{adapted_count} adapted")
if failed_count:
parts.append(f"{failed_count} failed")
result.summary = f"{task}: {', '.join(parts)}."
# Determine final status
if was_truncated:
result.status = "partial"
elif len(result.steps) < total_steps:
result.status = "partial"
elif any(s.status == "failed" for s in result.steps):
result.status = "partial"
else:
result.status = "completed"
# Phase 3: Summary
_summarize(result, total_steps, was_truncated)
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
await _broadcast_progress(