spark无法从另一个db替换一个db的值

qybjjes1  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(267)

我得到了两个数据库a和b。a的列名为a1、a2、a3。。。b有b1、b2、b3列。。。最多b200。
a3类型是二进制的,有bi,bj,bk(总计5)在这个二进制文件中。
我需要替换bi,bj,bk。。。来自数据库b中相同列的值。条件是相同的事务日期和事务id。这两个条件是a2的两个键(这是一个Map<string,string>)。这两个条件是数据库b中的两列。
那是问题的抽象。详情如下:
数据库a的行(从数据集[row]转换的else where)正在代码中使用:

  1. private def func(
  2. row: Row,
  3. rowSchema: StructType
  4. ): Row = {
  5. val attributes = row.toSeq.zipWithIndex.collect {
  6. case (value, i) => {
  7. (rowSchema.fields(i).name, value.asInstanceOf[String])
  8. }
  9. }.toMap.asJava
  10. val result = ffunc(new JHashMap(attributes)))
  11. }

数据库b存储在s3中。。。我需要为ffunc提供属性,其中的一些值需要替换为b中的值。
我是新的Spark,但似乎Dataframe是不变的。是否可以自己创建新属性?但这样做感觉很复杂?
抱歉,我已经坚持了两个星期了,有人知道吗?谢谢您!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题