Computing a new column in a Spark DataFrame by crossing the tokens-list column of df1 with the text column of df2 in PySpark

eagi6jfj asked on 2021-05-27 in Spark

I am using Spark 2.4.5 and I need to compute a sentiment score for the token-list column (MeaningfulWords) of df1, based on df2 (a Spanish sentiment lexicon). In df1 I have to create a new column with the list of scores for the tokens, and another column with the mean score per record (sum of scores / word count). If a token in the list (df1) is not in the dictionary (df2), it scores zero.
The DataFrames look like this:

df1.select("ID","MeaningfulWords").show(truncate=True, n=5)
+------------------+------------------------------+
|                ID|               MeaningfulWords|
+------------------+------------------------------+
|abcde00000qMQ00001|[casa, alejado, buen, gusto...|
|abcde00000qMq00002|[clientes, contentos, servi...|
|abcde00000qMQ00003|                 [resto, bien]|
|abcde00000qMQ00004|[mal, servicio, no, antiend...|
|abcde00000qMq00005|[gestion, adecuada, proble ...|
+------------------+------------------------------+

df2.show(5)
+-----+----------+
|score|      word|
+-----+----------+
| 1.68|abandonado|
| 3.18|    abejas|
|  2.8|    aborto|
| 2.46| abrasador|
| 8.13|    abrazo|
+-----+----------+

The new columns to add to df1 should look like this:

+------------------+---------------------+
|         MeanScore|            ScoreList|
+------------------+---------------------+
|              2.95|[3.10, 2.50, 1.28,...|
|              2.15|[1.15, 3.50, 2.75,...|
|              2.75|[4.20, 1.00, 1.75,...|
|              3.25|[3.25, 2.50, 3.20,...|
|              3.15|[2.20, 3.10, 1.28,...|
+------------------+---------------------+

I have already tried using .join, but joining on columns with different data types raises an error. I also tried converting the DataFrames to RDDs and calling a function:

def map_words_to_values(review_words, dict_df):
    return [dict_df[word] for word in review_words if word in dict_df]

RDD1 = swRemoved.rdd.map(list)
RDD2 = Dict_df.rdd.map(list)

reviewsRDD_dict_values = RDD1.map(lambda tuple: (tuple[0], map_words_to_values(tuple[1], RDD2)))
reviewsRDD_dict_values.take(3)

But with this option I get the following error:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
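The error comes from referencing RDD2 inside a transformation on RDD1, which Spark forbids (SPARK-5063): RDDs can only be used from the driver. A minimal sketch of how that attempt could be repaired, assuming the lexicon fits in driver memory and a SparkSession named spark is available, is to collect df2 into a plain dict and broadcast it:

```
# Sketch: collect the (small) lexicon to the driver and broadcast it,
# so executors look words up in a plain dict instead of another RDD.
word_scores = {row['word']: row['score'] for row in Dict_df.collect()}
bc_scores = spark.sparkContext.broadcast(word_scores)

def map_words_to_values(review_words):
    # Keep scores only for tokens present in the lexicon.
    return [bc_scores.value[w] for w in review_words if w in bc_scores.value]

reviewsRDD_dict_values = swRemoved.rdd.map(
    lambda row: (row['ID'], map_words_to_values(row['MeaningfulWords'])))
reviewsRDD_dict_values.take(3)
```

That said, the answer below stays in the DataFrame API and avoids RDDs and UDFs entirely, which is usually preferable.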

I found some examples that score text using the afinn library, but it does not work for Spanish text.
If possible, I would like to use PySpark's native functions rather than a UDF, to avoid hurting performance. But I am a beginner in Spark, and I want to find the Spark way of doing this.

vecaoik1 #1

You can first join on array_contains, then groupBy and aggregate with first, collect_list, and mean (Spark 2.4+). Welcome to SO!

```
df1.show()
+------------------+----------------------------+
|ID                |MeaningfulWords             |
+------------------+----------------------------+
|abcde00000qMQ00001|[casa, alejado, buen, gusto]|
|abcde00000qMq00002|[clientes, contentos, servi]|
|abcde00000qMQ00003|[resto, bien]               |
+------------------+----------------------------+

df2.show()
+-----+---------+
|score|     word|
+-----+---------+
| 1.68|     casa|
|  2.8|  alejado|
| 1.03|     buen|
| 3.68|    gusto|
| 0.68| clientes|
|  2.1|contentos|
| 2.68|    servi|
| 1.18|    resto|
| 1.98|     bien|
+-----+---------+

from pyspark.sql import functions as F

df1.join(df2, F.expr("array_contains(MeaningfulWords, word)"), 'left')\
   .groupBy("ID").agg(F.first("MeaningfulWords").alias("MeaningfulWords"),
                      F.collect_list("score").alias("ScoreList"),
                      F.mean("score").alias("MeanScore"))\
   .show(truncate=False)

+------------------+----------------------------+-----------------------+------------------+
|ID                |MeaningfulWords             |ScoreList              |MeanScore         |
+------------------+----------------------------+-----------------------+------------------+
|abcde00000qMQ00003|[resto, bien]               |[1.18, 1.98]           |1.58              |
|abcde00000qMq00002|[clientes, contentos, servi]|[0.68, 2.1, 2.68]      |1.8200000000000003|
|abcde00000qMQ00001|[casa, alejado, buen, gusto]|[1.68, 2.8, 1.03, 3.68]|2.2975            |
+------------------+----------------------------+-----------------------+------------------+
```
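One caveat: with the left join, tokens missing from df2 produce null scores, and F.mean silently ignores nulls, so the average is taken over matched words only. The question asks for unmatched tokens to count as zero (sum of scores / total word count). A sketch of that variant, reusing the same join, divides by the full array size instead:

```
from pyspark.sql import functions as F

(df1.join(df2, F.expr("array_contains(MeaningfulWords, word)"), 'left')
    .groupBy("ID")
    .agg(F.first("MeaningfulWords").alias("MeaningfulWords"),
         F.collect_list("score").alias("ScoreList"),  # collect_list drops nulls
         # Divide by the full token count so unmatched words count as zero.
         (F.sum(F.coalesce("score", F.lit(0.0)))
          / F.size(F.first("MeaningfulWords"))).alias("MeanScore"))
    .show(truncate=False))
```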
