How to interpolate within grouped data in PySpark?

edqdpe6u · asked 2022-11-01 in Spark · 4 answers · 121 views

How do I interpolate within grouped data in a PySpark DataFrame?
For example:
I have a PySpark DataFrame with the following columns:

+--------+-------------------+--------+
|webID   |timestamp          |counts  |
+--------+-------------------+--------+
|John    |2018-02-01 03:00:00|60      |
|John    |2018-02-01 03:03:00|66      |
|John    |2018-02-01 03:05:00|70      |
|John    |2018-02-01 03:08:00|76      |
|Mo      |2017-06-04 01:05:00|10      |
|Mo      |2017-06-04 01:07:00|20      |
|Mo      |2017-06-04 01:10:00|35      |
|Mo      |2017-06-04 01:11:00|40      |
+--------+-------------------+--------+

I need to interpolate the counts for both John and Mo to one data point per minute within their respective time intervals. Any simple linear interpolation is fine, but note that my real data is sampled every few seconds and I want to interpolate down to every second.
So the result should be:

+--------+-------------------+--------+
|webID   |timestamp          |counts  |
+--------+-------------------+--------+
|John    |2018-02-01 03:00:00|60      |
|John    |2018-02-01 03:01:00|62      |
|John    |2018-02-01 03:02:00|64      |
|John    |2018-02-01 03:03:00|66      |
|John    |2018-02-01 03:04:00|68      |
|John    |2018-02-01 03:05:00|70      |
|John    |2018-02-01 03:06:00|72      |
|John    |2018-02-01 03:07:00|74      |
|John    |2018-02-01 03:08:00|76      |
|Mo      |2017-06-04 01:05:00|10      |
|Mo      |2017-06-04 01:06:00|15      |
|Mo      |2017-06-04 01:07:00|20      |
|Mo      |2017-06-04 01:08:00|25      |
|Mo      |2017-06-04 01:09:00|30      |
|Mo      |2017-06-04 01:10:00|35      |
|Mo      |2017-06-04 01:11:00|40      |
+--------+-------------------+--------+

The new rows need to be added to my original DataFrame. I am looking for a PySpark solution.


9q78igpj1#

If you use Python, the simplest way is to reuse existing Pandas functionality with a GROUPED_MAP pandas UDF:

from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType

def resample(schema, freq, timestamp_col="timestamp", **kwargs):
    # The output schema is sorted by field name because the returned
    # pandas DataFrame has its columns sorted by name below.
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))), 
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        pdf.set_index(timestamp_col, inplace=True)
        # Resample to the requested frequency and linearly interpolate numeric columns
        pdf = pdf.resample(freq).interpolate()
        # Forward-fill non-numeric columns (e.g. the group key) for the inserted rows
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        # Keep column order consistent with the sorted output schema
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _

Applied to your data:

from pyspark.sql.functions import to_timestamp

df = spark.createDataFrame([
    ("John",   "2018-02-01 03:00:00", 60),  
    ("John",   "2018-02-01 03:03:00", 66),  
    ("John",   "2018-02-01 03:05:00", 70),  
    ("John",   "2018-02-01 03:08:00", 76),  
    ("Mo",     "2017-06-04 01:05:00", 10),  
    ("Mo",     "2017-06-04 01:07:00", 20),  
    ("Mo",     "2017-06-04 01:10:00", 35),  
    ("Mo",     "2017-06-04 01:11:00", 40),
], ("webID", "timestamp", "counts")).withColumn(
  "timestamp", to_timestamp("timestamp")
)

df.groupBy("webID").apply(resample(df.schema, "60S")).show()

which yields:

+------+-------------------+-----+
|counts|          timestamp|webID|
+------+-------------------+-----+
|    60|2018-02-01 03:00:00| John|
|    62|2018-02-01 03:01:00| John|
|    64|2018-02-01 03:02:00| John|
|    66|2018-02-01 03:03:00| John|
|    68|2018-02-01 03:04:00| John|
|    70|2018-02-01 03:05:00| John|
|    72|2018-02-01 03:06:00| John|
|    74|2018-02-01 03:07:00| John|
|    76|2018-02-01 03:08:00| John|
|    10|2017-06-04 01:05:00|   Mo|
|    15|2017-06-04 01:06:00|   Mo|
|    20|2017-06-04 01:07:00|   Mo|
|    25|2017-06-04 01:08:00|   Mo|
|    30|2017-06-04 01:09:00|   Mo|
|    35|2017-06-04 01:10:00|   Mo|
|    40|2017-06-04 01:11:00|   Mo|
+------+-------------------+-----+
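Note that the GROUPED_MAP pandas UDF API used above is deprecated since Spark 3.0 in favor of GroupedData.applyInPandas. A minimal sketch of the equivalent call, assuming Spark ≥ 3.0 (the function name resample_pdf is illustrative):

def resample_pdf(pdf):
    # Re-sample one group's pandas DataFrame to one row per minute
    key = pdf["webID"].iloc[0]
    out = (pdf.set_index("timestamp")[["counts"]]
              .resample("60S")
              .interpolate()          # linear interpolation between known points
              .reset_index())
    out.insert(0, "webID", key)       # re-attach the group key
    return out

(df
    .groupBy("webID")
    .applyInPandas(resample_pdf, schema="webID string, timestamp timestamp, counts double")
    .show())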

This works under the assumption that both the input data and the interpolated data for a single webID fit in the memory of a single node (in general, other exact, non-iterative solutions have to make similar assumptions). If that is not the case, you can easily approximate it by using overlapping windows:

from pyspark.sql.functions import window

partial = (df
    .groupBy("webID", window("timestamp", "5 minutes", "3 minutes")["start"])
    .apply(resample(df.schema, "60S")))

and aggregating the final result:

from pyspark.sql.functions import mean

(partial
    .groupBy("webID", "timestamp")
    .agg(mean("counts").alias("counts"))
    # Order by key and timestamp, only for consistent presentation
    .orderBy("webID", "timestamp")
    .show())

This is of course much more expensive (there are two shuffles, and some values are computed multiple times), but it can also leave gaps if the overlap is not large enough to include the next observation:

+-----+-------------------+------+
|webID|          timestamp|counts|
+-----+-------------------+------+
| John|2018-02-01 03:00:00|  60.0|
| John|2018-02-01 03:01:00|  62.0|
| John|2018-02-01 03:02:00|  64.0|
| John|2018-02-01 03:03:00|  66.0|
| John|2018-02-01 03:04:00|  68.0|
| John|2018-02-01 03:05:00|  70.0|
| John|2018-02-01 03:08:00|  76.0|
|   Mo|2017-06-04 01:05:00|  10.0|
|   Mo|2017-06-04 01:06:00|  15.0|
|   Mo|2017-06-04 01:07:00|  20.0|
|   Mo|2017-06-04 01:08:00|  25.0|
|   Mo|2017-06-04 01:09:00|  30.0|
|   Mo|2017-06-04 01:10:00|  35.0|
|   Mo|2017-06-04 01:11:00|  40.0|
+-----+-------------------+------+

pkwftd7m2#

A native PySpark implementation (no UDFs) that solves this problem is:

import pyspark.sql.functions as F
resample_interval = 1  # Resample interval size in seconds

df_interpolated = (
  df_data
  # Get timestamp and Counts of previous measurement via window function
  .selectExpr(
    "webID",
    "LAG(Timestamp) OVER (PARTITION BY webID ORDER BY Timestamp ASC) as PreviousTimestamp",
    "Timestamp as NextTimestamp",
    "LAG(Counts) OVER (PARTITION BY webID ORDER BY Timestamp ASC) as PreviousCounts",
    "Counts as NextCounts",
  )
  # To determine resample interval round up start and round down end timeinterval to nearest interval boundary
  .withColumn("PreviousTimestampRoundUp", F.expr(f"to_timestamp(ceil(unix_timestamp(PreviousTimestamp)/{resample_interval})*{resample_interval})"))
  .withColumn("NextTimestampRoundDown", F.expr(f"to_timestamp(floor(unix_timestamp(NextTimestamp)/{resample_interval})*{resample_interval})"))
  # Make sure we don't get any negative intervals (whole interval is within resample interval)
  .filter("PreviousTimestampRoundUp<=NextTimestampRoundDown")
  # Create resampled time axis by creating all "interval" timestamps between previous and next timestamp
  .withColumn("Timestamp", F.expr(f"explode(sequence(PreviousTimestampRoundUp, NextTimestampRoundDown, interval {resample_interval} second)) as Timestamp"))
  # Sequence has inclusive boundaries for both start and stop. Filter out duplicate Counts if original timestamp is exactly a boundary.
  .filter("Timestamp<NextTimestamp")
  # Interpolate Counts between previous and next
  .selectExpr(
    "webID",
    "Timestamp", 
    """(unix_timestamp(Timestamp)-unix_timestamp(PreviousTimestamp))
        /(unix_timestamp(NextTimestamp)-unix_timestamp(PreviousTimestamp))
        *(NextCounts-PreviousCounts) 
        +PreviousCounts
        as Counts"""
  )
)
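For reference, a short usage sketch: assuming df_data is the example DataFrame built in the first answer, setting resample_interval = 60 before building df_interpolated reproduces the per-minute output from the question:

# Assuming df is the example DataFrame from the first answer:
df_data = df
resample_interval = 60  # one point per minute, matching the expected output

# ...rebuild df_interpolated with the code above, then:
df_interpolated.orderBy("webID", "Timestamp").show(truncate=False)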

I recently wrote a blog post explaining this approach and showing that it scales much better on large datasets than the Pandas UDF approach mentioned above: https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1


qybjjes13#

This is not a Python solution, but I think the Scala solution below could be implemented in Python with a similar approach. It uses the lag Window function to create a time range in each row, and a UDF that expands that time range into a list of per-minute timestamps and interpolated counts via the java.time API, which is then flattened with Spark's explode method (a rough PySpark sketch of the same idea follows after the Scala output):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
  ("John", "2018-02-01 03:00:00", 60),
  ("John", "2018-02-01 03:03:00", 66),
  ("John", "2018-02-01 03:05:00", 70),
  ("Mo", "2017-06-04 01:07:00", 20),
  ("Mo", "2017-06-04 01:10:00", 35),
  ("Mo", "2017-06-04 01:11:00", 40)
).toDF("webID", "timestamp", "count")

val winSpec = Window.partitionBy($"webID").orderBy($"timestamp")

def minuteList(timePattern: String) = udf{ (ts1: String, ts2: String, c1: Int, c2: Int) =>
  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter

  val timeFormat = DateTimeFormatter.ofPattern(timePattern)

  val perMinTS = if (ts1 == ts2) Vector(ts1) else {
      val t1 = LocalDateTime.parse(ts1, timeFormat)
      val t2 = LocalDateTime.parse(ts2, timeFormat)
      Iterator.iterate(t1.plusMinutes(1))(_.plusMinutes(1)).takeWhile(! _.isAfter(t2)).
        map(_.format(timeFormat)).
        toVector
    }

  val sz = perMinTS.size

  val perMinCount = for { i <- 1 to sz } yield c1 + ((c2 - c1) * i / sz)

  perMinTS zip perMinCount
}

df.
  withColumn("timestampPrev", when(row_number.over(winSpec) === 1, $"timestamp").
    otherwise(lag($"timestamp", 1).over(winSpec))).
  withColumn("countPrev", when(row_number.over(winSpec) === 1, $"count").
    otherwise(lag($"count", 1).over(winSpec))).
  withColumn("minuteList",
    minuteList("yyyy-MM-dd HH:mm:ss")($"timestampPrev", $"timestamp", $"countPrev", $"count")).
  withColumn("minute", explode($"minuteList")).
  select($"webID", $"minute._1".as("timestamp"), $"minute._2".as("count")).
  show
// +-----+-------------------+-----+
// |webID|          timestamp|count|
// +-----+-------------------+-----+
// | John|2018-02-01 03:00:00|   60|
// | John|2018-02-01 03:01:00|   62|
// | John|2018-02-01 03:02:00|   64|
// | John|2018-02-01 03:03:00|   66|
// | John|2018-02-01 03:04:00|   68|
// | John|2018-02-01 03:05:00|   70|
// |   Mo|2017-06-04 01:07:00|   20|
// |   Mo|2017-06-04 01:08:00|   25|
// |   Mo|2017-06-04 01:09:00|   30|
// |   Mo|2017-06-04 01:10:00|   35|
// |   Mo|2017-06-04 01:11:00|   40|
// +-----+-------------------+-----+
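As a rough illustration, a hedged PySpark sketch of the same lag + UDF + explode idea (assumptions: string timestamps as in the Scala snippet, a per-minute step, and names like minute_list and sdf that are illustrative only, not part of the original answer):

from datetime import datetime, timedelta

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

sdf = spark.createDataFrame([
    ("John", "2018-02-01 03:00:00", 60),
    ("John", "2018-02-01 03:03:00", 66),
    ("Mo",   "2017-06-04 01:07:00", 20),
    ("Mo",   "2017-06-04 01:10:00", 35),
], ["webID", "timestamp", "counts"])

@F.udf(ArrayType(StructType([
    StructField("timestamp", StringType()),
    StructField("count", IntegerType()),
])))
def minute_list(ts1, ts2, c1, c2):
    # Expand (previous, current) into per-minute timestamps with linearly
    # interpolated counts, mirroring the Scala UDF above.
    fmt = "%Y-%m-%d %H:%M:%S"
    if ts1 == ts2:
        return [(ts1, c1)]
    t1, t2 = datetime.strptime(ts1, fmt), datetime.strptime(ts2, fmt)
    steps = int((t2 - t1).total_seconds() // 60)
    return [((t1 + timedelta(minutes=i)).strftime(fmt), c1 + (c2 - c1) * i // steps)
            for i in range(1, steps + 1)]

win = Window.partitionBy("webID").orderBy("timestamp")

(sdf
    .withColumn("timestampPrev", F.coalesce(F.lag("timestamp", 1).over(win), F.col("timestamp")))
    .withColumn("countsPrev", F.coalesce(F.lag("counts", 1).over(win), F.col("counts")))
    .withColumn("minute", F.explode(minute_list("timestampPrev", "timestamp", "countsPrev", "counts")))
    .select("webID", F.col("minute.timestamp").alias("timestamp"), F.col("minute.count").alias("counts"))
    .show(truncate=False))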

kx5bkwkv4#

I extended @David's great answer, made it dynamic so it can be used on wider DataFrames, and made it reusable.
It takes the group and time column names as input variables:

import pyspark.sql.functions as f

resample_interval = 1  # Resample interval size in seconds
group_str = 'webID'    # name of group column
time_str = 'timestamp' # name of timestamp column

It then dynamically detects which other columns exist in the DataFrame, and dynamically builds the select expressions as well as the interpolation expression proposed by David:

# extract columns to interpolate
interpol_col_str = df_data.drop(group_str, time_str).columns

# create select expression to get interpolation columns and previous measurement via window function
col_create_expr = []
for col in interpol_col_str:
  col_create_expr.extend([
    f"LAG({col}) OVER (PARTITION BY {group_str} ORDER BY {time_str} ASC) as Previous{col}",
    f"{col} as Next{col}"
  ])

# create interpolation expression for each interpolation column
interpol_expr = []
for col in interpol_col_str:
  interpol_expr.extend([
      f"""(unix_timestamp({time_str})-unix_timestamp(Previous{time_str}))
        /(unix_timestamp(Next{time_str})-unix_timestamp(Previous{time_str}))
        *(Next{col}-Previous{col}) 
        +Previous{col}
        as {col}"""]
  )

The expressions are then added to the interpolation pipeline by unpacking the lists with * (i.e. *col_create_expr and *interpol_expr):

df_interpolated = (
  df_data
  # Get timestamp and interpolation columns of previous measurement via window function
  .selectExpr(
    f"{group_str}",
    f"LAG({time_str}) OVER (PARTITION BY {group_str} ORDER BY {time_str} ASC) as Previous{time_str}",
    f"{time_str} as Next{time_str}",
    *col_create_expr
  )
  # To determine resample interval round up start and round down end timeinterval to nearest interval boundary
  .withColumn(f"Previous{time_str}RoundUp", f.expr(f"to_timestamp(ceil(unix_timestamp(Previous{time_str})/{resample_interval})*{resample_interval})"))
  .withColumn(f"Next{time_str}RoundDown", f.expr(f"to_timestamp(floor(unix_timestamp(Next{time_str})/{resample_interval})*{resample_interval})"))
  # Make sure we don't get any negative intervals (whole interval is within resample interval)
  .filter(f"Previous{time_str}RoundUp<=Next{time_str}RoundDown")
  # Create resampled time axis by creating all "interval" timestamps between previous and next timestamp
  .withColumn(f"{time_str}", f.expr(f"explode(sequence(Previous{time_str}RoundUp, Next{time_str}RoundDown, interval {resample_interval} second)) as {time_str}"))
  # Sequence has inclusive boundaries for both start and stop. Filter out duplicate {column_str} if original timestamp is exactly a boundary.
  .filter(f"{time_str}<Next{time_str}")
  # Interpolate {column_str} between previous and next
  .selectExpr(
    f"{group_str}",
    f"{time_str}", 
    *interpol_expr
  )
)
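For illustration, a minimal usage sketch under the assumption that df_data contains one extra numeric column besides the group and time columns (the column name temperature is hypothetical):

# Hypothetical input: webID / timestamp / counts / temperature
df_data = spark.createDataFrame([
    ("John", "2018-02-01 03:00:00", 60, 20.0),
    ("John", "2018-02-01 03:03:00", 66, 23.0),
    ("Mo",   "2017-06-04 01:05:00", 10, 15.0),
    ("Mo",   "2017-06-04 01:07:00", 20, 17.0),
], ["webID", "timestamp", "counts", "temperature"]).withColumn(
    "timestamp", f.to_timestamp("timestamp")
)

# Run the snippets above (column detection, expression building, df_interpolated);
# both counts and temperature are then interpolated per group:
df_interpolated.orderBy("webID", "timestamp").show(truncate=False)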
