How do I create new columns containing lead (shifted) values generated from another column in PySpark?

whhtz7ly  posted on 2023-01-01  in Spark

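The following code pulls daily oil prices (dcoilwtico), resamples the daily data to monthly, computes the 12-month (i.e. year-over-year) percent change, and finally runs a loop that shifts the YoY percent change by 1 month (dcoilwtico_1), 2 months (dcoilwtico_2), and so on up to 12 months (dcoilwtico_12), storing each shift as a new column: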

    import datetime

    import pandas as pd
    import pandas_datareader as pdr

    start = datetime.datetime(2016, 1, 1)
    end = datetime.datetime(2022, 12, 1)

    # 1. Get historic data: pull daily prices, drop NaN, collapse daily to monthly means
    df_fred_daily = pdr.DataReader(['DCOILWTICO'], 'fred', start, end).dropna().resample('M').mean()
    df_fred_daily.columns = df_fred_daily.columns.str.lower()

    # 2. Expand df range: index, column names
    index_fred = pd.date_range('2022-12-31', periods=13, freq='M')
    columns_fred_daily = df_fred_daily.columns.to_list()

    # 3. Append history + empty df
    df_fred_daily_forecast = pd.DataFrame(index=index_fred, columns=columns_fred_daily)
    df_fred_test_daily = pd.concat([df_fred_daily, df_fred_daily_forecast])

    # 4. New df, calculate YoY percent change for each commodity
    df_fred_test_daily_yoy = ((df_fred_test_daily - df_fred_test_daily.shift(12)) / df_fred_test_daily.shift(12)) * 100

    # 5. Extend each variable as a series shifted by 1 to 12 months
    for col in df_fred_test_daily_yoy.columns:
        for i in range(1, 13):
            df_fred_test_daily_yoy["%s_%s" % (col, i)] = df_fred_test_daily_yoy[col].shift(i)

    df_fred_test_daily_yoy.tail(18)

This produces the following df:

Question: my real example contains hundreds of columns, and I would like to produce these same results using PySpark.
How would I code this in PySpark?

xam8gpfp1#

Since your code is already written, I would use Koalas, "a pandas API on Spark". You only need to install it: https://pypi.org/project/koalas/
See this simple example:

    import databricks.koalas as ks
    import pandas as pd

    pdf = pd.DataFrame({'x': range(3), 'y': ['a', 'b', 'b'], 'z': ['a', 'b', 'b']})

    # Create a Koalas DataFrame from pandas DataFrame
    df = ks.from_pandas(pdf)

    # Rename the columns
    df.columns = ['x', 'y', 'z1']

    # Do some operations in place:
    df['x2'] = df.x * df.x
