V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
lvii
V2EX  ›  Python

python 如何并发采集设备信息

  •  
  •   lvii · 2013-07-11 11:03:09 +08:00 · 7182 次点击
    这是一个创建于 4136 天前的主题,其中的信息可能已经有所发展或是发生改变。
    各位兄台,厂里有个用 snmp 协议采集温度信息的活,需求是这样子:

    1. 有 100 多台设备
    2. 每隔 10 秒取一次数据
    3. 将取到的数据写入数据库

    我现在只实现了采集单台设备信息并写入数据库的功能
    要并发采集 100 台设备的信息,不知道要怎么解决

    使用 time snmpwalk 测试采集单台设置数据,要 real 0m2.269s
    现在只写了一个简单的 for 循环轮循发起请求,但这样的同步模式,前五台设备采集好数据,就已经过了 10s 了
    就进入了下一个采集时间片。这样明显杯具了

    昨天搜了一下 python 并发操作的一些概念:

    1. 使用多进程
    2. 使用多线程
    3. 使用协程

    对于多线程,多进程能理解,但是 同步,异步 和 阻塞和非阻塞和协程关系不是很懂
    如果在这个需求里面,批量发送 snmp 请求应该是可以异步操作的
    但是取回数据之后,要写回数据库,这个是不是就会被阻塞了?

    之前没有写过这种并发的例子。各位兄台,可否指点下,我对需求的分析是否正确
    可否有简单的实例参考一下。谢谢
    14 条回复    2013-07-15 14:57:14 +08:00
    reusFork
        1
    reusFork  
       2013-07-11 11:36:58 +08:00
    可以用标准库里的asyncore,或者gevent之类,可以同时进行多个网络io
    lvii
        2
    lvii  
    OP
       2013-07-11 12:02:56 +08:00
    对 100 条记录的批量插入数据库操作是阻塞的还是非阻塞的?
    lookhi
        3
    lookhi  
       2013-07-11 13:53:03 +08:00
    从解决问题的态度。
    起100个线程
    likuku
        4
    likuku  
       2013-07-11 14:23:52 +08:00
    @lvii 建议是每条记录在生成时的时间戳也当成记录内容的一部分,

    之后记录导入数据库后,记录产生时刻以记录内的时间戳信息为准,

    这样,你记录迟一点/异步 导入数据库也可,但每条记录所记的真实/原始采样时戳都正确。
    likuku
        5
    likuku  
       2013-07-11 14:27:56 +08:00
    @lvii
    @lookhi 的「起100个线程」,也是好办法,但设备越来越多,这状况只会更严重。

    建议是否可能改当前的“拉数据”的方法为“推数据”,每个设备本地有个小监控设备,每10秒采样,状态可以先缓存本地,之后每分钟(可以加个随机的秒,已避免成DDOS)将数据集中推给搜集中心数据库。
    timonwong
        6
    timonwong  
       2013-07-11 15:35:59 +08:00
    这种pull式获取传感器信息的,还是很好做,首先就不用考虑写一个高性能服务器需要做的活,因为这就是个客户端,实在不行在N台机器上开N个客户端分别获取不同组的信息最后统一写入到数据库(也可以写一个任务分配服务器来完成任务派发)。

    再就是客户端编写,开N个线程其实也没啥,因为你点数不多(100多个),间隙也比较长(10s),那2s多的过程大部分都耗在传输上了(还有可能是结点部分处理相当相当慢,如果是数据量相当大导致的慢,我的假设没用,这段文本也就对你毫无用途了)。

    数据库写入肯定是需要时间的,你可以另外开一个线程/进程/whatever干这种事情,写入速度要看你的数据量,数据量大还有更多的课题要做,几句话肯定说不完。

    如果不需要考虑数据恢复的问题,可以在客户端内部维护一个list, 以一定的时间间隙,将这个list整个拿出来, 原来的list引用到另外一个新空list上(还考虑同步问题,暴力法就是用锁,当然锁都是要你命一万的),然后把整个list的数据bulk insert到数据库里。另外可以用message queue来完成数据写入工作:客户端发送采集到的数据到mq里(生产者), 一个单独的进程获取mq的数据,完成写入工作(消费者),这种模型简单不易出错。
    BOYPT
        7
    BOYPT  
       2013-07-12 13:04:32 +08:00
    你的数据库不会慢到写入也要等几秒吧,不然阻塞一下有什么要紧的。
    BOYPT
        8
    BOYPT  
       2013-07-12 13:07:23 +08:00
    当然要是你数据库真那么慢也没关系啊,专门一个线程负责管理写入队列,其他收集者拿到数据后塞给这个写入者,就完了吧……标准的教科书式生产者消费者模型。
    lvii
        9
    lvii  
    OP
       2013-07-15 10:54:00 +08:00
    各位兄台,我的需求是这样的:

    1. 一共有 100 多台设备,需要每隔 10s 获取一遍信息
    2. 主动监控,因为要监控的设备没法编程,不支持 SNMP 的 trap,只支持 snmpwalk 获取 OID 信息
    3. 每个设备获取 7 个 OID 信息,其中最后一个最耗时,需要 2.2~2.4s 左右
    4. 将返回的信息格式化一下,插入 mysql 数据库

    所以,@likuku @timonwon 兄,被动监控,没法搞啊

    我用 gevent 模块测试了一下获取一台机器的信息:

    oid_list=['.1.3.6.1.4.1.30966.4.2.1.22',
    '.1.3.6.1.4.1.30966.4.2.1.23',
    '.1.3.6.1.4.1.30966.4.2.1.24',
    '.1.3.6.1.4.1.30966.4.2.1.25',
    '.1.3.6.1.4.1.30966.4.2.1.7',
    '.1.3.6.1.4.1.30966.4.2.1.13',
    '.1.3.6.1.4.1.30966.4.2.1.1' ]

    def get_snmp_info(oid,ip):
    '''
    net-snmp 官方 python 接口:
    http://www.net-snmp.org/wiki/index.php/Python_Bindings
    https://net-snmp.svn.sourceforge.net/svnroot/net-snmp/trunk/net-snmp/python/README
    '''
    return netsnmp.snmpwalk(oid, Version=2, DestHost=ip, Community="public")

    start_time=time.time()
    threads = []

    for oid_text in oid_list:
    threads.append(gevent.spawn(get_snmp_info,oid_text,desthost))
    gevent.run()
    #gevent.joinall(threads)

    print time.time()-start_time


    问题:

    耗时:0.000214099884033

    >>> threads[0].value ## spawn() 后,函数并没有在后台执行,没法获取返回值
    >>> threads[0].run() ## 需要手动调用 run() 方法进行执行
    >>> threads[0].value
    ('23',)

    gevent.spawn() 的 thread 没有执行,需要手动调用 threads[x].run() 方法来触发
    这样又和取消 #gevent.joinall(threads) 的注释,效果一样了,不是异步的模式
    和同步模式没有区别

    耗时:4.58578181267
    测试 2台设备 gevent.joinall(threads) 耗时 4.5s 这样子如果并发采集 100
    多台,时间不够用

    gevent.joinall() 手册描述:gevent.joinall() waits for them to complete
    使用 joinall() 方法,就要等待所有的 spawn 的调用都执行完,这种等待是不是一种阻塞
    lvii
        10
    lvii  
    OP
       2013-07-15 10:55:14 +08:00
    v2ex 代码格式有问题,我代码贴到这里了:http://ix.io/6F5
    timonwong
        11
    timonwong  
       2013-07-15 11:26:11 +08:00
    @lvii 我没有说被动啊,你的需求其实是客户端啊,因为是你主动请求设备的数据。。。
    最后那个OID那里,耗时长的原因是传输速度慢而不是数据量极大吧,数据量极大写起来就比较麻烦,如果只是传输速度慢(同时数据量也小)完全可以依赖上下文切换切到其它任务去。

    还有两点:
    1. 没有必要对一台设备的7次请求都分别做成线程, 直接顺序请求7次就可以了,单台设备的请求封在一个线程内(一个线程获取设备的7个OID)
    2. 还有如果你的joinall()调用时机也不对,joinall()就是等待所有线程完成,你对单台设备的7次请求就等待其完成,跟做成顺序执行7次(无线程)没有什么区别,反而可能由于上下文切换速度还慢点,如果不需要同步,开了之后不用等待,只有程序退出的时候可能你需要所有操作完成join一下。
    heqing
        12
    heqing  
       2013-07-15 11:55:02 +08:00
    单取一台设备信息需要多少时间?
    lvii
        13
    lvii  
    OP
       2013-07-15 13:59:03 +08:00
    @timonwong 兄,这7个 OID 中,前 6个都是毫秒级别,最后一个 OID 耗时 2s
    因为这个 OID 需要遍历设备的所有信息,我需要的是其中的一段。

    about 1:
    @timonwong 兄,gevent.spawn() 开启的是一个线程?
    我不知道怎么把对 7个 OID 的请求封装在一个线程里面。
    net-snmp.snmpwalk() 方法每次只能调用一个 OID 不能批量一次性调用多个 OID

    about 2:
    我想如果买个设备的信息采集可以控制在 2.4s 并且每个设备可以并行采集是没问题的。
    我 joinall() 的目的是要获取执行的返回结果,不然我后面的格式化数据和保存数据的逻辑
    没法写了。

    起始我想要的模型:

    采集设备1 (7个OID) 2.4s 返回结果,保存到字典
    采集设备2 (7个OID) 2.4s 返回结果,保存到字典
    ...
    采集设备n (7个OID) 2.4s 返回结果,保存到字典

    如果上面的采集,可以并行异步进行:

    最短时间 = 单台采集时间 + for轮循时间

    @heqing 兄,一台设备平均 2.4s
    timonwong
        14
    timonwong  
       2013-07-15 14:57:14 +08:00
    @lvii
    我先建议你换成多线程模型,net-snmp的python绑定与gevent的兼容性未知,当然net-snmp的python绑定是否完全线程安全也未知,如果不是线程安全,也好做,使用多进程模型即可。

    相当基本过程(多线程), 伪代码:

    def worker_proc(equip_id, q):

    oids = oids_for_equip_id(equip_id)
    for oid in ords:
    snmp_walk(oid)

    data = pre_process_data()
    q.put(data)

    end


    import threading

    # Consumer queue
    q= Queue()
    def data_consumer(q):
    # 这里可以考虑维护个buffer_list, 维持一个超时时间
    # 在超时或者buffer_list的大小超过阈值时, bulk insert到数据库
    data = q.get()
    save_to_db(data)


    # bookkeeping
    threads = []
    # Workers
    for equip_id in equips:
    t = threading.Thread(target=worker_proc, args=(equip_id,q))
    t.start()
    threads.append(t)
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1000 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 21:29 · PVG 05:29 · LAX 13:29 · JFK 16:29
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.