嗨,我对这个有点困惑。
我的Spark代码生成预期的输出,但一旦我尝试和发送rdd回Kafka它是错误的每一次。我不太确定我的代码出了什么问题。
def sendkafka(messages):
kafka = KafkaClient(kafkaip)
producer = SimpleProducer(kafka, 'dev_test_upstream')
for message in messages:
producer.send_messages(message)
1条答案
按热度按时间w8f9ii691#
您确定sendkafka()方法按预期工作吗。看起来您提供了部分列表,所以不确定该部分是否有效。我有一个如何从python向kafka发送数据的示例,看看python客户端发布和使用来自apachekafka的消息,您可能希望使用producer.py而不是sendkafka()方法