我想在databricks中创建一个缓慢变化的维度。我的源Dataframe包含以下信息。
+-------------------+-------------------------+----------+-----------+-------------+
| actionimmediately | date | deviceid | patchguid | status |
+-------------------+-------------------------+----------+-----------+-------------+
| False | 2018-08-15 04:01:00.000 | 123 | 00-001 | Install |
| True | 2018-08-16 00:00:00.000 | 123 | 00-001 | Install |
| False | 2018-08-10 01:00:00.000 | 123 | 00-001 | Not Approved|
| False | 2020-01-01 00:00:00.000 | 333 | 11-111 | Declined |
+-------------------+-------------------------+----------+-----------+-------------+
我希望作为输出的Dataframe如下所示:
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| mergekey | deviceid | patchguid | status | actionimmediately | starttime | endtime | current |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
| 12300-001 | 123 | 00-001 | Not Approved | False | 2018-08-10 01:00:00.000 | 2018-08-15 04:01:00.000 | False |
| 12300-001 | 123 | 00-001 | Install | False | 2018-08-15 04:01:00.000 | 2018-08-16 00:00:00.000 | False |
| 12300-001 | 123 | 00-001 | Install | True | 2018-08-16 00:00:00.000 | null | True |
| 33311-111 | 333 | 11-111 | Declined | False | 2020-01-01 00:00:00.000 | null | True |
+-----------+----------+-----------+--------------+-------------------+-------------------------+-------------------------+---------+
实际上,sourcefile包含275475行
我已经试过两种解决方案,但都很慢。比如+-10小时。
解决方案1:使用三角洲湖合并
首先,我创建一个seqid,稍后用于迭代。这是因为合并不能多次更新同一行。我正在用一个窗口创建seqid。
source_df = source_df.withColumn('mergekey',concat(col('deviceid'),col('patchguid')))
w1 = Window.partitionBy('mergekey').orderBy('date')
source_df = source_df.withColumn('seqid', row_number().over(w1))
然后创建一个for循环,该循环在每个seqid上运行并合并行。实际上,max\ seq\ id是1900
def createTable (df, SeqId):
df\
.withColumn('mergekey',concat(col('deviceid'),col('patchguid')))\
.select(\
'mergekey',\
'deviceid',\
'patchguid',\
'status',\
'actionimmediately',\
col('date').alias('starttime'))\
.where(col('seqid') == SeqId)\
.withColumn('endtime',lit(None).cast('timestamp'))\
.withColumn('current',lit(True))\
.write.format('delta')\
.partitionBy("current")\
.options(header='true',path='/mnt/destinationncentral/patch_approval')\
.saveAsTable('patch_approval')
def MergePatchApproval (df,deltatable,seqNummer):
dataframe = df.where(col('seqid') == seqNummer)
newToInsert = dataframe.alias('updates')\
.join(deltatable.toDF().alias('table'),['deviceid','patchguid'])\
.select(\
'updates.actionimmediately',\
'updates.date',\
'updates.deviceid',\
'updates.patchguid',\
'updates.status',\
'updates.seqid')\
.where('table.current = true and \
(table.status <> updates.status or table.actionimmediately <> updates.actionimmediately)')
stagedUpdates = (newToInsert.selectExpr('NULL as mergekey','*')\
.union(dataframe\
.withColumn('mergekey',concat(col('deviceid'),col('patchguid')))\
.select(\
'mergekey',\
'actionimmediately',\
'date',\
'deviceid',\
'patchguid',\
'status',\
'seqid')))
deltatable.alias('t')\
.merge(stagedUpdates.alias('s'),'t.current = true and t.mergekey = s.mergekey')\
.whenMatchedUpdate(condition = 't.current = true and \
(t.status <> s.status or t.actionimmediately <> s.actionimmediately)', \
set = {
'endtime':'s.date',
'current':'false'
}).whenNotMatchedInsert(values = {
'mergekey':'s.mergekey',
'deviceid':'s.deviceid',
'patchguid':'s.patchguid',
'status':'s.status',
'actionimmediately':'s.actionimmediately',
'starttime':'s.date',
'endtime':'NULL',
'current':'true'
}).execute()
for i in range(max_seq_id):
i = i + 1
print(i)
df = source_df.where(col('seqid') == i)
if(i == 1):
tablecount = spark.sql("show tables like 'patch_approval'").count()
if(tablecount == 0):
createTable(df,i)
approval_table = DeltaTable.forPath(spark,'/mnt/destinationncentral/patch_approval')
else:
approval_table = DeltaTable.forPath(spark,'/mnt/destinationncentral/patch_approval')
MergePatchApproval(df,approval_table,i)
else:
MergePatchApproval(df,approval_table,i)
这个解决方案的问题是,在azuredatalake上写数据需要一些时间,我认为这是正常的,但是每次迭代的执行时间都在增加。
解决方案2:向上插入Dataframe,最后写一次
在这个解决方案中,我也使用for循环和seqid,但是除了将每个循环都写入azuredatalake之外,我只在最后才这样做。此解决方案解决了写入延迟问题,但每个循环结束的时间仍在增加。
def createDestDF(sourceDF):
dest_df = sourceDF\
.select(\
'mergekey',\
'deviceid',\
'patchguid',\
'status',\
'actionimmediately',\
col('date').alias('starttime'))\
.withColumn('endtime',lit(None).cast('timestamp'))\
.withColumn('current',lit(True))
return dest_df
def getChangedRecords(sourceDF,destDF):
changedRecords = sourceDF.alias('u')\
.join(destDF.alias('t'),['deviceid','patchguid'])\
.select(\
'u.actionimmediately',\
'u.date',\
'u.deviceid',\
'u.patchguid',\
'u.status',\
'u.seqid',\
'u.mergekey')\
.where('t.current = true and \
(t.status <> u.status or t.actionimmediately <> u.actionimmediately)')
return changedRecords
def getNewRecords(sourceDF,destDF):
newRecords = sourceDF.alias('n')\
.join(destDF.alias('t'),['deviceid','patchguid'],'left')\
.select(\
't.mergekey',\
'n.actionimmediately',\
'n.date',\
'deviceid',\
'patchguid',\
'n.status',\
'n.seqid')\
.where('t.current is null')
return newRecords
def upsertChangedRecords(sourceDF,destDF):
endTimeColumn = expr("""IF(endtimeOld IS NULL, date, endtimeOld)""")
currentColumn = expr("""IF(date IS NULL, currentOld, False)""")
updateDF = sourceDF.alias('s').join(destDF.alias('t'),'mergekey','right').select(\
'mergekey',\
't.deviceid',\
't.patchguid',\
't.status',\
't.actionimmediately',\
't.starttime',\
's.date',\
col('t.current').alias('currentOld'),\
col('t.endTime').alias('endtimeOld'))\
.withColumn('endtime',endTimeColumn)\
.withColumn('current',currentColumn)\
.drop('currentOld','date','endTimeOld')
updateInsertDF = sourceDF\
.select(\
'mergekey',\
'deviceid',\
'patchguid',\
'status',\
'actionimmediately',\
col('date').alias('starttime'))\
.withColumn('endtime',lit(None).cast('timestamp'))\
.withColumn('current',lit(True))
resultDF = updateDF.union(updateInsertDF)
return resultDF
def insertNewRecords(sourceDF, destDF):
insertDF = sourceDF\
.select(\
'mergekey',\
'deviceid',\
'patchguid',\
'status',\
'actionimmediately',\
col('date').alias('starttime'))\
.withColumn('endtime',lit(None).cast('timestamp'))\
.withColumn('current',lit(True))
resultDF = destDF.union(insertDF)
return resultDF
for i in range(max_seq_id):
i = i + 1
print(i)
seq_df = source_df.where(col('seqid') == i)
if i == 1:
tablecount = spark.sql("show tables like 'patch_approval'").count()
if(tablecount == 0):
dest_df = createDestDF(seq_df)
else:
changed_df = getChangedRecords(seq_df,dest_df)
new_df = getNewRecords(seq_df,dest_df)
dest_df = upsertChangedRecords(changed_df,dest_df)
dest_df = insertNewRecords(new_df,dest_df)
else:
changed_df = getChangedRecords(seq_df,dest_df)
new_df = getNewRecords(seq_df,dest_df)
dest_df = upsertChangedRecords(changed_df,dest_df)
dest_df = insertNewRecords(new_df,dest_df)
dest_df\
.write\
.format('delta')\
.partitionBy('current')\
.mode('overwrite')\
.options(header='true',path='/mnt/destinationncentral/patch_approval')\
.saveAsTable('patch_approval')
知道如何解决for循环中执行时间增加的问题吗?
谨致问候,
1条答案
按热度按时间9avjhtql1#
据我所知,行不会随着时间的推移而从源表中消失—如果是这样,可以通过将sparkDataframe放入临时视图并对其编写查询来解决问题:
它应该是非常快的结果,在准确的输出你想要的。我在你的样本数据上查过了,之后的数据显示: