Loading CSV data into HBase with Scala

jc3wubiy  ·  posted 2021-06-08  ·  in HBase

Is there a way to load a CSV file into HBase with Scala without having to use Spark? I'm looking for something similar to happybase.


hfyxw5xn1#

Please see this link for reference: http://www.mccarroll.net/snippets/hbaseload/. Hope it helps.
HBaseImportExport is a superclass that defines some common code:

class HBaseImportExport {
  // default column family used when a CSV header has no "family:" prefix
  var DEFAULT_COLUMN_FAMILY: String = "c1"
  class HBaseCol(var family: String, var qualifier: String)
}

HBaseImporter interprets the first line of the CSV file as the field names and imports the remaining rows into HBase:

import java.io.File
import java.io.FileInputStream
import java.io.IOException
import java.util.ArrayList
import java.util.HashSet
import java.util.List
import java.util.Set

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.log4j.Logger

object HBaseImporter {

  def main(args: Array[String]): Unit = {
    if (args.length < 2 || args.length > 3) {
      println(
        "Usage: HBaseImporter <tablename> <csv file path> [<key field name>]")
      // invalid arguments: stop here instead of falling through to args(0)
      return
    }
    val tableName: String = args(0)
    val f: File = new File(args(1))
    var keyColumn: String = null
    if (args.length > 2) {
      keyColumn = args(2)
    }
    val importer: HBaseImporter = new HBaseImporter(tableName)
    importer.importCSV(f, keyColumn)
  }

}

class HBaseImporter(var tableName: String) extends HBaseImportExport {
  var admin: HBaseAdmin = _
  var config: Configuration = _
  var families: Set[String] = new HashSet[String]()
  var columns: List[HBaseCol] = new ArrayList[HBaseCol]()
  var keyPosition: Int = -1

  def init(): Unit = {
    config = HBaseConfiguration.create()
    admin = new HBaseAdmin(config)
  }

  private def deleteTable(): Unit = {
    try {
      admin.disableTable(tableName)
      admin.deleteTable(tableName)
    } catch {
      // ignore: the table may not exist yet
      case e: Exception => ()

    }
  }

  private def createTable(): Unit = {
    val desc: HTableDescriptor = new HTableDescriptor(tableName)
    admin.createTable(desc)
    admin.disableTable(tableName)
    for (family <- families.asScala) {
      val cf1: HColumnDescriptor = new HColumnDescriptor(family)
      admin.addColumn(tableName, cf1)
    }
    admin.enableTable(tableName)
  }

  private def analyzeHeaders(headers: Array[String], keyColumn: String): Unit = {
    columns.clear()
    families.clear()
    var col: Int = 0
    for (header <- headers) {
      var family: String = DEFAULT_COLUMN_FAMILY
      var qualifier: String = header
      // a header of the form "family:qualifier" selects an explicit column family
      val pos: Int = header.indexOf(":")
      if (pos > 0) {
        family = header.substring(0, pos)
        qualifier = header.substring(pos + 1)
      }
      columns.add(new HBaseCol(family, qualifier))
      families.add(family)
      if (header == keyColumn) {
        keyPosition = col
      }
      col += 1
    }
  }

  private def loadData(cis: CsvInputStream): Unit = {
    val table: HTable = new HTable(config, tableName)
    var vals: Array[String] = cis.readLine()
    val logger: Logger = org.apache.log4j.Logger.getLogger(this.getClass)
    var counter: Int = 0
    var rowId: String = ""
    while (vals != null) {
      rowId =
        if (keyPosition >= 0 && keyPosition < vals.length) vals(keyPosition)
        else "r" + counter
      val put: Put = new Put(rowId.getBytes("UTF-8"))
      for ((column, col) <- columns.asScala.zipWithIndex) {
        // a row may contain fewer values than there are headers; skip the missing ones
        if (col < vals.length) {
          put.add(column.family.getBytes("UTF-8"),
                  column.qualifier.getBytes("UTF-8"),
                  vals(col).getBytes("UTF-8"))
        }
      }
      table.put(put)
      vals = cis.readLine()
      counter += 1
      if (counter % 10000 == 0) {
        logger.info("Imported " + counter + " records")
      }
    }
    cis.close()
    table.close()
  }

  /**
    * Import a CSV file into the HBase table.
    *
    * @param csvFile   the CSV file; its first line must contain the field names
    * @param keyColumn name of the column to use as the row key, or null to
    *                  generate sequential keys
    *
    * @throws IOException
    */
  def importCSV(csvFile: File, keyColumn: String): Unit = {
    init()
    val fis: FileInputStream = new FileInputStream(csvFile)
    val cis: CsvInputStream = new CsvInputStream(fis)
    // read field names from the first line of the csv file
    analyzeHeaders(cis.readLine(), keyColumn)
    deleteTable()
    createTable()
    loadData(cis)
    cis.close()
  }

}
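
Note that the importer relies on a CsvInputStream helper class that is defined in the linked snippet but not shown above. If your data is plain comma-separated with no quoted fields or embedded commas, a minimal stand-in could look like the sketch below; for anything more complex, use the helper from the snippet or a real CSV library.

import java.io.{BufferedReader, InputStream, InputStreamReader}

// Minimal stand-in for the CsvInputStream helper from the linked snippet.
// It splits each line on commas and does NOT handle quoted fields.
class CsvInputStream(in: InputStream) {
  private val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))

  // returns the fields of the next line, or null at the end of the stream
  def readLine(): Array[String] = {
    val line = reader.readLine()
    if (line == null) null else line.split(",", -1)
  }

  def close(): Unit = reader.close()
}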

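With the HBase client jars and hbase-site.xml on the classpath, the importer can be driven either through its main method or directly from code. The table name, file name, and key column below are placeholders:

// Load data.csv into the (re)created table "people", keyed on the "id" column.
val importer = new HBaseImporter("people")
importer.importCSV(new java.io.File("data.csv"), "id")

Keep in mind that importCSV deletes and recreates the target table on every run, so don't point it at a table whose data you want to keep.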