在pyspark中优化行访问和转换

esbemjvw  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(487)

我在s3 bucket中有一个jason格式的大数据集(5gb)。我需要转换数据的模式,并使用etl脚本将转换后的数据写回s3。
因此,我使用一个爬虫来检测模式,并将数据加载到pysparkDataframe中,然后更改模式。现在我迭代Dataframe中的每一行并将其转换为字典。删除空列,然后将字典转换为字符串并写回s3。代码如下:


# df is the pyspark dataframe

columns = df.columns
print(columns)
s3 = boto3.resource('s3')
cnt = 1

for row in df.rdd.toLocalIterator():
    data = row.asDict(True)

    for col_name in columns:
        if data[col_name] is None:
            del data[col_name]

    content = json.dumps(data)
    object = s3.Object('write-test-transaction-transformed', str(cnt)).put(Body=content)
    cnt = cnt+1
print(cnt)

我曾经是一个文学家。上述代码的执行是串行执行的吗?如果是,那么如何优化它?有没有更好的方法来执行上述逻辑?

pn9klfpd

pn9klfpd1#

我将遵循以下方法(用scala编写,但可以用python实现,只需很少的更改)-
找到数据集计数并将其命名为 totalCount ```
val totalcount = inputDF.count()

找到 `count(col)` 对于所有的dataframe列,获取字段到其计数的Map
对于输入dataframe的所有列,计算计数
请注意 `count(anycol)` 返回提供的列都为非空的行数。例如-如果一列有10个行值,如果有5个值 `null` 然后计数(列)变为5
取第一行作为 `Map[colName, count(colName)]` 称为 `fieldToCount` ```
val cols = inputDF.columns.map { inputCol =>
      functions.count(col(inputCol)).as(inputCol)
    }
// Returns the number of rows for which the supplied column are all non-null.
    // count(null) returns 0
    val row = dataset.select(cols: _*).head()
    val fieldToCount = row.getValuesMap[Long]($(inputCols))

获取要删除的列
使用步骤2中创建的Map,将count小于totalcount的列标记为要删除的列
选择所有具有 count == totalCount 从输入Dataframe,并将处理后的输出Dataframe按要求以任何格式保存在任何地方。
请注意, this approach will remove all the column having at least one null value ```
val fieldToBool = fieldToCount.mapValues(_ < totalcount)
val processedDF = inputDF.select(fieldToBool.filterNot(.2).map(.1) :*)
// save this processedDF anywhere in any format as per requirement

我相信这种方法会比你目前的方法表现得更好
qnzebej0

qnzebej02#

我解决了上面的问题。我们可以简单地查询dataframe中的空值。df=df.filter(df.column.isnotnull()),从而删除存在null的所有行。因此,如果有n列,我们需要2^n个查询来筛选出所有可能的组合。在我的例子中,共有10列,总共1024个查询,这是可以接受的,因为sql查询是并行的。

uqdfh47h

uqdfh47h3#

假设数据集中的每一行都是json字符串格式

import pyspark.sql.functions as F

def drop_null_cols(data):
    import json
    content = json.loads(data)
    for key, value in list(content.items()):
        if value is None:
            del content[key]

    return json.dumps(content)

drop_null_cols_udf = F.udf(drop_null_cols, F.StringType())

df = spark.createDataFrame(
    ["{\"name\":\"Ranga\", \"age\":25, \"city\":\"Hyderabad\"}",
     "{\"name\":\"John\", \"age\":null, \"city\":\"New York\"}",
     "{\"name\":null, \"age\":31, \"city\":\"London\"}"],
    "string"
).toDF("data")

df.select(
    drop_null_cols_udf("data").alias("data")
).show(10,False)

如果输入Dataframe具有cols,那么输出只需要不为null cols

df = spark.createDataFrame(
        [('Ranga', 25, 'Hyderabad'),
         ('John', None, 'New York'),
         (None, 31, 'London'),
        ],
        ['name', 'age', 'city']
    )

df.withColumn(
    "data", F.to_json(F.struct([x for x in df.columns]))
).select(
    drop_null_cols_udf("data").alias("data")
).show(10, False)

# df.write.format("csv").save("s3://path/to/file/) -- save to s3

结果是什么

+-------------------------------------------------+
|data                                             |
+-------------------------------------------------+
|{"name": "Ranga", "age": 25, "city": "Hyderabad"}|
|{"name": "John", "city": "New York"}             |
|{"age": 31, "city": "London"}                    |
+-------------------------------------------------+

相关问题