Skip to content

SSE 流式输出

🎯 学习目标

  • 理解 SSE(Server-Sent Events)在 LLM API 中的事件格式
  • 使用 OpenAI SDK 与 fetch 消费 stream: true 响应
  • 在前端用 ReadableStream 逐段渲染文本
  • 用 AbortController 实现用户取消与超时控制

引言

非流式 API 要等模型 整段生成完毕 才返回 JSON,用户面对长时间空白。流式输出(Streaming) 每生成若干 Token 就推送一块数据,显著降低 感知延迟——首字更快出现,体验接近「打字」。OpenAI 兼容 API 的 stream 模式常用 SSE(Server-Sent Events):HTTP 长连接上连续发送 data: {...}\n\n 行,最后一行 data: [DONE]

本节先在 Node 用 SDK 消费流,再用 原生 fetch + ReadableStream 解析 SSE(理解原理),最后给出 Express 转发流AbortController 取消模式。注意:流式改善体验,不减少总生成 Token 数;计费仍按完整输出计算。

章节正文

第 1 步:SDK 流式调用:async iterator

javascript
const stream = await client.chat.completions.create({
  model: 'gpt-4o-mini',
  messages: [{ role: 'user', content: '用五句话介绍 SSE' }],
  stream: true,
})

for await (const chunk of stream) {
  const delta = chunk.choices[0]?.delta?.content ?? ''
  process.stdout.write(delta)
}
console.log('\n--- done ---')

每个 chunk增量 delta,不是全文。拼接 delta 得完整回复。也可能收到 finish_reasonusage(部分厂商在最后一 chunk 或单独 event 提供)。

Python

python
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "用五句话介绍 SSE"}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)

开发调试时可 console.log(JSON.stringify(chunk)) 看清字段结构。

第 2 步:fetch + ReadableStream 手动解析 SSE

理解底层便于对接非 SDK 环境或自定义网关:

javascript
const res = await fetch('https://api.openai.com/v1/chat/completions', {
  method: 'POST',
  headers: {
    Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    model: 'gpt-4o-mini',
    messages: [{ role: 'user', content: '数到 5' }],
    stream: true,
  }),
})

if (!res.ok) throw new Error(await res.text())

const reader = res.body.getReader()
const decoder = new TextDecoder()
let buffer = ''

while (true) {
  const { done, value } = await reader.read()
  if (done) break
  buffer += decoder.decode(value, { stream: true })

  const lines = buffer.split('\n')
  buffer = lines.pop() ?? ''

  for (const line of lines) {
    if (!line.startsWith('data: ')) continue
    const payload = line.slice(6).trim()
    if (payload === '[DONE]') continue
    const json = JSON.parse(payload)
    const text = json.choices?.[0]?.delta?.content ?? ''
    process.stdout.write(text)
  }
}

要点:SSE 可能半包到达,需 buffer 按行切;空行分隔事件;跳过 event: 行若存在。

第 3 步:Express 转发 SSE 到浏览器

后端代理保护 API Key,前端连自家域名:

javascript
import express from 'express'
import OpenAI from 'openai'

const app = express()
app.use(express.json())
const client = new OpenAI()

app.post('/api/chat/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.setHeader('Connection', 'keep-alive')
  res.flushHeaders?.()

  try {
    const stream = await client.chat.completions.create({
      model: 'gpt-4o-mini',
      messages: req.body.messages,
      stream: true,
    })

    for await (const chunk of stream) {
      const delta = chunk.choices[0]?.delta?.content ?? ''
      if (delta) {
        res.write(`data: ${JSON.stringify({ text: delta })}\n\n`)
      }
    }
    res.write('data: [DONE]\n\n')
    res.end()
  } catch (e) {
    res.write(`data: ${JSON.stringify({ error: e.message })}\n\n`)
    res.end()
  }
})

app.listen(3000)

Nginx 反向代理需 proxy_buffering off;X-Accel-Buffering: no,否则 SSE 被缓冲、前端一次性才收到。

第 4 步:AbortController:用户点击「停止」

浏览器端:

javascript
const controller = new AbortController()

fetch('/api/chat/stream', {
  method: 'POST',
  body: JSON.stringify({ messages }),
  signal: controller.signal,
})
  .then(async (res) => {
    const reader = res.body.getReader()
    const decoder = new TextDecoder()
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      appendUI(decoder.decode(value))
    }
  })
  .catch((e) => {
    if (e.name === 'AbortError') console.log('用户取消')
    else console.error(e)
  })

document.getElementById('stop').onclick = () => controller.abort()

Node OpenAI SDK 传 signal

javascript
const ac = new AbortController()
setTimeout(() => ac.abort(), 30_000) // 30s 超时

const stream = await client.chat.completions.create(
  { model: 'gpt-4o-mini', messages, stream: true },
  { signal: ac.signal },
)

服务端应在客户端断开后 abort 上游 fetch,释放连接与 GPU 算力(监听 req.on('close'))。

第 5 步:错误处理与并发流

错误分阶段

  1. HTTP 4xx/5xx(未建立流):读 response.text() 得 JSON error
  2. 流中途错误:部分厂商发 data: {"error":...} event;需解析并展示
  3. 网络闪断:前端提示重试,可保留已生成 partial 内容

并发:每请求独立 ReadableStream,勿共享 reader。UI 列表多会话时 sessionId → AbortController Map 管理取消。

与 JSON 模式:结构化输出有时不支持 stream 或 stream 完再校验;见 2.6。

测试 curl

bash
curl -N https://api.openai.com/v1/chat/completions \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"model":"gpt-4o-mini","stream":true,"messages":[{"role":"user","content":"hi"}]}'

-N 禁用 curl 缓冲,应看到逐行 data: 输出。

动手练习

  1. 用 SDK 流式打印一篇短诗,统计首 Token 到达时间(Date.now 差值)。
  2. 实现 Express /api/chat/stream 并用 curl -N 验证;若经 Nginx,确认无缓冲。
  3. 在前端或 Node 脚本中 AbortController 中途 cancel,确认不再收到 delta。
  4. 模拟 SSE 半包:把两个 chunk 拼进 buffer 再解析,写单元测试断言文本正确拼接。

常见问题

Q:流式会更便宜吗?

通常按相同 output tokens 计费。收益在体验与可提前 cancel 省下的「未展示 tokens」——若厂商对 cancel 仍全量计费则节省有限,查定价说明。

Q:为什么前端一次性显示全部文字?

常见原因:代理缓冲、未 flush、用错了非流式 API、或前端等 stream 结束才更新 DOM。逐 delta 更新 innerText。

Q:WebSocket 和 SSE 选哪个?

OpenAI 兼容流式用 SSE 足够;双向高频信令或多模态二进制再考虑 WebSocket。

本节小结

LLM 流式通过 SSE 分块推送 delta,SDK 的 for-await 或 fetch+ReadableStream 均可消费。后端转发需关闭代理缓冲;AbortController 实现用户取消与超时。流式降低感知延迟,错误要区分连接前与流中途,并发每路独立 reader 与 signal。