I am writing a large amount of data to a Cosmos DB container using the Spark Cosmos DB connector. Because this is a bulk upload/write and there are concurrent reads against the same container, I want to cap the RUs the write operations consume through the Spark connector. In the connector's wiki I found the configuration property WriteThroughputBudget, which can be used to limit write consumption: https://github.com/azure/azure-cosmosdb-spark/wiki/configuration-references
According to the wiki, WriteThroughputBudget is an integer value that defines the RU budget the ingestion operations of a particular Spark job should not exceed.
I tried setting it via option when writing the DataFrame, as shown below:
inputDataset.write.mode(SaveMode.Overwrite).format("com.microsoft.azure.cosmosdb.spark").option("WriteThroughputBudget", 400).options(writeConfig).save()
I also tried passing it through options when writing the DataFrame:
val writeConfig: Map[String, String] = Map(
"Endpoint" -> accountEndpoint,
"Masterkey" -> accountKey,
"Database" -> database,
"Collection" -> collection,
"Upsert" -> "true",
"ConnectionMode" -> "Gateway",
"WritingBatchSize" -> "1000",
"WriteThroughputBudget" -> "1000")
In both cases, the write operation fails with the following exception:
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
at com.microsoft.azure.cosmosdb.spark.CosmosDBConnectionCache$.getOrCreateBulkExecutor(CosmosDBConnectionCache.scala:104)
at com.microsoft.azure.cosmosdb.spark.CosmosDBConnection.getDocumentBulkImporter(CosmosDBConnection.scala:57)
at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.bulkImport(CosmosDBSpark.scala:264)
at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$savePartition(CosmosDBSpark.scala:389)
at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$$anonfun$1.apply(CosmosDBSpark.scala:152)
at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$$anonfun$1.apply(CosmosDBSpark.scala:152)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
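The trace points at BoxesRunTime.unboxToInt: the connector takes the option value it stored and unboxes it straight to Int. A minimal, Spark-free sketch of that failure mode, assuming the connector keeps every option as a string internally (the castFails helper and the map below are illustrative, not connector code):

```scala
object BudgetCastDemo {
  // Returns true when unboxing the stored option value to Int fails,
  // mirroring the BoxesRunTime.unboxToInt call in the stack trace.
  def castFails(stored: Any): Boolean =
    try { stored.asInstanceOf[Int]; false }
    catch { case _: ClassCastException => true }

  def main(args: Array[String]): Unit = {
    // Options arrive as strings, so the stored value is a java.lang.String.
    val config: Map[String, Any] = Map("WriteThroughputBudget" -> "1000")
    println(castFails(config("WriteThroughputBudget"))) // prints "true"
  }
}
```

This also explains why passing an Int to .option does not help: DataFrameWriter.option stringifies its value, so the connector still receives a string and the buggy version still attempts a direct Integer unbox.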
I understand that the value of the WriteThroughputBudget config is an integer. But even when I pass the integer as a string, the connector should implicitly cast it; instead, it fails.
Is there any other way I can specify the WriteThroughputBudget option?
1 Answer
The specific version 3.0.5 has this problem: azure-cosmosdb-spark_2.4.0_2.11-3.0.5-uber.jar.
WriteThroughputBudget works with the following versions:
azure-cosmosdb-spark_2.4.0_2.11-3.4.0-uber.jar
azure-cosmosdb-spark_2.4.0_2.11-2.1.2-uber.jar
azure-cosmosdb-spark_2.4.0_2.11-3.6.1-uber.jar
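With one of the working jars on the classpath, the budget can stay in the single writeConfig map as a string, like every other entry. A sketch under that assumption; all account values below are placeholders, and the commented write call needs a live SparkSession and dataset:

```scala
object ThrottledWriteConfig {
  // Same shape as the config in the question; only placeholder values differ.
  val writeConfig: Map[String, String] = Map(
    "Endpoint" -> "https://<account>.documents.azure.com:443/",
    "Masterkey" -> "<account-key>",
    "Database" -> "<database>",
    "Collection" -> "<collection>",
    "Upsert" -> "true",
    "ConnectionMode" -> "Gateway",
    "WritingBatchSize" -> "1000",
    "WriteThroughputBudget" -> "1000") // RU cap for this job's ingestion
}

// The write itself is unchanged from the question:
//   inputDataset.write
//     .mode(SaveMode.Overwrite)
//     .format("com.microsoft.azure.cosmosdb.spark")
//     .options(ThrottledWriteConfig.writeConfig)
//     .save()
```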