I am using the action count() to trigger my UDF to run. This works, but long after the UDF has finished running, df.count() still takes days to complete. The DataFrame itself is not very large, roughly 30k to 100k rows.
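For context, Spark evaluates lazily: count() is the action that materializes the plan, so the grouped-map UDF and the count run as a single job. A minimal illustration, using the names from the pseudocode below:

result = df.groupBy('some_unique_id').apply(calculate_values)  # nothing executes yet
n = result.count()  # the whole plan, including the UDF, runs here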
AWS cluster setup:
1x m5.4xlarge for the master node
2x m5.4xlarge for the worker nodes
Spark variables and settings (these are the Spark variables used to run the script):
--executor-cores 4
--conf spark.sql.execution.arrow.enabled=true
'spark.sql.inMemoryColumnarStorage.batchSize', 2000000 (set in the PySpark script)
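For reference, a minimal sketch of how these settings might be applied, assuming the script is launched via spark-submit (the script name is a placeholder):

spark-submit \
    --executor-cores 4 \
    --conf spark.sql.execution.arrow.enabled=true \
    our_script.py

And inside the PySpark script, assuming the session is built like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config('spark.sql.inMemoryColumnarStorage.batchSize', 2000000) \
    .getOrCreate()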
Pseudocode
This is the actual structure of our script. The custom UDF calls our Postgres database for every row.
from pyspark.sql.functions import pandas_udf, PandasUDFType

# udf_schema: a function that returns the schema for the dataframe

def main():
    # Define the pandas UDF for the calculation.
    # To perform this calculation, every row in the
    # dataframe needs information pulled from our Postgres DB,
    # which does take some time, ~2-3 hours.
    @pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
    def calculate_values(local_df):
        local_df = run_calculation(local_df)
        return local_df

    # Custom function that pulls data from our database and
    # creates the dataframe.
    df = get_df()

    df = df \
        .groupBy('some_unique_id') \
        .apply(calculate_values)

    print(f'==> finished running calculation for {df.count()} rows!')
    return
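run_calculation is our own helper; a hypothetical sketch of its per-row structure, assuming psycopg2, with placeholder connection details and query (not the actual code):

import psycopg2

def run_calculation(local_df):
    # Hypothetical: one connection per group, one query per row.
    # Host, credentials, table and column names are placeholders.
    conn = psycopg2.connect(host='db-host', dbname='our_db',
                            user='user', password='...')
    cur = conn.cursor()
    for idx, row in local_df.iterrows():
        cur.execute('SELECT extra_value FROM lookup_table WHERE id = %s',
                    (row['some_unique_id'],))
        local_df.at[idx, 'extra_value'] = cur.fetchone()[0]
    cur.close()
    conn.close()
    return local_df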