在每个apachespark工作节点上创建javahbase客户机示例

aemubtdh  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(422)

使用spark结构化流媒体。
我正在编写一个代码,需要在其中查找大量数据。查找非常复杂,只是不能很好地转换为连接。
e、 在表b中查找字段a并获取一个值,如果在另一个表中查找该值。如果找不到,则在表d中查找其他值c,依此类推。
我设法用hbase编写了这些查找,从功能上讲,它工作得很好。我为每个查找都编写了自定义项,例如,一个非常简单的自定义项可能是:

  1. val someColFunc= udf( (code:String) =>
  2. {
  3. val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
  4. if (value != null)
  5. Bytes.toString(value)
  6. else
  7. null
  8. }
  9. )

因为javahbase客户端是不可序列化的。我正在传递这样的hbase对象

  1. object HbaseObject {
  2. val table = new HbaseUtilities(zkUrl)
  3. }

hbaeutilities是我为简化查找而编写的一个类。它只是创建了一个javahbase客户机,并为我需要的get命令提供了一个 Package 器。
这是渲染我的代码太慢,这也,是好的。让我困惑的是,增加或减少执行器或内核的数量对代码的速度没有影响。不管是1个执行者还是30个执行者,它都在以完全相同的速率运行。这让我觉得缺乏平行性。所以我的所有工作人员必须共享同一个hbase对象。在每个worker开始执行之前,我是否可以用它们来示例化一个这样的对象?我已经试过使用lazy val了,没有任何效果
我甚至尝试过创建一个sharedsingleton,如图所示https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/,为我解决了一些问题,但没有失去并行性。
我知道可能有更好的方法来解决这个问题,所有的建议都是非常受欢迎的,但现在我陷入了一些限制和紧张的时间表。

lqfhib0f

lqfhib0f1#

通过使用hbase项目主分支中的hbase spark connector,您可以完成您正试图完成的任务。由于某些原因,连接器似乎没有包含在任何正式的hbase构建中,但是您可以自己构建它,而且它工作正常。只需构建jar并将其包含在pom.xml中。
一旦构建,连接器将允许您在worker类中传递hbase连接对象,因此您不必担心序列化连接或构建singleton/etc。
例如:

  1. JavaSparkContext jSPContext ...; //Create Java Spark Context
  2. HBaseConfiguration hbConf = HBaseConfiguration.create();
  3. hbConf.set("hbase.zookeeper.quorum", zkQuorum);
  4. hbConf.set("hbase.zookeeper.property.clientPort", PORT_NUM);
  5. // this is your key link to HBase from Spark -- use it every time you need to access HBase inside the Spark parallelism:
  6. JavaHBaseContext hBaseContext = new JavaHBaseContext(jSPContext, hbConf);
  7. // Create an RDD and parallelize it with HBase access:
  8. JavaRDD<String> myRDD = ... //create your RDD
  9. hBaseContext.foreachPartition(myRDD, new SparkHBaseWorker());
  10. // You can also do other usual Spark tasks, like mapPartitions, forEach, etc.
  11. // The Spark worker class for foreachPartition use-case on RDD of type String would look something like this:
  12. class SparkHBaseWorker implements VoidFunction<Tuple2<Iterator<String>, Connection>>
  13. {
  14. private static final long serialVersionUID = 1L;
  15. public WorkerIngest()
  16. {
  17. }
  18. // Put all your HBase logic into this function:
  19. @Override
  20. public void call(Tuple2<Iterator<String>, Connection> t) throws Exception
  21. {
  22. // This is your HBase connection object:
  23. Connection conn = t._2();
  24. // Now you can do direct access to HBase from this Spark worker node:
  25. Table hbTable = conn.getTable(TableName.valueOf(MY_TABLE));
  26. // now do something with the table/etc.
  27. }
  28. }
展开查看全部

相关问题