Spark Mongodb Connector Scala -缺少数据库名称

vwhgwdsa  于 2023-04-06  发布在  Scala
关注(0)|答案(3)|浏览(183)

我遇到了一个奇怪的问题。我试图使用mongodb spark连接器将Spark本地连接到MongoDB。
除了设置spark之外,我还使用了以下代码:

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))

// Load the movie rating data from Mongo DB
val movieRatings = MongoSpark.load(sc, readConfig).toDF()

movieRatings.show(100)

但是,我得到一个编译错误:

java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property.

在我设置readConfig的地方。我不明白为什么当我在Map中明明有一个uri属性时,它却抱怨没有设置uri。我可能错过了什么。

p8h8hvxi

p8h8hvxi1#

您可以从这里提到的SparkSession执行此操作

val spark = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate()

使用配置创建 Dataframe

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"))
val df = MongoSpark.load(spark)

将df写入mongodb

MongoSpark.save(
df.write
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .mode("overwrite"))

**您的代码中:**配置中缺少前缀

val readConfig = ReadConfig(Map(
    "spark.mongodb.input.uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", 
    "spark.mongodb.input.readPreference.name" -> "secondaryPreferred"), 
    Some(ReadConfig(sc)))

val writeConfig = WriteConfig(Map(
    "spark.mongodb.output.uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))
t8e9dugd

t8e9dugd2#

对于Java,您可以在创建spark session时设置configs,也可以先创建会话,然后将其设置为运行时配置。
1.

SparkSession sparkSession = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnector")
    .config("spark.mongodb.input.uri","mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate()


2.

SparkSession sparkSession = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnector")
        .getOrCreate()

然后

String mongoUrl = "mongodb://localhost:27017/movie_db.movie_ratings";
   sparkSession.sparkContext().conf().set("spark.mongodb.input.uri", mongoURL);
   sparkSession.sparkContext().conf().set("spark.mongodb.output.uri", mongoURL);
   Map<String, String> readOverrides = new HashMap<String, String>();
   readOverrides.put("collection", sourceCollection);
   readOverrides.put("readPreference.name", "secondaryPreferred");
   ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides);
   Dataset<Row> df = MongoSpark.loadAndInferSchema(sparkSession,readConfig);
3phpmpom

3phpmpom3#

如果您使用的是MongoDB Spark连接器版本为10.1或更高,则为SparkConfhas changed的配置密钥。

SparkSession sparkSession = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnector")
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.read.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate()

设置配置的不同方法仍然可用。

相关问题