How to use a subquery and map lookup in a higher-order function transform?

d4so4syb asked on 2021-05-24 in Spark

This is a follow-up to my previous question.

scala> val map1 = spark.sql("select map('s1', 'p1', 's2', 'p2', 's3', 'p3') as lookup")

map1: org.apache.spark.sql.DataFrame = [lookup: map<string,string>]

scala> val ds1 = spark.sql("select 'p1' as p, Array('s2','s3') as c")

ds1: org.apache.spark.sql.DataFrame = [p: string, c: array<string>]

scala>  ds1.createOrReplaceTempView("ds1")

scala> map1.createOrReplaceTempView("map1")

scala> map1.show()
+--------------------+
|              lookup|
+--------------------+
|[s1 -> p1, s2 -> ...|
+--------------------+

scala> ds1.show()
+---+--------+
|  p|       c|
+---+--------+
| p1|[s2, s3]|
+---+--------+

map1.selectExpr("element_at(`lookup`, 's2')").first()

res50: org.apache.spark.sql.Row = [p2]

scala> spark.sql("select element_at(`lookup`, 's1') from map1").show()
+----------------------+
|element_at(lookup, s1)|
+----------------------+
|                    p1|
+----------------------+

So far, so good. The next two steps are where I run into problems:

scala> ds1.selectExpr("p", "c", "transform(c, cs -> map1.selectExpr('element_at(`lookup`, cs)')) as cs").show()

20/09/28 19:44:59 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/09/28 19:44:59 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
20/09/28 19:45:03 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
20/09/28 19:45:03 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore root@10.1.21.76
20/09/28 19:45:03 WARN ObjectStore: Failed to get database map1, returning NoSuchObjectException
org.apache.spark.sql.AnalysisException: Undefined function: 'selectExpr'. This function is neither a registered temporary function nor a permanent function registered in the database 'map1'.; line 1 pos 19

scala> spark.sql("""select p, c, transform(c, cs -> (select element_at(`lookup`, cs) from map1)) cc from ds1""").show()

org.apache.spark.sql.AnalysisException: cannot resolve '`cs`' given input columns: [map1.lookup]; line 1 pos 61;
'Project [p#329, c#330, transform(c#330, lambdafunction(scalar-subquery#713 [], lambda cs#715, false)) AS cc#714]
:  +- 'Project [unresolvedalias('element_at(lookup#327, 'cs), None)]
:     +- SubqueryAlias map1
:        +- Project [map(s1, p1, s2, p2, s3, p3) AS lookup#327]
:           +- OneRowRelation
+- SubqueryAlias ds1
   +- Project [p#329, array(s2, s3) AS c#330]
      +- OneRowRelation
How can I fix these two steps?


eh57zj3b 1#

Simply add the table name to the from clause.

spark.sql("""select p, c, transform(c, cs -> element_at(`lookup`, cs)) cc from ds1 a, map1 b""").show()

+---+--------+--------+
|  p|       c|      cc|
+---+--------+--------+
| p1|[s2, s3]|[p2, p3]|
+---+--------+--------+
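For what it's worth, the same fix can also be written with the DataFrame API instead of a temp-view query. This is only a sketch, assuming Spark 2.4+ (where transform and element_at are available as SQL expressions) and reusing the ds1 and map1 DataFrames from the question; the val name fixed is arbitrary:

// Sketch: the cross join puts the `lookup` map column in scope, so the
// lambda variable cs only has to resolve against columns of the joined result.
val fixed = ds1
  .crossJoin(map1)
  .selectExpr("p", "c", "transform(c, cs -> element_at(lookup, cs)) as cc")

fixed.show()
// +---+--------+--------+
// |  p|       c|      cc|
// +---+--------+--------+
// | p1|[s2, s3]|[p2, p3]|
// +---+--------+--------+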

ih99xse1 2#

If map1 does not have too many rows, you can cross join it against the set of values extracted from the arrays in the c column.

spark.sql("select col as value, element_at(map1.lookup, col) as key +
          "from (select explode(ds1.c) from ds1) as v cross join map1")

Result (assigning the above to a DataFrame value and calling .show):

+-----+---+
|value|key|
+-----+---+
|   s2| p2|
|   s3| p3|
+-----+---+
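If those per-element rows eventually need to be folded back into one mapped array per input row (as in the first answer's output), a grouped collect_list over the same cross join is one option. This is only a sketch under that assumption, and collect_list does not guarantee element order:

// Sketch: rebuild one array of looked-up values per ds1 row.
// Note: collect_list makes no ordering guarantee after a shuffle.
spark.sql("""
  select p, c, collect_list(element_at(b.lookup, v.col)) as cc
  from (select p, c, explode(c) as col from ds1) as v
  cross join map1 b
  group by p, c
""").show()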
