java—如何在spark中从hbase表中获取所有数据

8fsztsew  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(413)

我在hbase中有一个名为useraction的大表,它有三个列族(song、album、singer)。我需要从'song'列族中获取所有数据作为javardd对象。我试过这个代码,但效率不高。有没有更好的解决办法?

static SparkConf sparkConf = new SparkConf().setAppName("test").setMaster(
        "local[4]");
static JavaSparkContext jsc = new JavaSparkContext(sparkConf);

static void getRatings() {

    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "UserAction");
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "song");

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc
            .newAPIHadoopRDD(
                    conf,
                    TableInputFormat.class,
                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
                    org.apache.hadoop.hbase.client.Result.class);

    JavaRDD<Rating> count = hBaseRDD
            .map(new Function<Tuple2<ImmutableBytesWritable, Result>, JavaRDD<Rating>>() {

                @Override
                public JavaRDD<Rating> call(
                        Tuple2<ImmutableBytesWritable, Result> t)
                        throws Exception {
                    Result r = t._2;
                    int user = Integer.parseInt(Bytes.toString(r.getRow()));
                    ArrayList<Rating> ra = new ArrayList<>();

                    for (Cell c : r.rawCells()) {

                        int product = Integer.parseInt(Bytes
                                .toString(CellUtil.cloneQualifier(c)));
                        double rating = Double.parseDouble(Bytes
                                .toString(CellUtil.cloneValue(c)));

                        ra.add(new Rating(user, product, rating));
                    }

                    return jsc.parallelize(ra);
                }
            })
            .reduce(new Function2<JavaRDD<Rating>, JavaRDD<Rating>, JavaRDD<Rating>>() {
                @Override
                public JavaRDD<Rating> call(JavaRDD<Rating> r1,
                        JavaRDD<Rating> r2) throws Exception {
                    return r1.union(r2);
                }
            });
    jsc.stop();
}

宋柱家族方案设计为:

RowKey = userID, columnQualifier = songID and value = rating.
wmvff8tz

wmvff8tz1#

更新:好吧,我现在看到你的问题了,因为一些疯狂的原因你把你的数组变成了rdd return jsc.parallelize(ra); . 你为什么这么做??为什么要创建rdd的rdd??为什么不把它们作为数组呢?在执行reduce操作时,可以连接数组。rdd是一个抗干扰的分布式数据集—拥有一个由分布式数据集组成的分布式数据集在逻辑上是没有意义的。我很惊讶你的工作竟然跑了而且没有崩溃!不管怎么说,这就是你工作这么慢的原因。
不管怎样,在你的Map之后的scala,你只需要做一个 flatMap(identity) 这会把你所有的名单连在一起。
我真的不明白为什么你需要做一个减少,也许这就是你有一些低效的地方。这是我用来读取hbase表的代码(它的泛化-即适用于任何方案)。需要确保的一点是,在读取hbase表时,确保分区的数量是合适的(通常需要很多分区)。

type HBaseRow = java.util.NavigableMap[Array[Byte],
  java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
// Map(CF -> Map(column qualifier -> Map(timestamp -> value)))
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

def readTableAll(table: String): RDD[(Array[Byte], CFTimeseriesRow)] = {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, table)
  sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  .map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
}

如您所见,我不需要在代码中进行缩减。这些方法是相当自我解释的。我可以深入研究你的代码,但我没有耐心阅读java,因为它太冗长了。
我有一些代码专门用于从行(而不是整个历史)获取最新的元素。如果你想看的话就告诉我。
最后,建议您考虑使用cassandra而不是hbase,因为datastax正在与databricks合作。

相关问题