sparksql性能

bjp0bcyl  于 2021-06-09  发布在  Hbase
关注(0)|答案(2)|浏览(385)

我的代码的算法如下
第一步。获取一个hbase实体数据到hbaserdd

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = 
                 jsc.newAPIHadoopRDD(hbase_conf,  TableInputFormat.class,
                 ImmutableBytesWritable.class, Result.class);

第二步。将hbaserdd转换为rowpairdd

// in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data 
     JavaPairRDD<String, Row> rowPairRDD = hBaseRDD 
                            .mapToPair(***); 
    dataRDD.repartition(500);
        dataRDD.cache();

第三步。将rowpairdd转换为schemardd

JavaSchemaRDD schemaRDD =   sqlContext.applySchema(rowPairRDD.values(), schema); 
            schemaRDD.registerTempTable("testentity"); 
           sqlContext.sqlContext().cacheTable("testentity");

第四步。使用sparksql执行第一个简单的sql查询。

JavaSQLContext  sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
    JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE 
             column3 = 'value1' ") 
     List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

第五步。使用sparksql执行第二个简单的sql查询。

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity 
                                     WHERE column3 = 'value2' ") 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

第六步。使用sparksql执行第三个简单的sql查询。

JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' "); 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

试验结果如下:
测试用例1:
当我插入300000条记录时,hbase实体,然后运行代码。
第一次查询需要60407ms
第二个查询需要838毫秒
3td查询需要792ms
如果我使用hbase api进行类似的查询,只需要2000 ms。显然,最后2个spark sql查询比hbase api查询快得多。
我相信第一个sparksql查询会花费大量时间从hbase加载数据。
所以第一个查询比最后两个查询慢得多。我想结果是意料之中的
测试用例2:
当我插入40万条记录时。hbase实体,然后运行代码。
第一次查询需要87213毫秒
第二个查询需要83238ms
3td查询需要82092 ms
如果我使用hbase api进行类似的查询,只需要3500毫秒。显然,3个spark sql查询比hbase api查询慢得多。
最后两个sparksql查询也非常慢,性能与第一个查询相似,为什么?我如何调整性能?

rwqw0loc

rwqw0loc1#

我怀疑您正在尝试缓存比已分配给spark示例的数据更多的数据。我将尝试分解在执行完全相同的查询时发生的事情。
首先,spark中的一切都是懒惰的。这意味着当你打电话的时候 rdd.cache() ,除非你用rdd做点什么,否则什么都不会发生。
第一个查询
完全hbase扫描(慢)
增加分区数(导致无序,缓慢)
数据实际上被缓存到内存中,因为spark是懒惰的(有点慢)
应用where predicate (fast)
收集结果
第二/第三查询
全内存扫描(快速)
应用where predicate (fast)
收集结果
现在,spark将尝试缓存尽可能多的rdd。如果它不能缓存整个东西,你可能会遇到一些严重的减速。如果缓存之前的某个步骤导致了洗牌,则尤其如此。对于后续的每个查询,您可能会在第一个查询中重复步骤1-3。这不太理想。
要查看是否没有完全缓存rdd,请转到spark web ui( http://localhost:4040 如果处于本地独立模式)并查找rdd存储/持久性信息。确保它是100%。
编辑(根据评论):
我的hbase中有400000个数据大小,只有大约250mb。为什么我需要使用2g来修复这个问题(但是1g>>250mb)
我不能肯定你为什么会达到你的极限 spark.executor.memory=1G ,但我将添加一些有关缓存的更相关的信息。
spark只将执行器堆内存的一部分分配给缓存。默认情况下,这是 spark.storage.memoryFraction=0.6 或60%。所以你真的只有 1GB * 0.6 .
hbase中使用的总空间可能与spark中缓存时占用的总堆空间不同。默认情况下,spark在内存中存储时不会序列化java对象。因此,在存储java时会有相当大的开销 Object 元数据。您可以更改默认的持久性级别。
您知道如何缓存所有数据以避免第一次查询的性能不佳吗?
调用任何操作都会导致rdd被缓存。就这么做吧

scala> rdd.cache
scala> rdd.count

现在它被缓存了。

fcg9iug3

fcg9iug32#

我希望您在一次运行中一个接一个地运行这些查询,如果是,为什么要为每个查询创建单独的sqlcontext?您还可以尝试重新分区rdd,这将增加并行性。如果可能,还可以缓存rdd。
希望以上步骤能提高性能。

相关问题