pyspark -如何向ArrayType列添加新元素

svmlkihl  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(233)

ItemStruct = StructType([StructField("BomId", StringType()), StructField("price", StringType())])
BomStruct = StructType([StructField("OrderId",StringType()), StructField("items", ArrayType(ItemStruct))])
sampledata_sof = [Row("123-A", [Row("Bom-11", 120), Row("Bom-12", 140)]), Row("100-A", [Row("Bom-23", 170), Row("Bom-24", 190)])]

dfSampleBom = spark.createDataFrame(spark.sparkContext.parallelize(sampledata_sof), BomStruct)
dfSampleBom.printSchema()
dfSampleBom.show()```

  1. 字符串
  2. [Output from jupyter notebook](https://i.stack.imgur.com/XzBhG.png)
  3. 问:给定上述结构,如何实现以下内容?如果Bom-11items中,则添加item Bom-99price $99)。预期输出:OrderId = 123-A的行应在items列表中包含{Bom-9999}。换句话说,希望生成并有条件地添加一个或几个元素到items ArrayType列中。
  4. 尝试使用

df.rdd.map(lambda x: generateItems(x))

  1. 但得到了错误 *pyspark.errors.exceptions.basePySparkRuntimeError:[CONTEXT_ONLY_VALID_ON_DRIVER]似乎您正在尝试从广播变量、操作或转换引用SparkContextSparkContext只能在驱动程序上使用,而不能在它在worker上运行的代码中使用。有关详细信息,请参阅SPARK-5063。*
  2. DF中的项目数量为1000个,因此希望有一个可以激发的解决方案,可以本地分发和有效处理。(阅读UDF可能无法跨工作节点分发,因此不确定这是否是一个选项)
ztmd8pv5

ztmd8pv51#

您可以先使用filter来确定items是否有Bom-11,然后使用array_insertconcat将结构体插入到现有数组中。

Pyspark 3.4+

  1. item_to_ingest = F.struct(F.lit('Bom-99').alias('BomId'), F.lit(99).alias('price'))
  2. df = (dfSampleBom.select(
  3. 'OrderId',
  4. F.when(F.size(F.filter('items', lambda x: x['BomId'] == 'Bom-11')) > 0,
  5. F.array_insert('items', -1, item_to_ingest))
  6. .otherwise(F.col('items')).alias('items')))

字符串

Pyspark 3.1+

  1. item_to_ingest = F.struct(F.lit('Bom-99').alias('BomId'), F.lit(99).alias('price'))
  2. df = (dfSampleBom.select(
  3. 'OrderId',
  4. F.when(F.size(F.filter('items', lambda x: x['BomId'] == 'Bom-11')) > 0,
  5. F.concat('items', F.array(item_to_ingest)))
  6. .otherwise(F.col('items')).alias('items')))

展开查看全部

相关问题