我在hbase中有一个名为“sample”的表。我需要使用apachesparksql查询表。有没有任何方法可以使用apachesparksql查询读取hbase数据?
9w11ddsr1#
sparksql是一个内存查询引擎,要在hbase表上使用sparksql执行一些查询操作,您需要使用spark从hbase获取数据并创建spark rdd
SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("SparkApp"); sparkConf.setMaster("local[*]"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); Configuration config = HBaseConfiguration.create(); config.addResource(new Path("/etc/hbase/hbase-site.xml")); config.addResource(new Path("/etc/hadoop/core-site.xml")); config.set(TableInputFormat.INPUT_TABLE, "sample"); JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = javaSparkContext.newAPIHadoopRDD(hbaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); JavaRDD<StudentBean> sampleRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable,Result>, StudentBean >() { private static final long serialVersionUID = -2021713021648730786L; public StudentBean call(Tuple2<ImmutableBytesWritable, Result> tuple) { StudentBean bean = new StudentBean (); Result result = tuple._2; bean.setRowKey(rowKey); bean.setFirstName(Bytes.toString(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("firstName")))); bean.setLastName(Bytes.toString(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("lastName")))); bean.setBranch(Bytes.toString(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("branch")))); bean.setEmailId(Bytes.toString(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("emailId")))); return bean; } });
使用这个rdd创建dataframe对象,并用一些临时表名注册它,然后就可以执行查询了
DataFrame schema = sqlContext.createDataFrame(sampleRDD, StudentBean.class); schema.registerTempTable("spark_sql_temp_table"); DataFrame schemaRDD = sqlContext.sql("YOUR_QUERY_GOES_HERE"); JavaRDD<StudentBean> result = schemaRDD.toJavaRDD().map(new Function<Row, StudentBean>() { private static final long serialVersionUID = -2558736294883522519L; public StudentBean call(Row row) throws Exception { StudentBean bean = new StudentBean(); // Do the mapping stuff here return bean; } });
1条答案
按热度按时间9w11ddsr1#
sparksql是一个内存查询引擎,要在hbase表上使用sparksql执行一些查询操作,您需要
使用spark从hbase获取数据并创建spark rdd
使用这个rdd创建dataframe对象,并用一些临时表名注册它,然后就可以执行查询了