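I have the method below to write to Cassandra. Sometimes it saves the data fine. When I run it again, it sometimes throws a NullPointerException, and I'm not sure what is going wrong. Could you please help me?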
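```
import java.io.IOException
import org.apache.spark.sql.{DataFrame, SaveMode}

// Appends the DataFrame to the given Cassandra table through the Spark Cassandra connector.
@throws(classOf[IOException])
def writeDfToCassandra(o_model_family: DataFrame, keyspace: String, columnFamilyName: String) = {
  logger.info("writeDfToCassandra")
  o_model_family.write.format("org.apache.spark.sql.cassandra")
    .options(Map("table" -> columnFamilyName, "keyspace" -> keyspace))
    .mode(SaveMode.Append)
    .save()
}
```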
18/10/29 05:23:56 ERROR BMValsProcessor: java.lang.NullPointerException
at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
at java.util.regex.Matcher.reset(Matcher.java:309)
at java.util.regex.Matcher.<init>(Matcher.java:229)
at java.util.regex.Pattern.matcher(Pattern.java:1093)
at scala.util.matching.Regex.findFirstIn(Regex.scala:388)
at org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2698)
at org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2698)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2698)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2696)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.util.Utils$.redact(Utils.scala:2696)
at org.apache.spark.util.Utils$.redact(Utils.scala:2663)
at org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:1650)
at org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:1650)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.internal.SQLConf.redactOptions(SQLConf.scala:1650)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.simpleString(SaveIntoDataSourceCommand.scala:52)
at org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:178)
at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:556)
at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:480)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:198)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:198)
at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:198)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at com.snp.utils.DbUtils$.writeDfToCassandra(DbUtils.scala:47)
1 Answer
qyyhg6bp #1
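Strangely, this is failing in the `redact` function of Spark's `Utils`. That function is applied to options passed to Spark in order to strip sensitive data from the UI and similar output. I can't imagine why a null key name would show up in the SQLConf (since I believe you can only have empty strings), but I would check there. Possibly the conf is being mutated while the method is executing?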
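To make the failure mode concrete, here is a minimal sketch that does not use Spark at all. It assumes the redaction step matches a regex against each option key and then, inside `orElse`, against each option value, which is what the frames `Regex.findFirstIn` under `Option.orElse` in the trace appear to show. If that reading is right, the null could just as well be an option value (for example `keyspace` or `columnFamilyName`) as a key, but that is an inference from the trace, not something confirmed here. `RedactNullDemo`, `redactionPattern`, `redact` and `requireNonNullOptions` are hypothetical names for illustration, and the pattern is a stand-in, not Spark's actual default.

```scala
import scala.util.Try
import scala.util.matching.Regex

object RedactNullDemo {
  // Hypothetical stand-in for a redaction pattern; Spark's real default may differ.
  val redactionPattern: Regex = "(?i)secret|password".r

  // Simplified imitation of a key-then-value regex check (an assumption, not Spark's code).
  // Regex.findFirstIn throws a NullPointerException when given a null input.
  def redact(options: Map[String, String]): Map[String, String] =
    options.map { case (key, value) =>
      redactionPattern.findFirstIn(key)
        .orElse(redactionPattern.findFirstIn(value))
        .map(_ => (key, "*********(redacted)"))
        .getOrElse((key, value))
    }

  // Hypothetical guard: fail fast with a readable message instead of an NPE deep inside Spark.
  def requireNonNullOptions(options: Map[String, String]): Map[String, String] = {
    val missing = options.collect { case (k, v) if v == null => k }
    require(missing.isEmpty, s"null value for option(s): ${missing.mkString(", ")}")
    options
  }

  def main(args: Array[String]): Unit = {
    val keyspace: String = null // simulates an argument that was never initialised
    val opts = Map("table" -> "my_table", "keyspace" -> keyspace)
    println(Try(redact(opts)))                // Failure wrapping a NullPointerException
    println(Try(requireNonNullOptions(opts))) // Failure wrapping an IllegalArgumentException
  }
}
```

If this is what is happening, wrapping the map passed to `.options(...)` in a check like `requireNonNullOptions` inside `writeDfToCassandra` would at least show which argument is null on the runs that fail.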