这个问题是[this one](在pyspark中将行列表保存到配置单元表)的派生问题。
编辑请看我的更新编辑在这篇文章的底部
我使用scala和现在的pyspark来完成相同的任务,但是我遇到了Dataframe到parquet或csv的保存速度非常慢,或者将Dataframe转换为列表或数组类型的数据结构的问题。下面是相关的python/pyspark代码和信息:
# Table is a List of Rows from small Hive table I loaded using
# query = "SELECT * FROM Table"
# Table = sqlContext.sql(query).collect()
for i in range(len(Table)):
rows = sqlContext.sql(qry)
val1 = Table[i][0]
val2 = Table[i][1]
count = Table[i][2]
x = 100 - count
# hivetemp is a table that I copied from Hive to my hfs using:
# create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
# INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;
query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)
rows = sqlContext.sql(query)
rows = rows.withColumn("col4", lit(10))
rows = rows.withColumn("col5", lit(some_string))
# writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
rows.saveAsParquetFile("rows"+str(i)+".parquet")
# tried this before and heck slow also
#rows_list = rows.collect()
#shuffle(rows_list)
我曾尝试在scala中执行上述操作,但也遇到了类似的问题。我可以很容易地加载配置单元表或配置单元表的查询,但是需要进行随机洗牌或存储大型Dataframe会遇到内存问题。另外,添加两个额外的列也有一些挑战。
我要添加行的配置单元表(hivetemp)有5570000~550万行和120列。
我在for循环中迭代的配置单元表有5000行和3列。有25个独特的 val1
(hivetemp中的一列),以及 val1
以及 val2
3000val2可以是5列中的一列及其特定的单元格值。这意味着,如果我调整了代码,那么我可以将要添加的行的查找从5000减少到26,但是我必须检索、存储和随机洗牌的行数将非常大,因此会出现内存问题(除非有人对此有建议)
至于我需要添加到表中的行总数可能是100000。
最终的目标是将原来的5.5mill行的表附加上100k+行,作为一个Hive或Parquet表来编写。如果更简单的话,我可以在自己的表中写入10万行,以后可以合并到550万个表中
scala或python还可以,不过scala更受欢迎。。
任何关于这方面的建议和最好的选择都会很好。
谢谢!
编辑一些关于这个问题的额外想法:我使用hash分区器将配置单元表划分为26个分区。这是基于有26个不同列的列值。我想在for循环中执行的操作可以通用化,这样就只需要在每个分区上执行。也就是说,我怎样才能在线编写scala代码来实现这一点,以及如何让一个单独的执行者在每个分区上执行这些循环呢?我想这会让事情变得更快。
我知道如何使用多线程来做这样的事情,但不知道如何在scala/spark范例中实现。
暂无答案!
目前还没有任何答案,快来回答吧!