我有一个在dataproc集群上为不同区域并行执行spark作业的流。它为每个区域创建一个集群,执行spark作业并在完成后删除集群。
spark作业使用 org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset
方法传递bigquery配置以在bigquery表上保存数据。作业将数据保存在多个表中,每个作业多次调用saveasnewapihadoopdataset方法。
问题是,有时我会收到一个由hadoop临时bigquery数据集中的冲突引起的错误,该数据集是它为运行作业而在内部创建的:
Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 409 Conflict
{
"code" : 409,
"errors" : [ {
"domain" : "global",
"message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013",
"reason" : "duplicate"
} ],
"message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013"
}
at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.io.bigquery.BigQueryOutputCommitter.setupJob(BigQueryOutputCommitter.java:107)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1150)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078)
at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:819)
...
上面异常的时间戳201802250620_有一个sufix,我不确定它是否代表时间。
我的想法是,有时作业同时运行,并尝试在名称中创建具有相同时间戳的数据集。在并行作业中或在另一个saveasnewapihadoopdataset调用的同一作业中。
我们如何避免这个错误而不延迟作业的执行?
我使用的依赖关系是:
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>bigquery-connector</artifactId>
<version>0.10.2-hadoop2</version>
<scope>provided</scope>
</dependency>
dataproc映像版本是1.1
编辑1:
我试过用 IndirectBigQueryOutputFormat
但现在我得到一个错误,说地面军事系统的输出路径已经存在,甚至在每个路径上传递不同的时间 saveAsNewAPIHadoopDataset
打电话。
这是我的代码:sparkconf sc=new sparkconf().setappname(“myapp”);
try (JavaSparkContext jsc = new JavaSparkContext(sc)) {
JavaPairRDD<String, String> filesJson = jsc.wholeTextFiles(jsonFolder, parts);
JavaPairRDD<String, String> jsons = filesJson.flatMapToPair(new FileSplitter()).repartition(parts);
JavaPairRDD<Object, JsonObject> objsJson = jsons.flatMapToPair(new JsonParser()).filter(t -> t._2() != null).cache();
objsJson
.filter(new FilterType(MSG_TYPE1))
.saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE1", "gs://my-bucket/tmp1"));
objsJson
.filter(new FilterType(MSG_TYPE2))
.saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE2", "gs://my-bucket/tmp2"));
objsJson
.filter(new FilterType(MSG_TYPE3))
.saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE3", "gs://my-bucket/tmp3"));
// here goes another ingestion process. same code as above but diferrent params, parsers, etc.
}
Configuration createConf(String table, String outGCS) {
Configuration conf = new Configuration();
BigQueryOutputConfiguration.configure(conf, table, null, outGCS, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class);
conf.set("mapreduce.job.outputformat.class", IndirectBigQueryOutputFormat.class.getName());
return conf;
}
1条答案
按热度按时间watbbzwu1#
我相信可能发生的是,每个Map器都试图创建自己的数据集。这是相当低效的(并且消耗了与Map绘制者数量成比例的每日配额)。
另一种方法是使用
IndirectBigQueryOutputFormat
对于输出类:indirectbigqueryoutputformat的工作原理是首先将所有数据缓冲到云存储临时表中,然后在commitjob上通过一个操作将云存储中的所有数据复制到bigquery中。建议将其用于大型作业,因为每个hadoop/spark作业只需要一个bigquery“load”作业,而bigqueryoutputformat为每个hadoop/spark任务执行一个bigquery作业。
请看下面的示例:https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example