我是这个领域的初学者,所以我对它没有什么感觉。。。
hbase版本:0.98.24-hadoop2
spark版本:2.1.0
下面的代码尝试将从spark streming kafka producer接收的数据放入hbase。
Kafka输入数据格式如下:
第1行,标签1123
第1行,标签2134
spark流处理通过分隔符“,”分割接收行,然后将数据放入hbase。但是,我的应用程序在调用htable.put()方法时遇到了一个错误。有人能解释一下为什么下面的代码会抛出错误吗?
谢谢您。
JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 7113426295831342436L;
HTable htable;
public HTable set() throws IOException{
Configuration hconfig = HBaseConfiguration.create();
hconfig.set("hbase.zookeeper.property.clientPort", "2222");
hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");
HConnection hconn = HConnectionManager.createConnection(hconfig);
htable = new HTable(hconfig, tableName);
return htable;
};
@Override
public Iterator<String> call(String x) throws IOException {
////////////// Put into HBase /////////////////////
String[] data = x.split(",");
if (null != data && data.length > 2 ){
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
String ts = sdf.format(new Date());
Put put = new Put(Bytes.toBytes(ts));
put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));
/*I've checked data passed like this
{"totalColumns":3,"row":"20170120200927",
"families":{"TAGVALUE":
[{"qualifier":"LINEID","vlen":3,"tag[], "timestamp":9223372036854775807},
{"qualifier":"TAGID","vlen":3,"tag":[],"timestamp":9223372036854775807},
{"qualifier":"VAL","vlen":6,"tag" [],"timestamp":9223372036854775807}]}}*/
//*********************ERROR*******************//
htable.put(put);
htable.close();
}
return Arrays.asList(COLDELIM.split(x)).iterator();
}
});
错误代码:
Exception in thread "main" org.apache.spark.SparkException: Job
aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:154)
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:123)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
1条答案
按热度按时间zfycwa2u1#
您没有调用此方法
public HTable set() throws IOException
返回htable示例。因为htable示例为null,而您正在尝试对null执行操作
你得到如下npe