我在2.x版本的spark上使用pyspark。
我有两个sqlDataframe, df1
以及 df2
. df1
是具有相同标头名称的多个小型dfs的并集。
df1 = (
df1_1.union(df1_2)
.union(df1_3)
.union(df1_4)
.union(df1_5)
.union(df1_6)
.union(df1_7)
.distinct()
)
``` `df2` 没有相同的标头名称。
我试图实现的是创建一个新列,并根据条件用2个值填充它。但情况是 `if in the column of df1 you contain an element of an column of df2 then write A else B` 所以我试着这样做:
df1 = df1.withColumn(
"new_col",
when(df1["ColA"].substr(0, 4).contains(df2["ColA_a"]), "A").otherwise(
"B"
),
)
每个领域都是 `string` 类型。
我也试过用 `isin` 但错误是一样的。
注: `substr(0, 4)` 是因为在 `df1["ColA"]` 我只需要在我的领域4个字符匹配 `df2["ColA_a"]` .
py4j.protocol.py4jjavaerror:调用o660.select时出错:org.apache.spark.sql.analysisexception:operator中cola#438、colb#439缺少已解析属性cola#a#444!项目[含(cola#438,cola#444)和含(cola,cola#451)]。;;
我在网上读到的解决方案是:
克隆dfs
收集df并创建新的df(这里我们失去了spark的性能,这是非常悲哀的)
重命名具有相同名称或不同名称的列(命名含糊不清?)
编辑:这里是一些输入输出请求
df1
+-----+-----+-----+
| Col1| ColA| ColB|
+-----+-----+-----+
|value|3062x|value|
|value|2156x|value|
|value|3059x|value|
|value|3044x|value|
|value|2661x|value|
|value|2400x|value|
|value|1907x|value|
|value|4384x|value|
|value|4427x|value|
|value|2091x|value|
+-----+-----+-----+
df2
+------+------+
|ColA_a|ColB_b|
+------+------+
| 2156| GMVT7|
| 2156| JQL71|
| 2156| JZDSQ|
| 2050| GX8PH|
| 2050| G67CV|
| 2050| JFFF7|
| 2031| GCT5C|
| 2170| JN0LB|
| 2129| J2PRG|
| 2091| G87WT|
+------+------+
output
+-----+-----+-----+-------+
| Col1| ColA| ColB|new_col|
+-----+-----+-----+-------+
|value|3062x|value| B |
|value|2156x|value| A |
|value|3059x|value| B |
|value|3044x|value| B |
|value|2661x|value| B |
|value|2400x|value| B |
|value|1907x|value| B |
|value|4384x|value| B |
|value|4427x|value| B |
|value|2091x|value| A |
+-----+-----+-----+-------+
1条答案
按热度按时间uujelgoq1#
可以使用rlike join来确定该值是否存在于其他列中
如果不喜欢rlike join,可以在join中使用isin()方法。
结果是一样的