如何从一个位置读取表并将数据写入其他集群的表

pjngdqdw  于 2021-06-24  发布在  Hive
关注(0)|答案(1)|浏览(338)

我从设置hive.metastore.uris的metastore启动spark应用程序读取表统计信息。但是,我需要将数据写入另一个配置单元。
我尝试清理活动会话和默认会话,用新的metastore uri构建另一个会话,但是spark继续尝试写入第一个配置单元的表。

val spark = SparkSession.builder()
          .appName(appName)
          .enableHiveSupport()
          .config("hive.metastore.uris", FIRST_METASTORE)
          .config("spark.sql.hive.convertMetastoreOrc", "false")
          .config("spark.sql.caseSensitive", "false")
          .config("hive.exec.dynamic.partition", "true")
          .config("hive.exec.dynamic.partition.mode", "nonstrict")
          .getOrCreate()

val df = spark.sql("DESCRIBE FORMATTED source_table")

SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

val spark2 = SparkSession.builder()
          .appName(appName)
          .enableHiveSupport()
          .config("hive.metastore.uris", NEW_MESTASTORE)
          .config("spark.sql.hive.convertMetastoreOrc", "false")
          .config("spark.sql.caseSensitive", "false")
          .config("hive.exec.dynamic.partition", "true")
          .config("hive.exec.dynamic.partition.mode", "nonstrict")
          .getOrCreate()

SparkSession.setDefaultSession(sparkSession2)
SparkSession.setActiveSession(sparkSession2)

df.write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .insertInto("other_cluster_table")
  }

正如我所说的,dataframe应该被写到新的metastore和catalog的表位置,但事实并非如此。这是因为接口dataframewriter从 df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) 为了插入到某个已有的表中,但我该如何处理它?

b1payxdu

b1payxdu1#

在阅读了多个sparkContext之后,我解决了这个问题,只需将parquet直接写到namenode/directory/to/partition/中,然后使用beeline将分区添加到表中。

相关问题