I want to save data received from Kafka to MongoDB using Spark with Java. So far I have been unable to store any data because I keep getting a missing-database exception, even though I followed all the guidelines. Here is the code:
SparkSession spark = SparkSession.builder()
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection")
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/db.collection")
        .getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
Map<String, String> writerMap = new HashMap<String, String>();
writerMap.put("collection", "survey");
writerMap.put("writeConcern.w", "majority");
WriteConfig writerConfig = WriteConfig.create(jsc).withOptions(writerMap);
The exception:
Exception in thread "main" java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.output.uri' or 'spark.mongodb.output.database' property
at com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(MongoCompanionConfig.scala:260)
at com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:37)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:244)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
at com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:124)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
at com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:113)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
at com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:100)
at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
at com.mongodb.spark.config.WriteConfig$.create(WriteConfig.scala:440)
at com.mongodb.spark.config.WriteConfig.create(WriteConfig.scala)
at com.spark.streaming.kafka.ColleagueSurveyKafkaStreaming.saveToMongoDB(ColleagueSurveyKafkaStreaming.java:239)
at com.spark.streaming.kafka.ColleagueSurveyKafkaStreaming.sparkStreamingForColleagueSurvey(ColleagueSurveyKafkaStreaming.java:210)
at com.spark.streaming.kafka.ColleagueSurveyKafkaStreaming.main(ColleagueSurveyKafkaStreaming.java:74)
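As the exception message says, WriteConfig resolves the target database either from the 'spark.mongodb.output.uri' / 'spark.mongodb.output.database' properties or from the options map passed to withOptions(...). One way to rule out the configuration not being picked up is to name the database explicitly in the options map. Below is a minimal sketch of building such a map (the names "db" and "survey" are taken from the URI and code above; whether this resolves the error in this particular setup is an assumption):

```java
import java.util.HashMap;
import java.util.Map;

public class WriterOptions {

    // Builds the options map that would be passed to
    // WriteConfig.create(jsc).withOptions(writerMap).
    // The explicit "database" entry is the key addition: it satisfies the
    // database-name lookup even if the output URI is not picked up.
    public static Map<String, String> buildWriterOptions() {
        Map<String, String> writerMap = new HashMap<>();
        writerMap.put("database", "db");          // explicit database name
        writerMap.put("collection", "survey");
        writerMap.put("writeConcern.w", "majority");
        return writerMap;
    }

    public static void main(String[] args) {
        Map<String, String> opts = buildWriterOptions();
        System.out.println(opts.get("database"));
    }
}
```

Alternatively, the same effect should be achievable by adding .config("spark.mongodb.output.database", "db") on the SparkSession builder.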