Assign the same uuid to each window with PySpark window functions

pwuypxnk asked on 2022-11-21 in Spark

I have a PySpark DataFrame like this:

import datetime

data = [{"master_record_id": "001-0073-683496",
         "dob": datetime.date(2000, 1, 1),
         "patient_ssn": "123456789",
         "dodid": "1234567891",
         "dqi_id": "123",
         "site_id": 700},
        {"master_record_id": "001-0013-101321",
         "dob": datetime.date(2000, 1, 1),
         "patient_ssn": "123456789",
         "dodid": "1234567891",
         "dqi_id": "123",
         "site_id": 701},
        {"master_record_id": "001-0046-2845712",
         "dob": datetime.date(1999, 2, 3),
         "patient_ssn": "987654321",
         "dodid": "0987654322",
         "dqi_id": None,
         "site_id": 775},
        {"master_record_id": "001-0048-2845712",
         "dob": datetime.date(1999, 2, 3),
         "patient_ssn": "987654321",
         "dodid": "0987654322",
         "dqi_id": None,
         "site_id": 775}]

df = spark.createDataFrame(data=data)

I want to use a window function to assign a uuid to all records that share the same dodid, patient_ssn, and dob.
I currently have a working solution, but it does not scale to millions of records. I believe what makes it slow is the loop over the clusters, which creates a separate DataFrame for each one. Is there a way to assign a uuid directly in a Window function? My working but inefficient solution is below:

import uuid
from functools import reduce

from pyspark.sql import DataFrame, Window
from pyspark.sql import functions as f

# Filter out any records with a null dob/patient_ssn/dodid
df = df.filter("dob IS NOT NULL AND patient_ssn IS NOT NULL AND dodid IS NOT NULL")
# Create a cluster id based on dob/patient_ssn/dodid
window = Window.orderBy(["dob", "patient_ssn", "dodid"])
df = df.withColumn("cluster_id", f.dense_rank().over(window) - 1)

cluster_list = set(df.select("cluster_id").rdd.flatMap(lambda x: x).collect())
df_list = []
# Iterate through the clusters, assigning a uuid to each cluster.
# Each cluster becomes a new DataFrame.
for cluster in cluster_list:
    temp = df.filter(f.col("cluster_id") == cluster)
    df_list.append(temp.withColumn("uuid", f.lit(str(uuid.uuid1()))))
# Union all DataFrames from df_list
df = reduce(DataFrame.unionAll, df_list)
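
For reference, a minimal sketch of one loop-free alternative that still assigns a random uuid per group, assuming a join back on the key columns is acceptable (the uuid_udf helper below is hypothetical, not part of the original code):

import uuid
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

# Hypothetical helper: one random UUID per call; marked nondeterministic so
# Spark does not collapse repeated evaluations into a single value.
uuid_udf = f.udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

keys = ["dob", "patient_ssn", "dodid"]
# One row per cluster, each with its own uuid; persist so the uuids are not
# regenerated (with different values) if the plan is re-executed.
clusters = df.select(*keys).distinct().withColumn("uuid", uuid_udf()).persist()
# An equi-join on the key columns assigns each cluster's uuid to its members.
df = df.join(clusters, on=keys, how="inner")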
wfveoks0 · 1#

I don't see a reason to use a Window function here. How about concatenating all three "key" columns and running md5 over the result?
.withColumn('uuid', md5(concat('dodid', 'patient_ssn', 'dob')))
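
A self-contained version of that suggestion, as a sketch (the "|" separator and the explicit cast of the date column to string are assumptions, not part of the answer):

from pyspark.sql import functions as f

# Deterministic hash of the three key columns; the separator prevents
# ambiguous concatenations such as "ab"+"c" vs. "a"+"bc".
df = df.withColumn(
    "uuid",
    f.md5(f.concat_ws("|", "dodid", "patient_ssn", f.col("dob").cast("string")))
)

Note that md5 yields a deterministic 32-character hex digest rather than an RFC 4122 UUID: identical key combinations always map to the same value, even across runs, which the uuid1-based loop above does not guarantee.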
