在我的脚本中,我有一个有3列的表,分别是vehicleId
,tripStartDateTime
和correlationId
。表可以按vehicleId
列分区。
+------------------+--------------------+------------------------------------+
|vehicleId |tripStartDateTime |correlationId |
+------------------+--------------------+------------------------------------+
|00045b1b-0ac9-4dce|2023-07-26T16:35:34Z|1f036bb8-cac4-43c1-b29e-a7646884fe2e|
|00045b1b-0ac9-4dce|2023-07-26T17:27:38Z|134b785e-e013-41b1-aabc-094a90b95482|
|00045b1b-0ac9-4dce|2023-07-26T18:04:16Z|51fb0e53-2938-431c-8825-7f461849dfe3|
|00045b1b-0ac9-4dce|2023-07-26T18:32:46Z|954a4f96-2c51-403b-9fd5-d07a7cdc35dd|
|00045b1b-0ac9-4dce|2023-07-26T18:40:18Z|811a1336-27f3-4e8c-99cc-22f5debe21a3|
|8eba-55a058fb4dd0f|2023-07-20T10:35:34Z|1f036bff-cac4-dddd-ddsa-a7646884fe2e|
|8eba-55a058fb4dd0f|2023-07-20T10:65:34Z|23226bff-cac4-dddd-ddsa-a7646884fe2e|
...
字符串
在每一个vehicleId
(分区)里面,我想根据tripStartDateTime
一行一行的处理,每一行都会传递给一个自定义函数,在自定义函数里面,有复杂的计算,每一行的结果都会保存到另一个表里面,下一行会使用前面几行的结果,所以每一行都要按顺序处理。
如何编写高效的脚本,保证每个分区可以并发处理(因为一辆车不影响其他车),但在一个分区内,记录会按顺序一条一条处理?
我目前的解决方案是准备一个UDF,将每一行传递给UDF,然后在for循环中调用UDF。但这很慢,尽管我在Databricks中使用了多节点集群。
for (row <- df.collect()) {
processRowUDF(row)
}
型
谁能给予我一些关于如何优化它的建议?谢谢
2条答案
按热度按时间dxxyhpgq1#
您可以使用自定义的pandas udf类型
PandasUDFType.GROUPED_AGG
。它接受分组数据(在本例中按vehicleId分组)并返回列表列表,这些列表可以分解以插入到另一个表或创建另一个框架。下面是一个例子。字符串
输出量:
型
daolsyd02#
类似的方法使用scala spark和Dataset。有关于如何编写自定义用户定义聚合函数的指南。
https://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html
字符串