apachespark在scala中处理kafka消息字节数组

im9ewurl  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(414)

我正在尝试使用spark streaming和kafka来接收和处理从web服务器接收的消息。
我正在测试中提到的消费者https://github.com/dibbhatt/kafka-spark-consumer/blob/master/readme.md 利用它提供的额外功能。
作为第一步,我将尝试使用提供的示例来查看结果。但是,我在实际查看有效负载中的数据时遇到了困难。
查看以下函数的结果:

ReceiverLauncher.launch

我可以看到它返回一个RDD集合,每种类型:

MessageAndMetadata[Array[Byte]]

我被困在这一点上,不知道如何解析这一点,并看到实际的数据。web上所有使用spark附带的consumer的示例都会创建一个迭代器对象,遍历它并处理数据。但是,这个自定义使用者返回的对象没有给我任何迭代器接口。
有一个 getPayload() 方法,但我不知道如何从中获取数据。
我的问题是:
对于生产环境来说,这个消费者真的是一个好的选择吗?从外观上看,它提供的特性和它提供的抽象似乎非常有前途。
有人试过吗?有人知道如何获取数据吗?
提前谢谢,
教育部

fdbelqdn

fdbelqdn1#

getpayload()需要转换为字符串,例如。

new String(line.getPayload())

相关问题