pyspark foreachpartition未写入所有数据

7fyelxc5  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(761)

我在试着理解foreachpartition是如何工作的。由于foreachpartition print语句不会从执行器发送回spark驱动程序stdout,因此我考虑将数据写入s3。
因此,我创建了一个随机Dataframe,并尝试将每个分区的json数据写入s3。但是,我注意到有些数据并没有写入s3。无论我为Dataframe选择了多少个分区(2或18或38,等等…),在这个示例中('df_2.rdd.getnumpartitions()'),似乎每次都有一些数据没有出现在s3中。如何让foreachpartitions将整个Dataframe写入s3?我做错什么了?


# Creating random dataframe filled with random numbers

from pyspark.sql import SQLContext
import pandas as pd
df = pd.DataFrame([[1,2,5], [22,55,112],[151,663,4123],[6634,31,23],[2141,22,22],[21,11,2],[66,85,4]])
sqlCtx = SQLContext(sc)
df_2 = sqlCtx.createDataFrame(df)
df_2.show(10)
print('partitions: ', df_2.rdd.getNumPartitions())

# output:
+----+---+----+
|   0|  1|   2|
+----+---+----+
|   1|  2|   5|
|  22| 55| 112|
| 151|663|4123|
|6634| 31|  23|
|2141| 22|  22|
|  21| 11|   2|
|  66| 85|   4|
+----+---+----+

partitions:  2
import random
import os
import json
import boto3

# function to pass in foreachParittion

my_bucket = 'ramdom_bucket'
folder = 'my/random/folder'

def f(iterator):
    #assigning random number for a file
    rand_int = random.randint(4,512314123)

# rand_int = 512

    s3 = boto3.resource('s3')
    s3object = s3.Object(my_bucket, os.path.join(folder, 'data_{}.json'.format(rand_int)))
    #creating list from partition
    ls_iter = list(iterator)
    #Copying list object as JSON to a random file name in s3
    s3object.put(
        Body=(bytes(json.dumps(ls_iter).encode('UTF-8')))
    )

# df_2 is the dataframe I created in previous code block

df_2.foreachPartition(f)
ldioqlga

ldioqlga1#

在我看来,你想知道Spark是如何内部计算的吗?你可以从中得到有用的东西https://spark.apache.org/docs/latest/job-scheduling.html. 它会给你一个大致的Angular 来感受星火的调度。此外,您还可以访问调度程序、阶段调度程序、任务调度程序来了解如何执行作业。
对于这个问题,我认为h=高并发性和高计算速度几乎没有留给s3存储对象的时间。

//ADD RETURN RESULT FOR PUT ACTION, And you can debug the error.
``` `s3object.put( Body=(bytes(json.dumps(ls_iter).encode('UTF-8'))) )` ```
//ADD SOME TIME TO WAIT UNTIL THE PUT ACTION FINISGED;

相关问题