Is there a way to load a CSV file into HBase with Scala without having to use Spark? I'm looking for a tool similar to HappyBase.
hfyxw5xn1#
Please check this link for reference: http://www.mccarroll.net/snippets/hbaseload/. Hope it helps. HBaseImportExport is a superclass that defines some common code:
class HBaseImportExport {
  var DEFAULT_COLUMN_FAMILY: String = "c1"

  class HBaseCol(var family: String, var qualifier: String)
}
HBaseImporter interprets the first line of the CSV file as the field names: a header of the form family:qualifier addresses that column family and qualifier, while a bare header falls back to the default column family c1.
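Note that the importer below also depends on a CsvInputStream helper class that comes from the linked snippet and is not reproduced in the answer. As a stand-in, a minimal sketch could look like the following (an assumption on my part: readLine() returns the fields of the next record, or null at end of input; it splits naively on commas and does not handle quoted fields):

import java.io.{BufferedReader, InputStream, InputStreamReader}

// Naive CSV reader: one record per line, fields split on commas.
// Does not handle quoted fields or embedded commas/newlines.
class CsvInputStream(in: InputStream) {
  private val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))

  // Returns the fields of the next line, or null at end of stream.
  def readLine(): Array[String] = {
    val line = reader.readLine()
    if (line == null) null else line.split(",", -1)
  }

  def close(): Unit = reader.close()
}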
import java.io.File
import java.io.FileInputStream
import java.io.IOException
import java.util.ArrayList
import java.util.HashSet
import java.util.List
import java.util.Set

// Lets the java.util Set/List fields be traversed with for-comprehensions.
import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.log4j.Logger

object HBaseImporter {

  def main(args: Array[String]): Unit = {
    if (args.length < 2 || args.length > 3) {
      println("Usage: HBaseImporter <tablename> <csv file path> [<key field name>]")
      return
    }
    val tableName: String = args(0)
    val f: File = new File(args(1))
    var keyColumn: String = null
    if (args.length > 2) {
      keyColumn = args(2)
    }
    val importer: HBaseImporter = new HBaseImporter(tableName)
    importer.importCSV(f, keyColumn)
  }
}

class HBaseImporter(var tableName: String) extends HBaseImportExport {

  var admin: HBaseAdmin = _
  var config: Configuration = _
  var families: Set[String] = new HashSet[String]()
  var columns: List[HBaseCol] = new ArrayList[HBaseCol]()
  var keyPosition: Int = -1

  def init(): Unit = {
    config = HBaseConfiguration.create()
    admin = new HBaseAdmin(config)
  }

  // Drop the table if it already exists; ignore the error if it does not.
  private def deleteTable(): Unit = {
    try {
      admin.disableTable(tableName)
      admin.deleteTable(tableName)
    } catch {
      case e: Exception =>
    }
  }

  // Create the table with one column family per distinct family found in the headers.
  private def createTable(): Unit = {
    val desc: HTableDescriptor = new HTableDescriptor(tableName)
    admin.createTable(desc)
    admin.disableTable(tableName)
    for (family <- families) {
      admin.addColumn(tableName, new HColumnDescriptor(family))
    }
    admin.enableTable(tableName)
  }

  // Parse the header row: "family:qualifier" selects an explicit column family,
  // a bare name falls back to DEFAULT_COLUMN_FAMILY. Also records the position
  // of the key column, if one was named.
  private def analyzeHeaders(headers: Array[String], keyColumn: String): Unit = {
    columns.clear()
    families.clear()
    var col: Int = 0
    for (header <- headers) {
      var family: String = DEFAULT_COLUMN_FAMILY
      var qualifier: String = header
      val pos: Int = header.indexOf(":")
      if (pos > 0) {
        family = header.substring(0, pos)
        qualifier = header.substring(pos + 1)
      }
      columns.add(new HBaseCol(family, qualifier))
      families.add(family)
      if (header == keyColumn) {
        keyPosition = col
      }
      col += 1
    }
  }

  // Read the remaining CSV rows and write one Put per row.
  private def loadData(cis: CsvInputStream): Unit = {
    val table: HTable = new HTable(config, tableName)
    val logger: Logger = Logger.getLogger(this.getClass)
    var vals: Array[String] = cis.readLine()
    var counter: Int = 0
    while (vals != null) {
      // Use the key column as the row id if one was given, otherwise generate one.
      val rowId: String =
        if (keyPosition >= 0 && keyPosition < vals.length) vals(keyPosition)
        else "r" + counter
      val put: Put = new Put(rowId.getBytes("UTF-8"))
      var col: Int = 0
      for (column <- columns) {
        // Guard against short rows that have fewer values than headers.
        if (col < vals.length) {
          put.add(column.family.getBytes("UTF-8"),
                  column.qualifier.getBytes("UTF-8"),
                  vals(col).getBytes("UTF-8"))
        }
        col += 1
      }
      table.put(put)
      vals = cis.readLine()
      counter += 1
      if (counter % 10000 == 0) {
        logger.info("Imported " + counter + " records")
      }
    }
    cis.close()
  }

  /**
   * Import a CSV file into the HBase table.
   *
   * @param csvFile   the CSV file to load
   * @param keyColumn name of the field to use as the row key, or null
   * @throws IOException
   */
  def importCSV(csvFile: File, keyColumn: String): Unit = {
    init()
    val fis: FileInputStream = new FileInputStream(csvFile)
    val cis: CsvInputStream = new CsvInputStream(fis)
    // Read the field names from the first line of the CSV file.
    analyzeHeaders(cis.readLine(), keyColumn)
    deleteTable()
    createTable()
    loadData(cis)
    cis.close()
  }
}
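For illustration (this example is not from the original answer), suppose a file users.csv uses the family:qualifier header convention, with bare headers falling into the default family c1, and id chosen as the key column:

id,name,stats:score
u1,Alice,88
u2,Bob,91

A hypothetical invocation, assuming the HBase client jars and the classes above are on the classpath:

java -cp <hbase-client-jars>:. HBaseImporter users users.csv id

This recreates the users table with column families c1 and stats and writes one row per data line, keyed by the id field, e.g. row u1 with cells c1:id=u1, c1:name=Alice and stats:score=88.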