pyspark 查找分区上30到365天之间的事件

9cbw7uwe  于 2024-01-06  发布在  Spark
关注(0)|答案(2)|浏览(172)

我有DF,其中包括患者ID和患者接受医疗程序的日期。我需要过滤DF,以仅包括至少接受过两次手术的患者,两次手术间隔30到365天。我只需要保留患者ID和符合时间范围标准的第一次手术。
原始DF:
| 患者ID|日期|
| --|--|
| 一|18年3月1日|
| 一|18年3月15日|
| B| 19年4月1日|
| B| 19年4月4日|
| B| 19年4月7日|
| B| 19年6月3日|
和滤波后的DF:
| 患者ID|日期|
| --|--|
| B| 19年4月7日|
这是我试过的代码.

  1. w=Window.partitionBy("Pat_ID").orderBy(col("date"))
  2. for i in range(1, 366):
  3. df = df.withColumn(f"daysbetween_{i}", when ((datediff((F.lead(F.col('dx_date'), i).over(w)), "dx_date").between(30, 365)),1).otherwise(0))

字符串

oxiaedzo

oxiaedzo1#

  • 编辑:使用范围连接条件添加 *

我这样做的总体方法是:
1.通过患者ID和range join conditions将表连接到自身
1.计算每一行的成对日期差
1.按患者ID分组,最早日期的汇总

然而,这种方法的主要问题是你可能会耗尽内存,这取决于你的数据集;因为你可能会为每个病人的N程序生成N^2行。我已经添加了过滤器来只生成有效的日期对(在30-365天范围内),但是如果你的数据集真的很大,它 * 仍然有可能耗尽内存 *。
以下是我的方法:

首先,模拟数据集

  1. from datetime import datetime
  2. import pyspark.sql.functions as F
  3. from pyspark.sql.window import Window as W
  4. df = spark.createDataFrame([
  5. ("A", datetime(2018,3,1))
  6. ,("A", datetime(2018,3,15))
  7. ,("B", datetime(2019,4,1))
  8. ,("B", datetime(2019,4,4))
  9. ,("B", datetime(2019,4,7))
  10. ,("B", datetime(2019,6,3))
  11. ,("C", datetime(2018,1,1))
  12. ,("D", datetime(2018,1,1))
  13. ,("D", datetime(2018,3,1))
  14. ,("D", datetime(2020,1,1))
  15. ]
  16. ,schema="patient_id string, procedure_date date"
  17. )
  18. df.show()

个字符
接下来,我创建DataFrame filter_df,根据最大范围筛选潜在患者。这将筛选出:

  • 少于2次手术的患者
  • 两次最远手术间隔不到30天的患者

我不能使用between(30,365)max_range <= 365,因为即使最大-最小范围超过365天的限制,也可能存在最小-最大对之间的差异小于365天的日期对。
但是,反之则不然。如果最大-最小范围小于30天,则不能有另一个范围大于30天的日期对。

  1. filter_df = df.groupBy("patient_id")\
  2. .agg(
  3. F.min("procedure_date").alias("first_date")
  4. ,F.max("procedure_date").alias("latest_date")
  5. )\
  6. .withColumn("max_range", F.date_diff("latest_date", "first_date"))\
  7. .filter(F.col("max_range") >= 30)
  8. filter_df.show()
  1. +----------+----------+-----------+---------+
  2. |patient_id|first_date|latest_date|max_range|
  3. +----------+----------+-----------+---------+
  4. | B|2019-04-01| 2019-06-03| 63|
  5. | D|2018-01-01| 2020-01-01| 730|
  6. +----------+----------+-----------+---------+

的字符串
使用inner join应用过滤器:

  1. df = df.join(
  2. filter_df.select("patient_id")
  3. ,on="patient_id"
  4. ,how="inner"
  5. )
  6. df.show()
  1. +----------+--------------+
  2. |patient_id|procedure_date|
  3. +----------+--------------+
  4. | B| 2019-04-01|
  5. | B| 2019-04-04|
  6. | B| 2019-04-07|
  7. | B| 2019-06-03|
  8. | D| 2018-01-01|
  9. | D| 2018-03-01|
  10. | D| 2020-01-01|
  11. +----------+--------------+

