pyspark timestamptype()提供错误的转换:valueerror:year 52129超出范围

7ivaypg9  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(363)

我正在通过kafka从mongodb获得一个pyspark中具有timestamp值的集合。在mongodb中,模式如下:

"Timestamp": {
        "$date": "2020-02-28T11:24:28.810Z"
    }

在pyspark中,我使用以下模式:

StructType([...
         StructField("Timestamp",StructType([StructField("$date",TimestampType(),True)]), True), \
         ...

我使用from_json()解析json字符串:

data_stream_clean = data_stream_after \
                .select(from_json(col("json_string"), self.schema) \
                .alias("detail")) \
                .select("detail.*") \
                .withColumn("Timestamp", col("Timestamp").getField("$date"))

然后我创建一个tempview来访问这些列,它显示:

+---+--------------------+
| Id|           Timestamp|
+---+--------------------+
|231|52129-10-04 10:00...

这是2020-02-28t11:24:28.810z的错误转换。我无法将其转换为显示以下错误的df:

ValueError: year 52129 is out of range

我还使用了unix\u timestamp(),它显示了正确的转换,即1582889068810,但使用int数据类型。不过,我想在时间戳我的数据。

afdcj2ne

afdcj2ne1#

我试着以josn的形式阅读您的示例输入,它在scala中运行良好。你能告诉我你是怎么加载Dataframe或者场景的吗?

代码

val spark = sqlContext.sparkSession
    val implicits = spark.implicits
    import implicits._
    import org.apache.spark.sql.catalyst.ScalaReflection

    val data =
      """
        |{
        |   "Timestamp": {
        |       "$date": "2020-02-28T11:24:28.810Z"
        |   }
        |}
      """.stripMargin
    val schema = StructType(Array(StructField("Timestamp",StructType(Array(StructField("$date", DataTypes.TimestampType))))))

    val ds = spark.read
      .schema(schema)
      .json(Seq(data).toDS())

    ds.show(false)
    ds.printSchema()

结果-

+------------------------+
|Timestamp               |
+------------------------+
|[2020-02-28 16:54:28.81]|
+------------------------+

root
 |-- Timestamp: struct (nullable = true)
 |    |-- $date: timestamp (nullable = true)

就连我也试着把它读成 string 然后把柱子铸成 Timestamp ,但结果是一样的-

val data =
      """
        |{
        |   "Timestamp": {
        |       "$date": "2020-02-28T11:24:28.810Z"
        |   }
        |}
      """.stripMargin
    val schema = StructType(Array(StructField("Timestamp",StructType(Array(StructField("$date", DataTypes.StringType))))))

    val ds = spark.read
      .schema(schema)
      .json(Seq(data).toDS())

    ds.show(false)
    ds.printSchema()

    ds.select(col("Timestamp.$date").cast(DataTypes.TimestampType)).show(false)

结果:

+--------------------------+
|Timestamp                 |
+--------------------------+
|[2020-02-28T11:24:28.810Z]|
+--------------------------+

root
 |-- Timestamp: struct (nullable = true)
 |    |-- $date: string (nullable = true)

+----------------------+
|$date                 |
+----------------------+
|2020-02-28 16:54:28.81|
+----------------------+

如果您正在使用加载数据 DataFrameReader 然后您可以使用下面的选项更改格式-

.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")

来自\u json

从json加载数据对我来说很好

val df = Seq(data).toDF("json_string")
      .select(from_json(col("json_string"), schema).alias("detail"))
      .select("detail.*")
    .withColumn("Timestamp", col("Timestamp").getField("$date"))
df.show(false)
    df.printSchema()

+------------------------+
|Timestamp               |
+------------------------+
|2020-02-28T11:24:28.810Z|
+------------------------+

root
 |-- Timestamp: string (nullable = true)

相关问题