如何在没有Hive-site.xml的情况下将Spark SQL连接到远程Hive元存储(通过节约协议)?

shstlldc  于 2022-09-27  发布在  Hive
关注(0)|答案(11)|浏览(145)

我将HiveContext与SparkSQL结合使用,并尝试连接到远程配置单元元存储,设置配置单元元储存的唯一方法是包含配置单元站点。类路径上的xml(或将其复制到/etc/spark/conf/)。
有没有办法在java代码中以编程方式设置此参数,而不包括配置单元站点。xml?如果是,Spark配置要使用什么?

zengzsys

zengzsys1#

对于Spark 1.x,可以设置为:

System.setProperty("hive.metastore.uris", "thrift://METASTORE:9083");

final SparkConf conf = new SparkConf();
SparkContext sc = new SparkContext(conf);
HiveContext hiveContext = new HiveContext(sc);

final SparkConf conf = new SparkConf();
SparkContext sc = new SparkContext(conf);
HiveContext hiveContext = new HiveContext(sc);
hiveContext.setConf("hive.metastore.uris", "thrift://METASTORE:9083");

如果您的配置单元被Kerberized,请更新

尝试在创建HiveContext之前设置这些:

System.setProperty("hive.metastore.sasl.enabled", "true");
System.setProperty("hive.security.authorization.enabled", "false");
System.setProperty("hive.metastore.kerberos.principal", hivePrincipal);
System.setProperty("hive.metastore.execute.setugi", "true");
snvhrwxg

snvhrwxg2#

在spark 2.0.+中,它应该看起来像这样:
别忘了用你的“hive.metastore.uri”替换它。这假设您已经启动了配置单元元存储服务(而不是配置单元服务器)。

val spark = SparkSession
          .builder()
          .appName("interfacing spark sql to hive metastore without configuration file")
          .config("hive.metastore.uris", "thrift://localhost:9083") // replace with your hivemetastore service's thrift url
          .enableHiveSupport() // don't forget to enable hive support
          .getOrCreate()

        import spark.implicits._
        import spark.sql
        // create an arbitrary frame
        val frame = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("word", "count")
        // see the frame created
        frame.show()
        /**
         * +-----+-----+
         * | word|count|
         * +-----+-----+
         * |  one|    1|
         * |  two|    2|
         * |three|    3|
         * +-----+-----+
         */
        // write the frame
        frame.write.mode("overwrite").saveAsTable("t4")
wbgh16ku

wbgh16ku3#

我也面临同样的问题,但解决了。只需在Spark 2.0版本中执行以下步骤

**步骤1:**复制配置单元站点。从Hive conf文件夹到spark conf.的xml文件
**步骤2:**编辑Spark环境。sh文件并配置mysql驱动程序。(如果您使用Mysql作为配置单元元存储。)

或者将MySQL驱动程序添加到Maven/SBT(如果使用这些驱动程序)

**步骤3:**创建spark会话时,添加enableHiveSupport()

val spark=SparkSession.builder.master(“local”).appName(“testing”).enableHiveSupport().getOrCreate()

示例代码:

package sparkSQL

/**
  * Created by venuk on 7/12/16.
  */

import org.apache.spark.sql.SparkSession

object hivetable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("hivetable").enableHiveSupport().getOrCreate()

    spark.sql("create table hivetab (name string, age int, location string) row format delimited fields terminated by ',' stored as textfile")
    spark.sql("load data local inpath '/home/hadoop/Desktop/asl' into table hivetab").show()
    val x = spark.sql("select * from hivetab")
    x.write.saveAsTable("hivetab")
  }
}

输出:

qcbq4gxm

qcbq4gxm4#

Spark版本:2.0.2
配置单元版本:1.2.1
下面是我从Spark连接到Hive元存储的Java代码:

import org.apache.spark.sql.SparkSession;

public class SparkHiveTest {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                  .builder()
                  .appName("Java Spark Hive Example")
                  .config("spark.master", "local")
                  .config("hive.metastore.uris",                
                   "thrift://abc123.com:9083")
                  .config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
                  .enableHiveSupport()
                  .getOrCreate();

        spark.sql("SELECT * FROM default.survey_data limit 5").show();
    }
}
gmol1639

gmol16395#

有些类似问题标记为重复,这是为了从Spark连接到配置单元,而不使用hive.metastore.uris或单独的节约服务器(9083),也不复制配置单元站点。xml到SPARK_CONF_DIR。

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("hive-check")
  .config(
    "spark.hadoop.javax.jdo.option.ConnectionURL",
    "JDBC_CONNECT_STRING"
  )
  .config(
    "spark.hadoop.javax.jdo.option.ConnectionDriverName",
    "org.postgresql.Driver"
  )
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config("spark.hadoop.javax.jdo.option.ConnectionUserName", "JDBC_USER")
  .config("spark.hadoop.javax.jdo.option.ConnectionPassword", "JDBC_PASSWORD")
  .enableHiveSupport()
  .getOrCreate()
spark.catalog.listDatabases.show(false)
y1aodyip

y1aodyip6#

