我在试着理解foreachpartition是如何工作的。由于foreachpartition print语句不会从执行器发送回spark驱动程序stdout,因此我考虑将数据写入s3。
因此,我创建了一个随机Dataframe,并尝试将每个分区的json数据写入s3。但是,我注意到有些数据并没有写入s3。无论我为Dataframe选择了多少个分区(2或18或38,等等…),在这个示例中('df_2.rdd.getnumpartitions()'),似乎每次都有一些数据没有出现在s3中。如何让foreachpartitions将整个Dataframe写入s3?我做错什么了?
# Creating random dataframe filled with random numbers
from pyspark.sql import SQLContext
import pandas as pd
df = pd.DataFrame([[1,2,5], [22,55,112],[151,663,4123],[6634,31,23],[2141,22,22],[21,11,2],[66,85,4]])
sqlCtx = SQLContext(sc)
df_2 = sqlCtx.createDataFrame(df)
df_2.show(10)
print('partitions: ', df_2.rdd.getNumPartitions())
# output:
+----+---+----+
| 0| 1| 2|
+----+---+----+
| 1| 2| 5|
| 22| 55| 112|
| 151|663|4123|
|6634| 31| 23|
|2141| 22| 22|
| 21| 11| 2|
| 66| 85| 4|
+----+---+----+
partitions: 2
import random
import os
import json
import boto3
# function to pass in foreachParittion
my_bucket = 'ramdom_bucket'
folder = 'my/random/folder'
def f(iterator):
#assigning random number for a file
rand_int = random.randint(4,512314123)
# rand_int = 512
s3 = boto3.resource('s3')
s3object = s3.Object(my_bucket, os.path.join(folder, 'data_{}.json'.format(rand_int)))
#creating list from partition
ls_iter = list(iterator)
#Copying list object as JSON to a random file name in s3
s3object.put(
Body=(bytes(json.dumps(ls_iter).encode('UTF-8')))
)
# df_2 is the dataframe I created in previous code block
df_2.foreachPartition(f)
1条答案
按热度按时间ldioqlga1#
在我看来,你想知道Spark是如何内部计算的吗?你可以从中得到有用的东西https://spark.apache.org/docs/latest/job-scheduling.html. 它会给你一个大致的Angular 来感受星火的调度。此外,您还可以访问调度程序、阶段调度程序、任务调度程序来了解如何执行作业。
对于这个问题,我认为h=高并发性和高计算速度几乎没有留给s3存储对象的时间。