我已经用avro格式将sparkDataframe列之一写入kafka。然后我尝试从这个主题中读取数据,并将其从avro转换为data frame列。数据的类型是时间戳,而不是数据库中的时间戳,我得到一些默认值:
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
1970-01-01 00:00:00
对于其他数据类型(如string)的列,也可以注意到同样的行为。初始时间戳值如下所示,这是我想要得到的结果:
2019-03-19 12:26:03.003
2019-03-19 12:26:09
2019-03-19 12:27:04.003
2019-03-19 12:27:08.007
2019-03-19 12:28:01.013
2019-03-19 12:28:05.007
2019-03-19 12:28:09.023
2019-03-19 12:29:04.003
2019-03-19 12:29:07.047
2019-03-19 12:30:00.003
以下是转换为avro后的相同数据:
00 F0 E1 9B BC B3 9C C2 05
00 80 E9 F7 C1 B3 9C C2 05
00 F0 86 B2 F6 B3 9C C2 05
00 B0 E9 9A FA B3 9C C2 05
00 90 A4 E1 AC B4 9C C2 05
00 B0 EA C8 B0 B4 9C C2 05
00 B0 88 B3 B4 B4 9C C2 05
00 F0 BE EA E8 B4 9C C2 05
00 B0 89 DE EB B4 9C C2 05
00 F0 B6 9E 9E B5 9C C2 05
如何解决此转换问题?
将avro写入kafka、读取并转换回Dataframe的代码。我试着使用toïavro和fromïavro spark avro方法:
import org.apache.spark.sql.avro._
val castDF = testDataDF.select(to_avro(testDataDF.col("update_database_time")) as 'value)
castDF
.write
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("topic", "app_state_test")
.save()
val cachedDf = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", "app_state_test")
.load()
val jsonSchema = "{\"name\": \"update_database_time\", \"type\": \"long\", \"logicalType\": \"timestamp-millis\", \"default\": \"NONE\"}"
cachedDf.select(from_avro(cachedDf.col("value"), jsonSchema) as 'test)
暂无答案!
目前还没有任何答案,快来回答吧!