sparkDataframe似乎重新计算了两次

whitzsjs  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(366)

解决:我解决了这个问题,这是由于在流程的第一个通道中的一个非常愚蠢、愚蠢、愚蠢的错误。基本上,我是在计算一个写进配置单元表的Dataframe;然后需要使用这个Dataframe来创建 temporaryDF 在许多段落之后,我最初是从头开始查询表,而不是使用Dataframe的副本写入表中。错误在于,刚刚计算的Dataframe丢失了之前的分区(由于流的特定逻辑),而下一次的计算需要创建 temporaryDF 还需要至少两个以前的分区。我不知道为什么,我不记得什么时候,我决定缓存刚刚计算的一个,从而丢失信息并在oozie下得到一个空分区(在spark shell中,我总是使用至少三个分区,因为一段时间后手动更新表-每个新分区每15分钟出现一次)。我可能是在深夜工作冲刺和我的大脑认为这是值得搞砸了。
我投了赞成票,接受了“蓝色幻影”的回答,因为他在我描述的具体情况下是非常正确的。
原文:我有一个奇怪的行为使用 Spark-Submit hadoop2中的sparkv.2.2.0.2.6.4.105-1(使用scala)在oozie工作流下与使用 Spark-Shell .
我有一个配置单元表,其中包含每15分钟跟踪一次某些进程的记录。每次新记录或“旧”记录仍满足感兴趣进程的逻辑时,表都会被覆盖。
我通过一个专栏来记录这些记录的年代 times_investigated ,范围从1到9。
我创建了一个临时Dataframe,我们称之为 temporayDF ,它同时包含旧条目和新条目(两种类型都需要存在才能运行有用的计算)。这个 temporayDF 然后根据 $"times_investigated" === 1 以及 $"times_investigated > 1" (或 =!= 1 ). 然后,处理后的条目与 union 在最后的Dataframe中,然后将其写入原始配置单元表。

// Before, I run the query on the 'old' Hive table and the logic over old and new entries.
// I now have a temporary dataframe
val temporaryDF = previousOtherDF
                  .withColumn("original_col_new", conditions)
                  .withColumn("original_other_col_new", otherConditions)
                  .withColumn("times_investigated_new", nvl($"times_investigated" + 1, 1))
                  .select(
                    previousColumns,
                    $"original_col_new".as("original_col"),
                    $"original_other_col_new".as("original_other_col"),
                    $"times_investigated_new".as("times_investigated"))
                    .cache

// Now I need to split the temporayDF in 2 to run some other logic on the new entries.
val newEntriesDF = temporaryDF
                    .filter($"times_investigated" === 1)
                    .join(neededDF, conditions, "leftouter")
                    .join(otherNeededDF, conditions, "leftouter")
                    .groupBy(cols)
                    .agg(min(colOne),
                         max(colTwo),
                         min(colThree),
                         max(colFour))
                    .withColumn("original_col_five_new",
                                when(conditions).otherwise(somethingElse))
                    .withColumn("original_col_six_new",
                                when(conditions).otherwise(somethingElse)) 
                    .select(orderedColumns)

val oldEntriesDF = temporaryDF.filter($"times_investigated" > 1)

val finalTableDF = oldEntriesDF.union(newEntriesDF)

// Now I write the table
finalTableDF.createOrReplaceTempView(tempFinalTableDF)
sql("""INSERT OVERWRITE TABLE  $finalTableDF 
       SELECT * FROM  tempFinalTableDF """)

// I would then need to re-use the newly-computed table to process further information...

问题是:配置单元表没有为新条目提供 times_investigated = 1. 它只处理旧的条目,因此,在一个条目可以停留在表中的9次之后,它就完全空了。
我在sparkshell中运行了一些测试,在许多次迭代中,一切都运行得很好,甚至从shell手动编写配置单元表也在配置单元表中产生了预期的结果,但是当我在oozie下启动工作流时,奇怪的行为又出现了。
我在sparkshell中注意到,在编写了Hive表之后,如果我去计算 temporaryDF.show() ,新条目将更新为 $"times_investigated" = 2!
我试着复制 temporaryDF 使用新条目和旧条目处理不同的Dataframe copyOfTemporaryDF 写入配置单元表后更新。
似乎这种重新计算是在oozie下编写配置单元表之前发生的。
我知道我可以用一种不同的方式来计算操作,但是如果可能的话,我需要在当前流上找到一个快速的临时解决方案。
最重要的是,我很想了解引擎盖下正在发生的事情,以避免自己以后陷入这种情况。
你们有什么线索和/或建议吗?
我尝试缓存中间Dataframe,但没有成功。
p、 很抱歉可能是糟糕的编码实践
编辑。更多背景: temporaryDF 来自其他中间Dataframe,仅用于一次计算感兴趣的Dataframe。最后一段 temporaryDFwithColumn 操作,在哪里 $"times_investigated" 使用自定义 nvl 函数(其工作方式与sql函数完全相同),并且在旧版本的流中从未出现问题(参见下面的段落)。
edit2:我还尝试在一个长链序列中合并对新条目和旧条目的操作,以便 temopraryDF 实际上是要写入配置单元表中的最终Dataframe,但是 times_investigated =1仍然没有考虑(但是我没有问题) Spark-Shell 以及 .show 将Dataframe写入表后进行重新计算,因此所调查的时间为+1)。

nvbavucw

nvbavucw1#

使用.cache,否则将得到重新计算。如果rdd或df要在单Spark应用程序中多次使用,那么您应该为适当的Dataframe或rdd执行此操作—甚至不依赖于操作,有时您会得到“跳过的阶段”。

val temporaryDF = previousOperations...cache()

2 VAL使用temporarydf,而不缓存重新计算,正如您所看到的,它们可能会给出不同的结果。应该缓存的。
当然,如果一个工作进程死亡,或者分区被逐出,则需要重新计算。
.cache对于大于可用群集内存的数据集可能不理想。被逐出的每个分区都将从源代码处重建,这是一件代价高昂的事情。
另外,使用适当的分区和迭代几次可能比持久化/缓存更好;但这要视情况而定。

相关问题