I have dict data nested inside JSON files stored in HDFS (two years of daily rates), which I want to process in PySpark and load into a Hive table partitioned by date.
I tried to explode the nesting first to get a flat structure, but I haven't figured out how to partition by date, because the date keys themselves are nested, repeated, and dynamic (the schema printout after the snippet below shows the problem). What should my approach be?
from pyspark.sql.functions import array, col, explode

df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json')

# Wrapping the inferred "rates" struct in an array and exploding it
# still leaves one struct field per date, so each date has to be
# addressed by name (backticks, because of the hyphens):
dfRates = df.select(explode(array(df['rates']))).toDF("rates")
dfdate = dfRates.select(col("rates.`2018-02-22`.NZD"))

# Drop the duplicated rows based on the base and Date columns.
# Date and the rates_* columns are what I am trying to produce:
forex_rates = df.select('Date', 'base', 'rates_BGN',
                        'rates_CNY', 'rates_NZD') \
    .dropDuplicates(['base', 'Date']) \
    .fillna(0, subset=['rates_BGN', 'rates_CNY', 'rates_NZD'])

# Export the dataframe into the Hive table forex_rates
forex_rates.write.mode("append").insertInto("forex_rates")
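For reference, printing the inferred schema shows why the date access ends up hard-coded: spark.read.json turns every date key into its own struct field, roughly:

df.printSchema()
# root
#  |-- base: string (nullable = true)
#  |-- end_at: string (nullable = true)
#  |-- rates: struct (nullable = true)
#  |    |-- 2018-01-09: struct (nullable = true)
#  |    |    |-- BGN: double (nullable = true)
#  |    |    ...
#  |    |-- 2018-01-22: struct (nullable = true)
#  |    |    ...
#  |-- start_at: string (nullable = true)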
Thanks in advance.
sample data:
{"rates": {
    "2018-01-22": {"BGN": 1.9558, "TRY": 4.6552, "CNY": 7.8374, "NOK": 9.6223, "NZD": 1.6758},
    "2018-01-09": {"BGN": 1.8558, "TRY": 4.4843, "CNY": 7.7865, "NOK": 9.6715, "NZD": 1.6601}
  },
  "start_at": "2018-01-01",
  "base": "EUR",
  "end_at": "2018-02-01"
}
expected df structure:
+------------+------+-----------+-----------+-----------+
| Date       | Base | rates_BGN | rates_CNY | rates_NZD |
+------------+------+-----------+-----------+-----------+
| 2018-01-22 | EUR  |    1.9558 |    7.8374 |    1.6758 |
+------------+------+-----------+-----------+-----------+
| 2018-01-09 | EUR  |    1.8558 |    7.7865 |    1.6601 |
+------------+------+-----------+-----------+-----------+
| .......... | ...  |    ...... |    ...... |    ...... |
+------------+------+-----------+-----------+-----------+
1 Answer
This approach might help you: flatten the input JSON by reading "rates" as a map rather than a struct.
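A minimal sketch of that idea, assuming the HDFS path, column names, and Hive table name from the question. The key move is supplying an explicit MapType schema instead of letting Spark infer a struct, so the dynamic date keys become ordinary row values that explode can turn into a Date column:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.types import (DoubleType, MapType, StringType,
                               StructField, StructType)

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Declare "rates" as date -> (currency -> rate) so the date keys are
# map entries, not struct fields.
schema = StructType([
    StructField("rates", MapType(StringType(),
                                 MapType(StringType(), DoubleType()))),
    StructField("start_at", StringType()),
    StructField("base", StringType()),
    StructField("end_at", StringType()),
])

# multiLine=True because each JSON object spans several lines.
df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json',
                     schema=schema, multiLine=True)

# explode yields one row per map entry: (Date, {currency: rate}).
flat = df.select(col('base').alias('Base'),
                 explode('rates').alias('Date', 'rate_map'))

forex_rates = (flat
               .select('Date', 'Base',
                       col('rate_map')['BGN'].alias('rates_BGN'),
                       col('rate_map')['CNY'].alias('rates_CNY'),
                       col('rate_map')['NZD'].alias('rates_NZD'))
               .dropDuplicates(['Base', 'Date'])
               .fillna(0, subset=['rates_BGN', 'rates_CNY', 'rates_NZD']))

# Append to a Hive table partitioned by Date.
forex_rates.write.mode('append').partitionBy('Date') \
    .saveAsTable('forex_rates')

If the set of currencies also varies, a second explode of rate_map into (currency, rate) rows followed by groupBy('Date', 'Base').pivot('currency') would generalize this without hard-coding the currency columns.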