mongodb MongoType转换异常:即使显式架构不包含NullTypes,也无法使用Mongo Spark连接器将STRING强制转换为NullType

siv3szwd  于 2022-11-28  发布在  Go
关注(0)|答案(2)|浏览(226)

我正在从MongodB导入一个集合到Spark。

val partitionDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "db").option("collection", collectionName).load()

对于结果DataFrame中的data列,我得到以下类型:

StructType(StructField(configurationName,NullType,true), ...

因此在一些列中的至少一些类型是NullType
根据Spark中的Writing null values to Parquet(当NullType位于StructType内部时),我尝试通过将所有NullType替换为StringType来修复模式:

def denullifyStruct(struct: StructType): StructType = {
  val items = struct.map{ field => StructField(field.name, denullify(field.dataType), field.nullable, field.metadata) }
  StructType(items)
}

def denullify(dt: DataType): DataType = {
  if (dt.isInstanceOf[StructType]) {
    val struct = dt.asInstanceOf[StructType]
    return denullifyStruct(struct)
  } else if (dt.isInstanceOf[ArrayType]) {
    val array = dt.asInstanceOf[ArrayType]
    return ArrayType(denullify(array.elementType), array.containsNull)
  } else if (dt.isInstanceOf[NullType]) {
    return StringType
  }
  return dt
}

val fixedDF = spark.createDataFrame(partitionDF.rdd, denullifyStruct(partitionDF.schema))

发出fixedDF.printSchema,我可以看到NullType不再存在于fixedDF的模式中。

fixedDF.write.mode("overwrite").parquet(partitionName + ".parquet")

出现以下错误:

Caused by: com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a NullType (value: BsonString{value='117679.8'})
    at com.mongodb.spark.sql.MapFunctions$.convertToDataType(MapFunctions.scala:214)
    at com.mongodb.spark.sql.MapFunctions$.$anonfun$documentToRow$1(MapFunctions.scala:37)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)

又是一个NullType
当我只计算行数时,也会出现同样的问题:fixedDF.count() .
Spark在写Parquet(或计数)时是否会再次推断模式?是否有可能关闭这种推断(或以其他方式克服这种推断)?

2izufjch

2izufjch1#

问题不是由parquet写入方法引起的。由于某种类型转换问题,将数据作为 Dataframe 读取时出错。此jira page表示从mondoDB读取数据时需要添加samplePoolSize选项和other options

mspsb9vt

mspsb9vt2#

问题是,即使您提供了带有显式模式的DataFrame,对于某些操作(如count()或保存到磁盘),Mongo派生的DataFrame仍将推断模式。
为了推断架构,它使用采样,这意味着它在推断时看不到某些数据。如果它只看到具有null值的某个字段,它将为其推断NullType。稍后,当它遇到具有某个字符串的此字段时,这样的字符串将无法转换为NullType
因此,这里的基本问题是采样。如果您的模式是稳定的和“密集的”(每个或几乎每个文档都填充了所有字段),采样将很好地工作。但如果某些字段是“稀疏的”(高概率为空),采样可能会失败。
一个粗略的解决方案是完全避免抽样。也就是说,使用一般总体而不是样本来推断模式。如果没有太多的数据(或者你能够等待),它可能会工作。
下面是一个实验分支:https://github.com/rpuch/mongo-spark/tree/read-full-collection-instead-of-sampling
这个想法是从采样切换到使用整个集合,如果这样配置的话。引入一个新的配置选项有点太麻烦了,所以我只是在'sampleSize'配置选项设置为1的情况下禁用采样,如下所示:

.option("sampleSize", 1) // MAGIC! This effectively turns sampling off, instead the schema is inferred based on general population

在这种情况下,完全避免了采样。一个明显的解决方案是使用等于集合大小的N来采样,这使得MongoDB在内存中对大量数据进行排序,这似乎是有问题的。因此,我完全禁用了采样。

相关问题