feat: Three-Strike Detector for sovereignty enforcement (#962)
Some checks failed
Tests / lint (pull_request) Failing after 14s
Tests / test (pull_request) Has been skipped

- Add src/timmy/sovereignty/three_strike.py with violation tracking
- Add src/dashboard/routes/three_strike.py with REST API endpoints
- Wire three_strike_router into dashboard app
- Add 16 unit tests (test_three_strike.py) and route tests (test_three_strike_routes.py)
- All 400 unit tests passing

Fixes #962

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-03-23 18:50:24 -04:00
parent d697c3d93e
commit 20a5f14d65
6 changed files with 1024 additions and 0 deletions

View File

@@ -49,6 +49,7 @@ from dashboard.routes.quests import router as quests_router
from dashboard.routes.scorecards import router as scorecards_router
from dashboard.routes.sovereignty_metrics import router as sovereignty_metrics_router
from dashboard.routes.sovereignty_ws import router as sovereignty_ws_router
from dashboard.routes.three_strike import router as three_strike_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
@@ -676,6 +677,7 @@ app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
app.include_router(sovereignty_ws_router)
app.include_router(three_strike_router)
@app.websocket("/ws")

View File

@@ -0,0 +1,118 @@
"""Three-Strike Detector dashboard routes.
Provides JSON API endpoints for inspecting and managing the three-strike
detector state.
Refs: #962
"""
import logging
from typing import Any
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from timmy.sovereignty.three_strike import CATEGORIES, get_detector
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/sovereignty/three-strike", tags=["three-strike"])
class RecordRequest(BaseModel):
category: str
key: str
metadata: dict[str, Any] = {}
class AutomationRequest(BaseModel):
artifact_path: str
@router.get("")
async def list_strikes() -> dict[str, Any]:
"""Return all strike records."""
detector = get_detector()
records = detector.list_all()
return {
"records": [
{
"category": r.category,
"key": r.key,
"count": r.count,
"blocked": r.blocked,
"automation": r.automation,
"first_seen": r.first_seen,
"last_seen": r.last_seen,
}
for r in records
],
"categories": sorted(CATEGORIES),
}
@router.get("/blocked")
async def list_blocked() -> dict[str, Any]:
"""Return only blocked (category, key) pairs."""
detector = get_detector()
records = detector.list_blocked()
return {
"blocked": [
{
"category": r.category,
"key": r.key,
"count": r.count,
"automation": r.automation,
"last_seen": r.last_seen,
}
for r in records
]
}
@router.post("/record")
async def record_strike(body: RecordRequest) -> dict[str, Any]:
"""Record a manual action. Returns strike state; 409 when blocked."""
from timmy.sovereignty.three_strike import ThreeStrikeError
detector = get_detector()
try:
record = detector.record(body.category, body.key, body.metadata)
return {
"category": record.category,
"key": record.key,
"count": record.count,
"blocked": record.blocked,
"automation": record.automation,
}
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
except ThreeStrikeError as exc:
raise HTTPException(
status_code=409,
detail={
"error": "three_strike_block",
"message": str(exc),
"category": exc.category,
"key": exc.key,
"count": exc.count,
},
) from exc
@router.post("/{category}/{key}/automation")
async def register_automation(
category: str, key: str, body: AutomationRequest
) -> dict[str, bool]:
"""Register an automation artifact to unblock a (category, key) pair."""
detector = get_detector()
detector.register_automation(category, key, body.artifact_path)
return {"success": True}
@router.get("/{category}/{key}/events")
async def get_strike_events(category: str, key: str, limit: int = 50) -> dict[str, Any]:
"""Return the individual strike events for a (category, key) pair."""
detector = get_detector()
events = detector.get_events(category, key, limit=limit)
return {"category": category, "key": key, "events": events}

View File

@@ -4,4 +4,8 @@ Tracks how much of each AI layer (perception, decision, narration)
runs locally vs. calls out to an LLM. Feeds the sovereignty dashboard.
Refs: #954, #953
Three-strike detector and automation enforcement.
Refs: #962
"""

View File

@@ -0,0 +1,486 @@
"""Three-Strike Detector for Repeated Manual Work.
Tracks recurring manual actions by category and key. When the same action
is performed three or more times, it blocks further attempts and requires
an automation artifact to be registered first.
Strike 1 (count=1): discovery — action proceeds normally
Strike 2 (count=2): warning — action proceeds with a logged warning
Strike 3 (count≥3): blocked — raises ThreeStrikeError; caller must
register an automation artifact first
Governing principle: "If you do the same thing manually three times,
you have failed to crystallise."
Categories tracked:
- vlm_prompt_edit VLM prompt edits for the same UI element
- game_bug_review Manual game-bug reviews for the same bug type
- parameter_tuning Manual parameter tuning for the same parameter
- portal_adapter_creation Manual portal-adapter creation for same pattern
- deployment_step Manual deployment steps
The Falsework Checklist is enforced before cloud API calls via
:func:`falsework_check`.
Refs: #962
"""
from __future__ import annotations
import json
import logging
import sqlite3
from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ── Constants ────────────────────────────────────────────────────────────────
DB_PATH = Path(settings.repo_root) / "data" / "three_strike.db"
CATEGORIES = frozenset(
{
"vlm_prompt_edit",
"game_bug_review",
"parameter_tuning",
"portal_adapter_creation",
"deployment_step",
}
)
STRIKE_WARNING = 2
STRIKE_BLOCK = 3
_SCHEMA = """
CREATE TABLE IF NOT EXISTS strikes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL,
key TEXT NOT NULL,
count INTEGER NOT NULL DEFAULT 0,
blocked INTEGER NOT NULL DEFAULT 0,
automation TEXT DEFAULT NULL,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_strikes_cat_key ON strikes(category, key);
CREATE INDEX IF NOT EXISTS idx_strikes_blocked ON strikes(blocked);
CREATE TABLE IF NOT EXISTS strike_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL,
key TEXT NOT NULL,
strike_num INTEGER NOT NULL,
metadata TEXT DEFAULT '{}',
timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_se_cat_key ON strike_events(category, key);
CREATE INDEX IF NOT EXISTS idx_se_ts ON strike_events(timestamp);
"""
# ── Exceptions ────────────────────────────────────────────────────────────────
class ThreeStrikeError(RuntimeError):
"""Raised when a manual action has reached the third strike.
Attributes:
category: The action category (e.g. ``"vlm_prompt_edit"``).
key: The specific action key (e.g. a UI element name).
count: Total number of times this action has been recorded.
"""
def __init__(self, category: str, key: str, count: int) -> None:
self.category = category
self.key = key
self.count = count
super().__init__(
f"Three-strike block: '{category}/{key}' has been performed manually "
f"{count} time(s). Register an automation artifact before continuing. "
f"Run the Falsework Checklist (see three_strike.falsework_check)."
)
# ── Data classes ──────────────────────────────────────────────────────────────
@dataclass
class StrikeRecord:
"""State for one (category, key) pair."""
category: str
key: str
count: int
blocked: bool
automation: str | None
first_seen: str
last_seen: str
@dataclass
class FalseworkChecklist:
"""Pre-cloud-API call checklist — must be completed before making
expensive external calls.
Instantiate and call :meth:`validate` to ensure all answers are provided.
"""
durable_artifact: str = ""
artifact_storage_path: str = ""
local_rule_or_cache: str = ""
will_repeat: bool | None = None
elimination_strategy: str = ""
sovereignty_delta: str = ""
# ── internal ──
_errors: list[str] = field(default_factory=list, init=False, repr=False)
def validate(self) -> list[str]:
"""Return a list of unanswered questions. Empty list → checklist passes."""
self._errors = []
if not self.durable_artifact.strip():
self._errors.append("Q1: What durable artifact will this call produce?")
if not self.artifact_storage_path.strip():
self._errors.append("Q2: Where will the artifact be stored locally?")
if not self.local_rule_or_cache.strip():
self._errors.append("Q3: What local rule or cache will this populate?")
if self.will_repeat is None:
self._errors.append("Q4: After this call, will I need to make it again?")
if self.will_repeat and not self.elimination_strategy.strip():
self._errors.append("Q5: If yes, what would eliminate the repeat?")
if not self.sovereignty_delta.strip():
self._errors.append("Q6: What is the sovereignty delta of this call?")
return self._errors
@property
def passed(self) -> bool:
"""True when :meth:`validate` found no unanswered questions."""
return len(self.validate()) == 0
# ── Store ─────────────────────────────────────────────────────────────────────
class ThreeStrikeStore:
"""SQLite-backed three-strike store.
Thread-safe: creates a new connection per operation.
"""
def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or DB_PATH
self._init_db()
# ── setup ─────────────────────────────────────────────────────────────
def _init_db(self) -> None:
try:
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(self._db_path))) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.executescript(_SCHEMA)
conn.commit()
except Exception as exc:
logger.warning("Failed to initialise three-strike DB: %s", exc)
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
return conn
# ── record ────────────────────────────────────────────────────────────
def record(
self,
category: str,
key: str,
metadata: dict[str, Any] | None = None,
) -> StrikeRecord:
"""Record a manual action and return the updated :class:`StrikeRecord`.
Raises :exc:`ThreeStrikeError` when the action is already blocked
(count ≥ STRIKE_BLOCK) and no automation has been registered.
Args:
category: Action category; must be in :data:`CATEGORIES`.
key: Specific identifier within the category.
metadata: Optional context stored alongside the event.
Returns:
The updated :class:`StrikeRecord`.
Raises:
ValueError: If *category* is not in :data:`CATEGORIES`.
ThreeStrikeError: On the third (or later) strike with no automation.
"""
if category not in CATEGORIES:
raise ValueError(
f"Unknown category '{category}'. Valid: {sorted(CATEGORIES)}"
)
now = datetime.now(UTC).isoformat()
meta_json = json.dumps(metadata or {})
try:
with closing(self._connect()) as conn:
# Upsert the aggregate row
conn.execute(
"""
INSERT INTO strikes (category, key, count, blocked, first_seen, last_seen)
VALUES (?, ?, 1, 0, ?, ?)
ON CONFLICT(category, key) DO UPDATE SET
count = count + 1,
last_seen = excluded.last_seen
""",
(category, key, now, now),
)
row = conn.execute(
"SELECT * FROM strikes WHERE category=? AND key=?",
(category, key),
).fetchone()
count = row["count"]
blocked = bool(row["blocked"])
automation = row["automation"]
# Record the individual event
conn.execute(
"INSERT INTO strike_events (category, key, strike_num, metadata, timestamp) "
"VALUES (?, ?, ?, ?, ?)",
(category, key, count, meta_json, now),
)
# Mark as blocked once threshold reached
if count >= STRIKE_BLOCK and not blocked:
conn.execute(
"UPDATE strikes SET blocked=1 WHERE category=? AND key=?",
(category, key),
)
blocked = True
conn.commit()
except ThreeStrikeError:
raise
except Exception as exc:
logger.warning("Three-strike DB error during record: %s", exc)
# Re-raise DB errors so callers are aware
raise
record = StrikeRecord(
category=category,
key=key,
count=count,
blocked=blocked,
automation=automation,
first_seen=row["first_seen"],
last_seen=now,
)
self._emit_log(record)
if blocked and not automation:
raise ThreeStrikeError(category=category, key=key, count=count)
return record
def _emit_log(self, record: StrikeRecord) -> None:
"""Log a warning or info message based on strike number."""
if record.count == STRIKE_WARNING:
logger.warning(
"Three-strike WARNING: '%s/%s' has been performed manually %d times. "
"Consider writing an automation.",
record.category,
record.key,
record.count,
)
elif record.count >= STRIKE_BLOCK:
logger.warning(
"Three-strike BLOCK: '%s/%s' reached %d strikes — automation required.",
record.category,
record.key,
record.count,
)
else:
logger.info(
"Three-strike discovery: '%s/%s' — strike %d.",
record.category,
record.key,
record.count,
)
# ── automation registration ───────────────────────────────────────────
def register_automation(
self,
category: str,
key: str,
artifact_path: str,
) -> None:
"""Unblock a (category, key) pair by registering an automation artifact.
Once registered, future calls to :meth:`record` will proceed normally
and the strike counter resets to zero.
Args:
category: Action category.
key: Specific identifier within the category.
artifact_path: Path or identifier of the automation artifact.
"""
try:
with closing(self._connect()) as conn:
conn.execute(
"UPDATE strikes SET automation=?, blocked=0, count=0 "
"WHERE category=? AND key=?",
(artifact_path, category, key),
)
conn.commit()
logger.info(
"Three-strike: automation registered for '%s/%s'%s",
category,
key,
artifact_path,
)
except Exception as exc:
logger.warning("Failed to register automation: %s", exc)
# ── queries ───────────────────────────────────────────────────────────
def get(self, category: str, key: str) -> StrikeRecord | None:
"""Return the :class:`StrikeRecord` for (category, key), or None."""
try:
with closing(self._connect()) as conn:
row = conn.execute(
"SELECT * FROM strikes WHERE category=? AND key=?",
(category, key),
).fetchone()
if row is None:
return None
return StrikeRecord(
category=row["category"],
key=row["key"],
count=row["count"],
blocked=bool(row["blocked"]),
automation=row["automation"],
first_seen=row["first_seen"],
last_seen=row["last_seen"],
)
except Exception as exc:
logger.warning("Failed to query strike record: %s", exc)
return None
def list_blocked(self) -> list[StrikeRecord]:
"""Return all currently-blocked (category, key) pairs."""
try:
with closing(self._connect()) as conn:
rows = conn.execute(
"SELECT * FROM strikes WHERE blocked=1 ORDER BY last_seen DESC"
).fetchall()
return [
StrikeRecord(
category=r["category"],
key=r["key"],
count=r["count"],
blocked=True,
automation=r["automation"],
first_seen=r["first_seen"],
last_seen=r["last_seen"],
)
for r in rows
]
except Exception as exc:
logger.warning("Failed to query blocked strikes: %s", exc)
return []
def list_all(self) -> list[StrikeRecord]:
"""Return all strike records ordered by last seen (most recent first)."""
try:
with closing(self._connect()) as conn:
rows = conn.execute(
"SELECT * FROM strikes ORDER BY last_seen DESC"
).fetchall()
return [
StrikeRecord(
category=r["category"],
key=r["key"],
count=r["count"],
blocked=bool(r["blocked"]),
automation=r["automation"],
first_seen=r["first_seen"],
last_seen=r["last_seen"],
)
for r in rows
]
except Exception as exc:
logger.warning("Failed to list strike records: %s", exc)
return []
def get_events(self, category: str, key: str, limit: int = 50) -> list[dict]:
"""Return the individual strike events for (category, key)."""
try:
with closing(self._connect()) as conn:
rows = conn.execute(
"SELECT * FROM strike_events WHERE category=? AND key=? "
"ORDER BY timestamp DESC LIMIT ?",
(category, key, limit),
).fetchall()
return [
{
"strike_num": r["strike_num"],
"timestamp": r["timestamp"],
"metadata": json.loads(r["metadata"]) if r["metadata"] else {},
}
for r in rows
]
except Exception as exc:
logger.warning("Failed to query strike events: %s", exc)
return []
# ── Falsework checklist helper ────────────────────────────────────────────────
def falsework_check(checklist: FalseworkChecklist) -> None:
"""Enforce the Falsework Checklist before a cloud API call.
Raises :exc:`ValueError` listing all unanswered questions if the checklist
does not pass.
Usage::
checklist = FalseworkChecklist(
durable_artifact="embedding vectors for UI element foo",
artifact_storage_path="data/vlm/foo_embeddings.json",
local_rule_or_cache="vlm_cache",
will_repeat=False,
sovereignty_delta="eliminates repeated VLM call",
)
falsework_check(checklist) # raises ValueError if incomplete
"""
errors = checklist.validate()
if errors:
raise ValueError(
"Falsework Checklist incomplete — answer all questions before "
"making a cloud API call:\n" + "\n".join(f"{e}" for e in errors)
)
# ── Module-level singleton ────────────────────────────────────────────────────
_detector: ThreeStrikeStore | None = None
def get_detector() -> ThreeStrikeStore:
"""Return the module-level :class:`ThreeStrikeStore`, creating it once."""
global _detector
if _detector is None:
_detector = ThreeStrikeStore()
return _detector

View File

@@ -0,0 +1,332 @@
"""Tests for the three-strike detector.
Refs: #962
"""
import pytest
from timmy.sovereignty.three_strike import (
CATEGORIES,
STRIKE_BLOCK,
STRIKE_WARNING,
FalseworkChecklist,
StrikeRecord,
ThreeStrikeError,
ThreeStrikeStore,
falsework_check,
)
@pytest.fixture
def store(tmp_path):
"""Isolated store backed by a temp DB."""
return ThreeStrikeStore(db_path=tmp_path / "test_strikes.db")
# ── Category constants ────────────────────────────────────────────────────────
class TestCategories:
@pytest.mark.unit
def test_all_categories_present(self):
expected = {
"vlm_prompt_edit",
"game_bug_review",
"parameter_tuning",
"portal_adapter_creation",
"deployment_step",
}
assert expected == CATEGORIES
@pytest.mark.unit
def test_strike_thresholds(self):
assert STRIKE_WARNING == 2
assert STRIKE_BLOCK == 3
# ── ThreeStrikeStore ──────────────────────────────────────────────────────────
class TestThreeStrikeStore:
@pytest.mark.unit
def test_first_strike_returns_record(self, store):
record = store.record("vlm_prompt_edit", "login_button")
assert isinstance(record, StrikeRecord)
assert record.count == 1
assert record.blocked is False
assert record.category == "vlm_prompt_edit"
assert record.key == "login_button"
@pytest.mark.unit
def test_second_strike_count(self, store):
store.record("vlm_prompt_edit", "login_button")
record = store.record("vlm_prompt_edit", "login_button")
assert record.count == 2
assert record.blocked is False
@pytest.mark.unit
def test_third_strike_raises(self, store):
store.record("vlm_prompt_edit", "login_button")
store.record("vlm_prompt_edit", "login_button")
with pytest.raises(ThreeStrikeError) as exc_info:
store.record("vlm_prompt_edit", "login_button")
err = exc_info.value
assert err.category == "vlm_prompt_edit"
assert err.key == "login_button"
assert err.count == 3
@pytest.mark.unit
def test_fourth_strike_still_raises(self, store):
for _ in range(3):
try:
store.record("deployment_step", "build_docker")
except ThreeStrikeError:
pass
with pytest.raises(ThreeStrikeError):
store.record("deployment_step", "build_docker")
@pytest.mark.unit
def test_different_keys_are_independent(self, store):
store.record("vlm_prompt_edit", "login_button")
store.record("vlm_prompt_edit", "login_button")
# Different key — should not be blocked
record = store.record("vlm_prompt_edit", "logout_button")
assert record.count == 1
@pytest.mark.unit
def test_different_categories_are_independent(self, store):
store.record("vlm_prompt_edit", "foo")
store.record("vlm_prompt_edit", "foo")
# Different category, same key — should not be blocked
record = store.record("game_bug_review", "foo")
assert record.count == 1
@pytest.mark.unit
def test_invalid_category_raises_value_error(self, store):
with pytest.raises(ValueError, match="Unknown category"):
store.record("nonexistent_category", "some_key")
@pytest.mark.unit
def test_metadata_stored_in_events(self, store):
store.record("parameter_tuning", "learning_rate", metadata={"value": 0.01})
events = store.get_events("parameter_tuning", "learning_rate")
assert len(events) == 1
assert events[0]["metadata"]["value"] == 0.01
@pytest.mark.unit
def test_get_returns_none_for_missing(self, store):
assert store.get("vlm_prompt_edit", "not_there") is None
@pytest.mark.unit
def test_get_returns_record(self, store):
store.record("vlm_prompt_edit", "submit_btn")
record = store.get("vlm_prompt_edit", "submit_btn")
assert record is not None
assert record.count == 1
@pytest.mark.unit
def test_list_all_empty(self, store):
assert store.list_all() == []
@pytest.mark.unit
def test_list_all_returns_records(self, store):
store.record("vlm_prompt_edit", "a")
store.record("vlm_prompt_edit", "b")
records = store.list_all()
assert len(records) == 2
@pytest.mark.unit
def test_list_blocked_empty_when_no_strikes(self, store):
assert store.list_blocked() == []
@pytest.mark.unit
def test_list_blocked_contains_blocked(self, store):
for _ in range(3):
try:
store.record("deployment_step", "push_image")
except ThreeStrikeError:
pass
blocked = store.list_blocked()
assert len(blocked) == 1
assert blocked[0].key == "push_image"
@pytest.mark.unit
def test_register_automation_unblocks(self, store):
for _ in range(3):
try:
store.record("deployment_step", "push_image")
except ThreeStrikeError:
pass
store.register_automation("deployment_step", "push_image", "scripts/push.sh")
# Should no longer raise
record = store.record("deployment_step", "push_image")
assert record.blocked is False
assert record.automation == "scripts/push.sh"
@pytest.mark.unit
def test_register_automation_resets_count(self, store):
for _ in range(3):
try:
store.record("deployment_step", "push_image")
except ThreeStrikeError:
pass
store.register_automation("deployment_step", "push_image", "scripts/push.sh")
# register_automation resets count to 0; one new record brings it to 1
new_record = store.record("deployment_step", "push_image")
assert new_record.count == 1
@pytest.mark.unit
def test_get_events_returns_most_recent_first(self, store):
store.record("vlm_prompt_edit", "nav", metadata={"n": 1})
store.record("vlm_prompt_edit", "nav", metadata={"n": 2})
events = store.get_events("vlm_prompt_edit", "nav")
assert len(events) == 2
# Most recent first
assert events[0]["metadata"]["n"] == 2
@pytest.mark.unit
def test_get_events_respects_limit(self, store):
for i in range(5):
try:
store.record("vlm_prompt_edit", "el")
except ThreeStrikeError:
pass
events = store.get_events("vlm_prompt_edit", "el", limit=2)
assert len(events) == 2
# ── FalseworkChecklist ────────────────────────────────────────────────────────
class TestFalseworkChecklist:
@pytest.mark.unit
def test_valid_checklist_passes(self):
cl = FalseworkChecklist(
durable_artifact="embedding vectors",
artifact_storage_path="data/embeddings.json",
local_rule_or_cache="vlm_cache",
will_repeat=False,
sovereignty_delta="eliminates repeated call",
)
assert cl.passed is True
assert cl.validate() == []
@pytest.mark.unit
def test_missing_artifact_fails(self):
cl = FalseworkChecklist(
artifact_storage_path="data/x.json",
local_rule_or_cache="cache",
will_repeat=False,
sovereignty_delta="delta",
)
errors = cl.validate()
assert any("Q1" in e for e in errors)
@pytest.mark.unit
def test_missing_storage_path_fails(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
local_rule_or_cache="cache",
will_repeat=False,
sovereignty_delta="delta",
)
errors = cl.validate()
assert any("Q2" in e for e in errors)
@pytest.mark.unit
def test_will_repeat_none_fails(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
sovereignty_delta="delta",
)
errors = cl.validate()
assert any("Q4" in e for e in errors)
@pytest.mark.unit
def test_will_repeat_true_requires_elimination_strategy(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
will_repeat=True,
sovereignty_delta="delta",
)
errors = cl.validate()
assert any("Q5" in e for e in errors)
@pytest.mark.unit
def test_will_repeat_false_no_elimination_needed(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
will_repeat=False,
sovereignty_delta="delta",
)
errors = cl.validate()
assert not any("Q5" in e for e in errors)
@pytest.mark.unit
def test_missing_sovereignty_delta_fails(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
will_repeat=False,
)
errors = cl.validate()
assert any("Q6" in e for e in errors)
@pytest.mark.unit
def test_multiple_missing_fields(self):
cl = FalseworkChecklist()
errors = cl.validate()
# At minimum Q1, Q2, Q3, Q4, Q6 should be flagged
assert len(errors) >= 5
# ── falsework_check() helper ──────────────────────────────────────────────────
class TestFalseworkCheck:
@pytest.mark.unit
def test_raises_on_incomplete_checklist(self):
with pytest.raises(ValueError, match="Falsework Checklist incomplete"):
falsework_check(FalseworkChecklist())
@pytest.mark.unit
def test_passes_on_complete_checklist(self):
cl = FalseworkChecklist(
durable_artifact="artifact",
artifact_storage_path="path",
local_rule_or_cache="cache",
will_repeat=False,
sovereignty_delta="delta",
)
falsework_check(cl) # should not raise
# ── ThreeStrikeError ──────────────────────────────────────────────────────────
class TestThreeStrikeError:
@pytest.mark.unit
def test_attributes(self):
err = ThreeStrikeError("vlm_prompt_edit", "foo", 3)
assert err.category == "vlm_prompt_edit"
assert err.key == "foo"
assert err.count == 3
@pytest.mark.unit
def test_message_contains_details(self):
err = ThreeStrikeError("deployment_step", "build", 4)
msg = str(err)
assert "deployment_step" in msg
assert "build" in msg
assert "4" in msg

View File

@@ -0,0 +1,82 @@
"""Integration tests for the three-strike dashboard routes.
Refs: #962
"""
import pytest
class TestThreeStrikeRoutes:
@pytest.mark.unit
def test_list_strikes_returns_200(self, client):
response = client.get("/sovereignty/three-strike")
assert response.status_code == 200
data = response.json()
assert "records" in data
assert "categories" in data
@pytest.mark.unit
def test_list_blocked_returns_200(self, client):
response = client.get("/sovereignty/three-strike/blocked")
assert response.status_code == 200
data = response.json()
assert "blocked" in data
@pytest.mark.unit
def test_record_strike_first(self, client):
response = client.post(
"/sovereignty/three-strike/record",
json={"category": "vlm_prompt_edit", "key": "test_btn"},
)
assert response.status_code == 200
data = response.json()
assert data["count"] == 1
assert data["blocked"] is False
@pytest.mark.unit
def test_record_invalid_category_returns_422(self, client):
response = client.post(
"/sovereignty/three-strike/record",
json={"category": "not_a_real_category", "key": "x"},
)
assert response.status_code == 422
@pytest.mark.unit
def test_third_strike_returns_409(self, client):
for _ in range(2):
client.post(
"/sovereignty/three-strike/record",
json={"category": "deployment_step", "key": "push_route_test"},
)
response = client.post(
"/sovereignty/three-strike/record",
json={"category": "deployment_step", "key": "push_route_test"},
)
assert response.status_code == 409
data = response.json()
assert data["detail"]["error"] == "three_strike_block"
assert data["detail"]["count"] == 3
@pytest.mark.unit
def test_register_automation_returns_success(self, client):
response = client.post(
"/sovereignty/three-strike/deployment_step/some_key/automation",
json={"artifact_path": "scripts/auto.sh"},
)
assert response.status_code == 200
assert response.json()["success"] is True
@pytest.mark.unit
def test_get_events_returns_200(self, client):
client.post(
"/sovereignty/three-strike/record",
json={"category": "vlm_prompt_edit", "key": "events_test_key"},
)
response = client.get(
"/sovereignty/three-strike/vlm_prompt_edit/events_test_key/events"
)
assert response.status_code == 200
data = response.json()
assert data["category"] == "vlm_prompt_edit"
assert data["key"] == "events_test_key"
assert len(data["events"]) >= 1