关于pyspark windows函数中的ntile函数

nfzehxib  于 2021-07-09  发布在  Spark
关注(0)|答案(2)|浏览(529)

我正在运行以下代码段,

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import ntile
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )

columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)windowSpec  = 
Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)).show()

结果如下所示,我不明白ntile(2)是如何工作的,为什么第一行和第二行的nitle列值是 1 为什么第三排和第四排 1 以及 2 即使他们有相同的工资价值。我就是搞不懂这些列是怎么计算出来的?

thtygnil

thtygnil1#

ntile() 由sql定义以返回大小尽可能相等的平铺。这就产生了你所看到的结果——领带可以(任意)在不同的行中分开。
而不是 ntile() ,可以使用直接计算。这里有一种方法:

ceil(rank() over (partition by department order by salary) * 2.0 /
     count(*) over (partition by department)
    )

请注意,瓷砖的大小不一定相同——有些瓷砖可能会完全丢失。但是,领带都放在同一块瓷砖上。

mnowg1ta

mnowg1ta2#

ntile window函数用于将结果集分解为指定数量的近似相等的组或桶。ntile函数返回与每行相关联的bucket编号。整数的名称来源于将结果集划分为四分之一(四分位数)、十分之一(十分位数)等的实践。
如果行数不能被bucket整除,那么ntile函数将生成两个大小的组,其差值为1。较大的组总是按照ORDERBY子句中指定的顺序出现在较小的组之前。
您提供的示例将每个部门的员工分为两组:

Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)).show()

首先,partitionby将员工按部门名称划分为多个分区。
然后,orderby按工资对每个分区中的员工进行排序。
最后,ntile(2)函数为每个分区中的每一行分配一个bucket编号。每当部门发生变化时,它就会重置桶号。
现在我又介绍了一名员工 MariaZ 这将有助于更好地理解。

simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("MariaZ", "Sales", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )

columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("ntile",ntile(2).over(windowSpec)).show()

输出:

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        James|     Sales|  3000|    1|
|       MariaZ|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    2|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
+-------------+----------+------+-----+

相关问题