kafka在spark streaming中创建DirectStream

xyhw6mcr  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(412)

我正在尝试spark streaming+kafka集成指南(kafka broker版本0.10.0或更高版本)中的示例代码。代码可以运行没有任何错误,但我不能收到任何记录。如果我从一开始就运行kafka-console-consumer.sh,我就可以得到记录。有人知道原因吗?我的代码如下:

val broker = "221.181.73.44:19092"
val topics = Array("connect-test")
val groupid = "SparkStreamingLoad3"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> broker,
  "group.id" -> groupid,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "earliest", //earliest | latest
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

stream.print()

ssc.start()
ssc.awaitTermination()

我的sbt版本是:

version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.10" % "2.1.0",
  "org.apache.spark" % "spark-core_2.10" % "2.1.0",
"org.apache.spark" % "spark-streaming_2.10" % "2.1.0",
"org.apache.kafka" % "kafka_2.10" % "0.10.2.1"
)

谢谢!

w1jd8yoj

w1jd8yoj1#

val broker = "221.181.73.44:19092"

默认端口是9092,这可能是问题所在。 "auto.offset.reset" -> "earliest" 以及 "enable.auto.commit" -> false 应该始终从主题日志的开头开始读取,因为偏移量不会存储在任何位置。所以这没有问题。
另外,我们可以看到您使用的完整命令吗 kafka-console-consumer.sh ?

nzkunb0c

nzkunb0c2#

最后,我解决了这个问题。答案如下:
主题中的数据是从console producer生成的,console producer是一个字符串列表。但是,数据的格式为[array[byte],array[byte]]。不是[string,string]。因此,如果我使用stringdeserializer,将不会收到任何数据。
我从console学习了消费源代码writeto(consumerrecord:consumerrecord[array[byte],array[byte]],output:printstream):unit
rdds中的键/值可以包含空值。在我的例子中,所有的键都是空的。我使用以下代码获取数据:
stream=kafkautils.createdirectstream[array[byte],array[byte]](ssc,preferconsistent,subscribe[array[byte],array[byte]](topics,kafkaparams))stream.map(rdd=>new string(option(rdd.key()).getorelse(“null.getbytes))+“| | | delemiter | | |”+new string(option(rdd.value()).getorelse(“null.getbytes))).print()

相关问题