collect()将timestamp列(UTC)转换为pyspark中的本地时区(IST)

vfhzx4xs  于 2023-05-16  发布在  Spark
关注(0)|答案(2)|浏览(179)

spark从MySQL中读取一个表,其中有一个存储UTC时区值的时间戳列。Spark在本地(IST)配置。MySQL在时间戳值下面存储。

spark.conf.set("spark.sql.session.timeZone" , "UTC")
df.show(100,False)

使用上面的配置文件后,我可以看到正确的记录与df.show()。稍后df.rdd.collect()将这些值转换回IST时区。

for row in df.rdd.collect():
    print("row.Mindate ",row.Mindate)
    

row.Mindate 2021-03-02 19:30:31
row.Mindate 2021-04-01 14:05:03
row.Mindate 2021-06-15 11:39:40
row.Mindate 2021-07-07 18:14:17
row.Mindate 2021-08-03 10:48:51
row.Mindate 2021-10-06 10:21:11

SparkDataFrame和DF.rdd显示了不同结果集。即使在"spark.sql.session.timeZone" , "UTC"之后,它如何将值更改回本地时区。
先谢谢你了
编辑1:
def.printSchema()

root
 |-- Mindate: timestamp (nullable = true)
 |-- Maxdate: timestamp (nullable = true)
h9a6wy2h

h9a6wy2h1#

解决方案

确保Spark时区(spark.sql.session.timeZone)设置为与Python时区(TZ环境变量)相同的时区。Spark将在调用DataFrame.collect()时在两者之间转换。您可以按如下方式执行此操作:

import os
import time

# change Python timezone
os.environ["TZ"] = "UTC"
time.tzset()

# change Spark timezone
spark.conf.set("spark.sql.session.timeZone", "UTC")

问题详情

我遇到过完全相同的问题,每当我从Spark收集DataFrame到Python时,都会转换时间戳。我写了下面这个简单的测试来确认行为:

def test_timezone_string_to_timestamp_utc(spark):
    spark.conf.set("spark.sql.session.timeZone", "UTC")
    df = spark.createDataFrame([("1970-01-01 10:00:00",)], "ts STRING").withColumn("ts", f.col("ts").cast("timestamp"))
    assert df.collect()[0]["ts"] == datetime.datetime(1970, 1, 1, 10)

失败,返回消息datetime.datetime(1970, 1, 1, 11, 0) != datetime.datetime(1970, 1, 1, 10, 0)。当DataFrame从Spark TimeStampType收集到Python的datetime.datetime时,时间戳从UTC转换为我的系统默认值(CET)。我不知道为什么会这样,但事情就是这样...正如@Kashyap也指出的那样,我试着查看了源代码,但真的不明白为什么会发生这种情况。
Python时区默认为系统默认时区,可以通过TZ系统变量进行配置。参见Python文档
因此,只要Python的时区和Spark的时区设置之间存在差异,就会发生转换。我们可以如下检查:

>>> spark.conf.get("spark.sql.session.timeZone")
'UTC'
>>> import time
>>> time.tzname
('CET', 'CEST')
gcuhipw9

gcuhipw92#

TL;DR
collect()将timestamp列(UTC)转换为pyspark中的本地时区(IST)
不,不是的。事实上,你读取的 Dataframe 中的时间戳没有时区。您看到的只是show()基于会话本地时区的行为。

TimestampType类型的列中存储datetime.datetime值时时区信息丢失

如所述in the docs
日期时间类型

  • 时间戳类型:表示由年、月、日、小时、分钟和秒字段的值组成的值,并使用会话本地时区。timestamp值表示绝对时间点。

正如你在代码中看到的,TimestampType是python datetime.datetime的 Package 器,但是它去掉了时区,并在内部将其存储为epoch time

class TimestampType(AtomicType, metaclass=DataTypeSingleton):
    """Timestamp (datetime.datetime) data type.
    """

    def needConversion(self):
        return True

    def toInternal(self, dt):
        if dt is not None:
            seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                       else time.mktime(dt.timetuple()))
            return int(seconds) * 1000000 + dt.microsecond

    def fromInternal(self, ts):
        if ts is not None:
            # using int to avoid precision loss in float
            return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)

更多示例代码:

