消息通知是基于Websocket实现

gin框架中,使用websocket的方式如下:

代码

// scan/media/type.go

type TaskProcessInfo struct {
    Code int
    Info interface{}
}

type TaskCh struct {
    Ch chan TaskProcessInfo
    id int
}

func NewTaskCh(t *Task) *TaskCh {
    t.TaskLock.Lock()
    taskCh := &TaskCh{
        Ch: make(chan TaskProcessInfo, 10000),
        id: t.MaxId,
    }
    t.MaxId = t.MaxId + 1
    t.Messages = append(t.Messages, taskCh)
    t.TaskLock.Unlock()
    return taskCh
}

func DeleteTaskCh(t *Task, room *TaskCh) {
    t.TaskLock.Lock()
    defer t.TaskLock.Unlock()
    for i, ch := range t.Messages {
        if ch.id == room.id {
            close(ch.Ch)
            t.Messages = append(t.Messages[:i], t.Messages[i+1:]...)
            break
        }
    }
}

type History struct {
    history []TaskProcessInfo
    lock    sync.Mutex
}

func (h *History) GetHistory() []TaskProcessInfo {
    h.lock.Lock()
    defer h.lock.Unlock()
    return h.history
}

func NewHistory() *History {
    return &History{
        history: make([]TaskProcessInfo, 0, 100000),
        lock:    sync.Mutex{},
    }
}

func (t *Task) TellInfo(code int, d interface{}) {
    message := TaskProcessInfo{
        Code: code,
        Info: d,
    }
    t.Records.lock.Lock()
    t.Records.history = append(t.Records.history, message)
    t.Records.lock.Unlock()
    for _, ch := range t.Messages {
        select {
        case ch.Ch <- message:
        default:
            fmt.Println("task message is full , id : ", ch.id)
            DeleteTaskCh(t, ch)
        }
    }
}

// route/ws.go
package route

import (
    "fmt"
    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    "webview/scan/media"
)

type TaskWS struct {
    task *media.Task
}

func NewTaskWS(Task *media.Task) *TaskWS {
    return &TaskWS{
        task: Task,
    }
}

func (r *TaskWS) RegisterRoute(e *gin.RouterGroup) {
    e.GET("/task/center", r.WebSocketHandler)
}

func (r *TaskWS) WebSocketHandler(c *gin.Context) {
    // 获取WebSocket连接
    ws, err := websocket.Upgrade(c.Writer, c.Request, nil, 1024, 1024)
    if err != nil {
        panic(err)
    }
    ws.WriteJSON("hello")
    taskCh := media.NewTaskCh(r.task)
    ws.SetCloseHandler(func(code int, text string) error {
        fmt.Println("page close")
        media.DeleteTaskCh(r.task, taskCh)
        return nil
    })
    history := r.task.Records.GetHistory()
    defer ws.Close()
    ws.WriteJSON(media.TaskProcessInfo{
        Code: media.HistoryInfo,
        Info: history,
    })
    for info := range taskCh.Ch {
        //fmt.Println("info:", info)
        err := ws.WriteJSON(info)
        if err != nil {
            fmt.Println("err:", err)
            break
        }
    }
    // 关闭WebSocket连接

}

说明

对于websocket来说,就没有路径,请求方式一谈了,是由普通的http请求升级而来。

定义基本的消息体

type TaskProcessInfo struct {
    Code int
    Info interface{}
}

Code代表消息的id,Info是interface,可以说任意类型。

传递给前端是通过json的方式,值得一提的是,Go的结构体在序列化城Json的时候,只会序列化大写字母开头的字段,所以这里的Code和Info都是大写字母开头。

发送

且Go天生支持序列化为Json,所以这里直接使用了WriteJson方法。

func (r *TaskWS) WebSocketHandler(c *gin.Context) {
    // 获取WebSocket连接
    ws, err := websocket.Upgrade(c.Writer, c.Request, nil, 1024, 1024)
    if err != nil {
        panic(err)
    }
    ws.WriteJSON("hello")
    taskCh := media.NewTaskCh(r.task)
    ws.SetCloseHandler(func(code int, text string) error {
        fmt.Println("page close")
        media.DeleteTaskCh(r.task, taskCh)
        return nil
    })
    history := r.task.Records.GetHistory()
    defer ws.Close()
    ws.WriteJSON(media.TaskProcessInfo{
        Code: media.HistoryInfo,
        Info: history,
    })
    for info := range taskCh.Ch {
        //fmt.Println("info:", info)
        err := ws.WriteJSON(info)
        if err != nil {
            fmt.Println("err:", err)
            break
        }
    }
    // 关闭WebSocket连接

}

链接关闭

这里的taskCh是一个channel,用于接收消息,每个连接都会有一个taskCh,所以在关闭连接的时候,需要将taskCh关闭。

func DeleteTaskCh(t *Task, room *TaskCh) {
    t.TaskLock.Lock()
    defer t.TaskLock.Unlock()
    for i, ch := range t.Messages {
        if ch.id == room.id {
            close(ch.Ch)
            t.Messages = append(t.Messages[:i], t.Messages[i+1:]...)
            break
        }
    }
}

值得注意的是,每次在建立连接的时候,都会下发历史消息,这样就可以在页面刷新的时候,获取到之前的消息。

history := r.task.Records.GetHistory()
ws.WriteJSON(media.TaskProcessInfo{
    Code: media.HistoryInfo,
    Info: history,
})

Code表

另外,对于消息的Code值,规定如下:

