我有一个spark的工作,就是把csv文件中的数据加载到mysql数据库中。
一切正常,但最近我注意到,Spark打开许多连接在插入阶段(300+连接)。对于每个insert语句来说,感觉就像打开一个新的连接,保持它的打开状态,并在某个时间点执行commit和关闭连接。有没有一种方法可以在每次插入后提交,或者在10k批处理后提交一次?
这将是不为每个插入打开连接。如果它需要处理1k条记录,这是很好的,但是当你处理数十亿条记录时,它会占用很多资源。
我有一个spark的工作,就是把csv文件中的数据加载到mysql数据库中。
一切正常,但最近我注意到,Spark打开许多连接在插入阶段(300+连接)。对于每个insert语句来说,感觉就像打开一个新的连接,保持它的打开状态,并在某个时间点执行commit和关闭连接。有没有一种方法可以在每次插入后提交,或者在10k批处理后提交一次?
这将是不为每个插入打开连接。如果它需要处理1k条记录,这是很好的,但是当你处理数十亿条记录时,它会占用很多资源。
1条答案
按热度按时间uz75evzq1#
如果您在Dataframe上有任何操作,即导致shuffl的Dataframe,默认情况下,spark,创建200个分区。导致200个到数据库的连接。
spark.sql.shuffle.partitions—配置为联接或聚合洗牌数据时要使用的分区数。-默认值:200
使用以下命令检查Dataframe的分区数:
df.rdd.getNumPartitions
在经常使用的列上使用以下命令重新划分Dataframe:df.repartition(NUMBER_OF_PARTIOTONS, col("Frequent_used_column"))
您还可以设置“batchsize”参数来控制每次往返要插入的行数。这有助于提高jdbc驱动程序的性能。默认为1000。