我们正在构建一个简单的流应用程序,它使用hbase rdd来连接传入的数据流。示例代码:
val indexState = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]).map { case (rowkey, v) => //some logic}
val result = dStream.transform { rdd =>
rdd.leftOuterJoin(indexState)
}
它工作得很好,但是当我们为streamingcontext启用检查点并让应用程序从以前创建的检查点恢复时,它总是抛出nullpointerexception。
ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.NullPointerException
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:119)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
有人遇到过同样的问题吗?版本:
Spark1.6.x
hadoop 2.7.x版
谢谢!
1条答案
按热度按时间qgelzfjb1#
spark流检查点不能用于从以前的作业中恢复,至少在1.6.x中是这样。如果作业已停止并重新提交,则无法重新使用检查点数据。在提交作业之前,必须删除所有旧的检查点数据。
[r] 无法从升级前代码的早期检查点信息重新启动。检查点信息基本上包含序列化的scala/java/python对象,尝试用新的、修改过的类反序列化对象可能会导致错误。在这种情况下,可以使用其他检查点目录启动升级的应用程序,也可以删除以前的检查点目录。
升级代码检查点