解析spark中几乎没有模式的mongo集合时出现问题

djmepvbi  于 2023-08-06  发布在  Apache
关注(0)|答案(3)|浏览(128)

我正在使用Spark将数据从一个集合移动到另一个集合中。数据的模式是不一致的(我的意思是,在一个集合中只有很少的模式,具有不同的数据类型,几乎没有变化)。当我试图从spark读取数据时,采样无法获取所有数据的模式,并抛出以下错误。(我有一个复杂的模式,我不能明确提及,而不是spark通过采样获取。)
第一个月
我试着将集合作为RDD读取并作为RDD写入,但问题仍然存在。
有什么帮助吗!

  • 谢谢-谢谢
dgtucam1

dgtucam11#

所有这些com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast SOME_TYPE into a NullType都来自不正确的模式推理。对于JSON文件或mongodb等无模式的数据源,Spark会扫描一小部分数据来确定类型。如果某个特定的字段有很多NULL,你可能会很不走运,类型将被设置为NullType
您可以做的一件事是增加为模式推理而扫描的条目数。
另一个-首先获取推断的模式,修复它,然后使用修复的模式重新加载数据框:

def fix_spark_schema(schema):
  if schema.__class__ == pyspark.sql.types.StructType:
    return pyspark.sql.types.StructType([fix_spark_schema(f) for f in schema.fields])
  if schema.__class__ == pyspark.sql.types.StructField:
    return pyspark.sql.types.StructField(schema.name, fix_spark_schema(schema.dataType), schema.nullable)
  if schema.__class__ == pyspark.sql.types.NullType:
    return pyspark.sql.types.StringType()
  return schema

collection_schema = sqlContext.read \
    .format("com.mongodb.spark.sql") \
    .options(...) \
    .load() \
    .schema

collection = sqlContext.read \
    .format("com.mongodb.spark.sql") \
    .options(...) \
    .load(schema=fix_spark_schema(collection_schema))

字符串
在我的例子中,所有有问题的字段都可以用StringType表示,如果需要,可以使逻辑更复杂。

f0ofjuux

f0ofjuux2#

据我了解你的问题:* 要么Spark错误地检测到了你的schema,并认为某些字段是必需的(nullable = false)-在这种情况下,你仍然可以显式地定义它并将nullable设置为true。如果您的模式正在发展,并且在过去的一段时间内添加或删除了一个字段,但仍然保留列类型(例如,String将始终是String,而不是Struct或其他完全不同的类型)* 或者您的模式完全不一致,即您的String字段在某个时候转换为Struct或其他完全不同的类型。在这种情况下,除了使用RDD抽象和使用非常宽松的类型(如Scala中的Any(Java中的Object))并使用isInstanceOf测试将所有字段规范化为1种通用格式外,我没有看到其他解决方案
实际上,我也看到了另一种可能的解决方案,但前提是您知道哪些数据具有哪些模式。例如,如果你知道2018-01-01和2018-02-01之间的数据使用schema#1,而其他数据使用schema#2,你可以编写一个管道将schema#1转换为schema#2。稍后,您可以简单地union两个数据集,并将转换应用于一致结构化的值。
编辑:
我刚刚尝试了你给予的类似代码,它在我的本地MongoDB示例上正常工作:

val sc = getSparkContext(Array("mongodb://localhost:27017/test.init_data")) 

// Load sample data
import com.mongodb.spark._

val docFees =
  """
    | {"fees": null}
    | {"fees": { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ]} }
  """.stripMargin.trim.stripMargin.split("[\\r\\n]+").toSeq
MongoSpark.save(sc.parallelize(docFees.map(Document.parse)))

val rdd = MongoSpark.load(sc)
rdd.saveToMongoDB(WriteConfig(Map("uri"->"mongodb://localhost:27017/test.new_coll_data", "replaceDocument"->"true")))

字符串
当我在MongoDB shell中检查结果时,我得到了:

> coll = db.init_data; 
test.init_data
> coll.find();
{ "_id" : ObjectId("5b33d415ea78632ff8452c60"), "fees" : { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ] } }
{ "_id" : ObjectId("5b33d415ea78632ff8452c61"), "fees" : null }
> coll = db.new_coll_data;
test.new_coll_data
> coll.find();
{ "_id" : ObjectId("5b33d415ea78632ff8452c60"), "fees" : { "main" : [ { "type" : "misc", "appliesPer" : "trip", "description" : null, "minAmount" : 175, "maxAmount" : 175 } ] } }
{ "_id" : ObjectId("5b33d415ea78632ff8452c61"), "fees" : null }

0pizxfdo

0pizxfdo3#

Vlyubin的解决方案成功了一半,因为现在它得到了这个错误:

java.lang.NoSuchMethodError: 'scala.collection.immutable.Seq org.apache.spark.sql.types.StructType.toAttributes()'

字符串

相关问题