Scala: count the distinct values of a given column partitioned by a window function, without using approx_count_distinct()

bis0qfac · posted 2021-07-09 in Spark

I have the following DataFrame:

val df1 = Seq(
  ("Roger",  "Rabbit",  "ABC123"),
  ("Roger",  "Rabit",   "ABC123"),
  ("Roger",  "Rabbit",  "ABC123"),
  ("Trevor", "Philips", "XYZ987"),
  ("Trevor", "Philips", "XYZ987")
).toDF("first_name", "last_name", "record")

+----------+---------+------+
|first_name|last_name|record|
+----------+---------+------+
|Roger     |Rabbit   |ABC123|
|Roger     |Rabit    |ABC123|
|Roger     |Rabbit   |ABC123|
|Trevor    |Philips  |XYZ987|
|Trevor    |Philips  |XYZ987|
+----------+---------+------+

I want to group the records in this DataFrame by the record column, and then look for anomalies in the first_name and last_name fields among rows that share the same record value.
The best approach I have found so far uses approx_count_distinct:

val wind_person = Window.partitionBy("record")

df1.withColumn("unique_fields",cconcat($"first_name",$"last_name"))
.withColumn("anomaly",capprox_count_distinct($"unique_fields") over wind_person)
.show(false)    

+----------+---------+------+-------------+-------+
|first_name|last_name|record|unique_fields|anomaly|
+----------+---------+------+-------------+-------+
|Roger     |Rabbit   |ABC123|RogerRabbit  |2      |
|Roger     |Rabbit   |ABC123|RogerRabbit  |2      |
|Roger     |Rabit    |ABC123|RogerRabit   |2      |
|Trevor    |Philips  |XYZ987|TrevorPhilips|1      |
|Trevor    |Philips  |XYZ987|TrevorPhilips|1      |
+----------+---------+------+-------------+-------+

An anomaly is detected wherever the anomaly column is greater than 1.
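A minimal sketch of then isolating just the anomalous rows, reusing df1 and the wind_person window from above (the name suspicious is just for this sketch):

// Keep only the rows whose record group shows more than one distinct name spelling
val suspicious = df1
  .withColumn("unique_fields", concat($"first_name", $"last_name"))
  .withColumn("anomaly", approx_count_distinct($"unique_fields") over wind_person)
  .filter($"anomaly" > 1)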
The problem is that approx_count_distinct gives only an approximation, and I am not sure how much confidence we can have that it will always return the exact count.
Some additional information:
The DataFrame may contain more than 500 million records
The DataFrame was previously partitioned based on record
For each distinct record there are no more than 15 rows
Is it safe to use approx_count_distinct in this scenario and expect 100% accuracy, or is there a better window function in Spark to achieve this?


tgabmvqs #1

countDistinct is not supported directly as a window function in Spark, but you can get an exact distinct count over a window with a pair of dense_rank operations:

val df2 = df1.withColumn(
    "unique_fields",
    concat($"first_name", $"last_name")
).withColumn(
    "anomaly",
    // within a partition, the ascending rank plus the descending rank of any value
    // always equals (number of distinct values + 1), so subtracting 1 yields the
    // exact distinct count for every row of the partition
    dense_rank().over(Window.partitionBy("record").orderBy("unique_fields")) +
    dense_rank().over(Window.partitionBy("record").orderBy(desc("unique_fields")))
    - 1
)

df2.show
+----------+---------+------+-------------+-------+
|first_name|last_name|record|unique_fields|anomaly|
+----------+---------+------+-------------+-------+
|     Roger|    Rabit|ABC123|   RogerRabit|      2|
|     Roger|   Rabbit|ABC123|  RogerRabbit|      2|
|     Roger|   Rabbit|ABC123|  RogerRabbit|      2|
|    Trevor|  Philips|XYZ987|TrevorPhilips|      1|
|    Trevor|  Philips|XYZ987|TrevorPhilips|      1|
+----------+---------+------+-------------+-------+
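A quick way to sanity-check the result, assuming a separate aggregation pass is affordable (the exact and exact_count names are made up for this sketch): compute the exact distinct count per record with a plain groupBy and compare.

// Exact distinct count per record via groupBy (no window), joined back for comparison
val exact = df1
  .withColumn("unique_fields", concat($"first_name", $"last_name"))
  .groupBy("record")
  .agg(countDistinct($"unique_fields").as("exact_count"))

df2.join(exact, "record")
  .filter($"anomaly" =!= $"exact_count")
  .show() // should come back empty: the dense_rank result matches the exact count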

xhv8bpkk #2

You can take the collect_set of unique_fields over the window wind_person and use its size, which equals the exact distinct count of that field:

df1.withColumn("unique_fields", concat($"first_name", $"last_name"))
  .withColumn("anomaly", size(collect_set($"unique_fields").over(wind_person)))
  .show

//+----------+---------+------+-------------+-------+
//|first_name|last_name|record|unique_fields|anomaly|
//+----------+---------+------+-------------+-------+
//|Roger     |Rabbit   |ABC123|RogerRabbit  |2      |
//|Roger     |Rabit    |ABC123|RogerRabit   |2      |
//|Roger     |Rabbit   |ABC123|RogerRabbit  |2      |
//|Trevor    |Philips  |XYZ987|TrevorPhilips|1      |
//|Trevor    |Philips  |XYZ987|TrevorPhilips|1      |
//+----------+---------+------+-------------+-------+
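One design note on this approach, under the constraints stated in the question: collect_set materializes all distinct values of the window into an array per row, so it is only cheap while groups stay small; with at most 15 rows per record it is exact and harmless. A small follow-up sketch (not part of the answer, the name flagged is made up) counting how many record groups are anomalous:

// Exact per-row distinct count via collect_set, then count the affected record groups
val flagged = df1
  .withColumn("unique_fields", concat($"first_name", $"last_name"))
  .withColumn("anomaly", size(collect_set($"unique_fields").over(wind_person)))

flagged.filter($"anomaly" > 1).select("record").distinct.count // number of anomalous groups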
