spark结构化流媒体中kafka协议的反序列化

pu82cl6c  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(253)

我正在测试一个工作中的实现,它将看到每天有3亿条消息通过,并计划极大地扩展。有一个步骤,目前看来是简陋的,我很感谢一些建议。
我确实试过了https://scalapb.github.io/sparksql.html 但似乎无法让它工作,甚至按照他们的专业指令。
目前我有一个protobuf和一个case类用于相同的模型:

message MyThing { // proto
    required string id = 1;
}

case class MyThing(id: String)

然后我有了一个Spark流

val df =  
  spark.readStream
    .format("kafka")
    // etc
    .load()

kafka有效负载位于“value”列中,这是来自传输的protobuf的数组[字节]。我想将二进制列转换成具有特定structtype的行。
我现在使用的是一种奇怪的语法,涉及case类:

val encoder = Encoder.product[MyThing]

df
  .select("value")
  .map { row => 
     // from memory so might be slightly off
     val proto = MyThingProto.parseFrom(row.getBinary(0)) 
     val myThing = MyThing.fromProto(proto)
     myThing
  }(encoder)
  .toDF()
  // business logic
  .writeStream
  ...//output

我能让这更有效/更快吗?创建case类的开销似乎过大。我更喜欢这样做:

.map { row => 
     // from memory so might be slightly off
     val proto = MyThingProto.parseFrom(row.getBinary(0)) 
     val row = buildRow(proto)
     row
  }(encoder??) // what kind of encoder is used here?

  def buildRow(proto: MyThingProto): Row = 
      Row(proto.getId)

这样会更好吗?或者是使用kafka反序列化器接口的udf?
提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题