我有一个Kafka机器在AWS中运行,它由几个主题组成。我有下面的Lambda函数,它产生一个消息,并将其推到Kafka主题之一。
import json from kafka
import KafkaClient from kafka
import SimpleProducer from kafka
import KafkaProducer
def lambda_handler(event, context):
kafka = KafkaClient("XXXX.XXX.XX.XX:XXXX")
print(kafka)
producer = SimpleProducer(kafka, async = True)
print(producer)
task_op = {
"'message": "Hai, Calling from AWS Lambda"
}
print(json.dumps(task_op))
producer.send_messages("topic_atx_ticket_update",json.dumps(task_op).encode('utf-8'))
print(producer.send_messages)
return ("Messages Sent to Kafka Topic")
但是我看到消息没有像我预期的那样被推送。
注:角色和策略、连接没有问题。
1条答案
按热度按时间deikduxw1#
在创建Kafka Producer对象时,
“async”字符串应为False,如
然后,
您可以从AWS Lambda向主题发送Kafka消息。