首先我解析 Kafka 消息,然后对消息应用模式(schema)。我在 foreachRDD 循环中调用 rdd.toDF().printSchema 时,它按预期打印出模式。但是,当我尝试通过 JDBC 连接保存数据时,它会报如下错误:
scala.MatchError: Enrich.Streaming.Samples$Person@7d7904f6 (of class Enrich.Streaming.Samples$Person)
/**
 * Record with 32 constructor fields, exposed positionally through a
 * hand-written [[Product]] implementation instead of a case class.
 *
 * NOTE(review): presumably written by hand because case classes were limited
 * to 22 fields in older Scala versions — confirm against the Scala version
 * this project targets.
 *
 * NOTE(review): `type` is a reserved word in Scala. The `type` parameter and
 * its use in `productElement` need backticks (or a rename) to compile.
 */
class Person (
name : String,
id : String,
type: String,
.....,
.....,
.....,
.....,
32 of them
)
extends Product {
// Returns the constructor field at position n (0-based, in declaration order).
// Any index outside 0..31 raises IndexOutOfBoundsException carrying the index.
@throws(classOf[IndexOutOfBoundsException])
override def productElement(n: Int): Any = n match {
case 0 => name
case 1 => id
case 2 => type
....
....
case 31 => ...
case _ => throw new IndexOutOfBoundsException(n.toString())
}
// Must stay in sync with the number of cases handled by productElement.
override def productArity: Int = 32
// Only other Person instances are candidates for equality.
override def canEqual(that: Any): Boolean = that.isInstanceOf[Person]
}
/** Companion object holding the parser that builds a `Person` from a raw text record. */
object Person extends Serializable {
/**
 * Splits `str` on the literal `|` character and passes the resulting fields,
 * by position, to the `Person` constructor.
 *
 * @param str pipe-delimited record
 * @return `Some(person)` when construction succeeds; `None` when it throws,
 *         after printing the exception message to standard output
 */
def parse(str: String): Option[Person] = {
// split takes a regular expression, so the pipe has to be escaped.
// NOTE(review): split without a limit drops trailing empty strings, so a
// record whose last columns are empty yields fewer fields than expected.
val paramArray = str.split("\\|")
// Try captures anything thrown while indexing paramArray or constructing
// the Person (e.g. a record with too few fields).
Try(
new Person(paramArray(0),
paramArray(1),
paramArray(2)
.....
.....
.....
......
)) match {
case Success(trimarc) => Some(trimarc)
case Failure(throwable) => {
// The failure is only printed; the bad record is reported as None.
println (throwable.getMessage())
None
}
}
}
}
// Direct Kafka stream of (key, value) pairs; only the value is used below.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// Person.parse returns Option[Person]. Using map here would produce a stream
// of Option[Person], and toDF() would then treat the Some wrapper (a
// one-element Product) as the row, so a whole Person object ends up where a
// String column is expected. That is consistent with the reported
// scala.MatchError on Person. flatMap unwraps successful parses and drops
// records that failed to parse, giving a stream of plain Person values.
val data = messages.map(_._2).flatMap(line => Person.parse(line).toList)
// Append each micro-batch to the JDBC table.
data.foreachRDD { rdd =>
  rdd.toDF().write.mode("append").jdbc(url, table, prop)
}
如果有人能帮我解决这个问题,我将不胜感激。
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!