通过spark流从hbase读取数据

tv6aics1  于 2021-06-10  发布在  Hbase
关注(0)|答案(2)|浏览(454)

所以我的项目流程是kafka->spark streaming->hbase
现在我想再次从hbase读取数据,它将遍历上一个作业创建的表,并进行一些聚合,然后以不同的列格式将其存储在另一个表中
Kafka->Spark流(2ms)->hbase->Spark流(10ms)->hbase
现在我不知道如何使用spark流从hbase读取数据。我发现了一个cloudera实验室项目是sparkonhbase(http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/)库,但我不知道如何从hbase获取用于流处理的inputdstream。请提供任何指针或库链接,如果有任何这将有助于我做到这一点。

gmxoilav

gmxoilav1#

拼接机(开源)有一个演示显示Spark流运行。
http://community.splicemachine.com/category/tutorials/data-ingestion-streaming/
下面是这个用例的示例代码。
https://github.com/splicemachine/splice-community-sample-code/tree/master/tutorial-kafka-spark-streaming

mbyulnm0

mbyulnm02#

您可以使用queuestream:streamingcontext从RDD队列创建数据流

JavaSparkContext sc = new JavaSparkContext(conf);
org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
JavaHBaseContext jhbc = new JavaHBaseContext(sc, hconf);
Scan scan1 = new Scan();           
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName.getBytes());

// Create RDD
         rdd = jhbc.hbaseRDD(tableName, scan1, new Function<Tuple2<ImmutableBytesWritable, Result>, Tuple2<ImmutableBytesWritable, Result>>() {
            @Override
            public Tuple2<ImmutableBytesWritable, Result> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
                return immutableBytesWritableResultTuple2;
            }
        });

   // Create streaming context and queue
   JavaSparkStreamingContext ssc = new JavaSparkStramingContext(sc);

   Queue<JavaRDD<Tuple2<ImmutableBytesWritable, Result> >> queue =new Queue<JavaRDD<Tuple2<ImmutableBytesWritable, Result>>>( );
        queue.enqueue(rdd);

JavaDStream<Tuple2<ImmutableBytesWritable, Result>> ssc.queueStream(queue);

ps:你可以用spark(不用流媒体)

相关问题