sparkDataframe在写入表后变空

to94eoyn  于 2021-05-31  发布在  Hadoop
关注(0)|答案(0)|浏览(308)

在将Dataframe写入配置单元表之后,我对Dataframe的行为有一些担心。
上下文:我通过运行sparkscala(版本2.2.0.2.6.4.105-1)作业 spark-submit 在我的生产环境中,有hadoop2。我进行多次计算,并将一些中间数据存储到hive orc表中;在存储一个表之后,我需要重新使用dataframe来计算一个新的dataframe以存储在另一个hive orc表中。
例如。:

// dataframe with ~10 million record
val df = prev_df.filter(some_filters)

val df_temp_table_name = "temp_table"
val df_table_name = "table"

sql("SET hive.exec.dynamic.partition = true")
sql("SET hive.exec.dynamic.partition.mode = nonstrict")
df.createOrReplaceTempView(df_temp_table_name)
sql(s"""INSERT OVERWRITE TABLE $df_table_name PARTITION(partition_timestamp)  
      SELECT * FROM  $df_temp_table_name """)

这些步骤总是有效的,并且表中正确地填充了正确的数据和分区。
在这之后,我需要使用刚刚计算的Dataframe( df )更新另一个表。所以我查询要更新到dataframe的表 df2 ,然后我加入 dfdf2 ,联接的结果需要覆盖 df2 (一个普通的、没有分区的表)。

val table_name_to_be_updated = "table2"

// Query the table to be updated
val df2 = sql(table_name_to_be_updated)

val df3 = df.join(df2).filter(some_filters).withColumn(something)

val temp = "temp_table2"

df3.createOrReplaceTempView(temp)
sql(s"""INSERT OVERWRITE TABLE $table_name_to_be_updated   
      SELECT * FROM  $temp """)

在这一点上, df3 总是发现空的,因此生成的配置单元表也总是空的。当我 .persist() 把它留在记忆里是很困难的。
测试时使用 spark-shell ,我从未遇到过这个问题。只有在中计划流时才会发生这种情况 cluster-mode 在oozie下面。
你觉得问题出在哪里?对于如何有效利用内存来解决这样的问题,您有什么建议吗?
我不知道是不是第一次 df 如果问题是因为我首先查询然后试图覆盖同一个表,那么在写入表之后,它将变为空。
提前非常感谢您,祝您度过愉快的一天!
编辑:
以前, df 在单个脚本中计算,然后插入到相应的表中。在第二个脚本中,该表被查询到一个新变量中 df ; 然后还查询要更新的表并将其存储到变量中 old_df2 比如说。然后将这两个变量连接起来,并在一个新变量中进行计算 df3 ,然后用overwrite插入到要更新的表中。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题