Published on

前端流式数据处理指南:以 AI 内容生成为例

Authors

本文将探讨如何在前端应用中高效处理流式数据,特别关注如何接收和处理 AI 流式生成内容的场景。

什么是流式数据处理?

流式数据处理是一种以连续方式处理数据的模式,而不是等待所有数据一次性加载完成。这种模式在处理大量数据或需要实时反馈的场景中特别有用,比如 AI 内容生成。

流式处理的主要优势:

  1. 更好的用户体验:用户可以立即看到部分结果,而不必等待整个响应完成
  2. 减少感知延迟:即使生成完整内容需要时间,用户也能看到进度
  3. 更高效的资源利用:浏览器可以在接收数据的同时开始处理和渲染

服务器返回流式数据的两种主要方式

在处理流式数据时,服务器通常有两种方式向前端发送数据:

1. Server-Sent Events (SSE)

Content-Type: text/event-stream
Content-Type: text/event-stream

SSE 使用 Content-Type: text/event-stream 格式,数据以特定格式按行发送。例如kimideepseek 等就是如此:

data: 第一条消息\n\n
data: 第二条消息\n\n
data: [DONE]\n\n
data

这种格式需要特殊解析,但有标准的 EventSource API 支持。

2. 普通流式响应

服务器会返回 transfer-encoding: chunked 的响应头,并且返回的数据是分块的:

{"chunk": "第一部分"} {"chunk": "第二部分"} {"chunk": "第三部分"}

或者简单的文本片段:

第一部分文本第二部分文本第三部分文本
普通流式响应

这种格式更简单,可以直接拼接使用,但需要自行处理数据边界。

服务器端返回流的示例 (Koa)

下面是使用 Koa 框架返回不同类型流的示例:

const Koa = require('koa');
const Router = require('@koa/router');

const app = new Koa();
const router = new Router();

// 返回 SSE 格式的流
router.get('/api/sse-stream', async (ctx) => {
  ctx.set({
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });
  
  // 获取响应对象
  const res = ctx.res;
  
  // 发送 SSE 格式的数据
  const sendSSE = (data) => {
    res.write(`data: ${data}\n\n`);
  };
  
  // 模拟 AI 生成过程
  sendSSE('这是第一部分内容');
  
  await new Promise(resolve => setTimeout(resolve, 1000));
  sendSSE('这是第二部分内容');
  
  await new Promise(resolve => setTimeout(resolve, 1000));
  sendSSE('这是最后一部分内容');
  
  await new Promise(resolve => setTimeout(resolve, 1000));
  sendSSE('[DONE]');
  
  // 结束响应
  ctx.respond = false;
  res.end();
});

// 返回普通 JSON 流
router.get('/api/json-stream', async (ctx) => {
  ctx.set({
    'Content-Type': 'application/json',
    'Transfer-Encoding': 'chunked',
  });
  
  const res = ctx.res;
  
  // 发送 JSON 数据块
  const sendChunk = (data) => {
    res.write(JSON.stringify({ chunk: data }));
  };
  
  sendChunk('这是第一部分内容');
  
  await new Promise(resolve => setTimeout(resolve, 1000));
  sendChunk('这是第二部分内容');
  
  await new Promise(resolve => setTimeout(resolve, 1000));
  sendChunk('这是最后一部分内容');
  
  // 结束响应
  ctx.respond = false;
  res.end();
});

// 返回纯文本流
router.get('/api/text-stream', async (ctx) => {
  ctx.set({
    'Content-Type': 'text/plain',
    'Transfer-Encoding': 'chunked',
  });
  
  const res = ctx.res;
  
  res.write('这是第一部分内容');
  
  await new Promise(resolve => setTimeout(resolve, 1000));
  res.write('这是第二部分内容');
  
  await new Promise(resolve => setTimeout(resolve, 1000));
  res.write('这是最后一部分内容');
  
  // 结束响应
  ctx.respond = false;
  res.end();
});

app.use(router.routes()).use(router.allowedMethods());

app.listen(3000, () => {
  console.log('服务器运行在 http://localhost:3000');
});

前端接收流式数据的方式

1. 使用 EventSource 处理 SSE 流

当服务器返回 Content-Type: text/event-stream 格式的数据时,可以使用 EventSource API:

// 创建 EventSource 连接
const eventSource = new EventSource('/api/sse-stream');
let content = '';

// 接收消息
eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    console.log('流接收完成');
    return;
  }
  
  // 累积内容
  content += event.data;
  document.getElementById('content').textContent = content;
};

// 处理错误
eventSource.onerror = (error) => {
  console.error('SSE 错误:', error);
  eventSource.close();
};

2. 使用 Fetch API 处理普通流

当服务器返回普通流(如 JSON 流或纯文本流)时,可以使用 Fetch API:

async function fetchTextStream() {
  const response = await fetch('/api/text-stream');
  
  if (!response.ok) {
    throw new Error('请求失败');
  }
  
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let content = '';
  
  // 处理流数据
  while (true) {
    const { done, value } = await reader.read();
    
    if (done) {
      break;
    }
    
    // 解码并直接拼接数据块 - 对于纯文本流非常简单
    const text = decoder.decode(value, { stream: true });
    content += text;
    
    // 更新 UI
    document.getElementById('content').textContent = content;
  }
  
  console.log('流接收完成');
}

