qazwsxkevin
V2EX  ›  WebSocket

请教, websockets 模块起服务, websockets.serve 的方法问题。

  •  
  •   qazwsxkevin · Oct 23, 2023 · 1217 views
    This topic created in 936 days ago, the information mentioned may be changed or developed.
    import datetime
    import random
    import string
    import asyncio
    import time
    
    import websockets
    
    from multiprocessing import Manager
    from concurrent import futures
    
    # 忽略警告
    import warnings
    warnings.filterwarnings("ignore")
    
    strLen = 30
    
    def putmsg(que, gvar):
        cc = 0
        while True:
            cc += 1
    
            if gvar['flag'] == True:
                break
    
            ranStr = ''
            for s in range(strLen):
                ranStr = ranStr + random.choice(string.ascii_letters + string.digits)
    
            # slTime = random.uniform(0.01,0.2)
    
            logStr = str(cc) + ' ' + "{:.2f}".format(slTime) + ' ' + str(datetime.datetime.now().replace(microsecond=0)) + ' ' + ranStr
            # print(logStr)
    		
            # test
            # print(cc, '#', que.qsize())
    
            que.put(logStr)
    
            # time.sleep(slTime)
            time.sleep(1.5)
    
    def wsock(queu, gvar):
        loop = asyncio.get_event_loop()
        async def stoploop():
            loop.stop()
    
        # Maintain a list of connected clients
        connected_clients = set()
    
        async def register(websocket):
            # Add a new client to the list
            connected_clients.add(websocket)
            print('connected_clients.add(websocket)')
    
        async def unregister(websocket):
            # Remove a client from the list
            connected_clients.remove(websocket)
            print('connected_clients.remove(websocket)')
    
        async def broadcast(message):
            # Send a message to all connected clients
            if connected_clients:
                await asyncio.gather(*(client.send(message) for client in connected_clients))
    
        async def echo(websocket, que=queu):
            await register(websocket)
    
            while True:
                if queu.qsize():
                    msgStr = queu.get()
                    if connected_clients:
                        try:
                            await asyncio.gather(*(client.send(msgStr) for client in connected_clients))
                        except Exception as e:
                            print(e) # echo:received 1001 (going away); then sent 1001 (going away)
                            break
                else:
                    asyncio.sleep(0.3)
    
    
            async for message in websocket:
                # Broadcast the received message to all clients
                if message == 'stop':
                    gvar['flag'] = True
                    await stoploop()
                await broadcast(message)
    
            await unregister(websocket)
    
    
        start_server = websockets.serve(echo, "172.17.0.2", 25299)
    
        asyncio.set_event_loop(loop)
        # loop.create_task(start_server)
        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()
    
    if __name__ == '__main__':
        # 队列
        msgQue = Manager().Queue()
        # 全局变量
        glovar = Manager().dict()
    
        # 启停开关
        glovar['flag'] = False
    
        # 处理进程
        proc = futures.ProcessPoolExecutor(max_workers=2)
    
        wsockRet = proc.submit(wsock, msgQue, glovar)
        putmsgRet = proc.submit(putmsg, msgQue, glovar)
    

    问题是 websocket.serve 使用 echo 方法作为 handle ,
    只有在 websocket 接口有事件的时候,才会调用 echo 进行处理,(被动式)

    echo 的被动方法,队列里的日志越来越多,
    想有一个永久循环,如果有 client(s),send 取出的队列内容,没有 client ,取出就 pass 了,
    websocket.serve 被动调用不适合这个场合,看官方也没有更好的提示,
    请教大家这里怎么换个方式实现呢?

    2 replies    2023-10-24 16:48:19 +08:00
    julyclyde
        1
    julyclyde  
       Oct 24, 2023
    看了一下这个库的说明,感觉好奇怪
    为什么把“处理”和“IO”合在一个库里面啊
    qazwsxkevin
        2
    qazwsxkevin  
    OP
       Oct 24, 2023
    @julyclyde #1 是啊,官网上异步和线程的范例逻辑思想,是做了很高的包装,要用在其它场景,很难适合,打算过几天有时间再去仔细地看看 python websocket 官网的 API renference ,看看能不能粒度化用在我的场景上。。。
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   1014 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 39ms · UTC 19:02 · PVG 03:02 · LAX 12:02 · JFK 15:02
    ♥ Do have faith in what you're doing.