使用Spark Java使用不同结构的Dataframe中的数据写入BigQuery表

kmbjn2e3  于 2022-11-16  发布在  Apache
关注(0)|答案(1)|浏览(160)

我正在构建一个ETL管道,该管道从BQ中读取数据,执行转换,然后写入另一个具有完全不同表结构的BQ表。Map不是1对1,并且在管道运行期间会生成一些字段。
这是包含数据的现有 Dataframe 的模式(它是两个单独BQ表的连接):

|-- message_ID: string (nullable = false)
     |-- msg_priority: string (nullable = true)
     |-- subject: string (nullable = true)
     |-- sender: string (nullable = false)
     |-- jsonColumn: struct (nullable = true)
     |    |-- UserInfo: struct (nullable = true)
     |    |    |-- AccountName: string (nullable = true)
     |    |    |-- AccountNumber: long (nullable = true)
     |    |    |-- CorporateEmailAddress: string (nullable = true)
     |    |    |-- FirstName: string (nullable = true)
     |    |    |-- LastName: string (nullable = true)
     |-- REGION_NAME: string (nullable = true)
     |-- COUNTRY_CODE: string (nullable = true)

当前空的BQ表的模式是:

|-- email_id: string (nullable = false)
     |-- particpant: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- participant_type: string (nullable = false)
     |    |    |-- participant_email: string (nullable = false)  
     |-- priority: string (nullable = true)
     |-- subject: string (nullable = true)

有没有使用Spark来指定我想写入每一列的内容?我看到的文档中的每个示例都显示了如何将结果写入匹配的表或基于您正在编写的Dataframe生成表。然而,我受到现有表结构的限制。
在测试可能的解决方案时,我尝试从表1中只选择“主题”,并直接写入表2,如下所示:

Dataset<Row> table1 = spark.sql(
                "SELECT subject FROM joinedTable"
        );

     
        table1.write().format("bigquery")
                .option("table", "outputTable")
                .option("writeMethod", "direct")
                .mode(SaveMode.Append)
                .save();

然而这给了我错误:

java.lang.IllegalArgumentException: com.google.cloud.bigquery.connector.common.BigQueryConnectorException$InvalidSchemaException: Destination table's schema is not compatible with dataframe's schema

那么,这是不是告诉我,我写入表的Dataframe必须完全匹配?如果是这样的话,怎么可能实现我正在做的事情呢?

vhipe2zx

vhipe2zx1#

BigQuery中的DataFrame和目标表应该具有相同的模式。由于这是应用程序逻辑的一部分,BigQuery和spark-bigquery-connector都无法执行此模式转换。下面是一个开始:

Dataset<Row> table1 = spark.sql(
  "SELECT message_ID as email_id, ??? as participant, msg_priority as priority, subject FROM joinedTable"
);

table1.printSchema();
     
table1.write().format("bigquery")
  .option("table", "outputTable")
  .option("writeMethod", "direct")
  .mode(SaveMode.Append)
  .save();

我还没有创建参与者,因为我不确定如何将jsonColumn转换为参与者数组。
请注意printSchema()调用-请在日志中验证table 1的模式是否与BigQuery中的模式相同。

相关问题