Sorting (ranking) multiple files using pyspark

Asked by 2lpgd968 on 2021-05-27, tagged Spark

I have stock data (6,000+ symbols, 100+ GB) saved as hdf5 files.
Basically, I'm trying to translate the following pandas code into pyspark. Ideally, I'd like both the values used for ranking and the ranks themselves to be saved to a file.

  agg_df = pd.DataFrame()
  for stock in stocks:
      df = pd.read_csv(stock)
      df = my_func(df)  # custom function whose output is used for ranking; for simplicity, standard deviation will do
      agg_df = pd.concat([agg_df, df], axis=1)  # concatenate so each stock becomes its own column
  agg_df.rank(axis=1)  # .to_csv() <- would like to save the ranks for future use
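For illustration, here is a minimal sketch of what my_func could look like under the assumptions above: it reduces each stock's data to a single metric column named after the symbol, so that the column-wise concat yields one column per stock (the rolling standard deviation and the column handling are assumptions, not the original code):

  import pandas as pd

  def my_func(df):
      # Hypothetical example: index by DateTime and compute one ranking metric
      # per timestamp, in a column named after the stock's symbol.
      symbol = df['Symbol'].iloc[0]
      df = df.set_index('DateTime')
      metric = df['Close'].rolling(30).std()  # e.g. 30-bar rolling standard deviation
      return metric.to_frame(name=symbol)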

Each data file has the same schema, e.g.:

                      Symbol   Open   High    Low  Close   Volume
  DateTime
  2010-09-13 09:30:00      A  29.23  29.25  29.17  29.25  17667.0
  2010-09-13 09:31:00      A  29.26  29.34  29.25  29.33   5000.0
  2010-09-13 09:32:00      A  29.31  29.36  29.31  29.36    600.0
  2010-09-13 09:33:00      A  29.33  29.36  29.30  29.35   7300.0
  2010-09-13 09:34:00      A  29.35  29.39  29.31  29.39   3222.0

Desired output (where each number is a rank):

                        A  AAPL  MSFT  ...etc
  DateTime
  2010-09-13 09:30:00   1     3     7  ...
  2010-09-13 09:31:00   4     5     7  ...
  2010-09-13 09:32:00  24    17    99  ...
  2010-09-13 09:33:00   7    63    42  ...
  2010-09-13 09:34:00   5     4    13  ...

I've read other answers about Window and pyspark.sql, but I can't see how to apply them to my case, since I need to aggregate the per-stock results before ranking (at least that's how it works in pandas).
Edit 1: After I read the data into an RDD with rdd = sc.parallelize(data.keys).map(data.read_data), the rdd becomes a PipelinedRDD, which has no .select() method. 0xdfdfdfdf's example keeps all the data in a single DataFrame, but I don't think appending everything into one DataFrame to do the calculations is a good idea.
Result: finally solved it. There were two problems: reading the files and performing the calculations.
Regarding reading the files, I initially used rdd = sc.parallelize(data.keys).map(data.read_data), which produced a PipelinedRDD, i.e. a collection of pandas DataFrames. These would have to be converted to Spark DataFrames for the solution to work. In the end I converted my hdf5 files to parquet and saved them in a separate folder (a sketch of this conversion step is included further below). Then I used

  import pyspark

  sqlContext = pyspark.sql.SQLContext(sc)      # sc is the existing SparkContext
  rdd_p = sqlContext.read.parquet(r"D:\parq")  # despite the name, this is a Spark DataFrame

to read all the files into a single DataFrame.
Then I did the calculations according to the accepted answer. Many thanks to 0xdfdfdfdf for the help.
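For completeness, the hdf5-to-parquet conversion step described above might look roughly like this (the folder layout, glob pattern and HDF5 key handling are assumptions; writing parquet from pandas requires pyarrow or fastparquet):

  import glob
  import os
  import pandas as pd

  src_files = glob.glob(r"D:\hdf5\*.h5")   # assumed location of the hdf5 files
  out_dir = r"D:\parq"                     # folder later read by Spark
  os.makedirs(out_dir, exist_ok=True)

  for path in src_files:
      pdf = pd.read_hdf(path)              # assumes a single dataset per file
      name = os.path.splitext(os.path.basename(path))[0]
      pdf.to_parquet(os.path.join(out_dir, name + '.parquet'))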
Extras:
Discussion - https://chat.stackoverflow.com/rooms/214307/discussion-between-biarys-and-0xdfdfdfdf
0xdfdfdfdf's solution - https://gist.github.com/0xdfdfdfdf/a93a7e444803f606008c7422784d1

Answer from x7yiwoj4:

Indeed, window functions can do this. I've created a small mock dataset that should look similar to yours.

  columns = ['DateTime', 'Symbol', 'Open', 'High', 'Low', 'Close', 'Volume']
  data = [('2010-09-13 09:30:00','A',29.23,29.25,29.17,29.25,17667.0),
          ('2010-09-13 09:31:00','A',29.26,29.34,29.25,29.33,5000.0),
          ('2010-09-13 09:32:00','A',29.31,29.36,29.31,29.36,600.0),
          ('2010-09-13 09:34:00','A',29.35,29.39,29.31,29.39,3222.0),
          ('2010-09-13 09:30:00','AAPL',39.23,39.25,39.17,39.25,37667.0),
          ('2010-09-13 09:31:00','AAPL',39.26,39.34,39.25,39.33,3000.0),
          ('2010-09-13 09:32:00','AAPL',39.31,39.36,39.31,39.36,300.0),
          ('2010-09-13 09:33:00','AAPL',39.33,39.36,39.30,39.35,3300.0),
          ('2010-09-13 09:34:00','AAPL',39.35,39.39,39.31,39.39,4222.0),
          ('2010-09-13 09:34:00','MSFT',39.35,39.39,39.31,39.39,7222.0)]
  df = spark.createDataFrame(data, columns)

Now, df.show() will give us this:

  +-------------------+------+-----+-----+-----+-----+-------+
  |           DateTime|Symbol| Open| High|  Low|Close| Volume|
  +-------------------+------+-----+-----+-----+-----+-------+
  |2010-09-13 09:30:00|     A|29.23|29.25|29.17|29.25|17667.0|
  |2010-09-13 09:31:00|     A|29.26|29.34|29.25|29.33| 5000.0|
  |2010-09-13 09:32:00|     A|29.31|29.36|29.31|29.36|  600.0|
  |2010-09-13 09:34:00|     A|29.35|29.39|29.31|29.39| 3222.0|
  |2010-09-13 09:30:00|  AAPL|39.23|39.25|39.17|39.25|37667.0|
  |2010-09-13 09:31:00|  AAPL|39.26|39.34|39.25|39.33| 3000.0|
  |2010-09-13 09:32:00|  AAPL|39.31|39.36|39.31|39.36|  300.0|
  |2010-09-13 09:33:00|  AAPL|39.33|39.36| 39.3|39.35| 3300.0|
  |2010-09-13 09:34:00|  AAPL|39.35|39.39|39.31|39.39| 4222.0|
  |2010-09-13 09:34:00|  MSFT|39.35|39.39|39.31|39.39| 7222.0|
  +-------------------+------+-----+-----+-----+-----+-------+

Below is the solution, which uses the previously mentioned window function rank(). Some reshaping is needed, for which you can use the pivot() function.

  from pyspark.sql.window import Window
  import pyspark.sql.functions as f

  result = (df
      .select(
          'DateTime',
          'Symbol',
          f.rank().over(Window.partitionBy('DateTime').orderBy('Volume')).alias('rank')
      )
      .groupby('DateTime')
      .pivot('Symbol')
      .agg(f.first('rank'))
      .orderBy('DateTime')
  )

Calling result.show() will give you:

  +-------------------+----+----+----+
  |           DateTime|   A|AAPL|MSFT|
  +-------------------+----+----+----+
  |2010-09-13 09:30:00|   1|   2|null|
  |2010-09-13 09:31:00|   2|   1|null|
  |2010-09-13 09:32:00|   2|   1|null|
  |2010-09-13 09:33:00|null|   1|null|
  |2010-09-13 09:34:00|   1|   2|   3|
  +-------------------+----+----+----+
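Since the question also wants the ranks saved for future use, the pivoted result can simply be written back out (the output path here is just an example):

  # parquet preserves the schema; .csv(...) works as well if plain text is preferred
  result.write.mode('overwrite').parquet(r"D:\ranks")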

Make sure you understand the rank(), dense_rank() and row_number() functions, since they behave differently when they encounter equal values within a given window.
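As a quick toy illustration of how the three functions treat ties (not part of the original answer; spark is assumed to be an existing SparkSession):

  from pyspark.sql.window import Window
  import pyspark.sql.functions as f

  # Two rows share the same Volume at the same DateTime, so they are tied.
  tie_df = spark.createDataFrame(
      [('2010-09-13 09:30:00', 'A',    100.0),
       ('2010-09-13 09:30:00', 'AAPL', 100.0),
       ('2010-09-13 09:30:00', 'MSFT', 200.0)],
      ['DateTime', 'Symbol', 'Volume'])

  w = Window.partitionBy('DateTime').orderBy('Volume')
  tie_df.select(
      'Symbol', 'Volume',
      f.rank().over(w).alias('rank'),              # ties share a rank, next rank skips: 1, 1, 3
      f.dense_rank().over(w).alias('dense_rank'),  # ties share a rank, no gaps: 1, 1, 2
      f.row_number().over(w).alias('row_number'),  # always unique, tie order arbitrary: 1, 2, 3
  ).show()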

