查找表太大,无法放入内存的批处理作业(spark)

iszxjhcz  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(405)

我正在尝试编写一个批处理作业来处理当前位于hbase数据库(aws中的emr集群)中的几百TB的数据,所有这些数据都在一个大表中。对于我正在处理的每一行,我都需要从第二个hbase表中的查找表(一个简单的整数到字符串的Map)中获取额外的数据。我们每排查5到10次。
我当前的实现使用一个spark作业,它将输入表的分区分发给它的worker,如下所示:

Configuration hBaseConfig = newHBaseConfig();
hBaseConfig.set(TableInputFormat.SCAN, convertScanToString(scan));
hBaseConfig.set(TableInputFormat.INPUT_TABLE, tableName);

JavaPairRDD<ImmutableBytesWritable, Result> table = sparkContext.newAPIHadoopRDD(hBaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
table.map(val -> { 
    // some preprocessing  
}).foreachPartition(p -> {
    p.forEachRemaining(row -> {
        // code that does the lookup
    });
});

问题是查找表太大,无法放入工作者的内存中。它们都需要访问查找表的所有部分,但它们的访问模式将从缓存中获得显著的好处。
我不能使用简单的map作为广播变量,因为它需要放入内存,这样的想法对吗?
spark使用的是无共享体系结构,因此我认为在所有工作线程之间共享缓存并不是一种简单的方法,但是我们能为每个工作线程构建一个简单的lru缓存吗?
如何实现这样一个本地工作缓存,以便在缓存未命中时从hbase中的查找表中获取数据?我是否可以将对第二个表的引用分发给所有工作人员?
除了作为数据源的hbase之外,我对技术的选择没有兴趣。除了spark之外,还有其他框架更适合我的用例吗?

sqxo8psd

sqxo8psd1#

您有几个选项来处理此要求:
1-使用rdd或数据集联接
您可以将两个hbase表作为spark rdd或数据集加载,然后执行 join 在查找键上。spark将把这两个rdd分割成分区,并对内容进行洗牌,以便具有相同密钥的行最终位于相同的执行器上。通过管理spark中的分区数,您应该能够连接任意大小的2个表。
2-广播解析程序示例
您可以广播执行hbase查找和临时lru缓存的解析器示例,而不是广播map。每个执行器将获得这个示例的一个副本,并且可以管理自己的缓存,您可以在中调用它们 foreachPartition() 代码。
注意,解析器示例需要实现可序列化,因此必须将缓存、hbase连接和hbase配置属性声明为 transient ,以便在每个执行器上初始化。
我在scala中维护的一个项目上运行了这样一个设置:如果您知道自己的访问模式并有效地管理缓存,那么它可以工作并且比直接的spark连接更有效
3-使用hbase spark连接器实现查找逻辑
apachehbase最近加入了改进的hbase spark连接器。现在文档非常稀少,您需要查看jira票据和这些工具cloudera的sparkonhbase的前一个版本的文档,但是测试套件中的最后一个单元测试看起来非常像您想要的
不过,我对这个api没有经验。

相关问题