V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
wxxshu
V2EX  ›  推广

弹幕系统更新的血与泪

  •  1
     
  •   wxxshu · 2017-12-20 19:04:34 +08:00 · 4187 次点击
    这是一个创建于 2528 天前的主题,其中的信息可能已经有所发展或是发生改变。

    16 年是直播浪潮兴起的元年,许多互联网公司的业务都开始涉足直播内容模块。我目前所在公司接手的第一份工作,就是直播业务中的弹幕系统优化。随着公司直播业务的变化,弹幕系统从最初的版本到后来优化了三四个版本,这个过程大概持续了一年的时间,本文将从我司早期的弹幕系统开始给大家介绍整个更新过程的“血与泪 ”。

    早期弹幕系统

    一、基本状况

    1.由 PHP + Gateway 框架编写
    2.所有的 Client ID 存放在 Redis 里面
    3.最初由三台机器挂载在 LVS 系统后方提供服务
    4.使用多进程的方式,开启多个 worker 进程来处理消息传递内容

    二、存在的问题

    1.内存占用量巨大,单机( 4 核 8G 配置)承受 500 左右的 Client 就会达到内存上限
    2.每次发送消息的时候,每台机器都需要从 Redis 里面拿取对应房间的所有 Client ID ;并发高时,Redis 的单进程处理效率和内网带宽就成为瓶颈 。
    3.单机的并发处理能力被消息处理的 worker 进程数量限制。同时开启过多的进程,也是对系统资源的格外浪费。
    4.单房间超过 2000 人的时候,消息的延迟有可能会达到 1 分钟左右,这是极其严重的问题。

    三、临时改造

    由于需要解决的问题比较紧迫,所以快速做了一些逻辑上的改变和业务层面的取舍:
    1.对 Redis 的实例进行了拆分,使用了双机,单机 4 实例的方式,分散了 Redis 的压力。 2.对消息处理 worker 进程的逻辑做了一些修改,限制了单位时间内进行广播的消息数量,多余的消息会被丢弃 。
    3.对于已经完成了直播进入点播状态的房间,额外启用了另外一套弹幕系统来进行分流。
    4.单个房间切成多个房间进行消息处理。

    四、改造之后的效果

    1.Redis 压力大幅度降低;
    2.单机 IO 性能压力降低;
    3.同样数量的机器,可以承载更多的直播房间个数。 image 但是,根本问题并没有得到解决。在临时解决压力问题之后,我们需要花一些时间来重新对弹幕系统进行分析,按照分析后的需求,对新的弹幕系统进行重构。

    新的弹幕系统

    一、新弹幕系统面临的挑战

    1.单房间人数较高,依照我们公司直播情况,单房间 5 – 10 万人同时在线是会出现的。
    2.由于直播内容等情况造成的某时间段用户暴涨。
    3.需要尽可能实时到达,延迟过高的话会大大降低互动的实时性。
    4.每一条消息,都要递送大量的长连接。
    5.大量长连接的维护机制。
    6.在运营的过程中,需要处理用户黑名单、IP 黑名单、敏感词等需求。

    二、新的弹幕系统需求

    1.由于内存的管理对于 PHP 来说算是一个短板,对于大并发且长时间稳定不需要经常更新维护的系统来说,并非最好的选择,因此选一门合适的语言是必须的。
    2.分布式支持,可以快速的横向扩展,单房间人数可以支持到十万级别。
    3.可以方便快捷的对系统进行第三方消息的发送(例如礼物信息、系统通知等)。
    4.尽量使用本地内存管理来记录房间内客户端连接,剩下大量的数据交互和查询时间。 5.并发支持消息广播,提高广播效率。

    三、新弹幕系统版本的改造方法

    1.选择当前正红且对高并发支持良好的 Golang 作为开发语言。
    2.使用开发语言进行客户端连接的管理,且每台机器只管理自己收到的连接请求。
    3.使用并发的房间内广播逻辑,同时对多人进行广播。

    新弹幕系统改造的相关经验

    下面先对一个模块细节进行分析,然后进一步分析模块上层的调度逻辑。

    一、房间管理

    type RoomInfo struct {
    RoomID string //房间 ID
    Lock *sync.Mutex //房间操作锁
    Rows []*RowList //房间多行 Slice
    Length uint64 //当前房间总节点数
    LastChangeTime time.Time //最后一次更新时间
    }
    type RowList struct {
    Nodes []*Node //节点列表
    }
    

    由于每个房间都有自己的 ID,客户端建立连接之后,就会被放到一个大厅房间里面。接着,客户端自己提交 RoomID 上来,连接会被重新连接到对应的房间里面。 每个连接在建立之后,都会被包装成一个 Node,放到 Rows 里面。

    type Node struct {
    RoomID string
    ClientID int64
    Conn *websocket.Conn
    UpdateTime time.Time
    LastSendTime time.Time //最后一次发送消息时间
    IsAlive bool
    DisabledRead bool//是否已经被关闭了发言权限
    }
    

    每一个 Node 中,都有一个 IsAlive 来表示连接是否成功。如果连接断开,或者因为其他原因强制停止服务的话,会修改此标记状态。然后由定时的处理机制将此连接关闭并从内存中清除。Rows 的本质就是一组事先设定了长度的 Node Slice。

    发送消息的时候,每一组 slice 使用一个协程来顺序发送。同一房间内的连接,就可以依照 slice 分组进行并发发送。 发送的时候,会使用锁将整个房间锁住,以防止并发情况下同一连接混入两条信息。

    二、消息管理

    var messageChannel map[string]chan nodeMessage
    func init() {
    messageChannel = make(map[string]chan nodeMessage)
    }
    func sendMessageToChannel(roomId string, nm nodeMessage) error {
    //如果房间不存在,创建一个房间
    if c, ok := messageChannel[roomId]; ok {
    c
    } else {
    //创建房间通道
    messageChannel[roomId] = make(chan nodeMessage, 1024)
    messageChannel[roomId]
    //创建房间实例
    roomObj := &RoomInfo{}
    roomObj.RoomID = roomId
    roomObj.Rows = make([]*RowList, 0, 4)
    roomObj.Lock = &sync.Mutex{}
    //创建新的协程来监控房间
    go daemonReciver(messageChannel[roomId], roomObj)
    go timerForClean(messageChannel[roomId])
    //如果是大厅的话,启动大厅清理协程
    if roomId == "" {
    go CleanHall(roomObj)
    }
    }
    return nil
    }
    

    以上是关于弹幕信息传递的一部分代码。 首先,每一个房间,都有自己的消息通道,所有的这些通道根据 RoomID 为 key,记录在一个叫做 messageChannel 的 map 里面。 每次收到消息的时候,都直接把消息丢到 channel 里面,就可以了。(后面由守护协程来处理)如果没有房间通道的话,就建立房间的通道 channel,并启动每个房间的一系列协程。

    三、服务器管理

    这里的方案比较简单,其实就是建立一个上一层的聊天室即一个房间,所有的服务器都会主动连接到这里,每一个服务器收到的信息,就会在这个房间里面广播到别的机器去。

    四、守护协程们管理

    守护协程处理很多琐碎的事情,保证房间内信息的正常分发以及房间连接的正常管理。各个守护协程的功能如下:
    1.消息发送协程:每个房间配备一个,从 channel 里面获取到要发送到本房间的消息,然后在并发调用各个 RowList 的发送消息机制。
    2.房间整理协程:因为会有连接断开、房间更换等修改 Node 状态的行为,所以定期会有房间整理协程来进行节点整理,删除当前房间无关的节点等以提高消息的发送效率。 image

    五、测试相关

    运行环境:云主机 8 核 16G 实例
    操作系统:Centos7 (未进行系统优化或参数调整)
    测试内容:单机建立 15000 websocket 连接,并且发送消息,进入指定房间(所有连接进入同一房间)。一个客户端进入房间,发送一条消息,经过敏感词处理、IP 和用户黑名单处理,然后被广播到所有节点。
    测试结果:
    CPU 占用:保持在 5%以下
    内存占用:2GB (包括操作系统本身开销)
    网络占用:峰值 10Mb/s 左右
    发送效率:15000 节点广播,100ms – 110ms 左右。
    根据测试结果计算:
    完全可以在 8 核 16G 的机器上,实现无压力运行 50K 并发,峰值接近 60 – 70K 的处理能力。

    六、更多分享

    我目前正在尝试把完成这套弹幕系统的基本功能开源出来。已经提取出来了一部分,当前的地址为: https://github.com/logan-go/roomManager,感兴趣的读者可以通过链接查看。

    小结

    弹幕系统给视频直播 /点播增加了更多内容的互动娱乐性质,从最初的 A 站 B 站发展到现在各主流视频网站 APP。如何健康高效的管理弹幕系统,也是当下视频行业需要重视的一门技术活。

    9 条回复    2018-01-08 09:53:39 +08:00
    finalspeed
        1
    finalspeed  
       2017-12-21 00:17:55 +08:00 via Android
    厉害了,赞一个
    wxxshu
        2
    wxxshu  
    OP
       2017-12-21 11:21:42 +08:00
    想看更多技术干货请关注“ UCloud 技术公告牌”微信公众号哦!
    gamexg
        3
    gamexg  
       2017-12-21 16:34:48 +08:00
    >发送消息的时候,每一组 slice 使用一个协程来顺序发送。

    请教如果某个连接的 tcp 缓冲区满了,会造成整组都被阻塞吧?
    这个问题怎么解决?

    另外连接数是 1.5W ?
    根据网关的经验,单机 5W 连接共开 10W 协程也没什么问题,也许每个连接一个协程一个 chan 能够解决?
    suilongfei
        4
    suilongfei  
       2017-12-21 16:57:11 +08:00
    @gamexg 目前缓冲区的这个地方,还没有进行优化。
    第一个版的开发周期比较短,而且相对来说,弹幕系统单链接发送的信息量比较少,暂时没有出现缓冲区满的情况。这个方面的考虑暂时也就被靠后了。后面会在 GitHub 上发一个方便使用的版本出来,然后再根据大家实际遇到的问题进行优化。

    第二个问题,虽然多协程多 chan 的切换消耗比线程要低的多,但是也不是没有开销的。另外,如果对一个房间一个 chan 发消息,和对一个房间一万个 chan 发消息,瞬间的内存使用应该还是不小的。所以,折中考虑,目前是单房间单 chan,然后每个 slice 并发发送
    gamexg
        5
    gamexg  
       2017-12-21 17:59:10 +08:00
    @suilongfei #4 不希望增加 chan 也许可以做个心跳?
    发消息的协程被阻塞时调度线程负责强制关闭阻塞的连接或者启动一个新协程继续发送消息?
    suilongfei
        6
    suilongfei  
       2017-12-21 19:23:46 +08:00
    @gamexg 嗯嗯,这个问题,刚才我考虑了一下,我觉得可以分两个方面来考虑

    第一,我觉得你提出的问题比较实际,如果在对链接发送消息的时候确实被阻塞了,应该需要一个完善的处理方案。
    第二,目前,websocket 使用的是第三方库 https://github.com/gorilla/websocket,个人觉得这个问题,需要在连接库端进行处理。如果有必要的话,可以一起对 websocket 库提代码
    gamexg
        7
    gamexg  
       2017-12-21 20:07:48 +08:00
    @suilongfei #6 仔细考虑了下,可以再增加一个协程,整个流程做成这样:


    ```

    // 群发单条消息
    func(){

    taskChan:=make(chan *Task)

    go func(){
    for conn :=range taskChan{
    // 对单个连接发送数据
    }
    }()


    for _,row:=range Rows {

    tick:=time.NewTicker(1*time.Second)

    select {
    case <-tick.C:
    // 上一任务超时
    // close(taskChan)后重新 make 一个 taskChan
    // 然后再次启动一个新协程处理后面的连接
    // 这样会要求单个连接 write 方法需要支持多线程安全,比较好的办法是单个连接内部设置个 chan 缓冲区,写阻塞之后的写操作缓存到 chan,chan 满了丢弃数据?
    case task->taskChan:
    }
    }
    }

    ```


    但是感觉一般不需要这么麻烦,最简单的办法是设置极短的 Deadline,哪个连接缓冲区满直接超时然后强制关闭连接最省心。
    suilongfei
        8
    suilongfei  
       2017-12-22 08:11:01 +08:00
    @gamexg

    粗浅的看了一下,所用的 websocket 库应该是处理了在数据量不同的情况下,写缓存的问题
    zhaolion
        9
    zhaolion  
       2018-01-08 09:53:39 +08:00
    这一块可以参考一下 bilibili 的 Terry-Mao 的 Goim 的代码,我从中获益良多
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5750 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 06:24 · PVG 14:24 · LAX 22:24 · JFK 01:24
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.