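I am trying to compare two sets of data. One is a DataFrame built from a static set of rows and written out in Avro format. When I read the data back from Avro and compare it with the original, the comparison fails on the timestamp column: Avro stores the value as a long, and casting it back to a timestamp in Spark SQL gives a different value.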
**--CREATE THE DATAFRAME**
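import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.col
import com.databricks.spark.avro._
val data = Seq(Row("1",java.sql.Timestamp.valueOf("2019-03-15 18:20:06.456")))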
val schemaOrig = List( StructField("rowkey",StringType,true)
,StructField("txn_ts",TimestampType,true))
val sourceDf = spark.createDataFrame(spark.sparkContext.parallelize(data),StructType(schemaOrig))
sourceDf.write.avro("test")
sourceDf.printSchema
root
|-- rowkey: string (nullable = true)
|-- txn_ts: timestamp (nullable = true)
sourceDf.show(false)
+------+-----------------------+
|rowkey|txn_ts                 |
+------+-----------------------+
|1     |2019-03-15 18:20:06.456|
+------+-----------------------+
--As shown above, the source DataFrame has the expected schema, i.e. String and Timestamp
--Now Read the data back from Avro
val avroDf=spark.read.avro("test")
avroDf.printSchema
root
|-- rowkey: string (nullable = true)
|-- txn_ts: long (nullable = true)
avroDf.show(false)
--The Avro DataFrame's schema reports the timestamp field as long, and the data shows epoch time in milliseconds
+------+-------------+
|rowkey|txn_ts       |
+------+-------------+
|1     |1552688406456|
+------+-------------+
--Compare the two DataFrames
sourceDf.except(avroDf).show(false)
--Gives error due to datatype mismatch
org.apache.spark.sql.AnalysisException: Except can only be performed on tables with the compatible column types. bigint <> timestamp at the second column of the second table;;
'Except
:- AnalysisBarrier
--Cast the Avro long field back to timestamp
val modifiedAvroDf=avroDf.withColumn("txn_ts", col("txn_ts").cast(TimestampType))
modifiedAvroDf.printSchema
|-- rowkey: string (nullable = true)
|-- txn_ts: timestamp (nullable = true)
modifiedAvroDf.show(false)
--Showing wrong timestamp value
+------+-----------------------+
|rowkey|txn_ts                 |
+------+-----------------------+
|1     |51172-09-26 11:07:366.0|
+------+-----------------------+
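The year 51172 is what you would expect if the cast reads the long as seconds since the epoch, while the value read back from Avro is in milliseconds. The following is a minimal plain-Scala sketch of that unit mismatch, with no Spark involved; the object name `EpochUnits` is made up for illustration. `Instant` prints in UTC, so the hour differs from the local-time output shown in the question.

```scala
import java.time.Instant

object EpochUnits {
  // Value read back from the Avro file in the question.
  val raw: Long = 1552688406456L

  // Interpreted as seconds since the epoch (what the cast appears to do).
  val asSeconds: Instant = Instant.ofEpochSecond(raw)

  // Interpreted as milliseconds since the epoch (what the value actually holds).
  val asMillis: Instant = Instant.ofEpochMilli(raw)

  def main(args: Array[String]): Unit = {
    println(asSeconds) // a date in the year 51172
    println(asMillis)  // 2019-03-15T22:20:06.456Z
  }
}
```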
--Now Try to cast the source column to long
val sourceModDf=sourceDf.withColumn("txn_ts",col("txn_ts").cast(LongType))
sourceModDf.printSchema
|-- rowkey: string (nullable = true)
|-- txn_ts: long (nullable = true)
sourceModDf.show(false)
sourceModDf.except(modifiedAvroDf).show(false)
1 Answer
I created a UDF to convert the long value to a timestamp string. Please check the code below.
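As a rough sketch of the kind of conversion such a UDF could wrap (not the answerer's actual code), the helper below turns epoch milliseconds into a `java.sql.Timestamp` and passes nulls through. The names `EpochMillis` and `millisToTimestamp` are hypothetical, and it returns a `Timestamp` rather than a string so the result can be compared with the original column directly.

```scala
import java.sql.Timestamp

object EpochMillis {
  // Hypothetical helper: epoch milliseconds -> java.sql.Timestamp.
  // Takes a boxed Long so that a null column value stays null.
  def millisToTimestamp(ms: java.lang.Long): Timestamp =
    if (ms == null) null else new Timestamp(ms)

  def main(args: Array[String]): Unit = {
    val original = Timestamp.valueOf("2019-03-15 18:20:06.456")
    // Round trip through epoch milliseconds, as the Avro write/read does.
    val restored = millisToTimestamp(original.getTime)
    println(restored == original)
  }
}
```

In a spark-shell session this could presumably be registered with `udf(EpochMillis.millisToTimestamp _)` and applied via `avroDf.withColumn("txn_ts", ...)` before calling `sourceDf.except(...)`. That wiring is an assumption and has not been run against the data in the question.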