logo头像

学如逆水行舟

Server-Sent Events(SSE)协议原理与实践

Server-Sent Events(SSE)协议原理与实践

这些年,语言类大模型相关的应用成为了非常热门的提效工具。各行各业都可以通过定制化的AI工具来提高工作效率。在这类应用的客户端中,由于大模型的运算效率和输出格式的原因,你会发现几乎所有的实时交流都是流式输出的,一个体验良好的客户端会实时接收数据流,进行MarkDown格式的内容渲染。本篇文章将主要讨论实现这种流式数据接收的一种方法,使用SSE协议:Server-Sent Events,并以JavaScript和Swift语言分别来对Web端和iOS端做实践。如果你正需要一种类似语言大模型交流方式的交互体验,希望本篇文章可以为你带来启发。

一 认识SSE协议

1 简介

SSE协议全称为Server-Sent Events,从名称也可以得知,这是一种服务端向客户端发送事件消息的协议。我们知道,通常在服务端和客户端交互的的HTTP请求中,请求会在一次收发数据后结束掉(下载除外),客户端首先发起请求,将数据发送到服务端,服务端根据业务逻辑将数据返回给客户端,则这次请求就完成了。但有时候,我们需要客户端和服务端进行多轮的有状态的通信(每次HTTP请求本质都是无状态的),这当然也不复杂,通过建立Socket长连接,在同一次连接生命周期中,客户端不仅可以多次随时的向发服务端发送数据,服务端也能够主动的多次随时向客户端推送信息。我们平时使用的IM即是通信软件,通常就是使用长链接来实现的。

回到本篇文章讨论的主题,我们的核心需求是客户端一次请求,服务端可以多次向客户端推送消息。这当然使用长连接也可以实现,但Socket全双工的机制用在这里会有些浪费,也会增加复杂性和使用成本。SSE协议则是专门针对这种需求场景所产生的。

总体来说,SSE与Socket类似,都是在客户端和服务端之间建立持久的通信通道。不同的是,SSE更加轻量,并且不是全双工的,它是一个单向的通道,SSE的数据流接收过程更类似于下载。我们可以通过以下几点来特性来理解SSE协议:

1 SSE使用的是HTTP协议,天然的能在大多数互联网应用中直接使用。

2 SSE非常轻量,更加面向应用层,比Socket使用简单。

3 天然支持断线重连,减少开发成本。

4 通常用来传输文本数据,客户端可以多次接收。

5 丰富的自定义能力。

2 协议细节

SSE协议本质上依然是一个HTTP请求,要使用SSE,客户端接收到用户端的请求时,需要将Response Header中的Content-Type字段设置为:text/event-steam,状态码信息则正常返回200 OK即可,如下:

1
2
3
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

text/event-stream是标记返回数据为流式数据,SSE理论上需要服务端多次推送消息到客户端,需要保持连接为keep-alive状态。

服务端在向客户端发送数据时,每次可以发送多个消息,每个消息间使用\n\n分隔,每个消息可以有多行组成,行之间使用\n分隔。

每行的格式如下:

1
字段:值\n

其中“字段”一项是可选的,如果没有字段,以冒号开头,则表示此行为注释。

SSE协议中约定的字段包括以下4种:

1
2
3
4
data
event
id
retry

下面我们来具体介绍这几种字段以及其值的意义。

data字段标识此部分信息为数据内容,数据内容如果很长,可以分为多行,例如:

1
2
data: conent1\n
data: conent1e1_end\n\n

也可以将一个长的JSON拆分成多行发送:

1
2
3
4
data: {\n
data: "a": "a",\n
data: "b", 1\n
data: }\n\n

id字段通常标识数据的编号,便于实现断线重连等逻辑。

event字段用来标识时间的额类型,例如当服务端数据推送完成后,通常会发送一个特殊的event事件表示数据全部发送完,之后断开连接。

retry字段用来配置一个数值,指定客户端重新发起连接的间隔。

二 实践-基于Node.js的服务端SSE实践

使用Node.js平台来编写一个SSE协议的测试程序非常简单,实例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
var http = require("http");

http.createServer(function (req, res) {
var url = req.url;
if (url === "/stream") {
var count = 0
res.writeHead(200, {
"Content-Type":"text/event-stream",
"Cache-Control":"no-cache",
"Connection":"keep-alive",
"Access-Control-Allow-Origin": '*',
});
// 设置重连间隔
res.write("retry: 10000\n");
// 发起开始事件
res.write("event: start\n");
res.write("data: " + (new Date()) + "\n\n");

// 发送数据
res.write("data: " + (new Date()) + " count:" + (count++) + "\n\n");

// 每秒发送一次数据
interval = setInterval(function () {

if (count == 20) {
// 发送结束事件
res.write("event: end\n");
res.write("data: " + (new Date()) + "\n\n");
res.end();
clearInterval(interval);
} else {
res.write("data: " + (new Date()) + " count:" + (count++) + "\n\n");
}
}, 1000);

req.connection.addListener("close", function () {
clearInterval(interval);
}, false);
}
}).listen(8844, "127.0.0.1");

使用node执行上面的程序,可以在浏览器中输入如下地址进行测试:

1
http://localhost:8844/stream

SSE是一个GET请求(其实使用POST也没有问题)。服务端代码的逻辑是会每秒输出测试数据,直到输出20条后结束,并关闭连接。页面展示效果如下:

三 实践-SSE客户端实践

1 JavaScript客户端

