pandas 如何使用groupby从spark Dataframe 中提取时序json数据

ibrsph3r  于 2023-01-19  发布在  Spark
关注(0)|答案(1)|浏览(121)

我在Spark Dataframe 中有以下数据:
| 身份证|级数|
| - ------|- ------|
| 1个|{" 2016年1月31日星期一00:00:00. 000 ":空,"2016年6月30日00:00:00.000Z":6394317.0,"2016年7月31日00:00:00.000Z":6550781.0,"2016年8月31日00:00:00.000Z":七一零七三零八|
| 第二章|{" 2016年1月31日星期一00:00:00. 000 ":空,"2016年6月30日00:00:00.000Z":6394317.0)|
我想把时间序列数据提取成更适合工作的格式;例如以下格式
| 身份证|时间戳|价值|
| - ------|- ------|- ------|
| 1个|"2016年01月31日星期一00:00:00. 000Z"|二○ ○ ○年三月|
| 1个|"2016年2月31日星期一00时00分00秒"|十万三|
| 1个|"2016年2月31日星期一00时00分00秒"|零|
| 第二章|"2012年01月31日星期一00:00:00. 000Z"|六三九四三一七|
| 第二章|"2013年02月31日星期一00:00:00. 000Z"|10000317.0|
我试过df. groupby('id '),可以在panda中通过迭代groupby对象来实现。例如:

for fund_id, df_i in df.groupby('id'):

        ts = json.loads(df_i['series'].iloc[0]) # get time series
        id = df_i['id'].iloc[0] # get id

        # storing all timeseries in temp df
        df_temp = pd.DataFrame(columns=['id','date','value'])
        df_temp['value']=ts.values()
        df_temp['date']=ts.keys()
        df_temp['id'] = id

        # Finally append all df_temp

有什么想法如何在spark中做同样的事情吗?

qxgroojn

qxgroojn1#

你可以,是的,你必须跳过一些步骤把JSON字符串转换成数组,分解它,然后只在引号外的冒号(:)上拆分剩下的字符串。
抱歉,我看不懂。游戏公园方法:

jstring = """{"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}"""
df = spark.createDataFrame(
    [
        (1, jstring)
    ],
    ["id", "series"] 
    
)

from pyspark.sql.functions import regexp_replace,explode,split,trim,expr

df.select("id",regexp_replace(regexp_replace("series","\\{",""),"\\}", "").alias("s")). \
select("id",explode(split("s",",").cast("array<string>")).alias("exp_series")). \
select("id",split("exp_series",":(?=([^\"]*\"[^\"]*\")*[^\"]*$)").alias("foo")). \
select("id",trim(expr("foo[0]")).alias("a"),trim(expr("foo[1]")).alias("b")).show()

Scala方法:

scala> val jstring = """{"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}"""
    jstring: String = {"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}

     scala> val data = Seq((1,jstring))
        data: Seq[(Int, String)] = List((1,{"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}))
        
        scala> val df = data.toDF("id","series")
        df: org.apache.spark.sql.DataFrame = [id: int, series: string]
        
        scala> df.select($"id",regexp_replace(regexp_replace($"series","\\{",""),"\\}", "").alias("s")).
             | select($"id",explode(split($"s",",").cast("array<string>")).alias("exp_series")).
             | select($"id",split($"exp_series",":(?=([^\"]*\"[^\"]*\")*[^\"]*$)").alias("foo")).
             | select($"id",trim($"foo".getItem(0)).alias("a"),trim($"foo".getItem(1)).alias("b")).show(false)
        +---+--------------------------+---------+
        |id |a                         |b        |
        +---+--------------------------+---------+
        |1  |"2016-01-31T00:00:00.000Z"|null     |
        |1  |"2016-06-30T00:00:00.000Z"|6394317.0|
        |1  |"2016-07-31T00:00:00.000Z"|6550781.0|
        |1  |"2016-08-31T00:00:00.000Z"|7107308.0|
        +---+--------------------------+---------+

相关问题