我尝试在我的spark流媒体应用程序中读取kafka头(因为只有spark 3.0以后的结构化流媒体才支持读取头),我尝试使用dstream api。
然而,我得到的标题不是消息中的实际标题。
这是我正在使用的代码
try {
val sparkStreamingSession = new StreamingContext(spark.sparkContext, Seconds(1))
val kafkaParams: Map[String, Object] = Map(
"bootstrap.servers" -> kafkaBrokers,
"key.serializer" -> classOf[StringSerializer],
"value.serializer" -> classOf[StringSerializer],
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"group.id" -> "my_streaming_client",
"enable.auto.commit" -> false.asInstanceOf[Object]
)
val topics = Array(topicName)
val kafkaDStream = KafkaUtils.createDirectStream(
sparkStreamingSession,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
val processedStream = kafkaDStream.map(record => {
val headers: Headers = record.headers()
for (header <- headers.toArray) {
println(".Header Name =" + header.key())
println(".Header Value=" + new String(header.value()))
}
})
processedStream.print()
sparkStreamingSession.start()
sparkStreamingSession.awaitTermination()
}
catch {
case e: Exception => {
e.printStackTrace()
logger.error("Exception in convert ={}", e)
}
case unknown: Exception => {
println(s"Unknown exception: $unknown")
}
}
这是我在日志里看到的。
.Header Name =task.generation
.Header Value=1
.Header Name =task.id
.Header Value=0
.Header Name =current.iteration
.Header Value=3955984
不知道Kafka消息的实际头发生了什么。
谢谢你的帮助。
谢谢
暂无答案!
目前还没有任何答案,快来回答吧!