From 053178e5089679c6ed992ad352d5d4bc8e68ba10 Mon Sep 17 00:00:00 2001 From: Camilo Aguilar Date: Fri, 31 Oct 2025 09:25:37 -0400 Subject: [PATCH] fix(streaming): skip SSE events with no data to prevent JSONDecodeError MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes an issue where SSE events containing only meta-fields (retry, id, event) but no data field would cause JSONDecodeError when the SDK tried to parse empty strings as JSON. Per the SSE specification (WHATWG HTML § 9.2), retry directives and other meta-only events are valid and should not be treated as data events: retry: 3000 data: {"actual":"content"} The SDK's SSE decoder correctly parses these events, setting the retry field but leaving data empty. However, the Stream.__stream__() and AsyncStream.__stream__() methods were attempting to call .json() on all events, including those with empty data, leading to JSONDecodeError. Changes: - Added check in Stream.__stream__() to skip events with empty/whitespace data - Added check in AsyncStream.__stream__() to skip events with empty/whitespace data - Added test cases for retry directives and meta-only events The fix ensures that only events with actual data are parsed as JSON, preventing the error: "json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)" that occurred when trying to parse empty strings. Example error this fixes: File "openai/_streaming.py", line 82, in __stream__ data = sse.json() File "openai/_streaming.py", line 259, in json return json.loads(self.data) json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0) --- src/openai/_streaming.py | 10 ++++++ tests/test_streaming.py | 70 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index f586de74ff..649bfbabfc 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -59,6 +59,11 @@ def __stream__(self) -> Iterator[_T]: if sse.data.startswith("[DONE]"): break + # Skip events with no data (e.g., standalone retry/id directives) + # Per SSE spec, these are valid meta-only events that shouldn't be parsed as JSON + if not sse.data or sse.data.strip() == "": + continue + # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data if sse.event and sse.event.startswith("thread."): data = sse.json() @@ -161,6 +166,11 @@ async def __stream__(self) -> AsyncIterator[_T]: if sse.data.startswith("[DONE]"): break + # Skip events with no data (e.g., standalone retry/id directives) + # Per SSE spec, these are valid meta-only events that shouldn't be parsed as JSON + if not sse.data or sse.data.strip() == "": + continue + # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data if sse.event and sse.event.startswith("thread."): data = sse.json() diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 04f8e51abd..382c2097bc 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -216,6 +216,76 @@ def body() -> Iterator[bytes]: assert sse.json() == {"content": "известни"} +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_retry_directive_without_data(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: + """Test that standalone retry directives (with no data field) are skipped. + + This is a common pattern in SSE streams, especially from providers like Anthropic, + where a retry directive is sent at the beginning of the stream: + + retry: 3000 + + data: {"actual":"content"} + + The retry directive is a valid SSE meta-field but should not be parsed as JSON data. + """ + def body() -> Iterator[bytes]: + # Standalone retry directive (no data field) + yield b"retry: 3000\n" + yield b"\n" + # Actual data event + yield b'data: {"foo":true}\n' + yield b"\n" + + iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client) + + # First SSE event has retry but no data - should be yielded by decoder + sse = await iter_next(iterator) + assert sse.retry == 3000 + assert sse.data == "" + + # Second SSE event has actual data + sse = await iter_next(iterator) + assert sse.json() == {"foo": True} + + await assert_empty_iter(iterator) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) +async def test_retry_with_id_directive_without_data(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: + """Test that SSE events with only meta-fields (retry, id, event) and no data are skipped. + + Per the SSE spec, these are valid meta-only events that configure the event stream + but don't contain actual data to be processed. + """ + def body() -> Iterator[bytes]: + # Meta-only event with retry and id + yield b"id: msg_123\n" + yield b"retry: 5000\n" + yield b"\n" + # Actual data event + yield b"id: msg_124\n" + yield b'data: {"bar":false}\n' + yield b"\n" + + iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client) + + # First SSE event is meta-only + sse = await iter_next(iterator) + assert sse.id == "msg_123" + assert sse.retry == 5000 + assert sse.data == "" + + # Second SSE event has actual data + sse = await iter_next(iterator) + assert sse.id == "msg_124" + assert sse.json() == {"bar": False} + + await assert_empty_iter(iterator) + + async def to_aiter(iter: Iterator[bytes]) -> AsyncIterator[bytes]: for chunk in iter: yield chunk