将kafka(kafka python)转储到txt文件

wfauudbj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(731)

我需要定期将kafka消费者的输出转储到excel文件中。我使用以下代码:

from kafka import KafkaConsumer
from kafka import KafkaProducer
import json,time
from xlutils.copy import copy    
from xlrd import open_workbook
import pandas

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
KafkaConsumer()
consumer.subscribe("test")

rowx=0
colx=0

for msg in consumer:
        book_ro = open_workbook("twitter.xls")
        book = copy(book_ro)  # creates a writeable copy
        sheet1 = book.get_sheet(0)  # get a first sheet
        sheet1.write(rowx,colx, msg[6])
        book.save("twitter.xls")

现在,我的问题是代码效率不高。对于我需要打开、写入、然后保存excel文件的每条消息。有没有什么方法可以打开excel一次,写一次,然后关闭它(对于一批消息而不是for循环)?tnx公司

k7fdbhmy

k7fdbhmy1#

是的,打开,写,保存和关闭每一条消息是低效的,你可以做一批。但仍然需要在消费循环中进行。

msg_buffer = []
buffer_size = 100
for msg in consumer:
        msg_buffer.append(msg[6])
        if len(msg_buffer) >= buffer_size:
            book_ro = open_workbook("twitter.xls")
            book = copy(book_ro)  # creates a writeable copy
            for _msg in msg_buffer:
                sheet1 = book.get_sheet(0)  # get a first sheet
                sheet1.write(rowx,colx, _msg)
            book.save("twitter.xls")
            msg_buffer = []

你可能认为这会比nobatch快100倍。
更新评论:
是的,通常我们将永远留在这个循环中,它在内部使用poll来获取新消息、发送心跳和提交偏移量。如果您的目标是使用来自此主题的消息并保存消息,那么它应该是一个长时间运行的循环。
这是kafka python设计,您应该这样使用来消费消息或使用consumer.poll()。
至于你为什么要 for msg in consumer: ,因为使用者是迭代器对象,所以它的类实现 __iter__ 以及 __next__ ,它使用一个获取程序来获取记录。您可以参考更多的实现细节https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/group.py

相关问题