在java中,基于列上的自定义函数在sparksql中删除重复行

pcrecxhr  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(354)

我正在尝试从java中sparksql的数据集中删除重复数据。我的数据集有三列。假设列的名称是 name, timestamp, and score . name是雇员姓名的字符串表示形式,timestamp是雇员执行的活动的long(epoch表示形式)。分数是代表员工分数的整数字段。
现在,假设我有以下数据集:

Name --> timestamp       -->  scores 
John --> 1595239200000   -->  10  
John --> 1595242800000   -->  10
Bob  --> 1595246400000   -->  20
John --> 1595239200000   -->  10

注意,在上面的数据集中,第一行和第四行是相同的。
当我在上面的数据集上使用distinct()函数时

myDataset.distinct()

我得到的结果是

Name --> timestamp       -->  scores 
John --> 1595239200000   -->  10  
John --> 1595242800000   -->  10
Bob  --> 1595246400000   -->  20

在这种情况下,第四行被消除,这是预期的行为。
我想要的是转换 timestamp 进入 yyyy-MM-dd 格式化,然后使用名称字段的组合删除重复项。因此,从原始数据集来看,第一行、第二行和第四行具有相同的日期值,即 2020-07-20 因为name=john。我只想给名字加一行='约翰'。
因此,从上面的数据集中删除重复行之后,结果数据集将变成

Name --> timestamp       -->  scores 
John --> 1595239200000   -->  10  
Bob  --> 1595246400000   -->  20

请注意,我没有任何约束来仅为相同的名称保留第一个时间戳。任何时间戳都适用于我,只要它们都属于同一个日期。
到目前为止我试过的是

Dataset<Row> duplicateRemovedDataset = myDataset
                .dropDuplicates("Name", String.valueOf(functions.from_unixtime
                        (functions.col("timestamp").divide(1000), "yyyy-MM-dd")));

但这让我产生了这个错误

User class threw exception: org.apache.spark.sql.AnalysisException: 
Cannot resolve column name "from_unixtime((timestamp / 1000), yyyy-MM-dd)" among list of my column name

我该怎么做呢?
或者更一般地说,如何在调用 dropDuplicates 在数据集上?

tquggr8v

tquggr8v1#

您可以创建一个新的 column 你需要的日期格式和 drop 在您想要的列上重复,如下所示
对于java

import static org.apache.spark.sql.functions.*;
Dataset<Row> resultDF = df.withColumn("date", to_date(to_timestamp(df.col("Timestamp").divide(1000)), "yyyy-MM-dd"));

resultDF.dropDuplicates("Name", "date")
        .drop("date")
        .show(false);

对于scala

import org.apache.spark.sql.functions._
val resultDF = df.withColumn("date", to_date(to_timestamp(col("Timestamp") / 1000), "yyyy-MM-dd"))

resultDF.dropDuplicates("Name", "date")
  .drop("date")
  .show(false)

输出:

+----+-------------+-----+
|Name|Timestamp    |score|
+----+-------------+-----+
|Bob |1595246400000|20   |
|John|1595239200000|10   |
+----+-------------+-----+
ee7vknir

ee7vknir2#

试试这个:

val myDataset = Seq(("John","1595239200000",10),           
              ("John", "1595242800000" ,10),
             ("Bob", "1595246400000" ,20),
             ("John", "1595239200000" ,10)
            )
.toDF("Name", "timestamp","score")
myDataset.show()

+----+-------------+-----+
|Name|    timestamp|score|
+----+-------------+-----+
|John|1595239200000|   10|
|John|1595242800000|   10|
| Bob|1595246400000|   20|
|John|1595239200000|   10|
+----+-------------+-----+

import org.apache.spark.sql.functions.{col, to_date, to_timestamp}

myDataset.withColumn("datestamp",to_date(from_unixtime($"timestamp" / 1000))).dropDuplicates("name","datestamp").show()

+----+-------------+-----+----------+
|name|    timestamp|score| datestamp|
+----+-------------+-----+----------+
| Bob|1595246400000|   20|2020-07-20|
|John|1595239200000|   10|2020-07-20|
+----+-------------+-----+----------+

相关问题