流式响应
处理来自任何 LLM 提供商的流式响应
启用流式传输
设置 stream: true 以接收实时生成的响应:
import { createBridge } from '@amux.ai/llm-bridge'
import { openaiAdapter } from '@amux.ai/adapter-openai'
import { anthropicAdapter } from '@amux.ai/adapter-anthropic'
const bridge = createBridge({
inbound: openaiAdapter,
outbound: anthropicAdapter,
config: { apiKey: process.env.ANTHROPIC_API_KEY }
})
const stream = await bridge.chat({
model: 'gpt-4',
messages: [{ role: 'user', content: '给我讲个故事' }],
stream: true // 启用流式传输
})处理流事件
使用 for await...of 遍历流:
for await (const event of stream) {
if (event.type === 'content') {
// 文本内容增量
process.stdout.write(event.content.delta)
}
}Amux 在所有提供商之间标准化流事件。相同的代码适用于 OpenAI、Claude、DeepSeek 等。
流事件类型
内容事件
正在生成的文本内容:
for await (const event of stream) {
if (event.type === 'content') {
console.log(event.content.delta) // "你好", "世界", "!"
}
}开始事件
流初始化:
for await (const event of stream) {
if (event.type === 'start') {
console.log('流开始')
console.log('模型:', event.model)
}
}结束事件
流完成及完成原因:
for await (const event of stream) {
if (event.type === 'end') {
console.log('完成原因:', event.finishReason) // 'stop', 'length', 'tool_calls'
console.log('使用情况:', event.usage)
}
}推理事件
适用于具有思考/推理模式的模型(DeepSeek、Qwen、Claude 扩展思考):
for await (const event of stream) {
if (event.type === 'reasoning') {
console.log('思考中:', event.reasoning.delta)
}
}工具调用事件
当模型想要调用函数时:
for await (const event of stream) {
if (event.type === 'tool_call') {
console.log('工具:', event.toolCall.name)
console.log('参数:', event.toolCall.arguments)
}
}完整示例
构建完整的流式响应:
const stream = await bridge.chat({
model: 'gpt-4',
messages: [{ role: 'user', content: '解释 TypeScript' }],
stream: true
})
let fullContent = ''
let thinkingContent = ''
for await (const event of stream) {
switch (event.type) {
case 'start':
console.log('开始流式传输...')
break
case 'content':
fullContent += event.content.delta
process.stdout.write(event.content.delta)
break
case 'reasoning':
thinkingContent += event.reasoning.delta
break
case 'end':
console.log('\n\n流完成')
console.log('完成原因:', event.finishReason)
console.log('总 token 数:', event.usage?.totalTokens)
break
case 'error':
console.error('流错误:', event.error)
break
}
}
console.log('完整响应:', fullContent)流中的错误处理
处理流式传输期间发生的错误:
try {
for await (const event of stream) {
if (event.type === 'error') {
console.error('流错误:', event.error.message)
break
}
if (event.type === 'content') {
process.stdout.write(event.content.delta)
}
}
} catch (error) {
console.error('流失败:', error)
}Web 应用程序
对于 Web 应用程序,将流传输到客户端:
Node.js/Express
app.post('/api/chat', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
const stream = await bridge.chat({
...req.body,
stream: true
})
for await (const event of stream) {
if (event.type === 'content') {
res.write(`data: ${JSON.stringify(event)}\n\n`)
}
}
res.end()
})Next.js App Router
// app/api/chat/route.ts
export async function POST(req: Request) {
const body = await req.json()
const stream = await bridge.chat({
...body,
stream: true
})
const encoder = new TextEncoder()
const readable = new ReadableStream({
async start(controller) {
for await (const event of stream) {
if (event.type === 'content') {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify(event)}\n\n`)
)
}
}
controller.close()
}
})
return new Response(readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache'
}
})
}提供商兼容性
所有适配器都支持流式传输:
| 提供商 | 流式传输 | 推理事件 | 工具调用事件 |
|---|---|---|---|
| OpenAI | ✅ | ❌ | ✅ |
| Anthropic | ✅ | ✅ | ✅ |
| DeepSeek | ✅ | ✅ | ✅ |
| Moonshot | ✅ | ❌ | ✅ |
| Zhipu | ✅ | ❌ | ✅ |
| Qwen | ✅ | ✅ | ✅ |
| Gemini | ✅ | ❌ | ✅ |