I'm using Spark DataFrames with Scala, with a DataFrame like the following:
User Id | Date       | Url
-------------------------------------------
1       | 2020-08-30 | https://example.com/2
2       | 2020-08-15 | https://example.com/1
1       | 2020-08-01 | https://example.com/3
3       | 2020-08-18 | https://example.com/1
1       | 2020-08-02 | https://example.com/1
2       | 2020-08-04 | https://example.com/2
1       | 2020-08-22 | https://example.com/8
4       | 2020-08-08 | https://example.com/8
1       | 2020-08-29 | https://example.com/4
2       | 2020-08-12 | https://example.com/6
1       | 2020-08-01 | https://example.com/3
3       | 2020-08-18 | https://example.com/7
1       | 2020-08-03 | https://example.com/1
2       | 2020-08-04 | https://example.com/2
1       | 2020-08-23 | https://example.com/6
4       | 2020-08-08 | https://example.com/5
...
(It's somewhat like an access log.)
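In case it helps, this is roughly how I build a test DataFrame with this shape (the SparkSession setup and the sample rows are just illustrative; the column names match the table above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.to_date

val spark = SparkSession.builder().appName("access-log-test").master("local[*]").getOrCreate()
import spark.implicits._

// A few of the rows from the table above; Date is parsed into a proper date type.
val startingDf = Seq(
  (1, "2020-08-30", "https://example.com/2"),
  (2, "2020-08-15", "https://example.com/1"),
  (1, "2020-08-01", "https://example.com/3")
  // ... remaining rows omitted
).toDF("User Id", "Date", "Url")
  .withColumn("Date", to_date($"Date"))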
I want to group it by week and by URL, with a count of the unique users in each group, like this:
URL                   | Week                          | Unique user count
------------------------------------------------------------------------
https://example.com/1 | today to today-7 days         | 5
https://example.com/1 | today-7 days to today-14 days | 3
https://example.com/2 | today to today-7 days         | 1
https://example.com/2 | today-7 days to today-14 days | 4
https://example.com/3 | today to today-7 days         | 6
https://example.com/3 | today-7 days to today-14 days | 4
https://example.com/4 | today to today-7 days         | 2
https://example.com/4 | today-7 days to today-14 days | 3
https://example.com/5 | today to today-7 days         | 12
https://example.com/5 | today-7 days to today-14 days | 8
https://example.com/6 | today to today-7 days         | 6
https://example.com/6 | today-7 days to today-14 days | 4
https://example.com/7 | today to today-7 days         | 5
https://example.com/7 | today-7 days to today-14 days | 3
https://example.com/8 | today to today-7 days         | 1
https://example.com/8 | today-7 days to today-14 days | 4
I'm still new to Spark and DataFrames, but I assume I want to use partitionBy and windows.
If a user visits more than once in a single day, I don't want to count them more than once.
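The code below references sevenDaysAgoDate and fourteenDaysAgoDate, which I define roughly like this (assuming "today" means the current date on the driver):

import org.apache.spark.sql.functions.{current_date, date_sub}

// Boundaries for the two one-week timeframes, relative to the current date.
val sevenDaysAgoDate = date_sub(current_date(), 7)
val fourteenDaysAgoDate = date_sub(current_date(), 14)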
Here's what I've tried so far:
import org.apache.spark.sql.functions.{col, when}

// Tag each row with the timeframe its Date falls into.
val dfWithNewColumn = startingDf.withColumn("timeframe_indicator",
  when(col("Date") >= sevenDaysAgoDate, "timeframe_1")
    .when((col("Date") < sevenDaysAgoDate) && (col("Date") >= fourteenDaysAgoDate), "timeframe_2")
    .otherwise("outside_timeframes"))

// Keep at most one visit per user per URL per day, then count per URL and timeframe.
val dupesOut = dfWithNewColumn.dropDuplicates("Date", "User Id", "Url")
val grouped = dupesOut.groupBy("Url", "timeframe_indicator").count().withColumnRenamed("count", "Unique User Count")
It seems to work, but I'd like to see how to do this with a window, if that's possible.
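Something like the sketch below is what I have in mind, though I haven't tested it and I'm not sure it's idiomatic. Using size(collect_set(...)) over a window is my guess at the approach; note that, unlike the groupBy above, it would count each user at most once per URL and timeframe rather than once per day:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_set, size}

// Untested sketch: one window per (Url, timeframe) pair; collect the distinct user ids in it.
val w = Window.partitionBy("Url", "timeframe_indicator")
val windowed = dupesOut
  .withColumn("Unique User Count", size(collect_set(col("User Id")).over(w)))
  .select("Url", "timeframe_indicator", "Unique User Count")
  .distinct()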