const (
    ScanDirTaskMessage          = iota
    MakePhotoPreviewTaskMessage = iota
    MakeVideoPreviewTaskMessage = iota
    MakeVideoPreviewStartInfo   = iota
    HistoryInfo                 = iota
)

这里的iota是Go语言的一个特性,可以自增,所以这里的值就是0,1,2,3,4。
<!-- 值的table -->

Code说明Info
0扫描目录任务ScanDirTaskMessage
1生成图片预览任务MakePhotoPreviewTaskMessage
2生成视频预览任务MakeVideoPreviewTaskMessage
3生成视频预览开始信息MakeVideoPreviewStartInfo

如上,每个Code对应的Info都是不同的,这样前端就可以根据Code来处理不同的消息。

值得一提的是,这里的Info是interface,所以可以是任意类型,这样就可以传递任意类型的数据。

历史消息 竞态分析

再看看HistoryInfo,这个是用于下发历史消息的,这里的Info是一个数组,数组里面的元素是TaskProcessInfo。

func (h *History) GetHistory() []TaskProcessInfo {
    h.lock.Lock()
    defer h.lock.Unlock()
    return h.history
}

这里的GetHistory方法是用于获取历史消息的,这里使用了锁,因为在下发消息的时候,可能会有其他的协程在修改这个数组,所以需要加锁。

此处返回的事切片,切片的底层结构如下:

graph TD
    A[切片] --> B[起始地址]
    A --> C[长度]
    A --> D[容量]

切片示意图

而history的切片只会被append,当切片返回到调用方后,自身的切片成为A,返回的切片称为B

A传递到B的过程中相当于是进行了三个变量的值传递(起始地址,长度,容量),所以在别处进行TellInfo的时候,不会影响到这里的切片,只会使A的长度发生变化,当长度超过容量的时候,会重新分配内存,此时A和B指向的起始地址就不一样了。

因此,加锁只需要加在获取部分,避免发生竞态条件即可。

通知

而在TellInfo中,插入消息的时候也需要加锁。

func (t *Task) TellInfo(code int, d interface{}) {
    message := TaskProcessInfo{
        Code: code,
        Info: d,
    }
    t.Records.lock.Lock()
    t.Records.history = append(t.Records.history, message)
    t.Records.lock.Unlock()
    for _, ch := range t.Messages {
        select {
        case ch.Ch <- message:
        default:
            fmt.Println("task message is full , id : ", ch.id)
            DeleteTaskCh(t, ch)
        }
    }
}

如上代码所示,使用select来进行非阻塞的发送,如果发送失败,则说明该链接的缓冲区已满,缓冲区我们可以设置为1w,当缓冲区满了之后,说明该链接已太长时间没有读取消息,此时就可以关闭该链接了。

这样也能处理为顾及到的地方(意外关闭,未设置通道删除)

当然,这里的select只是为了处理缓冲区满了的情况,如果不需要处理这种情况,可以直接使用ch.Ch <- message,这样就会阻塞,直到消息被读取。(不推荐)

也可以在selectdefault下取出第一条消息,然后再发送,这样就不会阻塞,但是会导致消息丢失,具体应该参考业务细节。

使用文本编译器Code

[toc]

gcc

gcc是c/c++语言的编译器 只是在编译程序时需要

一般来说 windows使用的mingw的gcc
当然 有很多方法

方法1 安装mingw

随便找了一个,因为下载源在国外 下载速度很慢,所以我挂了个网盘链接 建议是做好科学上网的准备,如果因为网络原因 方法1可能安装成功,可能安装失败,如果安装失败 参考方法3

「setup-x86_64.exe」,点击链接保存,或者复制本段内容,打开「阿里云盘」APP ,无需下载,极速在线查看享用。
链接:https://www.aliyundrive.com/s/nSchSQCVzAx

然后下一步安装法直到出现这个界面 像我一样勾选

先选择分类为category

然后点开Devel

接着往下滑 直到找到gcc(可以在搜索框里面输入gcc)

像我一样 先选择对应的版本 然后勾选框框

然后往下执行

当完成安装过后 点击windows菜单 开始搜索 编辑系统环境变量

像这样

进入过后

双击path

点击新建添加新的环境变量 通过浏览找到安装cygwin64的目录 然后把双击下面的bin目录将其作为环境变量

都确认过后 打开cmd 输出如其

gcc: fatal error: no input files
compilation terminated.

就成功了

方法2 使用wsl2 子系统也就是linux

暂时不讲

方法3 将gcc指向dev的gcc(只能说将就)

暂时不讲

code(推荐 本编教程)

考虑到抽筋的网络问题 直接从我的分享下载吧

如果是linux或者Mac 请到官网下载

我的备份

然后开始安装

全部勾选

然后无脑下一步安装法

装好过后打开

111

如我所示打开搜索框

搜索code runner 然后点击安装

111

安装好后点击我圈起来的小齿轮 点击extension settings

111

翻到如下设置,将Run in terminal ,save all files before run 以及save file before run 都勾选上

111

如图

111

然后关闭当前设置页面 返回插件页面继续安装插件 c++插件 我圈起来的俩个都装

111

111

以及有需要的朋友可以装汉化插件 基本就齐活了

这时 回到桌面 建立一个叫code的文件夹 然后右键 选择通过code打开

111

111

打开过后是空的 像我一样 点击我圈起来的那个小符号 就能新建一个文件了

文件名输入后记得加后缀.cpp或者 .c

输入代码

#include<stdio.h>
int main(){
    int a ;
    scanf("%d",&a);
    printf("%d hello world\n",a);
    return 0;
}

按下ctrl alt n就可以运行

111

运行框如图所示

输入111 成功输出 证明没问题