实现了SSE协议的浏览器中会提供一个名为EventSource的对象,此对象对SSE协议的数据交互提供了支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<html>
<script>
var source = new EventSource("http://localhost.charlesproxy.com:8844/stream");
source.onopen = function (event) {
// 连接建立后会回调
console.log(event)
};

source.onmessage = function (event) {
// 接收到数据回调
var data = event.data;
console.log(data)
};
source.onerror = function (event) {
// 连接错误回调
console.log(event)
};
</script>
</html>

运行客户端代码,从打印信息可以看到,start和end事件并没有监听到,这是因为这两个事件时我们自定义的事件,要监听自定义事件,方法如下:

1
2
3
4
5
6
7
8
9
// 监听自定义事件
source.addEventListener('start', function (event) {
var data = event.data;
console.log(event)
}, false);
source.addEventListener('end', function (event) {
var data = event.data;
console.log(event)
}, false);

2 Swift客户端

Swift客户端可以直接使用原生提供的URLSession来实现,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import UIKit

class ViewController: UIViewController, URLSessionDataDelegate {

private var url: URL!
private var task: URLSessionDataTask!
private var session: URLSession!
private var receivedData = Data()

override func viewDidLoad() {
super.viewDidLoad()

self.url = URL(string: "http://localhost.charlesproxy.com:8844/stream")
let configuration = URLSessionConfiguration.default
self.session = URLSession(configuration: configuration, delegate: self, delegateQueue: OperationQueue.main)

startListening()
}

func startListening() {
let request = URLRequest(url: url)

task = session.dataTask(with: request)
task.resume()

}

private func parseSSE(data: Data) {
guard let eventsString = String(data: data, encoding: .utf8) else {
print("Unable to decode data")
return
}

let events = eventsString.split(separator: "\n\n")

for event in events {
let eventLines = event.split(separator: "\n")
var eventData: [String: String] = [:]

for line in eventLines {
let keyValue = line.split(separator: ":", maxSplits: 1)
if keyValue.count == 2 {
let key = String(keyValue[0]).trimmingCharacters(in: .whitespaces)
let value = String(keyValue[1]).trimmingCharacters(in: .whitespaces)
eventData[key] = value
}
}

handleEvent(eventData: eventData)
}
}

private func handleEvent(eventData: [String: String]) {
if let eventType = eventData["event"], let data = eventData["data"] {
print("Event Type: \(eventType), Data: \(data)")
} else if let data = eventData["data"] {
print("Data: \(data)")
}
}

func stopListening() {
task?.cancel()
task = nil
}

// MARK: - URLSessionDataDelegate methods

func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
receivedData.append(data)

// 处理完整的事件
if let eventsString = String(data: receivedData, encoding: .utf8), eventsString.contains("\n\n") {
let eventsData = eventsString.data(using: .utf8)!
parseSSE(data: eventsData)

// 清除已处理的数据
receivedData = Data()
}
}

func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
if let error = error {
print("Error occurred: \(error)")
} else {
print("Connection closed by server")
}
}
}

需要注意,要使用代理的方式接收数据,而不是block回调的方式,block回调方式的接口会等待所有数据接收完成后才会回调。运行上面的iOS客户端代码,控制台打印数据如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Event Type: start, Data: Mon Sep 30 2024 14:11:04 GMT+0800 (China Standard Time)
Data: Mon Sep 30 2024 14:11:04 GMT+0800 (China Standard Time) count:0
Data: Mon Sep 30 2024 14:11:05 GMT+0800 (China Standard Time) count:1
Data: Mon Sep 30 2024 14:11:06 GMT+0800 (China Standard Time) count:2
Data: Mon Sep 30 2024 14:11:07 GMT+0800 (China Standard Time) count:3
Data: Mon Sep 30 2024 14:11:08 GMT+0800 (China Standard Time) count:4
Data: Mon Sep 30 2024 14:11:09 GMT+0800 (China Standard Time) count:5
Data: Mon Sep 30 2024 14:11:10 GMT+0800 (China Standard Time) count:6
Data: Mon Sep 30 2024 14:11:11 GMT+0800 (China Standard Time) count:7
Data: Mon Sep 30 2024 14:11:12 GMT+0800 (China Standard Time) count:8
Data: Mon Sep 30 2024 14:11:13 GMT+0800 (China Standard Time) count:9
Data: Mon Sep 30 2024 14:11:14 GMT+0800 (China Standard Time) count:10
Data: Mon Sep 30 2024 14:11:15 GMT+0800 (China Standard Time) count:11
Data: Mon Sep 30 2024 14:11:16 GMT+0800 (China Standard Time) count:12
Data: Mon Sep 30 2024 14:11:17 GMT+0800 (China Standard Time) count:13
Data: Mon Sep 30 2024 14:11:18 GMT+0800 (China Standard Time) count:14
Data: Mon Sep 30 2024 14:11:19 GMT+0800 (China Standard Time) count:15
Data: Mon Sep 30 2024 14:11:20 GMT+0800 (China Standard Time) count:16
Data: Mon Sep 30 2024 14:11:21 GMT+0800 (China Standard Time) count:17
Data: Mon Sep 30 2024 14:11:22 GMT+0800 (China Standard Time) count:18
Data: Mon Sep 30 2024 14:11:23 GMT+0800 (China Standard Time) count:19
Event Type: end, Data: Mon Sep 30 2024 14:11:24 GMT+0800 (China Standard Time)
Connection closed by server

到此,我们对SSE协议的原理,定义以及服务端和客户端的用法实践做了简单介绍,对于单向的服务端发送序列数据到客户端且在短时间内会完成的场景,你都可以考虑使用SSE实现,希望本篇文章可以为你带来一些收获。