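Our project recently started using kafka-python. In a test where I stopped the Kafka broker, neither the producer nor the consumer raised an exception.
After the broker was restarted, the producer carried on producing, but the consumer could no longer consume from that topic.
Is there any way to make the consumer resume consuming, or to make it raise an exception that I can catch and then re-subscribe to the topic?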
1
qi1070445109 OP To add: I'm using kafka-python 1.4.1, and the code is as follows:
# consumer.py
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

# producer.py
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
for _ in range(100000):
    producer.send('my-topic', b'msg')
2
qi1070445109 OP Sorry, I didn't get the formatting right.
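One possible workaround for the question above, offered as a sketch rather than a confirmed fix for 1.4.1: replace the `for message in consumer` loop with an explicit `poll()` loop, and throw the consumer away and build a new one (which subscribes to the topic again) whenever `poll()` raises or nothing has arrived for a while. The names `make_kafka_consumer`, `consume_with_watchdog`, `idle_limit_s`, `retry_delay_s` and `should_stop` are made up for this illustration, and the 60-second idle limit is only a guess; a topic that is simply quiet would also trigger a rebuild. The only kafka-python calls used are the `KafkaConsumer` constructor, `poll(timeout_ms=...)` and `close()`.

```python
import time


def make_kafka_consumer():
    """Build a fresh consumer; the constructor subscribes to the topic again."""
    # Imported here so the loop below can be exercised without a broker.
    from kafka import KafkaConsumer
    return KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])


def consume_with_watchdog(make_consumer, handle, idle_limit_s=60.0,
                          retry_delay_s=5.0, should_stop=lambda: False):
    """Poll until should_stop() is true; rebuild the consumer after an
    error or after idle_limit_s seconds without any message.

    Returns how many times the consumer was discarded and rebuilt.
    """
    rebuilds = 0
    consumer = None
    last_seen = time.monotonic()
    while not should_stop():
        try:
            if consumer is None:
                consumer = make_consumer()
                last_seen = time.monotonic()
            # poll() returns a dict mapping partitions to lists of records.
            batches = consumer.poll(timeout_ms=1000)
            for records in batches.values():
                for record in records:
                    handle(record)
            if batches:
                last_seen = time.monotonic()
            elif time.monotonic() - last_seen > idle_limit_s:
                # Assumption: a long silence means the consumer is stuck.
                raise RuntimeError('no messages for %.0f s' % idle_limit_s)
        except Exception as exc:
            # Note: this also catches errors raised by handle().
            print('consumer problem, rebuilding: %r' % (exc,))
            if consumer is not None:
                try:
                    consumer.close()
                except Exception:
                    pass
                consumer = None
            rebuilds += 1
            time.sleep(retry_delay_s)
    if consumer is not None:
        consumer.close()
    return rebuilds
```

Usage would be something like `consume_with_watchdog(make_kafka_consumer, print)`. Passing the factory in, instead of a ready-made consumer, is what lets the loop start over with a clean connection and subscription after the broker comes back.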