pysparkDataframe列:配置单元列

8cdiaqws  于 2021-06-26  发布在  Hive
关注(0)|答案(3)|浏览(284)

我有一个Hive表,如下所示:

hive> describe stock_quote;
OK
tickerid                string                                      
tradeday                string                                      
tradetime               string                                      
openprice               string                                      
highprice               string                                      
lowprice                string                                      
closeprice              string                                      
volume                  string

spark的以下代码读取csv文件并尝试将记录插入配置单元表:

sc = spark.sparkContext
lines = sc.textFile('file:///<File Location>')
rows = lines.map(lambda line : line.split(','))
rows_map = rows.map(lambda row : Row(TickerId = row[0], TradeDay = row[1], TradeTime = row[2], OpenPrice = row[3], HighPrice = row[4], LowPrice = row[5], ClosePrice = row[6], Volume = row[7]))
rows_df = spark.createDataFrame(rows_map)
rows_df.write.mode('append').insertInto('default.stock_quote')

我面临的问题是,当我在dataframe上调用show()函数时,它会按字母顺序打印列,如下所示

|ClosePrice|HighPrice|LowPrice|OpenPrice|TickerId|TradeDay|TradeTime|Volume|

,并在表中,将closeprice(df中的第1列)的值插入tickerid(hive表中的第1列)列,highprice的值插入tradeday列,依此类推。
试图对Dataframe调用select()函数,但没有帮助。尝试将列名列表如下所示:

rows_df = spark.createDataFrame(rows_map, ["TickerId", "TradeDay", "TradeTime", "OpenPrice", "HighPrice", "LowPrice", "ClosePrice", "Volume"])

上面更改了列名的顺序,但值保持在相同的位置,这更不正确。
任何帮助都将不胜感激。

tvokkenx

tvokkenx1#

你应该和我一起去 namedtuple 而不是 Row 因为“row”试图对列名进行排序。因此,有序的列名与 default.stock_quote 请检查pyspark中的scala case类等价物是什么?更多细节
所以你应该

from collections import namedtuple

table = namedtuple('table', ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])
rows_map = rows.map(lambda row : table(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))

正如@user6910411所建议的,“一个普通的元组也可以”

rows_map = rows.map(lambda row : (row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
rows_df = spark.createDataFrame(rows_map, ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])

现在 insertInto 应该有用

pjngdqdw

pjngdqdw2#

你也可以使用 saveAsTable 而不是 insertInto 从文档中:
不像 insertInto , saveAsTable 将使用列名来查找正确的列位置

yxyvkwin

yxyvkwin3#

怎么会按字母顺序排列的?csv文件就是这样的吗?
总之,我会按照以下步骤来做:
从表中选择列
根据表中的列重新排列Dataframe


# pyspark below

list_columns = spark.sql('select * from table').columns # there might be simpler way
dataframe.select(*list_columns)

相关问题