apachespark根据列的不同值计算列值

6vl6ewon  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(298)

我正在处理下表,我想根据其他两列的不同值计算一个新列(结果)。

| id1  | id2 | outcome
|  1   |  1  |  1
|  1   |  1  |  1
|  1   |  3  |  2
|  2   |  5  |  1 
|  3   |  1  |  1  
|  3   |  2  |  2
|  3   |  3  |  3

结果应该以增量顺序开始,从1开始,基于 id1 以及 id2 . 任何关于如何在scala中实现这一点的提示。 row_number 在这种情况下似乎没有用。
这里的逻辑是 id1 我们将用min开始计算结果( id2 )对应的 id1 赋值为1。

djmepvbi

djmepvbi1#

你可以试试密级()
以你为例

val df = sqlContext
        .read
        .option("sep","|")
        .option("header", true)
        .option("inferSchema",true)
        .csv("/home/cloudera/files/tests/ids.csv") // Here we read the .csv files
        .cache()

      df.show()
      df.printSchema()

      df.createOrReplaceTempView("table")
      sqlContext.sql(
        """
          |SELECT id1, id2, DENSE_RANK() OVER(PARTITION BY id1 ORDER BY id2) AS outcome
          |FROM table
          |""".stripMargin).show()

输出

+---+---+-------+
|id1|id2|outcome|
+---+---+-------+
|  2|  5|      1|
|  1|  1|      1|
|  1|  1|      1|
|  1|  3|      2|
|  3|  1|      1|
|  3|  2|      2|
|  3|  3|      3|
+---+---+-------+
guz6ccqo

guz6ccqo2#

使用 Window 俱乐部职能( partition )他们 first id 然后 order 每个 partition 基于 second id .
现在你只需要分配一个等级( dense_rank )在每个 Window 分区。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

df
.withColumn("outcome", dense_rank().over(Window.partitionBy("id1").orderBy("id2")))

相关问题