PySpark DataFrame withColumn command not working

0sgqnhkj · posted 2021-06-24 · in Hive

I have an input DataFrame, df_input (updated below):

|comment|inp_col|inp_val|
|-------|-------|-------|
|11     |a      |a1     |
|12     |a      |a2     |
|15     |b      |b3     |
|16     |b      |b4     |
|17     |c      |&b     |
|17     |c      |c5     |
|17     |d      |&c     |
|17     |d      |d6     |
|17     |e      |&d     |
|17     |e      |e7     |

I want to replace each variable in the inp_val column with its value. I tried to create a new column with the code below.
First, get the list of values that start with '&':

df_new = df_inp.select('inp_val').where(df_inp.inp_val.substr(0, 1) == '&')

Now I loop over this list and try to replace each '&' reference with the collected list of original values:

for a in [row['inp_val'] for row in df_new.collect()]:
    df_inp = df_inp.withColumn(
        'new_col',
        when(df_inp.inp_val.substr(0, 1) == '&',
             [row['inp_val'] for row in df_inp.select(df_inp.inp_val)
                                              .where(df_inp.inp_col == a[1:]).collect()])
        .otherwise(df_inp.inp_val)
    )

But I get the following error:

java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [[5], [6]]

Basically, I want the output shown below. Please check and let me know where the mistake is. Am I trying to insert two different data types into the column with the code above?
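For reference, the error happens because when()/otherwise() expect a Column or a plain literal, and the Python list produced by collect() is not a supported literal (hence the java.util.ArrayList message). A minimal sketch of the wrapping that avoids it, assuming vals holds the list of strings from the inner collect():

import pyspark.sql.functions as F

vals = ['b3', 'b4']  # hypothetical result of the inner collect()
df_inp = df_inp.withColumn(
    'new_col',
    F.when(df_inp.inp_val.substr(0, 1) == '&',
           F.array(*[F.lit(v) for v in vals]))   # wrap each element as a literal
     .otherwise(F.array(df_inp.inp_val)))        # keep both branches array-typed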
Updated code:

tst_1 = tst.withColumn("col3_extract", F.when(tst.col3.substr(0, 1) == '&', F.regexp_replace(tst.col3, "&", "")).otherwise(""))

# Select which values need to be replaced; withColumnRenamed will also solve spark self join issues

# The substring search can also be done using regex function

tst_filter=tst.where(~F.col('col3').contains('&')).withColumnRenamed('col2','col2_collect')

# For the selected data, perform a collect list

tst_clct = tst_filter.groupby('col2_collect').agg(F.collect_list('col3').alias('col3_collect'))

# %% Join the main table with the collected list

tst_join = tst_1.join(tst_clct,on=tst_1.col3_extract==tst_clct.col2_collect,how='left').drop('col2_collect')

# %% In col3, wrap plain values in an array and replace '&' references with the collected list

tst_result = tst_join.withColumn("result",F.when(~F.col('col3').contains('&'),F.array(F.col('col3'))).otherwise(F.col('col3_collect')))

However, the above code does not work across multiple iterations, i.e. when a reference points to a group that itself contains a reference (&d resolves through &c, which resolves through &b); a sketch of one way to handle this follows the expected output.
Updated expected output:

|comment|inp_col|inp_val|new_col                 |
|-------|-------|-------|------------------------|
|11     |a      |a1     |['a1']                  |
|12     |a      |a2     |['a2']                  |
|15     |b      |b3     |['b3']                  |
|16     |b      |b4     |['b4']                  |
|17     |c      |&b     |['b3', 'b4']            |
|18     |c      |c5     |['c5']                  |
|19     |d      |&c     |['b3', 'b4', 'c5']      |
|20     |d      |d6     |['d6']                  |
|21     |e      |&d     |['b3', 'b4', 'c5', 'd6']|
|22     |e      |e7     |['e7']                  |
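For reference, a minimal sketch of one way to produce exactly this output, assuming Spark 2.x+, that the distinct groups are small enough to collect to the driver, and that the reference graph has no cycles (the names and structure here are mine, not from any answer below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df_input = spark.createDataFrame(
    [(11, 'a', 'a1'), (12, 'a', 'a2'), (15, 'b', 'b3'), (16, 'b', 'b4'),
     (17, 'c', '&b'), (18, 'c', 'c5'), (19, 'd', '&c'), (20, 'd', 'd6'),
     (21, 'e', '&d'), (22, 'e', 'e7')],
    schema=['comment', 'inp_col', 'inp_val'])

# Build an in-memory group -> values map on the driver
groups = {}
for row in df_input.collect():
    groups.setdefault(row['inp_col'], []).append(row['inp_val'])

def resolve(val):
    # Expand '&x' recursively into the fully resolved values of group x
    if not val.startswith('&'):
        return [val]
    out = []
    for v in groups[val[1:]]:
        out.extend(resolve(v))
    return out

# One (inp_val, resolved list) pair per value; assumes inp_val values are unique
mapping = [(v, resolve(v)) for vals in groups.values() for v in vals]
map_df = spark.createDataFrame(mapping, schema=['inp_val_key', 'new_col'])

df_input.join(map_df, df_input.inp_val == map_df.inp_val_key, 'left')\
        .drop('inp_val_key').show(truncate=False)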

f0ofjuux #1

You can simply use regexp_replace like this:

df.withColumn("new_col", regex_replace(col("inp_val"), "&", ""))

5vf7fwbs #2

Can you try this approach? Your approach is likely to hit several problems: calling collect() and withColumn() in a loop is expensive, and when() cannot take a plain Python list as a value, which is what triggers the Unsupported literal type error.

import pyspark.sql.functions as F

# Test data (sqlContext comes from the Spark shell; spark.createDataFrame works the same way)
tst = sqlContext.createDataFrame(
    [(1, 'a', '3'), (1, 'a', '4'), (1, 'b', '5'), (1, 'b', '7'),
     (2, 'c', '&b'), (2, 'c', '&a'), (2, 'd', '&b')],
    schema=['col1', 'col2', 'col3'])

# extract the special character out

tst_1 = tst.withColumn("col3_extract",F.substring(F.col('col3'),2,1))

# Select which values need to be replaced; withColumnRenamed will also solve spark self join issues

# The substring search can also be done using regex function

tst_filter=tst.where(~F.col('col3').contains('&')).withColumnRenamed('col2','col2_collect')

# For the selected data, perform a collect list

tst_clct = tst_filter.groupby('col2_collect').agg(F.collect_list('col3').alias('col3_collect'))

# %% Join the main table with the collected list

tst_join = tst_1.join(tst_clct,on=tst_1.col3_extract==tst_clct.col2_collect,how='left').drop('col2_collect')

# %% In col3, wrap plain values in an array and replace '&' references with the collected list

tst_result = tst_join.withColumn("result",F.when(~F.col('col3').contains('&'),F.array(F.col('col3'))).otherwise(F.col('col3_collect')))

Result:

+----+----+----+------------+------------+------+
|col1|col2|col3|col3_extract|col3_collect|result|
+----+----+----+------------+------------+------+
|   2|   c|  &a|           a|      [3, 4]|[3, 4]|
|   2|   c|  &b|           b|      [7, 5]|[7, 5]|
|   2|   d|  &b|           b|      [7, 5]|[7, 5]|
|   1|   a|   3|            |        null|   [3]|
|   1|   a|   4|            |        null|   [4]|
|   1|   b|   5|            |        null|   [5]|
|   1|   b|   7|            |        null|   [7]|
+----+----+----+------------+------------+------+
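Note that collect_list makes no ordering guarantee, which is why [7, 5] appears rather than [5, 7]. If a deterministic order matters, one option (an addition of mine, not part of the original answer; available in Spark 2.4+) is to sort the arrays:

tst_result = tst_result.withColumn("result", F.array_sort("result"))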

xghobddn #3

Try this. A self-join with a collected list and an rlike join condition is the way to go.

df.show()  # sample dataframe

# +-------+---------+---------+
# |comment|input_col|input_val|
# +-------+---------+---------+
# |     11|        a|        1|
# |     12|        a|        2|
# |     15|        b|        5|
# |     16|        b|        6|
# |     17|        c|       &b|
# |     17|        c|        7|
# +-------+---------+---------+

df.join(df.groupBy("input_col").agg(F.collect_list("input_val").alias("y1"))\
          .withColumnRenamed("input_col", "x1"), F.expr("input_val rlike x1"), 'left')\
  .withColumn("new_col", F.when(F.col("input_val").cast("int").isNotNull(), F.array("input_val"))\
                          .otherwise(F.col("y1")))\
  .drop("x1", "y1").show()

# +-------+---------+---------+-------+
# |comment|input_col|input_val|new_col|
# +-------+---------+---------+-------+
# |     11|        a|        1|    [1]|
# |     12|        a|        2|    [2]|
# |     15|        b|        5|    [5]|
# |     16|        b|        6|    [6]|
# |     17|        c|       &b| [5, 6]|
# |     17|        c|        7|    [7]|
# +-------+---------+---------+-------+
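One caveat when carrying this over to the original df_input: the cast('int') check assumes the non-reference values are numeric, which does not hold for values like 'b3', and a bare rlike can match substrings ('b3' rlike 'b' is true). A sketch of a stricter variant, my own adaptation rather than part of the answer, that matches only exact '&<group>' references (it is still single-pass; nested references need the recursive sketch shown after the expected output above):

ref = df.groupBy("input_col").agg(F.collect_list("input_val").alias("y1"))\
        .withColumnRenamed("input_col", "x1")

# Join only where input_val is exactly '&' + group name
df.join(ref, F.col("input_val") == F.concat(F.lit("&"), F.col("x1")), 'left')\
  .withColumn("new_col", F.when(F.col("y1").isNull(), F.array("input_val"))\
                          .otherwise(F.col("y1")))\
  .drop("x1", "y1").show()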
