unix_timestamp()可以在Apache Spark中以毫秒为单位返回unix时间吗?

31moq8wy  于 2022-11-25  发布在  Apache
关注(0)|答案(6)|浏览(151)

我试图从时间戳字段中获取以毫秒(13位数)为单位的unix时间,但目前它以秒(10位数)为单位返回。

scala> var df = Seq("2017-01-18 11:00:00.000", "2017-01-18 11:00:00.123", "2017-01-18 11:00:00.882", "2017-01-18 11:00:02.432").toDF()
df: org.apache.spark.sql.DataFrame = [value: string]

scala> df = df.selectExpr("value timeString", "cast(value as timestamp) time")
df: org.apache.spark.sql.DataFrame = [timeString: string, time: timestamp]

scala> df = df.withColumn("unix_time", unix_timestamp(df("time")))
df: org.apache.spark.sql.DataFrame = [timeString: string, time: timestamp ... 1 more field]

scala> df.take(4)
res63: Array[org.apache.spark.sql.Row] = Array(
[2017-01-18 11:00:00.000,2017-01-18 11:00:00.0,1484758800], 
[2017-01-18 11:00:00.123,2017-01-18 11:00:00.123,1484758800], 
[2017-01-18 11:00:00.882,2017-01-18 11:00:00.882,1484758800], 
[2017-01-18 11:00:02.432,2017-01-18 11:00:02.432,1484758802])

即使2017-01-18 11:00:00.1232017-01-18 11:00:00.000不同,我也会得到相同的unix时间返回1484758800
我错过了什么?

gr8qqesn

gr8qqesn1#

以分数部分时间戳格式隐藏毫秒
试试看:

df = df.withColumn("time_in_milliseconds", col("time").cast("double"))

您将得到类似于1484758800.792的结果,其中792表示毫秒
至少它对我有用(Scala、Spark、Hive)

6ioyuze2

6ioyuze22#

实施Dao Thi's answer中建议的方法

import pyspark.sql.functions as F
df = spark.createDataFrame([('22-Jul-2018 04:21:18.792 UTC', ),('23-Jul-2018 04:21:25.888 UTC',)], ['TIME'])
df.show(2,False)
df.printSchema()

输出量:

+----------------------------+
|TIME                        |
+----------------------------+
|22-Jul-2018 04:21:18.792 UTC|
|23-Jul-2018 04:21:25.888 UTC|
+----------------------------+
root
|-- TIME: string (nullable = true)

正在将字符串时间格式(包括毫秒)转换为unix_timestamp(双精度)。使用子字符串方法(start_position = -7,length_of_substring=3)从字符串中提取毫秒,并将毫秒分别添加到unix_timestamp。(将子字符串转换为浮点以便添加)

df1 = df.withColumn("unix_timestamp",F.unix_timestamp(df.TIME,'dd-MMM-yyyy HH:mm:ss.SSS z') + F.substring(df.TIME,-7,3).cast('float')/1000)

在Spark中将unix_timestamp(双精度)转换为timestamp数据类型

df2 = df1.withColumn("TimestampType",F.to_timestamp(df1["unix_timestamp"]))
df2.show(n=2,truncate=False)

这将为您提供以下输出

+----------------------------+----------------+-----------------------+
|TIME                        |unix_timestamp  |TimestampType          |
+----------------------------+----------------+-----------------------+
|22-Jul-2018 04:21:18.792 UTC|1.532233278792E9|2018-07-22 04:21:18.792|
|23-Jul-2018 04:21:25.888 UTC|1.532319685888E9|2018-07-23 04:21:25.888|
+----------------------------+----------------+-----------------------+

检查架构:

df2.printSchema()

root
 |-- TIME: string (nullable = true)
 |-- unix_timestamp: double (nullable = true)
 |-- TimestampType: timestamp (nullable = true)
x6492ojm

x6492ojm3#

unix_timestamp()以秒为单位返回Unix时间戳。
时间戳中的最后3位数字与毫秒字符串(1.999sec = 1999 milliseconds)的最后3位数字相同,因此只需将时间戳字符串的最后3位数字附加到毫秒字符串的末尾。

ni65a41a

ni65a41a4#

unix_timestamp()无法实现这一点,但Spark 3.1.0提供了一个名为unix_米利斯()的内置函数:
unix_米利斯(时间戳)-返回自1970-01-01 00:00:00 UTC以来的毫秒数。截断更高级别的精度。

brtdzjyr

brtdzjyr5#

在Spark 3.0.1版之前,无法使用SQL内置函数unix_timestamp将时间戳转换为Unix时间(毫秒)。

根据Spark的DateTimeUtils上的代码
时间戳在外部显示为java.sql.Timestamp,在内部存储为longs,它们能够以微秒精度存储时间戳。
因此,如果您定义一个UDF,并将java.sql.Timestamp作为输入,则可以调用getTime以获得以毫秒为单位的Long。如果您应用unix_timestamp,则只能获得以秒为单位的精度的unix时间。

val tsConversionToLongUdf = udf((ts: java.sql.Timestamp) => ts.getTime)

将此应用于各种时间戳:

val df = Seq("2017-01-18 11:00:00.000", "2017-01-18 11:00:00.111", "2017-01-18 11:00:00.110", "2017-01-18 11:00:00.100")
  .toDF("timestampString")
  .withColumn("timestamp", to_timestamp(col("timestampString")))
  .withColumn("timestampConversionToLong", tsConversionToLongUdf(col("timestamp")))
  .withColumn("timestampUnixTimestamp", unix_timestamp(col("timestamp")))

df.printSchema()
df.show(false)

// returns
root
 |-- timestampString: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampConversionToLong: long (nullable = false)
 |-- timestampCastAsLong: long (nullable = true)

+-----------------------+-----------------------+-------------------------+-------------------+
|timestampString        |timestamp              |timestampConversionToLong|timestampUnixTimestamp|
+-----------------------+-----------------------+-------------------------+-------------------+
|2017-01-18 11:00:00.000|2017-01-18 11:00:00    |1484733600000            |1484733600         |
|2017-01-18 11:00:00.111|2017-01-18 11:00:00.111|1484733600111            |1484733600         |
|2017-01-18 11:00:00.110|2017-01-18 11:00:00.11 |1484733600110            |1484733600         |
|2017-01-18 11:00:00.100|2017-01-18 11:00:00.1  |1484733600100            |1484733600         |
+-----------------------+-----------------------+-------------------------+-------------------+
pcrecxhr

pcrecxhr6#

哇,和@@@@@一样,只是投了它

>>> df2 = df_msg.withColumn("datetime", F.col("timestamp").cast("timestamp")).withColumn("timestamp_back" , F.col("datetime").cast("double"))
>>> r = df2.rdd.take(1)[0]
>>> r.timestamp_back                                                            
1666509660.071501
>>> r.timestamp
1666509660.071501
>>> r.datetime
datetime.datetime(2022, 10, 23, 15, 21, 0, 71501)

相关问题