我正在测试一个工作中的实现,它将看到每天有3亿条消息通过,并计划极大地扩展。有一个步骤,目前看来是简陋的,我很感谢一些建议。
我确实试过了https://scalapb.github.io/sparksql.html 但似乎无法让它工作,甚至按照他们的专业指令。
目前我有一个protobuf和一个case类用于相同的模型:
message MyThing { // proto
required string id = 1;
}
case class MyThing(id: String)
然后我有了一个Spark流
val df =
spark.readStream
.format("kafka")
// etc
.load()
kafka有效负载位于“value”列中,这是来自传输的protobuf的数组[字节]。我想将二进制列转换成具有特定structtype的行。
我现在使用的是一种奇怪的语法,涉及case类:
val encoder = Encoder.product[MyThing]
df
.select("value")
.map { row =>
// from memory so might be slightly off
val proto = MyThingProto.parseFrom(row.getBinary(0))
val myThing = MyThing.fromProto(proto)
myThing
}(encoder)
.toDF()
// business logic
.writeStream
...//output
我能让这更有效/更快吗?创建case类的开销似乎过大。我更喜欢这样做:
.map { row =>
// from memory so might be slightly off
val proto = MyThingProto.parseFrom(row.getBinary(0))
val row = buildRow(proto)
row
}(encoder??) // what kind of encoder is used here?
def buildRow(proto: MyThingProto): Row =
Row(proto.getId)
这样会更好吗?或者是使用kafka反序列化器接口的udf?
提前谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!