Fix Codex stream fallback for Responses completion gaps
This commit is contained in:
42
run_agent.py
42
run_agent.py
@@ -1530,11 +1530,49 @@ class AIAgent:
|
|||||||
continue
|
continue
|
||||||
if missing_completed:
|
if missing_completed:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Responses stream did not emit response.completed; falling back to non-stream create."
|
"Responses stream did not emit response.completed; falling back to create(stream=True)."
|
||||||
)
|
)
|
||||||
return self.client.responses.create(**api_kwargs)
|
return self._run_codex_create_stream_fallback(api_kwargs)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def _run_codex_create_stream_fallback(self, api_kwargs: dict):
|
||||||
|
"""Fallback path for stream completion edge cases on Codex-style Responses backends."""
|
||||||
|
fallback_kwargs = dict(api_kwargs)
|
||||||
|
fallback_kwargs["stream"] = True
|
||||||
|
stream_or_response = self.client.responses.create(**fallback_kwargs)
|
||||||
|
|
||||||
|
# Compatibility shim for mocks or providers that still return a concrete response.
|
||||||
|
if hasattr(stream_or_response, "output"):
|
||||||
|
return stream_or_response
|
||||||
|
if not hasattr(stream_or_response, "__iter__"):
|
||||||
|
return stream_or_response
|
||||||
|
|
||||||
|
terminal_response = None
|
||||||
|
try:
|
||||||
|
for event in stream_or_response:
|
||||||
|
event_type = getattr(event, "type", None)
|
||||||
|
if not event_type and isinstance(event, dict):
|
||||||
|
event_type = event.get("type")
|
||||||
|
if event_type not in {"response.completed", "response.incomplete", "response.failed"}:
|
||||||
|
continue
|
||||||
|
|
||||||
|
terminal_response = getattr(event, "response", None)
|
||||||
|
if terminal_response is None and isinstance(event, dict):
|
||||||
|
terminal_response = event.get("response")
|
||||||
|
if terminal_response is not None:
|
||||||
|
return terminal_response
|
||||||
|
finally:
|
||||||
|
close_fn = getattr(stream_or_response, "close", None)
|
||||||
|
if callable(close_fn):
|
||||||
|
try:
|
||||||
|
close_fn()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if terminal_response is not None:
|
||||||
|
return terminal_response
|
||||||
|
raise RuntimeError("Responses create(stream=True) fallback did not emit a terminal response.")
|
||||||
|
|
||||||
def _interruptible_api_call(self, api_kwargs: dict):
|
def _interruptible_api_call(self, api_kwargs: dict):
|
||||||
"""
|
"""
|
||||||
Run the API call in a background thread so the main conversation loop
|
Run the API call in a background thread so the main conversation loop
|
||||||
|
|||||||
@@ -144,6 +144,18 @@ class _FakeResponsesStream:
|
|||||||
return self._final_response
|
return self._final_response
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeCreateStream:
|
||||||
|
def __init__(self, events):
|
||||||
|
self._events = list(events)
|
||||||
|
self.closed = False
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self._events)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.closed = True
|
||||||
|
|
||||||
|
|
||||||
def test_api_mode_uses_explicit_provider_when_codex(monkeypatch):
|
def test_api_mode_uses_explicit_provider_when_codex(monkeypatch):
|
||||||
_patch_agent_bootstrap(monkeypatch)
|
_patch_agent_bootstrap(monkeypatch)
|
||||||
agent = run_agent.AIAgent(
|
agent = run_agent.AIAgent(
|
||||||
@@ -263,6 +275,42 @@ def test_run_codex_stream_falls_back_to_create_after_stream_completion_error(mon
|
|||||||
assert response.output[0].content[0].text == "create fallback ok"
|
assert response.output[0].content[0].text == "create fallback ok"
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_codex_stream_fallback_parses_create_stream_events(monkeypatch):
|
||||||
|
agent = _build_agent(monkeypatch)
|
||||||
|
calls = {"stream": 0, "create": 0}
|
||||||
|
create_stream = _FakeCreateStream(
|
||||||
|
[
|
||||||
|
SimpleNamespace(type="response.created"),
|
||||||
|
SimpleNamespace(type="response.in_progress"),
|
||||||
|
SimpleNamespace(type="response.completed", response=_codex_message_response("streamed create ok")),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
def _fake_stream(**kwargs):
|
||||||
|
calls["stream"] += 1
|
||||||
|
return _FakeResponsesStream(
|
||||||
|
final_error=RuntimeError("Didn't receive a `response.completed` event.")
|
||||||
|
)
|
||||||
|
|
||||||
|
def _fake_create(**kwargs):
|
||||||
|
calls["create"] += 1
|
||||||
|
assert kwargs.get("stream") is True
|
||||||
|
return create_stream
|
||||||
|
|
||||||
|
agent.client = SimpleNamespace(
|
||||||
|
responses=SimpleNamespace(
|
||||||
|
stream=_fake_stream,
|
||||||
|
create=_fake_create,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
response = agent._run_codex_stream({"model": "gpt-5-codex"})
|
||||||
|
assert calls["stream"] == 2
|
||||||
|
assert calls["create"] == 1
|
||||||
|
assert create_stream.closed is True
|
||||||
|
assert response.output[0].content[0].text == "streamed create ok"
|
||||||
|
|
||||||
|
|
||||||
def test_run_conversation_codex_plain_text(monkeypatch):
|
def test_run_conversation_codex_plain_text(monkeypatch):
|
||||||
agent = _build_agent(monkeypatch)
|
agent = _build_agent(monkeypatch)
|
||||||
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: _codex_message_response("OK"))
|
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: _codex_message_response("OK"))
|
||||||
|
|||||||
Reference in New Issue
Block a user