pyspark 如何在结构化流中实现窗口函数?

p4tfgftt  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(173)

首先是一些背景,我最近不得不将我的管道从标准的阅读和写移动到结构化流,因为一些业务需求需要将我们所有的管道合并在一个中心位置。我所做的操作是,从source容器中读取数据,对数据进行一些转换,将其保存为staging中的表,然后在一些其他转换后将数据移动到analytics层。
我使用代码:

  1. df = spark.read.parquet(path)
  2. transformed_df = do_transformations(df, primary_keys, transformation_name)
  3. saveAsTable_tostaging() //

字符串
我更新了上面的代码:

  1. df = {spark.readStream.format.options.schema.load //}
  2. transformed_df = do_transformations(df, primary_keys, transformation_name)
  3. streaming_query = { transformed_df.writeStream.trigger ///}
  4. Pseudo code for read and write streams. Note, the readStream() and writeStream() operations
  5. are part of the central place and I cannot change them as other teams are also using it.
  6. I can only update my transformation() function. However, I am allowed to update the trigger details of the writeStream, which I changed to "once":True


对于我的用例,管道每天在4AM上运行一次。对于我的用例,没有实际的流作业,因为我们每天只获取一次数据,这就是为什么它仍然被视为批处理作业,运行一次并停止。在更改之后,几乎所有的转换仍然正常工作,整个读/写流程也正常工作,但有一个转换一直失败。
改造:在df中有一些特定的字段,让我们称它们为'additive-fields',对于所有keys,它们必须是summed。首先,我需要检查staging中的table already exists是否存在。如果存在,然后我需要读取staging表,并在相同字段上的staging tabledf的并集上执行sum操作,如果否,然后我只需要在df上执行sum操作,然后每个键的单个记录应该被发送到staging,其中包含这些'additive-fields'的总和
例如,设df =

  1. |Key1| Key2| Nm1 | Nm2 | AdditiveField1| AdditiveField2| Db_timestamp| source_ts|
  2. |----|-----|-----|-----|---------------|---------------|-------------|----------|
  3. |A |B | Car | Bike| 10 | -0.02 |20231201 |20231127 |
  4. |A |B |Taxi |Cycle|4 |-0.06 |20231201 |20231128 |
  5. |B |C |Xyz |ABC |1 |-20 |20231201 |20231128 |


Staging_table =

  1. |Key1|Key2|Nm1|Nm2|AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
  2. |--- |----|---|---|--------------|--------------|------------|---------|
  3. | B | C |Old|PQR| 7 | 1 | 20231120 | 20231101|


Here, Key1, Key2 are the keys, NM* are the normal fields on which I just have to take the latest record sorted by source_ts, and the additive fields need to be summed.
所需输出为:

  1. |Key1|Key2|Nm1 | Nm2 |AdditiveField1|AdditiveField2|Db_timestamp|source_ts|
  2. |----|----|----|-----|--------------|--------------|------------|---------|
  3. | A | B |Taxi|Cycle| 14 | -0.08 | 20231201 | 20231128|
  4. | B | C |Xyz | ABC | 8 | -19 | 20231201 | 20231128|

编辑:抱歉,无法修复问题中的表结构,df、staging_df和输出如下https://prnt.sc/l3OuqsEWgGX-

当我使用标准读写时,我可以很容易地实现这一点:

  1. def implement_additive_fields(df, table_name, primary_keys: list, fields: list):
  2. if check_tableExists(table_name): # returns true if table exists in staging else false.
  3. stmt = f"select * from {table_name}"
  4. staging_df = spark.sql(stmt)
  5. df = df.union(staging_df)
  6. window = Window.partitionBy(primary_keys).orderBy('source_ts').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
  7. For i in fields: #these are the additive fields in the df
  8. df = df.withColumn(i, F.sum(i).over(window))
  9. df = df.withColumn('rank', F.row_number().over(window)).filter(F.col('rank')==1).drop(F.col('rank'))
  10. return df


在实现结构化流媒体之后,我明白我不能再在我的df上使用这些window functions,我需要使用structured streaming可用的window functions,其中有一些时间参数,但我无法理解如何实现它们,因为我仍然作为batch运行此作业(每天一次),然后它关闭。这些表没有实际的流作业,我想对来自staging表的记录以及流df进行sum

vshtjzan

vshtjzan1#

您可以使用foreachBatch来调用函数并执行转换。
像这样修改你的函数:

  1. def implement_additive_fields(df, table_name, primary_keys: list, fields: list):
  2. if check_tableExists(table_name):
  3. stmt = f"select * from {table_name}"
  4. staging_df = spark.sql(stmt)
  5. df = df.union(staging_df)
  6. window1 = Window.partitionBy(primary_keys).orderBy(F.desc('source_ts'))
  7. window2 = Window.partitionBy(primary_keys).orderBy('source_ts')
  8. for i in fields:
  9. df = df.withColumn(i, F.sum(i).over(window2))
  10. df = df.withColumn('rank', F.row_number().over(window1)).filter(F.col('rank')==1).drop(F.col('rank'))
  11. df.write.saveAsTable("tmp_staged")
  12. print("Done")

字符串
把它叫做:

  1. primary_keys=['Key1','Key2']
  2. fields=['AdditiveField1','AdditiveField2']
  3. stream_df = spark.readStream.schema(schema=schema).format("csv").option("header","true").load("/csvs/")
  4. stream_df.writeStream.option("checkpointlocation","tmpcheckpointloc").foreachBatch(lambda df,id: implement_additive_fields(df,table_name,primary_keys=primary_keys,fields=fields)).trigger(once=True).start().awaitTermination()


以下是使用的输入数据:
| Key1| Key2| NM1| NM2|添加剂字段1| AdditiveField2| DB_timestamp|源ts|
| --|--|--|--|--|--|--|--|
| 一|B|车|自行车| 10 | -0.02 | 20231201 | 20231127 |
| 一|B|出租车|周期| 4 | -0.06 | 20231201 | 20231128 |
| B| C| Xyz| ABC| 1 |-2.0| 2023431201 | 20231128 |
输出量:
x1c 0d1x的数据
| Key1| Key2| NM1| NM2|添加剂字段1| AdditiveField2| DB_timestamp|源ts|
| --|--|--|--|--|--|--|--|
| 一|B|出租车|周期| 14 | -0.08 | 20231201 | 20231128 |
| B| C| Xyz| ABC| 8 |-1个| 2023431201 | 20231128 |

展开查看全部

相关问题