- Published on
前端流式数据处理指南:以 AI 内容生成为例
- Authors
- Name
- 不作声
- GitHub
- Github @buzuosheng
本文将探讨如何在前端应用中高效处理流式数据,特别关注如何接收和处理 AI 流式生成内容的场景。
什么是流式数据处理?
流式数据处理是一种以连续方式处理数据的模式,而不是等待所有数据一次性加载完成。这种模式在处理大量数据或需要实时反馈的场景中特别有用,比如 AI 内容生成。
流式处理的主要优势:
- 更好的用户体验:用户可以立即看到部分结果,而不必等待整个响应完成
- 减少感知延迟:即使生成完整内容需要时间,用户也能看到进度
- 更高效的资源利用:浏览器可以在接收数据的同时开始处理和渲染
服务器返回流式数据的两种主要方式
在处理流式数据时,服务器通常有两种方式向前端发送数据:
1. Server-Sent Events (SSE)


SSE 使用 Content-Type: text/event-stream
格式,数据以特定格式按行发送。例如kimi、deepseek 等就是如此:
data: 第一条消息\n\n
data: 第二条消息\n\n
data: [DONE]\n\n

这种格式需要特殊解析,但有标准的 EventSource API 支持。
2. 普通流式响应
服务器会返回 transfer-encoding: chunked
的响应头,并且返回的数据是分块的:
{"chunk": "第一部分"} {"chunk": "第二部分"} {"chunk": "第三部分"}
或者简单的文本片段:
第一部分文本第二部分文本第三部分文本

