使用pyspark将数据提取到独立文件中以解决问题:Spark缓冲区保持器大小限制问题

bnlyeluc  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(116)

问题

我的问题和this one: Spark buffer holder size limit issue一样。
我的代码是这样的:

# Calculate the statistics
stats = df.groupBy("EventType").agg(
    size(collect_set("Parameters")).alias("ParameterLength"),
    collect_list("Parameters").alias("Parameters"),
    (count("*") / df.count() * 100).alias("Frequency"),
)

期望

解决方案一

我已经知道大部分的Parameters都很短,只有EventTypeAB有大量的参数,我想有没有办法跳过EventType ==“A”或“B”第一,并输出一个汇总表,然后输出EventType ==“A”或“B”到一个独立的文件或其他东西。
我想要的是这样的:

EventType   ParameterLength Frequency   Parameters
TimeChanged 0               0.799471    []
Alarm       40              71.643145   [ele1,ele2,...ele40]
...
A           9999999             12.12       None--> file generated
B           30031               5.21        None--> file generated

方案二

获取溢出列(如“A”和“B”)的前100个参数。或者当元素> 100时,执行切片,否则执行正常操作。

方案三

Parameters数组包含了所有的{“Key”:XXX,“Value”:YYY}对,有没有办法通过在spark中将它们变成像(XXX,YYY)或StructType这样的元组来减少大小?

vvppvyoh

vvppvyoh1#

我使用了解决方案2,这是我的代码:

stats = df.groupBy("EventType").agg(
    size(collect_set("Parameters")).alias("ParameterLength"),
    when(
        size(collect_set("Parameters")) > 1000,
        slice(collect_set("Parameters"), 1, 1000).alias("Parameters"),
    )
    .otherwise(collect_set("Parameters"))
    .alias("Parameters"),
    (count("*") / df.count() * 100).alias("Frequency"),
)

相关问题