获取从python kafkaproducer发送的消息

ds97pgxw  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(421)

我的目标是从非文件源(即在程序中生成或通过api发送)获取数据,并将其发送到spark流。为了实现这一点,我通过基于python的 KafkaProducer :

$ bin/zookeeper-server-start.sh config/zookeeper.properties &
$ bin/kafka-server-start.sh config/server.properties &
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
$ python 
Python 3.6.1| Anaconda custom (64-bit)
> from kafka import KafkaProducer
> import time
> producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
> producer.send(topic = 'my-topic', value = 'MESSAGE ACKNOWLEDGED', timestamp_ms = time.time())
> producer.close()
> exit()

我的问题是,在检查consumershell脚本中的主题时,没有显示任何内容:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic my-topic
^C$

这里缺什么东西了吗?我对spark/kafka/messaging系统还不熟悉,所以什么都可以。kafka版本是0.11.0.0(scala2.11),配置文件没有任何更改。

hivapdat

hivapdat1#

如果在向主题发送消息后启动使用者,则使用者可能会跳过该消息,因为它会将主题偏移量(可以将其视为从中读取的“起点”)设置为主题的结尾。要改变这种行为,请尝试添加 --from-beginning 选项:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

你也可以试试 kafkacat ,这比Kafka的游戏机消费者和生产者(imho)更方便。读Kafka的留言 kafkacat 可使用以下命令执行:

kafkacat -C -b 'localhost:9092' -o beginning -e -D '\n' -t 'my-topic'

希望能有帮助。

4bbkushb

4bbkushb2#

我发现了问题 value_serializer 因为我没有将json模块导入到解释器,所以默默地崩溃了。有两种解决方案,一种是简单地导入模块,然后 "MESSAGE ACKNOWLEDGED" (带引号)后退。或者你可以移除 value_serializer 一起把 value 将在下一行中发送到字节字符串中的字符串(即。 b'MESSAGE ACKNOWLEDGED' 对于python3),这样您就可以不带引号地返回消息。
我还将kafka切换到版本0.10.2.1(scala2.11),因为kafkapython文档中没有确认它与版本0.11.0.0兼容

相关问题