如何在Python中处理特定数量的消息后优雅地停止Kafka消费者?

ff29svar  于 2023-11-16  发布在  Apache
关注(0)|答案(1)|浏览(117)

我有一个带有BashOperator的Airflow DAG,它运行一个Kafka生产者,生成随机计数的消息。这些消息由Kafka消费者消费,消费者将它们写入JSON文件。但是,我希望消费者在处理特定计数的消息后优雅地停止,而不会在Airflow DAG中导致错误。
我已经考虑过使用超时,但我更喜欢更简洁的解决方案。是否有推荐的方法来实现这一点?我希望在使用者处理完初始消息集后,DAG继续进行下一步而不会出错。
任何帮助或指导的最佳方法,为这种情况将不胜感激!

我的生产者代码是:

import time 
import json 
import random 
from datetime import datetime
from kafka import KafkaProducer

def generate_data() -> dict: 
    
    id_val = random.randint(1,1000000) 
    cur_timestamp = time.strftime("%Y-%m-%d %H:%M:%S") 
    platform_type = random.choice(['ios','android','web','mobile-web']) 
    messages = {
        'id': id_val,
        'cur_timestamp': cur_timestamp,
        'type': platform_type
    }
    return messages

def serializer(messages):
    return json.dumps(messages).encode('utf-8') 

topic = "new_topic" 
producer = KafkaProducer( 
    bootstrap_servers=['localhost:9092'],
        value_serializer=serializer,
    api_version=(0,11,5)
) 

record_cnt = random.randint(10,100)
def produce_msg():
    for _ in range(record_cnt):
        send_msg = generate_data()
        producer.send(topic, send_msg)
        print(f'Producing message {str(send_msg)}')
    producer.flush()    
    producer.close()

                
produce_msg() 

print('produce finished')

字符串

我的消费者代码是:

import json 
from kafka import KafkaConsumer

topic = "new_topic" 
json_path = '/home/airflow/clickstream.json'
consumer = KafkaConsumer(topic, bootstrap_servers="localhost:9092",
enable_auto_commit=True,auto_offset_reset='earliest')

def consumer_to_json():
    with open(json_path, 'w') as json_file:
        for send_msg in consumer:
            message_value = send_msg.value.decode("utf-8")
     
            json_data = json.loads(message_value)
            print(json_data)
            json.dump(json_data, json_file)
            json_file.write('\n')
        consumer.close()
        print ('finish')  

if __name__ == "__main__":
    consumer_to_json()

bxgwgixi

bxgwgixi1#

你能数一下吗?

num_messages = 0
for send_msg in consumer:
    message_value = send_msg.value.decode("utf-8")
    json_data = json.loads(message_value)
    print(json_data)
    json.dump(json_data, json_file)
    json_file.write('\n')
    num_messages += 1
    if num_messages >= MESSAGES_LIMIT:  # TODO: define       
        break
consumer.close()

字符串
值得指出的是,Kafka Connect已经可以写入以行分隔的JSON文件,并将持续运行。否则,写入可搜索的系统,如Mongo / Elasticsearch / RDBMS JDBC数据库(因为您无论如何都需要Airflow)可能比普通文件更好。

相关问题