In my project, I want to use a SparkSession to analyze Hive data after receiving messages via Spark Streaming. I can create the SparkSession easily, but it uses the local spark-warehouse instead. The SparkSession does not connect to the Hive metastore through the configuration I pass in code. hive-site.xml is present in the Spark conf directory, and the Hive jars are in the Spark jars directory.
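A quick way to check whether a session actually bound to the Hive catalog (a minimal diagnostic sketch; `spark` stands for any already-created session):

// Sketch: inspect which catalog implementation the session resolved to.
// "hive" means the metastore is in use; "in-memory" means the session
// fell back to the local spark-warehouse described above.
System.out.println(spark.conf().get("spark.sql.catalogImplementation"));
spark.sql("show databases").show(); // with Hive bound, this lists the metastore databases

The streaming code that creates the session: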
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

JavaInputDStream<ConsumerRecord<String, String>> tablesinfo = KafkaUtils.createDirectStream(
        javaStreamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topicsFromArgset, kafkaParams)
);

tablesinfo.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        // Lazily obtain the session on the driver for each non-empty batch
        SparkSession spark = JavaSparkSessionSingleton
                .getInstance(rdd.context().getConf(), prop.getProperty("metaUris"));
        ....
    }
});
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf, String metaUri) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    .config("hive.metastore.uris", metaUri)
                    .enableHiveSupport()
                    .getOrCreate();
        }
        return instance;
    }
}
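One thing I am not sure about: getOrCreate() reuses any already-existing session and ignores configs passed to the builder afterwards, so perhaps the Hive options belong on the SparkConf before the JavaStreamingContext is created. A sketch of what I mean (app name and batch interval are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Sketch: set the Hive catalog options up front so every later
// getOrCreate() call (including the singleton above) sees them.
SparkConf conf = new SparkConf()
        .setAppName("streaming-hive")                              // placeholder name
        .set("spark.sql.catalogImplementation", "hive")
        .set("hive.metastore.uris", prop.getProperty("metaUris"));
JavaStreamingContext javaStreamingContext =
        new JavaStreamingContext(conf, Durations.seconds(5));      // placeholder interval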
Is this not an allowed operation, or is there a mistake in my code?