在sparksql中使用键值(和条件)组合对配置单元Map列进行筛选时出现的问题

9vw9lbht  于 2021-06-24  发布在  Hive
关注(0)|答案(2)|浏览(261)

我有一个Map列类型的配置单元表 key 它以键值对的形式存储值。我需要编写组合两个键值的过滤条件
示例数据集:

+---------------+--------------+----------------------+
| column_value  | metric_name  |         key          |
+---------------+--------------+----------------------+
| A37B          | Mean         | {0:"202006",1:"1"}  |
| ACCOUNT_ID    | Mean         | {0:"202006",1:"2"}  |
| ANB_200       | Mean         | {0:"202006",1:"3"}  |
| ANB_201       | Mean         | {0:"202006",1:"4"}  |
| AS82_RE       | Mean         | {0:"202006",1:"5"}  |
| ATTR001       | Mean         | {0:"202007",1:"2"}  |
| ATTR001_RE    | Mean         | {0:"202007",1:"3"}  |
| ATTR002       | Mean         | {0:"202007",1:"4"}  |
| ATTR002_RE    | Mean         | {0:"202007",1:"5"}  |
| ATTR003       | Mean         | {0:"202008",1:"3"}  |
| ATTR004       | Mean         | {0:"202008",1:"4"}  |
| ATTR005       | Mean         | {0:"202008",1:"5"}  |
| ATTR006       | Mean         | {0:"202009",1:"4"}  |
| ATTR006       | Mean         | {0:"202009",1:"5"}  |

我需要编写一个sparksql查询来根据 Key 列不在条件中,两个键同时存在。

