java—如何使用spark处理一系列hbase行?

mgdq6dx1  于 2021-06-04  发布在  Hadoop
关注(0)|答案(3)|浏览(481)

我正在尝试使用hbase作为spark的数据源。所以第一步是从hbase表创建rdd。因为spark使用hadoop输入格式,所以我可以通过创建rdd找到一种使用所有行的方法http://www.vidyasource.com/blog/programming/scala/java/data/hadoop/analytics/2014/01/25/lighting-a-spark-with-hbase 但是我们如何为范围扫描创建rdd呢?
欢迎所有建议。

ix0qys7i

ix0qys7i1#

以下是在spark中使用scan的示例:

import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64

def convertScanToString(scan: Scan): String = {
  val out: ByteArrayOutputStream = new ByteArrayOutputStream
  val dos: DataOutputStream = new DataOutputStream(out)
  scan.write(dos)
  Base64.encodeBytes(out.toByteArray)
}

val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.count

您需要将相关库添加到spark类路径,并确保它们与您的spark兼容。小贴士:你可以用 hbase classpath 找到他们。

6psbrbz9

6psbrbz92#

您可以在下面设置 conf ```
val conf = HBaseConfiguration.create()//need to set all param for habse
conf.set(TableInputFormat.SCAN_ROW_START, "row2");
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");

这将只为那些reocrd加载rdd
ddarikpa

ddarikpa3#

下面是一个带有 TableMapReduceUtil.convertScanToString(Scan scan): ```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;

public class HbaseScan {

public static void main(String ... args) throws IOException, InterruptedException {

    // Spark conf
    SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("My App");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Hbase conf
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "big_table_name");

    // Create scan
    Scan scan = new Scan();
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    scan.setStartRow(Bytes.toBytes("a"));
    scan.setStopRow(Bytes.toBytes("d"));

    // Submit scan into hbase conf
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));

    // Get RDD
    JavaPairRDD<ImmutableBytesWritable, Result> source = jsc
            .newAPIHadoopRDD(conf, TableInputFormat.class,
                    ImmutableBytesWritable.class, Result.class);

    // Process RDD
    System.out.println(source.count());
}

}

相关问题