# coding:utf=8
from pykafka.client import KafkaClient
import logging
import json
import time
logging.basicConfig(level= logging.WARNING)
produce_logger = logging.getLogger('prodrcer')
def kafka(use_rdkafka=False):
client = KafkaClient('192.168.109.58:9092,192.168.109.70:9092,192.168.109.91:9092')
produce_start = time.time()
topic = client.topics['meteor_spider_article_dev']
# producer = topic.get_producer(sync=True, use_rdkafka=use_rdkafka)
msg_body = {
'article_id': 1,
"title": "标题",
"subtitle": "副标题",
}
msg = json.dumps(msg_body)
with topic.get_sync_producer() as producer:
for i in range(0, 1000):
producer.produce(msg)
producer.stop()
return time.time() - produce_start
def calculate_thoughput(timing, n_messages=1000, msg_size=5956):
print("Processed {0} messsages in {1:.2f} seconds".format(n_messages, timing))
print("{0:.2f} MB/s".format((msg_size * n_messages) / timing / (1024*1024)))
print("{0:.2f} Msgs/s".format(n_messages / timing))
if __name__ == '__main__':
calculate_thoughput(kafka())
Processed 1000 messsages in 76.68 seconds 0.07 MB/s 13.04 Msgs/s
这速度 怎么回事?
1
sylecn 2016-09-07 14:41:58 +08:00
topic.get_sync_producer()
虽然还没有用过 kafka ,但是这种压力测试应该都用 async 模式来发消息吧。如果用同步,起码开多线程一起发。 要不然一个一个等反馈多慢啊。 |
2
reAsOn 2016-09-07 14:47:30 +08:00
用过 kafka-python 的库,性能可以接受,需要用异步发送 + batch
|
3
996635 OP |
4
est 2016-09-07 15:13:35 +08:00
kafka 至少有 3 个 py 库,各自实现都不同。需要仔细判别。
|
5
tongle 2016-09-07 15:50:51 +08:00
Using the librdkafka extension 试试这个
|