我正在使用官方MongoDB Spark连接器从MongoDB集合中阅读Spark中的数据,代码如下:
val spark = SparkSession.
builder().
appName("MongoDB to SQL").
getOrCreate()
val df = MongoSpark.load(spark, readConfig)
df.count()
readConfig是MongoDB的一个标准读取配置,它运行得很好。我遇到的问题是,我从MongoDB中获取的一些日期/时间是字符串,它无法将其转换为Spark类型TimestampValue:
INFO DAGScheduler: Job 1 failed: count at transfer.scala:159, took 3,138191 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver):
com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a TimestampType (value: BsonString{value='2999.12.31 14:09:34'})
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:200)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:39)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:37)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
在.count()
之前调用df.printSchema()
时,我看到有问题的属性被列为
| | | |-- endDate: string (nullable = true)
在MongoDB中,endDate也存储为String。Spark在这里是否做了额外的步骤来检测模式?然后它无法转换它......?从www.example.com上的源代码来看https://github.com/mongodb/mongo-spark/blob/master/src/main/scala/com/mongodb/spark/sql/MapFunctions.scala#L181,它只做了简单的Map,而没有复杂的转换。
使用的版本:Mongo-Scala驱动程序2.4.0、Mongo-Spark连接器2.3.0、Spark 2.3.1
3条答案
按热度按时间brgchamk1#
If I well understand your issue, it seems that you need to convert date as string into timestamp (including time zone) using
unix_timestamp
and casting it asTimestampType
If you have a df with: df with [id: int, date: string]
You can try to see this example : https://docs.databricks.com/_static/notebooks/timestamp-conversion.html
7eumitmz2#
可能是架构的另一个字段导致此错误,但不是“endDate”。您显示的错误信息中没有“endDate”导致此错误。
MongoDB Connector for Spark默认使用每个字段的1000个样本来构建其模式,因此,如果一个字段包含不同的数据类型,如字符串数据类型和日期时间数据类型,MongoDB Connector for Spark可能不对字符串数据进行采样,而将该字段作为日期时间数据类型。至少,当您使用count方法时,连接器将尝试从mongodb加载数据以指定Spark Dataframe 中的数据类型,并导致以下错误:“无法将STRING转换成TimestampType”
解决方案:
**添加MongoDB Connector for Spark的示例数据以构建正确的模式。**例如,在pyspark中:
bhmjp9jg3#
问题在于数据字段,即一个条目具有bson而不是date,这是mongo端的一个bug,需要架构来适应这种情况