SSE Full Pipeline: Every Hop from Groq to Browser

The complete SSE transport chain — Groq streaming API, FastAPI async generators, Nginx proxy configuration, fetch ReadableStream, and state isolation under concurrent threads.

2026-04-15 · 11 min read · SSE · streaming · architecture

Streaming output is not a single feature; it is a complete pipeline. One hop that buffers or drops the stream is enough to destroy the streaming effect. Deeppin's SSE pipeline is tuned at every hop from Groq to the browser.

Fig. 1 · sse-pipeline: Groq API (stream=True, token by token) → LiteLLM Router (usage-based routing, automatic fallback on 429) → FastAPI generator (StreamingResponse, yield "data: {...}\n\n") → Nginx (proxy_buffering off, the critical setting) → Browser ReadableStream (fetch + POST, decoder.decode()) → Zustand store (streams[threadId].buffer += token) → token appears on screen

§ 01 Part 1 — Groq streaming API

Groq is accessed via LiteLLM Router. LiteLLM's completion interface supports stream=True, returning an async iterator that yields one chunk at a time:

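async def llm_stream(context):
    # router is the LiteLLM Router instance, configured elsewhere
    response = await router.acompletion(
        model="chat",
        messages=context,
        stream=True,
        max_tokens=2048,
    )
    async for chunk in response:
        token = chunk.choices[0].delta.content
        if token:  # skip None and empty deltas (e.g. role delta)
            yield token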
Note: chunk.choices[0].delta.content is None at stream end (and on other content-free deltas). Filter it explicitly; otherwise None is serialized as JSON null and the client ends up appending the literal text "null" to the message.
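
The filtering rule can be checked without calling any API. The sketch below is a minimal stand-in, not LiteLLM code: fake_chunk and fake_stream are hypothetical helpers that only imitate the chunk.choices[0].delta.content shape used above.

```python
import asyncio
from types import SimpleNamespace


def fake_chunk(content):
    """Build an object shaped like a streaming chunk (a stand-in, not LiteLLM's class)."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


async def fake_stream():
    """Hypothetical stream: a content-free delta, two tokens, an empty string, a final None."""
    for content in [None, "Hel", "lo", "", None]:
        yield fake_chunk(content)


async def tokens_only(stream):
    """Yield only non-empty text deltas, mirroring the `if token:` guard."""
    async for chunk in stream:
        token = chunk.choices[0].delta.content
        if token:  # drops both None and ""
            yield token


async def collect():
    return [t async for t in tokens_only(fake_stream())]


tokens = asyncio.run(collect())
print(tokens)
```

Only the two real tokens survive; the None and empty deltas never reach the SSE layer.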

§ 02 Part 2 — FastAPI async generator

FastAPI's StreamingResponse accepts an async generator and writes each yielded value into the HTTP response body as it is produced. The SSE protocol requires each event to be framed as data: <content>\n\n, that is, a data: line followed by a blank line:

import asyncio
import json

from fastapi.responses import StreamingResponse

async def stream_response(thread_id: str, message: str):
    context = await build_context(thread_id)
    tokens = []

    try:
        async for token in llm_stream(context):
            tokens.append(token)
            payload = json.dumps({"type": "token", "text": token})
            yield f"data: {payload}\n\n"

        # Persist the full reply before telling the client the stream is done
        await save_assistant_message(thread_id, "".join(tokens))
        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    except asyncio.CancelledError:
        # User closed the tab or cancelled: keep what was generated so far
        await save_partial_message(thread_id, "".join(tokens))
        raise  # re-raise so the cancellation still propagates

# Inside the route handler:
return StreamingResponse(
    stream_response(thread_id, message),
    media_type="text/event-stream",
    headers={"X-Accel-Buffering": "no"},
)
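
Why wrap each token in JSON instead of sending it raw? A raw token containing a newline would break the data: <content>\n\n framing. The sketch below isolates the framing step in a hypothetical helper, sse_event (not part of FastAPI), to show that JSON encoding keeps every frame on a single data: line.

```python
import json


def sse_event(payload: dict) -> str:
    """Serialize one payload as a single SSE frame: a 'data:' line plus a blank line."""
    # json.dumps escapes newlines inside strings, so the frame always
    # stays on one 'data:' line even when a token contains a line break.
    return f"data: {json.dumps(payload)}\n\n"


frame = sse_event({"type": "token", "text": "line one\nline two"})
body = frame[len("data: "):].rstrip("\n")
print(json.loads(body)["text"] == "line one\nline two")
```

The only real newlines in the frame are the two terminators, so a client that splits on blank lines never sees a token cut in half by its own content.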

§ 03 Part 3 — Nginx: the most common mistake

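Nginx has proxy_buffering on by default: it reads the upstream response into memory buffers (4 KB or 8 KB each by default, depending on the platform) and typically forwards data only as they fill or when the response ends. For a token stream this means the first hundred or so tokens are held back and then arrive in a burst, which completely destroys the streaming effect.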

location /api/ {
    proxy_pass http://localhost:8000;
    
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 300s;
    
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_set_header X-Real-IP $remote_addr;
}

The X-Accel-Buffering: no response header lets Nginx disable buffering per-request without touching global config — useful when serving both static files (wants buffering) and SSE endpoints in the same server block.
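
To see why buffering hurts a token stream in particular, here is a toy model. It assumes a proxy that forwards only when a fixed-size buffer fills or the upstream closes; that is a deliberate simplification for illustration, not Nginx's actual algorithm, and buffered_proxy and unbuffered_proxy are hypothetical names.

```python
def buffered_proxy(frames, buffer_size):
    """Toy model of a buffering proxy: forward only when the buffer is full or the stream ends.

    A simplification for illustration, not Nginx's actual algorithm.
    """
    pending = b""
    for frame in frames:
        pending += frame
        if len(pending) >= buffer_size:
            yield pending
            pending = b""
    if pending:  # upstream closed: flush whatever is left
        yield pending


def unbuffered_proxy(frames):
    """Model of buffering disabled: each upstream write is forwarded as it arrives."""
    yield from frames


# 200 small SSE frames, roughly 40 bytes each.
frames = [f'data: {{"type": "token", "text": "t{i}"}}\n\n'.encode() for i in range(200)]
buffered = list(buffered_proxy(frames, buffer_size=4096))
unbuffered = list(unbuffered_proxy(frames))
print(len(unbuffered), len(buffered))
```

In this model the client receives the same bytes either way, but with buffering they arrive in a handful of large writes instead of one write per token, which is the burst effect described above.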

§ 04 Part 4 — Frontend: why fetch instead of EventSource

EventSource is the browser's built-in SSE client, but it only supports GET requests. Deeppin's chat endpoint is POST (message in the body), so EventSource won't work. Use fetch + ReadableStream instead:

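const res = await fetch(`/api/threads/${threadId}/chat`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message }),
  signal: abortRef.current.signal,
});
if (!res.ok || !res.body) throw new Error(`Chat request failed: ${res.status}`);

const reader = res.body.getReader();
const decoder = new TextDecoder();
let pending = ""; // text received so far that does not yet form a complete frame

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // stream: true keeps a multi-byte character split across chunks intact
  pending += decoder.decode(value, { stream: true });

  // Frames end with a blank line; the last piece may be incomplete, so keep it
  const frames = pending.split("\n\n");
  pending = frames.pop() ?? "";

  for (const frame of frames) {
    for (const line of frame.split("\n")) {
      if (!line.startsWith("data: ")) continue;
      const data = JSON.parse(line.slice(6));
      if (data.type === "token") appendToken(threadId, data.text);
      else if (data.type === "done") finishStream(threadId);
    }
  }
}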
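
A reader loop like this has to cope with network chunks that do not line up with SSE frames: a chunk can end in the middle of a JSON payload or even in the middle of a multi-byte character. The sketch below shows the buffering logic in Python so it can be run in isolation; SSEParser is a hypothetical class written for this illustration, not part of any library.

```python
import codecs
import json


class SSEParser:
    """Incrementally turn raw byte chunks into decoded JSON payloads."""

    def __init__(self):
        # The incremental decoder holds a partial multi-byte character until its tail arrives.
        self._decoder = codecs.getincrementaldecoder("utf-8")()
        self._buffer = ""

    def feed(self, chunk: bytes) -> list:
        """Add a chunk and return every payload whose frame is now complete."""
        self._buffer += self._decoder.decode(chunk)
        events = []
        # A frame ends at a blank line; anything after the last one is incomplete.
        *frames, self._buffer = self._buffer.split("\n\n")
        for frame in frames:
            for line in frame.split("\n"):
                if line.startswith("data: "):
                    events.append(json.loads(line[len("data: "):]))
        return events


raw = (
    'data: {"type": "token", "text": "héllo"}\n\n'
    'data: {"type": "done"}\n\n'
).encode("utf-8")

parser = SSEParser()
events = []
# Split at awkward offsets: inside the JSON and inside the two-byte "é".
for piece in (raw[:10], raw[10:34], raw[34:35], raw[35:]):
    events.extend(parser.feed(piece))
print(events)
```

Both events come out intact regardless of where the chunk boundaries fall, because incomplete text stays in the buffer until the terminating blank line arrives.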

§ 05 Part 5 — Concurrent streaming: Zustand state isolation

One of Deeppin's core scenarios is sending messages in the main thread and multiple pins simultaneously, receiving concurrent streams. Each thread's stream state must be strictly isolated:

// useStreamStore.ts — keyed by threadId
appendToken: (threadId, token) => set(state => ({
  streams: {
    ...state.streams,
    [threadId]: {
      ...state.streams[threadId] ?? { isStreaming: true, error: null },
      buffer: (state.streams[threadId]?.buffer ?? "") + token,
    },
  },
}))
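
The isolation property is independent of Zustand: every update touches exactly one key and leaves the other threads' entries alone. Here is the same update rule as a minimal Python sketch, with a hypothetical append_token function and made-up thread ids, so the interleaving can be checked directly.

```python
def append_token(streams: dict, thread_id: str, token: str) -> dict:
    """Return a new streams mapping with `token` appended to one thread's buffer."""
    current = streams.get(thread_id, {"is_streaming": True, "error": None, "buffer": ""})
    updated = {**current, "buffer": current["buffer"] + token}
    return {**streams, thread_id: updated}  # other threads' entries are reused untouched


streams = {}
# Interleaved tokens from two hypothetical concurrent streams.
for thread_id, token in [("main", "Hel"), ("pin-1", "Foo"), ("main", "lo"), ("pin-1", "Bar")]:
    streams = append_token(streams, thread_id, token)

print(streams["main"]["buffer"], streams["pin-1"]["buffer"])
```

Tokens arriving in any interleaved order still land in the right buffer, and because each update returns a new mapping, a previous snapshot of the state is never mutated.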

§ 06 Part 6 — Cancellation

When the user cancels a response or navigates away, the frontend calls AbortController.abort(), which cancels the fetch request; closing the tab drops the connection with the same effect. On the backend, the disconnect surfaces as asyncio.CancelledError inside the streaming generator. The handling strategy is to save whatever has been generated so far as an incomplete assistant message (marked [interrupted]), so no generated content is lost.
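
The save-on-cancel strategy can be exercised with plain asyncio. In the sketch below everything is a stand-in: saved replaces the database, save_partial_message is a hypothetical persistence call, and task.cancel() simulates the client disconnecting. It also assumes plain asyncio cancellation; whether asyncio.shield alone is enough under a particular ASGI server is something to verify separately.

```python
import asyncio

saved = {}  # stand-in for the database


async def save_partial_message(thread_id: str, text: str) -> None:
    """Hypothetical persistence call; the sleep stands in for a database round trip."""
    await asyncio.sleep(0.01)
    saved[thread_id] = text + " [interrupted]"


async def slow_tokens(stalled: asyncio.Event):
    yield "The "
    yield "answer"
    stalled.set()              # signal that the stream is now mid-flight
    await asyncio.sleep(3600)  # simulate a long wait on the upstream model
    yield " is 42"


async def stream(thread_id: str, stalled: asyncio.Event):
    tokens = []
    try:
        async for token in slow_tokens(stalled):
            tokens.append(token)
    except asyncio.CancelledError:
        # shield() lets the save finish even if the task is cancelled again meanwhile.
        await asyncio.shield(save_partial_message(thread_id, "".join(tokens)))
        raise  # keep propagating so the task still ends as cancelled


async def main():
    stalled = asyncio.Event()
    task = asyncio.create_task(stream("t1", stalled))
    await stalled.wait()  # two tokens have been produced
    task.cancel()         # simulates the client disconnecting
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(saved, was_cancelled)
```

The partial text is persisted with its [interrupted] marker, and the task still finishes in the cancelled state because the handler re-raises instead of swallowing the error.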