当我执行下面的代码时给予错误“java.lang.IllegalArgumentException:要求失败:未注册任何输出操作,因此没有要执行的内容”任何人都可以帮助我吗?
val ssc = new StreamingContext(sc, Seconds(1))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
var imput: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)
var dia: SimpleDateFormat = new SimpleDateFormat("dd")
var mes: java.text.SimpleDateFormat = new SimpleDateFormat("MM");
var ano: java.text.SimpleDateFormat = new SimpleDateFormat("yyyy");
var diacv: String = ""
var mescv: String = ""
var anocv: String = ""
case class Data(diacv: String, mescv: String, anocv: String)
val columns = Seq("timecol")
val data = ssc.cassandraTable("smartgrids","analyzer_temp")
.select(columns.map(ColumnName(_)).take(1):_*)
val dstream = new ConstantInputDStream(ssc, data)
class foo() {
import sqlContext.implicits._
val result = dstream.map(row => (
diacv = dia.format(imput.parse(row.toString())),
mescv = mes.format(imput.parse(row.toString())),
anocv = ano.format(imput.parse(row.toString()))
)
).toDF()
result.registerTempTable("result")
// return result
}
ssc.start()
ssc.awaitTermination()
1条答案
按热度按时间e5nszbig1#
我遇到了同样的问题,流没有输出操作。添加下面的函数就可以了
print() foreachRDD(func) saveAsObjectFiles(prefix, [suffix]) saveAsTextFiles(prefix, [suffix]) saveAsHadoopFiles(prefix, [suffix])