I have defined a broadcast ArrayList as public static. Whenever a new job starts, my job-handler method fills this ArrayList (its name is "qList") with new values, and the list is then used inside a DStream lambda closure. When I run this on the Spark cluster, however, the job fails with a NullPointerException:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 17.0 failed 4 times, most recent failure: Lost task 1.3 in stage 17.0 (TID 40, 192.168.1.97, executor 0): java.lang.NullPointerException at QProcessing.lambda$3(QProcessing.java:345) ...
My code:
@Override
public void onBatchSubmitted(StreamingListenerBatchSubmitted arg0) {
    QProcessing.qList.value().clear();
    for (int i = 0; i < 2; i++) {
        try {
            QProcessing.qList.value().add(i, QProcessing.bufferedReader.readLine());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
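One thing worth noting about the handler above, independent of Spark: BufferedReader.readLine() returns null when the end of the stream is reached, so a null can silently land in qList and only blow up later when split(",") is called on it inside the executor lambda. A minimal sketch of a refill helper that skips the end-of-stream null (the class and method names here are made up for illustration, they are not part of the original code):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class SafeQueueRefill {

    // Reads up to `count` lines into a fresh list, stopping at end of
    // stream instead of storing the null that readLine() returns there.
    static List<String> refill(BufferedReader reader, int count) {
        List<String> lines = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                String line = reader.readLine();
                if (line == null) {   // end of stream: do NOT add null
                    break;
                }
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```

With this shape, a batch that runs out of input produces a shorter list rather than a list containing null, and the failure (if any) moves to an explicit index check instead of an NPE deep inside the lambda.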
...
private static JavaPairDStream<Long, List<String>> DistributeSerach(
        JavaPairDStream<Long, BPlusTree<Integer, String>> inputRDD, int role, int accessControlType,
        boolean topkAttach, int i) {
    return inputRDD.mapToPair(index -> {
        Instant startDistributedBPTSearch = Instant.now();
        // qList.value() is a List, so it must be read with get(i), not [i]
        String[] range = QProcessing.qList.value().get(i).split(",");
        List<String> searchResult = index._2.searchRange(Integer.parseInt(range[0]),
                BPlusTree.RangePolicy.INCLUSIVE, Integer.parseInt(range[1]),
                BPlusTree.RangePolicy.INCLUSIVE, role, accessControlType, topkAttach);
        Instant endDistributedBPTSearch = Instant.now();
        Duration timeElapsedDistributedBPTSearch = Duration.between(startDistributedBPTSearch,
                endDistributedBPTSearch);
        return new Tuple2<>(timeElapsedDistributedBPTSearch.toMillis(), searchResult);
    });
}
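A likely cause of the NullPointerException is how broadcast variables work: Spark ships the broadcast value to executors as a serialized snapshot taken when the broadcast is created, and the Spark documentation says the value should not be modified after broadcasting. Clearing and refilling qList on the driver in onBatchSubmitted therefore does not update the copies the executors already hold. The snapshot semantics can be sketched in plain Java, without Spark (BroadcastSnapshotDemo and shipToExecutor are made-up names that only simulate the serialize/deserialize round trip, they are not Spark APIs):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

public class BroadcastSnapshotDemo {

    // Simulates what happens when a broadcast value reaches an executor:
    // it is serialized once on the driver and deserialized on the worker,
    // producing an independent copy.
    @SuppressWarnings("unchecked")
    static List<String> shipToExecutor(List<String> driverSide) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(driverSide);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (List<String>) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<String> qList = new ArrayList<>(List.of("1,10", "20,30"));
        List<String> executorCopy = shipToExecutor(qList); // "broadcast" happens here

        // Driver-side mutation after the broadcast ...
        qList.clear();
        qList.add("100,200");

        // ... is invisible to the executor's copy.
        System.out.println(executorCopy); // [1,10, 20,30]
        System.out.println(qList);        // [100,200]
    }
}
```

If the per-batch queries must reach the executors, the usual pattern is to create a new broadcast (or simply capture a fresh local List in the closure) for each batch, rather than mutating one static broadcast in place.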