ItemStruct = StructType([StructField("BomId", StringType()), StructField("price", StringType())])
BomStruct = StructType([StructField("OrderId",StringType()), StructField("items", ArrayType(ItemStruct))])
sampledata_sof = [Row("123-A", [Row("Bom-11", 120), Row("Bom-12", 140)]), Row("100-A", [Row("Bom-23", 170), Row("Bom-24", 190)])]
dfSampleBom = spark.createDataFrame(spark.sparkContext.parallelize(sampledata_sof), BomStruct)
dfSampleBom.printSchema()
dfSampleBom.show()```
字符串
[Output from jupyter notebook](https://i.stack.imgur.com/XzBhG.png)
问:给定上述结构,如何实现以下内容?如果Bom-11在items中,则添加item Bom-99(price $99)。预期输出:OrderId = 123-A的行应在items列表中包含{Bom-99,99}。换句话说,希望生成并有条件地添加一个或几个元素到items ArrayType列中。
尝试使用
df.rdd.map(lambda x: generateItems(x))
型
但得到了错误 *pyspark.errors.exceptions.base。PySparkRuntimeError:[CONTEXT_ONLY_VALID_ON_DRIVER]似乎您正在尝试从广播变量、操作或转换引用SparkContext。SparkContext只能在驱动程序上使用,而不能在它在worker上运行的代码中使用。有关详细信息,请参阅SPARK-5063。*
DF中的项目数量为1000个,因此希望有一个可以激发的解决方案,可以本地分发和有效处理。(阅读UDF可能无法跨工作节点分发,因此不确定这是否是一个选项)
1条答案
按热度按时间ztmd8pv51#
您可以先使用
filter
来确定items
是否有Bom-11
,然后使用array_insert
或concat
将结构体插入到现有数组中。Pyspark 3.4+
字符串
Pyspark 3.1+
型