的字符串
然后,我通过患者id和range join conditions(基本上,将date_diff().between()条件添加到join(on=...)参数) 将这个过滤后的结果与其本身连接起来。为此,我必须重命名患者id和日期列以区分它们。
这为我提供了所有患者的所有有效日期对。
使用范围连接条件,我避免了生成4^2 + 3^2 = 25行的二次行为,而是只得到日期范围(30,365)天内的4个有效日期对。
无论你使用F.date_diff("date_b", "date_a")还是F.date_diff("date_a", "date_b")都没有关系--成对差分是对称的,你会得到相同的结果。负结果也没有关系,原因也是一样的。

  1. df = df\
  2. .withColumnRenamed("patient_id", "id_a")\
  3. .withColumnRenamed("procedure_date", "date_a")\
  4. .join(
  5. df\
  6. .withColumnRenamed("patient_id", "id_b")\
  7. .withColumnRenamed("procedure_date", "date_b")
  8. ,on=[
  9. F.col("id_a") == F.col("id_b")
  10. ,F.date_diff("date_b", "date_a").between(30, 365)
  11. ]
  12. ,how="inner"
  13. )\
  14. .withColumn("date_diff", F.date_diff("date_b", "date_a"))
  15. df.show()
  1. +----+----------+----+----------+---------+
  2. |id_a| date_a|id_b| date_b|date_diff|
  3. +----+----------+----+----------+---------+
  4. | B|2019-04-01| B|2019-06-03| 63|
  5. | B|2019-04-04| B|2019-06-03| 60|
  6. | B|2019-04-07| B|2019-06-03| 57|
  7. | D|2018-01-01| D|2018-03-01| 59|
  8. +----+----------+----+----------+---------+

然后我按患者ID分组,并选择min("date_a")以获得符合此条件的 * 第一个程序 *。
但是我注意到我的代码发现患者B的1 April 2019是答案,而不是您的示例7 April 2019结果。
根据你对“第一次”的理解,你需要修改最后一个代码块来得到你的答案。

  1. df = df\
  2. .groupBy("id_a")\
  3. .agg(F.min("date_a").alias("procedure_date"))\
  4. .select(
  5. F.col("id_a").alias("patient_id")
  6. ,"procedure_date"
  7. )
  8. df.show()
  1. +----------+--------------+
  2. |patient_id|procedure_date|
  3. +----------+--------------+
  4. | B| 2019-04-01|
  5. | D| 2018-01-01|
  6. +----------+--------------+
展开查看全部
iq0todco

iq0todco2#

一种使用内置Spark函数而不使用joins的方法(出于性能原因):
1.将数据按patient_idcollect all dates into a list分组。
1.对于每个日期列表,使用this answer生成所有可能的日期对。

  1. Filter out不符合标准的日期对长于30天且短于365天。
  2. Sort剩余的对,使得具有最小开始日期的对是列表的第一个元素。
    1.第一对的第一个日期是预产期。
  1. from pyspark.sql import functions as F
  2. df = ...
  3. df.groupBy("patient_id").agg(F.collect_list("procedure_date")
  4. .alias("procedure_dates"))\
  5. .withColumn("procedure_dates",
  6. F.filter(
  7. F.transform(
  8. F.flatten(F.transform(
  9. c:="procedure_dates",
  10. lambda x: F.arrays_zip(F.array_repeat(x, F.size(c)), c)
  11. )),
  12. lambda x: F.array(x["0"], x[c])
  13. ),
  14. lambda x: x[0] < x[1]
  15. ))\
  16. .withColumn("procedure_dates", F.filter("procedure_dates",
  17. lambda col: F.datediff(col[1], col[0]).between(30, 365))) \
  18. .withColumn("procedure_dates", F.sort_array("procedure_dates")) \
  19. .withColumn("result", F.col("procedure_dates")[0][0]) \
  20. .drop("procedure_dates") \
  21. .show()

字符串
测试结果:

  1. +----------+----------+
  2. |patient_id| result|
  3. +----------+----------+
  4. | A| null|
  5. | B|2019-04-01|
  6. +----------+----------+

展开查看全部

相关问题