pyspark:创建包含when和contains/isin的列

66bbxpm5  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(405)

我在2.x版本的spark上使用pyspark。
我有两个sqlDataframe, df1 以及 df2 . df1 是具有相同标头名称的多个小型dfs的并集。

df1 = (
    df1_1.union(df1_2)
    .union(df1_3)
    .union(df1_4)
    .union(df1_5)
    .union(df1_6)
    .union(df1_7)
    .distinct()
)
``` `df2` 没有相同的标头名称。
我试图实现的是创建一个新列,并根据条件用2个值填充它。但情况是 `if in the column of df1 you contain an element of an column of df2 then write A else B` 所以我试着这样做:

df1 = df1.withColumn(
"new_col",
when(df1["ColA"].substr(0, 4).contains(df2["ColA_a"]), "A").otherwise(
"B"
),
)

每个领域都是 `string` 类型。
我也试过用 `isin` 但错误是一样的。
注: `substr(0, 4)` 是因为在 `df1["ColA"]` 我只需要在我的领域4个字符匹配 `df2["ColA_a"]` .
py4j.protocol.py4jjavaerror:调用o660.select时出错:org.apache.spark.sql.analysisexception:operator中cola#438、colb#439缺少已解析属性cola#a#444!项目[含(cola#438,cola#444)和含(cola,cola#451)]。;;
我在网上读到的解决方案是:
克隆dfs
收集df并创建新的df(这里我们失去了spark的性能,这是非常悲哀的)
重命名具有相同名称或不同名称的列(命名含糊不清?)
编辑:这里是一些输入输出请求

df1
+-----+-----+-----+
| Col1| ColA| ColB|
+-----+-----+-----+
|value|3062x|value|
|value|2156x|value|
|value|3059x|value|
|value|3044x|value|
|value|2661x|value|
|value|2400x|value|
|value|1907x|value|
|value|4384x|value|
|value|4427x|value|
|value|2091x|value|
+-----+-----+-----+

df2
+------+------+
|ColA_a|ColB_b|
+------+------+
| 2156| GMVT7|
| 2156| JQL71|
| 2156| JZDSQ|
| 2050| GX8PH|
| 2050| G67CV|
| 2050| JFFF7|
| 2031| GCT5C|
| 2170| JN0LB|
| 2129| J2PRG|
| 2091| G87WT|
+------+------+

output
+-----+-----+-----+-------+
| Col1| ColA| ColB|new_col|
+-----+-----+-----+-------+
|value|3062x|value| B |
|value|2156x|value| A |
|value|3059x|value| B |
|value|3044x|value| B |
|value|2661x|value| B |
|value|2400x|value| B |
|value|1907x|value| B |
|value|4384x|value| B |
|value|4427x|value| B |
|value|2091x|value| A |
+-----+-----+-----+-------+

uujelgoq

uujelgoq1#

可以使用rlike join来确定该值是否存在于其他列中

df1=sqlContext.createDataFrame([
('value',3062,'value'),
('value',2156,'value'),
('value',3059,'value'),
('value',3044,'value'),
('value',2661,'value'),
('value',2400,'value'),
('value',1907,'value'),
('value',4384,'value'),
('value',4427,'value'),
('value',2091,'value')
],schema=['Col1', 'ColA', 'ColB'])

df2 =sqlContext.createDataFrame([
(2156, 'GMVT7'),
(  2156, 'JQL71'),
(  2156, 'JZDSQ'),
(  2050, 'GX8PH'),
(  2050, 'G67CV'),
(  2050, 'JFFF7'),
(  2031, 'GCT5C'),
(  2170, 'JN0LB'),
(  2129, 'J2PRG'),
(  2091, 'G87WT')],schema=['ColA_a','ColB_b'])

# %%

df_join = df1.join(df2.select('ColA_a').distinct(),F.expr("""ColA rlike ColA_a"""),how = 'left')
df_fin = df_join.withColumn("new_col",F.when(F.col('ColA_a').isNull(),'B').otherwise('A'))

df_fin.show()
+-----+----+-----+------+-------+
| Col1|ColA| ColB|ColA_a|new_col|
+-----+----+-----+------+-------+
|value|3062|value|  null|      B|
|value|2156|value|  2156|      A|
|value|3059|value|  null|      B|
|value|3044|value|  null|      B|
|value|2661|value|  null|      B|
|value|2400|value|  null|      B|
|value|1907|value|  null|      B|
|value|4384|value|  null|      B|
|value|4427|value|  null|      B|
|value|2091|value|  2091|      A|
+-----+----+-----+------+-------+

如果不喜欢rlike join,可以在join中使用isin()方法。

df_join = df1.join(df2.select('ColA_a').distinct(),F.col('ColA').isin(F.col('ColA_a')),how = 'left')
df_fin = df_join.withColumn("new_col",F.when(F.col('ColA_a').isNull(),'B').otherwise('A'))

结果是一样的

相关问题