如何在一行中左连接两个结构数组?

mzillmmw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(345)

我正在处理从facebook的ads insights api中提取的“动作分解”数据
facebook没有把 action (#购买)和 action_value ($amount of purchase)在同一列中,因此我需要根据操作的标识符(在我的例子中是id#+设备类型)加入我这边的那些。
如果每个操作只是它自己的行,那么用sql连接它们当然是微不足道的。但在这种情况下,我需要在每一行中连接两个结构。我想做的事相当于 LEFT JOIN 跨两个结构,在两列上匹配。理想情况下,我可以单独使用sql(而不是pyspark/scala/etc)。
到目前为止,我已经尝试了:
sparksql inline 发电机。这给了我每个操作各自的行,但是由于原始数据集中的父行没有唯一的标识符,因此无法按行连接这些结构。也尝试过使用 inline() 但一次只能使用一个“生成器”函数。
使用sparksql arrays_zip 函数来组合它们。但这不起作用,因为顺序并不总是一样的,它们有时没有相同的键。
我考虑写一本书 map 在pyspark中运行。但是map函数似乎只通过索引而不是名称来标识列,如果列以后需要更改(可能是在使用第三方api时),那么这看起来很脆弱。
我考虑过写一个pyspark自定义项,这似乎是最好的选择,但需要一个我没有的许可( SELECT on anonymous function ). 如果这真的是最好的选择,我会努力争取这个许可。
为了更好地说明:数据集中的每一行都有一个 actions 以及 action_values 包含如下数据的列。

actions = [
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.123",
    "value": "1"
  },
  {
    "action_device": "desktop", /* Same conversion ID; different device. */
    "action_type": "offsite_conversion.custom.321",
    "value": "1"
  },
  {
    "action_device": "iphone", /* Same conversion ID; different device. */
    "action_type": "offsite_conversion.custom.321",
    "value": "2"
  }
  {
    "action_device": "iphone", /* has "actions" but not "actions_values" */
    "action_type": "offsite_conversion.custom.789",
    "value": "1"
  },
]
action_values = [
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.123",
    "value": "49.99"
  },
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.321",
    "value": "19.99"
  },
  {
    "action_device": "iphone",
    "action_type": "offsite_conversion.custom.321",
    "value": "99.99"
  }
]

我希望每一行在一个结构中都有两个数据点,如下所示:

my_desired_result = [
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.123",
    "count": "1", /* This comes from the "action" struct */
    "value": "49.99" /* This comes from the "action_values" struct */
  },
  {
    "action_device": "desktop",
    "action_type": "offsite_conversion.custom.321",
    "count": "1",
    "value": "19.99"
  },
  {
    "action_device": "iphone",
    "action_type": "offsite_conversion.custom.321",
    "count": "2",
    "value": "99.99"
  },
  {
    "action_device": "iphone",
    "action_type": "offsite_conversion.custom.789",
    "count": "1",
    "value": null /* NULL because there is no value for conversion#789 AND iphone */
  }
]
5f0d552i

5f0d552i1#

iiuc,您可以尝试转换,然后使用筛选器从中查找第一个匹配项 action_values 通过匹配动作类型和动作设备:

df.printSchema()
root
 |-- action_values: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- action_device: string (nullable = true)
 |    |    |-- action_type: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- actions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- action_device: string (nullable = true)
 |    |    |-- action_type: string (nullable = true)
 |    |    |-- value: string (nullable = true)

df.createOrReplaceTempView("df_table")

spark.sql("""

  SELECT       
    transform(actions, x -> named_struct(
      'action_device', x.action_device,
      'action_type', x.action_type,
      'count', x.value,
      'value', filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type)[0].value
    )) as result
  FROM df_table

""").show(truncate=False)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|result                                                                                                                                                                                                  |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[desktop, offsite_conversion.custom.123, 1, 49.99], [desktop, offsite_conversion.custom.321, 1, 19.99], [iphone, offsite_conversion.custom.321, 2, 99.99], [iphone, offsite_conversion.custom.789, 1,]]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

更新:如果是完全联接,可以尝试以下sql:

spark.sql("""

  SELECT

  concat(
    /* actions left join action_values with potentially multiple matched values */
    flatten(
      transform(actions, x ->
        transform(
          filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type),
          z -> named_struct(
            'action_device', x.action_device,
            'action_type', x.action_type,
            'count', x.value,
            'value', z.value
          )
        )
      )
    ),
    /* action_values missing from actions */
    transform(
      filter(action_values, x -> !exists(actions, y -> x.action_device = y.action_device AND x.action_type = y.action_type)),
      z -> named_struct(
        'action_device', z.action_device,
        'action_type', z.action_type,
        'count', NULL,
        'value', z.value
      )
    )
  ) as result

  FROM df_table

""").show(truncate=False)

相关问题