在每个apachespark工作节点上创建javahbase客户机示例

aemubtdh  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(346)

使用spark结构化流媒体。
我正在编写一个代码,需要在其中查找大量数据。查找非常复杂,只是不能很好地转换为连接。
e、 在表b中查找字段a并获取一个值,如果在另一个表中查找该值。如果找不到,则在表d中查找其他值c,依此类推。
我设法用hbase编写了这些查找,从功能上讲,它工作得很好。我为每个查找都编写了自定义项,例如,一个非常简单的自定义项可能是:

val someColFunc= udf( (code:String) =>
        {
            val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
            if (value != null)
                Bytes.toString(value)
            else
                null
        }
    )

因为javahbase客户端是不可序列化的。我正在传递这样的hbase对象

object HbaseObject {
 val table = new HbaseUtilities(zkUrl)
}

hbaeutilities是我为简化查找而编写的一个类。它只是创建了一个javahbase客户机,并为我需要的get命令提供了一个 Package 器。
这是渲染我的代码太慢,这也,是好的。让我困惑的是,增加或减少执行器或内核的数量对代码的速度没有影响。不管是1个执行者还是30个执行者,它都在以完全相同的速率运行。这让我觉得缺乏平行性。所以我的所有工作人员必须共享同一个hbase对象。在每个worker开始执行之前,我是否可以用它们来示例化一个这样的对象?我已经试过使用lazy val了,没有任何效果
我甚至尝试过创建一个sharedsingleton,如图所示https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/,为我解决了一些问题,但没有失去并行性。
我知道可能有更好的方法来解决这个问题,所有的建议都是非常受欢迎的,但现在我陷入了一些限制和紧张的时间表。

lqfhib0f

lqfhib0f1#

通过使用hbase项目主分支中的hbase spark connector,您可以完成您正试图完成的任务。由于某些原因,连接器似乎没有包含在任何正式的hbase构建中,但是您可以自己构建它,而且它工作正常。只需构建jar并将其包含在pom.xml中。
一旦构建,连接器将允许您在worker类中传递hbase连接对象,因此您不必担心序列化连接或构建singleton/etc。
例如:

JavaSparkContext jSPContext ...; //Create Java Spark Context
HBaseConfiguration hbConf = HBaseConfiguration.create();
hbConf.set("hbase.zookeeper.quorum", zkQuorum);
hbConf.set("hbase.zookeeper.property.clientPort", PORT_NUM);
// this is your key link to HBase from Spark -- use it every time you need to access HBase inside the Spark parallelism:
JavaHBaseContext hBaseContext = new JavaHBaseContext(jSPContext, hbConf);   

// Create an RDD and parallelize it with HBase access:
JavaRDD<String> myRDD = ... //create your RDD
hBaseContext.foreachPartition(myRDD,  new SparkHBaseWorker());
// You can also do other usual Spark tasks, like mapPartitions, forEach, etc.

// The Spark worker class for foreachPartition use-case on RDD of type String would look something like this:
class SparkHBaseWorker implements VoidFunction<Tuple2<Iterator<String>, Connection>>
{
    private static final long serialVersionUID = 1L;

    public WorkerIngest()
    {
    }

// Put all your HBase logic into this function:
    @Override
    public void call(Tuple2<Iterator<String>, Connection> t) throws Exception
    {           
        // This is your HBase connection object:
        Connection conn = t._2();
        // Now you can do direct access to HBase from this Spark worker node:
        Table hbTable = conn.getTable(TableName.valueOf(MY_TABLE));
        // now do something with the table/etc.
    }
}

相关问题