正向填充.net for spark

vulvrdjw  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(381)

我在看窗口函数寻找Spark DataFrame 在.net(c#)中。
我有一个Dataframe df 包含年、月、日、小时、分钟、id、类型和值列:

| 2021 |  3  |  4  |  8  |  9  |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  10 | null |   null  |   null  |

| 2021 |  3  |  4  |  8  |  11 | null |   null  |   null  |

| 2021 |  3  |  4  |  8  |  12 | null |   null  |   null  |

| 2021 |  3  |  4  |  8  |  13 |  87  |  Type1  |    0.0  |

| 2021 |  3  |  4  |  8  |  14 |  87  |  Type1  |    0.0  |

我想用前一行基于年、月、日、小时、分钟的值填充空行(null),如下所示:

| 2021 |  3  |  4  |  8  |  9  |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  10 |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  11 |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  12 |  87  |  Type1  |  380.5  |

| 2021 |  3  |  4  |  8  |  13 |  87  |  Type1  |    0.0  |

| 2021 |  3  |  4  |  8  |  14 |  87  |  Type1  |    0.0  |

到目前为止,我在scala中找到了使用windows和lag函数的解决方案,但我不知道如何在c#中实现。在scala中,窗口的定义如下: val window = Window.orderBy("Year", "Month", "Day", "Hour", "Minute") 我想添加一个newvalue列,使用 var filledDataFrame = df.WithColumn("newValue", Functions.When(df["Value"].IsNull(), Functions.Lag(df["Value"], 1).Over(window)).Otherwise(df["Value"]) 如何在.net中为spark定义一个窗口,并使用滞后函数向前填充空值?

ybzsozfc

ybzsozfc1#

要使用lag和.net for apache spark的窗口,您已经非常接近了,需要:

var spark = SparkSession.Builder().GetOrCreate();
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType()),

}));

var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
            Functions.Lag(df["Value"], 1).Over(window))
        .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);

这将导致:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|    null|
|2021|    3|  4|   8|    12|null| null| null|    null|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+

但你可能想要 Last 而不是 Lag 可以跳过空值:

var spark = SparkSession.Builder().GetOrCreate();
var df = spark.CreateDataFrame(new List<GenericRow>()
{
    new GenericRow(new object[] {2021, 3, 4, 8, 9, 87, "Type1", 380.5}),
    new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 11, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 12, null, null, null}),
    new GenericRow(new object[] {2021, 3, 4, 8, 13, 87, "Type1", 0.0}),
    new GenericRow(new object[] {2021, 3, 4, 8, 14, 87, "Type1", 0.0})
}, new StructType(new List<StructField>()
{
    new StructField("Year", new IntegerType()),
    new StructField("Month", new IntegerType()),
    new StructField("Day", new IntegerType()),
    new StructField("Hour", new IntegerType()),
    new StructField("Minute", new IntegerType()),
    new StructField("ID", new IntegerType()),
    new StructField("Type", new StringType()),
    new StructField("Value", new DoubleType()),

}));

var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
var filledDataFrame = df.WithColumn("newValue",
    Functions.When(df["Value"].IsNull(),
        Functions.Last(df["Value"], true).Over(window))
        .Otherwise(df["Value"]));

filledDataFrame.Show(1000, 10000);

结果是:

+----+-----+---+----+------+----+-----+-----+--------+
|Year|Month|Day|Hour|Minute|  ID| Type|Value|newValue|
+----+-----+---+----+------+----+-----+-----+--------+
|2021|    3|  4|   8|     9|  87|Type1|380.5|   380.5|
|2021|    3|  4|   8|    10|null| null| null|   380.5|
|2021|    3|  4|   8|    11|null| null| null|   380.5|
|2021|    3|  4|   8|    12|null| null| null|   380.5|
|2021|    3|  4|   8|    13|  87|Type1|  0.0|     0.0|
|2021|    3|  4|   8|    14|  87|Type1|  0.0|     0.0|
+----+-----+---+----+------+----+-----+-----+--------+

希望有帮助!
预计起飞时间
(这项工作所需的using语句)

using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

相关问题