I am trying to convert a PySpark dataframe column from string format to date format. I have gone through a lot of other questions and answers; every line of code I have tried is shown commented out below, along with all the imports so far. All of those attempts come from already-answered questions where they did the trick for other people with the same problem.
Update 1: So my problem apparently was that I needed M/dd/yyyy, but now there is a new error. It only works on the first 5 entries, because after that the format changes: Fail to parse '4/2/2012' in the new parser, which needs M/d/yyyy. Is there a way to cover both M/d/yyyy and MM/dd/yyyy, and every possible combination?
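For reference, here is a minimal sketch of what a single pattern covering both cases might look like (assuming Spark 3.x, where 'M' and 'd' in the new parser match one- or two-digit values, and assuming amazon_train is the dataframe shown further down):
from pyspark.sql import functions as F
#'M/d/yyyy' accepts both '3/23/2012' and '4/2/2012' in the Spark 3 parser
parsed = amazon_train.withColumn("Date", F.to_date(F.col("Date"), "M/d/yyyy"))
parsed.printSchema()
#if truly different formats have to coexist, setting spark.sql.legacy.timeParserPolicy
#to CORRECTED makes unparseable strings come back as null, so several attempts can
#be chained with coalesce() and the first one that parses wins:
#spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")
#parsed = amazon_train.withColumn("Date",
#    F.coalesce(F.to_date("Date", "M/d/yyyy"), F.to_date("Date", "MM/dd/yy")))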
Update 2: After changing the date format in Excel to add leading zeros to every date (single-digit months and days now get a leading zero, so everything has two digits and all dates are standardized), PySpark for some reason still cannot handle it: Fail to parse '03/23/12'
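A guess at the cause, not confirmed by the post: '03/23/12' now has a two-digit year, so a pattern ending in 'yy' rather than 'yyyy' would be needed, for example:
from pyspark.sql import functions as F
#sketch only: 'MM/dd/yy' matches a two-digit year such as '12' (parsed as 2012)
amazon_train.select(F.to_date(F.col("Date"), "MM/dd/yy").alias("date")).show(5)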
Update 3: I am now trying to parse the date strings with a more involved function and loops. Below is the new code using a mapping and an RDD, but converting the RDD back into a dataframe does not work; toDF() cannot be used on the RDD:
#makes the rdd parse date strings
def cleanDates(date_time_str):
    #strptime returns a datetime object, so read month/day/year from its
    #attributes instead of trying to split() it like a string
    date_time_obj = datetime.datetime.strptime(date_time_str, '%m/%d/%Y')
    month = date_time_obj.month
    day = date_time_obj.day
    year = date_time_obj.year
    return month, day, year
amazonDates = amazon_train.select("Date")
amazonDates.show()
rdd_amazon = amazonDates.rdd.map(lambda x: cleanDates(x["Date"])) #x is a Row, so pass the string field
#rdd_amazon.collect() #didn't work
#rdd_amazon.sc.parallelize() #didn't work
type(rdd_amazon) #pipeline rdd
sc.toDF(rdd_amazon) #sc = spark context
#rdd_amazon = amazon_mapping.toDF() #didn't work
#rdd_amazon = rdd_amazon.flatMap(lambda x: cleanDates(x))
#amazon_train = amazon_train.withColumn('Date', rdd_amazon) #didn't work
#amazon_train = amazon_train.withColumn("Date", to_date('Date', "MM/dd/yyyy")) #didn't work
#amazon_train.show() #didn't work
#amazon_train.printSchema() #didn't work
#amazon_train.groupBy(year("date").alias('Year')).agg({'Close': 'mean'}).show() #didn't work
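(For reference, one way an RDD of plain tuples is usually turned back into a dataframe is createDataFrame; a sketch, assuming an active SparkSession named spark and that cleanDates returns a (month, day, year) tuple per record:)
#column names are supplied explicitly since the RDD only holds plain tuples
dates_df = spark.createDataFrame(rdd_amazon, ["Month", "Day", "Year"])
dates_df.show(5)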
None of these questions worked either:
How to change the column type from String to Date in DataFrames?
Why I get null results from date_format() PySpark function?
Here is the schema and the first 5 rows:
amazon_train.printSchema()
amazon_train.show(5)
root
|-- Date: string (nullable = true)
|-- Open: double (nullable = true)
|-- High: double (nullable = true)
|-- Low: double (nullable = true)
|-- Close: double (nullable = true)
|-- Adj Close: double (nullable = true)
|-- Volume: integer (nullable = true)
+---------+----------+----------+----------+----------+----------+-------+
| Date| Open| High| Low| Close| Adj Close| Volume|
+---------+----------+----------+----------+----------+----------+-------+
|3/23/2012|192.009995|196.199997|191.800003|195.039993|195.039993|5984000|
|3/26/2012|196.479996|202.970001| 195.5|202.869995|202.869995|7613700|
|3/27/2012|203.589996|209.850006|202.880005|205.440002|205.440002|9600800|
|3/28/2012|206.139999| 207.0|200.309998|201.160004|201.160004|6245000|
|3/29/2012|201.279999|205.309998|200.630005|204.610001|204.610001|5711200|
+---------+----------+----------+----------+----------+----------+-------+
only showing top 5 rows
I tried the commented-out lines below. I want to convert the 'Date' column into a proper date data type; whether that happens by adding a new column or by converting it in place does not really matter:
#None of these work:
amazon_train.select(col("Date"),to_date(col("Date"),"MM-dd-yyyy").alias("date")).show()
#mazon_train.select(to_date(amazon_train.Date, 'yyyy-MM-dd HH:mm:ss').alias('date')).collect()
amazon_train.withColumn("New Date",expr("to_date(Date, yyyy-MM-dd)")).show()
amazon_train.withColumn(amazon_train.select(to_timestamp("Date", 'yyyy-MM-dd'))).alias('New Date').show()
amazon_train.select("Date").show()
amazon_train.Date = to_timestamp(amazon_train.Date, 'yyyy-MM-dd').alias('New Date').collect()
amazon_train = amazon_train.withColumn('New Date', to_date(unix_timestamp(col('Date'), 'MM-dd-yyyy').cast("timestamp")))
amazon_train = amazon_train.withColumn('col_with_date_format',sf.to_date(amazon_train.Date))
amazon_train = amazon_train.withColumn("Date", amazon_train["Date"].cast(DateType()))
amazon_train.select(date_format('Date', 'MM-dd-yyy').alias('newFormat')).show()
amazon_train.select(date_format(unix_timestamp("Date", "MM-dd-yyyy").cast("timestamp"), "MM-dd-yyyy")).show()
amazon_train.withColumn('New Date', F.date_format(F.to_date('Date', "MM/dd/yyyy"),'MM-dd-yyyy')).show()
F.date_format(F.to_date(amazon_train["Date"], "MM/dd/yyyy"), "MM-dd-yyyy")
amazon_train["Date"].cast(DateType())
amazon_train = amazon_train.withColumn("New Dates", date_format(to_date(col("Date"),"MM/dd/yyyy"),"MM-dd-yyyy"))
date_format(to_date(amazon_train.Date,"MM/dd/yyyy"),"MM/dd/yyyy")
The main problem: it keeps coming out null:
+----+----------+----------+----------+----------+----------+-------+
|date| Open| High| Low| Close| Adj Close| Volume|
+----+----------+----------+----------+----------+----------+-------+
|null|192.009995|196.199997|191.800003|195.039993|195.039993|5984000|
|null|196.479996|202.970001| 195.5|202.869995|202.869995|7613700|
|null|203.589996|209.850006|202.880005|205.440002|205.440002|9600800|
|null|206.139999| 207.0|200.309998|201.160004|201.160004|6245000|
|null|201.279999|205.309998|200.630005|204.610001|204.610001|5711200|
+----+----------+----------+----------+----------+----------+-------+
only showing top 5 rows
root
|-- date: date (nullable = true)
|-- Open: double (nullable = true)
|-- High: double (nullable = true)
|-- Low: double (nullable = true)
|-- Close: double (nullable = true)
|-- Adj Close: double (nullable = true)
|-- Volume: integer (nullable = true)
1 Answer
Okay, buckle up cowboy, this one gets *complicated*.
**Steps 1 and 2:** First create the Spark context for PySpark, then add the SQL context and get the data into a dataframe, etc.
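A sketch of steps 1 and 2 (the file name 'amazon_train.csv' is just a placeholder for wherever the data actually lives):
import datetime
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()   #step 1: spark context
sqlContext = SQLContext(sc)       #step 2: sql context on top of it
amazon_train = sqlContext.read.csv("amazon_train.csv", header=True, inferSchema=True)
amazon_train.printSchema()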
**Step 3:** Then select the date column into a new view.
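Step 3 as code (continuing the sketch above):
#pull only the string 'Date' column into its own dataframe/view
amazonDates = amazon_train.select("Date")
amazonDates.show(5)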
**Step 4:** Then use the RDD map function with a lambda to pull the column into a list via collect(), applying the datetime.strptime() function to every record of the 'Date' column.
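Step 4 as code (continuing the sketch; strptime with '%m/%d/%Y' also accepts single-digit months and days such as '4/2/2012'):
#parse every 'Date' string into a datetime object and collect into a Python list
amazon_dates_list = (
    amazonDates.rdd
    .map(lambda row: datetime.datetime.strptime(row["Date"], "%m/%d/%Y"))
    .collect()
)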
**Step 5:** Then create a SQLContext with the spark context (sc) and use a for loop to put the list back into a new dataframe.
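Step 5 as code (the loop here is a list comprehension; sqlContext is the one created in steps 1 and 2):
#rebuild a one-column dataframe of proper DateType values from the list
new_dates_df = sqlContext.createDataFrame(
    [(d.date(),) for d in amazon_dates_list], ["Date"]
)
new_dates_df.printSchema()   #Date: date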
**Step 6:** Finally, select that dataframe's only column, add row indexes, and join the new dataframe back onto the original starting dataframe. Alternatively, you can just replace the original date column instead of adding a new one (this includes dropping the row indexes once the two are merged; they exist only to index the rows and line them up for the join).
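Step 6 as a sketch; the row_number()-over-monotonically_increasing_id() trick is one common way to attach a positional index, and it assumes the row order of both dataframes has not changed in between:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.orderBy(F.monotonically_increasing_id())
left = amazon_train.drop("Date").withColumn("row_idx", F.row_number().over(w))   #original minus the old string column
right = new_dates_df.withColumn("row_idx", F.row_number().over(w))               #parsed dates

#join on the index, then drop the helper column now that the rows are lined up
amazon_train = left.join(right, on="row_idx").drop("row_idx")
amazon_train.printSchema()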