I am trying to run a foreach loop over a DataFrame in a Spark job. I submit the job with the following command:
spark-submit --class Hive_Cis.DataAnalyze --master local --deploy-mode client --executor-memory 1g --name DataAnalyze --conf "spark.app.id=DataAnalyze" Hive_CIS-1.0-SNAPSHOT-jar-with-dependencies.jar
My class, which is the driver program for the Spark job, looks like this:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class DataAnalyze {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DataAnalyze").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc);

        DataFrame dataSrcsTable = hiveContext.sql("SELECT * FROM default.data_tables_metrics");
        dataSrcsTable.show();
        dataSrcsTable.foreach(new DataTableReader(hiveContext));
    }//END OF MAIN
}//END OF CLASS
The class that extends AbstractFunction1 is:
import java.io.Serializable;
import java.sql.Timestamp;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

public class DataTableReader extends AbstractFunction1 implements Serializable {

    private HiveContext hiveConnection;
    private static final long serialVersionUID = 1919222653470174456L;

    public DataTableReader(HiveContext hiveData) {
        this.hiveConnection = hiveData;
    }

    @Override
    public BoxedUnit apply(Object o) {
        Row row = (Row) o;
        DataFrame examinetable;
        String tableName;
        String query;
        Timestamp lastCurrentDate;
        long count;

        if (!row.isNullAt(0)) {
            tableName = row.getString(0);
            count = row.getLong(1);
            lastCurrentDate = row.getTimestamp(2);
            System.out.println("\n" + tableName + "\n");

            query = "SELECT * from default." + tableName;
            System.out.println("\n" + query + "\n");

            //TODO this creates a null pointer issue
            //Not serializing correctly maybe ?
            examinetable = hiveConnection.sql(query);
            try {
                if (examinetable == null) {
                    System.out.println("\n\n Error input is null \n\n");
                }
                if (count < examinetable.count()) {
                    System.out.println("Count is low");
                }
            } catch (NullPointerException e) {
                System.out.println("\n Null pointer at get table \n");
            }
        }
        return BoxedUnit.UNIT;
    }
}//END OF CLASS
At the TODO comment marked above, I get a NullPointerException with the following stack trace:
16/09/01 07:35:38 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
at org.apache.spark.sql.SQLConf.setConf(SQLConf.scala:588)
at org.apache.spark.sql.SQLContext.setConf(SQLContext.scala:128)
at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:555)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553)
at org.apache.spark.sql.hive.HiveContext.parseSql(HiveContext.scala:333)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
at Hive_Cis.DataTableReader.apply(DataTableReader.java:53)
at Hive_Cis.DataTableReader.apply(DataTableReader.java:16)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/09/01 07:35:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
at org.apache.spark.sql.SQLConf.setConf(SQLConf.scala:588)
at org.apache.spark.sql.SQLContext.setConf(SQLContext.scala:128)
at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:555)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:553)
at org.apache.spark.sql.hive.HiveContext.parseSql(HiveContext.scala:333)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
at Hive_Cis.DataTableReader.apply(DataTableReader.java:53)
at Hive_Cis.DataTableReader.apply(DataTableReader.java:16)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
However, I can see in the logs that the job does connect to the Hive metastore:
16/09/01 07:35:37 INFO hive.HiveContext: Initializing execution hive, version 1.1.0
16/09/01 07:35:37 INFO client.ClientWrapper: Inspected Hadoop version: 2.6.0-cdh5.7.2
16/09/01 07:35:37 INFO client.ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.7.2
16/09/01 07:35:38 INFO hive.metastore: Trying to connect to metastore with URI thrift://*************:9083
16/09/01 07:35:38 INFO hive.metastore: Opened a connection to metastore, current connections: 2
16/09/01 07:35:38 INFO hive.metastore: Connected to metastore.
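For reference, one restructuring that might avoid calling hiveConnection.sql() inside the task would be to collect the metrics rows to the driver with collectAsList() and run the per-table queries there. Below is only a rough, untested sketch of that idea (the DriverSideCheck class name is just a placeholder, and it assumes default.data_tables_metrics is small enough to collect to the driver):
import java.sql.Timestamp;
import java.util.List;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

public class DriverSideCheck {

    public static void runChecks(HiveContext hiveContext) {
        DataFrame dataSrcsTable =
                hiveContext.sql("SELECT * FROM default.data_tables_metrics");

        // Bring the (assumed small) metrics rows back to the driver.
        List<Row> metricRows = dataSrcsTable.collectAsList();

        for (Row row : metricRows) {
            if (row.isNullAt(0)) {
                continue;
            }
            String tableName = row.getString(0);
            long count = row.getLong(1);
            Timestamp lastCurrentDate = row.getTimestamp(2);

            // The HiveContext is only ever used here, on the driver.
            DataFrame examineTable = hiveContext.sql("SELECT * from default." + tableName);
            if (count < examineTable.count()) {
                System.out.println("Count is low for " + tableName);
            }
        }
    }
}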
Any help would be greatly appreciated.