嗨,我是JavaSpark的新手,已经寻找解决方案好几天了。
我正在将mongodb数据加载到配置单元表中,但是,我在saveastable时发现了一些错误,从而导致了此错误
com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType(StructField(oid,StringType,true)) (value: BsonString{value='54d3e8aeda556106feba7fa2'})
我已经尝试增加样本大小,不同的mongoSpark连接器版本。。。但没有有效的解决方案。
我搞不清楚根本原因是什么,两者之间还有什么差距需要解决?
最令人困惑的是,我有相似的数据集,使用相同的流而没有问题。
mongodb数据模式类似于嵌套结构和数组
root
|-- sample: struct (nullable = true)
| |-- parent: struct (nullable = true)
| | |-- expanded: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- distance: integer (nullable = true)
| | | | |-- id: struct (nullable = true)
| | | | | |-- oid: string (nullable = true)
| | | | |-- keys: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- name: string (nullable = true)
| | | | |-- parent_id: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- oid: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | |-- id: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- oid: string (nullable = true)
样本数据
"sample": {
"expanded": [
{
"distance": 0,
"type": "domain",
"id": "54d3e17b5cf737074d4065b0",
"parent_id": [
"54d3e1775cf737074d406599"
],
"name": "level2"
},
{
"distance": 1,
"type": "domain",
"id": "54d3e1775cf737074d406599",
"name": "level1"
}
],
"id": [
"54d3e17b5cf737074d4065b0"
]
}
样本代码
public static void main(final String[] args) throws InterruptedException {
// spark session read mongodb
SparkSession mongo_spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("mongo_spark.master", "local")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/test_db.test_collection")
.enableHiveSupport()
.getOrCreate();
// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(mongo_spark.sparkContext());
// Load data and infer schema, disregard toDF() name as it returns Dataset
Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
implicitDS.printSchema();
implicitDS.show();
// createOrReplaceTempView to see if the data being read
// implicitDS.createOrReplaceTempView("my_table");
// implicitDS.printSchema();
// implicitDS.show();
// saveAsTable
implicitDS.write().saveAsTable("my_table");
mongo_spark.sql("SELECT * FROM my_table limit 1").show();
mongo_spark.stop();
}
如果有人有什么想法,我将不胜感激。谢谢
2条答案
按热度按时间uhry853o1#
我也遇到了同样的问题,samplesize部分修复了这个问题,但是如果您有大量数据,就无法解决它。
下面是解决方法,你可以解决这个问题。将此方法与增加的样本量结合使用(在我的例子中是100000):
gcuhipw92#
当我适当增加样本量时,这个问题就不存在了。
如何配置java spark sparksession samplesize