我们可以使用多个sparksessions访问两个不同的hive服务器吗

oxf4rvwz  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(386)

我有一个场景来比较两个不同的表源和目标,从两个独立的远程配置单元服务器,我们可以使用两个吗 SparkSessions 好像我试过了below:-

val spark = SparkSession.builder().master("local")
  .appName("spark remote")
  .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
  .config("javax.jdo.option.ConnectionUserName", "hiveroot")
  .config("javax.jdo.option.ConnectionPassword", "hivepassword")
  .config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
  .config("hive.metastore.uris", "thrift://192.168.175.160:9083")
  .enableHiveSupport()
  .getOrCreate()

SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

val sparkdestination = SparkSession.builder()
  .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.42:3306/metastore?useSSL=false")
  .config("javax.jdo.option.ConnectionUserName", "hiveroot")
  .config("javax.jdo.option.ConnectionPassword", "hivepassword")
  .config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
  .config("hive.metastore.uris", "thrift://192.168.175.42:9083")
  .enableHiveSupport()
  .getOrCreate()

我试过了 SparkSession.clearActiveSession() and SparkSession.clearDefaultSession() 但它不起作用,错误如下:

Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

有没有其他方法可以使用多个配置单元访问两个配置单元表 SparkSessions 或者 SparkContext .
谢谢

vql8enpb

vql8enpb1#

看看
SparkSession getOrCreate 方法
哪个州
获取现有的sparksession,或者,如果没有现有的sparksession,则基于此生成器中设置的选项创建一个新的sparksession
此方法首先检查是否存在有效的线程本地sparksession,如果是,则返回该线程本地sparksession。然后检查是否存在有效的全局默认sparksession,如果是,则返回该sparksession。如果不存在有效的全局默认sparksession,则该方法将创建一个新的sparksession,并将新创建的sparksession指定为全局默认值。如果返回现有sparksession,则此生成器中指定的配置选项将应用于现有sparksession。
这就是它返回第一个会话及其配置的原因。
请仔细阅读文档,找出创建会话的其他方法。。
我在做<2Spark版本。因此,我不知道如何创建新的会话,而不发生配置冲突。。
但是,这里有一个有用的测试用例,即sparksessionbuildersuite.scala来做这个-diy。。
该测试用例中的示例方法

test("use session from active thread session and propagate config options") {
    val defaultSession = SparkSession.builder().getOrCreate()
    val activeSession = defaultSession.newSession()
    SparkSession.setActiveSession(activeSession)
    val session = SparkSession.builder().config("spark-config2", "a").getOrCreate()

    assert(activeSession != defaultSession)
    assert(session == activeSession)
    assert(session.conf.get("spark-config2") == "a")
    assert(session.sessionState.conf == SQLConf.get)
    assert(SQLConf.get.getConfString("spark-config2") == "a")
    SparkSession.clearActiveSession()

    assert(SparkSession.builder().getOrCreate() == defaultSession)
    SparkSession.clearDefaultSession()
  }
kq0g1dla

kq0g1dla2#

我用这种方式和工作完美的Spark2.1

val sc = SparkSession.builder()
             .config("hive.metastore.uris", "thrift://dbsyz1111:10000")
             .enableHiveSupport()
             .getOrCreate()

// Createdataframe 1 from by reading the data from hive table of metstore 1
val dataframe_1 = sc.sql("select * from <SourcetbaleofMetaStore_1>")

// Resetting the existing Spark Contexts
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

//Initialize Spark session2 with Hive Metastore 2
val spc2 = SparkSession.builder()
               .config("hive.metastore.uris", "thrift://dbsyz2222:10004")
               .enableHiveSupport()
               .getOrCreate()

// Load dataframe 2 of spark context 1 into a new dataframe of spark context2, By getting schema and data by converting to rdd  API
val dataframe_2 = spc2.createDataFrame(dataframe_1.rdd, dataframe_1.schema)

dataframe_2.write.mode("Append").saveAsTable(<targettableNameofMetastore_2>)

相关问题