WebSocket
提供了全双工通道形式,允许服务器和客户端之间进行双向通信。这意味着不但客户端可以向服务端发送消息,同时服务端也可以向服务器发送消息。
WebSocket
通常用于
- 实时聊天工具
- 实时游戏
- 实时协作工具
- ……
但对于很多 web 程序来说,WebSocket
可能过大。比如,客户端需要实时更新会员中心页的通知数量,不需要双向通信。这时,只需要由服务端向客户端发送消息即可,这个技术就是:SSE
。
什么是 SSE
SSE(Server-Sent Events)
使用的是基于 HTTP 的协议。严格来讲,HTTP 协议是无法做到服务器主动推送消息的。但凡事都有例外,就是服务器向客户端声明发送的是数据流。
1
| Content-Type:text/event-stream
|
也就是说,服务端发送的并不是一次性的数据包,而是不间断的数据流。只要有数据,服务端会源源不断的向客户端发送数据,直到客户端关闭连接。
这种通信是以流的形式来通信,视频播放即是这种形式。
SSE
利用这种机制向浏览器推送消息。目前除了IE浏览器,基本上所有的浏览器都支持。
与 WebSocket 的区别
SSE
提供单向通信,WebSocket
提供双向通信SSE
基于 HTTP 协议,WebSocket
基于单独的协议(WebSocket 协议)SSE
只能传输文本,WebSocket
支持二进制和文本传输SSE
不支持通信加密,WebSocket
支持加密通信(TLS/SSL)SSE
有最大连接数限制,WebSocket
协议本身吴最大连接数的限制SSE
支持重连,WebSocket
需要自己实现
SSE 的特点
- 通过单个 HTTP 长连接推送消息,无需轮询
- 使用轻量级的文本格式(
JSON
),减少不必要的资源消耗 - 浏览器普遍支持
- 实现简单
数据格式
请求头
服务端向浏览器发送的 SSE
数据,必须是 UTF-8
编码的文本,必须包含以下 HTTP 请求头:
1
2
3
| Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
|
消息体
SSE
的消息体格式如下:
field 可以是以下四个值:
id 字段
id 相当于每一条数据的编号。
如果发生断连,浏览器会将 id 放到 Last-Event-Id
中进行重连来作为以重简单的同步机制。
event 字段
event 表示自定义的事件类型,如果忽略,默认是 message
。浏览器可以用 addEventListener()
来监听该事件。
1
2
3
4
| event: foo\n
data: a foo event\n\n
data: a default event\n\n
|
以上代码实现了 2 条信息。
第一条的事件名称是 foo
,第二条没有指定事件,触发浏览器的 message
事件。
data 字段
消息数据,以文本的形式发送。格式:
如果以 \n\n
为结尾,表示一条完整的数据消息。
多行数据可以以 \n
为结尾,如下发送一条 JSON 数据:
1
2
3
4
| data: {\n
data: "name": "张三",\n
data: "age", 25\n
data: }\n\n
|
retry 字段
服务端可以用 retry
来指定浏览器重新发起连接的时间间隔。单位毫秒
以上代码表示浏览器间隔 2 秒重新发起连接。
如果 retry
的值不是整数,则会被忽略。
浏览器会发起重连的原因:
SSE 实战
接下来我们完成一个简单示例来进一步了解 SSE
的用法。
用户在刊登楼盘的时候,需要上传跟楼盘相关的图片。
用户一般是用手机拍摄的图片,如果要求从 PC 浏览器上传,就需要用户从手机端将图片传输到 PC,这就增加了用户的工作量,会让用户感到很不方便。
于是,我们需要实现一个功能,使用户能够从手机端上传图片,同时 PC 浏览器也能同步显示用户上传的图片。
具体流程
1、用户进入刊登页面,显示上传图片二维码
2、用户用手机照相机扫二维码后,会跳转到手机浏览器
3、用户在手机浏览器上传图片
4、PC 浏览器端同步显示该图片
服务端实现
服务端我们用 Go 语言,kratos 框架实现,。
定义支持 SSE
的 header 头:
1
2
3
4
| // 設置頭部,告訴瀏覽器這是一個 SSE 流
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
|
实例化一个 messageChan
的通道变量。向客户端发送的消息将通过此通道传递。
1
2
| // 建立一個消息通道
messageChan := make(chan string, 1)
|
创建一个协程,向 messageChan
通道中发送数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| go func () {
timeout := time.After(30 * time.Minute)
defer func() {
// 關閉消息通道
close(messageChan)
messageChan = nil
}()
for {
images, err := s.imageUC.GetImagesByIdentifier(ctx, memberID, identifier)
if err != nil {
s.logger.Errorf("getImages error: %v", err)
return
}
if images == nil {
return
}
var buf bytes.Buffer
_ = json.NewEncoder(&buf).Encode(images)
select {
case messageChan <- buf.String():
case <-timeout:
return
}
time.Sleep(time.Second)
}
}
|
循环读取通道中的数据,messageChan
持续等待并接收数据。如果消息可用,那么它将写入到 http.ResponseWriter
中,同时刷新消息。
如果客户端关闭连接,Context().Done()
会给出一个关闭信号,我们就可以退出循环(退出函数)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| flusher, _ := w.(kratosHttp.Flusher)
for {
select {
case message, ok := <-messageChan:
if !ok {
return nil
}
// 向客戶端發送事件
event = fmt.Sprintf("event: %s\n", event)
message = fmt.Sprintf("data: %s\n", message)
retry := "retry: 2000\n"
_, _ = fmt.Fprintf(w, "%s%s%s\n", event, retry, message)
flusher.Flush()
case <-ctx.Request().Context().Done():
return nil
}
}
|
完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
| // RegisterSSESvcServers register SSE.
func RegisterSSESvcServers(s *kratosHttp.Server, srv *SSEService) {
s.Route("/v1").GET("/api/sse/events", srv.events)
}
func (s *SSEService) events(ctx kratosHttp.Context) error {
r := ctx.Request()
w := ctx.Response()
memberID, err := strconv.ParseUint(r.URL.Query().Get("member_id"), 10, 32)
if err != nil {
_ = JsonRes(constant.StatusParamError, "請傳遞member_id")(w, r, nil)
return nil
}
identifier := r.URL.Query().Get("identifier")
if identifier == "" {
_ = JsonRes(constant.StatusParamError, "請傳遞identifier")(w, r, nil)
return nil
}
event := r.URL.Query().Get("event")
if event == "" {
_ = JsonRes(constant.StatusParamError, "請傳遞event")(w, r, nil)
return nil
}
// 建立一個消息通道
messageChan := make(chan string, 1)
go s.getMessage(ctx, messageChan, uint32(memberID), identifier)
// 設置頭部,告訴瀏覽器這是一個 SSE 流
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// 準備 flusher
flusher, _ := w.(kratosHttp.Flusher)
for {
select {
case message, ok := <-messageChan:
if !ok {
return nil
}
// 向客戶端發送事件
event = fmt.Sprintf("event: %s\n", event)
message = fmt.Sprintf("data: %s\n", message)
retry := "retry: 2000\n"
_, _ = fmt.Fprintf(w, "%s%s%s\n", event, retry, message)
flusher.Flush()
case <-ctx.Request().Context().Done():
return nil
}
}
}
func (s *SSEService) getMessage(
ctx kratosHttp.Context,
messageChan chan<- string,
memberID uint32,
identifier string,
) {
timeout := time.After(30 * time.Minute)
defer func() {
// 關閉消息通道
close(messageChan)
messageChan = nil
}()
for {
images, err := s.imageUC.GetImagesByIdentifier(ctx, memberID, identifier)
if err != nil {
s.logger.Errorf("getImages error: %v", err)
return
}
if images == nil {
return
}
var buf bytes.Buffer
_ = json.NewEncoder(&buf).Encode(images)
select {
case messageChan <- buf.String():
case <-timeout:
return
}
time.Sleep(time.Second)
}
}
|
js 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| <!DOCTYPE html>
<html>
<head>
<title>SSE test</title>
<script type="text/javascript">
window.addEventListener("DOMContentLoaded", function () {
var evsrc = new EventSource("http://go.dev.591.com.hk:8084/v1/api/sse/events?event=upload_by_qr&member_id=1&identifier=efabcoxe");
evsrc.addEventListener("upload_by_qr", function (ev) {
console.log("Received upload_by_qr event: " + ev.data);
document.getElementById("log").insertAdjacentHTML("beforeend", "<li>" + ev.data + "</li>");
});
evsrc.onerror = function (ev) {
console.log("readyState = " + ev.currentTarget.readyState);
};
})
</script>
</head>
<body>
<h1>SSE test</h1>
<div>
<ul id="log">
</ul>
</div>
</body>
</html>
|
SSE 测试
1、启动服务
1
2
| INFO ts=2024-01-23T14:18:47+08:00 caller=http/server.go:317 service.id=addcn-taiwan.meta.juicefs.io service.name=house_release service.version= trace.id= span.id= msg=[HTTP] server listening on: [::]:8084
INFO ts=2024-01-23T14:18:47+08:00 caller=grpc/server.go:212 service.id=addcn-taiwan.meta.juicefs.io service.name=house_release service.version= trace.id= span.id= msg=[gRPC] server listening on: [::]:9094
|
2、访问 html 页面