Scala: managing read and write configuration settings in MongoDB

7rfyedvj, posted 2021-05-29 in Spark

Suppose there are multiple databases in MongoDB (DB1, DB2, ..., DBa, DBb, ...), and each of them has some collections (Col1A, Col1B, ..., Col2A, Col2B, ...).
I want to find a way to manage multiple inputs and outputs in MongoDB from a single self-contained application written in Scala. Below is pseudocode showing the idea (a concrete sketch using the connector API follows the pseudocode):

readConfig_DB1.Col1A = Read setting pointing to DB=DB1 and collection=Col1A
readConfig_DB2.Col2B = Read setting pointing to DB=DB2 and collection=Col2B

val rdd_DB1.Col1A = MongoSpark.load(sc_DB1.Col1A)
val rdd_DB2.Col2B = MongoSpark.load(sc_DB2.Col2B)

DF_Transformation1 = Do some transformations on DF1a and DF2b
DF_Transformation2 = Do some transformations on DF1b and DF2a

writeConfig_DBa.Col1A = Write setting pointing to DB=DBa and collection=Col1A
writeConfig_DBb.Col2B = Write setting pointing to DB=DBb and collection=Col2B

MongoSpark.save(DF_Transformation1, writeConfig_DBa.Col1A)
MongoSpark.save(DF_Transformation2, writeConfig_DBb.Col2B)
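
For orientation, here is a minimal, self-contained sketch of how this could look with the MongoDB Spark connector's ReadConfig/WriteConfig API (assuming connector 2.x); the object name, URIs, database names and collection names are placeholders, not values from a real deployment:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiCollectionSketch {
  def main(args: Array[String]): Unit = {
    // Base URIs are placeholders; database/collection are overridden per config below.
    // The local master is only for testing outside spark-submit.
    val spark: SparkSession = SparkSession.builder
      .appName("multi-mongo-sketch")
      .master("local[*]")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/DB1.Col1A")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/DBa.Col1A")
      .getOrCreate()

    // One ReadConfig per source database/collection, inheriting the uri from the session.
    val readDB1Col1A = ReadConfig(Map("database" -> "DB1", "collection" -> "Col1A"), Some(ReadConfig(spark)))
    val readDB2Col2B = ReadConfig(Map("database" -> "DB2", "collection" -> "Col2B"), Some(ReadConfig(spark)))

    val df1A: DataFrame = MongoSpark.load(spark, readDB1Col1A)
    val df2B: DataFrame = MongoSpark.load(spark, readDB2Col2B)

    // Transformations would go here; the identity is used as a placeholder.
    val transformation1: DataFrame = df1A
    val transformation2: DataFrame = df2B

    // One WriteConfig per target database/collection.
    val writeDBaCol1A = WriteConfig(Map("database" -> "DBa", "collection" -> "Col1A"), Some(WriteConfig(spark)))
    val writeDBbCol2B = WriteConfig(Map("database" -> "DBb", "collection" -> "Col2B"), Some(WriteConfig(spark)))

    MongoSpark.save(transformation1, writeDBaCol1A)
    MongoSpark.save(transformation2, writeDBbCol2B)

    spark.stop()
  }
}

Each ReadConfig/WriteConfig here only overrides database and collection, so the connection URI is defined once on the SparkSession and inherited everywhere else.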

Edit 1:
I tried to run the solution.
The folder structure:

$find .
.
./src
./src/main
./src/main/scala
./src/main/scala/application.conf
./src/main/scala/SimpleApp.scala
./build.sbt

The contents of `build.sbt`:
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
"org.apache.spark" %% "spark-core" % "2.4.1",
"org.apache.spark" %% "spark-sql" % "2.4.1"
)

The contents of `application.conf`:

config {
  database {
    "spark_mongodb_input_uri": "mongodb://127.0.0.1/test.myCollection",
    "spark_mongodb_user": "",
    "spark_mongodb_pass": "",
    "spark_mongodb_input_database": "test",
    "spark_mongodb_input_collection": "myCollection",
    "spark_mongodb_input_readPreference_name": "",
    "spark_mongodb_output_database": "test",
    "spark_mongodb_output_collection": "myCollection"
  }

  newreaderone {
    "database": "test",
    "collection": "myCollection",
    "readPreference.name": ""
  }

  newwriterone {
    "uri": "mongodb://127.0.0.1/test.myCollection",
    "database": "test",
    "collection": "myCollection",
    "replaceDocument": "false", // if true, replaces the whole matched document; if false, only the matching fields are updated
    "readPreference.name": "",
    "maxBatchSize": "128"
  }
}

The contents of `SimpleApp.scala`:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstApp {
def main(args: Array[String]) {

import com.typesafe.{Config,ConfigFactory}
val appConfig: Config = ConfigFactory.load("application.conf")
import scala.collection.JavaConverters._
val initial_conf:Config = appconf.getConfig("config.database")
val confMap: Map[String,String] = initial_conf.entrySet()
.iterator.asScala
.map(e => e.getKey.replaceAll("_",".") -> e.getValue.unwrapped.toString).toMap
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame,SparkSession}
val sparkConfig: SparkConf=new SparkConf()
sparkConfig.setAll(confMap)
val spark: SparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport.getOrCreate
import com.mongodb.spark._
val data: DataFrame = MongoSpark.load(spark)
import com.mongodb.spark.config._
val nreader = appConfig.getConfig("config.newreaderone")
val readMap: Map[String,Any] = nreader.entrySet()
.iterator.asScala
.map(e => e.getKey -> e.getValue.unwrapped)
.toMap
val customReader = ReadConfig(readMap)
val newDF: DataFrame = spark.read.mongo(customReader)
resultDF.write.mode("append").mongo()

}
}

The errors after compiling:

sbt package
[info] Updated file /Path/3/project/build.properties: set sbt.version to 1.3.10
[info] Loading global plugins from /home/sadegh/.sbt/1.0/plugins
[info] Loading project definition from /Path/3/project
[info] Loading settings for project root-3 from build.sbt ...
[info] Set current project to root-3 (in build file:/Path/3/)
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] Compiling 1 Scala source to /Path/3/target/scala-2.11/classes ...
[error] /Path/3/src/main/scala/SimpleApp.scala:8:13: object typesafe is not a member of package com
[error] import com.typesafe.{Config,ConfigFactory}
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:9:17: not found: type Config
[error] val appConfig: Config = ConfigFactory.load("application.conf")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:9:26: not found: value ConfigFactory
[error] val appConfig: Config = ConfigFactory.load("application.conf")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:11:19: not found: type Config
[error] val initial_conf:Config = appconf.getConfig("config.database")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:11:28: not found: value appconf
[error] val initial_conf:Config = appconf.getConfig("config.database")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:19:56: not found: value sparkConf
[error] val spark: SparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport.getOrCreate
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:28:21: overloaded method value apply with alternatives:
[error] (options: scala.collection.Map[String,String])com.mongodb.spark.config.ReadConfig.Self
[error] (sparkConf: org.apache.spark.SparkConf)com.mongodb.spark.config.ReadConfig.Self
[error] (sqlContext: org.apache.spark.sql.SQLContext)com.mongodb.spark.config.ReadConfig.Self
[error] (sparkSession: org.apache.spark.sql.SparkSession)com.mongodb.spark.config.ReadConfig.Self
[error] (sparkContext: org.apache.spark.SparkContext)com.mongodb.spark.config.ReadConfig.Self
[error] cannot be applied to (scala.collection.immutable.Map[String,Any])
[error] val customReader = ReadConfig(readMap)
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:29:36: value mongo is not a member of org.apache.spark.sql.DataFrameReader
[error] val newDF: DataFrame = spark.read.mongo(customReader)
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:30:2: not found: value resultDF
[error] resultDF.write.mode("append").mongo()
[error] ^
[error] 9 errors found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 12 s, completed Jun 14, 2020 6:55:43 PM

smdncfj3 1#

You can pass the configuration to the application as HOCON input. You can try the HOCON snippet below for multiple read and write configurations.

config{
    database {
      "spark_mongodb_input_uri": "mongodb://connection/string/here",
      "spark_mongodb_user":"your_user_name",
      "spark_mongodb_pass":"your_password",
      "spark_mongodb_input_database": "Some_Db_Name",
      "spark_mongodb_input_collection": "Some_Col_Name",
      "spark_mongodb_input_readPreference_name": "primaryPreferred",
      "spark_mongodb_output_database": "Some_Output_Db_Name",
      "spark_mongodb_output_collection": "Some_Output_Col_Name"
    }

    newreaderone {
      "database": "sf",
      "collection": "matchrecord",
      "readPreference.name": "secondaryPreferred"
    }

    newwriterone {
      "uri":"mongodb://uri of same or new mongo cluster/instance"
      "database": "db_name",
      "collection": "col_name",
      "replaceDocument": "false",//If set to True, updates an existing document
      "readPreference.name": "secondaryPreferred",
      "maxBatchSize": "128"
    }
}

The above configuration has been tested and can easily be read with the Typesafe Config library.
Update:
Put the above configuration in a file called application.conf, then read it with the following lines of code.
Step 1: read the configuration file

import com.typesafe.config.{Config, ConfigFactory}

val appConfig: Config = ConfigFactory.load("application.conf")

Step 2: to initialize Spark for reading from and writing to MongoDB, we use the configuration under the database section, as follows:

import scala.collection.JavaConverters._

// The config.database block uses underscored keys; rewriting "_" to "." turns them
// into Spark properties such as spark.mongodb.input.uri.
val initialConf: Config = appConfig.getConfig("config.database")

val confMap: Map[String, String] = initialConf.entrySet()
  .iterator.asScala
  .map(e => e.getKey.replaceAll("_", ".") -> e.getValue.unwrapped.toString)
  .toMap

Step 3: create the SparkSession

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkConfig: SparkConf = new SparkConf()

sparkConfig.setAll(confMap)

val spark: SparkSession = SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate

Step 4: read a DataFrame from MongoDB

import com.mongodb.spark._
val data: DataFrame = MongoSpark.load(spark)

The step above reads the collection specified in the database section of the configuration.
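As a quick sanity check that this read picked up the expected collection, standard DataFrame calls can be used on data (shown here only as a usage example):

// Inspect what was loaded via the spark.mongodb.input.* settings.
data.printSchema()
println(s"Documents read: ${data.count()}")
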
Step 5: read a new collection:

import com.mongodb.spark.config._
import com.mongodb.spark.sql._ // provides the .mongo() helpers on DataFrameReader/DataFrameWriter

val nreader = appConfig.getConfig("config.newreaderone")

// ReadConfig expects Map[String, String], so stringify the values.
val readMap: Map[String, String] = nreader.entrySet()
  .iterator.asScala
  .map(e => e.getKey -> e.getValue.unwrapped.toString)
  .toMap

// Pass the session's ReadConfig as defaults so the connection uri is inherited from the SparkConf.
val customReader = ReadConfig(readMap, Some(ReadConfig(spark)))

val newDF: DataFrame = spark.read.mongo(customReader)

Step 6: write to a MongoDB collection

resultDF.write.mode("append").mongo()

The code above writes to the collection specified under the database section of the configuration.
ii) Writing to a collection other than the one specified in the SparkConf:

import com.mongodb.spark.config._

val nwriter = appConfig.getConfig("config.newwriterone")

// Build the options from the newwriterone block (not nreader), again as Map[String, String].
val writerMap: Map[String, String] = nwriter.entrySet()
  .iterator.asScala
  .map(e => e.getKey -> e.getValue.unwrapped.toString)
  .toMap

val writeConf = WriteConfig(writerMap, Some(WriteConfig(spark)))

MongoSpark.save(resultDF, writeConf)

Update:
The complete code looks like this; note the two ways of saving the DataFrame to MongoDB at the end (a sketch for scaling this to additional reader/writer blocks follows the code):

package com.test

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._
import com.mongodb.spark._
import com.mongodb.spark.sql._
import com.mongodb.spark.config._

object ReadWriteMongo {

  def main(args: Array[String]): Unit = {

    val appConfig: Config = ConfigFactory.load("application.conf")

    // Turn the config.database entries (underscored keys) into spark.mongodb.* properties.
    val initialConf: Config = appConfig.getConfig("config.database")

    val confMap: Map[String, String] = initialConf.entrySet()
      .iterator.asScala
      .map(e => e.getKey.replaceAll("_", ".") -> e.getValue.unwrapped.toString)
      .toMap

    val sparkConfig: SparkConf = new SparkConf()
    sparkConfig.setAll(confMap)

    // enableHiveSupport requires Hive classes on the classpath; drop it if you do not need Hive.
    val spark: SparkSession = SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate

    // Reads the collection configured in the database section (spark.mongodb.input.*).
    val data: DataFrame = MongoSpark.load(spark)

    // Build a ReadConfig from config.newreaderone, inheriting the uri from the session.
    val nreader = appConfig.getConfig("config.newreaderone")

    val readMap: Map[String, String] = nreader.entrySet()
      .iterator.asScala
      .map(e => e.getKey -> e.getValue.unwrapped.toString)
      .toMap

    val customReader = ReadConfig(readMap, Some(ReadConfig(spark)))

    /*
     * Read data from MongoDB.
     */
    val newDF: DataFrame = spark.read.mongo(customReader)

    /*
     * After you perform processing on newDF above, save the result in a new
     * DataFrame called "resultDF"; the identity is used here as a placeholder.
     * You can save the DataFrame as follows.
     */
    val resultDF: DataFrame = newDF

    resultDF.write.mode("append").mongo()

    /*
     * Alternatively, you can save a DataFrame by passing a WriteConfig as follows.
     */
    val nwriter = appConfig.getConfig("config.newwriterone")

    val writerMap: Map[String, String] = nwriter.entrySet()
      .iterator.asScala
      .map(e => e.getKey -> e.getValue.unwrapped.toString)
      .toMap

    val writeConf = WriteConfig(writerMap, Some(WriteConfig(spark)))

    MongoSpark.save(resultDF, writeConf)
  }
}
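
To scale this up to the many databases and collections in the original question, the per-block plumbing can be factored into small helpers. This is only a sketch: the helper object name and the config paths config.newreadertwo / config.newwritertwo are hypothetical blocks you would add to application.conf yourself:

import com.typesafe.config.Config
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object MongoConfigHelpers {

  // Flatten a HOCON block into the String -> String options map the connector expects.
  def configToMap(block: Config): Map[String, String] =
    block.entrySet()
      .iterator.asScala
      .map(e => e.getKey -> e.getValue.unwrapped.toString)
      .toMap

  // One ReadConfig/WriteConfig per block, inheriting the uri and other defaults from the session.
  def readerFor(appConfig: Config, path: String, spark: SparkSession): ReadConfig =
    ReadConfig(configToMap(appConfig.getConfig(path)), Some(ReadConfig(spark)))

  def writerFor(appConfig: Config, path: String, spark: SparkSession): WriteConfig =
    WriteConfig(configToMap(appConfig.getConfig(path)), Some(WriteConfig(spark)))
}

// Usage with the hypothetical extra blocks:
//   val reader2 = MongoConfigHelpers.readerFor(appConfig, "config.newreadertwo", spark)
//   val writer2 = MongoConfigHelpers.writerFor(appConfig, "config.newwritertwo", spark)
//   val df2 = MongoSpark.load(spark, reader2)
//   MongoSpark.save(df2, writer2)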

The folder structure should look like this (application.conf goes under src/main/resources so that ConfigFactory.load can find it on the classpath):

src/

src/main/scala/com/test/ReadWriteMongo.scala

src/main/resources/application.conf

Update: build.sbt (see the note on the required sbt-assembly plugin after the file):

val sparkVersion = "2.4.1"

scalaVersion := "2.11.12"

scalacOptions ++= Seq(
  "-deprecation",
  "-feature",
  "-Xfuture",
  "-encoding",
  "UTF-8",
  "-unchecked",
  "-language:postfixOps"
)

libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.4.0",
"org.mongodb.spark" %% "mongo-spark-connector" % sparkVersion,
"org.apache.spark" %% "spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided
)

mainClass in assembly := Some("com.test.ReadWriteMongo")

assembly / test := {}

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")       => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")   => MergeStrategy.discard
  case "reference.conf"                                 => MergeStrategy.concat
  case x: String if x.contains("UnusedStubClass.class") => MergeStrategy.first
  case _                                                => MergeStrategy.first
}
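
The `assembly`, `assemblyJarName`, and `assemblyMergeStrategy` settings above come from the sbt-assembly plugin, so the project also needs a project/plugins.sbt. A minimal version could look like the line below; the plugin version shown is an assumption, so pick whichever release matches your sbt version:

// project/plugins.sbt, required for the assembly task used in build.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10") // version is an assumption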
