Kafka 如何让Python脚本保存数据

iezvtpos  于 11个月前  发布在  Apache
关注(0)|答案(2)|浏览(100)

目前我有一个python脚本,它从一个对象中获取数据并修改它,然后将其生成到一个Kafka主题,它看起来像这样(没有逻辑函数):

def produce_data(data):
    record_value = json.dumps(data)
    print("Producing record: {}\t{}".format("timeseries data", record_value))
    time.sleep(0)
    producer.produce(topic, key="timeseries data", value=record_value, on_delivery=ack)
    producer.poll(0)
    producer.flush()
    print("{} messages were produced to topic {}!".format(delivered_records, topic))
def generate_data():
    update_rover_samples()
    for rover_data in rovers:
        print("thread finished...exiting")
        produce_data(rover_data)

if __name__ == '__main__':

    conf = {'bootstrap.servers': '127.0.0.2:9092'}
    producer = Producer(conf)

    delivered_records = 0

    create_topic()

    def ack(err, msg):
        global delivered_records
        """
        Message Delivered Successfully!!!
        """
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}"
                  .format(msg.topic(), msg.partition(), msg.offset()))

    generate_data()

字符串
现在我可以调用generate_data()4次,它会更新它4次,但是在生产者完成并再次调用之后,脚本会生成新的数据。我知道我可以将它链接到数据库,但是我只有一个对象需要更新,所以我想知道是否有什么可以使用环境变量,或者是否有任何特定的方法来存储它,这将是最好的。
对Kafka来说很新鲜,所以任何建议都会有帮助!

bwitn5fc

bwitn5fc1#

我不知道Kafka,但如果它只是一个值,你想存储和读取,只需创建一个文件,没有扩展名(或.txt)和读/写它。
https://www.w3schools.com/python/python_file_write.asp
如果你想以有组织的方式存储更多的数据,请阅读有关.json文件的信息。

cbjzeqam

cbjzeqam2#

一个很好的从Python打开、修改和写出文件的shell应该是这样的。在这里,你可以修改文件而不需要修改原始文件。你可以根据你保存的对象的结构来修改它。另外,ALL CAPS在这里只是占位符。

replacedContent = ""
with open(FILE, 'r') as input_file:       
    for line in input_file.readlines():
        #File modification logic here
        if line.startswith(STRING_VARIABLE):
                updateLine = line.replace(line[16:], SOME_STRING + "\"\n")
        else:
            updateLine = line

        replacedContent = replacedContent + updateLine
    
    input_file.close()
    output_file = open(NEWFILE, "w")
    output_file.write(replacedContent)
    output_file.close()

字符串

相关问题