Skip to content

Commit 5c5bbf1

Browse files
committed
fix(ssestream): skip events with empty data to prevent JSON unmarshal errors
Fix crash when parsing SSE streams that contain empty events from retry: directives or comment lines. ## Problem The eventStreamDecoder creates events with empty Data when it encounters empty lines after non-data SSE fields (like "retry: 3000"). Stream.Next() then attempts json.Unmarshal on empty bytes, causing "unexpected end of JSON input" error. This breaks streaming with any SSE server using the retry directive. ## Root Cause Per the SSE specification [1], events are dispatched when empty lines are encountered, regardless of whether data was present. The spec states for empty line handling: > "If the line is empty (a blank line) [Dispatch the event], as defined below." And for the retry field: > "If the field value consists of only ASCII digits, then interpret the field > value as an integer in base ten, and set the event stream's reconnection time > to that integer. Otherwise, ignore the field." For empty data handling: > "If the data buffer is an empty string, set the data buffer and the event > type buffer to the empty string and return." This means that a sequence like: ``` retry: 3000 ``` Creates a valid empty event according to the spec. Servers commonly send this for reconnection configuration, but the SDK assumed all events contain JSON data. [1] https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events ## Solution Check if event.Data is empty before attempting to unmarshal. Skip empty events and continue processing the stream. This maintains compatibility with OpenAI API while supporting standard SSE practices per spec. ## Tests Added - TestStream_EmptyEvents: Verifies handling of retry directive with empty event - TestStream_OnlyRetryDirective: Tests stream with only retry (no data) - TestStream_MultipleEmptyEvents: Tests multiple empty events interspersed with data All tests pass: ``` === RUN TestStream_EmptyEvents --- PASS: TestStream_EmptyEvents (0.00s) === RUN TestStream_OnlyRetryDirective --- PASS: TestStream_OnlyRetryDirective (0.00s) === RUN TestStream_MultipleEmptyEvents --- PASS: TestStream_MultipleEmptyEvents (0.00s) PASS ``` ## Impact - Enables compatibility with SSE servers using retry: directive (common practice) - No breaking changes - only adds resilience to spec-compliant edge case - Verified with streaming function calling through Anthropic API gateway ## Real-World Testing Tested with Anthropic Claude 3.5 streaming API via AI Gateway: - Before: "Stream error: unexpected end of JSON input" - After: Successfully receives and processes all streaming chunks Fixes stream crashes with "unexpected end of JSON input" when encountering SSE streams with retry directives or comment lines.
1 parent dae47f3 commit 5c5bbf1

File tree

2 files changed

+159
-0
lines changed

2 files changed

+159
-0
lines changed

packages/ssestream/ssestream.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,11 @@ func (s *Stream[T]) Next() bool {
163163
continue
164164
}
165165

166+
// Skip events with empty data (e.g., from SSE retry: or comment lines)
167+
if len(s.decoder.Event().Data) == 0 {
168+
continue
169+
}
170+
166171
var nxt T
167172

