使用spark结构化流媒体。
我正在编写一个代码,需要在其中查找大量数据。查找非常复杂,只是不能很好地转换为连接。
e、 在表b中查找字段a并获取一个值,如果在另一个表中查找该值。如果找不到,则在表d中查找其他值c,依此类推。
我设法用hbase编写了这些查找,从功能上讲,它工作得很好。我为每个查找都编写了自定义项,例如,一个非常简单的自定义项可能是:
val someColFunc= udf( (code:String) =>
{
val value = HbaseObject.table.getRow("lookupTable", code, "cf", "value1")
if (value != null)
Bytes.toString(value)
else
null
}
)
因为javahbase客户端是不可序列化的。我正在传递这样的hbase对象
object HbaseObject {
val table = new HbaseUtilities(zkUrl)
}
hbaeutilities是我为简化查找而编写的一个类。它只是创建了一个javahbase客户机,并为我需要的get命令提供了一个 Package 器。
这是渲染我的代码太慢,这也,是好的。让我困惑的是,增加或减少执行器或内核的数量对代码的速度没有影响。不管是1个执行者还是30个执行者,它都在以完全相同的速率运行。这让我觉得缺乏平行性。所以我的所有工作人员必须共享同一个hbase对象。在每个worker开始执行之前,我是否可以用它们来示例化一个这样的对象?我已经试过使用lazy val了,没有任何效果
我甚至尝试过创建一个sharedsingleton,如图所示https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/,为我解决了一些问题,但没有失去并行性。
我知道可能有更好的方法来解决这个问题,所有的建议都是非常受欢迎的,但现在我陷入了一些限制和紧张的时间表。
1条答案
按热度按时间lqfhib0f1#
通过使用hbase项目主分支中的hbase spark connector,您可以完成您正试图完成的任务。由于某些原因,连接器似乎没有包含在任何正式的hbase构建中,但是您可以自己构建它,而且它工作正常。只需构建jar并将其包含在pom.xml中。
一旦构建,连接器将允许您在worker类中传递hbase连接对象,因此您不必担心序列化连接或构建singleton/etc。
例如: