在pyspark中以分布式方式高效地生成大型Dataframe(不使用pyspark.sql.row)

oalqel3c  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(542)

问题归结为以下几个方面:我想在pyspark中使用现有的并行化输入集合和一个函数生成一个dataframe,该函数给定一个输入就可以生成相对较大的一批行。在下面的示例中,我希望使用1000个执行器生成10^12行Dataframe:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(我真的不想研究给定种子的随机数的分布-这只是一个示例,我可以用它来说明大型Dataframe不是从仓库加载的,而是由代码生成的)
上面的代码几乎完全符合我的要求。问题是,它以一种非常低效的方式来实现它—代价是为每一行创建一个python行对象,然后将python行对象转换为内部spark列表示。
有没有一种方法可以转换已经以列表示形式存在的一批行(例如,上面的一个或几个numpy数组) np_array )只是让spark知道这些是一批值的列?
e、 g.我可以编写代码来生成python集合rdd,其中每个元素都是pyarrow.recordbatch或pandas.dataframe,但是如果不在这个过程中创建pyspark行对象的rdd,我就找不到方法将这些元素转换成spark dataframe。
至少有十几篇文章举例说明了如何使用pyarrow+pandas有效地将本地(到驱动程序)pandasDataframe转换为sparkDataframe,但这对我来说不是一个选择,因为我需要在执行器上以分布式方式实际生成数据,而不是在驱动程序上生成一个Dataframe并将其发送给执行器。
升级。我找到了一种避免创建行对象的方法——使用python元组的rdd。正如预期的那样,它仍然太慢,但仍然比使用row对象快一点。不过,这并不是我真正想要的(这是从python向spark传递列数据的一种非常有效的方法)。
在机器上做某些操作的测量时间(粗略的方法,测量时间有很大的变化,但在我看来仍然具有代表性):所讨论的数据集是10m行,3列(一列是常量整数,另一列是0到10m-1的整数范围,第三列是使用 np.random.random_sample :
本地生成Dataframe(10m行):~440-450ms
本地生成spark.sql.row对象的python列表(10m行):~12-15s
本地生成表示行的元组的python列表(10m行):~3.4-3.5s
仅使用1个执行器和1个初始种子值生成sparkDataframe:
使用 spark.createDataFrame(row_rdd, schema=my_schema) 约70-80岁
使用 spark.createDataFrame(tuple_rdd, schema=my_schema) :~40-45秒
(非分布式创建)使用 spark.createDataFrame(pandas_df, schema=my_schema) :~0.4-0.5秒(不需要大致相同的时间)-有 spark.sql.execution.arrow.enabled 设置为true。
对于10m行,本地到驱动程序的Dataframe在~1s内转换为sparkDataframe的例子让我有理由相信,对于在执行器中生成的Dataframe,同样的情况应该是可能的。不过,我现在可以实现的最快速度是使用python元组的rdd,10m行的速度是~40s。
所以问题仍然存在-有没有一种方法可以在pyspark中高效地以分布式方式生成一个大的sparkDataframe?

b4wnujal

b4wnujal1#

听起来瓶颈是从rdd->dataframes的转换,而且手头的函数相当快,通过pyarrow将pandadf转换为spark df也相当快。以下是两种可能的解决方案:
因为并行创建df很容易,所以使用 df.to_parquet ,即:

def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    df.reset_index().to_parquet(f"s3://bucket/part-{str(seed).zfill(5)}.parquet"

Spark读取结果Parquet文件应该是微不足道的事后。然后瓶颈就变成了io限制,这应该比spark转换元组/行类型快。
如果不允许您将任何内容保存到文件中, pandas_udf 以及 GROUPED_MAP 如果你的spark版本足够新的话,可能会对你有所帮助。它还使用pyarrow在spark dfs和pandas dfs之间进行转换,因此它应该比使用元组更快,并允许您以分布式方式从udf创建和返回pandas dfs。

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

N = 10

df = spark.createDataFrame(
    [(i,) for i in range(N)], ["seed"]
)

def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    return df.reset_index()

@pandas_udf("index long, x double, seed long", PandasUDFType.GROUPED_MAP)
def generate_data_udf(pdf):
    output = []
    for idx, row in pdf.iterrows():
        output.append(generate_data(row["seed"]))
    return pd.concat(output)

df.groupby("seed").apply(generate_data_udf).show()

较慢的部分将是 groupby 你可能可以加快速度,这取决于你如何批量种子进入 generate_data_udf ,即:

@udf(returnType=IntegerType())
def batch_seed(seed):
    return seed // 10

df.withColumn("batch_seed", batch_seed(col("seed"))). \
groupBy("batch_seed").apply(generate_data_udf).show()
0g0grzrc

0g0grzrc2#

以下是一个不使用rdd或创建行的解决方案,但仅使用Dataframe操作:
(代码是用scala编写的,但用python编写同样的代码应该很简单)

val N = 100000

//for seed return array of index and random_value
def generate_data(i: Int): Array[(Int, Double)] = ???
val generate_data_udf = udf (generate_data _)

spark
  .range(N)
  .toDF("seed")
  .withColumn("arr", generate_data_udf($"seed"))
  .select(
    $"seed",
    explode($"arr") as "exp"
  )
  .select(
    $"seed",
    $"exp._1" as "n",
    $"exp._2" as "x"
  )
ogq8wdun

ogq8wdun3#

以下是不使用 Row -仅基于rdd。我认为这可能是最有效的方法,因为它使用 map 计算函数输出和 flatMap 为了合并这些输出-这两个操作都在RDD上执行,所以所有的东西都应该被分发。

import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext

def generate_data(one_integer):
  M = 2 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  return [(one_integer, i, float(np_array[i])) for i in range(M)]

N = 30 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = sc.parallelize(list_of_integers)
generated_data_rdd = list_of_integers_rdd.map(lambda x: generate_data(x))
solved_rdd = generated_data_rdd.flatMap(lambda list: list)

df = spark.createDataFrame(solved_rdd).toDF("seed", "n", "x")
df.show()

相关问题