Spark DataFrame generation

hs1rzwqc · posted 2021-06-26 · tagged Hive

I have two DataFrames. The first, dataframe1, has 70,000 rows, for example:

    location_id,location,flag
    1,Canada,active
    2,Paris,active
    3,London,active
    4,Berlin,active

The second DataFrame, lookup, holds the modified id for each location (this DataFrame is changed from time to time), for example:

    id,location
    1,Canada
    10,Paris
    4,Berlin
    3,London

My problem is that I need to take the new id from lookup whenever location_id differs from id. For each such location I have to keep the row with the old id, with its flag set to inactive (to preserve the history), and add a row with the new id and the flag set to active. So the output table in Hive should look like this:

    location_id,location,flag
    1,Canada,active
    2,Paris,inactive
    10,Paris,active
    3,London,active
    4,Berlin,active

I tried joining the two DataFrames first and then, on the joined DataFrame, performing operations to save all the records into Hive. This is what I tried:

    val joinedFrame = dataframe1.join(lookup, "location")
    val df_temp = joinedFrame.withColumn("flag1", when($"tag_id" === $"tag_number", "active").otherwise("inactive"))
    var count = 1
    df_temp.foreach(x => {
      val flag1 = x.getAs[String]("flag1").toString
      val flag = x.getAs[String]("flag").toString
      val location_id = x.getAs[String]("location_id").toString
      val location = x.getAs[String]("location").toString
      val id = x.getAs[String]("id").toString
      if ((count != 1) && (flag1 != flag)) {
        println("------not equal-------", flag1, "-------", flag, "---------", id, "---------", location, "--------", location_id)
        val df_main = sc.parallelize(Seq((location_id, location, flag1), (id, location, flag))).toDF("location_id", "location", "flag")
        df_main.show
        df_main.write.insertInto("location_coords")
      }
      count += 1
    })

It prints the location values whose ids differ, but when I save those values as a DataFrame I get the following exception:

    not equal------inactive------active---10---------Paris---------2
    17/09/29 03:43:29 ERROR Executor: Exception in task 0.0 in stage 25.0 (TID 45)
    java.lang.NullPointerException
        at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:75)
        at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:65)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    17/09/29 03:43:29 WARN TaskSetManager: Lost task 0.0 in stage 25.0 (TID 45, localhost, executor driver): java.lang.NullPointerException
        at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:75)
        at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:65)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
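A note on the exception (this is an assumption on my part, not something stated in the thread): sc.parallelize and toDF are driver-side APIs, and the closure passed to foreach runs on the executors, where the captured SparkContext deserializes as null, which would produce exactly this NullPointerException inside the anonymous function. A minimal sketch of the same per-row write expressed as driver-side DataFrame operations instead, reusing df_temp and the location_coords table from the question:

    // Sketch only, assuming df_temp from the code above; this is not the
    // accepted answer's approach. sc.parallelize/toDF cannot be called inside
    // foreach (executor side), so build the rows with DataFrame operations
    // on the driver instead.
    import org.apache.spark.sql.functions.lit

    val changed = df_temp.filter($"flag1" =!= $"flag")   // rows whose id changed

    // old id kept with flag "inactive", new id written with flag "active"
    val oldRows = changed.select($"location_id", $"location", lit("inactive").as("flag"))
    val newRows = changed.select($"id".as("location_id"), $"location", lit("active").as("flag"))

    oldRows.union(newRows).write.insertInto("location_coords")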

c7rzv4ha · answer 1#

Based on your comments, I think the easiest way would be to do the join on the id instead. When doing an outer join the missing columns will end up as null, and those rows are exactly the updated rows you are interested in.
After that, all that is left is to update the location column where it is null, as well as the flag column; see the code below (note that I changed the column names slightly):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.when

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    val df = Seq((1, "Canada", "active"), (2, "Paris", "active"), (3, "London", "active"), (4, "Berlin", "active"))
      .toDF("id", "location", "flag")
    val df2 = Seq((1, "Canada"), (10, "Paris"), (4, "Berlin"), (3, "London"))
      .toDF("id", "location_new")

    // Outer join on id: rows that changed have a null on one side.
    val df3 = df.join(df2, Seq("id"), "outer")
      .filter($"location".isNull or $"location_new".isNull)
      .withColumn("location", when($"location_new".isNull, $"location").otherwise($"location_new"))
      .withColumn("flag", when($"location" === $"location_new", "active").otherwise("inactive"))
      .drop("location_new")

    > df3.show()
    +---+--------+--------+
    | id|location|    flag|
    +---+--------+--------+
    | 10|   Paris|  active|
    |  2|   Paris|inactive|
    +---+--------+--------+

After this, you can use this new DataFrame to update your Hive table.
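One possible way to do that update, as a minimal sketch rather than part of the original answer: append the changed rows to the location_coords table from the question with insertInto, assuming the session was built with .enableHiveSupport() and the table already exists. Note that insertInto resolves columns by position, so the select order must match the Hive table's column order:

    // Sketch only: append the changed rows into the Hive table from the
    // question. Assumes a Hive-enabled SparkSession and an existing
    // location_coords(location_id, location, flag) table.
    df3.select($"id".as("location_id"), $"location", $"flag")
      .write
      .mode("append")
      .insertInto("location_coords")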
