Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
4d8e004b5f fix: extend JSON repair to remaining json.loads sites in run_agent.py
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Successful in 42s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 4s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 36s
Tests / test (pull_request) Failing after 1h13m6s
Tests / e2e (pull_request) Successful in 1m32s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Adds `repair_and_load_json()` to utils.py using the `json_repair` library
as a fallback when `json.loads()` fails. Replaces 8 non-hot-path json.loads
sites identified in issue #809:

- L2250: trajectory/sanitization message content parsing
- L2500: tool_call dict reconstruction in trajectory conversion
- L2535: tool_content parsing (JSON-like strings in tool responses)
- L2888: session log file loading (with warning on unrecoverable parse)
- L3119: todo content parsing in message processing
- L5963: vision result_json parsing
- L6761: memory flush tool call argument parsing
- L8300: cache serialization tool call args normalization

Each site uses an appropriate default ({} for tool args, None/continue for
content parsing) and a context label for debug tracing.

Fixes #809
2026-04-15 22:56:39 -04:00
4 changed files with 89 additions and 362 deletions

View File

@@ -1,231 +0,0 @@
"""Session compaction with fact extraction.
Before compressing conversation context, extracts durable facts
(user preferences, corrections, project details) and saves them
to the fact store so they survive compression.
Usage:
from agent.session_compactor import extract_and_save_facts
facts = extract_and_save_facts(messages)
"""
from __future__ import annotations
import json
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@dataclass
class ExtractedFact:
"""A fact extracted from conversation."""
category: str # "user_pref", "correction", "project", "tool_quirk", "general"
entity: str # what the fact is about
content: str # the fact itself
confidence: float # 0.0-1.0
source_turn: int # which message turn it came from
timestamp: float = 0.0
# Patterns that indicate user preferences
_PREFERENCE_PATTERNS = [
(r"(?:I|we) (?:prefer|like|want|need) (.+?)(?:\.|$)", "preference"),
(r"(?:always|never) (?:use|do|run|deploy) (.+?)(?:\.|$)", "preference"),
(r"(?:my|our) (?:default|preferred|usual) (.+?) (?:is|are) (.+?)(?:\.|$)", "preference"),
(r"(?:make sure|ensure|remember) (?:to|that) (.+?)(?:\.|$)", "instruction"),
(r"(?:don'?t|do not) (?:ever|ever again) (.+?)(?:\.|$)", "constraint"),
]
# Patterns that indicate corrections
_CORRECTION_PATTERNS = [
(r"(?:actually|no[, ]|wait[, ]|correction[: ]|sorry[, ]) (.+)", "correction"),
(r"(?:I meant|what I meant was|the correct) (.+?)(?:\.|$)", "correction"),
(r"(?:it'?s|its) (?:not|shouldn'?t be|wrong) (.+?)(?:\.|$)", "correction"),
]
# Patterns that indicate project/tool facts
_PROJECT_PATTERNS = [
(r"(?:the |our )?(?:project|repo|codebase|code) (?:is|uses|needs|requires) (.+?)(?:\.|$)", "project"),
(r"(?:deploy|push|commit) (?:to|on) (.+?)(?:\.|$)", "project"),
(r"(?:this|that|the) (?:server|host|machine|VPS) (?:is|runs|has) (.+?)(?:\.|$)", "infrastructure"),
(r"(?:model|provider|engine) (?:is|should be|needs to be) (.+?)(?:\.|$)", "config"),
]
def extract_facts_from_messages(messages: List[Dict[str, Any]]) -> List[ExtractedFact]:
"""Extract durable facts from conversation messages.
Scans user messages for preferences, corrections, project facts,
and infrastructure details that should survive compression.
"""
facts = []
seen_contents = set()
for turn_idx, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
# Only scan user messages and assistant responses with corrections
if role not in ("user", "assistant"):
continue
if not content or not isinstance(content, str):
continue
if len(content) < 10:
continue
# Skip tool results and system messages
if role == "assistant" and msg.get("tool_calls"):
continue
extracted = _extract_from_text(content, turn_idx, role)
# Deduplicate by content
for fact in extracted:
key = f"{fact.category}:{fact.content[:100]}"
if key not in seen_contents:
seen_contents.add(key)
facts.append(fact)
return facts
def _extract_from_text(text: str, turn_idx: int, role: str) -> List[ExtractedFact]:
"""Extract facts from a single text block."""
facts = []
timestamp = time.time()
# Clean text for pattern matching
clean = text.strip()
# User preference patterns (from user messages)
if role == "user":
for pattern, subcategory in _PREFERENCE_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"user_pref.{subcategory}",
entity="user",
content=content[:200],
confidence=0.7,
source_turn=turn_idx,
timestamp=timestamp,
))
# Correction patterns (from user messages)
if role == "user":
for pattern, subcategory in _CORRECTION_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"correction.{subcategory}",
entity="user",
content=content[:200],
confidence=0.8,
source_turn=turn_idx,
timestamp=timestamp,
))
# Project/infrastructure patterns (from both user and assistant)
for pattern, subcategory in _PROJECT_PATTERNS:
for match in re.finditer(pattern, clean, re.IGNORECASE):
content = match.group(1).strip() if match.lastindex else match.group(0).strip()
if len(content) > 5:
facts.append(ExtractedFact(
category=f"project.{subcategory}",
entity=subcategory,
content=content[:200],
confidence=0.6,
source_turn=turn_idx,
timestamp=timestamp,
))
return facts
def save_facts_to_store(facts: List[ExtractedFact], fact_store_fn=None) -> int:
"""Save extracted facts to the fact store.
Args:
facts: List of extracted facts.
fact_store_fn: Optional callable(category, entity, content, trust).
If None, uses the holographic fact store if available.
Returns:
Number of facts saved.
"""
saved = 0
if fact_store_fn:
for fact in facts:
try:
fact_store_fn(
category=fact.category,
entity=fact.entity,
content=fact.content,
trust=fact.confidence,
)
saved += 1
except Exception as e:
logger.debug("Failed to save fact: %s", e)
else:
# Try holographic fact store
try:
from fact_store import fact_store as _fs
for fact in facts:
try:
_fs(
action="add",
content=fact.content,
category=fact.category,
tags=fact.entity,
trust_delta=fact.confidence - 0.5,
)
saved += 1
except Exception as e:
logger.debug("Failed to save fact via fact_store: %s", e)
except ImportError:
logger.debug("fact_store not available — facts not persisted")
return saved
def extract_and_save_facts(
messages: List[Dict[str, Any]],
fact_store_fn=None,
) -> Tuple[List[ExtractedFact], int]:
"""Extract facts from messages and save them.
Returns (extracted_facts, saved_count).
"""
facts = extract_facts_from_messages(messages)
if facts:
logger.info("Extracted %d facts from conversation", len(facts))
saved = save_facts_to_store(facts, fact_store_fn)
logger.info("Saved %d/%d facts to store", saved, len(facts))
else:
saved = 0
return facts, saved
def format_facts_summary(facts: List[ExtractedFact]) -> str:
"""Format extracted facts as a readable summary."""
if not facts:
return "No facts extracted."
by_category = {}
for f in facts:
by_category.setdefault(f.category, []).append(f)
lines = [f"Extracted {len(facts)} facts:", ""]
for cat, cat_facts in sorted(by_category.items()):
lines.append(f" {cat}:")
for f in cat_facts:
lines.append(f" - {f.content[:80]}")
return "\n".join(lines)

