Suppose MongoDB holds several DBs (db1, db2, ..., dba, dbb, ...), and each of them contains several collections (col1a, col1b, ..., col2a, col2b, ...).
I am looking for a way to manage multiple inputs and outputs in MongoDB from a single, self-contained Scala application. Below is pseudocode that illustrates the idea:
```
readConfig_DB1.Col1A = Read setting pointing to DB=DB1 and collection=Col1A
readConfig_DB2.Col2B = Read setting pointing to DB=DB2 and collection=Col2B

val rdd_DB1.Col1A = MongoSpark.load(sc_DB1.Col1A)
val rdd_DB2.Col2B = MongoSpark.load(sc_DB2.Col2B)

DF_Transformation1 = Do some transformations on DF1a and DF2b
DF_Transformation2 = Do some transformations on DF1b and DF2a

writeConfig_DBa.Col1A = Write setting pointing to DB=DBa and collection=Col1A
writeConfig_DBb.Col2B = Write setting pointing to DB=DBb and collection=Col2B

MongoSpark.save(DF_Transformation1, writeConfig_DBa.Col1A)
MongoSpark.save(DF_Transformation2, writeConfig_DBb.Col2B)
```
Edit 1:
I tried to run the proposed solution.
Folder structure:
```
$ find .
.
./src
./src/main
./src/main/scala
./src/main/scala/application.conf
./src/main/scala/SimpleApp.scala
./build.sbt
```
Contents of `build.sbt`:
```
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "org.apache.spark" %% "spark-sql" % "2.4.1"
)
```
Contents of `application.conf`:
```
config {
  database {
    "spark_mongodb_input_uri": "mongodb://127.0.0.1/test.myCollection",
    "spark_mongodb_user": "",
    "spark_mongodb_pass": "",
    "spark_mongodb_input_database": "test",
    "spark_mongodb_input_collection": "myCollection",
    "spark_mongodb_input_readPreference_name": "",
    "spark_mongodb_output_database": "test",
    "spark_mongodb_output_collection": "myCollection"
  }
  newreaderone {
    "database": "test",
    "collection": "myCollection",
    "readPreference.name": ""
  }
  newwriterone {
    "uri": "mongodb://127.0.0.1/test.myCollection",
    "database": "test",
    "collection": "myCollection",
    "replaceDocument": "false", // If set to True, updates an existing document
    "readPreference.name": "",
    "maxBatchSize": "128"
  }
}
```
Contents of `SimpleApp.scala`:
```
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstApp {
  def main(args: Array[String]) {
    import com.typesafe.{Config,ConfigFactory}
    val appConfig: Config = ConfigFactory.load("application.conf")

    import scala.collection.JavaConverters._
    val initial_conf: Config = appconf.getConfig("config.database")
    val confMap: Map[String,String] = initial_conf.entrySet()
      .iterator.asScala
      .map(e => e.getKey.replaceAll("_",".") -> e.getValue.unwrapped.toString).toMap

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame,SparkSession}
    val sparkConfig: SparkConf = new SparkConf()
    sparkConfig.setAll(confMap)
    val spark: SparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport.getOrCreate

    import com.mongodb.spark._
    val data: DataFrame = MongoSpark.load(spark)

    import com.mongodb.spark.config._
    val nreader = appConfig.getConfig("config.newreaderone")
    val readMap: Map[String,Any] = nreader.entrySet()
      .iterator.asScala
      .map(e => e.getKey -> e.getValue.unwrapped)
      .toMap
    val customReader = ReadConfig(readMap)
    val newDF: DataFrame = spark.read.mongo(customReader)

    resultDF.write.mode("append").mongo()
  }
}
```
The errors after compiling:
```
sbt package
[info] Updated file /Path/3/project/build.properties: set sbt.version to 1.3.10
[info] Loading global plugins from /home/sadegh/.sbt/1.0/plugins
[info] Loading project definition from /Path/3/project
[info] Loading settings for project root-3 from build.sbt ...
[info] Set current project to root-3 (in build file:/Path/3/)
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] Compiling 1 Scala source to /Path/3/target/scala-2.11/classes ...
[error] /Path/3/src/main/scala/SimpleApp.scala:8:13: object typesafe is not a member of package com
[error] import com.typesafe.{Config,ConfigFactory}
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:9:17: not found: type Config
[error] val appConfig: Config = ConfigFactory.load("application.conf")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:9:26: not found: value ConfigFactory
[error] val appConfig: Config = ConfigFactory.load("application.conf")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:11:19: not found: type Config
[error] val initial_conf:Config = appconf.getConfig("config.database")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:11:28: not found: value appconf
[error] val initial_conf:Config = appconf.getConfig("config.database")
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:19:56: not found: value sparkConf
[error] val spark: SparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport.getOrCreate
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:28:21: overloaded method value apply with alternatives:
[error] (options: scala.collection.Map[String,String])com.mongodb.spark.config.ReadConfig.Self
[error] (sparkConf: org.apache.spark.SparkConf)com.mongodb.spark.config.ReadConfig.Self
[error] (sqlContext: org.apache.spark.sql.SQLContext)com.mongodb.spark.config.ReadConfig.Self
[error] (sparkSession: org.apache.spark.sql.SparkSession)com.mongodb.spark.config.ReadConfig.Self
[error] (sparkContext: org.apache.spark.SparkContext)com.mongodb.spark.config.ReadConfig.Self
[error] cannot be applied to (scala.collection.immutable.Map[String,Any])
[error] val customReader = ReadConfig(readMap)
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:29:36: value mongo is not a member of org.apache.spark.sql.DataFrameReader
[error] val newDF: DataFrame = spark.read.mongo(customReader)
[error] ^
[error] /Path/3/src/main/scala/SimpleApp.scala:30:2: not found: value resultDF
[error] resultDF.write.mode("append").mongo()
[error] ^
[error] 9 errors found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 12 s, completed Jun 14, 2020 6:55:43 PM
```
1 Answer
You can pass the configuration to the application as input in HOCON format. You can try a HOCON configuration snippet along the lines of the one below for multiple read/write configurations.
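For example, a sketch modeled on the application.conf shown in the edit above; the database names, collection names and URIs are placeholders, and you can add one extra reader/writer block per additional input or output collection:
```
config {
  database {
    "spark_mongodb_input_uri": "mongodb://127.0.0.1/db1.col1a",
    "spark_mongodb_output_uri": "mongodb://127.0.0.1/dba.col1a",
    "spark_mongodb_input_database": "db1",
    "spark_mongodb_input_collection": "col1a",
    "spark_mongodb_output_database": "dba",
    "spark_mongodb_output_collection": "col1a"
  }
  // one block per additional collection to read
  newreaderone {
    "database": "db2",
    "collection": "col2b",
    "readPreference.name": "primary"
  }
  // one block per additional collection to write
  newwriterone {
    "uri": "mongodb://127.0.0.1/dbb.col2b",
    "database": "dbb",
    "collection": "col2b",
    "replaceDocument": "false",
    "maxBatchSize": "128"
  }
}
```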
The above configuration has been tested and can be read easily with the Typesafe Config library.
Update:
Put the above configuration in a file called application.conf. The file is read with the following lines of code.
Step 1: Read the configuration file
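A minimal sketch, assuming the Typesafe Config library is on the classpath and application.conf is packaged as a resource:
```
import com.typesafe.config.{Config, ConfigFactory}

// Loads application.conf from the classpath (e.g. src/main/resources)
val appConfig: Config = ConfigFactory.load()
```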
Step 2: To initialize Spark for reading from and writing to MongoDB, we use the configuration under the database section, as shown below:
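Roughly like this; the underscore-to-dot replacement turns keys such as spark_mongodb_input_uri into the spark.mongodb.input.uri properties the connector expects:
```
import scala.collection.JavaConverters._
import org.apache.spark.SparkConf

val dbSection = appConfig.getConfig("config.database")

// Convert the HOCON entries into Spark properties,
// e.g. spark_mongodb_input_uri -> spark.mongodb.input.uri
val confMap: Map[String, String] = dbSection.entrySet().asScala
  .map(e => e.getKey.replaceAll("_", ".") -> e.getValue.unwrapped.toString)
  .toMap

val sparkConf = new SparkConf()
sparkConf.setAll(confMap)
```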
Step 3: Create the SparkSession
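For example (enableHiveSupport, as used in the question, needs the Hive dependencies on the classpath, so it is left out here; the application name is a placeholder):
```
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .appName("MongoMultiIO") // placeholder name
  .config(sparkConf)
  .getOrCreate()
```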
Step 4: Read a DataFrame from MongoDB
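A sketch using the connector's default read configuration taken from the SparkConf:
```
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.DataFrame

// Reads the collection given by the spark.mongodb.input.* settings
val data: DataFrame = MongoSpark.load(spark)
```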
The step above reads the collection specified in the database section of the configuration.
Step 5: Read a new collection:
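Something along these lines, building a ReadConfig from the newreaderone section. Note that ReadConfig expects a Map[String, String] (which is what the overload error in the question points at), and options not supplied here fall back to the defaults taken from the SparkSession:
```
import com.mongodb.spark.config.ReadConfig

val nreader = appConfig.getConfig("config.newreaderone")

val readOptions: Map[String, String] = Map(
  "database"   -> nreader.getString("database"),
  "collection" -> nreader.getString("collection")
)
val customReadConfig = ReadConfig(readOptions, Some(ReadConfig(spark)))

val newDF = MongoSpark.load(spark, customReadConfig)
```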
Step 6: Write to a MongoDB collection
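A sketch of the first approach, which saves to the output collection configured in the SparkConf (resultDF is a placeholder name for the transformed DataFrame):
```
// Writes to the collection given by the spark.mongodb.output.* settings
MongoSpark.save(resultDF)
```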
i) The code above writes to the collection specified under the database section of the configuration.
ii) Writing to a collection other than the one specified in the SparkConf:
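A sketch of the second approach, building a WriteConfig from the newwriterone section (key names follow the application.conf above):
```
import com.mongodb.spark.config.WriteConfig

val nwriter = appConfig.getConfig("config.newwriterone")

val writeOptions: Map[String, String] = Map(
  "uri"             -> nwriter.getString("uri"),
  "database"        -> nwriter.getString("database"),
  "collection"      -> nwriter.getString("collection"),
  "replaceDocument" -> nwriter.getString("replaceDocument"),
  "maxBatchSize"    -> nwriter.getString("maxBatchSize")
)
val customWriteConfig = WriteConfig(writeOptions, Some(WriteConfig(spark)))

MongoSpark.save(resultDF, customWriteConfig)
```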
Update:
The complete code is given below; note the two ways of saving the DataFrame to MongoDB at the end.
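Putting the steps together as a sketch; FirstApp, MongoMultiIO and resultDF are placeholder names, and the transformation itself is left as an identity placeholder:
```
import com.typesafe.config.{Config, ConfigFactory}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._

object FirstApp {
  def main(args: Array[String]): Unit = {
    // Step 1: read application.conf from the classpath
    val appConfig: Config = ConfigFactory.load()

    // Step 2: turn the database section into Spark properties
    val confMap: Map[String, String] = appConfig.getConfig("config.database")
      .entrySet().asScala
      .map(e => e.getKey.replaceAll("_", ".") -> e.getValue.unwrapped.toString)
      .toMap
    val sparkConf = new SparkConf().setAll(confMap)

    // Step 3: create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("MongoMultiIO")
      .config(sparkConf)
      .getOrCreate()

    // Step 4: read the default collection (spark.mongodb.input.*)
    val data: DataFrame = MongoSpark.load(spark)

    // Step 5: read another collection via the newreaderone section
    val nreader = appConfig.getConfig("config.newreaderone")
    val readConfig = ReadConfig(
      Map(
        "database"   -> nreader.getString("database"),
        "collection" -> nreader.getString("collection")
      ),
      Some(ReadConfig(spark))
    )
    val newDF: DataFrame = MongoSpark.load(spark, readConfig)

    // Placeholder for your transformations
    val resultDF: DataFrame = newDF

    // Step 6 i): write to the default output collection (spark.mongodb.output.*)
    MongoSpark.save(resultDF)

    // Step 6 ii): write to the collection from the newwriterone section
    val nwriter = appConfig.getConfig("config.newwriterone")
    val writeConfig = WriteConfig(
      Map(
        "uri"        -> nwriter.getString("uri"),
        "database"   -> nwriter.getString("database"),
        "collection" -> nwriter.getString("collection")
      ),
      Some(WriteConfig(spark))
    )
    MongoSpark.save(resultDF, writeConfig)

    spark.stop()
  }
}
```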
The folder structure should look like this:
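Assuming a standard sbt layout, with application.conf under src/main/resources so that ConfigFactory.load() finds it on the classpath:
```
.
├── build.sbt
└── src
    └── main
        ├── resources
        │   └── application.conf
        └── scala
            └── SimpleApp.scala
```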
Update: build.sbt
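A build.sbt sketch. The "object typesafe is not a member of package com" error suggests the Typesafe Config library is not on the compile classpath, so it is added explicitly here; the import in the code also has to be com.typesafe.config.{Config, ConfigFactory}:
```
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
  "org.apache.spark"  %% "spark-core"            % "2.4.1",
  "org.apache.spark"  %% "spark-sql"             % "2.4.1",
  // needed for com.typesafe.config.{Config, ConfigFactory}
  "com.typesafe"       % "config"                % "1.3.4"
)
```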