kafka spark流式hbase插入问题

8aqjt8rx  于 2021-06-09  发布在  Hbase
关注(0)|答案(0)|浏览(204)

我正在使用kafka发送一个包含3列的文件,使用sparkstreaming1.3将其插入hbase。我的hbase是这样的:

ROW                      COLUMN+CELL
 zone:bizert             column=travail:call, timestamp=1491836364921, value=contact:numero
 zone:jendouba           column=travail:Big data, timestamp=1491835836290, value=contact:email
 zone:tunis              column=travail:info, timestamp=1491835897342, value=contact:num
3 row(s) in 0.4200 seconds

这就是我用spark流读取数据的方式,我用的是 spark-shell :

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
 val ssc = new StreamingContext(sc, Seconds(10))
 val topicSet = Set ("zed")
 val kafkaParams = Map[String, String]("metadata.broker.list" -> "xx.xx.xxx.xx:9092")
 val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
 lines.foreachRDD(rdd => { (!rdd.partitions.isEmpty)
 lines.saveAsTextFiles("hdfs://xxxxx:8020/user/admin/zed/steams3/")
})

这段代码在我将数据保存到hdfs时起作用,即使它将许多空数据保存到hdfs。在写这个问题之前,我在这里搜索了一些和我类似的问题,但我没有找到一个好的解决办法。
你能提出最好的方法吗?。这就是我的代码现在的样子

val sc = new SparkContext("local", "Hbase spark")
val tableName = "notz"
    val conf = HBaseConfiguration.create()
    conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/etc/hbase/conf.dist/hbase-site.xml"))
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    val admin = new HBaseAdmin(conf)
lines.foreachRDD(rdd => { (!rdd.partitions.isEmpty)
if(!admin.isTableAvailable(tableName)) {

      print("Creating GHbase Table")
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor("zone"
                                    .getBytes()))

      admin.createTable(tableDesc)

    }else{
      print("Table already exists!!")
    }
val myTable = new HTable(conf, tableName)

// i'm blocked here
    })

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题