scala—将Dataframe从spark集群写入cassandra集群:分区和性能调优

qcbq4gxm  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(495)

我有两个集群-1。ClouderaHadoop-spark作业在这里运行2。云-Cassandra集群,多个dc
在将spark作业中的Dataframe写入cassandra集群时,我正在spark中进行重新分区(repartioncount=10),然后再进行写入。见下表:

import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
  .mode(SaveMode.Append)
  .options(options)
  .option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
  .option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
  .save()

在我的多租户spark集群中,对于具有20m记录的spark批处理负载,以及以下配置,我看到了大量的任务失败、资源抢占和动态失败。

spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20 
spark.cassandra.connection.compression=LZ4

我该怎么调?重新分配是罪魁祸首吗?
ps:我一开始的理解是:对于一个有2000万行的负载,“重分区”应该将负载均匀地分布在执行器上(每个分区有2万行),批处理将在这些分区级别(2万行)上完成。但是现在,如果spark cassandra连接器在整个Dataframe级别(整个20m行)上进行批处理,我怀疑这是否会导致不必要的洗牌。
更新:删除“重分区”会大大降低ClouderaSpark集群的性能(在spark级别设置的默认分区是- spark.sql.shuffle.partitions: 200 ),所以我挖得更深一点,发现我最初的理解是正确的。请注意,我的Spark和Cassandra集群是不同的。datastax spark cassandra连接器使用cassandra协调器节点为每个分区打开一个连接,因此我决定让它保持不变。正如alex所建议的,我已经减少了并发写操作,我相信这会有所帮助。

e3bfsja2

e3bfsja21#

您不需要在spark中进行重新分区—只需将数据从spark写入cassandra,不要尝试更改spark cassandra连接器的默认值—它们在大多数情况下都可以正常工作。你需要看看发生了什么样的阶段性故障-很可能你只是因为一些原因而使cassandra过载 spark.cassandra.output.concurrent.writes=20 (使用默认值)( 5 ))—有时,编写器较少有助于更快地写入数据,因为您不会使cassandra过载,而且作业不会重新启动。
附笔。 partitionspark.cassandra.output.batch.grouping.key -它不是spark分区,而是cassandra分区,它依赖于分区键列的值。

相关问题