使用dataproc写入bigquery在使用spark bigquery连接器时很慢

sr4lhrrt  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(582)

我们有一个spark流应用程序,它从pubsub读取数据并应用一些转换,然后将javadstream转换为dataset,然后将结果写入bigquery normalize表。
下面是示例代码。所有normalize表都在currenttimestamp列上分区。有什么参数可以用来提高写性能吗?

  1. pubSubMessageDStream
  2. .foreachRDD(new VoidFunction2<JavaRDD<PubSubMessageSchema>, Time>() {
  3. @Override
  4. public void call(JavaRDD<PubSubMessageSchema> v1, Time v2) throws Exception {
  5. Dataset<PubSubMessageSchema> pubSubDataSet = spark.createDataset(v1.rdd(), Encoders.bean(PubSubMessageSchema.class));
  6. ---
  7. ---
  8. ---
  9. for (Row payloadName : payloadNameList) {
  10. Dataset<Row> normalizedDS = null;
  11. if(payloadNameAList.contains(payloadName) {
  12. normalizedDS = dataSet.filter(col(colA.equalTo(<Value>)));
  13. } else if(payloadNameBList.contains(payloadName) {
  14. normalizedDS = dataSet.filter(col(colA.equalTo(<Value>)));
  15. }
  16. normalizedDS.selectExpr(columnsBigQuery).write().format("bigquery")
  17. .option("temporaryGcsBucket", gcsBucketName)
  18. .option("table", tableName)
  19. .option("project", projectId)
  20. .option("parentProject", parentProjectId)
  21. .mode(SaveMode.Append)
  22. .save();
  23. }
  24. }
  25. }
qv7cva1a

qv7cva1a1#

写入bigquery需要写入gcs,然后触发bigquery加载作业。你可以试着改变主意 intermediateFormat 查看它是否影响性能-从我们的测试来看,更好的格式取决于模式和数据大小。
此外,在即将发布的连接器版本0.19.0中,有一个针对spark 2.4的datasource v2 api的写实现,它应该可以将性能提高10-15%。

相关问题