将数据附加到空Dataframe

7nbnzgx9  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(402)

我正在创建一个空的Dataframe,然后尝试向它附加另一个Dataframe。事实上,我想根据RDD的数量动态地将许多Dataframe附加到最初的空Dataframe。
如果我将值赋给第三个Dataframe中的另一个,union()函数就可以正常工作。

val df3=df1.union(df2)

但是我想继续附加到我创建的初始Dataframe(空),因为我想将所有RDD存储在一个Dataframe中。但是,下面的代码没有显示正确的计数。似乎它只是没有附加

df1.union(df2)

df1.count() // this shows 0 although df2 has some data and that is shown if I assign to third datafram.

如果我做了下面的操作(因为df1是val,所以我得到了重新分配错误),如果我把它改成var类型,我得到了kafka多线程不安全错误。

df1=d1.union(df2)

知道如何将所有动态创建的Dataframe添加到一个初始创建的Dataframe中吗?

s8vozzvw

s8vozzvw1#

DataFrames 而其他分布式数据结构是不可变的,因此对其进行操作的方法总是返回新的对象。没有附加,没有修改到位,没有 ALTER TABLE 等价物。
如果我把它改成var类型,就会得到kafka多线程不安全错误。
没有实际的代码是不可能给你一个明确的答案,但它不太可能与 union 代码。
有许多已知的spark错误是由不正确的内部实现引起的(spark-19185、spark-23623仅列举了一些)。

nnt7mjpx

nnt7mjpx2#

不知道这是不是你要找的!


# Import pyspark functions

from pyspark.sql.types import StructType, StructField, IntegerType, StringType 

# Define your schema

field = [StructField("Col1",StringType(), True), StructField("Col2", IntegerType(), True)]
schema = StructType(field)

# Your empty data frame

df = spark.createDataFrame(sc.emptyRDD(), schema)

l = []

for i in range(5):
    # Build and append to the list dynamically
    l = l + [([str(i), i])]

    # Create a temporary data frame similar to your original schema
    temp_df = spark.createDataFrame(l, schema)

    # Do the union with the original data frame
    df = df.union(temp_df)
df.show()

相关问题