java Spark Structured Streaming自动将时间戳转换为本地时间

kcrjzv8t  于 2023-09-29  发布在  Java
关注(0)|答案(5)|浏览(85)

我的时间戳是UTC和ISO8601,但使用结构化流,它会自动转换为本地时间。有没有办法阻止这种转变?我想要UTC的。
我从Kafka阅读json数据,然后使用from_json Spark函数解析它们。
输入:

{"Timestamp":"2015-01-01T00:00:06.222Z"}

流量:

SparkSession
  .builder()
  .master("local[*]")
  .appName("my-app")
  .getOrCreate()
  .readStream()
  .format("kafka")
  ... //some magic
  .writeStream()
  .format("console")
  .start()
  .awaitTermination();

架构:

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true),});

输出量:

+--------------------+
|           Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+

正如你所看到的,时间自己增加了。
PS:我试着用from_utc_timestamp Spark函数进行实验,但没有运气。

3zwjbxry

3zwjbxry1#

备注

这个答案 * 主要 * 在Spark < 2.2中有用。有关Spark的新版本,请参阅the answer by astro-asz
但是我们应该注意到,从Spark 2.4.0开始,spark.sql.session.timeZone不会设置user.timezonejava.util.TimeZone.getDefault)。因此,单独设置spark.sql.session.timeZone可能会导致SQL和非SQL组件使用不同时区设置的尴尬局面。
因此,我仍然建议显式设置user.timezone,即使设置了spark.sql.session.timeZone

TL;DR不幸的是,这就是Spark现在处理时间戳的方式,除了直接在epoch时间上操作,而不使用日期/时间实用程序之外,真的没有内置的替代方案。

您可以在Spark开发人员列表中进行深入的讨论:SQL TIMESTAMP semantics vs. SPARK-18350
到目前为止,我发现的最干净的解决方法是将驱动程序和执行器的-Duser.timezone设置为UTC。例如使用submit:

bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
                --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"

或通过调整配置文件(spark-defaults.conf):

spark.driver.extraJavaOptions      -Duser.timezone=UTC
spark.executor.extraJavaOptions    -Duser.timezone=UTC
lsmepo6l

lsmepo6l2#

虽然已经提供了两个非常好的答案,但我发现它们都是解决问题的重锤。我不想在整个应用程序中修改时区解析行为,也不想改变JVM的默认时区。在经历了很多痛苦之后,我找到了一个解决方案,我将在下面分享。

将time[/date]字符串解析为时间戳进行日期操作,然后正确地将结果呈现回来

首先,让我们解决如何让Spark SQL正确地将日期[/时间]字符串(给定格式)解析为时间戳,然后正确地将该时间戳呈现出来,以便它显示与原始字符串输入相同的日期[/时间]。一般方法是:

- convert a date[/time] string to time stamp [via to_timestamp]
    [ to_timestamp  seems to assume the date[/time] string represents a time relative to UTC (GMT time zone) ]
- relativize that timestamp to the timezone we are in via from_utc_timestamp

下面的测试代码实现了这种方法。我们所在的时区作为第一个参数传递给timeTricks方法。代码将输入字符串“1970-01-01”转换为localizedTimeStamp(通过from_utc_timestamp),并验证该时间戳的“valueOf”与“1970-01-01 00:00:00”相同。