select * from table where key[0] between 202006 and 202009 and key NOT IN (0:"202009",1:"5)

预期产量:

+---------------+--------------+----------------------+
| column_value  | metric_name  |         key          |
+---------------+--------------+----------------------+
| A37B          | Mean         | {0:"202006",1:"1"}  |
| ACCOUNT_ID    | Mean         | {0:"202006",1:"2"}  |
| ANB_200       | Mean         | {0:"202006",1:"3"}  |
| ANB_201       | Mean         | {0:"202006",1:"4"}  |
| AS82_RE       | Mean         | {0:"202006",1:"5"}  |
| ATTR001       | Mean         | {0:"202007",1:"2"}  |
| ATTR001_RE    | Mean         | {0:"202007",1:"3"}  |
| ATTR002       | Mean         | {0:"202007",1:"4"}  |
| ATTR002_RE    | Mean         | {0:"202007",1:"5"}  |
| ATTR003       | Mean         | {0:"202008",1:"3"}  |
| ATTR004       | Mean         | {0:"202008",1:"4"}  |
| ATTR005       | Mean         | {0:"202008",1:"5"}  |
| ATTR006       | Mean         | {0:"202009",1:"4"}  |
0md85ypi

0md85ypi1#

检查以下代码。
使用spark scala

scala> df.show(false)
+------------+-----------+------------------+
|column_value|metric_name|key               |
+------------+-----------+------------------+
|A37B        |Mean       |{0:"202006",1:"1"}|
|ACCOUNT_ID  |Mean       |{0:"202006",1:"2"}|
|ANB_200     |Mean       |{0:"202006",1:"3"}|
|ANB_201     |Mean       |{0:"202006",1:"4"}|
|AS82_RE     |Mean       |{0:"202006",1:"5"}|
|ATTR001     |Mean       |{0:"202007",1:"2"}|
|ATTR001_RE  |Mean       |{0:"202007",1:"3"}|
|ATTR002     |Mean       |{0:"202007",1:"4"}|
|ATTR002_RE  |Mean       |{0:"202007",1:"5"}|
|ATTR003     |Mean       |{0:"202008",1:"3"}|
|ATTR004     |Mean       |{0:"202008",1:"4"}|
|ATTR005     |Mean       |{0:"202008",1:"5"}|
|ATTR006     |Mean       |{0:"202009",1:"4"}|
|ATTR006     |Mean       |{0:"202009",1:"5"}|
+------------+-----------+------------------+

创建要匹配的架构 key 列值。

scala> import org.apache.spark.sql.types._

scala>  val schema = DataType
.fromJson("""{"type":"struct","fields":[{"name":"0","type":"string","nullable":true,"metadata":{}},{"name":"1","type":"string","nullable":true,"metadata":{}}]}""")
.asInstanceOf[StructType]

打印架构 key 列。

scala> schema.printTreeString
root
 |-- 0: string (nullable = true)
 |-- 1: string (nullable = true)

将schema json应用于dataframe中的键列。

scala> :paste
// Convert key column values to valid json & then apply schema json.
df
.withColumn("key_new",
    from_json(
        regexp_replace(
            regexp_replace(
                $"key",
                "0:",
                "\"0\":"
            ),
            "1:",
            "\"1\":"
        ),
        schema
    )
)
.filter(
    $"key_new.0".between(202006,202009) &&
    !($"key_new.0" === 202009 && $"key_new.1" === 5)
).show(false)

最终输出

+------------+-----------+------------------+-----------+
|column_value|metric_name|key               |key_new    |
+------------+-----------+------------------+-----------+
|A37B        |Mean       |{0:"202006",1:"1"}|[202006, 1]|
|ACCOUNT_ID  |Mean       |{0:"202006",1:"2"}|[202006, 2]|
|ANB_200     |Mean       |{0:"202006",1:"3"}|[202006, 3]|
|ANB_201     |Mean       |{0:"202006",1:"4"}|[202006, 4]|
|AS82_RE     |Mean       |{0:"202006",1:"5"}|[202006, 5]|
|ATTR001     |Mean       |{0:"202007",1:"2"}|[202007, 2]|
|ATTR001_RE  |Mean       |{0:"202007",1:"3"}|[202007, 3]|
|ATTR002     |Mean       |{0:"202007",1:"4"}|[202007, 4]|
|ATTR002_RE  |Mean       |{0:"202007",1:"5"}|[202007, 5]|
|ATTR003     |Mean       |{0:"202008",1:"3"}|[202008, 3]|
|ATTR004     |Mean       |{0:"202008",1:"4"}|[202008, 4]|
|ATTR005     |Mean       |{0:"202008",1:"5"}|[202008, 5]|
|ATTR006     |Mean       |{0:"202009",1:"4"}|[202009, 4]|
+------------+-----------+------------------+-----------+

使用spark sql

scala> spark.sql("select * from data").show(false)
+------------+-----------+------------------+
|column_value|metric_name|key               |
+------------+-----------+------------------+
|A37B        |Mean       |{0:"202006",1:"1"}|
|ACCOUNT_ID  |Mean       |{0:"202006",1:"2"}|
|ANB_200     |Mean       |{0:"202006",1:"3"}|
|ANB_201     |Mean       |{0:"202006",1:"4"}|
|AS82_RE     |Mean       |{0:"202006",1:"5"}|
|ATTR001     |Mean       |{0:"202007",1:"2"}|
|ATTR001_RE  |Mean       |{0:"202007",1:"3"}|
|ATTR002     |Mean       |{0:"202007",1:"4"}|
|ATTR002_RE  |Mean       |{0:"202007",1:"5"}|
|ATTR003     |Mean       |{0:"202008",1:"3"}|
|ATTR004     |Mean       |{0:"202008",1:"4"}|
|ATTR005     |Mean       |{0:"202008",1:"5"}|
|ATTR006     |Mean       |{0:"202009",1:"4"}|
|ATTR006     |Mean       |{0:"202009",1:"5"}|
+------------+-----------+------------------+
scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.sql("""
WITH table_data AS (
    SELECT
        column_value,
        metric_name,
        key,
        get_json_object(replace(replace(key,'0:','\"0\":'),'1:','\"1\":'),'$.0') as k,
        get_json_object(replace(replace(key,'0:','\"0\":'),'1:','\"1\":'),'$.1') as v
    FROM data
)
SELECT
    column_value,
    metric_name,
    key,
    k,
    v
FROM table_data
WHERE
    (k between 202006 and 202009) AND
    !(k = 202009 AND V = 5)
""").show(false)

// Exiting paste mode, now interpreting.

+------------+-----------+------------------+------+---+
|column_value|metric_name|key               |k     |v  |
+------------+-----------+------------------+------+---+
|A37B        |Mean       |{0:"202006",1:"1"}|202006|1  |
|ACCOUNT_ID  |Mean       |{0:"202006",1:"2"}|202006|2  |
|ANB_200     |Mean       |{0:"202006",1:"3"}|202006|3  |
|ANB_201     |Mean       |{0:"202006",1:"4"}|202006|4  |
|AS82_RE     |Mean       |{0:"202006",1:"5"}|202006|5  |
|ATTR001     |Mean       |{0:"202007",1:"2"}|202007|2  |
|ATTR001_RE  |Mean       |{0:"202007",1:"3"}|202007|3  |
|ATTR002     |Mean       |{0:"202007",1:"4"}|202007|4  |
|ATTR002_RE  |Mean       |{0:"202007",1:"5"}|202007|5  |
|ATTR003     |Mean       |{0:"202008",1:"3"}|202008|3  |
|ATTR004     |Mean       |{0:"202008",1:"4"}|202008|4  |
|ATTR005     |Mean       |{0:"202008",1:"5"}|202008|5  |
|ATTR006     |Mean       |{0:"202009",1:"4"}|202009|4  |
+------------+-----------+------------------+------+---+

scala>
kzmpq1sx

kzmpq1sx2#

使用map()函数将not in参数转换为map:

select * from your_data 
 where key[0] between  202006 and 202009 
   and key NOT IN ( map(0,"202009",1,"5") ); --can be many map() comma separated

相关问题