背景与动机
在产品中,我们经常需要向前端“持续推送”任务进度、后台计算结果或异步事件。若这些信息主要是“服务端单向推送”,且交互不复杂,使用浏览器原生的 Server‑Sent Events(SSE)会比 WebSocket 更加轻量、省心。
本文基于我的 Next.js 项目实战,详细介绍如何实现一个稳定的 SSE 通道,并在多处业务中复用。
SSE 基本原理(超简版)
- 建立一个长连接 HTTP 流,响应头为 text/event-stream。
- 服务端不断向流里写入“事件文本帧”(本质是文本行),浏览器端用 EventSource 原生对象接收。
- 适合服务端单向推送;浏览器自动重连,开发者可用“心跳”防止中间代理断开。
服务端实现:Next.js Route Handler + ReadableStream
需要实现一个SSE 端点 src/app/api/tasks/stream-new/route.ts,核心是返回一个 ReadableStream,并设置持久化的 SSE 头:
route.ts:
return new Response(stream, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Cache-Control', }, })
创建流的入口如下,通过 start(controller) 启动后持续写入事件数据(中间省略推送逻辑):
route.ts
// 创建 SSE 流 const stream = new ReadableStream({ start(controller) { let isActive = true
实现要点(结合你的代码逻辑整理):
- 身份校验:进入 SSE 前先用 auth.api.getSession 校验用户身份。
- 订阅参数:从 taskIds 解析需要监控的任务集合。
- 事件协议:服务端统一输出 JSON,包含 type(connected/heartbeat/task_update/all_completed/error)、tasks、message 等字段。
- 心跳与保活:周期性发送 heartbeat,防止代理/中间层因空闲断开。
- 资源清理:连接断开/超时(例如 5 分钟)时关闭流,避免泄漏。
- 结束信号:任务全部完成后发送 all_completed 并主动关闭连接。
客户端实现:EventSource + 可复用 Hook
封装在自定义 Hook useTaskStream 中,统一处理连接、消息分发、错误与重连:
创建连接并持有引用:
useTaskStream.ts:
try { const taskIdsParam = taskIds.join(',') const url = `/api/tasks/stream-new?taskIds=${encodeURIComponent(taskIdsParam)}` const eventSource = new EventSource(url) eventSourceRef.current = eventSource
统一解析消息并路由处理:
eventSource.onmessage = (event) => { try { const data: SSEEvent = JSON.parse(event.data) switch (data.type) { case 'connected': setConnectionId(data.connectionId || null) break case 'heartbeat': // 心跳消息,保持连接活跃 break case 'task_update': if (data.tasks && onTaskUpdate) { onTaskUpdate(data.tasks) } break
Hook 的职责清单:
- 防重复连接、清理旧连接。
- 统一消息协议解析,并通过回调传给业务。
- 错误处理与有限次重连策略(实现了计数 + 退避)。
- 在 all_completed 时自动关闭连接,避免“悬空长连”。
在页面中使用:多任务与单任务订阅
订阅一组任务 ID,来同步多个任务的进度与结果:
useTaskStream({ taskIds: interviewTaskIds, onTaskUpdate: handleInterviewsTaskUpdate, onAllCompleted: () => { console.log('All interviews tasks completed') }, onError: handleInterviewsError, enabled: !readonly && interviewsTask && interviewsTask.length > 0 && interviewsTask.some((task) => task.taskStatus !== 'completed'), })
单任务订阅,写法一致、体验统一:
useTaskStream({ taskIds: questionsTask.taskId ? [questionsTask.taskId] : [], onTaskUpdate: handleTaskUpdate, onAllCompleted: handleAllCompleted, onError: handleError, enabled: !readonly && !!questionsTask.taskId && questionsTask.taskStatus !== 'completed', })
这套模式的好处:
- 消息协议统一,Hook 可在多处复用。
- 业务层变得很轻,仅需实现 onTaskUpdate/onError 做 UI 更新。
- 订阅条件可控(通过 enabled 避免不必要的连接)。
心跳与重连策略完整代码示例:
服务端片段:事件封装 + 心跳 + 断开与超时:
// 服务端(Next.js Route Handler 中使用) function createSSEController(controller: ReadableStreamDefaultController) { const encoder = new TextEncoder(); // 基础发送:写入一帧事件(符合 text/event-stream) const send = (payload: string) => { controller.enqueue(encoder.encode(`data: ${payload}\n\n`)); }; // 发送 JSON 事件(统一协议) const sendEvent = (data: unknown) => { send(JSON.stringify(data)); }; // 设置浏览器的重连等待(可选,毫秒) const setRetry = (ms: number) => { controller.enqueue(encoder.encode(`retry: ${ms}\n\n`)); }; // 心跳:避免代理/网关因为空闲断开(建议 15–30s) let heartbeatTimer: NodeJS.Timeout | null = null; const startHeartbeat = (intervalMs = 15000) => { heartbeatTimer = setInterval(() => { sendEvent({ type: 'heartbeat', timestamp: Date.now() }); }, intervalMs); }; // 断开 & 清理 const close = () => { try { if (heartbeatTimer) clearInterval(heartbeatTimer); // 可选:发送一帧,提示即将断开 sendEvent({ type: 'server_closing', timestamp: Date.now() }); } finally { controller.close(); } }; // 超时兜底(例如 5 分钟) let timeoutTimer: NodeJS.Timeout | null = null; const startTimeout = (ms = 5 * 60 * 1000) => { timeoutTimer = setTimeout(() => { sendEvent({ type: 'timeout', message: 'SSE connection timeout' }); close(); }, ms); }; const cancelTimeout = () => { if (timeoutTimer) clearTimeout(timeoutTimer); }; return { send, sendEvent, setRetry, startHeartbeat, startTimeout, cancelTimeout, close }; } // 用法示例(Route Handler 内部) export async function GET(request: Request) { // ... 进行鉴权与参数解析 ... const stream = new ReadableStream({ start(controller) { const sse = createSSEController(controller); // 指定浏览器自动重连退避下限(可选) sse.setRetry(5000); // 连接建立通知 sse.sendEvent({ type: 'connected', connectionId: crypto.randomUUID(), timestamp: Date.now() }); // 启动心跳和超时兜底 sse.startHeartbeat(15000); sse.startTimeout(5 * 60 * 1000); // 业务:按需推送任务更新 const pushTaskUpdate = (tasks: any[]) => { sse.sendEvent({ type: 'task_update', tasks, timestamp: Date.now() }); }; // 任务全部完成 -> 通知并关闭连接 const finish = () => { sse.sendEvent({ type: 'all_completed', timestamp: Date.now() }); sse.close(); }; // 示例:模拟一条任务更新 setTimeout(() => pushTaskUpdate([{ taskId: 't1', taskStatus: 'running', updatedAt: new Date().toISOString() }]), 1000); // 示例:模拟完成 setTimeout(finish, 8000); // 可选:通过 request.signal 感知客户端断开(Node/Edge 支持情况不同) // request.signal?.addEventListener('abort', () => sse.close()); }, cancel() { // 客户端关闭时的兜底清理(可选) } }); return new Response(stream, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Cache-Control', }, }); }
客户端片段:重连策略 + 清理 + 可见性优化:
// 客户端(React Hook,可直接替换/融合到你的 useTaskStream 中) import { useEffect, useRef, useState, useCallback } from 'react'; interface SSEEvent { type: 'connected' | 'heartbeat' | 'task_update' | 'all_completed' | 'error' | string; tasks?: any[]; connectionId?: string; message?: string; timestamp?: number; } interface Options { url: string; // SSE 路径(含查询参数) onTaskUpdate?: (tasks: any[]) => void; onAllCompleted?: () => void; onError?: (msg: string) => void; enabled?: boolean; maxReconnect?: number; // 最大重连次数 } export function useSSE({ url, onTaskUpdate, onAllCompleted, onError, enabled = true, maxReconnect = 5, }: Options) { const [isConnected, setIsConnected] = useState(false); const eventSourceRef = useRef<EventSource | null>(null); const reconnectAttempts = useRef(0); const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null); const clearReconnectTimer = () => { if (reconnectTimerRef.current) { clearTimeout(reconnectTimerRef.current); reconnectTimerRef.current = null; } }; const disconnect = useCallback(() => { clearReconnectTimer(); if (eventSourceRef.current) { eventSourceRef.current.close(); eventSourceRef.current = null; } setIsConnected(false); }, []); const scheduleReconnect = useCallback(() => { if (reconnectAttempts.current >= maxReconnect) { onError?.('Max reconnect attempts reached'); return; } reconnectAttempts.current += 1; // 指数退避 + 抖动:1s,2s,4s... 最大 16s const base = Math.min(1000 * 2 ** (reconnectAttempts.current - 1), 16000); const jitter = Math.floor(Math.random() * 500); const delay = base + jitter; clearReconnectTimer(); reconnectTimerRef.current = setTimeout(() => { connect(); }, delay); }, [maxReconnect, onError]); const handleMessage = useCallback((event: MessageEvent) => { try { const data: SSEEvent = JSON.parse(event.data); switch (data.type) { case 'connected': reconnectAttempts.current = 0; break; case 'heartbeat': break; case 'task_update': if (data.tasks) onTaskUpdate?.(data.tasks); break; case 'all_completed': onAllCompleted?.(); disconnect(); break; case 'error': onError?.(data.message || 'Unknown SSE error'); break; default: // 可选:处理自定义事件 break; } } catch (e) { onError?.('Failed to parse SSE message'); } }, [disconnect, onAllCompleted, onTaskUpdate, onError]); const connect = useCallback(() => { if (!enabled) return; if (eventSourceRef.current && eventSourceRef.current.readyState === EventSource.OPEN) return; // 断开旧连接 if (eventSourceRef.current) { eventSourceRef.current.close(); eventSourceRef.current = null; } try { const es = new EventSource(url); eventSourceRef.current = es; es.onopen = () => { setIsConnected(true); reconnectAttempts.current = 0; }; es.onmessage = handleMessage; es.onerror = () => { setIsConnected(false); // 关闭旧连接再重连 es.close(); eventSourceRef.current = null; scheduleReconnect(); }; } catch (e) { onError?.('Failed to open SSE'); scheduleReconnect(); } }, [enabled, handleMessage, onError, scheduleReconnect, url]); // 首次与依赖变化时连接 useEffect(() => { if (!enabled) return; connect(); return () => disconnect(); }, [connect, disconnect, enabled]); // 页面不可见时,减少资源占用:隐藏时断开,显示时重连(可选) useEffect(() => { const onVisibility = () => { if (document.hidden) { disconnect(); } else { reconnectAttempts.current = 0; connect(); } }; document.addEventListener('visibilitychange', onVisibility); return () => document.removeEventListener('visibilitychange', onVisibility); }, [connect, disconnect]); return { isConnected, disconnect, reconnectNow: connect }; }
使用示例:
const taskIdsParam = taskIds.join(','); const url = `/api/tasks/stream-new?taskIds=${encodeURIComponent(taskIdsParam)}`; useSSE({ url, onTaskUpdate: (tasks) => { /* 更新 UI */ }, onAllCompleted: () => { /* 关闭 loading 等 */ }, onError: (msg) => { console.error(msg); }, enabled: taskIds.length > 0, maxReconnect: 5, });
消息协议设计建议
消息实现可以采用 JSON 文本帧。参考以下结构:
- type: connected | heartbeat | task_update | all_completed | error
- tasks: TaskUpdate[](包含 taskId、taskStatus、taskResult、updatedAt)
- message: 错误信息或服务端反馈
- timestamp: 服务端生成的时间戳,便于调试/排序
好处:
- 统一事件入口,前端只需一处 switch 即可扩展。
- 易于跨页面/组件复用,降低维护成本。
注意事项与最佳实践
- 认证与鉴权
- 在 SSE 端点最先校验用户身份,拒绝未授权连接。
- 如需跨域订阅,务必正确配置 CORS。
- 连接稳定性
- 定期心跳(例如 15–30 秒)避免被中间层关闭。
- 浏览器 EventSource 会自动重连;同时建议在应用层实现有限次数的退避重连,防止雪崩。
- 资源释放
- 组件卸载时关闭 EventSource,清理计时器与引用。
- 服务端对每个连接设置超时兜底(你用的是 5 分钟),防泄漏。
- 端到端可观测性
- 打点连接建立/断开、重连次数、错误类型。
- 记录 connectionId 便于串联日志与排障。
- 与部署环境的兼容
- SSE 是长连接,对代理/负载均衡有要求。务必确保反向代理不强制超时(或心跳间隔短于代理超时)。
- 若走边缘网络(Edge Runtime),确认 ReadableStream 与头部支持情况。
- 数据序列化
- 仅发送字符串;JSON 序列化后按行写入,避免二进制与大型 payload(可拆分或引用)。
何时选用 SSE,何时选择 WebSocket
- 更适合 SSE 的场景
- 主要是服务端单向推送:任务进度、通知、异步完成结果。
- 事件频次中低、消息体结构化为主(JSON 文本)。
- 更适合 WebSocket 的场景
- 需要双向实时交互:聊天室、协同编辑、白板、在线游戏。
- 极低延迟与高频数据交换;需要自定义二进制协议或帧路由。
- 业务是“任务驱动、单向进度下发”为主,SSE 完美契合。
- 浏览器原生支持 + 服务端实现轻量,Hook 一次封装多处复用。
- 免去了 WebSocket 的连接状态管理、粘性路由与水平扩展复杂度。
实战要点清单(速查)
- 服务端
- 设置头:Content-Type 为 text/event-stream,Cache-Control: no-cache,Connection: keep-alive。
- 用 ReadableStream 持续写入事件;心跳保活;超时兜底关闭。
- 统一 JSON 协议:type、tasks、message、timestamp。
- 接入鉴权、记录 connectionId、埋点日志。
- 客户端
- 使用 EventSource(url) 建立连接,集中 onmessage 解析。
- 做好错误回调与有限次重连,组件卸载时关闭连接。
- 把公共逻辑封装成 Hook,业务只关注 onTaskUpdate。
结语
SSE 在“服务端单向推送”的领域里,简洁、稳定且极具工程效率。在 Next.js 中,通过一个 Route Handler 和一个复用的 Hook,就能优雅地覆盖多种异步任务场景。若你遇到需要双向实时通信的复杂交互,再考虑 WebSocket 也不迟。