我有以下Dataframe:
val df1 = Seq(("Roger","Rabbit", "ABC123"), ("Roger","Rabit", "ABC123"),("Roger","Rabbit", "ABC123"), ("Trevor","Philips","XYZ987"), ("Trevor","Philips","XYZ987")).toDF("first_name", "last_name", "record")
+----------+---------+------+
|first_name|last_name|record|
+----------+---------+------+
|Roger |Rabbit |ABC123|
|Roger |Rabit |ABC123|
|Roger |Rabbit |ABC123|
|Trevor |Philips |XYZ987|
|Trevor |Philips |XYZ987|
+----------+---------+------+
我想按列对这个Dataframe中的记录进行分组 record
. 然后我想在野外寻找异常 first_name
以及 last_name
,对于具有相同 record
价值观。
到目前为止,我发现最好的方法是 approx_count_distinct
:
val wind_person = Window.partitionBy("record")
df1.withColumn("unique_fields",cconcat($"first_name",$"last_name"))
.withColumn("anomaly",capprox_count_distinct($"unique_fields") over wind_person)
.show(false)
+----------+---------+------+-------------+-------+
|first_name|last_name|record|unique_fields|anomaly|
+----------+---------+------+-------------+-------+
|Roger |Rabbit |ABC123|RogerRabbit |2 |
|Roger |Rabbit |ABC123|RogerRabbit |2 |
|Roger |Rabit |ABC123|RogerRabit |2 |
|Trevor |Philips |XYZ987|TrevorPhilips|1 |
|Trevor |Philips |XYZ987|TrevorPhilips|1 |
+----------+---------+------+-------------+-------+
检测到异常的地方是 anomaly
列大于1。
问题在于 approx_count_distinct
我们得到的只是一个近似值,我不确定我们能有多大信心它总是返回一个准确的计数。
一些额外信息:
Dataframe可能包含超过5亿条记录
Dataframe以前是基于 record
柱
对于每个不同的 record
,不超过15行
使用安全 approx_count_distinct
在这种情况下,100%的准确率或有更好的窗口功能在Spark实现这一点?
2条答案
按热度按时间tgabmvqs1#
你可以得到确切的答案
countDistinct
在Windows上用一些dense_rank
操作:xhv8bpkk2#
你可以用
collect_set
的unique_fields
透过Windowswind_person
得到它的大小,它等于该字段的计数: