azure 如何在一个需要并行运行的函数中对pyspark对象框架执行操作?

igsr9ssn  于 2023-10-22  发布在  Spark
关注(0)|答案(1)|浏览(128)

我在Azure Databricks notebook中创建了一个函数,它有一个基本的转换(对于这个问题,我简化了我的函数),如下所示:

spark = SparkSession.builder.appName('Notebook').getOrCreate()
calculation(account, data):

   data_account = spark.createDataFrame(data)

   return data_account

在这个函数中,输入data是一个Pandas嵌套框,而account只是一个字符串。
现在我想并行运行这个函数。为此,我创建了一个列表,元组作为输入数据,看起来像:

input_data = [(account_nr1, df1), (account_nr2, df2), (account_nr2, df2), ...]

接下来,我使用RDD对许多不同的account_nrdf(它们是Pandas嵌套)组合并行运行函数calculation。我的代码如下:

rdd = sc.parallelize(input_data)
result_rdd = rdd.map(lambda args: calculation(args[0], args[1]))
results = result_rdd.collect()

但是,我不断收到错误消息PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
有没有人知道这个错误消息是什么意思,以及我应该如何更改代码,以便我可以使用不同的输入参数并行运行我的函数?

j8ag8udp

j8ag8udp1#

试试这个:

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

spark = SparkSession.builder.appName('Notebook').getOrCreate()

def calculation(account: str, data: DataFrame) -> DataFrame:
    return data

input_data_spark = [(account, spark.createDataFrame(df)) for account, df in input_data]

rdd = sc.parallelize(input_data_spark)

result_rdd = rdd.map(lambda args: calculation(args[0], args[1]))
results = result_rdd.collect()

相关问题