How to convert a PySpark DataFrame column from string format to date format

Asked by bnlyeluc on 2022-12-17 in Spark

I'm trying to convert a PySpark DataFrame column from string format to date format. I've gone through a lot of other questions and answers; below I show every line of code I tried (commented out) along with all the imports so far. All of these attempts come from answered questions where they did the trick for other people with the same problem.
Update 1: So apparently my problem was that I needed M/dd/yyyy, but now there's a new error: it only works on the first 5 entries, because a different format shows up after that. Fail to parse '4/2/2012' in the new parser, which would need M/d/yyyy. Is there a way to accept both M/d/yyyy and MM/dd/yyyy, and every possible combination?
Update 2: After changing the date format in Excel to add a leading zero to any single-digit month or day, so every date has two-digit positions and is standardized, PySpark still can't handle it: Fail to parse '03/23/12'
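
For reference, a minimal sketch of one way to cover several input shapes at once, assuming Spark 3.x and an active SparkSession named spark (the exact pattern list is an assumption based on the sample values above): set spark.sql.legacy.timeParserPolicy to CORRECTED so an unparseable string becomes null instead of the "Fail to parse ... in the new parser" error, then try each candidate pattern with to_date and keep the first one that parses.

from pyspark.sql import functions as F

# a pattern mismatch should yield null instead of raising the upgrade error
spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")

# non-padded letters (M, d) accept both "4/2/2012" and "03/23/2012";
# the second pattern covers two-digit years such as "03/23/12"
candidate_patterns = ["M/d/yyyy", "M/d/yy"]

amazon_train = amazon_train.withColumn(
    "Date",
    F.coalesce(*[F.to_date(F.col("Date"), p) for p in candidate_patterns])
)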
Update 3: I'm now trying to parse the date strings with a more involved function and a loop. Below is the new code using map and an RDD, but converting the RDD back into a DataFrame doesn't work; toDF() can't be used on the RDD:

#makes the rdd parse date strings
def cleanDates(date_time_str):
    # parse the string, then pull the month/day/year fields off the datetime object
    date_time_obj = datetime.datetime.strptime(date_time_str, '%m/%d/%Y')
    month = date_time_obj.month
    day = date_time_obj.day
    year = date_time_obj.year
    return month, day, year

amazonDates = amazon_train.select("Date")
amazonDates.show()

rdd_amazon = amazonDates.rdd.map(lambda x: cleanDates(x["Date"])) #each x is a Row, so pass its Date field
#rdd_amazon.collect() #didn't work
#rdd_amazon.sc.parallelize() #didn't work
type(rdd_amazon) #pipeline rdd
sc.toDF(rdd_amazon) #sc = spark context

#rdd_amazon = amazon_mapping.toDF() #didn't work

#rdd_amazon = rdd_amazon.flatMap(lambda x: cleanDates(x))

#amazon_train = amazon_train.withColumn('Date', rdd_amazon) #didn't work

#amazon_train = amazon_train.withColumn("Date", to_date('Date', "MM/dd/yyyy")) #didn't work
#amazon_train.show() #didn't work
#amazon_train.printSchema() #didn't work
#amazon_train.groupBy(year("date").alias('Year')).agg({'Close': 'mean'}).show() #didn't work
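
For reference, toDF() only becomes available on an RDD after a SparkSession has been created, and it expects an RDD of Rows or tuples; a minimal sketch of turning the mapped RDD back into a DataFrame, assuming an active SparkSession named spark and that cleanDates returns a (month, day, year) tuple:

# build a DataFrame from the (month, day, year) tuples produced by cleanDates
dates_df = spark.createDataFrame(rdd_amazon, ["Month", "Day", "Year"])
dates_df.show(5)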

None of these questions helped:
How to change the column type from String to Date in DataFrames?
Why I get null results from date_format() PySpark function?
Here are the schema and the first 5 rows:

amazon_train.printSchema()
amazon_train.show(5)
root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)

+---------+----------+----------+----------+----------+----------+-------+
|     Date|      Open|      High|       Low|     Close| Adj Close| Volume|
+---------+----------+----------+----------+----------+----------+-------+
|3/23/2012|192.009995|196.199997|191.800003|195.039993|195.039993|5984000|
|3/26/2012|196.479996|202.970001|     195.5|202.869995|202.869995|7613700|
|3/27/2012|203.589996|209.850006|202.880005|205.440002|205.440002|9600800|
|3/28/2012|206.139999|     207.0|200.309998|201.160004|201.160004|6245000|
|3/29/2012|201.279999|205.309998|200.630005|204.610001|204.610001|5711200|
+---------+----------+----------+----------+----------+----------+-------+
only showing top 5 rows

I tried the commented-out lines below. I want to convert the 'Date' column into a proper date data type, whether by adding a new column or by converting it in place; it doesn't really matter which:

#None of these work:

amazon_train.select(col("Date"),to_date(col("Date"),"MM-dd-yyyy").alias("date")).show()
#amazon_train.select(to_date(amazon_train.Date, 'yyyy-MM-dd HH:mm:ss').alias('date')).collect()
amazon_train.withColumn("New Date",expr("to_date(Date, yyyy-MM-dd)")).show()
amazon_train.withColumn(amazon_train.select(to_timestamp("Date", 'yyyy-MM-dd'))).alias('New Date').show()
amazon_train.select("Date").show()
amazon_train.Date = to_timestamp(amazon_train.Date, 'yyyy-MM-dd').alias('New Date').collect()
amazon_train = amazon_train.withColumn('New Date', to_date(unix_timestamp(col('Date'), 'MM-dd-yyyy').cast("timestamp")))
amazon_train = amazon_train.withColumn('col_with_date_format',sf.to_date(amazon_train.Date))
amazon_train = amazon_train.withColumn("Date", amazon_train["Date"].cast(DateType()))
amazon_train.select(date_format('Date', 'MM-dd-yyy').alias('newFormat')).show()
amazon_train.select(date_format(unix_timestamp("Date", "MM-dd-yyyy").cast("timestamp"), "MM-dd-yyyy")).show()
amazon_train.withColumn('New Date', F.date_format(F.to_date('Date', "MM/dd/yyyy"),'MM-dd-yyyy')).show()
F.date_format(F.to_date(amazon_train["Date"], "MM/dd/yyyy"), "MM-dd-yyyy")
amazon_train["Date"].cast(DateType())
amazon_train = amazon_train.withColumn("New Dates", date_format(to_date(col("Date"),"MM/dd/yyyy"),"MM-dd-yyyy"))
date_format(to_date(amazon_train.Date,"MM/dd/yyyy"),"MM/dd/yyyy")

The main problem: it keeps coming out null:

+----+----------+----------+----------+----------+----------+-------+
|date|      Open|      High|       Low|     Close| Adj Close| Volume|
+----+----------+----------+----------+----------+----------+-------+
|null|192.009995|196.199997|191.800003|195.039993|195.039993|5984000|
|null|196.479996|202.970001|     195.5|202.869995|202.869995|7613700|
|null|203.589996|209.850006|202.880005|205.440002|205.440002|9600800|
|null|206.139999|     207.0|200.309998|201.160004|201.160004|6245000|
|null|201.279999|205.309998|200.630005|204.610001|204.610001|5711200|
+----+----------+----------+----------+----------+----------+-------+
only showing top 5 rows

root
 |-- date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
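
to_date returns null when the string cannot be parsed with the given pattern, and the data uses "/" separators while most of the attempts above use "-". For comparison, a minimal sketch of a single call whose pattern matches values like '3/23/2012', plus a quick check for rows that still fail to parse (the pattern is an assumption based on the sample rows shown):

from pyspark.sql import functions as F

checked = amazon_train.withColumn("Date_parsed", F.to_date(F.col("Date"), "M/d/yyyy"))

# any row whose string did not match the pattern comes back as null
checked.filter(F.col("Date_parsed").isNull()).show()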

lrpiutwd1#

Okay, so buckle up cowboy, this one gets *complicated*.

**Steps 1 and 2:** First create the Spark context for PySpark, then add the SQL context, get the data into a DataFrame, and so on.

#All imports
from pyspark.sql import SparkSession
from datetime import datetime
import dateparser
from pyspark.sql import Row, SQLContext
import functools
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import concat, lit, col
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col, unix_timestamp, to_date
from pyspark.sql import functions as F
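
The later steps assume there is already a running SparkContext (sc); a minimal sketch of creating the session and context first, in case you are not in a shell or notebook that provides them (the names spark and sc and the app name are assumptions):

# create a session and grab its SparkContext for the SQLContext used below
spark = SparkSession.builder.appName("date-cleanup").getOrCreate()
sc = spark.sparkContext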

**Step 3:** Then select the Date column into a new view

df_dates = df.select("Date")

**Step 4:** Then use the RDD map() function with a lambda to apply datetime.strptime() to every record of the 'Date' column and collect() the results into a list

df_list = df_dates.rdd.map(lambda x: (datetime.strptime(x['Date'], "%m/%d/%Y"))).collect()

**Step 5:** Then create a SQLContext from the Spark context (sc) and use a comprehension to put the list back into a new DataFrame

sqlContext = SQLContext(sc)
new_df = sqlContext.createDataFrame([(item,) for item in df_list], ['Datetimes'])

**Step 6:** Finally, select just that one column from the new DataFrame, add a row index to both DataFrames, and join the new DataFrame back to the original one (or you could simply replace the original Date column instead of adding a new one). The row indexes only exist to line the rows up for the join, and they are dropped once the DataFrames are merged.

just_dates_col = new_df.select('Datetimes')

# add row_index because no common columns, df = original dataframe
df2 = just_dates_col.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
df1 = df.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

df2 = df2.join(df1, on=["row_index"]).drop("row_index")
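
As a usage note, once the join has lined the rows up, the parsed column can simply replace the original string column; a short sketch following the column names used above (df_final is just an illustrative name):

# keep the parsed dates, drop the original string column, and reuse the old name
df_final = df2.drop("Date").withColumnRenamed("Datetimes", "Date")
df_final.printSchema()
df_final.show(5)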
