我们有一个Kafka主题的数据流。我是用Spark流读的。
val ssc = new StreamingContext(l_sparkcontext, Seconds(30))
val kafkaStream = KafkaUtils.createStream(ssc, "xxxx.xx.xx.com:2181", "new-spark-streaming-group", Map("event_log" -> 10))
这个很好用。我想通过为流数据指定列来编写一个Parquet文件。因此,我做了以下几点
kafkaStream.foreachRDD(rdd => {
if (rdd.count() == 0 ) {
println("No new SKU's received in this time interval " + Calendar.getInstance().getTime())
}
else {
println("No of SKUs received " + rdd.count())
rdd.map(record => {
record._2
}).toDF("customer_id","sku","type","event","control_group","event_date").write.mode(SaveMode.Append).format("parquet").save(outputPath)
然而,这给出了一个错误
java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (1): _1
New column names (6): customer_id, sku, type, event, control_group, event_date
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.sql.DataFrame.toDF(DataFrame.scala:224)
at org.apache.spark.sql.DataFrameHolder.toDF(DataFrameHolder.scala:36)
at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:77)
at kafka_receive_messages$$anonfun$main$1.apply(kafka_receive_messages.scala:69)
请问我犯了什么错误。我们应该在Map上分开吗?如果我们这样做,那么我们就不能将它转换为todf(“…columns…”)
谢谢你的帮助。
当做
巴拉
1条答案
按热度按时间kmbjn2e31#
谢谢你过来。我已经解决了。这是一个编码问题。对于将来想这样做的人,请更改下面的else部分
再次感谢
巴拉