How to pass the "WriteThroughputBudget" configuration in the Spark Cosmos DB connector

neskvpey · posted 2021-05-16 in Spark

I am using the Spark Cosmos DB connector to write a large amount of data to a Cosmos DB container. Since this is a bulk upload/write and there are concurrent reads on the same container, I want to cap the RUs the write operations consume through the Spark connector. In the connector's wiki I found the configuration WriteThroughputBudget, which can be used to limit write consumption: https://github.com/azure/azure-cosmosdb-spark/wiki/configuration-references
According to the wiki, WriteThroughputBudget is an integer value defining the RU budget that the ingestion operations of a particular Spark job should not exceed.
I tried passing it via option when writing the DataFrame, as follows:

inputDataset.write
  .mode(SaveMode.Overwrite)
  .format("com.microsoft.azure.cosmosdb.spark")
  .option("WriteThroughputBudget", 400)
  .options(writeConfig)
  .save()

I also tried passing it through the options map when writing the DataFrame:

val writeConfig: Map[String, String] = Map(
      "Endpoint" -> accountEndpoint,
      "Masterkey" -> accountKey,
      "Database" -> database,
      "Collection" -> collection,
      "Upsert" -> "true",
      "ConnectionMode" -> "Gateway",
      "WritingBatchSize" -> "1000",
      "WriteThroughputBudget" -> "1000")

In both cases the write fails with the following exception:

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
        at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
        at com.microsoft.azure.cosmosdb.spark.CosmosDBConnectionCache$.getOrCreateBulkExecutor(CosmosDBConnectionCache.scala:104)
        at com.microsoft.azure.cosmosdb.spark.CosmosDBConnection.getDocumentBulkImporter(CosmosDBConnection.scala:57)
        at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.bulkImport(CosmosDBSpark.scala:264)
        at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$.com$microsoft$azure$cosmosdb$spark$CosmosDBSpark$$savePartition(CosmosDBSpark.scala:389)
        at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$$anonfun$1.apply(CosmosDBSpark.scala:152)
        at com.microsoft.azure.cosmosdb.spark.CosmosDBSpark$$anonfun$1.apply(CosmosDBSpark.scala:152)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I understand that the value of WriteThroughputBudget is meant to be an integer. But Spark's DataFrameWriter stores every option value internally as a string (the numeric overloads of option simply stringify their argument), so the connector has to parse the value back into an integer; the unboxToInt frame in the trace suggests it instead casts the String straight to Integer.
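A minimal sketch of the failure mode the stack trace points to (the variable name is hypothetical; only the cast-versus-parse distinction matters):

// Every DataFrameWriter option reaches the data source as a String.
val value: Any = "400"
value.toString.toInt      // parsing succeeds: 400
value.asInstanceOf[Int]   // direct cast: java.lang.ClassCastException, as in the trace above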
Is there any other way to specify the WriteThroughputBudget option?

j0pj023g1#

The specific version 3.0.5 (azure-cosmosdb-spark_2.4.0_2.11-3.0.5-uber.jar) has this problem. WriteThroughputBudget works with the following versions:
azure-cosmosdb-spark_2.4.0_2.11-3.4.0-uber.jar
azure-cosmosdb-spark_2.4.0_2.11-2.1.2-uber.jar
azure-cosmosdb-spark_2.4.0_2.11-3.6.1-uber.jar
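
With one of those jars on the classpath instead of 3.0.5, the write from the question should be accepted unchanged (a sketch; the jar is assumed to be supplied via spark-submit --jars or as a cluster library):

import org.apache.spark.sql.SaveMode

inputDataset.write
  .mode(SaveMode.Overwrite)
  .format("com.microsoft.azure.cosmosdb.spark")
  .options(writeConfig)   // writeConfig carries "WriteThroughputBudget" -> "1000"
  .save()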
