Amux

流式响应

处理来自任何 LLM 提供商的流式响应

启用流式传输

设置 stream: true 以接收实时生成的响应:

import { createBridge } from '@amux.ai/llm-bridge'
import { openaiAdapter } from '@amux.ai/adapter-openai'
import { anthropicAdapter } from '@amux.ai/adapter-anthropic'

const bridge = createBridge({
  inbound: openaiAdapter,
  outbound: anthropicAdapter,
  config: { apiKey: process.env.ANTHROPIC_API_KEY }
})

const stream = await bridge.chat({
  model: 'gpt-4',
  messages: [{ role: 'user', content: '给我讲个故事' }],
  stream: true  // 启用流式传输
})

处理流事件

使用 for await...of 遍历流:

for await (const event of stream) {
  if (event.type === 'content') {
    // 文本内容增量
    process.stdout.write(event.content.delta)
  }
}

Amux 在所有提供商之间标准化流事件。相同的代码适用于 OpenAI、Claude、DeepSeek 等。

流事件类型

内容事件

正在生成的文本内容:

for await (const event of stream) {
  if (event.type === 'content') {
    console.log(event.content.delta)  // "你好", "世界", "!"
  }
}

开始事件

流初始化:

for await (const event of stream) {
  if (event.type === 'start') {
    console.log('流开始')
    console.log('模型:', event.model)
  }
}

结束事件

流完成及完成原因:

for await (const event of stream) {
  if (event.type === 'end') {
    console.log('完成原因:', event.finishReason)  // 'stop', 'length', 'tool_calls'
    console.log('使用情况:', event.usage)
  }
}

推理事件

适用于具有思考/推理模式的模型(DeepSeek、Qwen、Claude 扩展思考):

for await (const event of stream) {
  if (event.type === 'reasoning') {
    console.log('思考中:', event.reasoning.delta)
  }
}

工具调用事件

当模型想要调用函数时:

for await (const event of stream) {
  if (event.type === 'tool_call') {
    console.log('工具:', event.toolCall.name)
    console.log('参数:', event.toolCall.arguments)
  }
}

完整示例

构建完整的流式响应:

const stream = await bridge.chat({
  model: 'gpt-4',
  messages: [{ role: 'user', content: '解释 TypeScript' }],
  stream: true
})

let fullContent = ''
let thinkingContent = ''

for await (const event of stream) {
  switch (event.type) {
    case 'start':
      console.log('开始流式传输...')
      break

    case 'content':
      fullContent += event.content.delta
      process.stdout.write(event.content.delta)
      break

    case 'reasoning':
      thinkingContent += event.reasoning.delta
      break

    case 'end':
      console.log('\n\n流完成')
      console.log('完成原因:', event.finishReason)
      console.log('总 token 数:', event.usage?.totalTokens)
      break

    case 'error':
      console.error('流错误:', event.error)
      break
  }
}

console.log('完整响应:', fullContent)

流中的错误处理

处理流式传输期间发生的错误:

try {
  for await (const event of stream) {
    if (event.type === 'error') {
      console.error('流错误:', event.error.message)
      break
    }

    if (event.type === 'content') {
      process.stdout.write(event.content.delta)
    }
  }
} catch (error) {
  console.error('流失败:', error)
}

Web 应用程序

对于 Web 应用程序,将流传输到客户端:

Node.js/Express

app.post('/api/chat', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.setHeader('Connection', 'keep-alive')

  const stream = await bridge.chat({
    ...req.body,
    stream: true
  })

  for await (const event of stream) {
    if (event.type === 'content') {
      res.write(`data: ${JSON.stringify(event)}\n\n`)
    }
  }

  res.end()
})

Next.js App Router

// app/api/chat/route.ts
export async function POST(req: Request) {
  const body = await req.json()

  const stream = await bridge.chat({
    ...body,
    stream: true
  })

  const encoder = new TextEncoder()
  const readable = new ReadableStream({
    async start(controller) {
      for await (const event of stream) {
        if (event.type === 'content') {
          controller.enqueue(
            encoder.encode(`data: ${JSON.stringify(event)}\n\n`)
          )
        }
      }
      controller.close()
    }
  })

  return new Response(readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache'
    }
  })
}

提供商兼容性

所有适配器都支持流式传输:

提供商流式传输推理事件工具调用事件
OpenAI
Anthropic
DeepSeek
Moonshot
Zhipu
Qwen
Gemini

下一步

On this page