因此,我想编写一个代码,从hadoop hbase读取一条记录,然后将其存储到sparkrdd(弹性分布式数据集)中;读取一条rdd记录,然后写入hbase。我对这两者都不了解,我需要使用aws云或hadoop虚拟机。有人请引导我从头开始。
xpszyzbs1#
请使用scala中的基本代码,我们正在使用scala读取hbase中的数据。类似地,您可以编写一个表创建来将数据写入hbase
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor } import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark._ object HBaseApp { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseApp").setMaster("local[2]") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() val tableName = "table1" System.setProperty("user.name", "hdfs") System.setProperty("HADOOP_USER_NAME", "hdfs") conf.set("hbase.master", "localhost:60000") conf.setInt("timeout", 100000) conf.set("hbase.zookeeper.quorum", "localhost") conf.set("zookeeper.znode.parent", "/hbase-unsecure") conf.set(TableInputFormat.INPUT_TABLE, tableName) val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(tableName)) { val tableDesc = new HTableDescriptor(tableName) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) println("Number of Records found : " + hBaseRDD.count()) sc.stop() } }
1条答案
按热度按时间xpszyzbs1#
请使用scala中的基本代码,我们正在使用scala读取hbase中的数据。类似地,您可以编写一个表创建来将数据写入hbase