通过spark将csv文件加载到hbase

whitzsjs  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(468)

这是一个简单的“如何”问题:我们可以通过com.databricks.spark.csv将数据带到spark环境。我知道如何通过spark创建hbase表,并手动将数据写入hbase表。但是,是否可以通过spark将text/csv/jason文件直接加载到hbase?我看不到有人谈论这件事。所以,只是检查一下。如果可能的话,请引导我到一个好的网站,详细解释scala代码来完成它。
谢谢您,

9jyewag0

9jyewag01#

有多种方法可以做到这一点。
spark hbase连接器:
https://github.com/hortonworks-spark/shc
你可以在链接上看到很多例子。
您还可以使用spark core通过hbaseconfiguration将数据加载到hbase。
代码示例:

val fileRDD = sc.textFile(args(0), 2)
  val transformedRDD = fileRDD.map { line => convertToKeyValuePairs(line) }

  val conf = HBaseConfiguration.create()
  conf.set(TableOutputFormat.OUTPUT_TABLE, "tableName")
  conf.set("hbase.zookeeper.quorum", "localhost:2181")
  conf.set("hbase.master", "localhost:60000")
  conf.set("fs.default.name", "hdfs://localhost:8020")
  conf.set("hbase.rootdir", "/hbase")

  val jobConf = new Configuration(conf)
  jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName)
  jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName)
  jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName)

  transformedRDD.saveAsNewAPIHadoopDataset(jobConf)

def convertToKeyValuePairs(line: String): (ImmutableBytesWritable, Put) = {

    val cfDataBytes = Bytes.toBytes("cf")
    val rowkey = Bytes.toBytes(line.split("\\|")(1))
    val put = new Put(rowkey)

    put.add(cfDataBytes, Bytes.toBytes("PaymentDate"), Bytes.toBytes(line.split("|")(0)))
    put.add(cfDataBytes, Bytes.toBytes("PaymentNumber"), Bytes.toBytes(line.split("|")(1)))
    put.add(cfDataBytes, Bytes.toBytes("VendorName"), Bytes.toBytes(line.split("|")(2)))
    put.add(cfDataBytes, Bytes.toBytes("Category"), Bytes.toBytes(line.split("|")(3)))
    put.add(cfDataBytes, Bytes.toBytes("Amount"), Bytes.toBytes(line.split("|")(4)))
    return (new ImmutableBytesWritable(rowkey), put)
  }

你也可以用这个
https://github.com/nerdammer/spark-hbase-connector

相关问题