sparksql从worker查询配置单元表

mtb9vblg  于 2021-06-26  发布在  Hive
关注(0)|答案(0)|浏览(240)

我试图从spark中的Map操作查询配置单元表,但当它运行查询时,执行被冻结。
这是我的测试代码

val sc = new SparkContext(conf)
val datasetPath = "npiCodesMin.csv"
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

val df = sparkSession.read.option("header", true).option("sep", ",").csv(datasetPath)
df.createOrReplaceTempView("npicodesTmp")
sparkSession.sql("DROP TABLE IF EXISTS npicodes");
sparkSession.sql("CREATE TABLE npicodes AS SELECT * FROM npicodesTmp");

val res = sparkSession.sql("SELECT * FROM npicodes WHERE NPI = '1588667638'") //This works
println(res.first())

val NPIs = sc.parallelize(List("1679576722", "1588667638", "1306849450", "1932102084"))//Some existing NPIs

val rows = NPIs.mapPartitions{ partition =>
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
  partition.map{code =>
    val res = sparkSession.sql("SELECT * FROM npicodes WHERE NPI = '"+code+"'")//The program stops here
    res.first()
  }
}

rows.collect().foreach(println)

它从csv加载数据,创建一个新的配置单元表并用csv数据填充它。然后,如果我从主表查询表,它工作得很好,但是如果我在Map操作中尝试这样做,执行就会被冻结。它不产生任何错误,它继续运行而不做任何事情。
spark ui显示了这种情况

实际上,我不确定是否可以分布式地查询表,我在文档中找不到它。有什么建议吗?
谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题