如何在kafkautils.createdirectstream()中定义参数

mzaanser  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(467)

我是spark和kafka的新手,我想以json格式将数据从kafka转发到spark in stream。我的问题是如何定义数据Map,以及如何定义kafkautils.createdirectstream()中的参数

val ssc = new StreamingContext(sparkConfig, Seconds(10))
case class dataMap (number: Int, address: String, product: String, store: String, seller : String)
val messages = KafkaUtils.createDirectStream[ Int, String, String, String, String](ssc, kafkaParams, topics).map(m => m.as[dataMap])

当我使用上述代码时,我收到以下错误:

error: type arguments [Int,String,String,String,String] conform to the bounds  of none of the overloaded alternatives of value createDirectStream

p、 s:我已经用正确的格式定义了Kafka帕拉和主题。

oxcyiej7

oxcyiej71#

我相信你想要这样的东西:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, dataMap](...)

请注意,这将消除您的错误,但您还有更多的工作要做。我们要做的就是得到原始的json String . 然后你必须转换 String 进入你的 case class . 老实说,这是一个单独的问题,与ApacheSpark无关,甚至与Kafka无关。对于这个问题,您可以在这里找到两种不同的解决方案:如何使用apachespark中kafka主题中的scala读取json数据

相关问题