pyspark向每行添加新记录

hk8txs48  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(439)

这个问题在这里已经有答案了

如何在sparkDataframe中添加常量列(2个答案)
4个月前关门了。
我用的是spark 2.3.1。我从json文件中读取数据,有五条<class'pyspark.sql.types.row'>类型的记录,比如
行(年龄=24,工资ID=1,工资=2900)
我想在所有五个记录中添加一个新值,新值是这样的字典格式

{'age_condition':True,'salary_condition':True}

所以,现在新行应该是这样的
行(age=24,payloadid=1,salary=2900,result={'age\u condition':true,'salary\u condition':true})

u3r8eeie

u3r8eeie1#

这样怎么样?请注意 Result 列被视为字符串类型而不是dict。

import pyspark.sql.functions as f
from pyspark.sql.types import Row

row_list = [Row(age=24, payloadId=1, salary=2900)]
row_add = {'age_condition':True,'salary_condition':True}

spark.createDataFrame(row_list) \
  .withColumn('Result', f.lit(str(row_add))) \
  .collect()

[Row(age=24, payloadId=1, salary=2900, Result="{'age_condition': True, 'salary_condition': True}")]
w51jfk4q

w51jfk4q2#

我不知道你为什么要把字典添加到dataframe列中,从而使事情复杂化,你应该添加两个新列 age_condition 以及 salary_condition 而不是布尔类型。

这应该是你想要的。。。

from pyspark.sql.types import *

schema = StructType([StructField("dict", StructType([StructField("age_condition", BooleanType(), True), StructField("salary_condition", BooleanType(), True)]), True)])

newDf = spark.createDataFrame([{'age_condition':True,'salary_condition':True}], schema=schema)

df = spark.read.json("/whatever/json/path")

df.crossJoin(newDf) #no of records is same as in df as no of records in newDf is 1

相关问题