object TimeTravails {
  def main(args: Array[String]): Unit = {

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark: SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExample")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._
    import java.sql.Timestamp

    def timeTricks(timezone: String): Unit =  {
      val df2 = List("1970-01-01").toDF("timestr"). // can use to_timestamp even without time parts !
        withColumn("timestamp", to_timestamp('timestr, "yyyy-MM-dd")).
        withColumn("localizedTimestamp", from_utc_timestamp('timestamp, timezone)).
        withColumn("weekday", date_format($"localizedTimestamp", "EEEE"))
      val row = df2.first()
      println("with timezone: " + timezone)
      df2.show()
      val (timestamp, weekday) = (row.getAs[Timestamp]("localizedTimestamp"), row.getAs[String]("weekday"))

      timezone match {
        case "UTC" =>
          assert(timestamp ==  Timestamp.valueOf("1970-01-01 00:00:00")  && weekday == "Thursday")
        case "PST" | "GMT-8" | "America/Los_Angeles"  =>
          assert(timestamp ==  Timestamp.valueOf("1969-12-31 16:00:00")  && weekday == "Wednesday")
        case  "Asia/Tokyo" =>
          assert(timestamp ==  Timestamp.valueOf("1970-01-01 09:00:00")  && weekday == "Thursday")
      }
    }

    timeTricks("UTC")
    timeTricks("PST")
    timeTricks("GMT-8")
    timeTricks("Asia/Tokyo")
    timeTricks("America/Los_Angeles")
  }
}

结构化流将传入的日期[/时间]字符串解释为UTC(非本地时间)问题的解决方案

下面的代码说明了如何应用上述技巧(稍作修改),以纠正时间戳被本地时间和GMT之间的偏移量所移动的问题。

object Struct {
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions._

  def main(args: Array[String]): Unit = {

    val timezone = "PST"

    val spark: SparkSession = SparkSession.builder()
      .master("local[3]")
      .appName("SparkByExample")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    import spark.implicits._

    val splitDf = df.select(split(df("value"), " ").as("arr")).
      select($"arr" (0).as("tsString"), $"arr" (1).as("count")).
      withColumn("timestamp", to_timestamp($"tsString", "yyyy-MM-dd"))
    val grouped = splitDf.groupBy(window($"timestamp", "1 day", "1 day").as("date_window")).count()

    val tunedForDisplay =
      grouped.
        withColumn("windowStart", to_utc_timestamp($"date_window.start", timezone)).
        withColumn("windowEnd", to_utc_timestamp($"date_window.end", timezone))

    tunedForDisplay.writeStream
      .format("console")
      .outputMode("update")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}

密码需要通过套接字输入…我使用的程序'nc'(网猫)开始像这样:

nc -l 9999

然后我启动Spark程序,并向net cat提供一行输入:

1970-01-01 4

我得到的输出说明了偏移移位的问题:

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+-------------------+-------------------+
|date_window                               |count|windowStart        |windowEnd          |
+------------------------------------------+-----+-------------------+-------------------+
|[1969-12-31 16:00:00, 1970-01-01 16:00:00]|1    |1970-01-01 00:00:00|1970-01-02 00:00:00|
+------------------------------------------+-----+-------------------+-------------------+

请注意,date_window的开始和结束与输入相差8小时(因为我处于GMT-7/8时区,PST)。但是,我使用to_utc_timestamp来纠正这个偏移,以获得包含输入的一天窗口的正确开始和结束日期时间:1970-01-01 00:00:00,1970 -01-02 00:00:00。
请注意,在第一个代码块中,我们使用from_utc_timestamp,而对于结构化流解决方案,我们使用to_utc_timestamp。我还没有弄清楚在给定的情况下使用这两个中的哪一个。(如果你知道的话,请告诉我!).

3phpmpom

3phpmpom3#

另一个对我有效的解决方案是将jvm默认时区设置为您的目标时区(在您的情况下是UTC)。

TimeZone.setDefault(TimeZone.getTimeZone("UTC"));

我在将我的spark dataframe写入数据库之前添加了上述代码。

laximzn5

laximzn54#

如果会话时区不是UTC,则所有这些函数都将转换为会话时区:

from_unixtime(1695773049) 
   timestamp('1970-01-01 00:00:00.0 UTC') 
   cast('1970-01-01 00:00:00.0 UTC' as timestamp)
   current_timestamp()

将会话时区设置为UTC是避免转换的最佳解决方案,请参阅接受的答案。如果由于某种原因,您无法控制会话时区,因为它是在高级框架中设置的,并且您只编写SQL代码,那么您可以检查current_timezone并有条件地转换如下:

case when current_timezone() <> 'UTC' 
                 then to_utc_timestamp(from_unixtime(1695773049), current_timezone()) 
            else from_unixtime(1695773049)
 end as smart_conversion --smart conversion back to UTC if we are not in UTC
at0kjp5o

at0kjp5o5#

对我来说,它工作用途:

spark.conf.set("spark.sql.session.timeZone", "UTC")

它告诉spark SQL使用UTC作为时间戳的默认时区。我在Spark SQL中使用了它,例如:

select *, cast('2017-01-01 10:10:10' as timestamp) from someTable

我知道它在2.0.1中不起作用。但在Spark 2.2中工作。我在SQLTransformer中也使用了它,并且它工作了。
我不确定流。

相关问题