我试图从spark中的Map操作查询配置单元表,但当它运行查询时,执行被冻结。
这是我的测试代码
val sc = new SparkContext(conf)
val datasetPath = "npiCodesMin.csv"
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
val df = sparkSession.read.option("header", true).option("sep", ",").csv(datasetPath)
df.createOrReplaceTempView("npicodesTmp")
sparkSession.sql("DROP TABLE IF EXISTS npicodes");
sparkSession.sql("CREATE TABLE npicodes AS SELECT * FROM npicodesTmp");
val res = sparkSession.sql("SELECT * FROM npicodes WHERE NPI = '1588667638'") //This works
println(res.first())
val NPIs = sc.parallelize(List("1679576722", "1588667638", "1306849450", "1932102084"))//Some existing NPIs
val rows = NPIs.mapPartitions{ partition =>
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
partition.map{code =>
val res = sparkSession.sql("SELECT * FROM npicodes WHERE NPI = '"+code+"'")//The program stops here
res.first()
}
}
rows.collect().foreach(println)
它从csv加载数据,创建一个新的配置单元表并用csv数据填充它。然后,如果我从主表查询表,它工作得很好,但是如果我在Map操作中尝试这样做,执行就会被冻结。它不产生任何错误,它继续运行而不做任何事情。
spark ui显示了这种情况
实际上,我不确定是否可以分布式地查询表,我在文档中找不到它。有什么建议吗?
谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!