增量添加到配置单元表w/scala+spark 1.3

nvbavucw  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(318)

我们的集群有spark 1.3和hive有一个很大的hive表,我需要向其中添加随机选择的行。有一个较小的表,我读取并检查一个条件,如果该条件为真,那么我获取需要的变量,然后查询要填充的随机行。我所做的就是在这个条件下做一个查询, table.where(value<number) ,然后使用 take(num rows) . 然后,由于所有这些行都包含需要从大型配置单元表中获取哪些随机行的信息,因此我遍历数组。
当我使用 ORDER BY RAND() 在查询中(使用 sqlContext ). 我创造了一个 var Hive table (可变)从较大的表中添加一列。在循环中,我做了一个 newHiveTable = newHiveTable.unionAll(random_rows) 我尝试了很多不同的方法来做到这一点,但不知道什么是最好的方式来避免cpu和临时磁盘的使用。我知道Dataframe不适用于增量添加。我现在要做的一件事是创建一个cvs文件,在循环中以增量方式将随机行写入该文件,然后在循环完成后,将cvs文件作为一个表加载,然后执行一个unionall操作以获得最终的表。
任何反馈都会很好。谢谢

dfuffjeb

dfuffjeb1#

我建议您使用配置单元创建一个外部表,定义位置,然后让spark将输出作为csv写入该目录:
在Hive中:

create external table test(key string, value string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ';'
LOCATION '/SOME/HDFS/LOCATION'

然后在斯帕克的帮助下https://github.com/databricks/spark-csv ,将Dataframe写入csv文件并附加到现有文件:

df.write.format("com.databricks.spark.csv").save("/SOME/HDFS/LOCATION/", SaveMode.Append)

相关问题