scala 从带有别名的spark Dataframe 中获取值

q5iwbnjs  于 2023-08-05  发布在  Scala
关注(0)|答案(1)|浏览(113)

我加入了表,结果如下:

+----+----+----+----+----+----+----+----+
| Pk1| Pk2|Col2|Col3| Pk1| Pk3|Col1|Col3|
+----+----+----+----+----+----+----+----+
|   a|null|   1|   d|null|null|null|null|
|null|null|null|null|   a|null|   1|   g|
|   b|   1|   2|   c|   b|   1|null|null|
|   c|   1|null|   o|   c|   1|   3|   s|
|   d|   1|   4|   n|   d|   1|null|   t|
|   d|   1|   4|   n|   d|   1|   4|   n|
|   e|   1|   4|   i|null|null|null|null|
|null|null|null|null|   z|   1|null|   i|
+----+----+----+----+----+----+----+----+

 val join = source.alias("s").join(target.alias("t"), source(Pk1) === target(Pk1) && source(Pk2) === target(Pk3), "full")

join.collect().head.getAs[Any]("s.Pk1")

字符串
我想获取具有重复列名的值,但别名对我不起作用。我还可以做什么来获得值?
错误:

s.Pk1 does not exist. Available: Pk1, Pk2, Col2, Col3, Pk1, Pk3, Col1, Col3

bq3bfh9z

bq3bfh9z1#

如何使用源的pk1的计划输出:

val source = sourceSeq.toDS()
    val target = targetSeq.toDS()
    val joined = source.join(target, source("Pk1") === target("Pk1") && source("Pk2") === target("Pk3"), "full")

    val found =
      for {
        sourceFound <- joined.queryExecution.analyzed.output.find { at => at.name == "Pk1" }
        joinedFound <- joined.queryExecution.analyzed.output.zipWithIndex.find { case (at, _) => at == sourceFound }
        ind = joinedFound._2
      } yield ind

    val sourcePk1 = joined.collect().head.getAs[Any](found.get)
    println(sourcePk1)

字符串
这样做是因为AttributeReferences在计划中是唯一的,它们只是在输出中被简单的名称丢弃。alias函数只在使用sql连接时有帮助,在代码中,您直接使用源数据集来访问列,因此使用它是正确的。
至于这是不是个好主意。queryExecution的内容和可访问性在不同版本之间可能会有所不同,它并不完全是一个用户界面。您最好直接重命名字段:

def prefixFields[T](ds: Dataset[T], prefix: String): DataFrame = {
      val newNames = ds.schema.fieldNames.map(o => col(o).as(prefix + o))
      ds.select(newNames : _*)
    }

    // prefixing with s. _may_ work, but .'s have specific meaning
    val source = prefixFields(sourceSeq.toDS(), "source_")
    val target = targetSeq.toDS()
    val joined = source.join(target, col("source_Pk1") === target("Pk1") && col("source_Pk2") === target("Pk3"), "full")

    joined.show()
    val sourcePk1 = joined.collect().head.getAs[Any]("source_Pk1")
    println(sourcePk1)


这是较少的内部魔法和一个有点清晰的意图。

相关问题