sparkDataframe中的scala序列

mlmc2os5  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(520)

我在spark中有Dataframe。看起来像这样:

+-------+----------+-------+
|  value|     group|     ts|
+-------+----------+-------+
|      A|         X|      1|
|      B|         X|      2|
|      B|         X|      3|
|      D|         X|      4|
|      E|         X|      5|
|      A|         Y|      1|
|      C|         Y|      2|
+-------+----------+-------+

我想知道有多少个序列 A-B-E (序列只是后续行的列表)有。添加了一个约束,序列的后续部分可以是最大的 n 排成一排。让我们考虑一下这个例子 n 是2。
考虑组 X . 在这种情况下,正好有1 D 介于 B 以及 E (连续多次) B 已忽略)。这意味着 B 以及 E 是一行分开,因此有一个序列 A-B-E 我考虑过使用 collect_list() ,创建一个字符串(如dna)并使用regex进行子字符串搜索。但我想知道是否有一种更优雅的分布式方式,也许是使用窗口函数?
编辑:
请注意,提供的Dataframe只是一个示例。真正的Dataframe(以及组)可以是任意长的。

hjzp0vay

hjzp0vay1#

编辑以回答@tim的评论+修复类型为“aabe”的模式
是的,使用窗口函数有帮助,但是我创建了一个 id 订购:

val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2)
).toDF("id","value","group","ts")

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('group).orderBy('id)

然后lag将收集所需的内容,但是需要一个函数来生成 Column 表达式(请注意拆分以消除“aabe”的重复计数。警告:这将拒绝“abaexx”类型的模式:

def createSeq(m:Int) = split(
  concat(
    (1 to 2*m)
      .map(i => coalesce(lag('value,-i).over(w),lit("")))
  :_*),"A")(0)

val m=2
val tmp = df
  .withColumn("seq",createSeq(m))

+---+-----+-----+---+----+
| id|value|group| ts| seq|
+---+-----+-----+---+----+
|  6|    A|    Y|  1|   C|
|  7|    C|    Y|  2|    |
|  1|    A|    X|  1|BBDE|
|  2|    B|    X|  2| BDE|
|  3|    B|    X|  3|  DE|
|  4|    D|    X|  4|   E|
|  5|    E|    X|  5|    |
+---+-----+-----+---+----+

因为 Column api,使用udf完全避免regex要容易得多

def patternInSeq(m: Int) = udf((str: String) => {
  var notFound = str
    .split("B")
    .filter(_.contains("E"))
    .filter(_.indexOf("E") <= m)
    .isEmpty
  !notFound
})

val res = tmp
  .filter(('value === "A") && (locate("B",'seq) > 0))
  .filter(locate("B",'seq) <= m && (locate("E",'seq) > 1))
  .filter(patternInSeq(m)('seq))
  .groupBy('group)
  .count
res.show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

概括(超出范围)

如果你想把它概括为一系列较长的字母,这个问题就必须概括。它可能是微不足道的,但在这种情况下,类型(“abae”)的模式应该被拒绝(参见注解)。因此,最简单的概括方法是在下面的实现中使用成对规则(我添加了一个组“z”来说明这个算法的行为)

val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2),
  ( 8,"A","Z",1),
  ( 9,"B","Z",2),
  (10,"D","Z",3),
  (11,"B","Z",4),
  (12,"E","Z",5)
).toDF("id","value","group","ts")

首先我们定义一对的逻辑

import org.apache.spark.sql.DataFrame
def createSeq(m:Int) = array((0 to 2*m).map(i => coalesce(lag('value,-i).over(w),lit(""))):_*)
def filterPairUdf(m: Int, t: (String,String)) = udf((ar: Array[String]) => {
  val (a,b) = t
  val foundAt = ar
    .dropWhile(_ != a)
    .takeWhile(_ != a)
    .indexOf(b)
  foundAt != -1 && foundAt <= m
})

然后,我们定义一个函数,将这个逻辑迭代地应用于Dataframe

def filterSeq(seq: List[String], m: Int)(df: DataFrame): DataFrame = {
  var a = seq(0)
  seq.tail.foldLeft(df){(df: DataFrame, b: String) => {
    val res  = df.filter(filterPairUdf(m,(a,b))('seq))
    a = b
    res
  }}
}

由于我们首先对从第一个字符开始的序列进行滤波,所以得到了简化和优化

val m = 2
val tmp = df
  .filter('value === "A") // reduce problem
  .withColumn("seq",createSeq(m))

scala> tmp.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  6|    A|    Y|  1|   [A, C, , , ]|
|  8|    A|    Z|  1|[A, B, D, B, E]|
|  1|    A|    X|  1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+

val res = tmp.transform(filterSeq(List("A","B","E"),m))

scala> res.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  1|    A|    X|  1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+

( transform 是一层简单的糖衣 DataFrame => DataFrame (转换)

res
  .groupBy('group)
  .count
  .show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

正如我所说的,在扫描序列时,有不同的方法来概括“重置规则”,但是这个例子希望能帮助实现更复杂的规则。

相关问题