SparkSession cannot connect to Hive in a Spark Streaming operation

oyt4ldly · asked 2021-07-12 · in Spark

In my project, I want to use a SparkSession to analyze Hive data after consuming messages from Spark Streaming. I can create the SparkSession without trouble, but it falls back to its own local spark-warehouse: it never connects to the Hive metastore through the URI I pass in code. hive-site.xml is present in the Spark conf directory, and the Hive jars are in the Spark jars directory.
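For context, this is roughly how a Hive-enabled session would be built on its own just to check that the metastore is reachable at all; it is only a sketch, and the thrift URI below is a placeholder for whatever hive-site.xml actually points to:

import org.apache.spark.sql.SparkSession;

// Standalone connectivity sketch (not part of the streaming job): build a
// Hive-enabled session directly and list databases. The metastore URI is a
// placeholder for the value defined in hive-site.xml.
public class HiveConnectivityCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hive-connectivity-check")
                .master("local[*]")
                .config("hive.metastore.uris", "thrift://metastore-host:9083")
                .enableHiveSupport()
                .getOrCreate();

        // If the metastore is actually reached, more than just the "default"
        // database should show up here.
        spark.sql("SHOW DATABASES").show();
        spark.stop();
    }
}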

// Consume the Kafka topics via a direct stream.
JavaInputDStream<ConsumerRecord<String, String>> tablesinfo = KafkaUtils.createDirectStream(
        javaStreamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topicsFromArgset, kafkaParams)
);

tablesinfo.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        // Lazily obtain the shared Hive-enabled SparkSession, passing the
        // metastore URI read from the application properties.
        SparkSession spark = JavaSparkSessionSingleton
                .getInstance(rdd.context().getConf(),
                        prop.getProperty("metaUris"));
        // ....
    }
});

// Lazily-initialized, driver-side singleton so that every micro-batch reuses
// the same Hive-enabled SparkSession instead of building a new one.
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf, String metaUri) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    .config("hive.metastore.uris", metaUri)
                    .enableHiveSupport()
                    .getOrCreate();
        }
        return instance;
    }
}
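The part elided as .... inside foreachRDD is meant to query Hive through that session; as a sketch only (the table name below is made up), the intended usage looks like this:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch of the per-batch Hive query; "some_db.some_table" is a made-up name.
// With Hive support enabled, spark.sql() should resolve the table through the
// metastore rather than the local spark-warehouse directory.
class HiveQueryStep {
    static void analyzeHiveData(SparkSession spark) {
        Dataset<Row> hiveData = spark.sql("SELECT * FROM some_db.some_table LIMIT 10");
        hiveData.show();
    }
}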

Is this not an allowed operation, or is there a mistake in my code?

No answers yet.
