我是spark和kafka的新手,我想以json格式将数据从kafka转发到spark in stream。我的问题是如何定义数据Map,以及如何定义kafkautils.createdirectstream()中的参数
val ssc = new StreamingContext(sparkConfig, Seconds(10))
case class dataMap (number: Int, address: String, product: String, store: String, seller : String)
val messages = KafkaUtils.createDirectStream[ Int, String, String, String, String](ssc, kafkaParams, topics).map(m => m.as[dataMap])
当我使用上述代码时,我收到以下错误:
error: type arguments [Int,String,String,String,String] conform to the bounds of none of the overloaded alternatives of value createDirectStream
p、 s:我已经用正确的格式定义了Kafka帕拉和主题。
1条答案
按热度按时间oxcyiej71#
我相信你想要这样的东西:
请注意,这将消除您的错误,但您还有更多的工作要做。我们要做的就是得到原始的json
String
. 然后你必须转换String
进入你的case class
. 老实说,这是一个单独的问题,与ApacheSpark无关,甚至与Kafka无关。对于这个问题,您可以在这里找到两种不同的解决方案:如何使用apachespark中kafka主题中的scala读取json数据