我有一个使用spark的应用程序(带有spark作业服务器),它使用cassandra存储。我目前的设置是 client
运行模式 master=local[*]
. 因此,有一个单Spark执行器,这也是驱动程序的进程,是使用所有8个核心的机器。我有一个cassandra示例在同一台机器上运行。
cassandra表有一个形式的主键((datasource\ id,date),clustering\ col\ 1…clustering\ col\ n),其中date是形式为“2019-02-07”的一天,是复合分区键的一部分。
在我的spark应用程序中,我运行的查询如下:
df.filter(col("date").isin(days: _*))
在spark物理计划中,我注意到这些过滤器以及“datasource\u id”分区键的过滤器被推送到cassandra cql查询。
对于我们最大的数据源,我知道分区的大小大约为30mb。因此,我在spark作业服务器配置中有以下设置:
spark.cassandra.input.split.size_in_mb = 1
但是我注意到在cassandra加载步骤中没有并行化。尽管有多个大于1mb的cassandra分区,但没有创建额外的spark分区。只有一个任务在一个核心上执行所有查询,因此需要大约20秒来加载1个月的日期范围内的数据,该范围对应于大约100万行。
我尝试了以下替代方法:
df union days.foldLeft(df)((df: DataFrame, day: String) => {
df.filter(col("date").equalTo(day))
})
这确实为cassandra中的每个“day”分区创建了一个spark分区(或任务)。但是,对于较小的数据源,其中cassandra分区的大小要小得多,这种方法在创建过多的任务和由于它们的协调而产生的开销方面被证明是非常昂贵的。对于这些数据源,将许多cassandra分区合并到一个spark分区是完全正确的。所以我想用 spark.cassandra.input.split.size_in_mb
配置在处理小型和大型数据源时都很有用。
我的理解错了吗?为了使这个配置生效,我还缺少什么吗?
p、 我也读过关于使用joinwithcassandratable的答案。然而,我们的代码依赖于使用Dataframe。另外,从cassandrardd到dataframe的转换对我们来说不是很可行,因为我们的模式是动态的,不能使用case类来指定。
暂无答案!
目前还没有任何答案,快来回答吧!