spark结构化流如何以protobuf格式写入kafka

q7solyqu  于 2021-05-16  发布在  Spark
关注(0)|答案(0)|浏览(426)

Spark:3.0.0
斯卡拉:2.12
汇合
我有Spark结构化流的工作,并寻找一个写Dataframe到Kafka在protbuf格式的例子。
我从postgresql读取消息,在完成所有转换之后,有一个带有key和value的Dataframe:

root
 |-- key: string (nullable = true)
 |-- value: binary (nullable = false)

将消息推送到Kafka的代码:

val kafkaOptions = Seq(
      KAFKA_BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer",
      "schema.registry.url" -> "http://localhost:8081",
      "topic" -> "test_users"
    )

 tDF
      .write
      .format(KAFKA)
      .options(kafkaOptions.toMap)
      .save()

二进制格式的消息已发布,但我无法反序列化,因为合流中没有架构
有没有一个lib可以简单地为我做一些事情?或者我可以参考的示例代码。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题