我有两个集群-1。ClouderaHadoop-spark作业在这里运行2。云-Cassandra集群,多个dc
在将spark作业中的Dataframe写入cassandra集群时,我正在spark中进行重新分区(repartioncount=10),然后再进行写入。见下表:
import org.apache.spark.sql.cassandra._
records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
.mode(SaveMode.Append)
.options(options)
.option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
.option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
.save()
在我的多租户spark集群中,对于具有20m记录的spark批处理负载,以及以下配置,我看到了大量的任务失败、资源抢占和动态失败。
spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20
spark.cassandra.connection.compression=LZ4
我该怎么调?重新分配是罪魁祸首吗?
ps:我一开始的理解是:对于一个有2000万行的负载,“重分区”应该将负载均匀地分布在执行器上(每个分区有2万行),批处理将在这些分区级别(2万行)上完成。但是现在,如果spark cassandra连接器在整个Dataframe级别(整个20m行)上进行批处理,我怀疑这是否会导致不必要的洗牌。
更新:删除“重分区”会大大降低ClouderaSpark集群的性能(在spark级别设置的默认分区是- spark.sql.shuffle.partitions: 200
),所以我挖得更深一点,发现我最初的理解是正确的。请注意,我的Spark和Cassandra集群是不同的。datastax spark cassandra连接器使用cassandra协调器节点为每个分区打开一个连接,因此我决定让它保持不变。正如alex所建议的,我已经减少了并发写操作,我相信这会有所帮助。
1条答案
按热度按时间e3bfsja21#
您不需要在spark中进行重新分区—只需将数据从spark写入cassandra,不要尝试更改spark cassandra连接器的默认值—它们在大多数情况下都可以正常工作。你需要看看发生了什么样的阶段性故障-很可能你只是因为一些原因而使cassandra过载
spark.cassandra.output.concurrent.writes=20
(使用默认值)(5
))—有时,编写器较少有助于更快地写入数据,因为您不会使cassandra过载,而且作业不会重新启动。附笔。
partition
在spark.cassandra.output.batch.grouping.key
-它不是spark分区,而是cassandra分区,它依赖于分区键列的值。