在尝试从spark连接到hive元存储时,我观察到一个奇怪的行为,而没有使用hive-site.xml。
当我们在创建SparkSession时在spark代码中使用hive.metastore.uris属性时,一切都正常。但如果我们不在代码中指定,而是在使用带有--conf标志的spark-shellspark-submit时指定,则它将不起作用。
它将引发如下所示的警告,并且不会连接到远程元存储。

Warning: Ignoring non-Spark config property: hive.metastore.uris

一种解决方法是使用下面的属性。

spark.hadoop.hive.metastore.uris
4ngedf3f

4ngedf3f7#

对于Spark 3.x:

// Scala
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", "hive_warehouse_hdfs_path")
  .enableHiveSupport()
  .getOrCreate()

# Python

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", "hive_warehouse_hdfs_path") \
    .enableHiveSupport() \
    .getOrCreate()

检查可用的数据库:

spark.catalog.listDatabases().show()

source

but5z9lq

but5z9lq8#

下面的代码对我有用。我们可以忽略本地元存储的hive.metastore.uris配置,spark将在本地备用仓库目录中创建hive对象。

import org.apache.spark.sql.SparkSession;

object spark_hive_support1 
{
  def main (args: Array[String]) 
   {
    val spark = SparkSession
      .builder()
      .master("yarn")
      .appName("Test Hive Support")
      //.config("hive.metastore.uris", "jdbc:mysql://localhost/metastore")
      .enableHiveSupport
      .getOrCreate();

    import spark.implicits._

    val testdf = Seq(("Word1", 1), ("Word4", 4), ("Word8", 8)).toDF;
    testdf.show;
    testdf.write.mode("overwrite").saveAsTable("WordCount");
  }
}
w8biq8rn

w8biq8rn9#

在Hadoop 3中,Spark和Hive目录是分开的,因此:
对于Spark外壳(默认为.enableHiveSupport()),只需尝试:

pyspark-shell --conf spark.hadoop.metastore.catalog.default=hive

对于spark submit作业,创建如下所示的spark session:

SparkSession.builder.appName("Test").enableHiveSupport().getOrCreate()

然后在spark-submit命令中添加以下conf:

--conf spark.hadoop.metastore.catalog.default=hive

但对于ORC表(以及更一般的内部表),建议使用HiveWareHouse Connector。

0g0grzrc

0g0grzrc10#

正在设置spark.hadoop.metastore.catalog。default=Hive对我有效。

up9lanfz

up9lanfz11#

我在Spark 2.4.8、Spark 3.1.3或Spark 3.2.2中遇到以下错误。hadoop版本是3.2、Hbase 2.4.14、Hive 3.1.13和Scala 2.12

线程“main”java.io中出现异常。IOException:由于以前的错误,无法创建记录读取器。有关更多详细信息,请查看任务完整日志中以前的日志行。在org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)的org.apacher.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInput_FormatBase.java:253),在org.aaphe.sparket.rdd.rdd.$anonfun$partitions$2(rdd.scala:300),我调用spark-submit,如下所示

export HBASE_JAR_FILES="/usr/local/hbase/lib/hbase-unsafe-4.1.1.jar,/usr/local/hbase/lib/hbase-common-2.4.14.jar,/usr/local/hbase/lib/hbase-client-2.4.14.jar,/usr/local/hbase/lib/hbase-protocol-2.4.14.jar,/usr/local/hbase/lib/guava-11.0.2.jar,/usr/local/hbase/lib/client-facing-thirdparty/htrace-core4-4.2.0-incubating.jar" 

/opt/spark/bin/spark-submit --master local[*] --deploy-mode client --num-executors 1 --executor-cores 1 --executor-memory 480m --driver-memory 512m --driver-class-path $(echo $HBASE_JAR_FILES | tr ',' ':') --jars "$HBASE_JAR_FILES" --files /usr/local/hive/conf/hive-site.xml --conf "spark.hadoop.metastore.catalog.default=hive" --files /usr/local/hbase/conf/hbase-site.xml --class com.hbase.dynamodb.migration.HbaseToDynamoDbSparkMain --conf "spark.driver.maxResultSize=256m" /home/hadoop/scala-2.12/sbt-1.0/HbaseToDynamoDb-assembly-0.1.0-SNAPSHOT.jar

The code is as follows.

val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Hbase To DynamoDb migration demo")
      .config("hive.metastore.warehouse.dir", "/user/hive/warehouse")
      .config("hive.metastore.uris","thrift://localhost:9083")
      .enableHiveSupport()
      .getOrCreate()

    spark.catalog.listDatabases().show()
   val sqlDF = spark.sql("select rowkey, office_address, office_phone, name, personal_phone from hvcontacts")

sqlDF.show()

配置单元外部表是在Hbase顶部创建的,如下所示。

create external table if not exists hvcontacts (rowkey STRING, office_address STRING, office_phone STRING, name STRING, personal_phone STRING) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,Office:Address,Office:Phone,Personal:name,Personal:Phone') TBLPROPERTIES ('hbase.table.name' = 'Contacts');

元存储在mysql中,我可以查询tbls表来验证hive中的外部表。还有其他人面临类似的问题吗?
注意:我这里没有使用HiveSpark连接器。

相关问题