1
reusFork 2013-07-11 11:36:58 +08:00
可以用标准库里的asyncore,或者gevent之类,可以同时进行多个网络io
|
2
lvii OP 对 100 条记录的批量插入数据库操作是阻塞的还是非阻塞的?
|
3
lookhi 2013-07-11 13:53:03 +08:00
从解决问题的态度。
起100个线程 |
4
likuku 2013-07-11 14:23:52 +08:00
@lvii 建议是每条记录在生成时的时间戳也当成记录内容的一部分,
之后记录导入数据库后,记录产生时刻以记录内的时间戳信息为准, 这样,你记录迟一点/异步 导入数据库也可,但每条记录所记的真实/原始采样时戳都正确。 |
5
likuku 2013-07-11 14:27:56 +08:00
|
6
timonwong 2013-07-11 15:35:59 +08:00
这种pull式获取传感器信息的,还是很好做,首先就不用考虑写一个高性能服务器需要做的活,因为这就是个客户端,实在不行在N台机器上开N个客户端分别获取不同组的信息最后统一写入到数据库(也可以写一个任务分配服务器来完成任务派发)。
再就是客户端编写,开N个线程其实也没啥,因为你点数不多(100多个),间隙也比较长(10s),那2s多的过程大部分都耗在传输上了(还有可能是结点部分处理相当相当慢,如果是数据量相当大导致的慢,我的假设没用,这段文本也就对你毫无用途了)。 数据库写入肯定是需要时间的,你可以另外开一个线程/进程/whatever干这种事情,写入速度要看你的数据量,数据量大还有更多的课题要做,几句话肯定说不完。 如果不需要考虑数据恢复的问题,可以在客户端内部维护一个list, 以一定的时间间隙,将这个list整个拿出来, 原来的list引用到另外一个新空list上(还考虑同步问题,暴力法就是用锁,当然锁都是要你命一万的),然后把整个list的数据bulk insert到数据库里。另外可以用message queue来完成数据写入工作:客户端发送采集到的数据到mq里(生产者), 一个单独的进程获取mq的数据,完成写入工作(消费者),这种模型简单不易出错。 |
7
BOYPT 2013-07-12 13:04:32 +08:00
你的数据库不会慢到写入也要等几秒吧,不然阻塞一下有什么要紧的。
|
8
BOYPT 2013-07-12 13:07:23 +08:00
当然要是你数据库真那么慢也没关系啊,专门一个线程负责管理写入队列,其他收集者拿到数据后塞给这个写入者,就完了吧……标准的教科书式生产者消费者模型。
|
9
lvii OP 各位兄台,我的需求是这样的:
1. 一共有 100 多台设备,需要每隔 10s 获取一遍信息 2. 主动监控,因为要监控的设备没法编程,不支持 SNMP 的 trap,只支持 snmpwalk 获取 OID 信息 3. 每个设备获取 7 个 OID 信息,其中最后一个最耗时,需要 2.2~2.4s 左右 4. 将返回的信息格式化一下,插入 mysql 数据库 所以,@likuku @timonwon 兄,被动监控,没法搞啊 我用 gevent 模块测试了一下获取一台机器的信息: oid_list=['.1.3.6.1.4.1.30966.4.2.1.22', '.1.3.6.1.4.1.30966.4.2.1.23', '.1.3.6.1.4.1.30966.4.2.1.24', '.1.3.6.1.4.1.30966.4.2.1.25', '.1.3.6.1.4.1.30966.4.2.1.7', '.1.3.6.1.4.1.30966.4.2.1.13', '.1.3.6.1.4.1.30966.4.2.1.1' ] def get_snmp_info(oid,ip): ''' net-snmp 官方 python 接口: http://www.net-snmp.org/wiki/index.php/Python_Bindings https://net-snmp.svn.sourceforge.net/svnroot/net-snmp/trunk/net-snmp/python/README ''' return netsnmp.snmpwalk(oid, Version=2, DestHost=ip, Community="public") start_time=time.time() threads = [] for oid_text in oid_list: threads.append(gevent.spawn(get_snmp_info,oid_text,desthost)) gevent.run() #gevent.joinall(threads) print time.time()-start_time 问题: 耗时:0.000214099884033 >>> threads[0].value ## spawn() 后,函数并没有在后台执行,没法获取返回值 >>> threads[0].run() ## 需要手动调用 run() 方法进行执行 >>> threads[0].value ('23',) gevent.spawn() 的 thread 没有执行,需要手动调用 threads[x].run() 方法来触发 这样又和取消 #gevent.joinall(threads) 的注释,效果一样了,不是异步的模式 和同步模式没有区别 耗时:4.58578181267 测试 2台设备 gevent.joinall(threads) 耗时 4.5s 这样子如果并发采集 100 多台,时间不够用 gevent.joinall() 手册描述:gevent.joinall() waits for them to complete 使用 joinall() 方法,就要等待所有的 spawn 的调用都执行完,这种等待是不是一种阻塞 |
10
lvii OP v2ex 代码格式有问题,我代码贴到这里了:http://ix.io/6F5
|
11
timonwong 2013-07-15 11:26:11 +08:00
@lvii 我没有说被动啊,你的需求其实是客户端啊,因为是你主动请求设备的数据。。。
最后那个OID那里,耗时长的原因是传输速度慢而不是数据量极大吧,数据量极大写起来就比较麻烦,如果只是传输速度慢(同时数据量也小)完全可以依赖上下文切换切到其它任务去。 还有两点: 1. 没有必要对一台设备的7次请求都分别做成线程, 直接顺序请求7次就可以了,单台设备的请求封在一个线程内(一个线程获取设备的7个OID) 2. 还有如果你的joinall()调用时机也不对,joinall()就是等待所有线程完成,你对单台设备的7次请求就等待其完成,跟做成顺序执行7次(无线程)没有什么区别,反而可能由于上下文切换速度还慢点,如果不需要同步,开了之后不用等待,只有程序退出的时候可能你需要所有操作完成join一下。 |
12
heqing 2013-07-15 11:55:02 +08:00
单取一台设备信息需要多少时间?
|
13
lvii OP @timonwong 兄,这7个 OID 中,前 6个都是毫秒级别,最后一个 OID 耗时 2s
因为这个 OID 需要遍历设备的所有信息,我需要的是其中的一段。 about 1: @timonwong 兄,gevent.spawn() 开启的是一个线程? 我不知道怎么把对 7个 OID 的请求封装在一个线程里面。 net-snmp.snmpwalk() 方法每次只能调用一个 OID 不能批量一次性调用多个 OID about 2: 我想如果买个设备的信息采集可以控制在 2.4s 并且每个设备可以并行采集是没问题的。 我 joinall() 的目的是要获取执行的返回结果,不然我后面的格式化数据和保存数据的逻辑 没法写了。 起始我想要的模型: 采集设备1 (7个OID) 2.4s 返回结果,保存到字典 采集设备2 (7个OID) 2.4s 返回结果,保存到字典 ... 采集设备n (7个OID) 2.4s 返回结果,保存到字典 如果上面的采集,可以并行异步进行: 最短时间 = 单台采集时间 + for轮循时间 @heqing 兄,一台设备平均 2.4s |
14
timonwong 2013-07-15 14:57:14 +08:00
@lvii
我先建议你换成多线程模型,net-snmp的python绑定与gevent的兼容性未知,当然net-snmp的python绑定是否完全线程安全也未知,如果不是线程安全,也好做,使用多进程模型即可。 相当基本过程(多线程), 伪代码: def worker_proc(equip_id, q): oids = oids_for_equip_id(equip_id) for oid in ords: snmp_walk(oid) data = pre_process_data() q.put(data) end import threading # Consumer queue q= Queue() def data_consumer(q): # 这里可以考虑维护个buffer_list, 维持一个超时时间 # 在超时或者buffer_list的大小超过阈值时, bulk insert到数据库 data = q.get() save_to_db(data) # bookkeeping threads = [] # Workers for equip_id in equips: t = threading.Thread(target=worker_proc, args=(equip_id,q)) t.start() threads.append(t) |