这种格式更简单,可以直接拼接使用,但需要自行处理数据边界。
服务器端返回流的示例 (Koa)
下面是使用 Koa 框架返回不同类型流的示例:
const Koa = require('koa');
const Router = require('@koa/router');
const app = new Koa();
const router = new Router();
// 返回 SSE 格式的流
router.get('/api/sse-stream', async (ctx) => {
ctx.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
// 获取响应对象
const res = ctx.res;
// 发送 SSE 格式的数据
const sendSSE = (data) => {
res.write(`data: ${data}\n\n`);
};
// 模拟 AI 生成过程
sendSSE('这是第一部分内容');
await new Promise(resolve => setTimeout(resolve, 1000));
sendSSE('这是第二部分内容');
await new Promise(resolve => setTimeout(resolve, 1000));
sendSSE('这是最后一部分内容');
await new Promise(resolve => setTimeout(resolve, 1000));
sendSSE('[DONE]');
// 结束响应
ctx.respond = false;
res.end();
});
// 返回普通 JSON 流
router.get('/api/json-stream', async (ctx) => {
ctx.set({
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked',
});
const res = ctx.res;
// 发送 JSON 数据块
const sendChunk = (data) => {
res.write(JSON.stringify({ chunk: data }));
};
sendChunk('这是第一部分内容');
await new Promise(resolve => setTimeout(resolve, 1000));
sendChunk('这是第二部分内容');
await new Promise(resolve => setTimeout(resolve, 1000));
sendChunk('这是最后一部分内容');
// 结束响应
ctx.respond = false;
res.end();
});
// 返回纯文本流
router.get('/api/text-stream', async (ctx) => {
ctx.set({
'Content-Type': 'text/plain',
'Transfer-Encoding': 'chunked',
});
const res = ctx.res;
res.write('这是第一部分内容');
await new Promise(resolve => setTimeout(resolve, 1000));
res.write('这是第二部分内容');
await new Promise(resolve => setTimeout(resolve, 1000));
res.write('这是最后一部分内容');
// 结束响应
ctx.respond = false;
res.end();
});
app.use(router.routes()).use(router.allowedMethods());
app.listen(3000, () => {
console.log('服务器运行在 http://localhost:3000');
});
前端接收流式数据的方式
1. 使用 EventSource 处理 SSE 流
当服务器返回 Content-Type: text/event-stream
格式的数据时,可以使用 EventSource API:
// 创建 EventSource 连接
const eventSource = new EventSource('/api/sse-stream');
let content = '';
// 接收消息
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
console.log('流接收完成');
return;
}
// 累积内容
content += event.data;
document.getElementById('content').textContent = content;
};
// 处理错误
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
eventSource.close();
};
2. 使用 Fetch API 处理普通流
当服务器返回普通流(如 JSON 流或纯文本流)时,可以使用 Fetch API:
async function fetchTextStream() {
const response = await fetch('/api/text-stream');
if (!response.ok) {
throw new Error('请求失败');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let content = '';
// 处理流数据
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// 解码并直接拼接数据块 - 对于纯文本流非常简单
const text = decoder.decode(value, { stream: true });
content += text;
// 更新 UI
document.getElementById('content').textContent = content;
}
console.log('流接收完成');
}
3. 使用 for await...of 处理流
对于支持异步迭代的现代浏览器,可以使用更简洁的 for await...of
语法:
async function processStreamWithForAwait() {
const response = await fetch('/api/text-stream');
if (!response.ok) {
throw new Error('请求失败');
}
let content = '';
// 使用 for await...of 处理流
for await (const chunk of re sponse.body) {
// 对于纯文本流,直接拼接即可
content += chunk;
document.getElementById('content').textContent = content;
}
console.log('流处理完成');
}
使用 TanStack Query 处理流式数据
TanStack Query 提供了 streamedQuery
功能,可以优雅地处理流式数据。
1. 处理纯文本流
import { streamedQuery, useQuery } from '@tanstack/react-query';
import { useState } from 'react';
function TextStreamWithQuery() {
// 创建一个函数,将文本流转换为 AsyncIterable
const fetchTextStream = async () => {
const response = await fetch('/api/text-stream');
if (!response.ok) {
throw new Error('请求失败');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
// 返回一个异步迭代器
return {
[Symbol.asyncIterator]: async function* () {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// 对于纯文本流,直接解码并返回
yield decoder.decode(value, { stream: true });
}
}
};
};
const { data, isPending, isFetching } = useQuery({
queryKey: ['text-stream'],
queryFn: streamedQuery({
queryFn: fetchTextStream,
}),
});
// 将所有接收到的数据块连接起来
const fullContent = data ? data.join('') : '';
return (
<div>
<div className="status">
{isPending && <p>等待第一个数据块...</p>}
{!isPending && isFetching && <p>正在接收数据...</p>}
{!isPending && !isFetching && <p>数据接收完成</p>}
</div>
<div className="content-container">
{fullContent ? (
<div>{fullContent}</div>
) : (
<div>等待内容加载...</div>
)}
</div>
</div>
);
}
2. 处理 SSE 流
import { streamedQuery, useQuery } from '@tanstack/react-query';
import { useState } from 'react';
function SSEStreamWithQuery() {
const fetchSSEStream = async () => {
const response = await fetch('/api/sse-stream');
if (!response.ok) {
throw new Error('请求失败');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
// 返回一个异步迭代器,处理 SSE 格式
return {
[Symbol.asyncIterator]: async function* () {
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// 解码二进制数据
const text = decoder.decode(value, { stream: true });
buffer += text;
// 处理 SSE 格式的消息
const lines = buffer.split('\n\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
continue;
}
yield data;
}
}
}
}
};
};
const { data, isPending, isFetching } = useQuery({
queryKey: ['sse-stream'],
queryFn: streamedQuery({
queryFn: fetchSSEStream,
}),
});
// 将所有接收到的数据块连接起来
const fullContent = data ? data.join('') : '';
return (
<div>
<div className="status">
{isPending && <p>等待第一个数据块...</p>}
{!isPending && isFetching && <p>正在接收数据...</p>}
{!isPending && !isFetching && <p>数据接收完成</p>}
</div>
<div className="content-container">
{fullContent ? (
<div>{fullContent}</div>
) : (
<div>等待内容加载...</div>
)}
</div>
</div>
);
}
流式处理的最佳实践
根据响应类型选择合适的处理方法:
- 对于
text/event-stream
格式,使用 EventSource 或专门的解析逻辑 - 对于普通文本流,可以直接拼接
- 对于 JSON 流,需要处理 JSON 解析
- 对于
提供即时反馈:让用户知道系统正在工作,显示打字机效果或进度指示
优雅处理错误:流式处理可能在任何时候中断,确保适当的错误处理和恢复机制
资源清理:确保在组件卸载或流结束时关闭连接和释放资源
考虑网络条件:为不稳定的网络环境提供重试机制和断点续传功能
优化渲染性能:频繁更新可能导致性能问题,考虑使用节流或批量更新
提供取消选项:允许用户在需要时取消正在进行的流式请求
在处理前端流式数据时,特别是 AI 生成内容的场景,建议:
- 优先考虑使用 TanStack Query:它提供了优雅的流式数据处理方案,处理了很多边缘情况
- 根据服务器返回的格式选择合适的处理方法:SSE 格式和普通流需要不同的处理方式
- 关注用户体验:流式处理的主要目的是提升用户体验,确保界面响应及时更新
- 处理好错误和边界情况:网络不稳定、服务器错误等情况需要妥善处理
通过合理使用流式数据处理技术,可以显著提升 AI 内容生成等场景下的用户体验,减少用户等待时间,提供更加即时的反馈。