Joining two DataFrames in Scala on a column whose values do not match exactly

gmol1639 · published 2021-06-25 · Hive

I am trying to join two DataFrames on a column whose values are not exactly the same.
Given below is df1:

  +--------+-----+------+
  | NUM_ID | TIME|SG1_V |
  +--------+-----+------+
  |XXXXX01 |1001 |79.0  |
  |XXXXX01 |1005 |88.0  |
  |XXXXX01 |1010 |99.0  |
  |XXXXX01 |1015 |null  |
  |XXXXX01 |1020 |100.0 |
  |XXXXX02 |1001 |81.0  |
  |XXXXX02 |1010 |91.0  |
  |XXXXX02 |1050 |93.0  |
  |XXXXX02 |1060 |93.0  |
  |XXXXX02 |1070 |93.0  |
  +--------+-----+------+

Below is df2:

  +--------+-----+------+
  | NUM_ID | TIME|SG2_V |
  +--------+-----+------+
  |XXXXX01 |1001 | 99.0 |
  |XXXXX01 |1003 | 22.0 |
  |XXXXX01 |1007 | 85.0 |
  |XXXXX01 |1011 |  1.0 |
  |XXXXX02 |1001 | 22.0 |
  |XXXXX02 |1009 | 85.0 |
  |XXXXX02 |1048 |  1.0 |
  |XXXXX02 |1052 | 99.0 |
  +--------+-----+------+

I have to join the two DataFrames on the NUM_ID and TIME columns. NUM_ID must match exactly, while TIME may or may not: df2 may not contain the exact TIME value present in df1. When there is no exact match, the join should use the highest available nearby value, i.e. the df2 TIME should be <= the exact value in df1.
This becomes clearer after looking at the expected output shown below:

  +--------+-----+------+-----+------+
  | NUM_ID | TIME|SG1_V | TIME|SG2_V |
  +--------+-----+------+-----+------+
  |XXXXX01 |1001 |79.0  |1001 | 99.0 |
  |XXXXX01 |1005 |88.0  |1003 | 22.0 |
  |XXXXX01 |1010 |99.0  |1007 | 85.0 |
  |XXXXX01 |1015 |null  |1011 |  1.0 |
  |XXXXX01 |1020 |100.0 |1011 |  1.0 |
  |XXXXX02 |1001 |81.0  |1001 | 22.0 |
  |XXXXX02 |1010 |91.0  |1009 | 85.0 |
  |XXXXX02 |1050 |93.0  |1048 |  1.0 |
  |XXXXX02 |1060 |93.0  |1052 | 99.0 |
  |XXXXX02 |1070 |93.0  |1052 | 99.0 |
  +--------+-----+------+-----+------+

For NUM_ID XXXXX01, the TIME value 1005 in df1 is not available in df2, so the join takes the nearest value below 1005, which is 1003.
How can I join in such a way that the nearest value is used whenever there is no exact match?
Any pointers are appreciated. Thanks in advance.
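The matching rule being asked for — for each df1 TIME, take the largest df2 TIME that is less than or equal to it — can be illustrated outside Spark with a small plain-Python sketch (the helper name `nearest_le` is made up for illustration):

```python
import bisect

def nearest_le(sorted_times, t):
    """Return the largest value in sorted_times that is <= t, or None if none exists."""
    i = bisect.bisect_right(sorted_times, t)
    return sorted_times[i - 1] if i > 0 else None

# df2 TIME values for NUM_ID XXXXX01, in sorted order
df2_times = [1001, 1003, 1007, 1011]

print(nearest_le(df2_times, 1005))  # -> 1003 (no exact 1005 in df2)
print(nearest_le(df2_times, 1020))  # -> 1011
```

This is exactly the "as-of" lookup semantics the expected output above follows.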

monwx1rj #1

A straightforward way is to use one of Spark's window functions, row_number() or rank():

  scala> spark.sql("""
       | SELECT * FROM (
       |   SELECT *,
       |          ROW_NUMBER() OVER (PARTITION BY df1.NUM_ID, df1.TIME ORDER BY (df1.TIME - df2.TIME)) rno
       |   FROM df1 JOIN df2
       |     ON df2.NUM_ID = df1.NUM_ID AND
       |        df2.TIME <= df1.TIME
       | ) T
       | WHERE T.rno = 1
       | """).show()
  +-------+----+-----+-------+----+-----+---+
  | NUM_ID|TIME|SG1_V| NUM_ID|TIME|SG2_V|rno|
  +-------+----+-----+-------+----+-----+---+
  |XXXXX01|1001| 79.0|XXXXX01|1001| 99.0|  1|
  |XXXXX01|1005| 88.0|XXXXX01|1003| 22.0|  1|
  |XXXXX01|1010| 99.0|XXXXX01|1007| 85.0|  1|
  |XXXXX01|1015| null|XXXXX01|1011|  1.0|  1|
  |XXXXX01|1020|100.0|XXXXX01|1011|  1.0|  1|
  |XXXXX02|1001| 81.0|XXXXX02|1001| 22.0|  1|
  |XXXXX02|1010| 91.0|XXXXX02|1009| 85.0|  1|
  +-------+----+-----+-------+----+-----+---+

  scala>
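The logic of that query — join on NUM_ID with df2.TIME <= df1.TIME, then keep only the candidate with the smallest time gap per (NUM_ID, TIME), which is what rno = 1 selects — can be emulated in plain Python to check the expected pairs (a sketch of the semantics only, not Spark code; the data below is the TIME portion of the sample DataFrames):

```python
df1 = [("XXXXX01", 1001), ("XXXXX01", 1005), ("XXXXX01", 1010),
       ("XXXXX01", 1015), ("XXXXX01", 1020),
       ("XXXXX02", 1001), ("XXXXX02", 1010)]
df2 = [("XXXXX01", 1001), ("XXXXX01", 1003), ("XXXXX01", 1007), ("XXXXX01", 1011),
       ("XXXXX02", 1001), ("XXXXX02", 1009)]

# For each df1 row, keep the df2 row with the same NUM_ID and TIME <= df1 TIME
# that minimizes (df1.TIME - df2.TIME) -- i.e. the row that would get rno = 1.
matches = {}
for num_id, t1 in df1:
    candidates = [t2 for n2, t2 in df2 if n2 == num_id and t2 <= t1]
    if candidates:
        matches[(num_id, t1)] = max(candidates)

print(matches[("XXXXX01", 1005)])  # -> 1003, matching the output table above
```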
eqzww0vc #2

If you need to join on two fields, with a specific interval on one of them, you can do the following:

  import org.apache.spark.sql.{DataFrame, Row, SparkSession}
  import org.apache.spark.sql.functions.lit
  import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

  val spark = SparkSession.builder().master("local[1]").getOrCreate()

  val df1: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(Seq(
      Row("XXXXX01", 1001, 79.0),
      Row("XXXXX01", 1005, 88.0),
      Row("XXXXX01", 1010, 99.0),
      Row("XXXXX01", 1015, null),
      Row("XXXXX01", 1020, 100.0),
      Row("XXXXX02", 1001, 81.0))),
    StructType(Seq(
      StructField("NUM_ID", StringType, false),
      StructField("TIME", IntegerType, false),
      StructField("SG1_V", DoubleType, true))))

  val df2: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(Seq(
      Row("XXXXX01", 1001, 79.0),
      Row("XXXXX01", 1001, 99.0),
      Row("XXXXX01", 1003, 22.0),
      Row("XXXXX01", 1007, 85.1),
      Row("XXXXX01", 1011, 1.0),
      Row("XXXXX02", 1001, 22.0))),
    StructType(Seq(
      StructField("NUM_ID", StringType, false),
      StructField("TIME", IntegerType, false),
      StructField("SG1_V", DoubleType, false))))

  val interval: Int = 10

  df1.join(df2, (df1("TIME") - df2("TIME") > lit(interval)) && df1("NUM_ID") === df2("NUM_ID")).show()

The result is:

  +-------+----+-----+-------+----+-----+
  | NUM_ID|TIME|SG1_V| NUM_ID|TIME|SG1_V|
  +-------+----+-----+-------+----+-----+
  |XXXXX01|1015| null|XXXXX01|1001| 79.0|
  |XXXXX01|1015| null|XXXXX01|1001| 99.0|
  |XXXXX01|1015| null|XXXXX01|1003| 22.0|
  |XXXXX01|1020|100.0|XXXXX01|1001| 79.0|
  |XXXXX01|1020|100.0|XXXXX01|1001| 99.0|
  |XXXXX01|1020|100.0|XXXXX01|1003| 22.0|
  |XXXXX01|1020|100.0|XXXXX01|1007| 85.1|
  +-------+----+-----+-------+----+-----+
ycggw6v2 #3

The solution above joins the DataFrames after saving them to Hive tables.
I tried to apply the same logic to join the two DataFrames directly, without saving them to Hive tables, as follows:

  val finalSignals = finalABC.as("df1")
    .join(finalXYZ.as("df2"),
      $"df1.NUM_ID" === $"df2.NUM_ID" && $"df2.TIME" <= $"df1.TIME", "left")
    .withColumn("rno", row_number.over(
      Window.partitionBy($"df1.NUM_ID", $"df1.TIME").orderBy($"df1.TIME" - $"df2.TIME")))
    .select(
      col("df1.NUM_ID").as("NUM_ID"),
      col("df1.TIME"),
      col("df2.NUM_ID").as("NUM_ID2"),
      col("df2.TIME").as("TIME2"),
      col("rno"))
    .filter("rno == 1")

Is this equivalent to the solution provided above?

  spark.sql("""
    SELECT * FROM (
      SELECT *,
             ROW_NUMBER() OVER (PARTITION BY df1.NUM_ID, df1.TIME ORDER BY (df1.TIME - df2.TIME)) rno
      FROM df1 JOIN df2
        ON df2.NUM_ID = df1.NUM_ID AND
           df2.TIME <= df1.TIME
    ) T
    WHERE T.rno = 1
  """)
