SSE Full Pipeline: Every Hop from Groq to Browser
The complete SSE transport chain — Groq streaming API, FastAPI async generators, Nginx proxy configuration, fetch ReadableStream, and state isolation under concurrent threads.
Streaming output is not a single technical point — it's a complete pipeline. Any broken link destroys the streaming effect. Deeppin's SSE pipeline is optimized at every hop from Groq to the browser.
§ 01 Part 1 — Groq streaming API
Groq is accessed via LiteLLM Router. LiteLLM's completion interface supports stream=True, returning an async iterator that yields one chunk at a time:
async def llm_stream(context):
    # `router` is the LiteLLM Router instance, configured elsewhere.
    async for chunk in await router.acompletion(
        model="chat",
        messages=context,
        stream=True,
        max_tokens=2048,
    ):
        token = chunk.choices[0].delta.content
        if token:  # skip empty deltas (e.g. role delta)
            yield token

§ 02 Part 2 — FastAPI async generator
FastAPI's StreamingResponse accepts an async generator and writes each yielded value to the HTTP response body as it is produced. The SSE protocol requires each event to be framed as data: <content> followed by a blank line (\n\n):
import asyncio
import json

from fastapi.responses import StreamingResponse

async def stream_response(thread_id: str, message: str):
    context = await build_context(thread_id)
    tokens = []
    try:
        async for token in llm_stream(context):
            tokens.append(token)
            payload = json.dumps({"type": "token", "text": token})
            yield f"data: {payload}\n\n"
        await save_assistant_message(thread_id, "".join(tokens))
        yield f"data: {json.dumps({'type': 'done'})}\n\n"
    except asyncio.CancelledError:
        # User closed the tab or cancelled: keep what was generated so far
        await save_partial_message(thread_id, "".join(tokens))
        raise  # let the cancellation propagate instead of swallowing it

# Inside the route handler:
return StreamingResponse(
    stream_response(thread_id, message),
    media_type="text/event-stream",
    headers={"X-Accel-Buffering": "no"},
)

§ 03 Part 3 — Nginx: the most common mistake
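With proxy_buffering on, which is the default, Nginx collects the upstream response in memory buffers (4 KB or 8 KB each by default, depending on the platform) and forwards data only as buffers fill or the response ends. For a token stream this means the first hundred or so tokens are held back and then arrive in a burst, which destroys the streaming effect.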
location /api/ {
    proxy_pass http://localhost:8000;
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 300s;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_set_header X-Real-IP $remote_addr;
}

The X-Accel-Buffering: no response header lets Nginx disable buffering for an individual response without touching global config, which is useful when the same server block serves both static files (which benefit from buffering) and SSE endpoints.
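The per-response header pairs naturally with SSE framing on the backend. The following is a minimal sketch that keeps both in one place. The helper names sse_headers and format_sse_event are hypothetical and not taken from Deeppin's code, and the Cache-Control: no-cache header is an assumption added here (it is commonly sent with SSE so that intermediaries do not cache the stream).

```python
import json


def sse_headers() -> dict[str, str]:
    """Response headers for an SSE endpoint behind Nginx (hypothetical helper)."""
    return {
        # Tells Nginx not to buffer this particular response
        "X-Accel-Buffering": "no",
        # Assumption: not part of the original setup; discourages caching
        "Cache-Control": "no-cache",
    }


def format_sse_event(payload: dict) -> str:
    """Frame a JSON payload as one SSE event: 'data: <json>' plus a blank line."""
    # json.dumps escapes newlines inside strings, so the event stays on one line
    return f"data: {json.dumps(payload)}\n\n"


print(format_sse_event({"type": "token", "text": "Hi"}), end="")
```

Under these assumptions, the dictionary from sse_headers() could be passed as the headers argument of StreamingResponse, and the generator could yield format_sse_event(...) instead of building the string inline.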
§ 04 Part 4 — Frontend: why fetch instead of EventSource
EventSource is the browser's built-in SSE client, but it only supports GET requests. Deeppin's chat endpoint is POST (message in the body), so EventSource won't work. Use fetch + ReadableStream instead:
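const res = await fetch(`/api/threads/${threadId}/chat`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message }),
  signal: abortRef.current.signal,
});
if (!res.ok || !res.body) throw new Error(`Chat request failed: ${res.status}`);

const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = ""; // holds a partial line when a network chunk ends mid-event

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // stream: true keeps multi-byte characters split across chunks intact
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? ""; // last element is an incomplete line (or "")
  for (const line of lines) {
    if (!line.startsWith("data: ")) continue;
    const data = JSON.parse(line.slice(6));
    if (data.type === "token") appendToken(threadId, data.text);
    else if (data.type === "done") finishStream(threadId);
  }
}

§ 05 Part 5 — Concurrent streaming: Zustand state isolation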
One of Deeppin's core scenarios is sending messages in the main thread and multiple pins simultaneously, receiving concurrent streams. Each thread's stream state must be strictly isolated:
// useStreamStore.ts (state keyed by threadId)
appendToken: (threadId, token) => set(state => ({
  streams: {
    ...state.streams,
    [threadId]: {
      ...(state.streams[threadId] ?? { isStreaming: true, error: null }),
      buffer: (state.streams[threadId]?.buffer ?? "") + token,
    },
  },
}))

§ 06 Part 6 — Cancellation
When the user cancels a response or navigates away, the frontend calls AbortController.abort(), which cancels the fetch request; closing the tab drops the connection outright. Either way, the backend generator is cancelled and sees a CancelledError. The handling strategy is to save whatever has been generated so far as an incomplete assistant message (marked [interrupted]), so no generated content is lost.
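The save-on-cancel strategy can be tried in isolation with plain asyncio and no FastAPI involved. In this minimal sketch, fake_llm_stream, saved, and consume are hypothetical stand-ins, and save_partial_message is a toy version that appends the [interrupted] marker to an in-memory dict. Wrapping the save in asyncio.shield is an assumption added here so that a repeated cancellation does not interrupt the write; the text above does not say whether Deeppin does this.

```python
import asyncio

saved: dict[str, str] = {}  # hypothetical in-memory stand-in for the database


async def save_partial_message(thread_id: str, text: str) -> None:
    # Toy stand-in for a real database write; marks the message as interrupted
    await asyncio.sleep(0)
    saved[thread_id] = text + " [interrupted]"


async def fake_llm_stream():
    # Hypothetical token source that stalls after three tokens
    for token in ["Hello", ", ", "world"]:
        yield token
    await asyncio.sleep(3600)


async def stream_response(thread_id: str):
    tokens: list[str] = []
    try:
        async for token in fake_llm_stream():
            tokens.append(token)
            yield token
    except asyncio.CancelledError:
        # Shield the save so a second cancellation cannot stop it halfway
        await asyncio.shield(save_partial_message(thread_id, "".join(tokens)))
        raise  # propagate the cancellation to the caller


async def consume(thread_id: str) -> None:
    async for _ in stream_response(thread_id):
        pass


async def main() -> None:
    task = asyncio.create_task(consume("t1"))
    await asyncio.sleep(0.05)  # let the three tokens flow
    task.cancel()              # simulates the client disconnecting
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
print(saved)
```

Re-raising after the save keeps the cancellation visible to whatever is driving the generator, rather than making the stream look as if it had ended normally.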