I am trying to use HBase as a data source for Spark, so the first step is to create an RDD from an HBase table. Since Spark works with Hadoop input formats, I found a way to read all rows of a table by creating an RDD (http://www.vidyasource.com/blog/programming/scala/java/data/hadoop/analytics/2014/01/25/lighting-a-spark-with-hbase). But how do we create an RDD for a range scan? All suggestions are welcome.
ix0qys7i1#
Here is an example of using a Scan in Spark:
```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64

// Serialize the Scan so TableInputFormat can pick it up from the configuration.
// Note: Scan implemented Writable only up to HBase 0.94; on 0.96+ use
// TableMapReduceUtil.convertScanToString(scan) instead.
def convertScanToString(scan: Scan): String = {
  val out = new ByteArrayOutputStream
  val dos = new DataOutputStream(out)
  scan.write(dos)
  Base64.encodeBytes(out.toByteArray)
}

val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)       // rows fetched per RPC
scan.setCacheBlocks(false) // don't pollute the block cache with scan data
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
rdd.count
```
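For the range-scan part of the question: the Scan object itself carries the row-key range, so you restrict it before serializing it into the configuration. A minimal sketch (the start/stop keys are hypothetical placeholders):

```scala
import org.apache.hadoop.hbase.util.Bytes

scan.setStartRow(Bytes.toBytes("startRowKey")) // inclusive, hypothetical key
scan.setStopRow(Bytes.toBytes("stopRowKey"))   // exclusive, hypothetical key
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
```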
You need to add the relevant libraries to the Spark classpath and make sure they are compatible with your Spark version. Tip: you can find them with `hbase classpath`.
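For example, assuming the `hbase` launcher script is on your PATH, you can feed its output straight into the shell with `spark-shell --driver-class-path "$(hbase classpath)"`, and set `spark.executor.extraClassPath` similarly if the executors need the HBase classes too.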
6psbrbz92#
You can set the row-key range on the `conf` as below:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
// you still need to set all the other HBase params (table name, etc.)
conf.set(TableInputFormat.SCAN_ROW_START, "row2")
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey")
```
This will load an RDD containing only those records.
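To complete the picture, a minimal sketch of building the RDD from this configuration, reusing the approach from the first answer (the table name is a placeholder, and `sc` is an existing SparkContext):

```scala
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

conf.set(TableInputFormat.INPUT_TABLE, "table_name") // placeholder table name
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
rdd.count
```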
ddarikpa3#
Here is a Java example using `TableMapReduceUtil.convertScanToString(Scan scan)`:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HbaseScan {

    public static void main(String... args) throws IOException, InterruptedException {
        // Spark conf
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("My App");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // HBase conf
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, "big_table_name");

        // Create a scan restricted to the row-key range [a, d)
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        scan.setStartRow(Bytes.toBytes("a"));
        scan.setStopRow(Bytes.toBytes("d"));

        // Serialize the scan into the HBase conf
        conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));

        // Get RDD
        JavaPairRDD<ImmutableBytesWritable, Result> source = jsc
                .newAPIHadoopRDD(conf, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);

        // Process RDD
        System.out.println(source.count());
    }
}
```
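Beyond counting, you will usually map the `(ImmutableBytesWritable, Result)` pairs into plain values before further processing, since the HBase types are Writable rather than Java-serializable. A minimal Scala sketch against the `rdd` from the first answer, with a hypothetical column family `cf` and qualifier `col`:

```scala
import org.apache.hadoop.hbase.util.Bytes

val pairs = rdd.map { case (key, result) =>
  val rowKey = Bytes.toString(key.get())
  // getValue returns null if the cell is absent; handle that in real code
  val value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))
  (rowKey, value)
}
pairs.take(10).foreach(println)
```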