Combining UDFs in Scala Spark

c0vxltue · posted 2021-05-29 in Spark

I have written the following code, which works fine, but I would like to combine the UDFs so the code compresses into a few lines. Please suggest how I can do that. Here is the code I wrote:

val myUdf1 = udf((Number: Long) => ((Number) >> 24) & 255)
val myUdf2 = udf((Number: Long) => ((Number) >> 16) & 255)
val myUdf3 = udf((Number: Long) => ((Number) >> 8) & 255)
val myUdf4 = udf((Number: Long) => (Number) & 255)

val df=Data.withColumn("bitwise 1", myUdf1(Data("Ip")))
  .withColumn("bitwise 2", myUdf2(Data("Ip")))
  .withColumn("bitwise 3", myUdf3(Data("Ip")))
  .withColumn("bitwise 4", myUdf4(Data("Ip")))

val FinalDF =  df.withColumn("FinalIp",concat(col("bitwise 1"),lit("."),col("bitwise 2"),lit("."),col("bitwise 3"),lit("."),col("bitwise 4")))
.drop("bitwise 1").drop("bitwise 2").drop("bitwise 3").drop("bitwise 4")

yfjy0ee7 · 1#

As @someshwar kale suggests, you can do this without a UDF.
If you do choose to use a UDF, you can abstract the common logic into a single function and concatenate the results in one UDF:

scala> Data.show
+---+
| ip|
+---+
| 10|
| 20|
+---+

scala> val a:Seq[(Int, Int)] = Seq((24, 255), (16,255), (8, 255),(0,255))
a: Seq[(Int, Int)] = List((24,255), (16,255), (8,255), (0,255))

scala> val myUdf = udf((number: Long) => (a.map((t:(Int, Int)) => (number >> t._1) & t._2).mkString(".")))
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))

scala> Data.withColumn("finalIp", myUdf($"ip")).show
+---+--------+
| ip| finalIp|
+---+--------+
| 10|0.0.0.10|
| 20|0.0.0.20|
+---+--------+
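If you want to verify the shift-and-mask logic outside Spark, the function inside the UDF can be exercised in plain Scala (a minimal sketch; `OctetDemo` and `toDottedIp` are illustrative names, not from the answer):

```scala
object OctetDemo {
  // Same (shift, mask) pairs the UDF iterates over
  val shifts: Seq[(Int, Int)] = Seq((24, 255), (16, 255), (8, 255), (0, 255))

  // Extract each octet and join with "." -- identical to the UDF body
  def toDottedIp(number: Long): String =
    shifts.map { case (s, m) => (number >> s) & m }.mkString(".")

  def main(args: Array[String]): Unit = {
    println(toDottedIp(10L))          // prints 0.0.0.10, matching the table above
    println(toDottedIp(3232235777L))  // prints 192.168.1.1
  }
}
```

Because the UDF only closes over `shifts` and calls pure functions, testing it this way exercises exactly the code Spark would run per row.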

35g0bw71 · 2#

I think this can be achieved without a UDF at all. Both versions are shown below for comparison.

With UDFs

    val Data = spark.range(2).withColumn("Ip", lit(10))
    val myUdf1 = udf((Number: Long) => ((Number) >> 24) & 255)
    val myUdf2 = udf((Number: Long) => ((Number) >> 16) & 255)
    val myUdf3 = udf((Number: Long) => ((Number) >> 8) & 255)
    val myUdf4 = udf((Number: Long) => (Number) & 255)

    val df=Data.withColumn("bitwise 1", myUdf1(Data("Ip")))
      .withColumn("bitwise 2", myUdf2(Data("Ip")))
      .withColumn("bitwise 3", myUdf3(Data("Ip")))
      .withColumn("bitwise 4", myUdf4(Data("Ip")))

    val FinalDF =  df.withColumn("FinalIp",concat(col("bitwise 1"),lit("."),col("bitwise 2"),lit("."),col("bitwise 3"),lit("."),col("bitwise 4")))
      .drop("bitwise 1").drop("bitwise 2").drop("bitwise 3").drop("bitwise 4")
    FinalDF.show(false)

    /**
      * +---+---+--------+
      * |id |Ip |FinalIp |
      * +---+---+--------+
      * |0  |10 |0.0.0.10|
      * |1  |10 |0.0.0.10|
      * +---+---+--------+
      */

Without UDFs

spark.range(2).withColumn("Ip", lit(10))
      .withColumn("FinalIp",
        concat_ws(".", expr("shiftRight(Ip, 24) & 255"), expr("shiftRight(Ip, 16) & 255"),
          expr("shiftRight(Ip, 8) & 255"), expr("Ip & 255"))
      ).show(false)

    /**
      * +---+---+--------+
      * |id |Ip |FinalIp |
      * +---+---+--------+
      * |0  |10 |0.0.0.10|
      * |1  |10 |0.0.0.10|
      * +---+---+--------+
      */
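The `expr` strings rely on Spark's built-in `shiftRight` SQL function, which behaves like Scala's `>>` operator on integral types; the arithmetic behind the no-UDF version can be sanity-checked without a SparkSession (a sketch with illustrative names):

```scala
object ExprCheck {
  // Mirrors shiftRight(Ip, n) & 255 for each expr(...) above
  def octets(ip: Long): Seq[Long] =
    Seq(24, 16, 8, 0).map(n => (ip >> n) & 255)

  def main(args: Array[String]): Unit = {
    // concat_ws(".", ...) joins the four values the same way mkString does here
    println(octets(10L).mkString("."))  // prints 0.0.0.10, matching FinalIp above
  }
}
```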
