Compare commits

...

7 Commits

Author SHA1 Message Date
Alexander Whitestone
03f9a42fbc feat: add ResearchOrchestrator pipeline (src/timmy/research.py)
Some checks failed
Tests / lint (pull_request) Failing after 4s
Tests / test (pull_request) Has been skipped
Implements autonomous research pipeline that chains:
- Check local knowledge (semantic memory cache, confidence > 0.85)
- Generate queries via LLM cascade
- Web search (concurrent, deduplicated)
- Fetch top pages
- Synthesize structured report via LLM
- Crystallize results in semantic memory
- Write artifact (create Gitea issues from action items)

Includes full unit test suite (25 tests) covering all pipeline steps,
cache hits, graceful degradation, and Gitea integration.

Fixes #975

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 18:44:20 -04:00
Alexander Whitestone
3ab180b8a7 [claude] Add Gitea backup script (#990) (#996)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Alexander Whitestone <alexpaynex@gmail.com>
Co-committed-by: Alexander Whitestone <alexpaynex@gmail.com>
2026-03-22 22:36:51 +00:00
e24f49e58d [kimi] Add JSON validation guard to queue.json writes (#952) (#995)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-22 22:33:40 +00:00
1fa5cff5dc [kimi] Fix GITEA_API configuration in triage scripts (#951) (#994)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-22 22:28:23 +00:00
e255e7eb2a [kimi] Add docstrings to system.py route handlers (#940) (#992)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-22 22:12:36 +00:00
c3b6eb71c0 [kimi] Add docstrings to src/dashboard/routes/tasks.py (#939) (#991)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-22 22:08:28 +00:00
bebbe442b4 feat: WorldInterface + Heartbeat v2 (#871, #872) (#900)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Perplexity Computer <perplexity@tower.local>
Co-committed-by: Perplexity Computer <perplexity@tower.local>
2026-03-22 13:44:49 +00:00
25 changed files with 2666 additions and 9 deletions

View File

@@ -17,8 +17,23 @@ REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
GITEA_API = "http://localhost:3000/api/v1"
REPO_SLUG = "rockachopa/Timmy-time-dashboard"
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
TAG_RE = re.compile(r"\[([^\]]+)\]")

83
scripts/gitea_backup.sh Executable file
View File

@@ -0,0 +1,83 @@
#!/bin/bash
# Gitea backup script — run on the VPS before any hardening changes.
# Usage: sudo bash scripts/gitea_backup.sh [off-site-dest]
#
# off-site-dest: optional rsync/scp destination for off-site copy
# e.g. user@backup-host:/backups/gitea/
#
# Refs: #971, #990

# Fail fast: abort on any error, unset variable, or failed pipeline stage.
set -euo pipefail

BACKUP_DIR="/opt/gitea/backups"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
GITEA_CONF="/etc/gitea/app.ini"
# NOTE(review): GITEA_WORK_DIR is never referenced below — confirm whether
# it should be passed to `gitea dump` or removed.
GITEA_WORK_DIR="/var/lib/gitea"
# First positional argument, or empty when absent (":-" keeps set -u happy).
OFFSITE_DEST="${1:-}"

echo "=== Gitea Backup — $TIMESTAMP ==="

# Ensure backup directory exists
mkdir -p "$BACKUP_DIR"
# `gitea dump` writes its zip into the current working directory.
cd "$BACKUP_DIR"

# Run the dump
echo "[1/4] Running gitea dump..."
gitea dump -c "$GITEA_CONF"

# Find the newest zip (gitea dump names it gitea-dump-*.zip)
BACKUP_FILE=$(ls -t "$BACKUP_DIR"/gitea-dump-*.zip 2>/dev/null | head -1)
if [ -z "$BACKUP_FILE" ]; then
    echo "ERROR: No backup zip found in $BACKUP_DIR"
    exit 1
fi

# stat -c%s is GNU/Linux; stat -f%z is the BSD/macOS fallback.
BACKUP_SIZE=$(stat -c%s "$BACKUP_FILE" 2>/dev/null || stat -f%z "$BACKUP_FILE")
echo "[2/4] Backup created: $BACKUP_FILE ($BACKUP_SIZE bytes)"
if [ "$BACKUP_SIZE" -eq 0 ]; then
    echo "ERROR: Backup file is 0 bytes"
    exit 1
fi

# Lock down permissions — the dump contains DB contents and config secrets.
chmod 600 "$BACKUP_FILE"

# Verify contents
echo "[3/4] Verifying backup contents..."
# "|| true" so a corrupt zip doesn't kill the script under set -e; missing
# components are reported as warnings by check_component instead.
CONTENTS=$(unzip -l "$BACKUP_FILE" 2>/dev/null || true)

# check_component <pattern> <label>: warn when <pattern> is absent from
# the zip listing captured in $CONTENTS.
check_component() {
    if echo "$CONTENTS" | grep -q "$1"; then
        echo " OK: $2"
    else
        echo " WARN: $2 not found in backup"
    fi
}
check_component "gitea-db.sql" "Database dump"
check_component "gitea-repo" "Repositories"
check_component "custom" "Custom config"
check_component "app.ini" "app.ini"

# Off-site copy
if [ -n "$OFFSITE_DEST" ]; then
    echo "[4/4] Copying to off-site: $OFFSITE_DEST"
    rsync -avz "$BACKUP_FILE" "$OFFSITE_DEST"
    echo " Off-site copy complete."
else
    echo "[4/4] No off-site destination provided. Skipping."
    echo " To copy later: scp $BACKUP_FILE user@backup-host:/backups/gitea/"
fi

echo ""
echo "=== Backup complete ==="
echo "File: $BACKUP_FILE"
echo "Size: $BACKUP_SIZE bytes"
echo ""
echo "To verify restore on a clean instance:"
echo " 1. Copy zip to test machine"
echo " 2. unzip $BACKUP_FILE"
echo " 3. gitea restore --from <extracted-dir> -c /etc/gitea/app.ini"
echo " 4. Verify repos and DB are intact"

View File

@@ -30,7 +30,22 @@ IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Default cycle duration in seconds (5 min); stale threshold = 2× this
@@ -187,7 +202,11 @@ def load_queue() -> list[dict]:
# Persist the cleaned queue so stale entries don't recur
_save_cleaned_queue(data, open_numbers)
return ready
except (json.JSONDecodeError, OSError):
except json.JSONDecodeError as exc:
print(f"[loop-guard] WARNING: Corrupt queue.json ({exc}) — returning empty queue")
return []
except OSError as exc:
print(f"[loop-guard] WARNING: Cannot read queue.json ({exc}) — returning empty queue")
return []

View File

@@ -20,11 +20,28 @@ from datetime import datetime, timezone
from pathlib import Path
# ── Config ──────────────────────────────────────────────────────────────
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
QUEUE_BACKUP_FILE = REPO_ROOT / ".loop" / "queue.json.bak"
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
CYCLE_RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
@@ -326,9 +343,38 @@ def run_triage() -> list[dict]:
ready = [s for s in scored if s["ready"]]
not_ready = [s for s in scored if not s["ready"]]
# Save backup before writing (if current file exists and is valid)
if QUEUE_FILE.exists():
try:
json.loads(QUEUE_FILE.read_text()) # Validate current file
QUEUE_BACKUP_FILE.write_text(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Current file is corrupt, don't overwrite backup
# Write new queue file
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n")
# Validate the write by re-reading and parsing
try:
json.loads(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError) as exc:
print(f"[triage] ERROR: queue.json validation failed: {exc}", file=sys.stderr)
# Restore from backup if available
if QUEUE_BACKUP_FILE.exists():
try:
backup_data = QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
QUEUE_FILE.write_text(backup_data)
print(f"[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError) as restore_exc:
print(f"[triage] ERROR: Backup restore failed: {restore_exc}", file=sys.stderr)
# Write empty list as last resort
QUEUE_FILE.write_text("[]\n")
else:
# No backup, write empty list
QUEUE_FILE.write_text("[]\n")
# Write retro entry
retro_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),

View File

@@ -56,11 +56,13 @@ async def self_modify_queue(request: Request):
@router.get("/swarm/mission-control", response_class=HTMLResponse)
async def mission_control(request: Request):
"""Render the swarm mission control dashboard page."""
return templates.TemplateResponse(request, "mission_control.html", {})
@router.get("/bugs", response_class=HTMLResponse)
async def bugs_page(request: Request):
"""Render the bug tracking page."""
return templates.TemplateResponse(
request,
"bugs.html",
@@ -75,16 +77,19 @@ async def bugs_page(request: Request):
@router.get("/self-coding", response_class=HTMLResponse)
async def self_coding(request: Request):
"""Render the self-coding automation status page."""
return templates.TemplateResponse(request, "self_coding.html", {"stats": {}})
@router.get("/hands", response_class=HTMLResponse)
async def hands_page(request: Request):
"""Render the hands (automation executions) page."""
return templates.TemplateResponse(request, "hands.html", {"executions": []})
@router.get("/creative/ui", response_class=HTMLResponse)
async def creative_ui(request: Request):
"""Render the creative UI playground page."""
return templates.TemplateResponse(request, "creative.html", {})

View File

@@ -145,6 +145,7 @@ async def tasks_page(request: Request):
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
"""Return HTMX partial for pending approval tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
@@ -164,6 +165,7 @@ async def tasks_pending(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
"""Return HTMX partial for active (approved/running/paused) tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
@@ -183,6 +185,7 @@ async def tasks_active(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
"""Return HTMX partial for completed/vetoed/failed tasks (last 50)."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
@@ -241,26 +244,31 @@ async def create_task_form(
@router.post("/tasks/{task_id}/approve", response_class=HTMLResponse)
async def approve_task(request: Request, task_id: str):
"""Approve a pending task and move it to active queue."""
return await _set_status(request, task_id, "approved")
@router.post("/tasks/{task_id}/veto", response_class=HTMLResponse)
async def veto_task(request: Request, task_id: str):
"""Veto a task, marking it as rejected."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/pause", response_class=HTMLResponse)
async def pause_task(request: Request, task_id: str):
"""Pause a running or approved task."""
return await _set_status(request, task_id, "paused")
@router.post("/tasks/{task_id}/cancel", response_class=HTMLResponse)
async def cancel_task(request: Request, task_id: str):
"""Cancel a task (marks as vetoed)."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/retry", response_class=HTMLResponse)
async def retry_task(request: Request, task_id: str):
"""Retry a failed/vetoed task by moving it back to approved."""
return await _set_status(request, task_id, "approved")
@@ -271,6 +279,7 @@ async def modify_task(
title: str = Form(...),
description: str = Form(""),
):
"""Update task title and description."""
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",

View File

@@ -0,0 +1,29 @@
"""World interface — engine-agnostic adapter pattern for embodied agents.
Provides the ``WorldInterface`` ABC and an adapter registry so Timmy can
observe, act, and speak in any game world (Morrowind, Luanti, Godot, …)
through a single contract.
Quick start::
from infrastructure.world import get_adapter, register_adapter
from infrastructure.world.interface import WorldInterface
register_adapter("mock", MockWorldAdapter)
world = get_adapter("mock")
perception = world.observe()
"""
from infrastructure.world.registry import AdapterRegistry
_registry = AdapterRegistry()
register_adapter = _registry.register
get_adapter = _registry.get
list_adapters = _registry.list_adapters
__all__ = [
"register_adapter",
"get_adapter",
"list_adapters",
]

View File

@@ -0,0 +1 @@
"""Built-in world adapters."""

View File

@@ -0,0 +1,99 @@
"""Mock world adapter — returns canned perception and logs commands.
Useful for testing the heartbeat loop and WorldInterface contract
without a running game server.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import (
ActionResult,
ActionStatus,
CommandInput,
PerceptionOutput,
)
logger = logging.getLogger(__name__)
@dataclass
class _ActionLog:
    """Record of an action dispatched to the mock world."""

    command: CommandInput  # the command exactly as received by act()
    timestamp: datetime    # when the command was logged (UTC-aware)
class MockWorldAdapter(WorldInterface):
    """In-memory mock adapter for testing.

    * ``observe()`` returns configurable canned perception.
    * ``act()`` logs the command and returns success.
    * ``speak()`` logs the message.

    Inspect ``action_log`` and ``speech_log`` to verify behaviour in tests.
    """

    def __init__(
        self,
        *,
        location: str = "Test Chamber",
        entities: list[str] | None = None,
        events: list[str] | None = None,
    ) -> None:
        self._location = location
        # NOTE: `or` means an explicitly passed empty list also falls back
        # to the default ["TestNPC"] — pass a non-empty list to override.
        self._entities = entities or ["TestNPC"]
        self._events = events or []
        self._connected = False
        # Public logs for test assertions.
        self.action_log: list[_ActionLog] = []
        self.speech_log: list[dict] = []

    # -- lifecycle ---------------------------------------------------------

    def connect(self) -> None:
        """Mark the adapter as connected (no real I/O)."""
        self._connected = True
        logger.info("MockWorldAdapter connected")

    def disconnect(self) -> None:
        """Mark the adapter as disconnected (no real I/O)."""
        self._connected = False
        logger.info("MockWorldAdapter disconnected")

    @property
    def is_connected(self) -> bool:
        """True after connect() until a subsequent disconnect()."""
        return self._connected

    # -- core contract -----------------------------------------------------

    def observe(self) -> PerceptionOutput:
        """Return the canned perception configured at construction time."""
        logger.debug("MockWorldAdapter.observe()")
        return PerceptionOutput(
            timestamp=datetime.now(UTC),
            location=self._location,
            # Copies, so callers cannot mutate the adapter's canned state.
            entities=list(self._entities),
            events=list(self._events),
            raw={"adapter": "mock"},
        )

    def act(self, command: CommandInput) -> ActionResult:
        """Append *command* to ``action_log`` and report success."""
        logger.debug("MockWorldAdapter.act(%s)", command.action)
        self.action_log.append(_ActionLog(command=command, timestamp=datetime.now(UTC)))
        return ActionResult(
            status=ActionStatus.SUCCESS,
            message=f"Mock executed: {command.action}",
            data={"adapter": "mock"},
        )

    def speak(self, message: str, target: str | None = None) -> None:
        """Append the message/target pair (with timestamp) to ``speech_log``."""
        logger.debug("MockWorldAdapter.speak(%r, target=%r)", message, target)
        self.speech_log.append(
            {
                "message": message,
                "target": target,
                "timestamp": datetime.now(UTC).isoformat(),
            }
        )

View File

@@ -0,0 +1,58 @@
"""TES3MP world adapter — stub for Morrowind multiplayer via TES3MP.
This adapter will eventually connect to a TES3MP server and translate
the WorldInterface contract into TES3MP commands. For now every method
raises ``NotImplementedError`` with guidance on what needs wiring up.
Once PR #864 merges, import PerceptionOutput and CommandInput directly
from ``infrastructure.morrowind.schemas`` if their shapes differ from
the canonical types in ``infrastructure.world.types``.
"""
from __future__ import annotations
import logging
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import ActionResult, CommandInput, PerceptionOutput
logger = logging.getLogger(__name__)
class TES3MPWorldAdapter(WorldInterface):
    """Stub adapter for TES3MP (Morrowind multiplayer).

    All core methods raise ``NotImplementedError``.
    Implement ``connect()`` first — it should open a socket to the
    TES3MP server and authenticate.
    """

    def __init__(self, *, host: str = "localhost", port: int = 25565) -> None:
        # Connection target; unused until connect() is implemented.
        self._host = host
        self._port = port
        self._connected = False

    # -- lifecycle ---------------------------------------------------------

    def connect(self) -> None:
        """Not implemented: should open and authenticate the server socket."""
        raise NotImplementedError("TES3MPWorldAdapter.connect() — wire up TES3MP server socket")

    def disconnect(self) -> None:
        """Not implemented: should close the server socket."""
        raise NotImplementedError("TES3MPWorldAdapter.disconnect() — close TES3MP server socket")

    @property
    def is_connected(self) -> bool:
        # Always False today, since connect() cannot currently succeed.
        return self._connected

    # -- core contract (stubs) ---------------------------------------------

    def observe(self) -> PerceptionOutput:
        """Not implemented: should poll TES3MP for player/NPC state."""
        raise NotImplementedError("TES3MPWorldAdapter.observe() — poll TES3MP for player/NPC state")

    def act(self, command: CommandInput) -> ActionResult:
        """Not implemented: should translate *command* into a TES3MP packet."""
        raise NotImplementedError(
            "TES3MPWorldAdapter.act() — translate CommandInput to TES3MP packet"
        )

    def speak(self, message: str, target: str | None = None) -> None:
        """Not implemented: should send a chat message via TES3MP."""
        raise NotImplementedError("TES3MPWorldAdapter.speak() — send chat message via TES3MP")

View File

@@ -0,0 +1,64 @@
"""Abstract WorldInterface — the contract every game-world adapter must fulfil.
Follows a Gymnasium-inspired pattern: observe → act → speak, with each
method returning strongly-typed data structures.
Any future engine (TES3MP, Luanti, Godot, …) plugs in by subclassing
``WorldInterface`` and implementing the three methods.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from infrastructure.world.types import ActionResult, CommandInput, PerceptionOutput
class WorldInterface(ABC):
    """Engine-agnostic base class for world adapters.

    Subclasses must implement:

    - ``observe()`` — gather structured perception from the world
    - ``act()`` — dispatch a command and return the outcome
    - ``speak()`` — send a message to an NPC / player / broadcast

    Lifecycle hooks ``connect()`` and ``disconnect()`` are optional.
    """

    # -- lifecycle (optional overrides) ------------------------------------

    def connect(self) -> None:  # noqa: B027
        """Establish connection to the game world.

        Default implementation is a no-op. Override to open sockets,
        authenticate, etc.
        """

    def disconnect(self) -> None:  # noqa: B027
        """Tear down the connection.

        Default implementation is a no-op.
        """

    @property
    def is_connected(self) -> bool:
        """Return ``True`` if the adapter has an active connection.

        Default returns ``True``. Override for adapters that maintain
        persistent connections.
        """
        return True

    # -- core contract (must implement) ------------------------------------

    @abstractmethod
    def observe(self) -> PerceptionOutput:
        """Return a structured snapshot of the current world state."""

    @abstractmethod
    def act(self, command: CommandInput) -> ActionResult:
        """Execute *command* in the world and return the result."""

    @abstractmethod
    def speak(self, message: str, target: str | None = None) -> None:
        """Send *message* in the world, optionally directed at *target*."""

View File

@@ -0,0 +1,54 @@
"""Adapter registry — register and instantiate world adapters by name.
Usage::
registry = AdapterRegistry()
registry.register("mock", MockWorldAdapter)
adapter = registry.get("mock", some_kwarg="value")
"""
from __future__ import annotations
import logging
from typing import Any
from infrastructure.world.interface import WorldInterface
logger = logging.getLogger(__name__)
class AdapterRegistry:
    """Name → WorldInterface class registry with instantiation.

    Stores adapter *classes* keyed by name; :meth:`get` instantiates on
    demand so each call can pass fresh constructor kwargs.
    """

    def __init__(self) -> None:
        self._adapters: dict[str, type[WorldInterface]] = {}

    def register(self, name: str, cls: type[WorldInterface]) -> None:
        """Register an adapter class under *name*.

        Re-registering an existing name overwrites it (with a warning).

        Raises ``TypeError`` if *cls* is not a ``WorldInterface`` subclass.
        """
        if not (isinstance(cls, type) and issubclass(cls, WorldInterface)):
            raise TypeError(f"{cls!r} is not a WorldInterface subclass")
        if name in self._adapters:
            logger.warning("Overwriting adapter %r (was %r)", name, self._adapters[name])
        self._adapters[name] = cls
        # BUG FIX: the format string was "%s%s" (no separator), which glued
        # the name and class together, e.g. "mockMockWorldAdapter".
        logger.info("Registered world adapter: %s (%s)", name, cls.__name__)

    def get(self, name: str, **kwargs: Any) -> WorldInterface:
        """Instantiate and return the adapter registered as *name*.

        Raises ``KeyError`` if *name* is not registered; the message now
        lists the known adapter names to aid debugging.
        """
        try:
            cls = self._adapters[name]
        except KeyError:
            raise KeyError(
                f"No world adapter registered as {name!r}; "
                f"available: {self.list_adapters()}"
            ) from None
        return cls(**kwargs)

    def list_adapters(self) -> list[str]:
        """Return sorted list of registered adapter names."""
        return sorted(self._adapters)

    def __contains__(self, name: str) -> bool:
        return name in self._adapters

    def __len__(self) -> int:
        return len(self._adapters)

View File

@@ -0,0 +1,71 @@
"""Canonical data types for world interaction.
These mirror the PerceptionOutput / CommandInput types from PR #864's
``morrowind/schemas.py``. When that PR merges, these can be replaced
with re-exports — but until then they serve as the stable contract for
every WorldInterface adapter.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
class ActionStatus(StrEnum):
    """Outcome of an action dispatched to the world."""

    SUCCESS = "success"  # action completed as requested
    FAILURE = "failure"  # action attempted but did not succeed
    PENDING = "pending"  # action accepted; result not yet known
    NOOP = "noop"        # nothing was done (e.g. an idle cycle)
@dataclass
class PerceptionOutput:
    """Structured world state returned by ``WorldInterface.observe()``.

    All fields have defaults, so an "empty" observation is constructible
    with no arguments.

    Attributes:
        timestamp: When the observation was captured (defaults to now, UTC).
        location: Free-form location descriptor (e.g. "Balmora, Fighters Guild").
        entities: List of nearby entity descriptions.
        events: Recent game events since last observation.
        raw: Optional raw / engine-specific payload for advanced consumers.
    """

    timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
    location: str = ""
    entities: list[str] = field(default_factory=list)
    events: list[str] = field(default_factory=list)
    raw: dict = field(default_factory=dict)
@dataclass
class CommandInput:
    """Action command sent via ``WorldInterface.act()``.

    Attributes:
        action: Verb / action name (e.g. "move", "attack", "use_item").
        target: Optional target identifier.
        parameters: Arbitrary key-value payload for engine-specific params.
    """

    action: str
    target: str | None = None
    parameters: dict = field(default_factory=dict)
@dataclass
class ActionResult:
    """Outcome returned by ``WorldInterface.act()``.

    Attributes:
        status: Whether the action succeeded, failed, etc. (defaults SUCCESS).
        message: Human-readable description of the outcome.
        data: Arbitrary engine-specific result payload.
    """

    status: ActionStatus = ActionStatus.SUCCESS
    message: str = ""
    data: dict = field(default_factory=dict)

286
src/loop/heartbeat.py Normal file
View File

@@ -0,0 +1,286 @@
"""Heartbeat v2 — WorldInterface-driven cognitive loop.
Drives real observe → reason → act → reflect cycles through whatever
``WorldInterface`` adapter is connected. When no adapter is present,
gracefully falls back to the existing ``run_cycle()`` behaviour.
Usage::
heartbeat = Heartbeat(world=adapter, interval=30.0)
await heartbeat.run_once() # single cycle
await heartbeat.start() # background loop
heartbeat.stop() # graceful shutdown
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from loop.phase1_gather import gather
from loop.phase2_reason import reason
from loop.phase3_act import act
from loop.schema import ContextPayload
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cycle log entry
# ---------------------------------------------------------------------------
@dataclass
class CycleRecord:
    """One observe → reason → act → reflect cycle."""

    cycle_id: int                  # monotonically increasing, 1-based
    timestamp: str                 # ISO-8601 UTC, set when the cycle starts
    observation: dict = field(default_factory=dict)  # location/entities/events snapshot
    reasoning_summary: str = ""    # truncated summary of the reason phase
    action_taken: str = ""         # dispatched action name, "idle", or "think"
    action_status: str = ""        # ActionStatus value string, or "noop"
    reflect_notes: str = ""        # human-readable reflection on the cycle
    duration_ms: int = 0           # wall-clock duration of the whole cycle
# ---------------------------------------------------------------------------
# Heartbeat
# ---------------------------------------------------------------------------
class Heartbeat:
"""Manages the recurring cognitive loop with optional world adapter.
Parameters
----------
world:
A ``WorldInterface`` instance (or ``None`` for passive mode).
interval:
Seconds between heartbeat ticks. 30 s for embodied mode,
300 s (5 min) for passive thinking.
on_cycle:
Optional async callback invoked after each cycle with the
``CycleRecord``.
"""
def __init__(
self,
*,
world=None, # WorldInterface | None
interval: float = 30.0,
on_cycle=None, # Callable[[CycleRecord], Awaitable[None]] | None
) -> None:
self._world = world
self._interval = interval
self._on_cycle = on_cycle
self._cycle_count: int = 0
self._running = False
self._task: asyncio.Task | None = None
self.history: list[CycleRecord] = []
# -- properties --------------------------------------------------------
@property
def world(self):
return self._world
@world.setter
def world(self, adapter) -> None:
self._world = adapter
@property
def interval(self) -> float:
return self._interval
@interval.setter
def interval(self, value: float) -> None:
self._interval = max(1.0, value)
@property
def is_running(self) -> bool:
return self._running
@property
def cycle_count(self) -> int:
return self._cycle_count
# -- single cycle ------------------------------------------------------
async def run_once(self) -> CycleRecord:
"""Execute one full heartbeat cycle.
If a world adapter is present:
1. Observe — ``world.observe()``
2. Gather + Reason + Act via the three-phase loop, with the
observation injected into the payload
3. Dispatch the decided action back to ``world.act()``
4. Reflect — log the cycle
Without an adapter the existing loop runs on a timer-sourced
payload (passive thinking).
"""
self._cycle_count += 1
start = time.monotonic()
record = CycleRecord(
cycle_id=self._cycle_count,
timestamp=datetime.now(UTC).isoformat(),
)
if self._world is not None:
record = await self._embodied_cycle(record)
else:
record = await self._passive_cycle(record)
record.duration_ms = int((time.monotonic() - start) * 1000)
self.history.append(record)
# Broadcast via WebSocket (best-effort)
await self._broadcast(record)
if self._on_cycle:
await self._on_cycle(record)
logger.info(
"Heartbeat cycle #%d complete (%d ms) — action=%s status=%s",
record.cycle_id,
record.duration_ms,
record.action_taken or "(passive)",
record.action_status or "n/a",
)
return record
# -- background loop ---------------------------------------------------
async def start(self) -> None:
"""Start the recurring heartbeat loop as a background task."""
if self._running:
logger.warning("Heartbeat already running")
return
self._running = True
self._task = asyncio.current_task() or asyncio.ensure_future(self._loop())
if self._task is not asyncio.current_task():
return
await self._loop()
async def _loop(self) -> None:
logger.info(
"Heartbeat loop started (interval=%.1fs, adapter=%s)",
self._interval,
type(self._world).__name__ if self._world else "None",
)
while self._running:
try:
await self.run_once()
except Exception:
logger.exception("Heartbeat cycle failed")
await asyncio.sleep(self._interval)
def stop(self) -> None:
"""Signal the heartbeat loop to stop after the current cycle."""
self._running = False
logger.info("Heartbeat stop requested")
# -- internal: embodied cycle ------------------------------------------
async def _embodied_cycle(self, record: CycleRecord) -> CycleRecord:
"""Cycle with a live world adapter: observe → reason → act → reflect."""
from infrastructure.world.types import ActionStatus, CommandInput
# 1. Observe
perception = self._world.observe()
record.observation = {
"location": perception.location,
"entities": perception.entities,
"events": perception.events,
}
# 2. Feed observation into the three-phase loop
obs_content = (
f"Location: {perception.location}\n"
f"Entities: {', '.join(perception.entities)}\n"
f"Events: {', '.join(perception.events)}"
)
payload = ContextPayload(
source="world",
content=obs_content,
metadata={"perception": record.observation},
)
gathered = gather(payload)
reasoned = reason(gathered)
acted = act(reasoned)
# Extract action decision from the acted payload
action_name = acted.metadata.get("action", "idle")
action_target = acted.metadata.get("action_target")
action_params = acted.metadata.get("action_params", {})
record.reasoning_summary = acted.metadata.get("reasoning", acted.content[:200])
# 3. Dispatch action to world
if action_name != "idle":
cmd = CommandInput(
action=action_name,
target=action_target,
parameters=action_params,
)
result = self._world.act(cmd)
record.action_taken = action_name
record.action_status = result.status.value
else:
record.action_taken = "idle"
record.action_status = ActionStatus.NOOP.value
# 4. Reflect
record.reflect_notes = (
f"Observed {len(perception.entities)} entities at {perception.location}. "
f"Action: {record.action_taken}{record.action_status}."
)
return record
# -- internal: passive cycle -------------------------------------------
async def _passive_cycle(self, record: CycleRecord) -> CycleRecord:
"""Cycle without a world adapter — existing think_once() behaviour."""
payload = ContextPayload(
source="timer",
content="heartbeat",
metadata={"mode": "passive"},
)
gathered = gather(payload)
reasoned = reason(gathered)
acted = act(reasoned)
record.reasoning_summary = acted.content[:200]
record.action_taken = "think"
record.action_status = "noop"
record.reflect_notes = "Passive thinking cycle — no world adapter connected."
return record
# -- broadcast ---------------------------------------------------------
async def _broadcast(self, record: CycleRecord) -> None:
"""Emit heartbeat cycle data via WebSocket (best-effort)."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"heartbeat.cycle",
{
"cycle_id": record.cycle_id,
"timestamp": record.timestamp,
"action": record.action_taken,
"action_status": record.action_status,
"reasoning_summary": record.reasoning_summary[:300],
"observation": record.observation,
"duration_ms": record.duration_ms,
},
)
except (ImportError, AttributeError, ConnectionError, RuntimeError) as exc:
logger.debug("Heartbeat broadcast skipped: %s", exc)

View File

@@ -17,9 +17,9 @@ logger = logging.getLogger(__name__)
def gather(payload: ContextPayload) -> ContextPayload:
"""Accept raw input and return structured context for reasoning.
Stub: tags the payload with phase=gather and logs transit.
Timmy will flesh this out with context selection, memory lookup,
adapter polling, and attention-residual weighting.
When the payload carries a ``perception`` dict in metadata (injected by
the heartbeat loop from a WorldInterface adapter), that observation is
folded into the gathered context. Otherwise behaves as before.
"""
logger.info(
"Phase 1 (Gather) received: source=%s content_len=%d tokens=%d",
@@ -28,7 +28,20 @@ def gather(payload: ContextPayload) -> ContextPayload:
payload.token_count,
)
result = payload.with_metadata(phase="gather", gathered=True)
extra: dict = {"phase": "gather", "gathered": True}
# Enrich with world observation when present
perception = payload.metadata.get("perception")
if perception:
extra["world_observation"] = perception
logger.info(
"Phase 1 (Gather) world observation: location=%s entities=%d events=%d",
perception.get("location", "?"),
len(perception.get("entities", [])),
len(perception.get("events", [])),
)
result = payload.with_metadata(**extra)
logger.info(
"Phase 1 (Gather) produced: metadata_keys=%s",

555
src/timmy/research.py Normal file
View File

@@ -0,0 +1,555 @@
"""ResearchOrchestrator — autonomous research pipeline.
Chains: Check Local → Generate Queries → Search → Fetch → Synthesize →
Crystallize → Write Artifact into an end-to-end research workflow.
Usage:
from timmy.research import ResearchOrchestrator, run_research
orchestrator = ResearchOrchestrator(cascade=router, memory=memory_fns)
result = await orchestrator.run("Bitcoin Lightning Network scaling")
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ── Data structures ──────────────────────────────────────────────────────────
CONFIDENCE_THRESHOLD = 0.85
DEFAULT_QUERIES_PER_TOPIC = 8
DEFAULT_RESULTS_PER_QUERY = 5
DEFAULT_PAGES_TO_FETCH = 10
DEFAULT_FETCH_TOKEN_LIMIT = 3000
DEFAULT_SYNTHESIS_MAX_TOKENS = 4000
@dataclass
class ResearchResult:
    """Output of a completed research pipeline run."""

    topic: str  # the researched topic/question, as passed to run()
    report: str  # synthesized report text (or cached memory content on a cache hit)
    queries_generated: list[str] = field(default_factory=list)  # LLM-produced search queries
    sources: list[dict[str, str]] = field(default_factory=list)  # {"url", "title"} per fetched page
    action_items: list[str] = field(default_factory=list)  # lines extracted from the report
    cache_hit: bool = False  # True when served from semantic memory (Step 0)
    duration_ms: float = 0.0  # wall-clock pipeline duration
    metrics: dict[str, Any] = field(default_factory=dict)  # snapshot of orchestrator counters
    timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())  # creation time, UTC ISO-8601
@dataclass
class SearchSnippet:
    """A single search result snippet."""

    title: str  # result title as returned by the search tool
    url: str  # result URL; used for deduplication
    snippet: str  # short excerpt from the result
    relevance: float = 0.0  # optional ranking score (unused by default)
@dataclass
class FetchedPage:
    """A fetched and truncated web page."""

    url: str  # page URL
    title: str  # title carried over from the originating snippet
    content: str  # page text, truncated to DEFAULT_FETCH_TOKEN_LIMIT * 4 chars
    token_estimate: int = 0  # rough size estimate (whitespace word count)
# ── Memory interface ─────────────────────────────────────────────────────────
@dataclass
class MemoryInterface:
    """Abstraction over the memory system for research.

    Accepts callables so the orchestrator doesn't depend on a specific
    memory implementation. Defaults wire to timmy.memory_system.
    """

    search_fn: Any = None  # (query, limit, ...) -> list[MemoryEntry]
    store_fn: Any = None  # (content, source, context_type, ...) -> MemoryEntry

    def __post_init__(self):
        # Fill in any callable the caller did not supply.
        if self.search_fn is None or self.store_fn is None:
            self._load_defaults()

    def _load_defaults(self):
        """Wire missing callables to timmy.memory_system, degrading to no-ops."""
        try:
            from timmy.memory_system import search_memories, store_memory

            if self.search_fn is None:
                self.search_fn = search_memories
            if self.store_fn is None:
                self.store_fn = store_memory
        except ImportError:
            # No memory backend: research still runs, just without caching.
            logger.warning("Memory system not available — research will skip caching")
            if self.search_fn is None:
                self.search_fn = lambda query, **kw: []
            if self.store_fn is None:
                self.store_fn = lambda content, source, **kw: None
# ── Tool interface ───────────────────────────────────────────────────────────
@dataclass
class ResearchTools:
    """Web search and fetch callables.

    Expected signatures:
        web_search(query: str, limit: int) -> list[dict]
        web_fetch(url: str, max_tokens: int) -> str

    NOTE(review): the orchestrator invokes both via asyncio.to_thread,
    i.e. as *synchronous* callables run in a worker thread. An async
    coroutine function supplied here would yield an un-awaited
    coroutine — confirm callers pass sync callables.
    """

    web_search: Any = None  # optional; the search step is skipped when None
    web_fetch: Any = None  # optional; the fetch step is skipped when None
# ── Orchestrator ─────────────────────────────────────────────────────────────
class ResearchOrchestrator:
    """Pipeline that chains research steps into an autonomous workflow.

    Steps:
        0. CHECK LOCAL KNOWLEDGE — search memory, return cached if confident
        1. GENERATE QUERIES — ask LLM to produce search queries
        2. SEARCH — execute queries via web_search tool
        3. FETCH — rank snippets, fetch top pages
        4. SYNTHESIZE — produce structured report via LLM
        5. CRYSTALLIZE — store result in semantic memory
        6. WRITE ARTIFACT — create Gitea issues from action items

    Every step degrades gracefully: a failing or unconfigured step logs a
    warning and the pipeline continues with whatever it has.
    """

    def __init__(
        self,
        cascade: Any,
        memory: MemoryInterface | None = None,
        tools: ResearchTools | None = None,
    ) -> None:
        # cascade: LLM router exposing ``await complete(**kwargs) -> dict``
        # (presumably with a "content" key — confirm against the router).
        self.cascade = cascade
        self.memory = memory or MemoryInterface()
        self.tools = tools or ResearchTools()
        # Counters exposed via get_metrics() and snapshotted into results.
        self._metrics: dict[str, int] = {
            "research_cache_hit": 0,
            "research_api_call": 0,
        }

    async def run(
        self,
        topic: str,
        template: str | None = None,
        context: dict[str, Any] | None = None,
    ) -> ResearchResult:
        """Execute the full research pipeline.

        Args:
            topic: The research topic or question.
            template: Optional prompt template for synthesis.
            context: Additional context dict (cascade_tier hint, etc.).

        Returns:
            ResearchResult with report, sources, and action items.
        """
        start = time.monotonic()
        context = context or {}
        cascade_tier = context.get("cascade_tier")
        # Step 0: Check local knowledge — a confident hit short-circuits
        # the whole pipeline (no API calls, no artifact writing).
        cached = await self._check_local_knowledge(topic)
        if cached is not None:
            self._metrics["research_cache_hit"] += 1
            cached.duration_ms = (time.monotonic() - start) * 1000
            return cached
        self._metrics["research_api_call"] += 1
        # Step 1: Generate queries
        queries = await self._generate_queries(topic, template, cascade_tier)
        # Step 2: Search
        snippets = await self._search(queries)
        # Step 3: Fetch top pages
        pages = await self._fetch(snippets)
        # Step 4: Synthesize
        report = await self._synthesize(topic, template, pages, cascade_tier)
        # Step 5: Extract action items
        action_items = _extract_action_items(report)
        # Build result
        sources = [{"url": p.url, "title": p.title} for p in pages]
        result = ResearchResult(
            topic=topic,
            report=report,
            queries_generated=queries,
            sources=sources,
            action_items=action_items,
            cache_hit=False,
            duration_ms=(time.monotonic() - start) * 1000,
            metrics=dict(self._metrics),
        )
        # Step 6: Crystallize — store in memory
        await self._crystallize(topic, result)
        # Step 7: Write artifact — create Gitea issues
        await self._write_artifact(result)
        return result

    # ── Pipeline steps ───────────────────────────────────────────────────
    async def _check_local_knowledge(self, topic: str) -> ResearchResult | None:
        """Search semantic memory for existing research on this topic.

        Returns a cache-hit ResearchResult when the top match scores at
        least CONFIDENCE_THRESHOLD; otherwise (or on any error) None.
        """
        try:
            results = self.memory.search_fn(
                query=topic, limit=10, context_type="research"
            )
            if not results:
                return None
            # Check if top result has high confidence.
            # `or 0.0` guards against a None relevance_score attribute.
            top = results[0]
            score = getattr(top, "relevance_score", 0.0) or 0.0
            if score >= CONFIDENCE_THRESHOLD:
                content = getattr(top, "content", str(top))
                logger.info(
                    "Research cache hit for '%s' (score=%.2f)", topic, score
                )
                return ResearchResult(
                    topic=topic,
                    report=content,
                    cache_hit=True,
                    metrics={"research_cache_hit": 1},
                )
        except Exception as exc:
            # Memory failures must never block fresh research.
            logger.warning("Local knowledge check failed: %s", exc)
        return None

    async def _generate_queries(
        self,
        topic: str,
        template: str | None,
        cascade_tier: str | None,
    ) -> list[str]:
        """Ask the LLM to generate search queries for the topic.

        Falls back to ``[topic]`` if the LLM call fails. Numbering and
        bullet prefixes are stripped from each returned line.
        """
        prompt = (
            f"Generate {DEFAULT_QUERIES_PER_TOPIC} diverse web search queries "
            f"to thoroughly research the following topic. Return ONLY the "
            f"queries, one per line, no numbering or bullets.\n\n"
            f"Topic: {topic}"
        )
        if template:
            prompt += f"\n\nResearch template context:\n{template}"
        messages = [
            {"role": "system", "content": "You are a research query generator."},
            {"role": "user", "content": prompt},
        ]
        kwargs: dict[str, Any] = {"messages": messages, "temperature": 0.7}
        if cascade_tier:
            kwargs["model"] = cascade_tier
        try:
            response = await self.cascade.complete(**kwargs)
            raw = response.get("content", "")
            queries = [
                line.strip()
                for line in raw.strip().splitlines()
                if line.strip() and not line.strip().startswith("#")
            ]
            # Clean numbering prefixes ("1.", "2)", "-", "*")
            cleaned = []
            for q in queries:
                q = re.sub(r"^\d+[\.\)]\s*", "", q)
                q = re.sub(r"^[-*]\s*", "", q)
                if q:
                    cleaned.append(q)
            return cleaned[:DEFAULT_QUERIES_PER_TOPIC + 4]  # slight over-generate
        except Exception as exc:
            logger.warning("Query generation failed: %s", exc)
            # Fallback: use topic itself as a single query
            return [topic]

    async def _search(self, queries: list[str]) -> list[SearchSnippet]:
        """Execute search queries concurrently and collect deduplicated snippets."""
        if not self.tools.web_search:
            logger.warning("No web_search tool configured — skipping search step")
            return []
        all_snippets: list[SearchSnippet] = []

        async def _run_query(query: str) -> list[SearchSnippet]:
            # web_search is treated as a sync callable, run in a worker
            # thread; a per-query failure yields an empty list.
            try:
                results = await asyncio.to_thread(
                    self.tools.web_search, query, DEFAULT_RESULTS_PER_QUERY
                )
                snippets = []
                for r in (results or []):
                    snippets.append(
                        SearchSnippet(
                            title=r.get("title", ""),
                            url=r.get("url", ""),
                            snippet=r.get("snippet", ""),
                        )
                    )
                return snippets
            except Exception as exc:
                logger.warning("Search failed for query '%s': %s", query, exc)
                return []

        # Run searches concurrently
        tasks = [_run_query(q) for q in queries]
        results = await asyncio.gather(*tasks)
        for snippets in results:
            all_snippets.extend(snippets)
        # Deduplicate by URL, keeping first occurrence order
        seen_urls: set[str] = set()
        unique: list[SearchSnippet] = []
        for s in all_snippets:
            if s.url and s.url not in seen_urls:
                seen_urls.add(s.url)
                unique.append(s)
        return unique

    async def _fetch(self, snippets: list[SearchSnippet]) -> list[FetchedPage]:
        """Fetch the top DEFAULT_PAGES_TO_FETCH pages concurrently."""
        if not self.tools.web_fetch:
            logger.warning("No web_fetch tool configured — skipping fetch step")
            return []
        # Take top N snippets (list is already deduped; order preserved
        # from search — no explicit relevance re-ranking here)
        to_fetch = snippets[:DEFAULT_PAGES_TO_FETCH]
        pages: list[FetchedPage] = []

        async def _fetch_one(snippet: SearchSnippet) -> FetchedPage | None:
            # web_fetch is treated as a sync callable, run in a worker thread.
            try:
                content = await asyncio.to_thread(
                    self.tools.web_fetch, snippet.url, DEFAULT_FETCH_TOKEN_LIMIT
                )
                if content:
                    return FetchedPage(
                        url=snippet.url,
                        title=snippet.title,
                        # ~4 chars per token heuristic for the hard truncation
                        content=content[:DEFAULT_FETCH_TOKEN_LIMIT * 4],
                        token_estimate=len(content.split()),
                    )
            except Exception as exc:
                logger.warning("Fetch failed for %s: %s", snippet.url, exc)
            return None

        tasks = [_fetch_one(s) for s in to_fetch]
        results = await asyncio.gather(*tasks)
        for page in results:
            if page is not None:
                pages.append(page)
        return pages

    async def _synthesize(
        self,
        topic: str,
        template: str | None,
        pages: list[FetchedPage],
        cascade_tier: str | None,
    ) -> str:
        """Synthesize fetched pages into a structured research report.

        On LLM failure, returns a fallback report embedding the raw
        source text so the pipeline still produces an artifact.
        """
        # Build context from fetched pages
        context_parts = []
        for i, page in enumerate(pages, 1):
            context_parts.append(
                f"--- Source {i}: {page.title} ({page.url}) ---\n"
                f"{page.content[:DEFAULT_FETCH_TOKEN_LIMIT * 4]}\n"
            )
        sources_text = "\n".join(context_parts) if context_parts else "(no sources fetched)"
        if template:
            prompt = (
                f"{template}\n\n"
                f"Topic: {topic}\n\n"
                f"Research sources:\n{sources_text}\n\n"
                f"Synthesize a comprehensive report based on the sources above."
            )
        else:
            # The 'ACTION:' prefix instruction feeds _extract_action_items().
            prompt = (
                f"Write a comprehensive research report on: {topic}\n\n"
                f"Research sources:\n{sources_text}\n\n"
                f"Structure your report with:\n"
                f"- Executive summary\n"
                f"- Key findings\n"
                f"- Analysis\n"
                f"- Action items (prefix each with 'ACTION:')\n"
                f"- Sources cited"
            )
        messages = [
            {"role": "system", "content": "You are a research analyst producing structured reports."},
            {"role": "user", "content": prompt},
        ]
        kwargs: dict[str, Any] = {
            "messages": messages,
            "temperature": 0.3,
            "max_tokens": DEFAULT_SYNTHESIS_MAX_TOKENS,
        }
        if cascade_tier:
            kwargs["model"] = cascade_tier
        try:
            response = await self.cascade.complete(**kwargs)
            return response.get("content", "")
        except Exception as exc:
            logger.error("Synthesis failed: %s", exc)
            # Fallback: return raw source summaries
            return (
                f"# Research: {topic}\n\n"
                f"Synthesis failed ({exc}). Raw sources:\n\n{sources_text}"
            )

    async def _crystallize(self, topic: str, result: ResearchResult) -> None:
        """Store the research result in semantic memory (best-effort)."""
        try:
            self.memory.store_fn(
                content=result.report,
                source="research_orchestrator",
                context_type="research",
                metadata={
                    "topic": topic,
                    "sources": result.sources,
                    "action_items": result.action_items,
                    "cache_hit": result.cache_hit,
                    "duration_ms": result.duration_ms,
                },
            )
            logger.info("Crystallized research on '%s' into memory", topic)
        except Exception as exc:
            logger.warning("Failed to crystallize research: %s", exc)

    async def _write_artifact(self, result: ResearchResult) -> None:
        """Create Gitea issues from action items (best-effort, in a thread)."""
        if not result.action_items:
            return
        try:
            await asyncio.to_thread(_create_gitea_issues, result)
        except Exception as exc:
            logger.warning("Failed to create Gitea issues: %s", exc)

    def get_metrics(self) -> dict[str, int]:
        """Return a copy of the current research pipeline counters."""
        return dict(self._metrics)
# ── Helpers ──────────────────────────────────────────────────────────────────
def _extract_action_items(report: str) -> list[str]:
"""Extract action items from a research report.
Looks for lines prefixed with ACTION:, TODO:, or - [ ].
"""
items: list[str] = []
for line in report.splitlines():
stripped = line.strip()
# ACTION: prefix
match = re.match(r"^(?:ACTION|TODO)\s*:\s*(.+)", stripped, re.IGNORECASE)
if match:
items.append(match.group(1).strip())
continue
# Markdown checkbox
match = re.match(r"^-\s*\[\s*\]\s*(.+)", stripped)
if match:
items.append(match.group(1).strip())
return items
def _create_gitea_issues(result: ResearchResult) -> None:
    """Create Gitea issues for action items (runs in thread).

    Silently skips when Gitea is unconfigured or `requests` is missing;
    per-item failures are logged and do not abort the remaining items.
    """
    # Both a token and a base URL are required.
    if not settings.gitea_token or not settings.gitea_url:
        logger.debug("Gitea not configured — skipping issue creation")
        return
    try:
        import requests
    except ImportError:
        logger.debug("requests not available — skipping Gitea issue creation")
        return
    endpoint = f"{settings.gitea_url.rstrip('/')}/api/v1/repos/{settings.gitea_repo}/issues"
    auth_headers = {
        "Authorization": f"token {settings.gitea_token}",
        "Content-Type": "application/json",
    }
    for item in result.action_items:
        try:
            issue = {
                "title": f"[research] {item[:100]}",
                "body": (
                    f"Auto-generated from research on: **{result.topic}**\n\n"
                    f"Action item: {item}\n\n"
                    f"---\n"
                    f"_Created by ResearchOrchestrator_"
                ),
            }
            response = requests.post(
                endpoint,
                headers=auth_headers,
                json=issue,
                timeout=10,
            )
            if response.status_code in (200, 201):
                logger.info("Created Gitea issue: %s", item[:60])
            else:
                logger.warning(
                    "Gitea issue creation failed (%d): %s",
                    response.status_code,
                    response.text[:200],
                )
        except Exception as exc:
            logger.warning("Failed to create issue '%s': %s", item[:60], exc)
# ── Convenience function ─────────────────────────────────────────────────────
async def run_research(
    topic: str,
    template: str | None = None,
    context: dict[str, Any] | None = None,
) -> ResearchResult:
    """Convenience wrapper: run the pipeline with default dependencies.

    Builds a ResearchOrchestrator around the cascade-router singleton
    (default memory and tools) and delegates to its run() method.
    """
    from infrastructure.router.cascade import get_router

    orchestrator = ResearchOrchestrator(cascade=get_router())
    return await orchestrator.run(topic, template=template, context=context)

View File

View File

@@ -0,0 +1,129 @@
"""Tests for the WorldInterface contract and type system."""
import pytest
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import (
ActionResult,
ActionStatus,
CommandInput,
PerceptionOutput,
)
# ---------------------------------------------------------------------------
# Type construction
# ---------------------------------------------------------------------------
class TestPerceptionOutput:
    """Construction and defaults of PerceptionOutput."""

    def test_defaults(self):
        # A bare PerceptionOutput has empty fields but a populated timestamp.
        p = PerceptionOutput()
        assert p.location == ""
        assert p.entities == []
        assert p.events == []
        assert p.raw == {}
        assert p.timestamp is not None

    def test_custom_values(self):
        # Explicit values pass straight through to the fields.
        p = PerceptionOutput(
            location="Balmora",
            entities=["Guard", "Merchant"],
            events=["door_opened"],
        )
        assert p.location == "Balmora"
        assert len(p.entities) == 2
        assert "door_opened" in p.events
class TestCommandInput:
    """Construction of CommandInput with and without optional fields."""

    def test_minimal(self):
        # Only `action` is required; target/parameters default to None/{}.
        c = CommandInput(action="move")
        assert c.action == "move"
        assert c.target is None
        assert c.parameters == {}

    def test_with_target_and_params(self):
        c = CommandInput(action="attack", target="Rat", parameters={"weapon": "sword"})
        assert c.target == "Rat"
        assert c.parameters["weapon"] == "sword"
class TestActionResult:
    """Defaults and failure construction of ActionResult."""

    def test_defaults(self):
        # Default result is a SUCCESS with an empty message.
        r = ActionResult()
        assert r.status == ActionStatus.SUCCESS
        assert r.message == ""

    def test_failure(self):
        r = ActionResult(status=ActionStatus.FAILURE, message="blocked")
        assert r.status == ActionStatus.FAILURE
class TestActionStatus:
    """String values backing the ActionStatus enum members."""

    def test_values(self):
        assert ActionStatus.SUCCESS.value == "success"
        assert ActionStatus.FAILURE.value == "failure"
        assert ActionStatus.PENDING.value == "pending"
        assert ActionStatus.NOOP.value == "noop"
# ---------------------------------------------------------------------------
# Abstract contract
# ---------------------------------------------------------------------------
class TestWorldInterfaceContract:
    """Verify the ABC cannot be instantiated directly.

    Each partial subclass omits exactly one abstract method
    (observe/act/speak) and must raise TypeError on instantiation;
    a complete subclass must instantiate and default to connected.
    """

    def test_cannot_instantiate(self):
        with pytest.raises(TypeError):
            WorldInterface()

    def test_subclass_must_implement_observe(self):
        # Missing observe() → still abstract.
        class Incomplete(WorldInterface):
            def act(self, command):
                pass

            def speak(self, message, target=None):
                pass

        with pytest.raises(TypeError):
            Incomplete()

    def test_subclass_must_implement_act(self):
        # Missing act() → still abstract.
        class Incomplete(WorldInterface):
            def observe(self):
                return PerceptionOutput()

            def speak(self, message, target=None):
                pass

        with pytest.raises(TypeError):
            Incomplete()

    def test_subclass_must_implement_speak(self):
        # Missing speak() → still abstract.
        class Incomplete(WorldInterface):
            def observe(self):
                return PerceptionOutput()

            def act(self, command):
                return ActionResult()

        with pytest.raises(TypeError):
            Incomplete()

    def test_complete_subclass_instantiates(self):
        # All three abstract methods implemented → instantiable.
        class Complete(WorldInterface):
            def observe(self):
                return PerceptionOutput()

            def act(self, command):
                return ActionResult()

            def speak(self, message, target=None):
                pass

        adapter = Complete()
        assert adapter.is_connected is True  # default
        assert isinstance(adapter.observe(), PerceptionOutput)
        assert isinstance(adapter.act(CommandInput(action="test")), ActionResult)

View File

@@ -0,0 +1,80 @@
"""Tests for the MockWorldAdapter — full observe/act/speak cycle."""
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.types import ActionStatus, CommandInput, PerceptionOutput
class TestMockWorldAdapter:
    """Exercises MockWorldAdapter: observe/act/speak, logs, and lifecycle."""

    def test_observe_returns_perception(self):
        adapter = MockWorldAdapter(location="Vivec")
        perception = adapter.observe()
        assert isinstance(perception, PerceptionOutput)
        assert perception.location == "Vivec"
        assert perception.raw == {"adapter": "mock"}

    def test_observe_entities(self):
        # Entities passed at construction come back through observe().
        adapter = MockWorldAdapter(entities=["Jiub", "Silt Strider"])
        perception = adapter.observe()
        assert perception.entities == ["Jiub", "Silt Strider"]

    def test_act_logs_command(self):
        # act() succeeds, echoes the action in the message, and appends
        # the command to action_log.
        adapter = MockWorldAdapter()
        cmd = CommandInput(action="move", target="north")
        result = adapter.act(cmd)
        assert result.status == ActionStatus.SUCCESS
        assert "move" in result.message
        assert len(adapter.action_log) == 1
        assert adapter.action_log[0].command.action == "move"

    def test_act_multiple_commands(self):
        adapter = MockWorldAdapter()
        adapter.act(CommandInput(action="attack"))
        adapter.act(CommandInput(action="defend"))
        adapter.act(CommandInput(action="retreat"))
        assert len(adapter.action_log) == 3

    def test_speak_logs_message(self):
        # speak() records {"message", "target"} entries in speech_log.
        adapter = MockWorldAdapter()
        adapter.speak("Hello, traveler!")
        assert len(adapter.speech_log) == 1
        assert adapter.speech_log[0]["message"] == "Hello, traveler!"
        assert adapter.speech_log[0]["target"] is None

    def test_speak_with_target(self):
        adapter = MockWorldAdapter()
        adapter.speak("Die, scum!", target="Cliff Racer")
        assert adapter.speech_log[0]["target"] == "Cliff Racer"

    def test_lifecycle(self):
        # connect()/disconnect() toggle is_connected; starts False.
        adapter = MockWorldAdapter()
        assert adapter.is_connected is False
        adapter.connect()
        assert adapter.is_connected is True
        adapter.disconnect()
        assert adapter.is_connected is False

    def test_full_observe_act_speak_cycle(self):
        """Acceptance criterion: full observe/act/speak cycle passes."""
        adapter = MockWorldAdapter(
            location="Seyda Neen",
            entities=["Fargoth", "Hrisskar"],
            events=["quest_started"],
        )
        adapter.connect()
        # Observe
        perception = adapter.observe()
        assert perception.location == "Seyda Neen"
        assert len(perception.entities) == 2
        assert "quest_started" in perception.events
        # Act
        result = adapter.act(CommandInput(action="talk", target="Fargoth"))
        assert result.status == ActionStatus.SUCCESS
        # Speak
        adapter.speak("Where is your ring, Fargoth?", target="Fargoth")
        assert len(adapter.speech_log) == 1
        adapter.disconnect()
        assert adapter.is_connected is False

View File

@@ -0,0 +1,68 @@
"""Tests for the adapter registry."""
import pytest
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.registry import AdapterRegistry
class TestAdapterRegistry:
    """Register/get/list behaviour of AdapterRegistry."""

    def test_register_and_get(self):
        # get() instantiates the registered adapter class.
        reg = AdapterRegistry()
        reg.register("mock", MockWorldAdapter)
        adapter = reg.get("mock")
        assert isinstance(adapter, MockWorldAdapter)

    def test_register_with_kwargs(self):
        # kwargs passed to get() are forwarded to the adapter constructor.
        reg = AdapterRegistry()
        reg.register("mock", MockWorldAdapter)
        adapter = reg.get("mock", location="Custom Room")
        assert adapter._location == "Custom Room"

    def test_get_unknown_raises(self):
        reg = AdapterRegistry()
        with pytest.raises(KeyError):
            reg.get("nonexistent")

    def test_register_non_subclass_raises(self):
        # Only WorldInterface subclasses may be registered.
        reg = AdapterRegistry()
        with pytest.raises(TypeError):
            reg.register("bad", dict)

    def test_list_adapters(self):
        # list_adapters() returns names sorted alphabetically.
        reg = AdapterRegistry()
        reg.register("beta", MockWorldAdapter)
        reg.register("alpha", MockWorldAdapter)
        assert reg.list_adapters() == ["alpha", "beta"]

    def test_contains(self):
        reg = AdapterRegistry()
        reg.register("mock", MockWorldAdapter)
        assert "mock" in reg
        assert "other" not in reg

    def test_len(self):
        reg = AdapterRegistry()
        assert len(reg) == 0
        reg.register("mock", MockWorldAdapter)
        assert len(reg) == 1

    def test_overwrite_warns(self, caplog):
        # Re-registering an existing name logs an "Overwriting" warning.
        import logging

        reg = AdapterRegistry()
        reg.register("mock", MockWorldAdapter)
        with caplog.at_level(logging.WARNING):
            reg.register("mock", MockWorldAdapter)
        assert "Overwriting" in caplog.text
class TestModuleLevelRegistry:
    """Test the convenience functions in infrastructure.world.__init__."""

    def test_register_and_get(self):
        # Module-level helpers delegate to a shared registry instance.
        from infrastructure.world import get_adapter, register_adapter

        register_adapter("test_mock", MockWorldAdapter)
        adapter = get_adapter("test_mock")
        assert isinstance(adapter, MockWorldAdapter)

View File

@@ -0,0 +1,44 @@
"""Tests for the TES3MP stub adapter."""
import pytest
from infrastructure.world.adapters.tes3mp import TES3MPWorldAdapter
from infrastructure.world.types import CommandInput
class TestTES3MPStub:
    """Acceptance criterion: stub imports cleanly and raises NotImplementedError."""

    def test_instantiates(self):
        # Construction stores host/port without connecting.
        adapter = TES3MPWorldAdapter(host="127.0.0.1", port=25565)
        assert adapter._host == "127.0.0.1"
        assert adapter._port == 25565

    def test_is_connected_default_false(self):
        adapter = TES3MPWorldAdapter()
        assert adapter.is_connected is False

    # Every operation raises NotImplementedError mentioning its own name.
    def test_connect_raises(self):
        adapter = TES3MPWorldAdapter()
        with pytest.raises(NotImplementedError, match="connect"):
            adapter.connect()

    def test_disconnect_raises(self):
        adapter = TES3MPWorldAdapter()
        with pytest.raises(NotImplementedError, match="disconnect"):
            adapter.disconnect()

    def test_observe_raises(self):
        adapter = TES3MPWorldAdapter()
        with pytest.raises(NotImplementedError, match="observe"):
            adapter.observe()

    def test_act_raises(self):
        adapter = TES3MPWorldAdapter()
        with pytest.raises(NotImplementedError, match="act"):
            adapter.act(CommandInput(action="move"))

    def test_speak_raises(self):
        adapter = TES3MPWorldAdapter()
        with pytest.raises(NotImplementedError, match="speak"):
            adapter.speak("Hello")

View File

@@ -0,0 +1,176 @@
"""Tests for Heartbeat v2 — WorldInterface-driven cognitive loop.
Acceptance criteria:
- With MockWorldAdapter: heartbeat runs, logs show observe→reason→act→reflect
- Without adapter: existing think_once() behaviour unchanged
- WebSocket broadcasts include current action and reasoning summary
"""
from unittest.mock import AsyncMock, patch
import pytest
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.types import ActionStatus
from loop.heartbeat import CycleRecord, Heartbeat
@pytest.fixture
def mock_adapter():
    """Connected MockWorldAdapter preloaded with a small Balmora scene."""
    adapter = MockWorldAdapter(
        location="Balmora",
        entities=["Guard", "Merchant"],
        events=["player_entered"],
    )
    adapter.connect()
    return adapter
class TestHeartbeatWithAdapter:
    """With MockWorldAdapter: heartbeat runs full embodied cycle."""

    @pytest.mark.asyncio
    async def test_run_once_returns_cycle_record(self, mock_adapter):
        hb = Heartbeat(world=mock_adapter)
        record = await hb.run_once()
        assert isinstance(record, CycleRecord)
        assert record.cycle_id == 1

    @pytest.mark.asyncio
    async def test_observation_populated(self, mock_adapter):
        # The observe phase copies the adapter's perception into the record.
        hb = Heartbeat(world=mock_adapter)
        record = await hb.run_once()
        assert record.observation["location"] == "Balmora"
        assert "Guard" in record.observation["entities"]
        assert "player_entered" in record.observation["events"]

    @pytest.mark.asyncio
    async def test_action_dispatched_to_world(self, mock_adapter):
        """Act phase should dispatch to world.act() for non-idle actions."""
        hb = Heartbeat(world=mock_adapter)
        record = await hb.run_once()
        # The default loop phases don't set an explicit action, so it
        # falls through to "idle" → NOOP. That's correct behaviour —
        # the real LLM-powered reason phase will set action metadata.
        assert record.action_status in (
            ActionStatus.NOOP.value,
            ActionStatus.SUCCESS.value,
        )

    @pytest.mark.asyncio
    async def test_reflect_notes_present(self, mock_adapter):
        # Reflect notes mention the observed location.
        hb = Heartbeat(world=mock_adapter)
        record = await hb.run_once()
        assert "Balmora" in record.reflect_notes

    @pytest.mark.asyncio
    async def test_cycle_count_increments(self, mock_adapter):
        hb = Heartbeat(world=mock_adapter)
        await hb.run_once()
        await hb.run_once()
        assert hb.cycle_count == 2
        assert len(hb.history) == 2

    @pytest.mark.asyncio
    async def test_duration_recorded(self, mock_adapter):
        hb = Heartbeat(world=mock_adapter)
        record = await hb.run_once()
        assert record.duration_ms >= 0

    @pytest.mark.asyncio
    async def test_on_cycle_callback(self, mock_adapter):
        # The async on_cycle callback receives each completed record.
        received = []

        async def callback(record):
            received.append(record)

        hb = Heartbeat(world=mock_adapter, on_cycle=callback)
        await hb.run_once()
        assert len(received) == 1
        assert received[0].cycle_id == 1
class TestHeartbeatWithoutAdapter:
    """Without adapter: existing think_once() behaviour unchanged."""

    @pytest.mark.asyncio
    async def test_passive_cycle(self):
        # No world → "think"/"noop" record with a passive-cycle note.
        hb = Heartbeat(world=None)
        record = await hb.run_once()
        assert record.action_taken == "think"
        assert record.action_status == "noop"
        assert "Passive" in record.reflect_notes

    @pytest.mark.asyncio
    async def test_passive_no_observation(self):
        hb = Heartbeat(world=None)
        record = await hb.run_once()
        assert record.observation == {}
class TestHeartbeatLifecycle:
    """Interval clamping, world attachment, and stop flag."""

    def test_interval_property(self):
        hb = Heartbeat(interval=60.0)
        assert hb.interval == 60.0
        hb.interval = 10.0
        assert hb.interval == 10.0

    def test_interval_minimum(self):
        # Intervals below the floor are clamped up to 1.0 seconds.
        hb = Heartbeat()
        hb.interval = 0.1
        assert hb.interval == 1.0

    def test_world_property(self):
        hb = Heartbeat()
        assert hb.world is None
        adapter = MockWorldAdapter()
        hb.world = adapter
        assert hb.world is adapter

    def test_stop_sets_flag(self):
        # stop() on a non-running heartbeat is a harmless no-op.
        hb = Heartbeat()
        assert not hb.is_running
        hb.stop()
        assert not hb.is_running
class TestHeartbeatBroadcast:
    """WebSocket broadcasts include action and reasoning summary."""

    @pytest.mark.asyncio
    async def test_broadcast_called(self, mock_adapter):
        with patch(
            "loop.heartbeat.ws_manager",
            create=True,
        ) as mock_ws:
            mock_ws.broadcast = AsyncMock()
            # Patch the import inside heartbeat — _broadcast imports
            # ws_manager lazily from infrastructure.ws_manager.handler.
            with patch("infrastructure.ws_manager.handler.ws_manager") as ws_mod:
                ws_mod.broadcast = AsyncMock()
                hb = Heartbeat(world=mock_adapter)
                await hb.run_once()
                ws_mod.broadcast.assert_called_once()
                call_args = ws_mod.broadcast.call_args
                # Event name first, payload dict second.
                assert call_args[0][0] == "heartbeat.cycle"
                data = call_args[0][1]
                assert "action" in data
                assert "reasoning_summary" in data
                assert "observation" in data
class TestHeartbeatLog:
    """Verify logging of observe→reason→act→reflect cycle."""

    @pytest.mark.asyncio
    async def test_embodied_cycle_logs(self, mock_adapter, caplog):
        import logging

        with caplog.at_level(logging.INFO):
            hb = Heartbeat(world=mock_adapter)
            await hb.run_once()
        messages = caplog.text
        # All three phase log lines plus the completion line must appear.
        assert "Phase 1 (Gather)" in messages
        assert "Phase 2 (Reason)" in messages
        assert "Phase 3 (Act)" in messages
        assert "Heartbeat cycle #1 complete" in messages

View File

@@ -0,0 +1,97 @@
"""Tests for load_queue corrupt JSON handling in loop_guard.py."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
import scripts.loop_guard as lg
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
    """Redirect loop_guard paths to tmp_path for isolation."""
    # Patch every file-path and API constant the module reads so no
    # test touches the real repo state or network configuration.
    monkeypatch.setattr(lg, "QUEUE_FILE", tmp_path / "queue.json")
    monkeypatch.setattr(lg, "IDLE_STATE_FILE", tmp_path / "idle_state.json")
    monkeypatch.setattr(lg, "CYCLE_RESULT_FILE", tmp_path / "cycle_result.json")
    monkeypatch.setattr(lg, "GITEA_API", "http://test:3000/api/v1")
    monkeypatch.setattr(lg, "REPO_SLUG", "owner/repo")
def test_load_queue_missing_file(tmp_path):
    """Missing queue file returns empty list."""
    # QUEUE_FILE points into tmp_path (autouse fixture) and was never written.
    result = lg.load_queue()
    assert result == []
def test_load_queue_valid_data(tmp_path):
    """Valid queue.json returns ready items."""
    # load_queue() filters to entries whose "ready" flag is truthy.
    data = [
        {"issue": 1, "title": "Ready issue", "ready": True},
        {"issue": 2, "title": "Not ready", "ready": False},
    ]
    lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
    result = lg.load_queue()
    assert len(result) == 1
    assert result[0]["issue"] == 1
def test_load_queue_corrupt_json_logs_warning(tmp_path, capsys):
    """Corrupt queue.json returns empty list and logs warning."""
    lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    lg.QUEUE_FILE.write_text("not valid json {{{")
    result = lg.load_queue()
    assert result == []
    # Warning is printed to stdout, not raised.
    captured = capsys.readouterr()
    assert "WARNING" in captured.out
    assert "Corrupt queue.json" in captured.out
def test_load_queue_not_a_list(tmp_path):
    """Queue.json that is not a list returns empty list."""
    lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Valid JSON, wrong top-level type (object instead of array).
    lg.QUEUE_FILE.write_text(json.dumps({"not": "a list"}))
    result = lg.load_queue()
    assert result == []
def test_load_queue_no_ready_items(tmp_path):
    """Queue with no ready items returns empty list."""
    # Well-formed queue, but every entry has ready=False.
    data = [
        {"issue": 1, "title": "Not ready 1", "ready": False},
        {"issue": 2, "title": "Not ready 2", "ready": False},
    ]
    lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
    result = lg.load_queue()
    assert result == []
def test_load_queue_oserror_logs_warning(tmp_path, monkeypatch, capsys):
    """OSError when reading queue.json returns empty list and logs warning."""
    lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    lg.QUEUE_FILE.write_text("[]")
    # Mock Path.read_text to raise OSError — but only for queue.json,
    # so other files read during the test are unaffected.
    original_read_text = Path.read_text

    def mock_read_text(self, *args, **kwargs):
        if self.name == "queue.json":
            raise OSError("Permission denied")
        return original_read_text(self, *args, **kwargs)

    monkeypatch.setattr(Path, "read_text", mock_read_text)
    result = lg.load_queue()
    assert result == []
    captured = capsys.readouterr()
    assert "WARNING" in captured.out
    assert "Cannot read queue.json" in captured.out

View File

@@ -0,0 +1,159 @@
"""Tests for queue.json validation and backup in triage_score.py."""
from __future__ import annotations
import json
import pytest
import scripts.triage_score as ts
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
    """Redirect triage_score paths to tmp_path for isolation."""
    # Patch every output file the module writes so tests never touch
    # the real queue, backups, or retro logs.
    monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json")
    monkeypatch.setattr(ts, "QUEUE_BACKUP_FILE", tmp_path / "queue.json.bak")
    monkeypatch.setattr(ts, "RETRO_FILE", tmp_path / "retro" / "triage.jsonl")
    monkeypatch.setattr(ts, "QUARANTINE_FILE", tmp_path / "quarantine.json")
    monkeypatch.setattr(ts, "CYCLE_RETRO_FILE", tmp_path / "retro" / "cycles.jsonl")
def test_backup_created_on_write(tmp_path):
    """The backup step copies a queue.json that parses as valid JSON to .bak."""
    ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    ts.QUEUE_FILE.write_text(json.dumps([{"issue": 1, "title": "Test", "ready": True}]))

    new_data = [{"issue": 2, "title": "New", "ready": True}]
    ts.QUEUE_FILE.write_text(json.dumps(new_data, indent=2) + "\n")

    # Replicate the backup step run_triage performs: only a queue file
    # that parses as valid JSON gets copied to the backup path.
    if ts.QUEUE_FILE.exists():
        try:
            json.loads(ts.QUEUE_FILE.read_text())
            ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            pass

    assert ts.QUEUE_BACKUP_FILE.exists()
    assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == new_data
def test_corrupt_queue_restored_from_backup(tmp_path, capsys):
    """A corrupt queue.json is replaced by the contents of a valid backup."""
    backup_payload = [{"issue": 1, "title": "Backup", "ready": True}]
    ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
    ts.QUEUE_BACKUP_FILE.write_text(json.dumps(backup_payload, indent=2) + "\n")

    ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    ts.QUEUE_FILE.write_text("not valid json {{{")

    # Mirror the validate-and-restore sequence from triage_score.
    try:
        json.loads(ts.QUEUE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        if ts.QUEUE_BACKUP_FILE.exists():
            try:
                candidate = ts.QUEUE_BACKUP_FILE.read_text()
                json.loads(candidate)  # only restore a backup that parses
                ts.QUEUE_FILE.write_text(candidate)
                print("[triage] Restored queue.json from backup")
            except (json.JSONDecodeError, OSError):
                ts.QUEUE_FILE.write_text("[]\n")
        else:
            ts.QUEUE_FILE.write_text("[]\n")

    assert json.loads(ts.QUEUE_FILE.read_text()) == backup_payload
    assert "Restored queue.json from backup" in capsys.readouterr().out
def test_corrupt_queue_no_backup_writes_empty_list(tmp_path):
    """With a corrupt queue.json and no backup, the queue is reset to []."""
    assert not ts.QUEUE_BACKUP_FILE.exists()  # precondition: nothing to restore

    ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    ts.QUEUE_FILE.write_text("not valid json {{{")

    # Mirror the validate-and-restore sequence from triage_score.
    try:
        json.loads(ts.QUEUE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        if ts.QUEUE_BACKUP_FILE.exists():
            try:
                candidate = ts.QUEUE_BACKUP_FILE.read_text()
                json.loads(candidate)
                ts.QUEUE_FILE.write_text(candidate)
            except (json.JSONDecodeError, OSError):
                ts.QUEUE_FILE.write_text("[]\n")
        else:
            ts.QUEUE_FILE.write_text("[]\n")

    assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_corrupt_backup_writes_empty_list(tmp_path):
    """When queue.json and its backup are both corrupt, reset the queue to []."""
    ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
    ts.QUEUE_BACKUP_FILE.write_text("also corrupt backup")

    ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    ts.QUEUE_FILE.write_text("not valid json {{{")

    # Mirror the validate-and-restore sequence from triage_score.
    try:
        json.loads(ts.QUEUE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        if not ts.QUEUE_BACKUP_FILE.exists():
            ts.QUEUE_FILE.write_text("[]\n")
        else:
            try:
                candidate = ts.QUEUE_BACKUP_FILE.read_text()
                json.loads(candidate)  # backup is corrupt too -> falls through
                ts.QUEUE_FILE.write_text(candidate)
            except (json.JSONDecodeError, OSError):
                ts.QUEUE_FILE.write_text("[]\n")

    assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_valid_queue_not_corrupt_no_backup_overwrite(tmp_path):
    """A corrupt queue.json must never clobber an existing valid backup."""
    valid_backup = [{"issue": 99, "title": "Old Backup", "ready": True}]
    ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
    ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_backup, indent=2) + "\n")

    ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    ts.QUEUE_FILE.write_text("corrupt data")

    # Backup step: the JSON parse fails, so no copy should happen.
    if ts.QUEUE_FILE.exists():
        try:
            json.loads(ts.QUEUE_FILE.read_text())
            ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            pass  # expected path: current queue is corrupt

    assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == valid_backup
def test_backup_path_configuration():
    """The backup lives next to queue.json and uses the .bak suffix."""
    assert ts.QUEUE_FILE.name == "queue.json"
    assert ts.QUEUE_BACKUP_FILE.name == "queue.json.bak"
    assert ts.QUEUE_BACKUP_FILE.parent == ts.QUEUE_FILE.parent

497
tests/unit/test_research.py Normal file
View File

@@ -0,0 +1,497 @@
"""Unit tests for timmy.research — ResearchOrchestrator pipeline."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.research import (
DEFAULT_QUERIES_PER_TOPIC,
MemoryInterface,
ResearchOrchestrator,
ResearchResult,
ResearchTools,
SearchSnippet,
_extract_action_items,
)
# ── Data structures ──────────────────────────────────────────────────────────
class TestResearchResult:
    """ResearchResult field defaults and round-tripping of explicit values."""

    def test_defaults(self):
        """A minimal ResearchResult fills every optional field with an empty value."""
        res = ResearchResult(topic="test", report="content")
        assert res.topic == "test"
        assert res.report == "content"
        assert res.cache_hit is False
        assert res.queries_generated == []
        assert res.sources == []
        assert res.action_items == []
        assert res.duration_ms == 0.0
        assert res.timestamp  # auto-populated, non-empty

    def test_with_data(self):
        """Explicitly supplied fields are stored unchanged."""
        res = ResearchResult(
            topic="AI",
            report="report text",
            queries_generated=["q1", "q2"],
            sources=[{"url": "http://example.com", "title": "Test"}],
            action_items=["Do X"],
            cache_hit=True,
            duration_ms=42.5,
        )
        assert res.cache_hit is True
        assert len(res.sources) == 1
        assert res.duration_ms == 42.5
class TestSearchSnippet:
    """SearchSnippet construction."""

    def test_fields(self):
        """relevance defaults to 0.0 when not supplied."""
        snippet = SearchSnippet(title="T", url="http://x.com", snippet="text")
        assert snippet.relevance == 0.0
# ── _extract_action_items ────────────────────────────────────────────────────
class TestExtractActionItems:
    """_extract_action_items() parsing of ACTION:/TODO:/checkbox markers."""

    def test_action_prefix(self):
        text = "Some text\nACTION: Do the thing\nMore text"
        assert _extract_action_items(text) == ["Do the thing"]

    def test_todo_prefix(self):
        # Both "TODO:" and "Todo:" are expected to match.
        text = "TODO: Fix the bug\nTodo: Also this"
        assert _extract_action_items(text) == ["Fix the bug", "Also this"]

    def test_checkbox(self):
        # Only unchecked boxes count as open action items.
        text = "- [ ] Implement feature\n- [x] Already done"
        assert _extract_action_items(text) == ["Implement feature"]

    def test_mixed(self):
        text = "ACTION: First\n- [ ] Second\nTODO: Third"
        assert _extract_action_items(text) == ["First", "Second", "Third"]

    def test_empty(self):
        # No recognized markers -> no items.
        assert _extract_action_items("No actions here") == []
        assert _extract_action_items("") == []
# ── MemoryInterface ──────────────────────────────────────────────────────────
class TestMemoryInterface:
    """MemoryInterface wiring of search/store callables."""

    def test_custom_fns(self):
        """Explicitly passed callables are kept by identity."""
        search_fn = MagicMock(return_value=[])
        store_fn = MagicMock()
        mi = MemoryInterface(search_fn=search_fn, store_fn=store_fn)
        assert mi.search_fn is search_fn
        assert mi.store_fn is store_fn

    def test_defaults_when_import_fails(self):
        """If timmy.memory_system cannot be imported, no-op fallbacks are used."""
        with patch.dict("sys.modules", {"timmy.memory_system": None}):
            mi = MemoryInterface()
            assert callable(mi.search_fn)
            assert callable(mi.store_fn)
            # The fallback search always reports no matches.
            assert mi.search_fn("test") == []
# ── ResearchOrchestrator ─────────────────────────────────────────────────────
def _make_cascade(**overrides):
"""Create a mock cascade router."""
cascade = AsyncMock()
cascade.complete = AsyncMock(
return_value={"content": overrides.get("content", "query1\nquery2\nquery3")}
)
return cascade
def _make_memory(search_results=None, score=0.0):
    """Build a MemoryInterface backed by MagicMock search/store callables.

    ``search_results`` is what search_fn returns (defaults to no matches).
    NOTE(review): ``score`` is accepted but never applied — confirm whether
    any caller still needs it before removing the parameter.
    """
    results = [] if search_results is None else search_results
    return MemoryInterface(
        search_fn=MagicMock(return_value=results),
        store_fn=MagicMock(),
    )
def _make_tools(search_results=None, fetch_content="Page content"):
    """Build ResearchTools with mock web_search/web_fetch callables.

    A falsy ``search_results`` (None or empty) falls back to two canned hits.
    """
    default_hits = [
        {"title": "Result 1", "url": "http://a.com", "snippet": "Snippet 1"},
        {"title": "Result 2", "url": "http://b.com", "snippet": "Snippet 2"},
    ]
    return ResearchTools(
        web_search=MagicMock(return_value=search_results or default_hits),
        web_fetch=MagicMock(return_value=fetch_content),
    )
class TestResearchOrchestratorInit:
    """Constructor wiring and initial metric counters."""

    def test_basic_init(self):
        cascade = _make_cascade()
        memory = _make_memory()
        tools = _make_tools()
        orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
        # Dependencies are stored by identity, not copied.
        assert orch.cascade is cascade
        assert orch.memory is memory
        assert orch.tools is tools
        # Both counters start at zero.
        assert orch._metrics["research_cache_hit"] == 0
        assert orch._metrics["research_api_call"] == 0
class TestCheckLocalKnowledge:
    """_check_local_knowledge() semantic-memory cache lookups."""

    @staticmethod
    def _orch_with_results(results):
        # Helper: orchestrator whose memory search returns *results*.
        return ResearchOrchestrator(
            cascade=_make_cascade(), memory=_make_memory(search_results=results)
        )

    @pytest.mark.asyncio
    async def test_cache_hit(self):
        """A high-confidence memory entry is returned as a cached result."""
        entry = MagicMock()
        entry.relevance_score = 0.90
        entry.content = "Cached report"
        orch = self._orch_with_results([entry])
        result = await orch._check_local_knowledge("test topic")
        assert result is not None
        assert result.cache_hit is True
        assert result.report == "Cached report"

    @pytest.mark.asyncio
    async def test_cache_miss_low_score(self):
        """An entry below the confidence threshold is ignored."""
        entry = MagicMock()
        entry.relevance_score = 0.5
        entry.content = "Weak match"
        orch = self._orch_with_results([entry])
        assert await orch._check_local_knowledge("test topic") is None

    @pytest.mark.asyncio
    async def test_cache_miss_empty(self):
        """No stored entries at all is a cache miss."""
        orch = self._orch_with_results([])
        assert await orch._check_local_knowledge("test topic") is None

    @pytest.mark.asyncio
    async def test_exception_returns_none(self):
        """A failing memory backend degrades to a cache miss instead of raising."""
        broken = MemoryInterface(
            search_fn=MagicMock(side_effect=RuntimeError("db error")),
            store_fn=MagicMock(),
        )
        orch = ResearchOrchestrator(cascade=_make_cascade(), memory=broken)
        assert await orch._check_local_knowledge("test topic") is None
class TestGenerateQueries:
    """_generate_queries() LLM output parsing and fallback behavior."""

    @pytest.mark.asyncio
    async def test_parses_queries(self):
        """Newline-separated LLM output becomes one query per line."""
        orch = ResearchOrchestrator(
            cascade=_make_cascade(content="query one\nquery two\nquery three"),
            memory=_make_memory(),
        )
        queries = await orch._generate_queries("AI safety", None, None)
        assert queries == ["query one", "query two", "query three"]

    @pytest.mark.asyncio
    async def test_strips_numbering(self):
        """Leading '1.' / '3)' style numbering is removed from each line."""
        orch = ResearchOrchestrator(
            cascade=_make_cascade(content="1. First query\n2. Second query\n3) Third"),
            memory=_make_memory(),
        )
        queries = await orch._generate_queries("topic", None, None)
        for expected in ("First query", "Second query", "Third"):
            assert expected in queries

    @pytest.mark.asyncio
    async def test_fallback_on_error(self):
        """If the LLM call fails, the topic itself becomes the only query."""
        cascade = AsyncMock()
        cascade.complete = AsyncMock(side_effect=RuntimeError("LLM down"))
        orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
        queries = await orch._generate_queries("fallback topic", None, None)
        assert queries == ["fallback topic"]

    @pytest.mark.asyncio
    async def test_passes_cascade_tier(self):
        """An explicit model tier is forwarded to cascade.complete()."""
        cascade = _make_cascade(content="q1\nq2")
        orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
        await orch._generate_queries("topic", None, "gpt-4")
        assert cascade.complete.call_args.kwargs.get("model") == "gpt-4"
class TestSearch:
    """_search() snippet collection, dedup, and error handling."""

    @pytest.mark.asyncio
    async def test_collects_snippets(self):
        """Results from multiple queries are merged and deduplicated by URL."""
        orch = ResearchOrchestrator(
            cascade=_make_cascade(), memory=_make_memory(), tools=_make_tools()
        )
        snippets = await orch._search(["q1", "q2"])
        # The mock returns the same two URLs for every query, so dedup
        # collapses 2 queries x 2 hits down to 2 unique snippets.
        assert len(snippets) == 2

    @pytest.mark.asyncio
    async def test_no_search_tool(self):
        """With no web_search configured, searching yields nothing."""
        orch = ResearchOrchestrator(
            cascade=_make_cascade(),
            memory=_make_memory(),
            tools=ResearchTools(web_search=None),
        )
        assert await orch._search(["q1"]) == []

    @pytest.mark.asyncio
    async def test_search_error_handled(self):
        """A raising web_search tool degrades to an empty result set."""
        failing = ResearchTools(
            web_search=MagicMock(side_effect=RuntimeError("network error"))
        )
        orch = ResearchOrchestrator(
            cascade=_make_cascade(), memory=_make_memory(), tools=failing
        )
        assert await orch._search(["q1"]) == []
class TestFetch:
    """_fetch() page retrieval."""

    @pytest.mark.asyncio
    async def test_fetches_pages(self):
        """Each snippet URL is fetched and its body attached to the page."""
        orch = ResearchOrchestrator(
            cascade=_make_cascade(),
            memory=_make_memory(),
            tools=_make_tools(fetch_content="Page body here"),
        )
        targets = [
            SearchSnippet(title="P1", url="http://a.com", snippet="s1"),
            SearchSnippet(title="P2", url="http://b.com", snippet="s2"),
        ]
        pages = await orch._fetch(targets)
        assert len(pages) == 2
        assert pages[0].content == "Page body here"

    @pytest.mark.asyncio
    async def test_no_fetch_tool(self):
        """With no web_fetch configured, fetching yields nothing."""
        orch = ResearchOrchestrator(
            cascade=_make_cascade(),
            memory=_make_memory(),
            tools=ResearchTools(web_fetch=None),
        )
        assert await orch._fetch([SearchSnippet("T", "http://x.com", "s")]) == []
class TestSynthesize:
    """_synthesize() report generation and LLM-failure fallback."""

    @pytest.mark.asyncio
    async def test_produces_report(self):
        from timmy.research import FetchedPage

        orch = ResearchOrchestrator(
            cascade=_make_cascade(content="# Report\nKey findings here"),
            memory=_make_memory(),
        )
        pages = [FetchedPage(url="http://x.com", title="X", content="content")]
        assert "Report" in await orch._synthesize("topic", None, pages, None)

    @pytest.mark.asyncio
    async def test_fallback_on_error(self):
        """On LLM failure the report names the topic and the failure."""
        from timmy.research import FetchedPage

        cascade = AsyncMock()
        cascade.complete = AsyncMock(side_effect=RuntimeError("LLM error"))
        orch = ResearchOrchestrator(cascade=cascade, memory=_make_memory())
        pages = [FetchedPage(url="http://x.com", title="X", content="content")]
        report = await orch._synthesize("topic", None, pages, None)
        assert "Synthesis failed" in report
        assert "topic" in report
class TestCrystallize:
    """_crystallize() persistence of finished research into semantic memory."""

    @pytest.mark.asyncio
    async def test_stores_in_memory(self):
        memory = _make_memory()
        orch = ResearchOrchestrator(cascade=_make_cascade(), memory=memory)
        await orch._crystallize("test", ResearchResult(topic="test", report="report text"))
        memory.store_fn.assert_called_once()
        stored = memory.store_fn.call_args
        # Stored entries are tagged so they can be found as research output.
        assert stored.kwargs.get("context_type") == "research"
        assert stored.kwargs.get("source") == "research_orchestrator"

    @pytest.mark.asyncio
    async def test_store_error_handled(self):
        """A failing store_fn must not propagate out of _crystallize."""
        flaky = MemoryInterface(
            search_fn=MagicMock(return_value=[]),
            store_fn=MagicMock(side_effect=RuntimeError("db error")),
        )
        orch = ResearchOrchestrator(cascade=_make_cascade(), memory=flaky)
        await orch._crystallize("test", ResearchResult(topic="test", report="report"))
class TestWriteArtifact:
    """_write_artifact() Gitea issue creation from action items."""

    @pytest.mark.asyncio
    async def test_no_action_items_skips(self):
        """With no action items there is nothing to write; must not raise."""
        orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
        await orch._write_artifact(
            ResearchResult(topic="test", report="r", action_items=[])
        )

    @pytest.mark.asyncio
    async def test_creates_issues(self):
        """Action items trigger exactly one _create_gitea_issues call."""
        orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
        result = ResearchResult(topic="test", report="r", action_items=["Fix the thing"])
        with patch("timmy.research._create_gitea_issues") as mock_create:
            await orch._write_artifact(result)
        mock_create.assert_called_once_with(result)
class TestFullPipeline:
    """End-to-end ResearchOrchestrator.run() behavior.

    NOTE: several tests depend on the exact call order of cascade.complete
    (query generation first, synthesis second) via side_effect lists — keep
    that ordering in mind when editing.
    """

    @pytest.mark.asyncio
    async def test_cache_hit_short_circuits(self):
        """When memory has a high-confidence match, skip web search."""
        entry = MagicMock()
        entry.relevance_score = 0.95  # presumably above the cache threshold (0.85 per commit msg) — confirm
        entry.content = "Previously researched content"
        memory = _make_memory(search_results=[entry])
        cascade = _make_cascade()
        tools = _make_tools()
        orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
        result = await orch.run("cached topic")
        assert result.cache_hit is True
        assert result.report == "Previously researched content"
        # Cascade should NOT have been called (no query generation or synthesis)
        cascade.complete.assert_not_called()
        assert orch._metrics["research_cache_hit"] == 1

    @pytest.mark.asyncio
    async def test_full_pipeline_no_tools(self):
        """Pipeline completes even without web tools (graceful degradation)."""
        memory = _make_memory()
        cascade = AsyncMock()
        # First call: generate queries, second: synthesize
        cascade.complete = AsyncMock(
            side_effect=[
                {"content": "query 1\nquery 2"},
                {"content": "# Report\nACTION: Do something"},
            ]
        )
        tools = ResearchTools()  # No web tools
        orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
        # Patch issue creation so no network/Gitea access is attempted.
        with patch("timmy.research._create_gitea_issues"):
            result = await orch.run("test topic")
        assert result.topic == "test topic"
        assert result.cache_hit is False
        assert "Report" in result.report
        # Action item parsed out of the synthesized report's ACTION: line.
        assert result.action_items == ["Do something"]
        assert result.duration_ms > 0
        assert orch._metrics["research_api_call"] == 1
        # Finished research is crystallized into memory exactly once.
        memory.store_fn.assert_called_once()

    @pytest.mark.asyncio
    async def test_full_pipeline_with_tools(self):
        """Full pipeline with search and fetch tools."""
        memory = _make_memory()
        cascade = AsyncMock()
        cascade.complete = AsyncMock(
            side_effect=[
                {"content": "search query 1\nsearch query 2"},
                {"content": "# Full Report\nTODO: Review findings"},
            ]
        )
        tools = _make_tools()
        orch = ResearchOrchestrator(cascade=cascade, memory=memory, tools=tools)
        with patch("timmy.research._create_gitea_issues"):
            result = await orch.run("test topic")
        assert result.topic == "test topic"
        assert result.cache_hit is False
        assert len(result.queries_generated) == 2
        # The mock search tool always returns hits, so sources are recorded.
        assert len(result.sources) > 0
        assert result.action_items == ["Review findings"]

    @pytest.mark.asyncio
    async def test_get_metrics(self):
        """get_metrics() exposes both pipeline counters."""
        orch = ResearchOrchestrator(cascade=_make_cascade(), memory=_make_memory())
        metrics = orch.get_metrics()
        assert "research_cache_hit" in metrics
        assert "research_api_call" in metrics
class TestCreateGiteaIssues:
def test_no_token_skips(self):
"""No Gitea token configured — silently skips."""
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="t", report="r", action_items=["item"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = ""
mock_settings.gitea_url = ""
with patch("timmy.research.settings", mock_settings):
# Should not raise
_create_gitea_issues(result)
def test_creates_issue_on_success(self):
from timmy.research import _create_gitea_issues
result = ResearchResult(
topic="AI", report="r", action_items=["Deploy model"]
)
mock_settings = MagicMock()
mock_settings.gitea_token = "tok"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
mock_resp = MagicMock()
mock_resp.status_code = 201
mock_requests_mod = MagicMock()
mock_requests_mod.post.return_value = mock_resp
with (
patch("timmy.research.settings", mock_settings),
patch.dict("sys.modules", {"requests": mock_requests_mod}),
):
_create_gitea_issues(result)
mock_requests_mod.post.assert_called_once()
call_kwargs = mock_requests_mod.post.call_args
assert "[research]" in call_kwargs.kwargs["json"]["title"]