168173
if s.decoder.Event().Type == "" || !strings.HasPrefix(s.decoder.Event().Type, "thread.") {
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package ssestream
2+
3+
import (
4+
"bytes"
5+
"net/http"
6+
"testing"
7+
)
8+
9+
type mockReadCloser struct {
10+
*bytes.Reader
11+
}
12+
13+
func (m mockReadCloser) Close() error {
14+
return nil
15+
}
16+
17+
// TestStream_EmptyEvents tests that the stream correctly handles empty SSE events
18+
// (e.g., from retry: directives or comment lines) without crashing on JSON unmarshal
19+
func TestStream_EmptyEvents(t *testing.T) {
20+
// Simulate SSE stream with retry directive that creates empty event
21+
sseData := `retry: 3000
22+
23+
data: {"id":"msg_01ABC","type":"content_block_delta","delta":{"type":"text","text":"Hello"}}
24+
25+
data: [DONE]
26+
27+
`
28+
29+
resp := &http.Response{
30+
StatusCode: 200,
31+
Header: http.Header{"Content-Type": []string{"text/event-stream"}},
32+
Body: mockReadCloser{bytes.NewReader([]byte(sseData))},
33+
}
34+
35+
decoder := NewDecoder(resp)
36+
if decoder == nil {
37+
t.Fatal("Expected decoder to be created, got nil")
38+
}
39+
40+
type testMsg struct {
41+
ID string `json:"id"`
42+
Type string `json:"type"`
43+
Delta struct {
44+
Type string `json:"type"`
45+
Text string `json:"text"`
46+
} `json:"delta"`
47+
}
48+
49+
stream := NewStream[testMsg](decoder, nil)
50+
51+
// Should successfully iterate without crashing on empty event
52+
var receivedMessages int
53+
for stream.Next() {
54+
msg := stream.Current()
55+
receivedMessages++
56+
57+
if msg.ID != "msg_01ABC" {
58+
t.Errorf("Expected ID 'msg_01ABC', got '%s'", msg.ID)
59+
}
60+
if msg.Delta.Text != "Hello" {
61+
t.Errorf("Expected text 'Hello', got '%s'", msg.Delta.Text)
62+
}
63+
}
64+
65+
if err := stream.Err(); err != nil {
66+
t.Errorf("Expected no error, got: %v", err)
67+
}
68+
69+
if receivedMessages != 1 {
70+
t.Errorf("Expected 1 message, got %d", receivedMessages)
71+
}
72+
}
73+
74+
// TestStream_OnlyRetryDirective tests stream with only retry directive (no data events)
75+
func TestStream_OnlyRetryDirective(t *testing.T) {
76+
sseData := `retry: 3000
77+
78+
`
79+
80+
resp := &http.Response{
81+
StatusCode: 200,
82+
Header: http.Header{"Content-Type": []string{"text/event-stream"}},
83+
Body: mockReadCloser{bytes.NewReader([]byte(sseData))},
84+
}
85+
86+
decoder := NewDecoder(resp)
87+
type testMsg struct {
88+
ID string `json:"id"`
89+
}
90+
stream := NewStream[testMsg](decoder, nil)
91+
92+
// Should handle gracefully without any messages
93+
var count int
94+
for stream.Next() {
95+
count++
96+
}
97+
98+
if err := stream.Err(); err != nil {
99+
t.Errorf("Expected no error, got: %v", err)
100+
}
101+
102+
if count != 0 {
103+
t.Errorf("Expected 0 messages, got %d", count)
104+
}
105+
}
106+
107+
// TestStream_MultipleEmptyEvents tests handling of multiple empty events
108+
func TestStream_MultipleEmptyEvents(t *testing.T) {
109+
sseData := `retry: 3000
110+
111+
: comment line
112+
113+
data: {"id":"1","text":"first"}
114+
115+
retry: 5000
116+
117+
data: {"id":"2","text":"second"}
118+
119+
`
120+
121+
resp := &http.Response{
122+
StatusCode: 200,
123+
Header: http.Header{"Content-Type": []string{"text/event-stream"}},
124+
Body: mockReadCloser{bytes.NewReader([]byte(sseData))},
125+
}
126+
127+
decoder := NewDecoder(resp)
128+
type testMsg struct {
129+
ID string `json:"id"`
130+
Text string `json:"text"`
131+
}
132+
stream := NewStream[testMsg](decoder, nil)
133+
134+
messages := []testMsg{}
135+
for stream.Next() {
136+
messages = append(messages, stream.Current())
137+
}
138+
139+
if err := stream.Err(); err != nil {
140+
t.Errorf("Expected no error, got: %v", err)
141+
}
142+
143+
if len(messages) != 2 {
144+
t.Fatalf("Expected 2 messages, got %d", len(messages))
145+
}
146+
147+
if messages[0].ID != "1" || messages[0].Text != "first" {
148+
t.Errorf("First message incorrect: %+v", messages[0])
149+
}
150+
151+
if messages[1].ID != "2" || messages[1].Text != "second" {
152+
t.Errorf("Second message incorrect: %+v", messages[1])
153+
}
154+
}

0 commit comments

Comments
 (0)