spark:通过将Dataframe更改为dstream来应用连接的正确方法

vlf7wbxs  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(304)

我使用pyspark,有dstream(一些用户交互)。当新的数据批到达时,我想选择数据的一个子集(并使用它来适应ml模型)并将未关闭值的一个子集保存到另一个dataframe。然后,当下一批到达时,我想将上一步中未关闭的Dataframe与到达的Dataframe连接起来,并计算transforms/groupbys/etc。
但是,有一个问题:我只能调用一次arrived_df.join(saved_df)(因为流式api在计算之前构造了dag),现在我不知道如何在新批到达时更新保存的_df并将其与arrived_df连接起来!
这个计划相当庞大而且晦涩难懂,所以有一个例子:
我们用arrived.join(saved),saved=[]来定义dag
第一批到货:到货=[1,2,3];已保存=[](保存(开始时为空)
我们选择了[1,2],并为下一步保存了3,所以saved=[3]
新批到达:到达=[4,5,6],保存=[3]
但是,join(saved)将被调用,但是在构建dag时saved是[],所以join的结果是[4,5,6],而不是我想要的[4,5,6,3]。
我的问题是:在spark dstream中保存以前批的值并在将来批到达时在下一次计算中使用它们的正确方法是什么?
目前我有两个解决方法,但我不认为这些是解决我问题的正确方法:
使用某个列名(在我的例子中是“user\u id”)调用arrived.groupby(),将每个组与保存的具有相同“user\u id”的组连接起来,然后使用这个连接的df
写一个dstream,每次新批到达时都会生成保存的数据流,然后我们可以调用dstream.join(saveddstream)
一些代码示例:

stream_reader = spark.readStream
data_stream = stream_reader.text("some_path_to_dir")

data_batch_df = df_from_stream() # some function to convert RDD to DF

# creating new DF which'll contain saved data

saved_df = spark.createDataFrame(spark.sparkContext.emptyRDD())

joined_df = data_batch_df.join(saved_df)

chosen_interacts, new_saved_df = _split_chosen_and_unchosen_data(joined_df) # returns two dataframes

# HERE I NEED TO SAVE new_saved_df AND USE IT IN JOIN WHEN NEXT BATCH ARRIVES

# BUT I CAN'T, BECAUSE data_batch_df.join(saved_df) already has link to old version

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题