Co-authored-by: Claude (Opus 4.6) <claude@hermes.local> Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
248 lines
8.1 KiB
Python
248 lines
8.1 KiB
Python
"""Self-correction event logger.
|
|
|
|
Records instances where the agent detected its own errors and the steps
it took to correct them. Used by the Self-Correction Dashboard to visualise
these events and surface recurring failure patterns.

Usage::

    from infrastructure.self_correction import log_self_correction, get_corrections, get_patterns

    log_self_correction(
        source="agentic_loop",
        original_intent="Execute step 3: deploy service",
        detected_error="ConnectionRefusedError: port 8080 unavailable",
        correction_strategy="Retry on alternate port 8081",
        final_outcome="Success on retry",
        task_id="abc123",
    )
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import sqlite3
|
|
import uuid
|
|
from collections.abc import Generator
|
|
from contextlib import closing, contextmanager
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Database
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Cached location of the SQLite file; stays None until the first lookup.
_DB_PATH: Path | None = None


def _get_db_path() -> Path:
    """Return the path of the self-correction database, resolving it once.

    The first call reads ``settings.repo_root`` and stores
    ``<repo_root>/data/self_correction.db`` in the module-level ``_DB_PATH``;
    every later call returns that stored value unchanged.
    """
    global _DB_PATH
    if _DB_PATH is not None:
        return _DB_PATH

    # Imported here rather than at module top, so ``config`` is only loaded
    # the first time a path is actually needed.
    from config import settings

    _DB_PATH = Path(settings.repo_root).joinpath("data", "self_correction.db")
    return _DB_PATH
|
|
|
|
|
|
@contextmanager
def _get_db() -> Generator[sqlite3.Connection, None, None]:
    """Yield a SQLite connection to the self-correction store.

    On every entry the parent directory of the database file is created if
    missing, the ``self_correction_events`` table and its two indexes are
    created with ``IF NOT EXISTS``, and that setup is committed before the
    connection is handed to the caller. Rows come back as ``sqlite3.Row``.
    The connection is closed when the ``with`` block exits; callers are
    responsible for committing their own writes.
    """
    location = _get_db_path()
    location.parent.mkdir(parents=True, exist_ok=True)

    # Schema statements, applied in order each time a connection is opened.
    schema = (
        """
        CREATE TABLE IF NOT EXISTS self_correction_events (
            id TEXT PRIMARY KEY,
            source TEXT NOT NULL,
            task_id TEXT DEFAULT '',
            original_intent TEXT NOT NULL,
            detected_error TEXT NOT NULL,
            correction_strategy TEXT NOT NULL,
            final_outcome TEXT NOT NULL,
            outcome_status TEXT DEFAULT 'success',
            error_type TEXT DEFAULT '',
            created_at TEXT DEFAULT (datetime('now'))
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_sc_created ON self_correction_events(created_at)",
        "CREATE INDEX IF NOT EXISTS idx_sc_error_type ON self_correction_events(error_type)",
    )

    with closing(sqlite3.connect(str(location))) as conn:
        conn.row_factory = sqlite3.Row
        for statement in schema:
            conn.execute(statement)
        conn.commit()
        yield conn
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Write
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def log_self_correction(
    *,
    source: str,
    original_intent: str,
    detected_error: str,
    correction_strategy: str,
    final_outcome: str,
    task_id: str = "",
    outcome_status: str = "success",
    error_type: str = "",
) -> str:
    """Persist one self-correction event and hand back its generated ID.

    Storage is best-effort: any exception raised while writing is logged as
    a warning and swallowed, and the generated ID is returned regardless.

    Args:
        source: Module or component that triggered the correction.
        original_intent: What the agent was trying to do.
        detected_error: The error or problem that was detected.
        correction_strategy: How the agent attempted to correct the error.
        final_outcome: What the result of the correction attempt was.
        task_id: Optional task/session ID for correlation.
        outcome_status: 'success', 'partial', or 'failed'.
        error_type: Short category label for pattern analysis (e.g.
            'ConnectionError', 'TimeoutError'). When empty, a label is
            derived from ``detected_error``.

    Returns:
        The UUID4 string generated for this event.
    """
    new_id = str(uuid.uuid4())

    # No explicit label: fall back to whatever precedes the first colon of
    # the error text (the whole text when there is no colon), stripped of
    # surrounding whitespace and capped at 64 characters.
    label = error_type or detected_error.partition(":")[0].strip()[:64]

    try:
        # The four free-text fields are truncated to 2000 characters each.
        record = (
            new_id,
            source,
            task_id,
            original_intent[:2000],
            detected_error[:2000],
            correction_strategy[:2000],
            final_outcome[:2000],
            outcome_status,
            label,
        )
        with _get_db() as conn:
            conn.execute(
                """
                INSERT INTO self_correction_events
                (id, source, task_id, original_intent, detected_error,
                correction_strategy, final_outcome, outcome_status, error_type)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                record,
            )
            conn.commit()
        logger.info(
            "Self-correction logged [%s] source=%s error_type=%s status=%s",
            new_id[:8],
            source,
            label,
            outcome_status,
        )
    except Exception as exc:
        logger.warning("Failed to log self-correction event: %s", exc)

    return new_id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Read
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def get_corrections(limit: int = 50) -> list[dict]:
    """Return the most recent self-correction events, newest first.

    Rows are ordered by ``created_at`` descending. The table fills that
    column with ``datetime('now')``, which only has one-second resolution,
    so events written within the same second are additionally ordered by
    ``rowid`` descending to keep the later insert first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        One dict per event, keyed by column name. An empty list is returned
        (and a warning logged) if the query fails for any reason.
    """
    try:
        with _get_db() as conn:
            rows = conn.execute(
                """
                SELECT * FROM self_correction_events
                ORDER BY created_at DESC, rowid DESC
                LIMIT ?
                """,
                (limit,),
            ).fetchall()
            return [dict(row) for row in rows]
    except Exception as exc:
        logger.warning("Failed to fetch self-correction events: %s", exc)
        return []
|
|
|
|
|
|
def get_patterns(top_n: int = 10) -> list[dict]:
    """Return the most common recurring error types with counts.

    Each entry has:
    - error_type: category label
    - count: total occurrences (includes outcomes other than success/failed)
    - success_count: corrected successfully
    - failed_count: correction also failed
    - last_seen: ``created_at`` value of the most recent occurrence

    Entries are sorted by ``count`` descending. Ties are broken by the most
    recent ``last_seen`` and then by ``error_type``, so the set of rows that
    survives the ``LIMIT`` is stable from one call to the next.

    Args:
        top_n: Maximum number of error types to return.

    Returns:
        One dict per error type. An empty list is returned (and a warning
        logged) if the query fails for any reason.
    """
    try:
        with _get_db() as conn:
            rows = conn.execute(
                """
                SELECT
                    error_type,
                    COUNT(*) AS count,
                    SUM(CASE WHEN outcome_status = 'success' THEN 1 ELSE 0 END) AS success_count,
                    SUM(CASE WHEN outcome_status = 'failed' THEN 1 ELSE 0 END) AS failed_count,
                    MAX(created_at) AS last_seen
                FROM self_correction_events
                GROUP BY error_type
                ORDER BY count DESC, last_seen DESC, error_type ASC
                LIMIT ?
                """,
                (top_n,),
            ).fetchall()
            return [dict(row) for row in rows]
    except Exception as exc:
        logger.warning("Failed to fetch self-correction patterns: %s", exc)
        return []
|
|
|
|
|
|
def get_stats() -> dict:
    """Return aggregate statistics for the summary panel.

    Returns:
        A dict with the keys ``total``, ``success_count``, ``partial_count``,
        ``failed_count``, ``unique_error_types``, ``sources`` and
        ``success_rate`` (successes as a rounded whole-number percentage of
        ``total``, or 0 when there are no events). Every value is a number,
        including when the table is empty. If the query fails, a warning is
        logged and the all-zero result of ``_empty_stats()`` is returned.
    """
    try:
        with _get_db() as conn:
            row = conn.execute(
                """
                SELECT
                    COUNT(*) AS total,
                    SUM(CASE WHEN outcome_status = 'success' THEN 1 ELSE 0 END) AS success_count,
                    SUM(CASE WHEN outcome_status = 'partial' THEN 1 ELSE 0 END) AS partial_count,
                    SUM(CASE WHEN outcome_status = 'failed' THEN 1 ELSE 0 END) AS failed_count,
                    COUNT(DISTINCT error_type) AS unique_error_types,
                    COUNT(DISTINCT source) AS sources
                FROM self_correction_events
                """
            ).fetchone()
            if row is None:
                return _empty_stats()

            # SUM() over zero rows yields NULL, so on an empty table the
            # per-status counters would come back as None. Coerce them to 0
            # so the shape matches _empty_stats().
            stats = {column: (value or 0) for column, value in dict(row).items()}

            total = stats["total"]
            if total:
                stats["success_rate"] = round(stats["success_count"] / total * 100)
            else:
                stats["success_rate"] = 0
            return stats
    except Exception as exc:
        logger.warning("Failed to fetch self-correction stats: %s", exc)
        return _empty_stats()
|
|
|
|
|
|
def _empty_stats() -> dict:
    """Build a fresh all-zero statistics dict.

    The keys are ``total``, ``success_count``, ``partial_count``,
    ``failed_count``, ``unique_error_types``, ``sources`` and
    ``success_rate``, in that order; a new dict is created on every call.
    """
    fields = (
        "total",
        "success_count",
        "partial_count",
        "failed_count",
        "unique_error_types",
        "sources",
        "success_rate",
    )
    return dict.fromkeys(fields, 0)
|