如何使用Spring SSE 实现 GPT数据流传输?

大家好,我是猿java

先看一张和 GPT交互的图片,让 GPT 写一篇200字的诗歌赞美 Java:

img

那么问题来了,我们是否也可以轻松地实现这种流式交互?

但是是肯定的,这篇文章我们就来聊一聊主角:Spring SseEmitter。

1. 什么是 SSE?

SseEmitter 是 Spring Framework 中用于实现服务器发送事件(Server-Sent Events, 简称 SSE)的一个类,它是一种基于 HTTP 协议的标准,用于服务器向客户端单向推送事件。

SSE 允许服务器通过单向通道向客户端持续推送数据,适用于需要实时更新的应用场景,如实时通知、消息推送、动态数据展示等。SseEmitter 的工作原理主要涉及以下几个方面:

SSE 的特点:

  • 单向通信:仅服务器可以主动发送数据到客户端。
  • 持久连接:使用持久的 HTTP 连接,服务器可以持续发送事件。
  • 自动重连:浏览器在连接断开后会自动尝试重连。
  • 基于文本:传输的数据格式为纯文本,通常为 UTF-8 编码。

2. Spring 中的 SseEmitter

使用 SseEmitter 实现像 GPT一样的流式交互,其实还是比较简单的,在控制器中创建 SseEmitter 并返回,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RestController
public class SseController {

@GetMapping("/sse")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
// 异步处理发送事件
Executors.newSingleThreadExecutor().execute(() -> {
try {
for (int i = 0; i < 10; i++) {
emitter.send("Message " + i);
Thread.sleep(1000);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}

主要方法:

  • send(Object object):发送事件数据给客户端。
  • complete():关闭连接。
  • completeWithError(Throwable ex):在发生错误时关闭连接并发送错误信息。

效果如下图:

img

3. 工作流程

3.1 客户端请求 SSE 端点

客户端通过 EventSource API 或其他方式向服务器的 SSE 端点发送 HTTP GET 请求。例如:

1
2
3
4
5
6
7
8
9
const eventSource = new EventSource('/sse');

eventSource.onmessage = function(event) {
console.log('Received event:', event.data);
};

eventSource.onerror = function(err) {
console.error('EventSource failed:', err);
};

3.2 服务器端建立 SseEmitter

当服务器接收到 SSE 请求时,控制器方法会创建一个 SseEmitter 实例并返回。这会触发 Spring MVC 将响应头设置为 Content-Type: text/event-stream,以维持持久连接。

3.3 服务器端发送事件

通过 SseEmitter.send() 方法,服务器可以向客户端发送事件数据。通常,这些操作会在异步线程中进行,以避免阻塞主线程。

3.4 持久连接和生命周期管理

SseEmitter 管理着 SSE 连接的生命周期,包括处理超时、连接断开和错误等情况。可以通过配置超时时间来控制连接的最长持续时间:

1
SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时

如果连接在指定时间内未关闭,SseEmitter 会自动触发超时处理。

3.5 客户端接收事件

客户端通过 EventSource 接收并处理服务器发送的事件。当服务器调用 emitter.complete() 或连接因超时等原因关闭时,客户端的 onclose 事件会被触发。

4. 错误处理与重试机制

服务器端:

在发送事件过程中,如果发生异常,可以调用 emitter.completeWithError(e) 来通知客户端错误并关闭连接。

客户端端:

客户端的 EventSource 会自动尝试重新连接,当连接断开时,会触发 onerror 事件。可以在客户端代码中实现更复杂的重试逻辑,例如增加重试次数限制或延迟策略。

5. 适用场景与限制

5.1 适用场景

  • 实时通知,如聊天应用、社交媒体动态更新。
  • 实时监控,如服务器状态监控、数据仪表盘。
  • 需要频繁推送更新但数据量不大的场景。

5.2 限制

  • 仅支持服务器到客户端的单向通信。
  • 需要浏览器支持 SSE 协议(大多数现代浏览器支持,但部分老旧浏览器可能不兼容)。
  • 对于需要高频率、大数据量的实时通信,WebSocket 可能更为合适。

6. 总结

这篇文章,我们分析了如何使用SseEmitter实现客户端和服务器的流式交互,SseEmitter提供了一个简洁的方式在 Spring 应用中实现服务器发送事件,通过维护持久连接和异步事件推送,满足了大多数实时数据推送的需求。

7. 学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注公众号:猿java,持续输出硬核文章。

drawing