View File

@@ -106,7 +106,7 @@ from agent.trajectory import (
convert_scratchpad_to_think, has_incomplete_scratchpad,
save_trajectory as _save_trajectory_to_file,
)
from utils import atomic_json_write, env_var_enabled
from utils import atomic_json_write, env_var_enabled, repair_and_load_json
@@ -2246,9 +2246,8 @@ class AIAgent:
for msg in getattr(review_agent, "_session_messages", []):
if not isinstance(msg, dict) or msg.get("role") != "tool":
continue
try:
data = json.loads(msg.get("content", "{}"))
except (json.JSONDecodeError, TypeError):
data = repair_and_load_json(msg.get("content", "{}"), default=None, context="trajectory_content")
if data is None:
continue
if not data.get("success"):
continue
@@ -2496,13 +2495,13 @@ class AIAgent:
if not tool_call or not isinstance(tool_call, dict): continue
# Parse arguments - should always succeed since we validate during conversation
# but keep try-except as safety net
try:
arguments = json.loads(tool_call["function"]["arguments"]) if isinstance(tool_call["function"]["arguments"], str) else tool_call["function"]["arguments"]
except json.JSONDecodeError:
# This shouldn't happen since we validate and retry during conversation,
# but if it does, log warning and use empty dict
logging.warning(f"Unexpected invalid JSON in trajectory conversion: {tool_call['function']['arguments'][:100]}")
arguments = {}
raw_args = tool_call["function"]["arguments"]
if isinstance(raw_args, str):
arguments = repair_and_load_json(raw_args, default={}, context="trajectory_tool_call")
if arguments == {} and raw_args.strip() not in ("{}", ""):
logging.warning("Unexpected invalid JSON in trajectory conversion: %.100s", raw_args)
else:
arguments = raw_args
tool_call_json = {
"name": tool_call["function"]["name"],
@@ -2530,11 +2529,10 @@ class AIAgent:
# Try to parse tool content as JSON if it looks like JSON
tool_content = tool_msg["content"]
try:
if tool_content.strip().startswith(("{", "[")):
tool_content = json.loads(tool_content)
except (json.JSONDecodeError, AttributeError):
pass # Keep as string if not valid JSON
if isinstance(tool_content, str) and tool_content.strip().startswith(("{", "[")):
parsed = repair_and_load_json(tool_content, default=None, context="trajectory_tool_content")
if parsed is not None:
tool_content = parsed
tool_index = len(tool_responses)
tool_name = (
@@ -2885,14 +2883,21 @@ class AIAgent:
# with partial history and would otherwise clobber the full JSON log.
if self.session_log_file.exists():
try:
existing = json.loads(self.session_log_file.read_text(encoding="utf-8"))
existing_count = existing.get("message_count", len(existing.get("messages", [])))
if existing_count > len(cleaned):
logging.debug(
"Skipping session log overwrite: existing has %d messages, current has %d",
existing_count, len(cleaned),
)
return
existing = repair_and_load_json(
self.session_log_file.read_text(encoding="utf-8"),
default=None,
context="session_log_load",
)
if existing is None:
logging.warning("Session log at %s could not be parsed; allowing overwrite", self.session_log_file)
else:
existing_count = existing.get("message_count", len(existing.get("messages", [])))
if existing_count > len(cleaned):
logging.debug(
"Skipping session log overwrite: existing has %d messages, current has %d",
existing_count, len(cleaned),
)
return
except Exception:
pass # corrupted existing file — allow the overwrite
@@ -3115,13 +3120,12 @@ class AIAgent:
# Quick check: todo responses contain "todos" key
if '"todos"' not in content:
continue
try:
data = json.loads(content)
if "todos" in data and isinstance(data["todos"], list):
last_todo_response = data["todos"]
break
except (json.JSONDecodeError, TypeError):
data = repair_and_load_json(content, default=None, context="todo_content")
if data is None:
continue
if "todos" in data and isinstance(data["todos"], list):
last_todo_response = data["todos"]
break
if last_todo_response:
# Replay the items into the store (replace mode)
@@ -5960,7 +5964,7 @@ class AIAgent:
result_json = asyncio.run(
vision_analyze_tool(image_url=vision_source, user_prompt=analysis_prompt)
)
result = json.loads(result_json) if isinstance(result_json, str) else {}
result = repair_and_load_json(result_json, default={}, context="vision_result") if isinstance(result_json, str) else {}
description = (result.get("analysis") or "").strip()
except Exception as e:
description = f"Image analysis failed: {e}"
@@ -6758,7 +6762,7 @@ class AIAgent:
for tc in tool_calls:
if tc.function.name == "memory":
try:
args = json.loads(tc.function.arguments)
args = repair_and_load_json(tc.function.arguments, default={}, context="memory_flush")
flush_target = args.get("target", "memory")
from tools.memory_tool import memory_tool as _memory_tool
_memory_tool(
@@ -8297,14 +8301,15 @@ class AIAgent:
for tc in tcs:
if isinstance(tc, dict) and "function" in tc:
try:
args_obj = json.loads(tc["function"]["arguments"])
tc = {**tc, "function": {
**tc["function"],
"arguments": json.dumps(
args_obj, separators=(",", ":"),
sort_keys=True,
),
}}
args_obj = repair_and_load_json(tc["function"]["arguments"], default=None, context="cache_serialization")
if args_obj is not None:
tc = {**tc, "function": {
**tc["function"],
"arguments": json.dumps(
args_obj, separators=(",", ":"),
sort_keys=True,
),
}}
except Exception:
pass
new_tcs.append(tc)

View File

@@ -1,91 +0,0 @@
"""Tests for session compaction with fact extraction."""
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.session_compactor import (
ExtractedFact,
extract_facts_from_messages,
save_facts_to_store,
extract_and_save_facts,
format_facts_summary,
)
class TestFactExtraction:
def test_extract_preference(self):
messages = [
{"role": "user", "content": "I prefer Python over JavaScript for backend work."},
]
facts = extract_facts_from_messages(messages)
assert len(facts) >= 1
assert any("Python" in f.content for f in facts)
def test_extract_correction(self):
messages = [
{"role": "user", "content": "Actually the port is 8081 not 8080."},
]
facts = extract_facts_from_messages(messages)
assert len(facts) >= 1
assert any("8081" in f.content for f in facts)
def test_extract_project_fact(self):
messages = [
{"role": "user", "content": "The project uses Gitea for source control."},
]
facts = extract_facts_from_messages(messages)
assert len(facts) >= 1
def test_skip_tool_results(self):
messages = [
{"role": "assistant", "content": "Running command...", "tool_calls": [{"id": "1"}]},
{"role": "tool", "content": "output here"},
]
facts = extract_facts_from_messages(messages)
assert len(facts) == 0
def test_skip_short_messages(self):
messages = [
{"role": "user", "content": "ok"},
]
facts = extract_facts_from_messages(messages)
assert len(facts) == 0
def test_deduplication(self):
messages = [
{"role": "user", "content": "I prefer Python."},
{"role": "user", "content": "I prefer Python."},
]
facts = extract_facts_from_messages(messages)
# Should deduplicate
python_facts = [f for f in facts if "Python" in f.content]
assert len(python_facts) == 1
class TestSaveFacts:
def test_save_with_callback(self):
saved = []
def mock_save(category, entity, content, trust):
saved.append({"category": category, "content": content})
facts = [ExtractedFact("user_pref", "user", "likes dark mode", 0.8, 0)]
count = save_facts_to_store(facts, fact_store_fn=mock_save)
assert count == 1
assert len(saved) == 1
class TestFormatSummary:
def test_empty(self):
assert "No facts" in format_facts_summary([])
def test_with_facts(self):
facts = [
ExtractedFact("user_pref", "user", "likes dark mode", 0.8, 0),
ExtractedFact("correction", "user", "port is 8081", 0.9, 1),
]
summary = format_facts_summary(facts)
assert "2 facts" in summary
assert "user_pref" in summary

View File

@@ -145,6 +145,50 @@ def safe_json_loads(text: str, default: Any = None) -> Any:
return default
def repair_and_load_json(text: str, default: Any = None, *, context: str = "") -> Any:
"""Parse JSON with automatic repair fallback.
Tries ``json.loads`` first. On failure, attempts to repair the string
using the ``json_repair`` library before falling back to *default*.
Logs a debug-level warning when repair is triggered so that callers can
observe silent-failure patterns without raising exceptions.
Args:
text: The JSON string to parse.
default: Value returned when both parse and repair fail.
context: Optional label included in the debug log (e.g. the call-site
name) to aid tracing.
Returns:
Parsed Python object, or *default* on unrecoverable failure.
"""
if not isinstance(text, str):
return default
try:
return json.loads(text)
except (json.JSONDecodeError, ValueError):
pass
try:
import json_repair # optional dependency
repaired = json_repair.repair_json(text, return_objects=True)
# json_repair returns "" when it cannot produce a valid structure.
# Guard against returning that sentinel as if it were a successful parse.
# Exception: if the original text was a JSON empty-string literal like `""`
# then "" is the correct parse result.
if repaired == "" and text.strip() not in ('""', "''"):
tag = f" [{context}]" if context else ""
logger.debug("repair_and_load_json%s: repair yielded empty string; returning default", tag)
return default
tag = f" [{context}]" if context else ""
logger.debug("repair_and_load_json%s: repaired malformed JSON (first 120 chars): %.120s", tag, text)
return repaired
except Exception as exc:
tag = f" [{context}]" if context else ""
logger.debug("repair_and_load_json%s: repair failed (%s); returning default", tag, exc)
return default
# ─── Environment Variable Helpers ─────────────────────────────────────────────