我正在创建一个空的Dataframe,然后尝试向它附加另一个Dataframe。事实上,我想根据RDD的数量动态地将许多Dataframe附加到最初的空Dataframe。
如果我将值赋给第三个Dataframe中的另一个,union()函数就可以正常工作。
val df3=df1.union(df2)
但是我想继续附加到我创建的初始Dataframe(空),因为我想将所有RDD存储在一个Dataframe中。但是,下面的代码没有显示正确的计数。似乎它只是没有附加
df1.union(df2)
df1.count() // this shows 0 although df2 has some data and that is shown if I assign to third datafram.
如果我做了下面的操作(因为df1是val,所以我得到了重新分配错误),如果我把它改成var类型,我得到了kafka多线程不安全错误。
df1=d1.union(df2)
知道如何将所有动态创建的Dataframe添加到一个初始创建的Dataframe中吗?
2条答案
按热度按时间s8vozzvw1#
DataFrames
而其他分布式数据结构是不可变的,因此对其进行操作的方法总是返回新的对象。没有附加,没有修改到位,没有ALTER TABLE
等价物。如果我把它改成var类型,就会得到kafka多线程不安全错误。
没有实际的代码是不可能给你一个明确的答案,但它不太可能与
union
代码。有许多已知的spark错误是由不正确的内部实现引起的(spark-19185、spark-23623仅列举了一些)。
nnt7mjpx2#
不知道这是不是你要找的!