我在pythonspark应用程序中创建了一个kafka流,可以解析通过它的任何文本。
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
我想改变这一点,以便能够解析来自Kafka主题的avro消息。在解析文件中的avro消息时,我执行以下操作:
reader = DataFileReader(open("customer.avro", "r"), DatumReader())
我是python和spark的新手,如何更改流以解析avro消息?另外,在从kafka读取avro消息时,如何指定要使用的模式???我以前用java做过这些,但是python让我很困惑。
编辑:
我试着换上avro解码器
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))
但我得到以下错误
TypeError: 'DatumReader' object is not callable
3条答案
按热度按时间ejk8hzay1#
正如@zoltan fedor在评论中提到的,所提供的答案现在有点过时了,因为它自编写以来已经过去了2.5年。合流的kafka python库已经发展到在本地支持相同的功能。给定代码中唯一需要的更改是以下内容。
然后,你可以改变这条线-
我已经测试过了,效果很好。我为将来可能需要它的人添加这个答案。
wn9m85ua2#
如果您不考虑使用confluent schema registry,并且在文本文件或dict对象中有一个模式,则可以使用fastavro python包对kafka流的avro消息进行解码:
beq87vna3#
我也遇到了同样的挑战—在Pypark中反序列化来自kafka的avro消息,并使用confluent schema registry模块的messageserializer方法解决了这个问题,就像在我们的示例中,该模式存储在confluent schema registry中一样。
你可以在https://github.com/verisign/python-confluent-schemaregistry
显然,正如您所看到的,这段代码使用的是新的直接方法,没有接收器,因此createddirectstream(参见https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html)