相册系统 进度通知.md
消息通知是基于Websocket实现
gin框架中,使用websocket的方式如下:
代码
// scan/media/type.go
type TaskProcessInfo struct {
Code int
Info interface{}
}
type TaskCh struct {
Ch chan TaskProcessInfo
id int
}
func NewTaskCh(t *Task) *TaskCh {
t.TaskLock.Lock()
taskCh := &TaskCh{
Ch: make(chan TaskProcessInfo, 10000),
id: t.MaxId,
}
t.MaxId = t.MaxId + 1
t.Messages = append(t.Messages, taskCh)
t.TaskLock.Unlock()
return taskCh
}
func DeleteTaskCh(t *Task, room *TaskCh) {
t.TaskLock.Lock()
defer t.TaskLock.Unlock()
for i, ch := range t.Messages {
if ch.id == room.id {
close(ch.Ch)
t.Messages = append(t.Messages[:i], t.Messages[i+1:]...)
break
}
}
}
type History struct {
history []TaskProcessInfo
lock sync.Mutex
}
func (h *History) GetHistory() []TaskProcessInfo {
h.lock.Lock()
defer h.lock.Unlock()
return h.history
}
func NewHistory() *History {
return &History{
history: make([]TaskProcessInfo, 0, 100000),
lock: sync.Mutex{},
}
}
func (t *Task) TellInfo(code int, d interface{}) {
message := TaskProcessInfo{
Code: code,
Info: d,
}
t.Records.lock.Lock()
t.Records.history = append(t.Records.history, message)
t.Records.lock.Unlock()
for _, ch := range t.Messages {
select {
case ch.Ch <- message:
default:
fmt.Println("task message is full , id : ", ch.id)
DeleteTaskCh(t, ch)
}
}
}
// route/ws.go
package route
import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"webview/scan/media"
)
type TaskWS struct {
task *media.Task
}
func NewTaskWS(Task *media.Task) *TaskWS {
return &TaskWS{
task: Task,
}
}
func (r *TaskWS) RegisterRoute(e *gin.RouterGroup) {
e.GET("/task/center", r.WebSocketHandler)
}
func (r *TaskWS) WebSocketHandler(c *gin.Context) {
// 获取WebSocket连接
ws, err := websocket.Upgrade(c.Writer, c.Request, nil, 1024, 1024)
if err != nil {
panic(err)
}
ws.WriteJSON("hello")
taskCh := media.NewTaskCh(r.task)
ws.SetCloseHandler(func(code int, text string) error {
fmt.Println("page close")
media.DeleteTaskCh(r.task, taskCh)
return nil
})
history := r.task.Records.GetHistory()
defer ws.Close()
ws.WriteJSON(media.TaskProcessInfo{
Code: media.HistoryInfo,
Info: history,
})
for info := range taskCh.Ch {
//fmt.Println("info:", info)
err := ws.WriteJSON(info)
if err != nil {
fmt.Println("err:", err)
break
}
}
// 关闭WebSocket连接
}
说明
对于websocket来说,就没有路径,请求方式一谈了,是由普通的http请求升级而来。
定义基本的消息体
type TaskProcessInfo struct {
Code int
Info interface{}
}
Code代表消息的id,Info是interface,可以说任意类型。
传递给前端是通过json的方式,值得一提的是,Go的结构体在序列化城Json的时候,只会序列化大写字母开头的字段,所以这里的Code和Info都是大写字母开头。
发送
且Go天生支持序列化为Json,所以这里直接使用了WriteJson方法。
func (r *TaskWS) WebSocketHandler(c *gin.Context) {
// 获取WebSocket连接
ws, err := websocket.Upgrade(c.Writer, c.Request, nil, 1024, 1024)
if err != nil {
panic(err)
}
ws.WriteJSON("hello")
taskCh := media.NewTaskCh(r.task)
ws.SetCloseHandler(func(code int, text string) error {
fmt.Println("page close")
media.DeleteTaskCh(r.task, taskCh)
return nil
})
history := r.task.Records.GetHistory()
defer ws.Close()
ws.WriteJSON(media.TaskProcessInfo{
Code: media.HistoryInfo,
Info: history,
})
for info := range taskCh.Ch {
//fmt.Println("info:", info)
err := ws.WriteJSON(info)
if err != nil {
fmt.Println("err:", err)
break
}
}
// 关闭WebSocket连接
}
链接关闭
这里的taskCh是一个channel,用于接收消息,每个连接都会有一个taskCh,所以在关闭连接的时候,需要将taskCh关闭。
func DeleteTaskCh(t *Task, room *TaskCh) {
t.TaskLock.Lock()
defer t.TaskLock.Unlock()
for i, ch := range t.Messages {
if ch.id == room.id {
close(ch.Ch)
t.Messages = append(t.Messages[:i], t.Messages[i+1:]...)
break
}
}
}
值得注意的是,每次在建立连接的时候,都会下发历史消息,这样就可以在页面刷新的时候,获取到之前的消息。
history := r.task.Records.GetHistory()
ws.WriteJSON(media.TaskProcessInfo{
Code: media.HistoryInfo,
Info: history,
})
Code表
另外,对于消息的Code值,规定如下:
const (
ScanDirTaskMessage = iota
MakePhotoPreviewTaskMessage = iota
MakeVideoPreviewTaskMessage = iota
MakeVideoPreviewStartInfo = iota
HistoryInfo = iota
)
这里的iota是Go语言的一个特性,可以自增,所以这里的值就是0,1,2,3,4。
<!-- 值的table -->
Code | 说明 | Info |
---|---|---|
0 | 扫描目录任务 | ScanDirTaskMessage |
1 | 生成图片预览任务 | MakePhotoPreviewTaskMessage |
2 | 生成视频预览任务 | MakeVideoPreviewTaskMessage |
3 | 生成视频预览开始信息 | MakeVideoPreviewStartInfo |
如上,每个Code对应的Info都是不同的,这样前端就可以根据Code来处理不同的消息。
值得一提的是,这里的Info是interface,所以可以是任意类型,这样就可以传递任意类型的数据。
历史消息 竞态分析
再看看HistoryInfo,这个是用于下发历史消息的,这里的Info是一个数组,数组里面的元素是TaskProcessInfo。
func (h *History) GetHistory() []TaskProcessInfo {
h.lock.Lock()
defer h.lock.Unlock()
return h.history
}
这里的GetHistory方法是用于获取历史消息的,这里使用了锁,因为在下发消息的时候,可能会有其他的协程在修改这个数组,所以需要加锁。
此处返回的事切片,切片的底层结构如下:
graph TD
A[切片] --> B[起始地址]
A --> C[长度]
A --> D[容量]
而history的切片只会被append,当切片返回到调用方后,自身的切片成为A,返回的切片称为B
A传递到B的过程中相当于是进行了三个变量的值传递(起始地址,长度,容量),所以在别处进行TellInfo
的时候,不会影响到这里的切片,只会使A的长度发生变化,当长度超过容量的时候,会重新分配内存,此时A和B指向的起始地址就不一样了。
因此,加锁只需要加在获取部分,避免发生竞态条件即可。
通知
而在TellInfo
中,插入消息的时候也需要加锁。
func (t *Task) TellInfo(code int, d interface{}) {
message := TaskProcessInfo{
Code: code,
Info: d,
}
t.Records.lock.Lock()
t.Records.history = append(t.Records.history, message)
t.Records.lock.Unlock()
for _, ch := range t.Messages {
select {
case ch.Ch <- message:
default:
fmt.Println("task message is full , id : ", ch.id)
DeleteTaskCh(t, ch)
}
}
}
如上代码所示,使用select
来进行非阻塞的发送,如果发送失败,则说明该链接的缓冲区已满,缓冲区我们可以设置为1w,当缓冲区满了之后,说明该链接已太长时间没有读取消息,此时就可以关闭该链接了。
这样也能处理为顾及到的地方(意外关闭,未设置通道删除)
当然,这里的select
只是为了处理缓冲区满了的情况,如果不需要处理这种情况,可以直接使用ch.Ch <- message
,这样就会阻塞,直到消息被读取。(不推荐)
也可以在select
的default
下取出第一条消息,然后再发送,这样就不会阻塞,但是会导致消息丢失,具体应该参考业务细节。