在java中,基于列上的自定义函数在sparksql中删除重复行

pcrecxhr  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(385)

我正在尝试从java中sparksql的数据集中删除重复数据。我的数据集有三列。假设列的名称是 name, timestamp, and score . name是雇员姓名的字符串表示形式,timestamp是雇员执行的活动的long(epoch表示形式)。分数是代表员工分数的整数字段。
现在,假设我有以下数据集:

  1. Name --> timestamp --> scores
  2. John --> 1595239200000 --> 10
  3. John --> 1595242800000 --> 10
  4. Bob --> 1595246400000 --> 20
  5. John --> 1595239200000 --> 10

注意,在上面的数据集中,第一行和第四行是相同的。
当我在上面的数据集上使用distinct()函数时

  1. myDataset.distinct()

我得到的结果是

  1. Name --> timestamp --> scores
  2. John --> 1595239200000 --> 10
  3. John --> 1595242800000 --> 10
  4. Bob --> 1595246400000 --> 20

在这种情况下,第四行被消除,这是预期的行为。
我想要的是转换 timestamp 进入 yyyy-MM-dd 格式化,然后使用名称字段的组合删除重复项。因此,从原始数据集来看,第一行、第二行和第四行具有相同的日期值,即 2020-07-20 因为name=john。我只想给名字加一行='约翰'。
因此,从上面的数据集中删除重复行之后,结果数据集将变成

  1. Name --> timestamp --> scores
  2. John --> 1595239200000 --> 10
  3. Bob --> 1595246400000 --> 20

请注意,我没有任何约束来仅为相同的名称保留第一个时间戳。任何时间戳都适用于我,只要它们都属于同一个日期。
到目前为止我试过的是

  1. Dataset<Row> duplicateRemovedDataset = myDataset
  2. .dropDuplicates("Name", String.valueOf(functions.from_unixtime
  3. (functions.col("timestamp").divide(1000), "yyyy-MM-dd")));

但这让我产生了这个错误

  1. User class threw exception: org.apache.spark.sql.AnalysisException:
  2. Cannot resolve column name "from_unixtime((timestamp / 1000), yyyy-MM-dd)" among list of my column name

我该怎么做呢?
或者更一般地说,如何在调用 dropDuplicates 在数据集上?

tquggr8v

tquggr8v1#

您可以创建一个新的 column 你需要的日期格式和 drop 在您想要的列上重复,如下所示
对于java

  1. import static org.apache.spark.sql.functions.*;
  2. Dataset<Row> resultDF = df.withColumn("date", to_date(to_timestamp(df.col("Timestamp").divide(1000)), "yyyy-MM-dd"));
  3. resultDF.dropDuplicates("Name", "date")
  4. .drop("date")
  5. .show(false);

对于scala

  1. import org.apache.spark.sql.functions._
  2. val resultDF = df.withColumn("date", to_date(to_timestamp(col("Timestamp") / 1000), "yyyy-MM-dd"))
  3. resultDF.dropDuplicates("Name", "date")
  4. .drop("date")
  5. .show(false)

输出:

  1. +----+-------------+-----+
  2. |Name|Timestamp |score|
  3. +----+-------------+-----+
  4. |Bob |1595246400000|20 |
  5. |John|1595239200000|10 |
  6. +----+-------------+-----+
展开查看全部
ee7vknir

ee7vknir2#

试试这个:

  1. val myDataset = Seq(("John","1595239200000",10),
  2. ("John", "1595242800000" ,10),
  3. ("Bob", "1595246400000" ,20),
  4. ("John", "1595239200000" ,10)
  5. )
  6. .toDF("Name", "timestamp","score")
  7. myDataset.show()
  8. +----+-------------+-----+
  9. |Name| timestamp|score|
  10. +----+-------------+-----+
  11. |John|1595239200000| 10|
  12. |John|1595242800000| 10|
  13. | Bob|1595246400000| 20|
  14. |John|1595239200000| 10|
  15. +----+-------------+-----+
  16. import org.apache.spark.sql.functions.{col, to_date, to_timestamp}
  17. myDataset.withColumn("datestamp",to_date(from_unixtime($"timestamp" / 1000))).dropDuplicates("name","datestamp").show()
  18. +----+-------------+-----+----------+
  19. |name| timestamp|score| datestamp|
  20. +----+-------------+-----+----------+
  21. | Bob|1595246400000| 20|2020-07-20|
  22. |John|1595239200000| 10|2020-07-20|
  23. +----+-------------+-----+----------+
展开查看全部

相关问题