Efficiently filling time gaps in a DataFrame

rur96b6h  posted on 2021-07-09 in Spark

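I want to fill the gaps in my DataFrame using .NET for Spark.
The current DataFrame (`rawData`) contains data between `reportFrom` and `reportTo`:

```csharp
DateTime reportFrom = new DateTime(2021, 3, 4, 0, 0, 0);
DateTime reportTo = new DateTime(2021, 3, 5, 0, 0, 0);
```

Some intervals are missing, and I want to fill them with the last known value.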

```
+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id | Type| Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021| 3| 4| 0| 0| 87| Power| 0.0|
|2021| 3| 4| 0| 1| 87| Power| 0.0|
|2021| 3| 4| 0| 2| 87| Power| 0.0|
...
|2021| 3| 4| 14| 2| 87| Power| 380.0|
|2021| 3| 4| 14| 3| 87| Power| 380.0|
|2021| 3| 4| 14| 4| 87| Power| 380.0|
|2021| 3| 4| 14| 5| 87| Power| 380.0|
|2021| 3| 4| 14| 7| 87| Power| 380.0|
...
|2021| 3| 4| 22| 7| 87| Power| 0.0|
```

After the first step (inserting the missing minutes), the result I expect is:

```
+----+-----+---+----+------+------------------+--------------------+------------------+
|Year|Month|Day|Hour|Minute|Id | Type| Value|
+----+-----+---+----+------+------------------+--------------------+------------------+
|2021| 3| 4| 0| 0| 87| Power| 0.0|
|2021| 3| 4| 0| 1| 87| Power| 0.0|
|2021| 3| 4| 0| 2| 87| Power| 0.0|
...
|2021| 3| 4| 14| 2| 87| Power| 380.0|
|2021| 3| 4| 14| 3| 87| Power| 380.0|
|2021| 3| 4| 14| 4| 87| Power| 380.0|
|2021| 3| 4| 14| 5| 87| Power| 380.0|
|2021| 3| 4| 14| 6| null| null| null|
|2021| 3| 4| 14| 7| 87| Power| 380.0|
|2021| 3| 4| 14| 8| null| null| null|
...
|2021| 3| 4| 23| 59| null| null| null|
```

So far, I have been creating a new `DataFrame` containing every minute in the range and then performing a `left outer join` on the two DataFrames.

```csharp
int inc = 1;
List<DateTime> timeList = new List<DateTime>();
while (reportFrom < reportTo)
{
    timeList.Add(reportFrom);
    reportFrom = reportFrom.AddMinutes(inc);
}

// Placeholder row (Year0 = -1) that only seeds the DataFrame; it is filtered out below
var toFillTime0 = new List<object> { -1, 0, 0, 0, 0 };

var dataToFill = spark.CreateDataFrame(
    new List<GenericRow> { new GenericRow(toFillTime0.ToArray()) },
    new StructType( // schema
        new List<StructField>()
        {
            new StructField("Year0", new IntegerType()),
            new StructField("Month0", new IntegerType()),
            new StructField("Day0", new IntegerType()),
            new StructField("Hour0", new IntegerType()),
            new StructField("Minute0", new IntegerType()),
        }));

foreach (DateTime time in timeList)
{
    var toFillTime = new List<object> { time.Year, time.Month, time.Day, time.Hour, time.Minute };

    var dataToFillt = spark.CreateDataFrame(
        new List<GenericRow> { new GenericRow(toFillTime.ToArray()) },
        new StructType( // schema
            new List<StructField>()
            {
                new StructField("Year0", new IntegerType()),
                new StructField("Month0", new IntegerType()),
                new StructField("Day0", new IntegerType()),
                new StructField("Hour0", new IntegerType()),
                new StructField("Minute0", new IntegerType()),
            }));

    // One single-row DataFrame and one Union per minute
    dataToFill = dataToFill.Union(dataToFillt);
}

// Drop the placeholder row
dataToFill = dataToFill.Filter("Year0 > 0");

var toFillReportDataReq = dataToFill.Join(rawData,
    dataToFill["Year0"] == rawData["Year"] & dataToFill["Month0"] == rawData["Month"] & dataToFill["Day0"] == rawData["Day"]
    & dataToFill["Hour0"] == rawData["Hour"] & dataToFill["Minute0"] == rawData["Minute"], "left_outer");
```

A few rows of `toFillReportDataReq` look like this:

```
|2021| 3| 4| 22| 4| 87| Power| 0.0|
|2021| 3| 4| 22| 5| 87| Power| 0.0|
|2021| 3| 4| 22| 6| 87| Power| 0.0|
|2021| 3| 4| 22| 7| 87| Power| 0.0|
|2021| 3| 4| 22| 8| null| null| null|
|2021| 3| 4| 22| 9| null| null| null|
|2021| 3| 4| 22| 10| null| null| null|
|2021| 3| 4| 22| 11| null| null| null|
|2021| 3| 4| 22| 12| null| null| null|
|2021| 3| 4| 22| 13| null| null| null|
|2021| 3| 4| 22| 14| null| null| null|
```

Replacing the nulls in the `Value` column is already covered using the `window` and `last` functions.
The nulls in the `Id` and `Type` columns are replaced with `var id = 87` and "Power" as follows:

```csharp
toFillReportDataReq = toFillReportDataReq
    .WithColumn("Id", Functions.When(toFillReportDataReq["Id"].IsNull(), id)
        .Otherwise(toFillReportDataReq["Id"]))
    .WithColumn("Type", Functions.When(toFillReportDataReq["Type"].IsNull(), "Power")
        .Otherwise(toFillReportDataReq["Type"]));
```
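For comparison, the same null replacement can probably be written more compactly with `Functions.Coalesce`, which returns its first non-null argument. This is only a sketch: the class `NullFill` and the method `FillIdAndType` are hypothetical names introduced here, and it assumes the column names `Id` and `Type` from the tables above.

```csharp
using Microsoft.Spark.Sql;

public static class NullFill
{
    // Hypothetical helper (not part of the original post): replaces nulls in
    // "Id" and "Type" with fixed defaults and leaves existing values untouched.
    public static DataFrame FillIdAndType(DataFrame df, int id, string type)
    {
        return df
            // Coalesce picks the first non-null value: the existing one, else the default
            .WithColumn("Id", Functions.Coalesce(df["Id"], Functions.Lit(id)))
            .WithColumn("Type", Functions.Coalesce(df["Type"], Functions.Lit(type)));
    }
}
```

It would be called as `toFillReportDataReq = NullFill.FillIdAndType(toFillReportDataReq, id, "Power");`. The result should match the `When`/`Otherwise` version; the change is about readability rather than speed.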

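This method returns the result I want, but it is very time-consuming (inefficient).
My questions are as follows:

- Is there a more efficient way to create a new `DataFrame` containing all the minutes between the specified bounds?
- Is there a way to avoid the join in this approach?
- What is the best way to set every value in the `Id` column to `id` and every value in the `Type` column to "Power"?

Thanks!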

6yjfywim  #1

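This is the approach I would take:

1. Build a DataFrame that has one row for every minute you want to represent (I use `spark.Range` to project one row for each minute I need).
2. For each id in the range, add that many minutes to the start date.
3. Join the dates to the original DataFrame using a left outer join, so that no rows are lost.
4. Then use `Last` to fill in any gaps. Note that if the data starts with null, `newValue` will stay null until some data appears.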

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Expressions;
using Microsoft.Spark.Sql.Types;

namespace StackOverflow
{
    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().GetOrCreate();

            //Sample data set - we will fill in the missing minutes
            var df = spark.CreateDataFrame(new List<GenericRow>()
            {
                new GenericRow(new object[] {2021, 3, 4, 8, 3, 87, "Type1", 380.5}),
                new GenericRow(new object[] {2021, 3, 4, 8, 10, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 20, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 25, null, null, null}),
                new GenericRow(new object[] {2021, 3, 4, 8, 35, 87, "Type1", 0.0}),
                new GenericRow(new object[] {2021, 3, 4, 8, 45, 87, "Type1", 0.0})
            }, new StructType(new List<StructField>()
            {
                new StructField("Year", new IntegerType()),
                new StructField("Month", new IntegerType()),
                new StructField("Day", new IntegerType()),
                new StructField("Hour", new IntegerType()),
                new StructField("Minute", new IntegerType()),
                new StructField("ID", new IntegerType()),
                new StructField("Type", new StringType()),
                new StructField("Value", new DoubleType()),
            }));

            //start and end time
            var reportFrom = new DateTime(2021, 3, 4, 7, 0, 0);
            var reportTo = new DateTime(2021, 3, 4, 9, 0, 0);

            //convert start time to unix epoch as we can't pass a DateTime to spark (yet!)
            var unixFromTime = (reportFrom - new DateTime(1970, 1, 1, 0, 0, 0, 0)).TotalSeconds;

            //how many total rows do we need?
            var minutesToCreate = reportTo.Subtract(reportFrom).TotalMinutes;

            //create a dataframe with 1 row for every minute we need
            var everyMinute = spark.Range((long) minutesToCreate);

            //Add the reportFrom unix epoch
            everyMinute = everyMinute.WithColumn("BaseTime", Functions.Lit(unixFromTime));

            //add to the unix epoch, the Id (incrementing number) multiplied by 60 - if we didn't mul(60) it would add seconds and not minutes
            everyMinute = everyMinute.WithColumn("Time",
                Functions.Lit(unixFromTime)
                    .Plus(Functions.Col("Id").Cast("Int").Multiply(Functions.Lit(60))));

            //convert the unix epoch to an actual timestamp and drop all the intermediate columns
            everyMinute = everyMinute.WithColumn("Date",
                Functions.ToTimestamp(Functions.FromUnixTime(Functions.Col("Time")))).Select("Date");

            //convert timestamp into individual columns

            everyMinute = everyMinute.WithColumn("Year", Functions.Year(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Month", Functions.Month(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Day", Functions.DayOfMonth(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Hour", Functions.Hour(Functions.Col("Date")));
            everyMinute = everyMinute.WithColumn("Minute", Functions.Minute(Functions.Col("Date")));

            //join both data frames so...
            var dfAllData = everyMinute.Join(df, new List<string>() {"Year", "Month", "Day", "Hour", "Minute"}, "left_outer");

            //add in data using Last
            var window = Window.OrderBy("Year", "Month", "Day", "Hour", "Minute");
            var filledDataFrame = dfAllData.WithColumn("newValue",
                Functions.When(dfAllData["Value"].IsNull(),
                        Functions.Last(dfAllData["Value"], true).Over(window))
                    .Otherwise(dfAllData["Value"]));

            filledDataFrame.Show(1000, 10000);
        }
    }
}
```
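As a possible variation on the first two steps, the minute grid can also be produced in one Spark SQL statement using the built-in `sequence` and `explode` functions instead of epoch arithmetic. This is a sketch under assumptions: it needs Spark 2.4 or later (where `sequence` accepts timestamps with an interval step), the class `MinuteGrid` and its methods are hypothetical names, `to` is treated as exclusive like the `while (reportFrom < reportTo)` loop in the question, and `from` must be earlier than `to`.

```csharp
using System;
using System.Globalization;
using Microsoft.Spark.Sql;

public static class MinuteGrid
{
    private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss";

    // Hypothetical helper: builds the SQL that yields one row per minute in [from, to).
    // Assumes from < to; the last generated minute is to minus one minute.
    public static string BuildSql(DateTime from, DateTime to)
    {
        string start = from.ToString(TimestampFormat, CultureInfo.InvariantCulture);
        string stop = to.AddMinutes(-1).ToString(TimestampFormat, CultureInfo.InvariantCulture);
        return "SELECT explode(sequence(" +
               $"to_timestamp('{start}'), to_timestamp('{stop}'), interval 1 minute)) AS Date";
    }

    // Runs the query and splits the timestamp into the join columns used above.
    public static DataFrame Create(SparkSession spark, DateTime from, DateTime to)
    {
        return spark.Sql(BuildSql(from, to)).SelectExpr(
            "year(Date) AS Year",
            "month(Date) AS Month",
            "dayofmonth(Date) AS Day",
            "hour(Date) AS Hour",
            "minute(Date) AS Minute");
    }
}
```

`MinuteGrid.Create(spark, reportFrom, reportTo)` could then stand in for `everyMinute` in the join. Because the timestamp is both parsed and split into columns inside Spark, both steps use the same session time zone; time zones with daylight-saving transitions may still need separate care.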

