SSE 流式输出
🎯 学习目标
- 理解 SSE(Server-Sent Events)在 LLM API 中的事件格式
- 使用 OpenAI SDK 与 fetch 消费 stream: true 响应
- 在前端用 ReadableStream 逐段渲染文本
- 用 AbortController 实现用户取消与超时控制
引言
非流式 API 要等模型 整段生成完毕 才返回 JSON,用户面对长时间空白。流式输出(Streaming) 每生成若干 Token 就推送一块数据,显著降低 感知延迟——首字更快出现,体验接近「打字」。OpenAI 兼容 API 的 stream 模式常用 SSE(Server-Sent Events):HTTP 长连接上连续发送 data: {...}\n\n 行,最后一行 data: [DONE]。
本节先在 Node 用 SDK 消费流,再用 原生 fetch + ReadableStream 解析 SSE(理解原理),最后给出 Express 转发流 与 AbortController 取消模式。注意:流式改善体验,不减少总生成 Token 数;计费仍按完整输出计算。
章节正文
第 1 步:SDK 流式调用:async iterator
const stream = await client.chat.completions.create({
model: 'gpt-4o-mini',
messages: [{ role: 'user', content: '用五句话介绍 SSE' }],
stream: true,
})
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta?.content ?? ''
process.stdout.write(delta)
}
console.log('\n--- done ---')每个 chunk 含 增量 delta,不是全文。拼接 delta 得完整回复。也可能收到 finish_reason、usage(部分厂商在最后一 chunk 或单独 event 提供)。
Python:
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "用五句话介绍 SSE"}],
stream=True,
)
for chunk in stream:
delta = chunk.choices[0].delta.content or ""
print(delta, end="", flush=True)开发调试时可 console.log(JSON.stringify(chunk)) 看清字段结构。
第 2 步:fetch + ReadableStream 手动解析 SSE
理解底层便于对接非 SDK 环境或自定义网关:
const res = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: 'gpt-4o-mini',
messages: [{ role: 'user', content: '数到 5' }],
stream: true,
}),
})
if (!res.ok) throw new Error(await res.text())
const reader = res.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() ?? ''
for (const line of lines) {
if (!line.startsWith('data: ')) continue
const payload = line.slice(6).trim()
if (payload === '[DONE]') continue
const json = JSON.parse(payload)
const text = json.choices?.[0]?.delta?.content ?? ''
process.stdout.write(text)
}
}要点:SSE 可能半包到达,需 buffer 按行切;空行分隔事件;跳过 event: 行若存在。
第 3 步:Express 转发 SSE 到浏览器
后端代理保护 API Key,前端连自家域名:
import express from 'express'
import OpenAI from 'openai'
const app = express()
app.use(express.json())
const client = new OpenAI()
app.post('/api/chat/stream', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.flushHeaders?.()
try {
const stream = await client.chat.completions.create({
model: 'gpt-4o-mini',
messages: req.body.messages,
stream: true,
})
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta?.content ?? ''
if (delta) {
res.write(`data: ${JSON.stringify({ text: delta })}\n\n`)
}
}
res.write('data: [DONE]\n\n')
res.end()
} catch (e) {
res.write(`data: ${JSON.stringify({ error: e.message })}\n\n`)
res.end()
}
})
app.listen(3000)Nginx 反向代理需 proxy_buffering off; 或 X-Accel-Buffering: no,否则 SSE 被缓冲、前端一次性才收到。
第 4 步:AbortController:用户点击「停止」
浏览器端:
const controller = new AbortController()
fetch('/api/chat/stream', {
method: 'POST',
body: JSON.stringify({ messages }),
signal: controller.signal,
})
.then(async (res) => {
const reader = res.body.getReader()
const decoder = new TextDecoder()
while (true) {
const { done, value } = await reader.read()
if (done) break
appendUI(decoder.decode(value))
}
})
.catch((e) => {
if (e.name === 'AbortError') console.log('用户取消')
else console.error(e)
})
document.getElementById('stop').onclick = () => controller.abort()Node OpenAI SDK 传 signal:
const ac = new AbortController()
setTimeout(() => ac.abort(), 30_000) // 30s 超时
const stream = await client.chat.completions.create(
{ model: 'gpt-4o-mini', messages, stream: true },
{ signal: ac.signal },
)服务端应在客户端断开后 abort 上游 fetch,释放连接与 GPU 算力(监听 req.on('close'))。
第 5 步:错误处理与并发流
错误分阶段:
- HTTP 4xx/5xx(未建立流):读
response.text()得 JSON error - 流中途错误:部分厂商发
data: {"error":...}event;需解析并展示 - 网络闪断:前端提示重试,可保留已生成 partial 内容
并发:每请求独立 ReadableStream,勿共享 reader。UI 列表多会话时 sessionId → AbortController Map 管理取消。
与 JSON 模式:结构化输出有时不支持 stream 或 stream 完再校验;见 2.6。
测试 curl:
curl -N https://api.openai.com/v1/chat/completions \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-H "Content-Type: application/json" \
-d '{"model":"gpt-4o-mini","stream":true,"messages":[{"role":"user","content":"hi"}]}'-N 禁用 curl 缓冲,应看到逐行 data: 输出。
动手练习
- 用 SDK 流式打印一篇短诗,统计首 Token 到达时间(Date.now 差值)。
- 实现 Express
/api/chat/stream并用 curl -N 验证;若经 Nginx,确认无缓冲。 - 在前端或 Node 脚本中 AbortController 中途 cancel,确认不再收到 delta。
- 模拟 SSE 半包:把两个 chunk 拼进 buffer 再解析,写单元测试断言文本正确拼接。
常见问题
Q:流式会更便宜吗?
通常按相同 output tokens 计费。收益在体验与可提前 cancel 省下的「未展示 tokens」——若厂商对 cancel 仍全量计费则节省有限,查定价说明。
Q:为什么前端一次性显示全部文字?
常见原因:代理缓冲、未 flush、用错了非流式 API、或前端等 stream 结束才更新 DOM。逐 delta 更新 innerText。
Q:WebSocket 和 SSE 选哪个?
OpenAI 兼容流式用 SSE 足够;双向高频信令或多模态二进制再考虑 WebSocket。
本节小结
LLM 流式通过 SSE 分块推送 delta,SDK 的 for-await 或 fetch+ReadableStream 均可消费。后端转发需关闭代理缓冲;AbortController 实现用户取消与超时。流式降低感知延迟,错误要区分连接前与流中途,并发每路独立 reader 与 signal。