我有一个pysparkDataframe,如下所示:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import udf
schema = T.StructType([ # schema
T.StructField("id", T.StringType(), True),
T.StructField("code", T.ArrayType(T.StringType()), True)])
df = spark.createDataFrame([{"id": "1", "code": ["a1", "a2","a3","a4"]},
{"id": "2", "code": ["b1","b2"]},
{"id": "3", "code": ["c1","c2","c3"]},
{"id": "4", "code": ["d1", "b3"]}],
schema=schema)
输出
df.show()
| id| code|
|---|----------------|
| 1|[a1, a2, a3, a4]|
| 2| [b1, b2]|
| 3| [c1, c2, c3]|
| 4| [d1, b3]|
我希望能够通过提供一个列和列表给函数来过滤行,如果有任何interestection,则返回true(使用disjoint from here,因为将有许多非命中)
def lst_intersect(data_lst,query_lst):
return not set(data_lst).isdisjoint(query_lst)
lst_intersect_udf = F.udf(lambda x,y: lst_intersect(x,y), T.BooleanType())
当我试着用这个的时候
query_lst = ['a1','b3']
df = df.withColumn("code_found", lst_intersect_udf(F.col('code'),F.lit(query_lst)))
获取以下错误
Unsupported literal type class java.util.ArrayList [a1, b3]
我可以通过改变函数等来解决这个问题,但是我想知道我是否有一些基本的错误 F.lit(query_lst)
?
1条答案
按热度按时间bq8i3lrv1#
lit
只接受一个值,不接受python列表。您需要传入一个数组列,其中包含列表中的文本值,例如使用列表理解。也就是说,如果spark>=2.4,那么也可以使用sparksql函数
arrays_overlap
要获得更好的性能: