我得到了两个数据库a和b。a的列名为a1、a2、a3。。。b有b1、b2、b3列。。。最多b200。
a3类型是二进制的,有bi,bj,bk(总计5)在这个二进制文件中。
我需要替换bi,bj,bk。。。来自数据库b中相同列的值。条件是相同的事务日期和事务id。这两个条件是a2的两个键(这是一个Map<string,string>)。这两个条件是数据库b中的两列。
那是问题的抽象。详情如下:
数据库a的行(从数据集[row]转换的else where)正在代码中使用:
private def func(
row: Row,
rowSchema: StructType
): Row = {
val attributes = row.toSeq.zipWithIndex.collect {
case (value, i) => {
(rowSchema.fields(i).name, value.asInstanceOf[String])
}
}.toMap.asJava
val result = ffunc(new JHashMap(attributes)))
}
数据库b存储在s3中。。。我需要为ffunc提供属性,其中的一些值需要替换为b中的值。
我是新的Spark,但似乎Dataframe是不变的。是否可以自己创建新属性?但这样做感觉很复杂?
抱歉,我已经坚持了两个星期了,有人知道吗?谢谢您!
暂无答案!
目前还没有任何答案,快来回答吧!