PySpark: merge two array-of-struct columns that have different fields

xa9qqrwz  posted on 2024-01-06 in Spark

I have a DataFrame with the following schema:

    StructType([
        StructField('product_id', IntegerType(), True),
        StructField('tenant_id', IntegerType(), True),
        StructField("materials", ArrayType(StructType([
            StructField('id', IntegerType(), True),
            StructField('percentage', FloatType(), True)
        ]))),
        StructField("elastic", ArrayType(StructType([
            StructField('id', IntegerType(), True),
            StructField('name', MapType(StringType(), StringType()), True)
        ])))
    ])

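I want to merge these two array-of-struct columns into a single array whose structs have three fields, id, percentage and name, matched on materials.id = elastic.id, like this: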

    StructType([
        StructField('product_id', IntegerType(), True),
        StructField('tenant_id', IntegerType(), True),
        StructField("materials", ArrayType(StructType([
            StructField('id', IntegerType(), True),
            StructField('percentage', FloatType(), True),
            StructField('name', MapType(StringType(), StringType()), True)
        ])))
    ])


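Basically, I want to go from the input to the expected result (image in the original post: "Before and expected result").
I tried a UDF and it produces the result, but it is not the best approach in terms of performance.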

    from pyspark.sql.functions import udf
    from pyspark.sql.types import (ArrayType, FloatType, IntegerType,
                                   MapType, StringType, StructField, StructType)

    @udf(returnType=ArrayType(StructType([
        StructField("id", IntegerType(), False),
        StructField('percentage', FloatType(), True),
        StructField('name', MapType(StringType(), StringType()), True)
    ])))
    def expand_list(materials, elastic):
        # Pair every material with each elastic entry that has the same id
        final = []
        for k in materials:
            for i in elastic:
                if k.id == i.id:
                    final.append({'id': k.id, 'percentage': k.percentage, 'name': i.name})
        return final

nle07wnf  #1

Use transform to iterate over the first array, then use filter to look up the corresponding entry in the second array:

    from pyspark.sql import functions as F

    # some test data
    testdata = """
    {"product_id": 1, "tenant_id": 1, "materials": [{"id": 1, "percentage": 0.1}, {"id": 3, "percentage": 0.3}, {"id": 2, "percentage": 0.2}], "elastic": [{"id": 1, "name": "one"},{"id":2, "name": "two"}] }
    """
    df = spark.read.json(spark.sparkContext.parallelize([testdata]))

    # create a new column with the merged array
    df.withColumn("merged_materials", F.expr("""
        transform(materials, m -> named_struct(
            'id', m.id,
            'percentage', m.percentage,
            'name', filter(elastic, e -> e.id == m.id)[0].name)
        )
    """)).show(vertical=True, truncate=False)

Output:

    -RECORD 0----------------------------------------------------------
     elastic          | [{1, one}, {2, two}]
     materials        | [{1, 0.1}, {3, 0.3}, {2, 0.2}]
     product_id       | 1
     tenant_id        | 1
     merged_materials | [{1, 0.1, one}, {3, 0.3, null}, {2, 0.2, two}]
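
To make the null for id 3 easier to follow, here is a minimal plain-Python sketch of what the transform/filter expression appears to do per row. It is only a model of the semantics, not Spark code: `merge_materials` is a hypothetical helper name, and the rule "no match gives null" is taken from the output shown above rather than from any guarantee about every Spark configuration.

```python
# Plain-Python model of the Spark SQL expression above (no Spark needed).
# `merge_materials` is a hypothetical helper name, not a Spark function.

def merge_materials(materials, elastic):
    """Mimic transform(materials, m -> ... filter(elastic, e -> e.id == m.id)[0].name)."""
    merged = []
    for m in materials:  # transform: exactly one output struct per material
        # filter: keep the elastic entries that share this material's id
        matches = [e for e in elastic if e["id"] == m["id"]]
        # [0].name: first match wins; an empty match is modelled as None (null)
        name = matches[0]["name"] if matches else None
        merged.append({"id": m["id"], "percentage": m["percentage"], "name": name})
    return merged


# Same test data as in the answer
materials = [
    {"id": 1, "percentage": 0.1},
    {"id": 3, "percentage": 0.3},
    {"id": 2, "percentage": 0.2},
]
elastic = [{"id": 1, "name": "one"}, {"id": 2, "name": "two"}]

merged = merge_materials(materials, elastic)
for row in merged:
    print(row)

# For this test data, dropping the unmatched entries leaves the same
# entries the UDF in the question would return (ids 1 and 2 only).
matched_only = [row for row in merged if row["name"] is not None]
```

In other words, the expression behaves like a left join from materials to elastic: every material is kept, and unmatched ones carry a null name. The UDF in the question only emits matched pairs, so if that behaviour is required, the unmatched entries would still need to be removed from the merged array afterwards.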