3. 使用 for await...of 处理流

对于支持异步迭代的现代浏览器,可以使用更简洁的 for await...of 语法:

async function processStreamWithForAwait() {
  const response = await fetch('/api/text-stream');
  
  if (!response.ok) {
    throw new Error('请求失败');
  }
  
  let content = '';
  
  // 使用 for await...of 处理流
  for await (const chunk of re    sponse.body) {
    // 对于纯文本流,直接拼接即可
    content += chunk;
    document.getElementById('content').textContent = content;
  }
  
  console.log('流处理完成');
}

使用 TanStack Query 处理流式数据

TanStack Query 提供了 streamedQuery 功能,可以优雅地处理流式数据。

1. 处理纯文本流

import { streamedQuery, useQuery } from '@tanstack/react-query';
import { useState } from 'react';

function TextStreamWithQuery() {
  // 创建一个函数,将文本流转换为 AsyncIterable
  const fetchTextStream = async () => {
    const response = await fetch('/api/text-stream');
    
    if (!response.ok) {
      throw new Error('请求失败');
    }
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    // 返回一个异步迭代器
    return {
      [Symbol.asyncIterator]: async function* () {
        while (true) {
          const { done, value } = await reader.read();
          
          if (done) {
            break;
          }
          
          // 对于纯文本流,直接解码并返回
          yield decoder.decode(value, { stream: true });
        }
      }
    };
  };
  
  const { data, isPending, isFetching } = useQuery({
    queryKey: ['text-stream'],
    queryFn: streamedQuery({
      queryFn: fetchTextStream,
    }),
  });
  
  // 将所有接收到的数据块连接起来
  const fullContent = data ? data.join('') : '';
  
  return (
    <div>
      <div className="status">
        {isPending && <p>等待第一个数据块...</p>}
        {!isPending && isFetching && <p>正在接收数据...</p>}
        {!isPending && !isFetching && <p>数据接收完成</p>}
      </div>
      
      <div className="content-container">
        {fullContent ? (
          <div>{fullContent}</div>
        ) : (
          <div>等待内容加载...</div>
        )}
      </div>
    </div>
  );
}

2. 处理 SSE 流

import { streamedQuery, useQuery } from '@tanstack/react-query';
import { useState } from 'react';

function SSEStreamWithQuery() {
  const fetchSSEStream = async () => {
    const response = await fetch('/api/sse-stream');
    
    if (!response.ok) {
      throw new Error('请求失败');
    }
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    // 返回一个异步迭代器,处理 SSE 格式
    return {
      [Symbol.asyncIterator]: async function* () {
        let buffer = '';
        
        while (true) {
          const { done, value } = await reader.read();
          
          if (done) {
            break;
          }
          
          // 解码二进制数据
          const text = decoder.decode(value, { stream: true });
          buffer += text;
          
          // 处理 SSE 格式的消息
          const lines = buffer.split('\n\n');
          buffer = lines.pop() || '';
          
          for (const line of lines) {
            if (line.startsWith('data: ')) {
              const data = line.slice(6);
              
              if (data === '[DONE]') {
                continue;
              }
              
              yield data;

            }
          }
        }
      }
    };
  };
  
  const { data, isPending, isFetching } = useQuery({
    queryKey: ['sse-stream'],
    queryFn: streamedQuery({
      queryFn: fetchSSEStream,
    }),
  });
  
  // 将所有接收到的数据块连接起来
  const fullContent = data ? data.join('') : '';
  
  return (
    <div>
      <div className="status">
        {isPending && <p>等待第一个数据块...</p>}
        {!isPending && isFetching && <p>正在接收数据...</p>}
        {!isPending && !isFetching && <p>数据接收完成</p>}
      </div>
      
      <div className="content-container">
        {fullContent ? (
          <div>{fullContent}</div>
        ) : (
          <div>等待内容加载...</div>
        )}
      </div>
    </div>
  );
}

流式处理的最佳实践

  1. 根据响应类型选择合适的处理方法

    • 对于 text/event-stream 格式,使用 EventSource 或专门的解析逻辑
    • 对于普通文本流,可以直接拼接
    • 对于 JSON 流,需要处理 JSON 解析
  2. 提供即时反馈:让用户知道系统正在工作,显示打字机效果或进度指示

  3. 优雅处理错误:流式处理可能在任何时候中断,确保适当的错误处理和恢复机制

  4. 资源清理:确保在组件卸载或流结束时关闭连接和释放资源

  5. 考虑网络条件:为不稳定的网络环境提供重试机制和断点续传功能

  6. 优化渲染性能:频繁更新可能导致性能问题,考虑使用节流或批量更新

  7. 提供取消选项:允许用户在需要时取消正在进行的流式请求

在处理前端流式数据时,特别是 AI 生成内容的场景,建议:

  1. 优先考虑使用 TanStack Query:它提供了优雅的流式数据处理方案,处理了很多边缘情况
  2. 根据服务器返回的格式选择合适的处理方法:SSE 格式和普通流需要不同的处理方式
  3. 关注用户体验:流式处理的主要目的是提升用户体验,确保界面响应及时更新
  4. 处理好错误和边界情况:网络不稳定、服务器错误等情况需要妥善处理

通过合理使用流式数据处理技术,可以显著提升 AI 内容生成等场景下的用户体验,减少用户等待时间,提供更加即时的反馈。

参考资源