Pyspark合并列值相同的行

carvr3hs  于 2023-08-02  发布在  Spark
关注(0)|答案(2)|浏览(120)

我有一个pyspark Dataframe ,看起来像这样(不能假设数据总是按照显示的顺序。此外,服务的总数也是无限的,而在下面的示例中仅显示了2个):

| service | usage | count |
|---------|-------|-------|
|   a     |  low  |   5   |
|   a     | high  |   3   |
|   b     | high  |   3   |   
|   b     | low   |   2   |

字符串
我想创建一个新的dataframe,其中每行都有字符串格式的服务使用计数。举例来说:

| service |       info       |
|---------|------------------|
|   a     |{low: 5, high: 3} |
|   b     |{low: 2, high: 3} |


这是我尝试的:

# data is originally in dataframe called df:
new_df = df.groupBy('service').agg(F.collect_list('count').alias('info'))


但是上面的输出是这样的:

| service |       info       |
|---------|------------------|
|   a     |       [5,3]      |
|   b     |       [3,2]      |


这里的问题是,info列中的数字顺序并没有告诉我哪个数字对应于high,哪个对应于low的用法。也没有标签。

jyztefdp

jyztefdp1#

这里的解决方案需要提前知道usage中的类别。这里的想法是使用pivot获取low/high列,然后将其转换为结构体,然后可以将其转换为JSON字符串。
我们从这个数据框开始

df = spark.createDataFrame(
    pd.DataFrame({
        "service": ["a", "a", "a", "b", "b", "c"],
        "usage": ["low", "high", "high", "high", "low", "low"],
        "count": [5, 3, 3, 2, 4, 1],
    })
)

个字符
由于您没有在问题中指定,因此我假设一个一般情况,即(service, usage)不是唯一的(参见a),并且如果给定“服务”的“使用”不完整(参见c)。
解决方案:

(
    df
    .groupBy("service")
    .pivot("usage", values=["low", "high"])
    .agg(
        F.max("count").alias("count"),
    )
    # -------- (1)
    .select(
        "service",
        F.to_json(F.struct(
            col("low").alias("low"),
            col("high").alias("high"),
        )).alias("info"),
    )
    .show()
    # --------- (2)
)

部分备注:

  • 当使用pivot时,如果您知道这些值,建议您指定它们,以便spark不需要花费时间计算不同的枚举(更有效)。
  • 如果你在(1)之后.show(),它看起来像这样:
+-------+---+----+
|service|low|high|
+-------+---+----+
|      c|  1|null|
|      a|  5|   3|
|      b|  4|   2|
+-------+---+----+

  • 如果你在(2)之后.show(),那么它看起来像这样(你所追求的结果):
+-------+------------------+
|service|              info|
+-------+------------------+
|      c|         {"low":1}|
|      a|{"low":5,"high":3}|
|      b|{"low":4,"high":2}|
+-------+------------------+

svmlkihl

svmlkihl2#

df.groupby('service').agg(
  to_json(
    collect_list(
      create_map('usage', 'count')
    )
  ).alias('Info')
).show()

字符串

相关问题