from typing import Union
from pyspark.sql.types import TimestampType, StringType
from datetime import datetime
from pyspark.sql import DataFrame, functions as F

def to_str(val: Union[str, datetime]) -> str:
    type_str = f'{type(val).__name__}:'
    if isinstance(val, str):
        return type_str + val
    else:
        return f'{type_str}{val.isoformat()} tz:{val.tzinfo}'

def print_df_info(df: DataFrame):
    df.show(truncate=False)
    for row in df.collect():
        log('DF :', ','.join([to_str(cell) for cell in row]))
    for row in df.rdd.collect():
        log('RDD:', ','.join([to_str(cell) for cell in row]))

spark.conf.set("spark.sql.session.timeZone", "UTC")
timestamps = ['2021-04-01 10:00:00 -05:00']
timestamp_data = [{'col_original_str': s} for s in timestamps]

my_df = spark.createDataFrame(timestamp_data)
# 1. col_original_str -> col_to_timestamp (convert to UTC and stored WITHOUT timezone)
my_df = my_df.withColumn('col_to_timestamp', F.to_timestamp(my_df.col_original_str))
# 2. col_to_timestamp -> col_date_format (convert an Epoch time (which has no timezone) to string)
my_df = my_df.withColumn('col_date_format', F.date_format(my_df.col_to_timestamp, "yyyy-MM-dd HH:mm:ss.SSSXXX"))
# This is really confusing.
# 3. col_to_timestamp -> col_to_utc_timestamp (tell pyspark to interpret col_to_timestamp with
#                                              timezone Asia/Kolkata, and convert it to UTC)
my_df = my_df.withColumn('col_reinterpret_tz', F.to_utc_timestamp(my_df.col_to_timestamp, 'Asia/Kolkata'))

my_df.printSchema()

log('#################################################')
log('df with session.timeZone set to UTC')
spark.conf.set("spark.sql.session.timeZone", "UTC")
print_df_info(my_df)

log('#################################################')
log('df with session.timeZone set to Asia/Kolkata')
spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
print_df_info(my_df)

输出中的注解:

  1. DF :RDD :(参见来自print_df_info()的日志)具有完全相同的内容。它们是相同数据上的不同立面。
    1.更改spark.sql.session.timeZone对“内部表示”没有影响。参见print_df_info()的日志。
    1.更改spark.sql.session.timeZone将更改show()打印timestamp类型值的方式。
2021-11-08T12:16:22.817 spark.version: 3.0.3
root
 |-- col_original_str: string (nullable = true)
 |-- col_to_timestamp: timestamp (nullable = true)
 |-- col_date_format: string (nullable = true)
 |-- col_reinterpret_tz: timestamp (nullable = true)

2021-11-08T13:57:54.243 #################################################
2021-11-08T13:57:54.244 df with session.timeZone set to UTC

+--------------------------+-------------------+------------------------+-------------------+
|col_original_str          |col_to_timestamp   |col_date_format         |col_reinterpret_tz |
+--------------------------+-------------------+------------------------+-------------------+
|2021-04-01 10:00:00 -05:00|2021-04-01 15:00:00|2021-04-01 15:00:00.000Z|2021-04-01 09:30:00|
+--------------------------+-------------------+------------------------+-------------------+

2021-11-08T13:57:54.506 DF : str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None
2021-11-08T13:57:54.569 RDD: str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None

2021-11-08T13:57:54.570 #################################################
2021-11-08T13:57:54.570 df with session.timeZone set to Asia/Kolkata

+--------------------------+-------------------+------------------------+-------------------+
|col_original_str          |col_to_timestamp   |col_date_format         |col_reinterpret_tz |
+--------------------------+-------------------+------------------------+-------------------+
|2021-04-01 10:00:00 -05:00|2021-04-01 20:30:00|2021-04-01 15:00:00.000Z|2021-04-01 15:00:00|
+--------------------------+-------------------+------------------------+-------------------+

2021-11-08T13:57:54.828 DF : str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None
2021-11-08T13:57:54.916 RDD: str:2021-04-01 10:00:00 -05:00,datetime:2021-04-01T10:00:00 tz:None,str:2021-04-01 15:00:00.000Z,datetime:2021-04-01T04:30:00 tz:None

一些参考文献:

相关问题