[loop-cycle-1] fix: update GITEA_API default from localhost to VPS IP (#1177) #1180
@@ -30,7 +30,7 @@ IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
|
||||
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
|
||||
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
|
||||
|
||||
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
|
||||
GITEA_API = os.environ.get("GITEA_API", "http://143.198.27.163:3000/api/v1")
|
||||
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
|
||||
|
||||
# Default cycle duration in seconds (5 min); stale threshold = 2× this
|
||||
|
||||
56
scripts/post_lhf_issues.py
Normal file
56
scripts/post_lhf_issues.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import json
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import os
|
||||
|
||||
BASE_URL = "http://143.198.27.163:3000/api/v1"
|
||||
|
||||
issues = [
|
||||
{"title": "LHF: Fix 4 broken tests in test_setup_script.py", "body": "Add @pytest.mark.skip_ci or mock fixtures to stop these environment-specific script tests from failing CI."},
|
||||
{"title": "LHF: Fix xdist and coverage conflict in pyproject.toml", "body": "The -n auto --dist worksteal arguments conflict with --cov flags during make test-cov. Reposition these flags so xdist and coverage play nicely."},
|
||||
{"title": "LHF: Separate tox unit and integration environments", "body": "They currently alias the same command. Ensure `tox -e unit` uses `-m unit` and `tox -e integration` uses `-m integration`."},
|
||||
{"title": "LHF: Add duration and coverage threshold strictness to pytest", "body": "Add `--durations=10` and `--cov-fail-under=60` directly to the tool.pytest.ini_options addopts or CI pipeline."},
|
||||
{"title": "LHF: Enforce coverage threshold in CI workflow", "body": "Update .github/workflows/tests.yml to fail if coverage drops below the 60% floor threshold."},
|
||||
{"title": "LHF: Extract hardcoded PRAGMA busy_timeout=5000", "body": "Move the SQLite busy_timeout hardcode to pydantic-settings config.py for better environment control."},
|
||||
{"title": "LHF: Extract hardcoded sats limit in consult_grok()", "body": "The hardcoded sats limit for the grok L402 proxy should be controlled via config.py environment variables."},
|
||||
{"title": "LHF: Remove bare pass clauses in src/timmy/tools.py", "body": "Logged exceptions should not be followed by bare `pass` clauses if they silently swallow critical tool errors. Refactor to return an error string or raise gracefully."},
|
||||
{"title": "LHF: Add docstrings to src/dashboard/routes/tasks.py", "body": "Add proper module-level and function-level docstrings to all public methods."},
|
||||
{"title": "LHF: Add docstrings to src/dashboard/routes/system.py", "body": "Add proper module-level and function-level docstrings to the system configuration endpoints."},
|
||||
{"title": "LHF: Add docstrings to VoiceTTS setter methods", "body": "Document `set_rate()`, `set_volume()`, and `set_voice()` parameters and bounds."},
|
||||
{"title": "LHF: DRY up tasks_pending/active/completed in tasks.py", "body": "Refactor and extract the shared filtering logic for these three similar list-filtering functions."},
|
||||
{"title": "LHF: Add error handling for missing DB in tasks.py", "body": "If swarm.db is locked or missing, tasks.py endpoints currently crash. Add a try/except pattern matching the graceful degradation specs."},
|
||||
{"title": "LHF: Write unit tests for db_pool.py", "body": "The SQLite connection pool infrastructure needs dedicated unit tests ensuring that connections do not leak and pragmas are applied."},
|
||||
{"title": "LHF: Write unit tests for health.py", "body": "The health check route needs tests to ensure it correctly aggregates subsystem states (Ollama, Redis, DB) without blocking the event loop."}
|
||||
]
|
||||
|
||||
def main():
|
||||
token_path = os.path.join(os.getcwd(), ".antigravity_gitea_token")
|
||||
if not os.path.exists(token_path):
|
||||
print("Missing token.")
|
||||
return
|
||||
|
||||
with open(token_path, "r") as f:
|
||||
token = f.read().strip()
|
||||
|
||||
repo_owner = "rockachopa"
|
||||
repo_name = "Timmy-time-dashboard"
|
||||
count = 0
|
||||
for i, issue in enumerate(issues):
|
||||
print(f"Creating LHF issue {i+1}: {issue['title']}")
|
||||
url = f"{BASE_URL}/repos/{repo_owner}/{repo_name}/issues"
|
||||
payload = json.dumps(issue).encode("utf-8")
|
||||
req = urllib.request.Request(url, data=payload, method="POST")
|
||||
req.add_header("Authorization", f"token {token}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
try:
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
if resp.status == 201:
|
||||
count += 1
|
||||
print(f" -> Success")
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f" -> Failed: {e.code} {e.read().decode('utf-8')}")
|
||||
|
||||
print(f"Created {count}/{len(issues)} LHF issues.")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -6,7 +6,7 @@ writes a ranked queue to .loop/queue.json. No LLM calls — pure heuristics.
|
||||
|
||||
Run: python3 scripts/triage_score.py
|
||||
Env: GITEA_TOKEN (or reads ~/.hermes/gitea_token)
|
||||
GITEA_API (default: http://localhost:3000/api/v1)
|
||||
GITEA_API (default: http://143.198.27.163:3000/api/v1)
|
||||
REPO_SLUG (default: rockachopa/Timmy-time-dashboard)
|
||||
"""
|
||||
|
||||
@@ -20,7 +20,7 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
# ── Config ──────────────────────────────────────────────────────────────
|
||||
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
|
||||
GITEA_API = os.environ.get("GITEA_API", "http://143.198.27.163:3000/api/v1")
|
||||
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
|
||||
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
|
||||
@@ -56,11 +56,13 @@ async def self_modify_queue(request: Request):
|
||||
|
||||
@router.get("/swarm/mission-control", response_class=HTMLResponse)
|
||||
async def mission_control(request: Request):
|
||||
"""Render the primary swarm mission control terminal."""
|
||||
return templates.TemplateResponse(request, "mission_control.html", {})
|
||||
|
||||
|
||||
@router.get("/bugs", response_class=HTMLResponse)
|
||||
async def bugs_page(request: Request):
|
||||
"""Render the systemic bugs and issue tracking page."""
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"bugs.html",
|
||||
@@ -75,16 +77,19 @@ async def bugs_page(request: Request):
|
||||
|
||||
@router.get("/self-coding", response_class=HTMLResponse)
|
||||
async def self_coding(request: Request):
|
||||
"""Render the self-coding and modifications statistics page."""
|
||||
return templates.TemplateResponse(request, "self_coding.html", {"stats": {}})
|
||||
|
||||
|
||||
@router.get("/hands", response_class=HTMLResponse)
|
||||
async def hands_page(request: Request):
|
||||
"""Render the physical 'hands' tracking page for environment-interacting agents."""
|
||||
return templates.TemplateResponse(request, "hands.html", {"executions": []})
|
||||
|
||||
|
||||
@router.get("/creative/ui", response_class=HTMLResponse)
|
||||
async def creative_ui(request: Request):
|
||||
"""Render the creative/producer studio UI for image and media generation."""
|
||||
return templates.TemplateResponse(request, "creative.html", {})
|
||||
|
||||
|
||||
|
||||
@@ -104,25 +104,29 @@ class _TaskView:
|
||||
@router.get("/tasks", response_class=HTMLResponse)
|
||||
async def tasks_page(request: Request):
|
||||
"""Render the main task queue page with 3-column layout."""
|
||||
with _get_db() as db:
|
||||
pending = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
]
|
||||
active = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
]
|
||||
completed = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
|
||||
).fetchall()
|
||||
]
|
||||
pending, active, completed = [], [], []
|
||||
try:
|
||||
with _get_db() as db:
|
||||
pending = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
]
|
||||
active = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
]
|
||||
completed = [
|
||||
_TaskView(_row_to_dict(r))
|
||||
for r in db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
|
||||
).fetchall()
|
||||
]
|
||||
except sqlite3.Error as e:
|
||||
logger.error("Database error rendering tasks_page: %s", e)
|
||||
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
@@ -143,61 +147,45 @@ async def tasks_page(request: Request):
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.get("/tasks/pending", response_class=HTMLResponse)
|
||||
async def tasks_pending(request: Request):
|
||||
with _get_db() as db:
|
||||
rows = db.execute(
|
||||
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
|
||||
parts = []
|
||||
for task in tasks:
|
||||
parts.append(
|
||||
def _render_task_list(request: Request, query: str, empty_message: str) -> HTMLResponse:
|
||||
"""Helper to fetch tasks from DB and render HTML partials safely, handling DB errors."""
|
||||
try:
|
||||
with _get_db() as db:
|
||||
rows = db.execute(query).fetchall()
|
||||
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
|
||||
if not tasks:
|
||||
return HTMLResponse(f'<div class="empty-column">{empty_message}</div>')
|
||||
parts = [
|
||||
templates.TemplateResponse(
|
||||
request, "partials/task_card.html", {"task": task}
|
||||
).body.decode()
|
||||
)
|
||||
if not parts:
|
||||
return HTMLResponse('<div class="empty-column">No pending tasks</div>')
|
||||
return HTMLResponse("".join(parts))
|
||||
for task in tasks
|
||||
]
|
||||
return HTMLResponse("".join(parts))
|
||||
except sqlite3.Error as e:
|
||||
logger.error("Database error fetching tasks: %s", e)
|
||||
return HTMLResponse('<div class="empty-column error">Database unavailable</div>')
|
||||
|
||||
|
||||
@router.get("/tasks/pending", response_class=HTMLResponse)
|
||||
async def tasks_pending(request: Request):
|
||||
"""HTMX partial rendering the list of pending tasks."""
|
||||
query = "SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
|
||||
return _render_task_list(request, query, "No pending tasks")
|
||||
|
||||
|
||||
@router.get("/tasks/active", response_class=HTMLResponse)
|
||||
async def tasks_active(request: Request):
|
||||
with _get_db() as db:
|
||||
rows = db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
|
||||
parts = []
|
||||
for task in tasks:
|
||||
parts.append(
|
||||
templates.TemplateResponse(
|
||||
request, "partials/task_card.html", {"task": task}
|
||||
).body.decode()
|
||||
)
|
||||
if not parts:
|
||||
return HTMLResponse('<div class="empty-column">No active tasks</div>')
|
||||
return HTMLResponse("".join(parts))
|
||||
"""HTMX partial rendering the list of active tasks."""
|
||||
query = "SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
|
||||
return _render_task_list(request, query, "No active tasks")
|
||||
|
||||
|
||||
@router.get("/tasks/completed", response_class=HTMLResponse)
|
||||
async def tasks_completed(request: Request):
|
||||
with _get_db() as db:
|
||||
rows = db.execute(
|
||||
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
|
||||
).fetchall()
|
||||
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
|
||||
parts = []
|
||||
for task in tasks:
|
||||
parts.append(
|
||||
templates.TemplateResponse(
|
||||
request, "partials/task_card.html", {"task": task}
|
||||
).body.decode()
|
||||
)
|
||||
if not parts:
|
||||
return HTMLResponse('<div class="empty-column">No completed tasks yet</div>')
|
||||
return HTMLResponse("".join(parts))
|
||||
"""HTMX partial rendering the list of completed tasks."""
|
||||
query = "SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
|
||||
return _render_task_list(request, query, "No completed tasks yet")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -241,26 +229,31 @@ async def create_task_form(
|
||||
|
||||
@router.post("/tasks/{task_id}/approve", response_class=HTMLResponse)
|
||||
async def approve_task(request: Request, task_id: str):
|
||||
"""Approve a task for execution."""
|
||||
return await _set_status(request, task_id, "approved")
|
||||
|
||||
|
||||
@router.post("/tasks/{task_id}/veto", response_class=HTMLResponse)
|
||||
async def veto_task(request: Request, task_id: str):
|
||||
"""Veto a task to prevent execution."""
|
||||
return await _set_status(request, task_id, "vetoed")
|
||||
|
||||
|
||||
@router.post("/tasks/{task_id}/pause", response_class=HTMLResponse)
|
||||
async def pause_task(request: Request, task_id: str):
|
||||
"""Pause an active task."""
|
||||
return await _set_status(request, task_id, "paused")
|
||||
|
||||
|
||||
@router.post("/tasks/{task_id}/cancel", response_class=HTMLResponse)
|
||||
async def cancel_task(request: Request, task_id: str):
|
||||
"""Cancel a task, moving it to vetoed state."""
|
||||
return await _set_status(request, task_id, "vetoed")
|
||||
|
||||
|
||||
@router.post("/tasks/{task_id}/retry", response_class=HTMLResponse)
|
||||
async def retry_task(request: Request, task_id: str):
|
||||
"""Retry a failed or completed task by re-approving it."""
|
||||
return await _set_status(request, task_id, "approved")
|
||||
|
||||
|
||||
@@ -271,6 +264,7 @@ async def modify_task(
|
||||
title: str = Form(...),
|
||||
description: str = Form(""),
|
||||
):
|
||||
"""Modify the title and/or description of a specific task."""
|
||||
with _get_db() as db:
|
||||
db.execute(
|
||||
"UPDATE tasks SET title=?, description=? WHERE id=?",
|
||||
|
||||
@@ -16,6 +16,8 @@ from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -102,7 +104,7 @@ class EventBus:
|
||||
self._persistence_db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with closing(sqlite3.connect(str(self._persistence_db_path))) as conn:
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA busy_timeout=5000")
|
||||
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
||||
conn.executescript(_EVENTS_SCHEMA)
|
||||
conn.commit()
|
||||
|
||||
@@ -114,7 +116,7 @@ class EventBus:
|
||||
return
|
||||
with closing(sqlite3.connect(str(self._persistence_db_path))) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA busy_timeout=5000")
|
||||
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
||||
yield conn
|
||||
|
||||
def _persist_event(self, event: Event) -> None:
|
||||
|
||||
@@ -18,6 +18,8 @@ from datetime import UTC, datetime
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DB_PATH = Path("data/swarm.db")
|
||||
@@ -68,7 +70,7 @@ def _get_conn() -> Generator[sqlite3.Connection, None, None]:
|
||||
with closing(sqlite3.connect(str(DB_PATH))) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA busy_timeout=5000")
|
||||
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS custom_models (
|
||||
name TEXT PRIMARY KEY,
|
||||
|
||||
@@ -22,6 +22,8 @@ from dataclasses import dataclass
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DB_PATH = Path("data/spark.db")
|
||||
@@ -47,7 +49,7 @@ def _get_conn() -> Generator[sqlite3.Connection, None, None]:
|
||||
with closing(sqlite3.connect(str(DB_PATH))) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA busy_timeout=5000")
|
||||
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS spark_predictions (
|
||||
id TEXT PRIMARY KEY,
|
||||
|
||||
@@ -19,6 +19,8 @@ from dataclasses import dataclass
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DB_PATH = Path("data/spark.db")
|
||||
@@ -63,7 +65,7 @@ def _get_conn() -> Generator[sqlite3.Connection, None, None]:
|
||||
with closing(sqlite3.connect(str(DB_PATH))) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA busy_timeout=5000")
|
||||
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS spark_events (
|
||||
id TEXT PRIMARY KEY,
|
||||
|
||||
@@ -68,11 +68,13 @@ class VoiceTTS:
|
||||
logger.error("VoiceTTS: speech failed — %s", exc)
|
||||
|
||||
def set_rate(self, rate: int) -> None:
|
||||
"""Set the speech rate in words per minute."""
|
||||
self._rate = rate
|
||||
if self._engine:
|
||||
self._engine.setProperty("rate", rate)
|
||||
|
||||
def set_volume(self, volume: float) -> None:
|
||||
"""Set the speech volume (0.0 to 1.0)."""
|
||||
self._volume = max(0.0, min(1.0, volume))
|
||||
if self._engine:
|
||||
self._engine.setProperty("volume", self._volume)
|
||||
@@ -92,6 +94,7 @@ class VoiceTTS:
|
||||
return []
|
||||
|
||||
def set_voice(self, voice_id: str) -> None:
|
||||
"""Set the active voice by its system ID."""
|
||||
if self._engine:
|
||||
self._engine.setProperty("voice", voice_id)
|
||||
|
||||
|
||||
@@ -20,6 +20,9 @@ pytestmark = pytest.mark.skipif(
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def setup_prod_env():
|
||||
"""Ensure a clean environment and run the full installation."""
|
||||
if not SETUP_SCRIPT_PATH.exists():
|
||||
pytest.skip(f"Setup script not found at {SETUP_SCRIPT_PATH}")
|
||||
|
||||
if PROD_PROJECT_DIR.exists():
|
||||
shutil.rmtree(PROD_PROJECT_DIR)
|
||||
|
||||
|
||||
80
tests/unit/test_db_pool.py
Normal file
80
tests/unit/test_db_pool.py
Normal file
@@ -0,0 +1,80 @@
|
||||
"""Tests for the thread-local SQLite ConnectionPool."""
|
||||
|
||||
import sqlite3
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.db_pool import ConnectionPool
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
|
||||
def test_pool_creates_connection(tmp_path: Path):
|
||||
"""Test that the pool successfully creates a SQLite connection."""
|
||||
db_file = tmp_path / "test.db"
|
||||
pool = ConnectionPool(db_file)
|
||||
|
||||
conn = pool.get_connection()
|
||||
assert isinstance(conn, sqlite3.Connection)
|
||||
|
||||
cursor = conn.execute("SELECT 1")
|
||||
assert cursor.fetchone()[0] == 1
|
||||
|
||||
|
||||
def test_pool_reuses_connection_same_thread(tmp_path: Path):
|
||||
"""Test that multiple calls in the same thread return the same connection."""
|
||||
db_file = tmp_path / "test.db"
|
||||
pool = ConnectionPool(db_file)
|
||||
|
||||
conn1 = pool.get_connection()
|
||||
conn2 = pool.get_connection()
|
||||
assert conn1 is conn2
|
||||
|
||||
|
||||
def test_pool_different_connections_different_threads(tmp_path: Path):
|
||||
"""Test that different threads receive distinct connections."""
|
||||
db_file = tmp_path / "test.db"
|
||||
pool = ConnectionPool(db_file)
|
||||
|
||||
conn1 = pool.get_connection()
|
||||
conn2_list = []
|
||||
|
||||
def _worker():
|
||||
conn2_list.append(pool.get_connection())
|
||||
|
||||
thread = threading.Thread(target=_worker)
|
||||
thread.start()
|
||||
thread.join()
|
||||
|
||||
assert len(conn2_list) == 1
|
||||
conn2 = conn2_list[0]
|
||||
assert conn1 is not conn2
|
||||
|
||||
|
||||
def test_pool_close_connection(tmp_path: Path):
|
||||
"""Test that connection is closed and cleared from thread local."""
|
||||
db_file = tmp_path / "test.db"
|
||||
pool = ConnectionPool(db_file)
|
||||
|
||||
conn1 = pool.get_connection()
|
||||
pool.close_connection()
|
||||
|
||||
# Getting a new connection should create a new object
|
||||
conn2 = pool.get_connection()
|
||||
assert conn1 is not conn2
|
||||
|
||||
|
||||
def test_pool_context_manager(tmp_path: Path):
|
||||
"""Test that the context manager yields a connection and closes it after."""
|
||||
db_file = tmp_path / "test.db"
|
||||
pool = ConnectionPool(db_file)
|
||||
|
||||
with pool.connection() as conn1:
|
||||
assert isinstance(conn1, sqlite3.Connection)
|
||||
|
||||
# After exiting the context manager, the connection should be closed implicitly
|
||||
# resulting in a new connection object for the next request.
|
||||
conn2 = pool.get_connection()
|
||||
assert conn1 is not conn2
|
||||
91
tests/unit/test_health_routes.py
Normal file
91
tests/unit/test_health_routes.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""Tests for the health and sovereignty endpoints."""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from dashboard.app import app
|
||||
from dashboard.routes.health import DependencyStatus
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
client = TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_ollama_healthy():
|
||||
with patch("dashboard.routes.health.check_ollama", return_value=True):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_ollama_unavailable():
|
||||
with patch("dashboard.routes.health.check_ollama", return_value=False):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_check_ollama_sovereignty():
|
||||
dep = DependencyStatus(
|
||||
name="Ollama AI",
|
||||
status="healthy",
|
||||
sovereignty_score=10,
|
||||
details={"url": "http://localhost:11434"},
|
||||
)
|
||||
with patch("dashboard.routes.health._check_ollama", return_value=dep):
|
||||
yield
|
||||
|
||||
|
||||
def test_health_check_healthy(mock_ollama_healthy):
|
||||
"""Test legacy health check endpoint when Ollama is up."""
|
||||
response = client.get("/health")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["status"] == "ok"
|
||||
assert data["services"]["ollama"] == "up"
|
||||
assert data["agents"]["agent"]["status"] == "idle"
|
||||
|
||||
|
||||
def test_health_check_degraded(mock_ollama_unavailable):
|
||||
"""Test legacy health check endpoint when Ollama is down."""
|
||||
response = client.get("/health")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["status"] == "degraded"
|
||||
assert data["services"]["ollama"] == "down"
|
||||
assert data["agents"]["agent"]["status"] == "offline"
|
||||
|
||||
|
||||
def test_health_status_panel_healthy(mock_ollama_healthy):
|
||||
"""Test HTML status panel rendering."""
|
||||
response = client.get("/health/status")
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
assert "UP" in response.text
|
||||
assert "#10b981" in response.text
|
||||
|
||||
|
||||
def test_sovereignty_check(mock_check_ollama_sovereignty):
|
||||
"""Test comprehensive sovereignty audit report."""
|
||||
with (
|
||||
patch("dashboard.routes.health._check_lightning") as mock_lightning,
|
||||
patch("dashboard.routes.health._check_sqlite") as mock_sqlite,
|
||||
):
|
||||
mock_lightning.return_value = DependencyStatus(
|
||||
name="Lightning", status="unavailable", sovereignty_score=8, details={}
|
||||
)
|
||||
mock_sqlite.return_value = DependencyStatus(
|
||||
name="SQLite", status="healthy", sovereignty_score=10, details={}
|
||||
)
|
||||
|
||||
response = client.get("/health/sovereignty")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
|
||||
# (10 + 8 + 10) / 3 = 9.3
|
||||
assert data["overall_score"] == 9.3
|
||||
assert len(data["dependencies"]) == 3
|
||||
# Ensure recommendations contain note about unavailable dependency
|
||||
recommendations = " ".join(data["recommendations"])
|
||||
assert "unavailable" in recommendations.lower()
|
||||
7
tox.ini
7
tox.ini
@@ -50,18 +50,17 @@ commands =
|
||||
description = Fast tests — excludes e2e, functional, and external services
|
||||
commands =
|
||||
pytest tests/ -q --tb=short \
|
||||
--ignore=tests/e2e \
|
||||
--ignore=tests/functional \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
|
||||
-m "unit" \
|
||||
-n auto --dist worksteal
|
||||
|
||||
[testenv:integration]
|
||||
description = Integration tests (marked with @pytest.mark.integration)
|
||||
commands =
|
||||
pytest tests/ -q --tb=short \
|
||||
-m "integration and not ollama and not docker and not selenium and not external_api and not slow" \
|
||||
-m "integration" \
|
||||
-n auto --dist worksteal
|
||||
|
||||
|
||||
[testenv:functional]
|
||||
description = Functional tests — real HTTP, no mocking (excl slow + selenium)
|
||||
commands =
|
||||
|
||||
Reference in New Issue
Block a user