How to capture the frequency of words after a group by using pyspark

xqkwcwgp  asked on 2023-10-15  in Spark

I have tabular data with keys and values, and the keys are not unique. For example:

+-----+------+
| key | value|
+-----+------+
| 1   |  the |
| 2   |   i  |
| 1   |   me |
| 1   |   me |
| 2   | book |
| 1   |table |
+-----+------+

Now suppose this table is distributed over different nodes in a Spark cluster. How can I compute, with PySpark, the frequency of each word within its key? For example, for the table above I would like this output:

+-----+------+-------------+
| key | value| frequencies |
+-----+------+-------------+
| 1   |  the | 1/4         |
| 2   |   i  | 1/2         |
| 1   |   me | 2/4         |
| 2   | book | 1/2         |
| 1   |table | 1/4         |
+-----+------+-------------+

fdx2calv 1#

Not sure whether you can combine the multi-level aggregation into a single operation on a DataFrame, but doing it in two steps, and leaving the final concat to you, works:

# Originally run in Databricks; not all of the imports are required there.
# You may want to upper- or lowercase the values for better results.
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

data = [("1", "the"), ("2", "I"), ("1", "me"),
        ("1", "me"), ("2", "book"), ("1", "table")]
rdd = spark.sparkContext.parallelize(data)
someschema = rdd.map(lambda x: Row(c1=x[0], c2=x[1]))
df = spark.createDataFrame(someschema)

# Step 1: count occurrences of each (key, value) pair.
df1 = df.groupBy("c1", "c2") \
        .count()

# Step 2: total number of rows per key.
df2 = df1.groupBy('c1') \
         .sum('count')

# Join so every (key, value) row carries both its own count and the key total.
df3 = df1.join(df2, 'c1')
df3.show()

This returns:

+---+-----+-----+----------+
| c1|   c2|count|sum(count)|
+---+-----+-----+----------+
|  1|table|    1|         4|
|  1|  the|    1|         4|
|  1|   me|    2|         4|
|  2|    I|    1|         2|
|  2| book|    1|         2|
+---+-----+-----+----------+

You can reformat the last two columns yourself, but I was curious whether we could do it all in one go. In plain SQL we would use inline views and combine them.
This works at cluster scale, which is what Spark is generally all about; the groupBy takes all of that into account.
Minor edit
As it is hot outside, I looked into this a bit more deeply. This is a good overview: http://stevendavistechnotes.blogspot.com/2018/06/apache-spark-bi-level-aggregation.html. After reading it and experimenting, I could not make it any more elegant, and reducing the output to 5 rows in a single pass does not seem to be possible.
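For the concat that this answer leaves to the reader, here is a minimal sketch (not part of the original answer) that turns count and sum(count) from df3 above into the frequencies string the question asks for; the name result is just illustrative:

# Rename the awkward "sum(count)" column, then build "count/total" as a string.
result = (df3
          .withColumnRenamed("sum(count)", "total")
          .withColumn("frequencies", F.concat_ws("/", "count", "total"))
          .select(F.col("c1").alias("key"),
                  F.col("c2").alias("value"),
                  "frequencies"))
result.show()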


wj8zmpe1 2#

Another viable option is window functions. First, compute the number of occurrences of each (value, key) pair and of each key. Then just add another column with the fraction (which gets reduced to lowest terms):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import udf
from fractions import Fraction

# Turn the two counts into a fraction string, reduced to lowest terms.
@udf(StringType())
def getFraction(value_occurrence, key_occurrence):
    return str(Fraction(value_occurrence, key_occurrence))

schema = StructType([StructField("key", IntegerType(), True),
                     StructField("value", StringType(), True)])

data = [(1, "the"), (2, "I"), (1, "me"),
        (1, "me"), (2, "book"), (1, "table")]

spark = SparkSession.builder.appName('myPython').getOrCreate()
input_df = spark.createDataFrame(data, schema)

(input_df
 # number of rows sharing this key
 .withColumn("key_occurrence",
             F.count(F.lit(1)).over(Window.partitionBy(F.col("key"))))
 # number of rows sharing this (value, key) pair
 .withColumn("value_occurrence",
             F.count(F.lit(1)).over(Window.partitionBy(F.col("value"), F.col("key"))))
 # fraction of this value within its key
 .withColumn("frequency",
             getFraction(F.col("value_occurrence"), F.col("key_occurrence")))
 .dropDuplicates()
 .show())
