dataproc冲突

sg2wtvxw  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(285)

我有一个在dataproc集群上为不同区域并行执行spark作业的流。它为每个区域创建一个集群,执行spark作业并在完成后删除集群。
spark作业使用 org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset 方法传递bigquery配置以在bigquery表上保存数据。作业将数据保存在多个表中,每个作业多次调用saveasnewapihadoopdataset方法。
问题是,有时我会收到一个由hadoop临时bigquery数据集中的冲突引起的错误,该数据集是它为运行作业而在内部创建的:

Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 409 Conflict
{
 "code" : 409,
 "errors" : [ {
   "domain" : "global",
   "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013",
   "reason" : "duplicate"
 } ],
 "message" : "Already Exists: Dataset <my-gcp-project>:<MY-DATASET>_hadoop_temporary_job_201802250620_0013"
}
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:321)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1056)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.io.bigquery.BigQueryOutputCommitter.setupJob(BigQueryOutputCommitter.java:107)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1150)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078)
    at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:819)
    ...

上面异常的时间戳201802250620_有一个sufix,我不确定它是否代表时间。
我的想法是,有时作业同时运行,并尝试在名称中创建具有相同时间戳的数据集。在并行作业中或在另一个saveasnewapihadoopdataset调用的同一作业中。
我们如何避免这个错误而不延迟作业的执行?
我使用的依赖关系是:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>0.10.2-hadoop2</version>
    <scope>provided</scope>
</dependency>

dataproc映像版本是1.1
编辑1:
我试过用 IndirectBigQueryOutputFormat 但现在我得到一个错误,说地面军事系统的输出路径已经存在,甚至在每个路径上传递不同的时间 saveAsNewAPIHadoopDataset 打电话。
这是我的代码:sparkconf sc=new sparkconf().setappname(“myapp”);

try (JavaSparkContext jsc = new JavaSparkContext(sc)) {
    JavaPairRDD<String, String> filesJson = jsc.wholeTextFiles(jsonFolder, parts);
    JavaPairRDD<String, String> jsons = filesJson.flatMapToPair(new FileSplitter()).repartition(parts);
    JavaPairRDD<Object, JsonObject> objsJson = jsons.flatMapToPair(new JsonParser()).filter(t -> t._2() != null).cache();

    objsJson
    .filter(new FilterType(MSG_TYPE1))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE1", "gs://my-bucket/tmp1"));

    objsJson
    .filter(new FilterType(MSG_TYPE2))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE2", "gs://my-bucket/tmp2"));

    objsJson
    .filter(new FilterType(MSG_TYPE3))
    .saveAsNewAPIHadoopDataset(createConf("my-project:MY_DATASET.MY_TABLE3", "gs://my-bucket/tmp3"));

    // here goes another ingestion process. same code as above but diferrent params, parsers, etc.
}

Configuration createConf(String table, String outGCS) {
  Configuration conf = new Configuration();
  BigQueryOutputConfiguration.configure(conf, table, null, outGCS, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class);
  conf.set("mapreduce.job.outputformat.class", IndirectBigQueryOutputFormat.class.getName());
  return conf;
}
watbbzwu

watbbzwu1#

我相信可能发生的是,每个Map器都试图创建自己的数据集。这是相当低效的(并且消耗了与Map绘制者数量成比例的每日配额)。
另一种方法是使用 IndirectBigQueryOutputFormat 对于输出类:
indirectbigqueryoutputformat的工作原理是首先将所有数据缓冲到云存储临时表中,然后在commitjob上通过一个操作将云存储中的所有数据复制到bigquery中。建议将其用于大型作业,因为每个hadoop/spark作业只需要一个bigquery“load”作业,而bigqueryoutputformat为每个hadoop/spark任务执行一个bigquery作业。
请看下面的示例:https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example

相关问题