我有一个带有BashOperator的Airflow DAG,它运行一个Kafka生产者,生成随机计数的消息。这些消息由Kafka消费者消费,消费者将它们写入JSON文件。但是,我希望消费者在处理特定计数的消息后优雅地停止,而不会在Airflow DAG中导致错误。
我已经考虑过使用超时,但我更喜欢更简洁的解决方案。是否有推荐的方法来实现这一点?我希望在使用者处理完初始消息集后,DAG继续进行下一步而不会出错。
任何帮助或指导的最佳方法,为这种情况将不胜感激!
我的生产者代码是:
import time
import json
import random
from datetime import datetime
from kafka import KafkaProducer
def generate_data() -> dict:
id_val = random.randint(1,1000000)
cur_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
platform_type = random.choice(['ios','android','web','mobile-web'])
messages = {
'id': id_val,
'cur_timestamp': cur_timestamp,
'type': platform_type
}
return messages
def serializer(messages):
return json.dumps(messages).encode('utf-8')
topic = "new_topic"
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=serializer,
api_version=(0,11,5)
)
record_cnt = random.randint(10,100)
def produce_msg():
for _ in range(record_cnt):
send_msg = generate_data()
producer.send(topic, send_msg)
print(f'Producing message {str(send_msg)}')
producer.flush()
producer.close()
produce_msg()
print('produce finished')
字符串
我的消费者代码是:
import json
from kafka import KafkaConsumer
topic = "new_topic"
json_path = '/home/airflow/clickstream.json'
consumer = KafkaConsumer(topic, bootstrap_servers="localhost:9092",
enable_auto_commit=True,auto_offset_reset='earliest')
def consumer_to_json():
with open(json_path, 'w') as json_file:
for send_msg in consumer:
message_value = send_msg.value.decode("utf-8")
json_data = json.loads(message_value)
print(json_data)
json.dump(json_data, json_file)
json_file.write('\n')
consumer.close()
print ('finish')
if __name__ == "__main__":
consumer_to_json()
型
1条答案
按热度按时间bxgwgixi1#
你能数一下吗?
字符串
值得指出的是,Kafka Connect已经可以写入以行分隔的JSON文件,并将持续运行。否则,写入可搜索的系统,如Mongo / Elasticsearch / RDBMS JDBC数据库(因为您无论如何都需要Airflow)可能比普通文件更好。