Spark Streaming problem: putting data into HBase

nlejzf6q  posted on 2021-06-10  in HBase

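I am a beginner in this field, so I do not have a good feel for it yet.
HBase version: 0.98.24-hadoop2
Spark version: 2.1.0
The code below tries to put data received from a Kafka producer through Spark Streaming into HBase.
The Kafka input data format is as follows: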
Line1,TAG1,123
Line1,TAG2,134
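The Spark Streaming job splits each received line on the delimiter "," and then puts the data into HBase. However, my application hits an error when it calls the htable.put() method. Can anyone explain why the code below throws an error?
Thank you.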

JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {   
    private static final long serialVersionUID = 7113426295831342436L;

    HTable htable; 
    public HTable set() throws IOException{ 
        Configuration hconfig = HBaseConfiguration.create();
        hconfig.set("hbase.zookeeper.property.clientPort", "2222");
        hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");  

        HConnection hconn = HConnectionManager.createConnection(hconfig);  

        htable = new HTable(hconfig, tableName); 

        return htable;  
    };  
    @Override
    public Iterator<String> call(String x) throws IOException {  

        ////////////// Put into HBase   ///////////////////// 
        String[] data = x.split(",");   

        if (null != data && data.length > 2 ){ 
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");   
            String ts = sdf.format(new Date());  

            Put put = new Put(Bytes.toBytes(ts)); 

            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));

/* I've checked that the data is passed like this:
{"totalColumns":3,"row":"20170120200927",
"families":{"TAGVALUE":
[{"qualifier":"LINEID","vlen":3,"tag":[],"timestamp":9223372036854775807},
{"qualifier":"TAGID","vlen":3,"tag":[],"timestamp":9223372036854775807},
{"qualifier":"VAL","vlen":6,"tag":[],"timestamp":9223372036854775807}]}}*/

//*********************ERROR*******************//   
            htable.put(put);  
            htable.close();  

        }

        return Arrays.asList(COLDELIM.split(x)).iterator(); 
    } 
});

Error output:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:154)
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:123)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

zfycwa2u  #1

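You never call the method public HTable set() throws IOException, which is what creates and returns the HTable instance.
Because of that, the htable field is still null, and you are trying to perform an operation on null:

htable.put()

So you get an NPE like this: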

stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException
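
As a rough illustration of the cause and one possible fix, here is a minimal sketch that runs without HBase on the classpath. FakeTable, LineWriter, callBroken, callFixed and LazyInitSketch are hypothetical names that stand in for HTable and the anonymous FlatMapFunction from the question, and the sample lines are only illustrative comma-separated input. It is not the actual HBase API, just the same null-field pattern in plain Java.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for HTable, so the sketch needs no HBase dependency.
class FakeTable {
    final List<String> rows = new ArrayList<>();

    void put(String row) {
        rows.add(row);
    }
}

// Mirrors the shape of the anonymous FlatMapFunction in the question.
class LineWriter {
    // Declared but not assigned, like `HTable htable;` in the question.
    FakeTable table;

    // Counterpart of set(): creates the handle. Nothing calls it automatically.
    FakeTable set() {
        table = new FakeTable();
        return table;
    }

    // Same flow as the question's call(): uses the field without initializing it.
    Iterator<String> callBroken(String line) {
        String[] data = line.split(",");
        if (data.length > 2) {
            table.put(line); // throws NullPointerException if set() was never called
        }
        return Arrays.asList(data).iterator();
    }

    // One possible fix: create the handle on first use, then reuse it.
    Iterator<String> callFixed(String line) {
        if (table == null) {
            set();
        }
        String[] data = line.split(",");
        if (data.length > 2) {
            table.put(line);
        }
        return Arrays.asList(data).iterator();
    }
}

class LazyInitSketch {
    public static void main(String[] args) {
        LineWriter broken = new LineWriter();
        try {
            broken.callBroken("Line1,TAG1,123");
        } catch (NullPointerException e) {
            System.out.println("broken: NullPointerException because set() was never called");
        }

        LineWriter fixed = new LineWriter();
        fixed.callFixed("Line1,TAG1,123");
        fixed.callFixed("Line1,TAG2,134");
        System.out.println("fixed: stored " + fixed.table.rows.size() + " rows");
    }
}
```

In the real job, the equivalent change would be to call set() before the first htable.put(), for example at the start of call() when htable is still null. Note that the question's code also calls htable.close() after every record; with a reused handle that close would likely need to move to the end of processing, since a closed table should not be written to again.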
