Compare commits

..

4 Commits

Author SHA1 Message Date
Alexander Whitestone
bee62403fd fix: add graceful degradation for DB errors in tasks.py
Some checks failed
Tests / lint (pull_request) Failing after 3s
Tests / test (pull_request) Has been skipped
Wrap all _get_db() calls with sqlite3.OperationalError handling so
endpoints degrade gracefully when the task DB is locked or missing:
- Read endpoints (page, partials, list, queue status) return empty data
- Write endpoints (create, update, delete) return 503

Fixes #943

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 19:03:02 -04:00
9656a5e0d0 [claude] Add connection leak and pragma unit tests for db_pool.py (#944) (#1001)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-22 22:56:58 +00:00
Alexander Whitestone
e35a23cefa [claude] Add research prompt template library (#974) (#999)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Alexander Whitestone <alexpaynex@gmail.com>
Co-committed-by: Alexander Whitestone <alexpaynex@gmail.com>
2026-03-22 22:44:02 +00:00
Alexander Whitestone
3ab180b8a7 [claude] Add Gitea backup script (#990) (#996)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Alexander Whitestone <alexpaynex@gmail.com>
Co-committed-by: Alexander Whitestone <alexpaynex@gmail.com>
2026-03-22 22:36:51 +00:00
12 changed files with 859 additions and 279 deletions

View File

@@ -1,21 +0,0 @@
; ── Gitea Hardening — Security Overrides ─────────────────────────────────────
;
; Merge these settings into your Gitea custom/conf/app.ini.
;
; On a default Gitea install (Docker or bare-metal):
; /path/to/gitea/custom/conf/app.ini
;
; After editing, restart Gitea:
; systemctl restart gitea # bare-metal
; docker restart <gitea-container> # Docker
;
; See also: scripts/harden_gitea.sh (automated version)
[service]
; Disable public registration — only admins can create accounts
DISABLE_REGISTRATION = true
ALLOW_ONLY_EXTERNAL_REGISTRATION = false
SHOW_REGISTRATION_BUTTON = false
; Require sign-in to view any content (repos, explore, etc.)
REQUIRE_SIGNIN_VIEW = true

83
scripts/gitea_backup.sh Executable file
View File

@@ -0,0 +1,83 @@
#!/bin/bash
# Gitea backup script — run on the VPS before any hardening changes.
# Usage: sudo bash scripts/gitea_backup.sh [off-site-dest]
#
# off-site-dest: optional rsync/scp destination for off-site copy
#                e.g. user@backup-host:/backups/gitea/
#
# Steps: [1/4] gitea dump  [2/4] size check  [3/4] content check  [4/4] off-site copy
# Refs: #971, #990
set -euo pipefail

BACKUP_DIR="/opt/gitea/backups"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
GITEA_CONF="/etc/gitea/app.ini"
GITEA_WORK_DIR="/var/lib/gitea"   # NOTE(review): currently unused by this script — confirm whether it can be removed
OFFSITE_DEST="${1:-}"

echo "=== Gitea Backup — $TIMESTAMP ==="

# Ensure backup directory exists; `gitea dump` writes its zip into the cwd.
mkdir -p "$BACKUP_DIR"
cd "$BACKUP_DIR"

# Run the dump
echo "[1/4] Running gitea dump..."
gitea dump -c "$GITEA_CONF"

# Find the newest zip (gitea dump names it gitea-dump-*.zip).
# FIX: the trailing `|| true` is required. Under `set -e -o pipefail`, an
# unmatched glob makes `ls` exit non-zero, the pipeline (and thus the
# assignment) fails, and the script would abort *here* — silently — so the
# friendly "No backup zip found" diagnostic below was unreachable.
BACKUP_FILE=$(ls -t "$BACKUP_DIR"/gitea-dump-*.zip 2>/dev/null | head -1 || true)
if [ -z "$BACKUP_FILE" ]; then
    echo "ERROR: No backup zip found in $BACKUP_DIR"
    exit 1
fi

# Size check: `stat -c%s` is GNU/Linux; `stat -f%z` is the BSD/macOS fallback.
BACKUP_SIZE=$(stat -c%s "$BACKUP_FILE" 2>/dev/null || stat -f%z "$BACKUP_FILE")
echo "[2/4] Backup created: $BACKUP_FILE ($BACKUP_SIZE bytes)"
if [ "$BACKUP_SIZE" -eq 0 ]; then
    echo "ERROR: Backup file is 0 bytes"
    exit 1
fi

# Lock down permissions — the dump contains the full DB and repo contents.
chmod 600 "$BACKUP_FILE"

# Verify expected components appear in the zip listing. `|| true` keeps a
# failed `unzip -l` from aborting the script; the checks below then WARN.
echo "[3/4] Verifying backup contents..."
CONTENTS=$(unzip -l "$BACKUP_FILE" 2>/dev/null || true)

# check_component <pattern> <label>
# Warn (never fail) when <pattern> is absent from the listing — some
# components are install-dependent, so absence is not always an error.
check_component() {
    if echo "$CONTENTS" | grep -q "$1"; then
        echo " OK: $2"
    else
        echo " WARN: $2 not found in backup"
    fi
}
check_component "gitea-db.sql" "Database dump"
check_component "gitea-repo" "Repositories"
check_component "custom" "Custom config"
check_component "app.ini" "app.ini"

# Optional off-site copy via rsync.
if [ -n "$OFFSITE_DEST" ]; then
    echo "[4/4] Copying to off-site: $OFFSITE_DEST"
    rsync -avz "$BACKUP_FILE" "$OFFSITE_DEST"
    echo " Off-site copy complete."
else
    echo "[4/4] No off-site destination provided. Skipping."
    echo " To copy later: scp $BACKUP_FILE user@backup-host:/backups/gitea/"
fi

echo ""
echo "=== Backup complete ==="
echo "File: $BACKUP_FILE"
echo "Size: $BACKUP_SIZE bytes"
echo ""
echo "To verify restore on a clean instance:"
echo " 1. Copy zip to test machine"
echo " 2. unzip $BACKUP_FILE"
echo " 3. gitea restore --from <extracted-dir> -c /etc/gitea/app.ini"
echo " 4. Verify repos and DB are intact"
View File

@@ -1,169 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# ── Gitea Hardening Script ──────────────────────────────────────────────────
#
# Disables public registration and requires sign-in to view content.
# Refs: Issue #988
#
# Usage (on the Gitea server):
# sudo bash scripts/harden_gitea.sh
# sudo bash scripts/harden_gitea.sh --config /path/to/custom/conf/app.ini
# sudo bash scripts/harden_gitea.sh --docker gitea # restart via docker
#
# What it does:
# 1. Patches [service] section in app.ini
# 2. Restarts Gitea so changes take effect
# 3. Verifies the changes are active
# ANSI escape codes used by the log helpers below.
BOLD='\033[1m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m'
# Log helpers: green [+] info, yellow [!] warning, red [x] error.
info() { echo -e "${GREEN}[+]${NC} $1"; }
warn() { echo -e "${YELLOW}[!]${NC} $1"; }
error() { echo -e "${RED}[x]${NC} $1"; }
# ── Defaults ────────────────────────────────────────────────────────────────
# Common Gitea config paths (checked in order)
SEARCH_PATHS=(
"/etc/gitea/app.ini"
"/opt/gitea/custom/conf/app.ini"
"/data/gitea/conf/app.ini"
"/app/gitea/conf/app.ini"
)
CONFIG_PATH=""
DOCKER_CONTAINER=""
SYSTEMD_SERVICE="gitea"
# ── Parse arguments ─────────────────────────────────────────────────────────
# --config   explicit app.ini path (skips the search below)
# --docker   container name; all file ops and the restart go through docker exec
# --service  systemd unit name (default: gitea) for the bare-metal restart
while [[ $# -gt 0 ]]; do
case $1 in
--config) CONFIG_PATH="$2"; shift 2 ;;
--docker) DOCKER_CONTAINER="$2"; shift 2 ;;
--service) SYSTEMD_SERVICE="$2"; shift 2 ;;
-h|--help)
echo "Usage: $0 [--config /path/to/app.ini] [--docker container] [--service name]"
exit 0
;;
*) error "Unknown option: $1"; exit 1 ;;
esac
done
# ── Find config ─────────────────────────────────────────────────────────────
# First look for app.ini on the host filesystem.
if [ -z "$CONFIG_PATH" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if [ -f "$path" ]; then
CONFIG_PATH="$path"
break
fi
done
fi
# If using Docker, try to find config inside the container
if [ -z "$CONFIG_PATH" ] && [ -n "$DOCKER_CONTAINER" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if docker exec "$DOCKER_CONTAINER" test -f "$path" 2>/dev/null; then
CONFIG_PATH="$path"
info "Found config inside container at $path"
break
fi
done
fi
if [ -z "$CONFIG_PATH" ]; then
error "Could not find Gitea app.ini. Use --config to specify the path."
exit 1
fi
info "Using config: $CONFIG_PATH"
# ── Backup ──────────────────────────────────────────────────────────────────
# Timestamped copy next to the original. In docker mode the backup is
# created *inside* the container, alongside the container's app.ini.
BACKUP="${CONFIG_PATH}.bak.$(date +%Y%m%d%H%M%S)"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" cp "$CONFIG_PATH" "$BACKUP"
else
cp "$CONFIG_PATH" "$BACKUP"
fi
info "Backup saved to $BACKUP"
# ── Apply settings ──────────────────────────────────────────────────────────
# apply_setting <key> <value> <file>
# Idempotently sets `key = value` in the ini file. The `^;?\s*key\s*=`
# pattern also matches a commented-out key, so a `; KEY = x` line gets
# rewritten (uncommented) in place rather than duplicated.
# NOTE(review): `sed -i` with no suffix argument is GNU-sed syntax —
# confirm the target hosts/containers ship GNU sed (BSD sed would need
# `sed -i ''`).
apply_setting() {
local key="$1"
local value="$2"
local file="$3"
if [ -n "$DOCKER_CONTAINER" ]; then
# Check if key exists (commented or not) and update, otherwise append to [service]
if docker exec "$DOCKER_CONTAINER" grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
docker exec "$DOCKER_CONTAINER" sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Append after [service] section header
docker exec "$DOCKER_CONTAINER" sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
else
if grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Ensure [service] section exists, then append
if ! grep -q '^\[service\]' "$file"; then
printf '\n[service]\n' >> "$file"
fi
sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
fi
}
info "Applying hardening settings..."
apply_setting "DISABLE_REGISTRATION" "true" "$CONFIG_PATH"
apply_setting "ALLOW_ONLY_EXTERNAL_REGISTRATION" "false" "$CONFIG_PATH"
apply_setting "SHOW_REGISTRATION_BUTTON" "false" "$CONFIG_PATH"
apply_setting "REQUIRE_SIGNIN_VIEW" "true" "$CONFIG_PATH"
info "Settings applied:"
info " DISABLE_REGISTRATION = true"
info " ALLOW_ONLY_EXTERNAL_REGISTRATION = false"
info " SHOW_REGISTRATION_BUTTON = false"
info " REQUIRE_SIGNIN_VIEW = true"
# ── Restart Gitea ───────────────────────────────────────────────────────────
# Prefer docker restart when a container was given; otherwise restart the
# systemd unit if it is active; otherwise leave the restart to the operator.
echo ""
if [ -n "$DOCKER_CONTAINER" ]; then
info "Restarting Gitea container: $DOCKER_CONTAINER"
docker restart "$DOCKER_CONTAINER"
elif systemctl is-active --quiet "$SYSTEMD_SERVICE" 2>/dev/null; then
info "Restarting Gitea via systemd: $SYSTEMD_SERVICE"
systemctl restart "$SYSTEMD_SERVICE"
else
warn "Could not detect Gitea service. Restart Gitea manually to apply changes."
fi
# ── Verify ──────────────────────────────────────────────────────────────────
# Print the current [service] section so the operator can eyeball the result.
echo ""
info "Verification — current [service] settings:"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
else
grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
fi
echo ""
echo -e "${GREEN}${BOLD} Gitea hardening complete.${NC}"
echo ""
echo " Registration: DISABLED"
echo " Sign-in required to view: YES"
echo ""
echo " Backup: $BACKUP"
echo ""

View File

@@ -0,0 +1,67 @@
---
name: Architecture Spike
type: research
typical_query_count: 2-4
expected_output_length: 600-1200 words
cascade_tier: groq_preferred
description: >
Investigate how to connect two systems or components. Produces an integration
architecture with sequence diagram, key decisions, and a proof-of-concept outline.
---
# Architecture Spike: Connect {system_a} to {system_b}
## Context
We need to integrate **{system_a}** with **{system_b}** in the context of
**{project_context}**. This spike answers: what is the best way to wire them
together, and what are the trade-offs?
## Constraints
- Prefer approaches that avoid adding new infrastructure dependencies.
- The integration should be **{sync_or_async}** (synchronous / asynchronous).
- Must work within: {environment_constraints}.
## Research Steps
1. Identify the APIs / protocols exposed by both systems.
2. List all known integration patterns (direct API, message queue, webhook, SDK, etc.).
3. Evaluate each pattern for complexity, reliability, and latency.
4. Select the recommended approach and outline a proof-of-concept.
## Output Format
### Integration Options
| Pattern | Complexity | Reliability | Latency | Notes |
|---------|-----------|-------------|---------|-------|
| ... | ... | ... | ... | ... |
### Recommended Approach
**Pattern:** {pattern_name}
**Why:** One paragraph explaining the choice.
### Sequence Diagram
```
{system_a} -> {middleware} -> {system_b}
```
Describe the data flow step by step:
1. {system_a} does X...
2. {middleware} transforms / routes...
3. {system_b} receives Y...
### Proof-of-Concept Outline
- Files to create or modify
- Key libraries / dependencies needed
- Estimated effort: {effort_estimate}
### Open Questions
Bullet list of decisions that need human input before proceeding.

View File

@@ -0,0 +1,74 @@
---
name: Competitive Scan
type: research
typical_query_count: 3-5
expected_output_length: 800-1500 words
cascade_tier: groq_preferred
description: >
Compare a project against its alternatives. Produces a feature matrix,
strengths/weaknesses analysis, and positioning summary.
---
# Competitive Scan: {project} vs Alternatives
## Context
Compare **{project}** against **{alternatives}** (comma-separated list of
competitors). The goal is to understand where {project} stands and identify
differentiation opportunities.
## Constraints
- Comparison date: {date}.
- Focus areas: {focus_areas} (e.g., features, pricing, community, performance).
- Perspective: {perspective} (user, developer, business).
## Research Steps
1. Gather key facts about {project} (features, pricing, community size, release cadence).
2. Gather the same data for each alternative in {alternatives}.
3. Build a feature comparison matrix.
4. Identify strengths and weaknesses for each entry.
5. Summarize positioning and recommend next steps.
## Output Format
### Overview
One paragraph: what space does {project} compete in, and who are the main players?
### Feature Matrix
| Feature / Attribute | {project} | {alt_1} | {alt_2} | {alt_3} |
|--------------------|-----------|---------|---------|---------|
| {feature_1} | ... | ... | ... | ... |
| {feature_2} | ... | ... | ... | ... |
| Pricing | ... | ... | ... | ... |
| License | ... | ... | ... | ... |
| Community Size | ... | ... | ... | ... |
| Last Major Release | ... | ... | ... | ... |
### Strengths & Weaknesses
#### {project}
- **Strengths:** ...
- **Weaknesses:** ...
#### {alt_1}
- **Strengths:** ...
- **Weaknesses:** ...
_(Repeat for each alternative)_
### Positioning Map
Describe where each project sits along the key dimensions (e.g., simplicity
vs power, free vs paid, niche vs general).
### Recommendations
Bullet list of actions based on the competitive landscape:
- **Differentiate on:** {differentiator}
- **Watch out for:** {threat}
- **Consider adopting from {alt}:** {feature_or_approach}

View File

@@ -0,0 +1,68 @@
---
name: Game Analysis
type: research
typical_query_count: 2-3
expected_output_length: 600-1000 words
cascade_tier: local_ok
description: >
Evaluate a game for AI agent playability. Assesses API availability,
observation/action spaces, and existing bot ecosystems.
---
# Game Analysis: {game}
## Context
Evaluate **{game}** to determine whether an AI agent can play it effectively.
Focus on programmatic access, observation space, action space, and existing
bot/AI ecosystems.
## Constraints
- Platform: {platform} (PC, console, mobile, browser).
- Agent type: {agent_type} (reinforcement learning, rule-based, LLM-driven, hybrid).
- Budget for API/licenses: {budget}.
## Research Steps
1. Identify official APIs, modding support, or programmatic access methods for {game}.
2. Characterize the observation space (screen pixels, game state JSON, memory reading, etc.).
3. Characterize the action space (keyboard/mouse, API calls, controller inputs).
4. Survey existing bots, AI projects, or research papers for {game}.
5. Assess feasibility and difficulty for the target agent type.
## Output Format
### Game Profile
| Property | Value |
|-------------------|------------------------|
| Game | {game} |
| Genre | {genre} |
| Platform | {platform} |
| API Available | Yes / No / Partial |
| Mod Support | Yes / No / Limited |
| Existing AI Work | Extensive / Some / None|
### Observation Space
Describe what data the agent can access and how (API, screen capture, memory hooks, etc.).
### Action Space
Describe how the agent can interact with the game (input methods, timing constraints, etc.).
### Existing Ecosystem
List known bots, frameworks, research papers, or communities working on AI for {game}.
### Feasibility Assessment
- **Difficulty:** Easy / Medium / Hard / Impractical
- **Best approach:** {recommended_agent_type}
- **Key challenges:** Bullet list
- **Estimated time to MVP:** {time_estimate}
### Recommendation
One paragraph: should we proceed, and if so, what is the first step?

View File

@@ -0,0 +1,79 @@
---
name: Integration Guide
type: research
typical_query_count: 3-5
expected_output_length: 1000-2000 words
cascade_tier: groq_preferred
description: >
Step-by-step guide to wire a specific tool into an existing stack,
complete with code samples, configuration, and testing steps.
---
# Integration Guide: Wire {tool} into {stack}
## Context
Integrate **{tool}** into our **{stack}** stack. The goal is to
**{integration_goal}** (e.g., "add vector search to the dashboard",
"send notifications via Telegram").
## Constraints
- Must follow existing project conventions (see CLAUDE.md).
- No new cloud AI dependencies unless explicitly approved.
- Environment config via `pydantic-settings` / `config.py`.
## Research Steps
1. Review {tool}'s official documentation for installation and setup.
2. Identify the minimal dependency set required.
3. Map {tool}'s API to our existing patterns (singletons, graceful degradation).
4. Write integration code with proper error handling.
5. Define configuration variables and their defaults.
## Output Format
### Prerequisites
- Dependencies to install (with versions)
- External services or accounts required
- Environment variables to configure
### Configuration
```python
# In config.py — add these fields to Settings:
{config_fields}
```
### Implementation
```python
# {file_path}
{implementation_code}
```
### Graceful Degradation
Describe how the integration behaves when {tool} is unavailable:
| Scenario | Behavior | Log Level |
|-----------------------|--------------------|-----------|
| {tool} not installed | {fallback} | WARNING |
| {tool} unreachable | {fallback} | WARNING |
| Invalid credentials | {fallback} | ERROR |
### Testing
```python
# tests/unit/test_{tool_snake}.py
{test_code}
```
### Verification Checklist
- [ ] Dependency added to pyproject.toml
- [ ] Config fields added with sensible defaults
- [ ] Graceful degradation tested (service down)
- [ ] Unit tests pass (`tox -e unit`)
- [ ] No new linting errors (`tox -e lint`)

View File

@@ -0,0 +1,67 @@
---
name: State of the Art
type: research
typical_query_count: 4-6
expected_output_length: 1000-2000 words
cascade_tier: groq_preferred
description: >
Comprehensive survey of what currently exists in a given field or domain.
Produces a structured landscape overview with key players, trends, and gaps.
---
# State of the Art: {field} (as of {date})
## Context
Survey the current landscape of **{field}**. Identify key players, recent
developments, dominant approaches, and notable gaps. This is a point-in-time
snapshot intended to inform decision-making.
## Constraints
- Focus on developments from the last {timeframe} (e.g., 12 months, 2 years).
- Prioritize {priority} (open-source, commercial, academic, or all).
- Target audience: {audience} (technical team, leadership, general).
## Research Steps
1. Identify the major categories or sub-domains within {field}.
2. For each category, list the leading projects, companies, or research groups.
3. Note recent milestones, releases, or breakthroughs.
4. Identify emerging trends and directions.
5. Highlight gaps — things that don't exist yet but should.
## Output Format
### Executive Summary
Two to three sentences: what is the state of {field} right now?
### Landscape Map
| Category | Key Players | Maturity | Trend |
|---------------|--------------------------|-------------|-------------|
| {category_1} | {player_a}, {player_b} | Early / GA | Growing / Stable / Declining |
| {category_2} | {player_c}, {player_d} | Early / GA | Growing / Stable / Declining |
### Recent Milestones
Chronological list of notable events in the last {timeframe}:
- **{date_1}:** {event_description}
- **{date_2}:** {event_description}
### Trends
Numbered list of the top 3-5 trends shaping {field}:
1. **{trend_name}** — {one-line description}
2. **{trend_name}** — {one-line description}
### Gaps & Opportunities
Bullet list of things that are missing, underdeveloped, or ripe for innovation.
### Implications for Us
One paragraph: what does this mean for our project? What should we do next?

View File

@@ -0,0 +1,52 @@
---
name: Tool Evaluation
type: research
typical_query_count: 3-5
expected_output_length: 800-1500 words
cascade_tier: groq_preferred
description: >
Discover and evaluate all shipping tools/libraries/services in a given domain.
Produces a ranked comparison table with pros, cons, and recommendation.
---
# Tool Evaluation: {domain}
## Context
You are researching tools, libraries, and services for **{domain}**.
The goal is to find everything that is currently shipping (not vaporware)
and produce a structured comparison.
## Constraints
- Only include tools that have public releases or hosted services available today.
- If a tool is in beta/preview, note that clearly.
- Focus on {focus_criteria} when evaluating (e.g., cost, ease of integration, community size).
## Research Steps
1. Identify all actively-maintained tools in the **{domain}** space.
2. For each tool, gather: name, URL, license/pricing, last release date, language/platform.
3. Evaluate each tool against the focus criteria.
4. Rank by overall fit for the use case: **{use_case}**.
## Output Format
### Summary
One paragraph: what the landscape looks like and the top recommendation.
### Comparison Table
| Tool | License / Price | Last Release | Language | {focus_criteria} Score | Notes |
|------|----------------|--------------|----------|----------------------|-------|
| ... | ... | ... | ... | ... | ... |
### Top Pick
- **Recommended:** {tool_name} — {one-line reason}
- **Runner-up:** {tool_name} — {one-line reason}
### Risks & Gaps
Bullet list of things to watch out for (missing features, vendor lock-in, etc.).

View File

@@ -104,25 +104,29 @@ class _TaskView:
@router.get("/tasks", response_class=HTMLResponse)
async def tasks_page(request: Request):
"""Render the main task queue page with 3-column layout."""
with _get_db() as db:
pending = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
).fetchall()
]
active = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
]
completed = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
]
try:
with _get_db() as db:
pending = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
).fetchall()
]
active = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
]
completed = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
]
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
pending, active, completed = [], [], []
return templates.TemplateResponse(
request,
@@ -146,10 +150,14 @@ async def tasks_page(request: Request):
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
"""Return HTMX partial for pending approval tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
).fetchall()
try:
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
).fetchall()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return HTMLResponse('<div class="empty-column">Database unavailable</div>')
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -166,10 +174,14 @@ async def tasks_pending(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
"""Return HTMX partial for active (approved/running/paused) tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
try:
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return HTMLResponse('<div class="empty-column">Database unavailable</div>')
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -186,10 +198,14 @@ async def tasks_active(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
"""Return HTMX partial for completed/vetoed/failed tasks (last 50)."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
try:
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return HTMLResponse('<div class="empty-column">Database unavailable</div>')
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -225,13 +241,17 @@ async def create_task_form(
now = datetime.now(UTC).isoformat()
priority = priority if priority in VALID_PRIORITIES else "normal"
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(task_id, title, description, priority, assigned_to, now),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(task_id, title, description, priority, assigned_to, now),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
task = _TaskView(_row_to_dict(row))
return templates.TemplateResponse(request, "partials/task_card.html", {"task": task})
@@ -280,13 +300,17 @@ async def modify_task(
description: str = Form(""),
):
"""Update task title and description."""
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",
(title, description, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",
(title, description, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
@@ -298,13 +322,17 @@ async def _set_status(request: Request, task_id: str, new_status: str):
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
@@ -330,22 +358,26 @@ async def api_create_task(request: Request):
if priority not in VALID_PRIORITIES:
priority = "normal"
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(
task_id,
title,
body.get("description", ""),
priority,
body.get("assigned_to", ""),
body.get("created_by", "operator"),
now,
),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(
task_id,
title,
body.get("description", ""),
priority,
body.get("assigned_to", ""),
body.get("created_by", "operator"),
now,
),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
return JSONResponse(_row_to_dict(row), status_code=201)
@@ -353,8 +385,12 @@ async def api_create_task(request: Request):
@router.get("/api/tasks", response_class=JSONResponse)
async def api_list_tasks():
"""List all tasks as JSON."""
with _get_db() as db:
rows = db.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
try:
with _get_db() as db:
rows = db.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return JSONResponse([], status_code=200)
return JSONResponse([_row_to_dict(r) for r in rows])
@@ -369,13 +405,17 @@ async def api_update_status(task_id: str, request: Request):
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
if not row:
raise HTTPException(404, "Task not found")
return JSONResponse(_row_to_dict(row))
@@ -384,9 +424,13 @@ async def api_update_status(task_id: str, request: Request):
@router.delete("/api/tasks/{task_id}", response_class=JSONResponse)
async def api_delete_task(task_id: str):
"""Delete a task."""
with _get_db() as db:
cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
db.commit()
try:
with _get_db() as db:
cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
db.commit()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
if cursor.rowcount == 0:
raise HTTPException(404, "Task not found")
return JSONResponse({"success": True, "id": task_id})
@@ -400,15 +444,19 @@ async def api_delete_task(task_id: str):
@router.get("/api/queue/status", response_class=JSONResponse)
async def queue_status(assigned_to: str = "default"):
"""Return queue status for the chat panel's agent status indicator."""
with _get_db() as db:
running = db.execute(
"SELECT * FROM tasks WHERE status='running' AND assigned_to=? LIMIT 1",
(assigned_to,),
).fetchone()
ahead = db.execute(
"SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved') AND assigned_to=?",
(assigned_to,),
).fetchone()
try:
with _get_db() as db:
running = db.execute(
"SELECT * FROM tasks WHERE status='running' AND assigned_to=? LIMIT 1",
(assigned_to,),
).fetchone()
ahead = db.execute(
"SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved') AND assigned_to=?",
(assigned_to,),
).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return JSONResponse({"is_working": False, "current_task": None, "tasks_ahead": 0})
if running:
return JSONResponse(

View File

@@ -3,6 +3,99 @@
Verifies task CRUD operations and the dashboard page rendering.
"""
import sqlite3
from unittest.mock import patch
# ---------------------------------------------------------------------------
# DB error handling tests
# ---------------------------------------------------------------------------

# Shared error instance injected via mock.patch(side_effect=...) to simulate
# a locked/unavailable task database in the tests below.
_DB_ERROR = sqlite3.OperationalError("database is locked")
def test_tasks_page_degrades_on_db_error(client):
    """GET /tasks renders empty columns when DB is unavailable."""
    target = "dashboard.routes.tasks._get_db"
    with patch(target, side_effect=_DB_ERROR):
        resp = client.get("/tasks")
    # Page still renders (empty state) rather than erroring out.
    assert resp.status_code == 200
    assert "TASK QUEUE" in resp.text
def test_pending_partial_degrades_on_db_error(client):
    """GET /tasks/pending returns fallback HTML when DB is unavailable."""
    target = "dashboard.routes.tasks._get_db"
    with patch(target, side_effect=_DB_ERROR):
        resp = client.get("/tasks/pending")
    assert resp.status_code == 200
    assert "Database unavailable" in resp.text
def test_active_partial_degrades_on_db_error(client):
    """GET /tasks/active returns fallback HTML when DB is unavailable."""
    target = "dashboard.routes.tasks._get_db"
    with patch(target, side_effect=_DB_ERROR):
        resp = client.get("/tasks/active")
    assert resp.status_code == 200
    assert "Database unavailable" in resp.text
def test_completed_partial_degrades_on_db_error(client):
    """GET /tasks/completed returns fallback HTML when DB is unavailable."""
    target = "dashboard.routes.tasks._get_db"
    with patch(target, side_effect=_DB_ERROR):
        resp = client.get("/tasks/completed")
    assert resp.status_code == 200
    assert "Database unavailable" in resp.text
def test_api_create_task_503_on_db_error(client):
    """POST /api/tasks returns 503 when DB is unavailable."""
    target = "dashboard.routes.tasks._get_db"
    with patch(target, side_effect=_DB_ERROR):
        resp = client.post("/api/tasks", json={"title": "Test"})
    # Write endpoints must signal service unavailability, not crash.
    assert resp.status_code == 503
def test_api_list_tasks_empty_on_db_error(client):
    """GET /api/tasks returns empty list when DB is unavailable."""
    target = "dashboard.routes.tasks._get_db"
    with patch(target, side_effect=_DB_ERROR):
        resp = client.get("/api/tasks")
    # Read endpoint degrades to an empty payload with a normal 200.
    assert resp.status_code == 200
    assert resp.json() == []
def test_queue_status_degrades_on_db_error(client):
    """GET /api/queue/status returns idle status when DB is unavailable."""
    target = "dashboard.routes.tasks._get_db"
    with patch(target, side_effect=_DB_ERROR):
        resp = client.get("/api/queue/status")
    assert resp.status_code == 200
    payload = resp.json()
    # Degraded mode reports an idle agent rather than failing.
    assert payload["is_working"] is False
    assert payload["current_task"] is None
# ---------------------------------------------------------------------------
# Existing tests
# ---------------------------------------------------------------------------
def test_tasks_page_returns_200(client):
response = client.get("/tasks")

View File

@@ -242,6 +242,145 @@ class TestCloseAll:
conn.execute("SELECT 1")
class TestConnectionLeaks:
    """Test that connections do not leak."""

    def test_get_connection_after_close_returns_fresh_connection(self, tmp_path):
        """After close, get_connection() returns a new working connection."""
        pool = ConnectionPool(tmp_path / "test.db")
        first = pool.get_connection()
        pool.close_connection()
        second = pool.get_connection()
        assert second is not first
        # The replacement connection must actually work.
        assert second.execute("SELECT 1").fetchone()[0] == 1
        pool.close_connection()

    def test_context_manager_does_not_leak_connection(self, tmp_path):
        """After context manager exit, thread-local conn is cleared."""
        pool = ConnectionPool(tmp_path / "test.db")
        with pool.connection():
            pass
        # The thread-local slot must be released on exit.
        assert pool._local.conn is None

    def test_context_manager_exception_does_not_leak_connection(self, tmp_path):
        """Connection is cleaned up even when an exception occurs."""
        pool = ConnectionPool(tmp_path / "test.db")
        try:
            with pool.connection():
                raise RuntimeError("boom")
        except RuntimeError:
            pass
        assert pool._local.conn is None

    def test_threads_do_not_leak_into_each_other(self, tmp_path):
        """A connection opened in one thread is invisible to another."""
        pool = ConnectionPool(tmp_path / "test.db")
        pool.get_connection()  # open a connection on the main thread

        seen = []

        def probe():
            # Record whether this worker thread can see any connection.
            seen.append(getattr(pool._local, "conn", None) is not None)

        worker = threading.Thread(target=probe)
        worker.start()
        worker.join()
        assert seen == [False]
        pool.close_connection()

    def test_repeated_open_close_cycles(self, tmp_path):
        """Repeated open/close cycles do not accumulate leaked connections."""
        pool = ConnectionPool(tmp_path / "test.db")
        for _cycle in range(50):
            with pool.connection() as conn:
                conn.execute("SELECT 1")
            # Each cycle must end with the thread-local slot cleared.
            assert pool._local.conn is None
class TestPragmaApplication:
    """Test that SQLite pragmas can be applied and persist on pooled connections.

    The codebase uses WAL journal mode and busy_timeout pragmas on connections
    obtained from the pool. These tests verify that pattern works correctly.
    """

    def test_wal_journal_mode_persists(self, tmp_path):
        """WAL journal mode set on a pooled connection persists for its lifetime."""
        pool = ConnectionPool(tmp_path / "test.db")
        conn = pool.get_connection()
        conn.execute("PRAGMA journal_mode=WAL")
        assert conn.execute("PRAGMA journal_mode").fetchone()[0] == "wal"
        # Fetching again yields the same thread-local connection, still in WAL.
        again = pool.get_connection()
        assert again.execute("PRAGMA journal_mode").fetchone()[0] == "wal"
        pool.close_connection()

    def test_busy_timeout_persists(self, tmp_path):
        """busy_timeout pragma set on a pooled connection persists."""
        pool = ConnectionPool(tmp_path / "test.db")
        conn = pool.get_connection()
        conn.execute("PRAGMA busy_timeout=5000")
        assert conn.execute("PRAGMA busy_timeout").fetchone()[0] == 5000
        pool.close_connection()

    def test_pragmas_apply_per_connection(self, tmp_path):
        """Pragmas set on one thread's connection are independent of another's."""
        pool = ConnectionPool(tmp_path / "test.db")
        pool.get_connection().execute("PRAGMA cache_size=9999")

        observed = []

        def read_cache_size():
            # A separate thread gets its own connection with default pragmas.
            conn = pool.get_connection()
            observed.append(conn.execute("PRAGMA cache_size").fetchone()[0])
            pool.close_connection()

        worker = threading.Thread(target=read_cache_size)
        worker.start()
        worker.join()
        # The worker thread's connection must not inherit our cache_size.
        assert observed[0] != 9999
        pool.close_connection()

    def test_session_pragma_resets_on_new_connection(self, tmp_path):
        """Session-level pragmas (cache_size) reset on a new connection."""
        pool = ConnectionPool(tmp_path / "test.db")
        stale = pool.get_connection()
        stale.execute("PRAGMA cache_size=9999")
        assert stale.execute("PRAGMA cache_size").fetchone()[0] == 9999
        pool.close_connection()
        fresh = pool.get_connection()
        # A fresh connection starts from the default, not the old session value.
        assert fresh.execute("PRAGMA cache_size").fetchone()[0] != 9999
        pool.close_connection()

    def test_wal_mode_via_context_manager(self, tmp_path):
        """WAL mode can be set within a context manager block."""
        pool = ConnectionPool(tmp_path / "test.db")
        with pool.connection() as conn:
            conn.execute("PRAGMA journal_mode=WAL")
            assert conn.execute("PRAGMA journal_mode").fetchone()[0] == "wal"
class TestIntegration:
"""Integration tests for real-world usage patterns."""