impala内置函数不可用

pu82cl6c  于 2021-06-26  发布在  Impala
关注(0)|答案(1)|浏览(480)

我在 Impala 中使用了一个内置函数,比如:

select id, parse_url(my_table.url, "QUERY", "extensionId") from my_table

现在我正在迁移到sparksql(使用jupyter笔记本中的pyspark):

my_table.select(my_table.id.cast('string'), parse_url(my_table.url.cast('string'), "QUERY", "extensionId")).show()

但是,我得到了以下错误:

NameError: name 'parse_url' is not defined

还尝试了以下方法:

my_table.registerTempTable("my_table")

sqlContext.sql("select id, url, parse_url(url, 'QUERY', 'extensionId') as new_url from my_table").show(100)

但是所有的 new_url 变成 null .
你知道我错过了什么吗?另外,人们会如何处理这样的问题?谢谢!

wnvonmuf

wnvonmuf1#

一些缺失部分:
不能用spark执行impala函数。
有一个具有相同名称和语法的配置单元udf可以与spark一起使用,但它没有本机实现和函数 Package 器。这就是为什么可以使用sql调用它 HiveContext / SparkSession 有Hive的支持。
一般来说,它应该工作得很好:

spark.sql("""SELECT parse_url(
    'http://example.com?extensionId=foo', 'QUERY', 'extensionId'
)""").show()
+-----------------------------------------------------------------+
|parse_url(http://example.com?extensionId=foo, QUERY, extensionId)|
+-----------------------------------------------------------------+
|                                                              foo|
+-----------------------------------------------------------------+

以及 NULL 输出意味着给定部分无法匹配:

spark.sql("""SELECT parse_url(
    'http://example.com?bar=foo', 'QUERY', 'extensionId'
)""").show()
+---------------------------------------------------------+
|parse_url(http://example.com?bar=foo, QUERY, extensionId)|
+---------------------------------------------------------+
|                                                     null|
+---------------------------------------------------------+

使用自定义项可以获得类似的结果,但速度会明显减慢。

from typing import Dict
from urllib.parse import parse_qsl, urlsplit
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, MapType

def parse_args(col: str) -> Dict[str, str]:
    """
    http://stackoverflow.com/a/21584580/6910411
    """
    try:
        return dict(parse_qsl(urlsplit(col).query))
    except:
        pass

parse_args_ = udf(parse_args, MapType(StringType(), StringType()))

数据定义为:

df = sc.parallelize([
    ("http://example.com?bar=foo", ),
    ("http://example.com?extensionId=foo", ),
]).toDF(["url"])

其用途如下:

df.select(parse_args_("url")["extensionId"]).show()

结果是:

+----------------------------+
|parse_args(url)[extensionId]|
+----------------------------+
|                        null|
|                         foo|
+----------------------------+

相关问题