Spark: Union can only be performed on tables with compatible column types, Struct<name,id> != Struct<id,name>

Posted by zzlelutf on 2023-06-24 in Apache

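Error: Union can only be performed on tables with the compatible column types. struct<tier:string,skyward_number:string,skyward_points:string> <> struct<skyward_number:string,tier:string,skyward_points:string> at the first column of the second table;;
Here the order of the struct fields differs, but everything else is the same.
DataFrame 1 schema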

root
 |-- emcg_uuid: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone_no: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- country: string (nullable = true)
 |-- travel_type: string (nullable = true)
 |-- gdpr_restricted_flg: string (nullable = false)
 |-- gdpr_reason_code: string (nullable = false)
 |-- document: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- skyward: struct (nullable = false)
 |    |-- tier: string (nullable = false)
 |    |-- skyward_number: string (nullable = false)
 |    |-- skyward_points: string (nullable = false)

DataFrame 2 schema
root
 |-- emcg_uuid: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone_no: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- country: string (nullable = true)
 |-- travel_type: string (nullable = true)
 |-- gdpr_restricted_flg: string (nullable = true)
 |-- gdpr_reason_code: string (nullable = true)
 |-- document: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- skyward: struct (nullable = false)
 |    |-- skyward_number: string (nullable = false)
 |    |-- tier: string (nullable = false)
 |    |-- skyward_points: string (nullable = false)

How can I solve this?

8fq7wneg1#

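By default, Spark's union follows standard SQL behavior and matches columns by position. This means the schemas of both DataFrames must contain the same fields, in the same order.
If you want to match columns by name instead, use unionByName, introduced in Spark 2.3.
You can also rename the columns of df1 to those of df2 (by position):

val df1 = ...
val df2 = ...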
df1.toDF(df2.columns: _*).union(df2)
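
The difference between matching by position and matching by name can be sketched on a small example. The sample rows, the local SparkSession and the names left, right, byPosition and byName are assumptions made for illustration. Note that unionByName resolves top-level columns by name, so a struct whose inner fields are ordered differently may still need to be rebuilt by hand.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session created only for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("union-by-name-sketch").getOrCreate()
import spark.implicits._

// Same top-level columns, different order (hypothetical sample data).
val left  = Seq(("uuid_1", "ravi")).toDF("emcg_uuid", "name")
val right = Seq(("aaaa", "uuid_2")).toDF("name", "emcg_uuid")

// union matches by position: the value "aaaa" lands in the emcg_uuid column.
val byPosition = left.union(right)

// unionByName (Spark 2.3+) matches top-level columns by name instead.
val byName = left.unionByName(right)
```

Both results take their column names from left; only byName puts the values of right under the matching names.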

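Edit: I see the edit to the question now.
You can rebuild the struct column with its fields in the required order:

import org.apache.spark.sql.functions._
import spark.implicits._  // enables the $"..." syntax; assumes a SparkSession named spark
// The fields live inside the skyward struct, so they are referenced by their nested path.
val withCorrectedStruct = df1.withColumn("skyward", struct($"skyward.skyward_number", $"skyward.tier", $"skyward.skyward_points"))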

tmb3ates2#

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

  // Rebuilds each top-level struct column of df2 so that its fields follow the field order used in df1.
  // Only top-level structs are handled; structs nested inside other structs are left as they are.
  def getStructRecursiveDataFrame(df1: DataFrame, df2: DataFrame, columns: Array[String]): DataFrame = {
    if (columns.isEmpty) {
      df2
    }
    else {
      val col_name = columns.head
      val col_schema = df1.schema.fields.find(_.name == col_name).get
      col_schema.dataType match {
        case st: StructType =>
          // Select the struct's fields in df1's order and wrap them in a new struct.
          val updatedStructNames: Seq[Column] = st.fieldNames.map(name => col(col_name + "." + name))
          getStructRecursiveDataFrame(df1, df2.withColumn(col_name, struct(updatedStructNames: _*)), columns.tail)
        case _ => getStructRecursiveDataFrame(df1, df2, columns.tail)
      }
    }
  }

  // Unions a and b on the columns they share. The shared names are collected in a Set,
  // so the column order of the result follows the Set's iteration order, not a's column order.
  def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
    val b_new_df = getStructRecursiveDataFrame(a, b, a.columns)
    val columns_seq = a.columns.toSet.intersect(b_new_df.columns.toSet).map(col).toSeq
    a.select(columns_seq: _*).union(b_new_df.select(columns_seq: _*))
  }

Result

[INFO] DATAFRAME-1 SCHEME
root
 |-- emcg_uuid: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone_no: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- country: string (nullable = true)
 |-- travel_type: string (nullable = true)
 |-- gdpr_restricted_flg: string (nullable = false)
 |-- gdpr_reason_code: string (nullable = false)
 |-- document: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- skyward: struct (nullable = false)
 |    |-- tier: string (nullable = false)
 |    |-- skyward_number: string (nullable = false)
 |    |-- skyward_points: string (nullable = false)

[INFO] DATAFRAME-2 SCHEME
root
 |-- emcg_uuid: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone_no: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- country: string (nullable = true)
 |-- travel_type: string (nullable = true)
 |-- gdpr_restricted_flg: string (nullable = true)
 |-- gdpr_reason_code: string (nullable = true)
 |-- document: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- skyward: struct (nullable = false)
 |    |-- skyward_number: string (nullable = false)
 |    |-- tier: string (nullable = false)
 |    |-- skyward_points: string (nullable = false)

[INFO] DATAFRAME SCHEME AFTER THE UNION
root
 |-- skyward: struct (nullable = false)
 |    |-- skyward_number: string (nullable = false)
 |    |-- tier: string (nullable = false)
 |    |-- skyward_points: string (nullable = false)
 |-- name: string (nullable = true)
 |-- document: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- phone_no: string (nullable = true)
 |-- travel_type: string (nullable = true)
 |-- gdpr_restricted_flg: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gdpr_reason_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- emcg_uuid: string (nullable = true)

[INFO] TEST CASE FOR ANONYMIZATION VALIDATION
[INFO] INPUT DATA
+----+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|name|phone_no  |travel_type|gdpr_restricted_flg|dob       |gdpr_reason_code|country|emcg_uuid|document                                   |skyward          |
+----+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|ravi|8747436090|freq       |                   |1988-05-28|                |dubai  |uuid_1   |Map(document_type -> passport, id -> A3343)|[123456,blue,687]|
|aaaa|8747436091|freg       |                   |1988-06-25|                |europe |uuid_2   |Map(document_type -> passport, id -> A3341)|[123456,blue,687]|
|bbbb|8747436092|reg        |                   |1988-07-26|                |india  |uuid_3   |Map(document_type -> passport, id -> A3345)|[123456,blue,687]|
|cccc|8747436093|na         |                   |1988-08-27|                |georgia|uuid_4   |Map(document_type -> passport, id -> A3349)|[123456,blue,687]|
|dddd|8747436094|na         |                   |1988-09-29|                |swis   |uuid_5   |Map(document_type -> passport, id -> B3343)|[123456,blue,687]|
|null|8747436095|freq       |                   |1988-02-30|                |us     |uuid_6   |Map(document_type -> passport, id -> C3343)|[123456,blue,687]|
|null|8747436096|na         |                   |1988-01-01|                |canada |uuid_7   |Map(document_type -> null, id -> D3343)    |[123456,blue,687]|
+----+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+

[INFO] EXPECTED OUTPUT
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|name   |phone_no  |travel_type|gdpr_restricted_flg|dob       |gdpr_reason_code|country|emcg_uuid|document                                   |skyward          |
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|DDDDDDD|9999999   |freq       |Y                  |1988-05-XX|13-001          |XXXXXXX|uuid_1   |Map(document_type -> ZZZZZ, id -> HH343)   |[123456,blue,687]|
|aaaa   |8747436091|freg       |                   |1988-06-25|                |europe |uuid_2   |Map(document_type -> passport, id -> A3341)|[123456,blue,687]|
|DDDDDDD|9999999   |reg        |Y                  |1988-07-XX|13-001          |XXXXXXX|uuid_3   |Map(document_type -> ZZZZZ, id -> HH345)   |[123456,blue,687]|
|cccc   |8747436093|na         |                   |1988-08-27|                |georgia|uuid_4   |Map(document_type -> passport, id -> A3349)|[123456,blue,687]|
|dddd   |8747436094|na         |                   |1988-09-29|                |swis   |uuid_5   |Map(document_type -> passport, id -> B3343)|[123456,blue,687]|
|null   |8747436095|freq       |                   |1988-02-30|                |us     |uuid_6   |Map(document_type -> passport, id -> C3343)|[123456,blue,687]|
|null   |9999999   |na         |Y                  |1988-01-XX|13-001          |XXXXXXX|uuid_7   |Map(document_type -> null, id -> HH343)    |[123456,blue,687]|
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+

[INFO] ACTUAL OUTPUT
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|name   |phone_no  |travel_type|gdpr_restricted_flg|dob       |gdpr_reason_code|country|emcg_uuid|document                                   |skyward          |
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+
|DDDDDDD|9999999   |freq       |Y                  |1988-05-XX|13-001          |XXXXXXX|uuid_1   |Map(document_type -> ZZZZZ, id -> HH343)   |[UUUUU,blue,JJ7] |
|aaaa   |8747436091|freg       |                   |1988-06-25|                |europe |uuid_2   |Map(document_type -> passport, id -> A3341)|[123456,blue,687]|
|DDDDDDD|9999999   |reg        |Y                  |1988-07-XX|13-001          |XXXXXXX|uuid_3   |Map(document_type -> ZZZZZ, id -> HH345)   |[UUUUU,blue,JJ7] |
|cccc   |8747436093|na         |                   |1988-08-27|                |georgia|uuid_4   |Map(document_type -> passport, id -> A3349)|[123456,blue,687]|
|dddd   |8747436094|na         |                   |1988-09-29|                |swis   |uuid_5   |Map(document_type -> passport, id -> B3343)|[123456,blue,687]|
|null   |8747436095|freq       |                   |1988-02-30|                |us     |uuid_6   |Map(document_type -> passport, id -> C3343)|[123456,blue,687]|
|null   |9999999   |na         |Y                  |1988-01-XX|13-001          |XXXXXXX|uuid_7   |Map(document_type -> null, id -> HH343)    |[UUUUU,blue,JJ7] |
+-------+----------+-----------+-------------------+----------+----------------+-------+---------+-------------------------------------------+-----------------+

k4aesqcs3#

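If only one field differs and its name is known ("skyward"), this can be resolved as follows:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType
import spark.implicits._  // for toDF and $"..."; assumes a SparkSession named spark

val data = List(("1", "2", "3"))
val bulkDF = data.toDF("emcg_uuid", "tier", "skyward_number")

// the two sides of the union, with the struct fields in different order
val tsDF = bulkDF.withColumn("skyward", struct($"tier", $"skyward_number"))
val stDF = bulkDF.withColumn("skyward", struct($"skyward_number", $"tier"))

// rebuild the "skyward" struct of stDF using the field order of tsDF
val schema = tsDF.schema.fields.find(_.name == "skyward").get
val updatedStructNames: Seq[Column] = schema.dataType.asInstanceOf[StructType].fieldNames.map(name => col("skyward." + name))
val withUpdatedSchema = stDF.withColumn("skyward", struct(updatedStructNames: _*))

// union
tsDF.union(withUpdatedSchema).show(false)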

If there are many such struct fields, the same step can be applied in a loop.
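
One way to generalize this to every struct column is to walk a reference schema and rebuild each struct in that order, instead of looping by hand. This is only a sketch: alignTo and alignDataFrame are hypothetical helper names, the sample data and the local SparkSession are made up, and structs inside arrays or maps, as well as column names containing dots, are not handled.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helper: rebuild column c so its struct fields follow the order in target,
// descending into nested structs. Non-struct columns are returned unchanged.
def alignTo(c: Column, target: DataType): Column = target match {
  case st: StructType =>
    struct(st.fields.map(f => alignTo(c.getField(f.name), f.dataType).as(f.name)): _*)
  case _ => c
}

// Hypothetical helper: select df's columns in the reference order, aligning every struct on the way.
def alignDataFrame(df: DataFrame, reference: StructType): DataFrame =
  df.select(reference.fields.map(f => alignTo(col(f.name), f.dataType).as(f.name)): _*)

// Assumption: a local session and sample rows created only for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("align-structs-sketch").getOrCreate()
import spark.implicits._

val base = Seq(("uuid_1", "blue", "123456")).toDF("emcg_uuid", "tier", "skyward_number")
val tsDF = base.withColumn("skyward", struct($"tier", $"skyward_number")).select("emcg_uuid", "skyward")
val stDF = base.withColumn("skyward", struct($"skyward_number", $"tier")).select("emcg_uuid", "skyward")

// Align stDF to tsDF's schema first, then the positional union lines up field by field.
val unioned = tsDF.union(alignDataFrame(stDF, tsDF.schema))
```

Because every field is looked up by name and re-aliased, the result does not depend on how the second DataFrame happened to order its struct fields.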

ryevplcw4#

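I ran into the same problem: "Error: Union can only be performed on tables with the compatible column types."
I looked into the following possible causes.

  1. Schema mismatch between dataset 1 and dataset 2: in my case the schemas matched.
  2. Column order of dataset 1 and dataset 2: a mismatch in column order caused the problem. To keep the column order of dataset 1 and dataset 2 consistent, follow these steps:
    Print the schemas of dataset 1 and dataset 2 to see the current column order, then align the columns of dataset 1 with those of dataset 2, or vice versa. Example: dataset 1 (column 2, column 1), dataset 2 (column 1, column 2): dataset1.select("column 1", "column 2").union(dataset2) reorders the columns and resolves the problem.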
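
The reordering step can be sketched without typing the column names by hand, by reusing the column list of the other dataset. The names dataset1, dataset2, aligned and result, the sample rows and the local SparkSession are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session created only for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("reorder-columns-sketch").getOrCreate()
import spark.implicits._

// Same columns, different order (hypothetical sample data).
val dataset1 = Seq(("b", "a")).toDF("column2", "column1")
val dataset2 = Seq(("a2", "b2")).toDF("column1", "column2")

// Print both schemas to compare the column order.
dataset1.printSchema()
dataset2.printSchema()

// Reorder dataset1's columns to follow dataset2, then union by position.
val aligned = dataset1.select(dataset2.columns.map(col): _*)
val result = aligned.union(dataset2)
```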
