如何解决缺少数据库名的异常?

jogvjijk  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(313)

我想使用spark java将从kafka接收到的数据保存到mongodb,到目前为止,我无法存储数据,因为我缺少数据库异常,但我遵循了所有的指导原则,仍然无法解决它。下面是代码:

.config("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection") 
                .config("spark.mongodb.output.uri", "mongodb://localhost:27017/db.collection") 
                .getOrCreate(); 

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

       Map<String, String> writerMap = new HashMap<String, String>();
        writerMap.put("collection", "survey");  
        writerMap.put("writeConcern.w", "majority");
        WriteConfig writerConfig = WriteConfig.create(jsc).withOptions(writerMap);      

exception
Exception in thread "main" java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.output.uri' or 'spark.mongodb.output.database' property
    at com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(MongoCompanionConfig.scala:260)
    at com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:37)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:244)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
    at com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:124)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
    at com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:113)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
    at com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:100)
    at com.mongodb.spark.config.WriteConfig$.apply(WriteConfig.scala:37)
    at com.mongodb.spark.config.WriteConfig$.create(WriteConfig.scala:440)
    at com.mongodb.spark.config.WriteConfig.create(WriteConfig.scala)
    at com.spark.streaming.kafka.ColleagueSurveyKafkaStreaming.saveToMongoDB(ColleagueSurveyKafkaStreaming.java:239)
    at com.spark.streaming.kafka.ColleagueSurveyKafkaStreaming.sparkStreamingForColleagueSurvey(ColleagueSurveyKafkaStreaming.java:210)
    at com.spark.streaming.kafka.ColleagueSurveyKafkaStreaming.main(ColleagueSurveyKafkaStreaming.java:74)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题