Processing partitioned nested JSON files in PySpark with Python

slmsl1lt · published 2021-05-27 in Spark
Answers (1) | Views (293)

I have nested dict data in JSON files stored in HDFS (two years of daily exchange rates), and I want to process it in PySpark so that the date becomes a column, then load the result into a Hive table partitioned by date.
I tried exploding the nested structure first to get a flat schema, but I haven't figured out how to partition by date, because the dates are nested as keys, repeated, and dynamic. What should my approach be?

from pyspark.sql.functions import explode, array

df = spark.read.json('hdfs://namenode:9000/forex/forex_rates.json')
# My attempt: explode the rates struct, then pick out one hard-coded date/currency
dfRates = df.select(explode(array(df['rates']))).toDF("rates")
dfdate = dfRates.select("rates.2018-02-22.NZD")

# Drop the duplicated rows based on the base and date columns

forex_rates = df.select('Date', 'base', 'rates_BGN',
                        'rates_CNY', 'rates_NZD') \
    .dropDuplicates(['base', 'Date']) \
    .fillna(0, subset=['rates_BGN', 'rates_CNY', 'rates_NZD'])

# Export the dataframe into the Hive table forex_rates

forex_rates.write.mode("append").insertInto("forex_rates")

Thanks in advance.

Sample data:

{'rates': {
    '2018-01-22': {'BGN': 1.9558, 'TRY': 4.6552, 'CNY': 7.8374, 'NOK': 9.6223, 'NZD': 1.6758},
    '2018-01-09': {'BGN': 1.8558, 'TRY': 4.4843, 'CNY': 7.7865, 'NOK': 9.6715, 'NZD': 1.6601}
  },
 'start_at': '2018-01-01',
 'base': 'EUR',
 'end_at': '2018-02-01'
}
Expected DataFrame structure:

+------------+------+-----------+-----------+-----------+
| Date       | Base | rates_BGN | rates_CNY | rates_NZD |
+------------+------+-----------+-----------+-----------+
| 2018-01-22 | EUR  | 1.9558    | 7.8374    | 1.6758    |
+------------+------+-----------+-----------+-----------+
| 2018-01-09 | EUR  | 1.8558    | 7.7865    | 1.6601    |
+------------+------+-----------+-----------+-----------+
| .......... | ...  | ......    | ......    | ......    |
+------------+------+-----------+-----------+-----------+

3phpmpom1#

This code might help you. Given the input JSON:

{'rates': {
    '2018-01-22': {'BGN': 1.9558, 'TRY': 4.6552, 'CNY': 7.8374, 'NOK': 9.6223, 'NZD': 1.6758},
    '2018-01-09': {'BGN': 1.9558, 'TRY': 4.4843, 'CNY': 7.7865, 'NOK': 9.6715, 'NZD': 1.6601}
  },
 'start_at': '2018-01-01',
 'base': 'EUR',
 'end_at': '2018-02-01'
}

Code:

from pyspark.sql.functions import expr, lit

# multiline=true because each JSON document spans several lines
df = spark.read.option("multiline", "true").json("file:///home/sathya/test-datasets/forex_rates.json")

# Pull the scalar fields out of the single JSON record
base = df.select("base").rdd.collect()[0].asDict()["base"]
start_at = df.select("start_at").rdd.collect()[0].asDict()["start_at"]
end_at = df.select("end_at").rdd.collect()[0].asDict()["end_at"]
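# (equivalently, base = df.first()["base"] etc. avoids the extra rdd round trip)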

# One column per date, each holding a struct of currency rates
df2 = df.select("rates.*")

# str.format version (works on both Python 2 and 3)

stack_characteristics = str(len(df2.columns))+','+','.join(["'{}',`{}`".format(v,v) for v in df2.columns])
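# For the sample input this expands to something like
#   "2,'2018-01-09',`2018-01-09`,'2018-01-22',`2018-01-22`"
# i.e. stack(n, label1, col1, label2, col2, ...) turns each date column into its own row.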

df2.select(expr('''stack({})'''.format(stack_characteristics)).alias('date', 'vals')) \
   .select('date', 'vals.*') \
   .withColumn("base", lit(base)) \
   .withColumn("start_at", lit(start_at)) \
   .withColumn("end_at", lit(end_at)) \
   .show()

# f-string version (Python 3.6+)

# stack_characteristics = str(len(df2.columns)) + ',' + ','.join([f"'{v}',`{v}`" for v in df2.columns])

# df2.select(expr(f'''stack({stack_characteristics})''').alias('date','vals')).select('date', 'vals.*').withColumn("base",lit(base)).withColumn("start_at",lit(start_at)).withColumn("end_at",lit(end_at)).show()

'''
+----------+------+------+------+------+------+----+----------+----------+
|      date|   BGN|   CNY|   NOK|   NZD|   TRY|base|  start_at|    end_at|
+----------+------+------+------+------+------+----+----------+----------+
|2018-01-09|1.9558|7.7865|9.6715|1.6601|4.4843| EUR|2018-01-01|2018-02-01|
|2018-01-22|1.9558|7.8374|9.6223|1.6758|4.6552| EUR|2018-01-01|2018-02-01|
+----------+------+------+------+------+------+----+----------+----------+
'''
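
The question also asks to land the result in a date-partitioned Hive table; the output of the stack() expression can be written directly with partitionBy. A minimal sketch, assuming a Hive-enabled SparkSession and reusing the forex_rates table name from the question (adjust to your own table):

result = df2.select(expr('''stack({})'''.format(stack_characteristics)).alias('date', 'vals')) \
            .select('date', 'vals.*') \
            .withColumn("base", lit(base))

# Write one HDFS partition per date; "forex_rates" is the table name from the question
result.write.mode("append").partitionBy("date").saveAsTable("forex_rates")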
