Harden Codex stream handling and ack continuation

This commit is contained in:
George Pickett
2026-02-25 18:56:06 -08:00
parent 3ba8b15f13
commit e63986b534
2 changed files with 160 additions and 6 deletions

View File

@@ -508,6 +508,7 @@ class AIAgent:
action_markers = (
"look into",
"look at",
"inspect",
"scan",
"check",
@@ -526,11 +527,33 @@ class AIAgent:
"report back",
"summarize",
)
workspace_markers = (
"directory",
"current directory",
"current dir",
"cwd",
"repo",
"repository",
"codebase",
"project",
"folder",
"filesystem",
"file tree",
"files",
"path",
)
user_text = (user_message or "").strip().lower()
user_requests_action = any(marker in user_text for marker in action_markers) or "~/" in user_text or "/" in user_text
user_targets_workspace = (
any(marker in user_text for marker in workspace_markers)
or "~/" in user_text
or "/" in user_text
)
assistant_mentions_action = any(marker in assistant_text for marker in action_markers)
return user_requests_action and assistant_mentions_action
assistant_targets_workspace = any(
marker in assistant_text for marker in workspace_markers
)
return (user_targets_workspace or assistant_targets_workspace) and assistant_mentions_action
def _extract_reasoning(self, assistant_message) -> Optional[str]:
@@ -1499,10 +1522,29 @@ class AIAgent:
def _run_codex_stream(self, api_kwargs: dict):
"""Execute one streaming Responses API request and return the final response."""
with self.client.responses.stream(**api_kwargs) as stream:
for _ in stream:
pass
return stream.get_final_response()
max_stream_retries = 1
for attempt in range(max_stream_retries + 1):
try:
with self.client.responses.stream(**api_kwargs) as stream:
for _ in stream:
pass
return stream.get_final_response()
except RuntimeError as exc:
err_text = str(exc)
missing_completed = "response.completed" in err_text
if missing_completed and attempt < max_stream_retries:
logger.debug(
"Responses stream closed before completion (attempt %s/%s); retrying.",
attempt + 1,
max_stream_retries + 1,
)
continue
if missing_completed:
logger.debug(
"Responses stream did not emit response.completed; falling back to non-stream create."
)
return self.client.responses.create(**api_kwargs)
raise
def _interruptible_api_call(self, api_kwargs: dict):
"""

View File

@@ -124,6 +124,26 @@ def _codex_ack_message_response(text: str):
)
class _FakeResponsesStream:
def __init__(self, *, final_response=None, final_error=None):
self._final_response = final_response
self._final_error = final_error
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def __iter__(self):
return iter(())
def get_final_response(self):
if self._final_error is not None:
raise self._final_error
return self._final_response
def test_api_mode_uses_explicit_provider_when_codex(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
agent = run_agent.AIAgent(
@@ -192,6 +212,57 @@ def test_build_api_kwargs_codex(monkeypatch):
assert "function" not in kwargs["tools"][0]
def test_run_codex_stream_retries_when_completed_event_missing(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"stream": 0}
def _fake_stream(**kwargs):
calls["stream"] += 1
if calls["stream"] == 1:
return _FakeResponsesStream(
final_error=RuntimeError("Didn't receive a `response.completed` event.")
)
return _FakeResponsesStream(final_response=_codex_message_response("stream ok"))
agent.client = SimpleNamespace(
responses=SimpleNamespace(
stream=_fake_stream,
create=lambda **kwargs: _codex_message_response("fallback"),
)
)
response = agent._run_codex_stream({"model": "gpt-5-codex"})
assert calls["stream"] == 2
assert response.output[0].content[0].text == "stream ok"
def test_run_codex_stream_falls_back_to_create_after_stream_completion_error(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"stream": 0, "create": 0}
def _fake_stream(**kwargs):
calls["stream"] += 1
return _FakeResponsesStream(
final_error=RuntimeError("Didn't receive a `response.completed` event.")
)
def _fake_create(**kwargs):
calls["create"] += 1
return _codex_message_response("create fallback ok")
agent.client = SimpleNamespace(
responses=SimpleNamespace(
stream=_fake_stream,
create=_fake_create,
)
)
response = agent._run_codex_stream({"model": "gpt-5-codex"})
assert calls["stream"] == 2
assert calls["create"] == 1
assert response.output[0].content[0].text == "create fallback ok"
def test_run_conversation_codex_plain_text(monkeypatch):
agent = _build_agent(monkeypatch)
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: _codex_message_response("OK"))
@@ -439,3 +510,44 @@ def test_run_conversation_codex_continues_after_ack_stop_message(monkeypatch):
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
def test_run_conversation_codex_continues_after_ack_for_directory_listing_prompt(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [
_codex_ack_message_response(
"I'll check what's in the current directory and call out 3 notable items."
),
_codex_tool_call_response(),
_codex_message_response("Directory summary complete."),
]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("look at current directory and list 3 notable things")
assert result["completed"] is True
assert result["final_response"] == "Directory summary complete."
assert any(
msg.get("role") == "assistant"
and msg.get("finish_reason") == "incomplete"
and "current directory" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(
msg.get("role") == "user"
and "Continue now. Execute the required tool calls" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])