spark python avro kafka反序列化程序

zzoitvuj  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(386)

我在pythonspark应用程序中创建了一个kafka流,可以解析通过它的任何文本。

kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

我想改变这一点,以便能够解析来自Kafka主题的avro消息。在解析文件中的avro消息时,我执行以下操作:

reader = DataFileReader(open("customer.avro", "r"), DatumReader())

我是python和spark的新手,如何更改流以解析avro消息?另外,在从kafka读取avro消息时,如何指定要使用的模式???我以前用java做过这些,但是python让我很困惑。
编辑:
我试着换上avro解码器

kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))

但我得到以下错误

TypeError: 'DatumReader' object is not callable
ejk8hzay

ejk8hzay1#

正如@zoltan fedor在评论中提到的,所提供的答案现在有点过时了,因为它自编写以来已经过去了2.5年。合流的kafka python库已经发展到在本地支持相同的功能。给定代码中唯一需要的更改是以下内容。

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

然后,你可以改变这条线-

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=serializer.decode_message)

我已经测试过了,效果很好。我为将来可能需要它的人添加这个答案。

wn9m85ua

wn9m85ua2#

如果您不考虑使用confluent schema registry,并且在文本文件或dict对象中有一个模式,则可以使用fastavro python包对kafka流的avro消息进行解码:

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
import io
import fastavro

def decoder(msg):
    # here should be your schema
    schema = {
      "namespace": "...",
      "type": "...",
      "name": "...",
      "fields": [
        {
          "name": "...",
          "type": "..."
        },
      ...}
    bytes_io = io.BytesIO(msg)
    bytes_io.seek(0)
    msg_decoded = fastavro.schemaless_reader(bytes_io, schema)
    return msg_decoded

session = SparkSession.builder \
                      .appName("Kafka Spark Streaming Avro example") \
                      .getOrCreate()

streaming_context = StreamingContext(sparkContext=session.sparkContext,
                                     batchDuration=5)

kafka_stream = KafkaUtils.createDirectStream(ssc=streaming_context,
                                             topics=['your_topic_1', 'your_topic_2'],
                                             kafkaParams={"metadata.broker.list": "your_kafka_broker_1,your_kafka_broker_2"},
                                             valueDecoder=decoder)
beq87vna

beq87vna3#

我也遇到了同样的挑战—在Pypark中反序列化来自kafka的avro消息,并使用confluent schema registry模块的messageserializer方法解决了这个问题,就像在我们的示例中,该模式存储在confluent schema registry中一样。
你可以在https://github.com/verisign/python-confluent-schemaregistry

from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)

# simple decode to replace Kafka-streaming's built-in decode decoding UTF8 ()

def decoder(s):
    decoded_message = serializer.decode_message(s)
    return decoded_message

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder)

lines = kvs.map(lambda x: x[1])
lines.pprint()

显然,正如您所看到的,这段代码使用的是新的直接方法,没有接收器,因此createddirectstream(参见https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html)

相关问题