我在s3 bucket中有一个jason格式的大数据集(5gb)。我需要转换数据的模式,并使用etl脚本将转换后的数据写回s3。
因此,我使用一个爬虫来检测模式,并将数据加载到pysparkDataframe中,然后更改模式。现在我迭代Dataframe中的每一行并将其转换为字典。删除空列,然后将字典转换为字符串并写回s3。代码如下:
# df is the pyspark dataframe
columns = df.columns
print(columns)
s3 = boto3.resource('s3')
cnt = 1
for row in df.rdd.toLocalIterator():
data = row.asDict(True)
for col_name in columns:
if data[col_name] is None:
del data[col_name]
content = json.dumps(data)
object = s3.Object('write-test-transaction-transformed', str(cnt)).put(Body=content)
cnt = cnt+1
print(cnt)
我曾经是一个文学家。上述代码的执行是串行执行的吗?如果是,那么如何优化它?有没有更好的方法来执行上述逻辑?
3条答案
按热度按时间pn9klfpd1#
我将遵循以下方法(用scala编写,但可以用python实现,只需很少的更改)-
找到数据集计数并将其命名为
totalCount
```val totalcount = inputDF.count()
获取要删除的列
使用步骤2中创建的Map,将count小于totalcount的列标记为要删除的列
选择所有具有
count == totalCount
从输入Dataframe,并将处理后的输出Dataframe按要求以任何格式保存在任何地方。请注意,
this approach will remove all the column having at least one null value
```val fieldToBool = fieldToCount.mapValues(_ < totalcount)
val processedDF = inputDF.select(fieldToBool.filterNot(.2).map(.1) :*)
// save this processedDF anywhere in any format as per requirement
qnzebej02#
我解决了上面的问题。我们可以简单地查询dataframe中的空值。df=df.filter(df.column.isnotnull()),从而删除存在null的所有行。因此,如果有n列,我们需要2^n个查询来筛选出所有可能的组合。在我的例子中,共有10列,总共1024个查询,这是可以接受的,因为sql查询是并行的。
uqdfh47h3#
假设数据集中的每一行都是json字符串格式
如果输入Dataframe具有cols,那么输出只需要不为null cols
结果是什么