V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
gitxuzan
V2EX  ›  程序员

用 golang 封装了一个 websocket 框架,主要是学习设计模式,目前在自己公司内部有使用!

  •  
  •   gitxuzan · 2023-04-02 02:23:21 +08:00 · 2887 次点击
    这是一个创建于 583 天前的主题,其中的信息可能已经有所发展或是发生改变。

    zinx-websocket 版本

    代码地址: https://github.com/Xuzan9396/zinx-ws

    目标?

    维护 golang 版 websocket 版本, 打算跟 zinx tcp 版本同步,然后无偿开源

    为什么做这个项目?

    做这个项目初衷,主要因为自己公司做直播平台的,之前公司写了一套,websocket 封装的框架,主要做房间服务器,和 h5 小游戏服务器,但是由于感觉随着业务增大,后面感觉某些设计有缺陷,看了冰哥的设计模式, 打算跟着冰哥设计模式重写一个 websocket

    打算项目使用?

    后续会在自己的项目中使用,打算在直播间的小游戏,准备上线使用
    

    具体怎么使用

    参数说明
    
      "Name": "zin-ws -------gitxuzan",
      "Host": "127.0.0.1",
      "端口": "端口",
      "TcpPort": 8999,
      "最大连接数": "最大连接数",
      "MaxConn": 1000,
      "最大的包大小": "最大包大小",
      "MaxPackageSize": 4096,
      "worker 池子": "worker 池子 10 个并发处理读的数据",
      "WorkerPoolSize": 10
    
    
    数据发送格式简单说明(后续修改成格式定义)
    MsgId len body
    协议号 ID body 长度 二进制 body 长度
    uint32 uint32 []byte
    服务端配置设置
    wsconfig.SetWSConfig("127.0.0.1", 8999, wsconfig.WithName("gitxuzan ----- websocket"))
    还有其他设置例如:
    wsconfig.WithWorkerSize(10) // 设置 10 个 worker 处理业务逻辑
    wsconfig.WithMaxPackSize(4096)  // 每个发送的包大小 4k
    wsconfig.WithMaxConn(1000)	// 同时在线 1000 个连接     
    wsconfig.WithVersion()	        // 自定义本地版本   
    
    定义业务逻辑协议
    type LoginInfo struct {
    	znet.BaseRouter
    }
    
    例如上面写的 LoginInfo 继承 znet.BaseRouter
    重写三个方法依次执行:
    PreHandle
    Handle
    PostHandle
    
    设置 router 映射到具体的方法上
    同时要设置 router 
    	// 登录
    s.AddRouter(1001, &LoginInfo{})
    1001 代表协议号,相当于协议投里面的 msgId,映射到具体某个业务,发送端需要发送对应的协议号
    
    request 的一些功能,例如下面的案例,模拟登入验证等等
    func (l *LoginInfo) PreHandle(request ziface.IRequest) {
    request 中 目前有发送,断开,获取当前属性,获取当前连接
    }
    
    完整的服务端使用代码
    package main
    
    import (
    	wsconfig "github.com/Xuzan9396/ws/config"
    	"github.com/Xuzan9396/ws/ziface"
    	"github.com/Xuzan9396/ws/znet"
    	"log"
    	"time"
    )
    
    func init() {
    	log.SetFlags(log.Lshortfile | log.LstdFlags)
    }
    
    type LoginInfo struct {
    	znet.BaseRouter
    }
    
    // 模拟登录逻辑
    func (l *LoginInfo) PreHandle(request ziface.IRequest) {
    	auth := false
    	<-time.After(5 * time.Second) // 模拟业务
    	if auth == false {
    		// 模拟登录认证失败,然后断开连接
    		request.GetConnetion().Stop()
    	}
    }
    
    type PingInfo struct {
    	znet.BaseRouter
    }
    type HelloInfo struct {
    	znet.BaseRouter
    }
    
    func (p *PingInfo) PreHandle(request ziface.IRequest) {
    	log.Printf("pre:%s,conntId:%d,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID())
    }
    
    func (p *PingInfo) Handle(request ziface.IRequest) {
    	log.Printf("Handle:%s,conntId:%d,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID())
    
    }
    
    func (p *PingInfo) PostHandle(request ziface.IRequest) {
    	log.Printf("post:%s,conntId:%d,,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID())
    	request.GetConnetion().SendMsg(request.GetMsgID(), []byte("回复 ping!"))
    }
    func (p *HelloInfo) PreHandle(request ziface.IRequest) {
    	log.Printf("pre:%s,conntId:%d,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID())
    }
    
    func (p *HelloInfo) Handle(request ziface.IRequest) {
    	log.Printf("Handle:%s,conntId:%d,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID())
    
    }
    
    func (p *HelloInfo) PostHandle(request ziface.IRequest) {
    	log.Printf("post:%s,conntId:%d,,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID())
    	request.GetConnetion().SendMsg(request.GetMsgID(), []byte("回复 hello!"))
    
    }
    
    // 创建链接后初始化函数
    func SetOnConnetStart(conn ziface.IConnection) {
    	conn.SetProperty("name", "xuzan")
    	res, bools := conn.GetProperty("name")
    	if bools {
    		log.Println("name", res.(string))
    	}
    	conn.RemoveProperty("name")
    }
    
    func GetConnectNum(s ziface.IServer) {
    	go func() {
    		ticker := time.NewTicker(5 * time.Second)
    		defer ticker.Stop()
    		for {
    			select {
    			case <-ticker.C:
    				connNumTotal := s.GetConnMgr().Len()
    				log.Println("连接数量:", connNumTotal)
    			}
    		}
    	}()
    }
    
    func main() {
    	//设置配置
    	wsconfig.SetWSConfig("127.0.0.1", 8999, wsconfig.WithName("gitxuzan ----- websocket"))
    	// 创建一个 server 句柄
    	s := znet.NewServer()
    	// 启动 sever
    	s.SetOnConnStart(SetOnConnetStart)
    	// 测试业务
    	s.AddRouter(1, &HelloInfo{})
    	// 其他业务
    	s.AddRouter(2, &PingInfo{})
    	// 登录
    	s.AddRouter(1001, &LoginInfo{})
    
    	// 监控长连接数量
    	GetConnectNum(s)
    	s.Server()
    }
    
    
    
    完整的客户端案例代码
    package main
    
    import (
    	"flag"
    	"github.com/Xuzan9396/ws/znet"
    	"github.com/gorilla/websocket"
    	"log"
    	"net/http"
    	"net/url"
    	"os"
    	"os/signal"
    	"time"
    )
    
    var addr = flag.String("addr", "127.0.0.1:8999", "http service address")
    
    func main() {
    	flag.Parse()
    	log.SetFlags(0)
    
    	interrupt := make(chan os.Signal, 1)
    	signal.Notify(interrupt, os.Interrupt)
    
    	u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}
    	log.Printf("connecting to %s", u.String())
    	c, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{"User-Agent": {""}})
    	if err != nil {
    		log.Fatal("dial:", err)
    	}
    	defer c.Close()
    
    	log.Println("ws 连接成功")
    	ticker := time.NewTicker(3 * time.Second)
    	defer ticker.Stop()
    
    	p := znet.NewDataPack()
    	by := []byte{'h', 'e', 'l', 'l', 'o'}
    	resBytes, err := p.Pack(&znet.Message{
    		Id:      1,
    		DataLen: uint32(len(by)),
    		Data:    by,
    	})
    
    	byPing := []byte("ping")
    	resPingBytes, _ := p.Pack(&znet.Message{
    		Id:      2,
    		DataLen: uint32(len(byPing)),
    		Data:    byPing,
    	})
    	timer := time.NewTimer(30 * time.Second)
    
    	go read(c)
    	for {
    		select {
    		case <-timer.C:
    			// 模拟认证登录
    			sendMsg := []byte("login")
    			sendMsgPack, _ := p.Pack(&znet.Message{
    				Id:      1001,
    				DataLen: uint32(len(sendMsg)),
    				Data:    sendMsg,
    			})
    			err := c.WriteMessage(websocket.BinaryMessage, sendMsgPack)
    			if err != nil {
    				log.Println("write:", err)
    				timer.Stop()
    				return
    			}
    			log.Println("login 写入成功:", string(sendMsg))
    			timer.Stop()
    		case <-ticker.C:
    			sendMsg := resBytes
    			err := c.WriteMessage(websocket.BinaryMessage, sendMsg)
    			if err != nil {
    				log.Println("write:", err)
    				return
    			}
    			log.Println("写入成功:", string(by))
    
    			err = c.WriteMessage(websocket.BinaryMessage, resPingBytes)
    			if err != nil {
    				log.Println("write:", err)
    				return
    			}
    
    			log.Println("写入成功:", string(resPingBytes))
    		case <-interrupt:
    			log.Println("interrupt")
    			err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
    			if err != nil {
    				log.Println("write close:", err)
    				return
    			}
    
    		}
    	}
    
    }
    
    func read(c *websocket.Conn) {
    	for {
    		_, message, err := c.ReadMessage()
    		if err != nil {
    			log.Println("read:", err)
    			return
    		}
    		p := znet.NewDataPack()
    		img, err := p.Unpack(message)
    		if err != nil {
    			log.Println("read:", err)
    			return
    		}
    		log.Printf("msgId:%d,recv: %s", img.GetMsgId(), img.GetData())
    	}
    }
    
    
    参考链接

    https://github.com/aceld/zinx 来自冰哥

    16 条回复    2023-04-03 22:25:05 +08:00
    Nnq
        1
    Nnq  
       2023-04-02 05:56:51 +08:00
    golang 现在好多相似的库,都快选择恐惧症了
    fridaycatye
        2
    fridaycatye  
       2023-04-02 09:14:16 +08:00
    强,我也是看了他的教程学习 go
    pubby
        3
    pubby  
       2023-04-02 11:55:37 +08:00
    我感觉 websocket 的痛点是 1 支持大量连接 2.和业务解耦

    我们公司是做了一套 ws 的基础服务
    分成 2 部分
    [接入] 部分,做成分布式可水平任意数量扩展,部署在各种廉价云机器上。
    [消息服务] 部分,一般部署在业务所在集群,和每个 [接入] 器之间建立(一条) ws 进行数据通信,维护所有客户端信息。

    业务端使用 http/grpc 和 [消息服务] 进行收发消息。
    这样业务根本不需要关心 ws 处理,只要做消息回调和消息发送的处理。


    接入:客户端先 http 请求一个连接 token ,此时会告诉客户端连接 token ,以及连到哪个 [接入] 点
    发送消息:业务端---http/grpc--> [消息服务] ---ws---> [接入器] ----ws----> 客户端
    接收消息:客户端----ws----> [接入器] -----ws-----> [消息服务] ----http/grpc---> 业务端


    [接入] 和 [消息服务] 之间只使用一条 ws 连接交换消息,这样传输链路上的各种网关就不需要维护大量 ws 连接了。
    gitxuzan
        4
    gitxuzan  
    OP
       2023-04-02 12:12:50 +08:00
    @pubby 这套架构搭建复杂吗,有成熟的具体方案吗?还有个疑问,你这一条 ws 是共用客户端和业务端共用?
    lesismal
        5
    lesismal  
       2023-04-02 12:15:02 +08:00   ❤️ 1
    @pubby
    接入层如果也是用 go ,还可以考虑用我这个来承载大量连接降低硬件消耗:
    https://github.com/lesismal/nbio
    如果不是用 go 而是用 nginx 那些,就不需要我这个了,除非为了一些功能开发方便

    @gitxuzan @fridaycatye
    刚看了一眼你们冰哥哥的代码,比如:
    https://github.com/aceld/zinx/blob/master/ztimer/timer.go#L76
    这种定时器要在到期前一直占用一个协程。而标准库 time.AfterFunc 只要在到期时启动一个协程、执行完就退出。
    冰的这种代码,太不适合真正的大项目了,也就玩玩小项目能干翻 py 这些。
    学思路可以,别被这些理论派、缺少实战的 up 把自己带偏了。
    这代码辣眼睛,不继续看了。
    pubby
        6
    pubby  
       2023-04-02 12:27:56 +08:00
    @lesismal 嗯,当时出发点就是堆机器,所以用了当时最可靠的 github.com/gorilla/websocket ( 2017 年)

    golang 各种解决单机 ws 能力的方案还是最近几年的事情。
    pubby
        7
    pubby  
       2023-04-02 12:32:22 +08:00
    @gitxuzan “这套架构搭建复杂吗,有成熟的具体方案吗?还有个疑问,你这一条 ws 是共用客户端和业务端共用?”

    我们 2017 年就开始用这套方案了。

    业务端不使用 ws ,业务端使用 http/grpc 和 [消息服务] 通信。 这样我们开发人员就不需要写 ws 相关的逻辑,当成 web 服务的逻辑来写。也和业务端语言无关了。
    lesismal
        8
    lesismal  
       2023-04-02 12:46:58 +08:00
    @gitxuzan @pubby
    我这个能让你们代码更简单,老业务当然没必要去浪费时间替换,但是新业务的话,欢迎试驾。。。
    https://github.com/lesismal/arpc
    性能也还可以:
    https://colobu.com/2022/07/31/2022-rpc-frameworks-benchmarks

    易用性和扩展性请看看示例,该有的基本都有了
    名字带了 rpc 但其实是全功能的网络库,server 主动发消息都可以的,也不限制 rpc 的方式,也可以只是推送消息不需要另一端响应,client/server side 都可以做这些,做游戏网络库可以,做 IM 可以,做 RPC 可以,做推送服务之类的都可以。支持中间件之类的各种扩展。支持前端 js client 而且也能用 http ,所以 web 前端一把梭也可以。常见的游戏客户端引擎基本都支持 js ,所以用来做游戏也可以一把梭。
    在一些对性能要求极致的比如 fps 游戏,当然我还是会自己定制网络库,把中间件之类的不必要的代码去掉,把协议头做更极致的优化。
    neoblackcap
        9
    neoblackcap  
       2023-04-02 18:15:07 +08:00
    @Nnq 一个建议,如果你觉得这活的工作量不大,那么请自己实现自己维护。那么就不用去理解人家的库是怎么调用的。而且维护起来也更加轻松
    Nnq
        10
    Nnq  
       2023-04-03 02:18:41 +08:00
    @neoblackcap 只是感觉大家都是反复造轮子的感觉,时间都放在重写维护上了,如果是一个东西企业内部重度使用的话还好,有些 lib 可能几年只用那么一次,之后都不一定有人维护了,文档要是也没有;基本上可以扔垃圾桶里了
    bv
        11
    bv  
       2023-04-03 09:41:58 +08:00
    @lesismal #6 看 B 站上讲 GMP 调度,算是讲的最清晰的 UP 了,受益不少。但是 zinx 和这个 websocket 代码质量确实辣眼睛。
    lesismal
        12
    lesismal  
       2023-04-03 10:32:15 +08:00
    @bv 虽然没怎么看,但隔三岔五就会看到有人夸赞,可以看出其实作为 golang 知识传播、做得算是不错了。知识培训机构这种和实战差别还是很大,尤其是好些人习惯了不管是啥自己先造个轮子再说,直接使用标准库 timer 比他这个好得多却非要画蛇添足。不只是 zinx ,其他一些培训机构、某些厂出的号称”架构师“的 go 框架也差不多
    bv
        13
    bv  
       2023-04-03 10:52:49 +08:00   ❤️ 1
    @lesismal 标准库已经相当优秀,习惯上来就自造轮子往往是对标准库了解不够,使用标准库能更容易写出简洁、通用、易于理解的代码。
    lizhenda
        14
    lizhenda  
       2023-04-03 11:44:49 +08:00
    @bv 赞同~~
    Nazz
        15
    Nazz  
       2023-04-03 11:49:25 +08:00
    @Nnq 尝试造轮子才能推动自身进步
    Nnq
        16
    Nnq  
       2023-04-03 22:25:05 +08:00
    @Nazz 没毛病 😂
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3693 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 04:18 · PVG 12:18 · LAX 20:18 · JFK 23:18
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.