下面的代码将读取hbase,然后将其转换为json结构并转换为schemardd,但问题是我 using List
为了存储json字符串,然后将其传递给javardd,对于大约100GB的数据,主服务器将在内存中加载数据。从hbase加载数据然后执行操作,然后转换为javardd的正确方法是什么。
package hbase_reader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import com.google.common.collect.Lists;
public class hbase_reader {
public static void main(String[] args) throws IOException, ParseException {
List<String> jars = Lists.newArrayList("");
SparkConf spconf = new SparkConf();
spconf.setMaster("local[2]");
spconf.setAppName("HBase");
//spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
spconf.setJars(jars.toArray(new String[jars.size()]));
JavaSparkContext sc = new JavaSparkContext(spconf);
//spconf.set("spark.executor.memory", "1g");
JavaSQLContext jsql = new JavaSQLContext(sc);
HBaseConfiguration conf = new HBaseConfiguration();
String tableName = "HBase.CounData1_Raw_Min1";
HTable table = new HTable(conf,tableName);
try {
ResultScanner scanner = table.getScanner(new Scan());
List<String> jsonList = new ArrayList<String>();
String json = null;
for(Result rowResult:scanner) {
json = "";
String rowKey = Bytes.toString(rowResult.getRow());
for(byte[] s1:rowResult.getMap().keySet()) {
String s1_str = Bytes.toString(s1);
String jsonSame = "";
for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
String s2_str = Bytes.toString(s2);
for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
jsonSame += "\""+s2_str+"\":"+s3_str+",";
}
}
jsonSame = jsonSame.substring(0,jsonSame.length()-1);
json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
}
json = json.substring(0,json.length()-1);
json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
jsonList.add(json);
}
JavaRDD<String> jsonRDD = sc.parallelize(jsonList);
JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);
System.out.println(schemaRDD.take(2));
} finally {
table.close();
}
}
}
4条答案
按热度按时间3hvapo4f1#
由于这个问题并不新鲜,目前还有其他几种选择:
hbase spark,hbase repo中直接提供的模块
hortonworks的hbaseSpark
我不太了解第一个项目,但它看起来不支持Spark2.x。不过,它在rdd级别对spark 1.6.x有丰富的支持。
另一方面,hbase上的spark有spark2.0和即将推出的spark2.1的分支。这个项目非常有前途,因为它专注于数据集/Dataframeapi。在幕后,它实现了标准的spark数据源api,并利用sparkcatalyst引擎进行查询优化。开发人员在这里声称它能够进行分区修剪、列修剪、 predicate 下推和实现数据局部性。
一个简单的例子,使用
com.hortonworks:shc:1.0.0-2.0-s_2.11
下面将介绍此回购和spark 2.0.2中的工件:u3r8eeie2#
只需添加有关如何添加扫描的注解:
tableinputformat具有以下属性:
扫描\u行\u开始
扫描\u行\u停止
6qfn3psc3#
我更喜欢阅读hbase,并在spark中进行json操作。
spark提供javasparkcontext.newapihadooprdd函数从hadoop存储(包括hbase)读取数据。您必须在配置参数和表输入格式及其键值中提供hbase配置、表名和扫描
您可以使用表输入格式类及其作业参数来提供表名和扫描配置
例子:
然后可以在spark中执行json操作。由于spark可以在内存已满时进行重新计算,因此它只加载重新计算部分(cmiiw)所需的数据,因此您不必担心数据大小
whitzsjs4#
作为使用spark(scala)读取hbase数据的基本示例,您还可以在java中编写:
2016年更新
从spark 1.0.x+开始,现在您还可以使用spark hbase连接器:
maven依赖项包括:
并找到以下相同的示例代码:
2017年更新
从spark 1.6.x+开始,现在您还可以使用shc连接器(hortonworks或hdp用户):
maven依赖项包括:
使用此连接器的主要优点是它在模式定义方面具有灵活性,并且不需要像nerdammer/spark hbase连接器那样硬编码参数。还请记住,它支持spark 2.x,因此该连接器非常灵活,并在问题和prs中提供端到端支持。
请在下面的存储库路径中查找最新的自述和示例:
hortonworks spark hbase连接器
您还可以将这个rdd转换为dataframes并在其上运行sql,或者可以将这些dataset或dataframesMap到用户定义的JavaPOJO或case类。它工作得很出色。
如果你还需要什么,请在下面评论。