Split and extract attributes

ca1c2owp · posted 2021-05-27 in Hadoop
Follow (0) | Answers (3) | Views (383)

Input JSON file:

{
        "CarBrands": [{
                "carid": "100bw",
                "filter_condition": " (YEAR == \"2009\" AND FACTS BETWEEN 0001 AND 200 AND STORE==\"UK\" AND RESALE in (\"2015\")) "
            },
            {
                "carid": "25xw",
                "filter_condition": " (YEAR == \"2010\" AND FACTS NOT IN (234,435,456) AND FACTS between 220 AND 500 AND RESALE in (\"2017\")) "
            },
            {
                "carid": "masy",
                "filter_condition": " (YEAR == \"2010\" AND STORE==\"USA\" AND (FACTS BETWEEN 600 AND 700 OR FACTS BETWEEN 810 AND 920) AND RESALE in (\"2018\")) "
            },
            {
                "carid": "mxw",
                "filter_condition": " (YEAR == \"2013\" AND FACTS ==\"1541\" AND RESALE in (\"2019\")) "
            }
        ]
    }

Note: we have a facts table, and the filter conditions above come from a JSON API.
Here is what needs to be achieved:

SELECT * FROM Car_transactions
WHERE (FACTS BETWEEN 0001 AND 200)
   OR (FACTS NOT IN (234,435,456) AND FACTS BETWEEN 220 AND 500)
   OR (FACTS BETWEEN 600 AND 700 OR FACTS BETWEEN 810 AND 920)
   OR FACTS = 1541

import sparkSession.implicits._
val tagsDF = sparkSession.read.option("multiLine", true).option("inferSchema", true).json("src/main/resources/carbrands.json")
val df = tagsDF.select($"CarBrands" as "car_brands")
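(As a hedged next step, not part of the original post: exploding the array yields one row per carid/filter_condition pair, which is the shape the answers below operate on.)

import org.apache.spark.sql.functions.explode

// Minimal sketch, assuming the carbrands.json shown above.
val conditions = tagsDF
  .select(explode($"CarBrands").as("brand"))
  .select($"brand.carid", $"brand.filter_condition")
conditions.show(false)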

tez616oj1#

You can use regexp_extract with a regex pattern that matches one or more "RESALE BETWEEN ... AND ..." clauses, as shown below:

// spark-shell session assumed; in an application, also import spark.implicits._
import org.apache.spark.sql.functions.regexp_extract

val df = Seq(
  "YEAR = 2009 AND ACTIVE in ('N') AND RESALE BETWEEN 2015 AND 2018 OR RESALE BETWEEN 2009 AND 2011 AND MAKE=\"BMW\"",
  "YEAR = 2010 AND ACTIVE in ('Y') AND RESALE BETWEEN 2016 AND 2017 AND MAKE=\"AUDI\"",
  "YEAR = 2011 AND ACTIVE in ('N') AND RESALE BETWEEN 2017 AND 2019 OR RESALE BETWEEN 2015 AND 2016 OR RESALE BETWEEN 2012 AND 2013 AND MAKE=\"HONDA\"",
  "YEAR = 2012 AND ACTIVE in ('N') AND MAKE=\"BMW\""
).toDF("input_string")

val pattern = """(?:(\s+OR\s+)?RESALE\sBETWEEN\s+\d{4}\s+AND\s+\d{4})+"""

df.select(regexp_extract($"input_string", pattern, 0).as("resale_condition")).show(false)
// +--------------------------------------------------------------------------------------------+
// |resale_condition                                                                            |
// +--------------------------------------------------------------------------------------------+
// |RESALE BETWEEN 2015 AND 2018 OR RESALE BETWEEN 2009 AND 2011                                |
// |RESALE BETWEEN 2016 AND 2017                                                                |
// |RESALE BETWEEN 2017 AND 2019 OR RESALE BETWEEN 2015 AND 2016 OR RESALE BETWEEN 2012 AND 2013|
// |                                                                                            |
// +--------------------------------------------------------------------------------------------+
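The same idea should carry over to the FACTS clauses in the question's filter_condition strings; a hedged sketch (the pattern below is my own adaptation, not part of the original answer):

import org.apache.spark.sql.functions.regexp_extract

// One or more FACTS BETWEEN ... AND ... clauses, optionally chained with OR.
val factsPattern = """(?:(\s+OR\s+)?FACTS\s+BETWEEN\s+\d+\s+AND\s+\d+)+"""

val factsDF = Seq(
  """ (YEAR == "2009" AND FACTS BETWEEN 0001 AND 200 AND STORE=="UK" AND RESALE in ("2015")) """
).toDF("filter_condition")

factsDF.select(regexp_extract($"filter_condition", factsPattern, 0).as("facts_condition")).show(false)
// expected: FACTS BETWEEN 0001 AND 200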

q3qa4bjr2#

You can try this with regexp_extract in Spark Scala:

val df = Seq(
  "YEAR = 2009 AND ACTIVE in ('N') AND RESALE BETWEEN 2015 AND 2018 OR RESALE BETWEEN 2009 AND 2011 AND AREA=='AZ13' AND MAKE='BMW'",
  "YEAR = 2010 AND ACTIVE in ('Y') AND RESALE BETWEEN 2016 AND 2017 AND AREA=='FAZ45' AND MAKE='AUDI'",
  "YEAR = 2011 AND ACTIVE in ('N') AND RESALE BETWEEN 2017 AND 2019 OR RESALE BETWEEN 2015 AND 2016 OR RESALE BETWEEN 2012 AND 2013 AND AREA=='07YW' AND MAKE='TOYOTA'",
  "YEAR = 2011 AND ACTIVE in ('N') AND RESALE=2000 AND AREA='PH123' AND MAKE='HONDA'",
  "YEAR = 2015 AND ACTIVE in ('N') AND RESALE NOT IN (1999,1998,2001,2002) AND RESALE BETWEEN 2015 AND 2016 OR RESALE BETWEEN 2012 AND 2013 AND AREA=='CA2BC' AND MAKE='NISSAN'",
  "YEAR = 2015 AND ACTIVE in ('N') AND RESALE IN (2001,2002) OR RESALE BETWEEN 2004 AND 2016 AND AREA=='AM13Y' AND MAKE='TESLA'",
  "YEAR = 2012 AND ACTIVE in ('N') AND AREA=='ML12A' AND MAKE='BMW'"
).toDF("Area_Condition")

scala> df.show(false)
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Area_Condition                                                                                                                                                              |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|YEAR = 2009 AND ACTIVE in ('N') AND RESALE BETWEEN 2015 AND 2018 OR RESALE BETWEEN 2009 AND 2011 AND AREA=='AZ13' AND MAKE='BMW'                                            |
|YEAR = 2010 AND ACTIVE in ('Y') AND RESALE BETWEEN 2016 AND 2017 AND AREA=='FAZ45' AND MAKE='AUDI'                                                                          |
|YEAR = 2011 AND ACTIVE in ('N') AND RESALE BETWEEN 2017 AND 2019 OR RESALE BETWEEN 2015 AND 2016 OR RESALE BETWEEN 2012 AND 2013 AND AREA=='07YW' AND MAKE='TOYOTA'         |
|YEAR = 2011 AND ACTIVE in ('N') AND RESALE=2000 AND AREA='PH123' AND MAKE='HONDA'                                                                                           |
|YEAR = 2015 AND ACTIVE in ('N') AND RESALE NOT IN (1999,1998,2001,2002) AND RESALE BETWEEN 2015 AND 2016 OR RESALE BETWEEN 2012 AND 2013 AND AREA=='CA2BC' AND MAKE='NISSAN'|
|YEAR = 2015 AND ACTIVE in ('N') AND RESALE IN (2001,2002) OR RESALE BETWEEN 2004 AND 2016 AND AREA=='AM13Y' AND MAKE='TESLA'                                                |
|YEAR = 2012 AND ACTIVE in ('N') AND AREA=='ML12A' AND MAKE='BMW'                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The patterns that match your requirement:

scala> val pattern = """(?:(\s+OR\s+)?RESALE|\.|\.[a-zA-Z]+\s+\d{4}\s+AND\s+\d{4})+(.*)"""
scala> val pattern2 = """(AREA\S+[A-Za-z0-9]+.)"""

scala> df.select(regexp_extract($"Area_Condition", pattern, 0).as("Resale_Condition"),regexp_extract($"Area_Condition", pattern2, 0).as("Area_Condition")).show(false)
+----------------------------------------------------------------------------------------------------------------------------------------+--------------+
|Resale_Condition                                                                                                                        |Area_Condition|
+----------------------------------------------------------------------------------------------------------------------------------------+--------------+
|RESALE BETWEEN 2015 AND 2018 OR RESALE BETWEEN 2009 AND 2011 AND AREA=='AZ13' AND MAKE='BMW'                                            |AREA=='AZ13'  |
|RESALE BETWEEN 2016 AND 2017 AND AREA=='FAZ45' AND MAKE='AUDI'                                                                          |AREA=='FAZ45' |
|RESALE BETWEEN 2017 AND 2019 OR RESALE BETWEEN 2015 AND 2016 OR RESALE BETWEEN 2012 AND 2013 AND AREA=='07YW' AND MAKE='TOYOTA'         |AREA=='07YW'  |
|RESALE=2000 AND AREA='PH123' AND MAKE='HONDA'                                                                                           |AREA='PH123'  |
|RESALE NOT IN (1999,1998,2001,2002) AND RESALE BETWEEN 2015 AND 2016 OR RESALE BETWEEN 2012 AND 2013 AND AREA=='CA2BC' AND MAKE='NISSAN'|AREA=='CA2BC' |
|RESALE IN (2001,2002) OR RESALE BETWEEN 2004 AND 2016 AND AREA=='AM13Y' AND MAKE='TESLA'                                                |AREA=='AM13Y' |
|                                                                                                                                        |AREA=='ML12A' |
+----------------------------------------------------------------------------------------------------------------------------------------+--------------+
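On Spark 3.1+ the SQL function regexp_extract_all can return every RESALE clause as a separate array element instead of one contiguous span; a hedged variant (my addition; the doubled backslashes are needed because the pattern passes through SQL string escaping):

import org.apache.spark.sql.functions.expr

// Assumes Spark 3.1 or later; regexp_extract_all does not exist before that.
df.select(
  expr("""regexp_extract_all(Area_Condition, 'RESALE\\s+BETWEEN\\s+\\d{4}\\s+AND\\s+\\d{4}', 0)""")
    .as("resale_clauses")
).show(false)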

eqoofvh93#

package sample

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

object splitObj {
  def main(args: Array[String]) = {
    implicit val sparkSession = SparkSession.builder().master("local").getOrCreate()
    import sparkSession.implicits._
    val tagsDF = sparkSession.read.option("multiLine", true).option("inferSchema", true).json("src/main/resources/sample.json")
    val df = tagsDF.select(explode($"CarBrands") as "car_brands").select($"car_brands.*")
    df.show(false)
 /* +-----+-----------------------------------------------------------------------------------------------------------------------+
    |carid|filter_condition                                                                                                       |
    +-----+-----------------------------------------------------------------------------------------------------------------------+
    |100bw| (YEAR == "2009" AND FACTS BETWEEN 0001 AND 200 AND STORE=="UK" AND RESALE in ("2015"))                                |
    |25xw | (YEAR == "2010" AND FACTS NOT IN (234,435,456) AND FACTS between 220 AND 500 AND RESALE in ("2017"))                  |
    |masy | (YEAR == "2010" AND STORE=="USA" AND (FACTS BETWEEN 600 AND 700 OR FACTS BETWEEN 810 AND 920) AND RESALE in ("2018")) |
    |mxw  | (YEAR != "2013" AND  FACTS =="1541" AND RESALE in  ("2019"))                                                          |
    +-----+-----------------------------------------------------------------------------------------------------------------------+*/
    val df_where = parseWhereClause(df).toDF("carid","where_condition")
    val f = df.join(df_where,Seq("carid"),"inner").select($"carid",explode($"where_condition").as("condn"))
    f.show(false)

  /*+-----+----------------------------------------------------------------------------+
    |carid|condn                                                                       |
    +-----+----------------------------------------------------------------------------+
    |100bw|(YEAR == 2009)                                                              |
    |100bw|((FACTS >= 1) AND (FACTS <= 200))                                           |
    |100bw|(STORE == UK)                                                               |
    |100bw|RESALE IN (2015)                                                            |
    |25xw |(YEAR == 2010)                                                              |
    |25xw |FACTS NOT IN (234,435,456)                                                  |
    |25xw |((FACTS >= 220) AND (FACTS <= 500))                                         |
    |25xw |RESALE IN (2017)                                                            |
    |masy |(YEAR == 2010)                                                              |
    |masy |(STORE == USA)                                                              |
    |masy |(((FACTS >= 600) AND (FACTS <= 700)) OR ((FACTS >= 810) AND (FACTS <= 920)))|
    |masy |RESALE IN (2018)                                                            |
    |mxw  |(YEAR != 2013)                                                              |
    |mxw  |(FACTS == 1541)                                                             |
    |mxw  |RESALE IN (2019)                                                            |
    +-----+----------------------------------------------------------------------------+*/
    val col_list = Seq("resale","facts")
    val fil_col_df = col_list.map({c =>
      f.filter(upper($"condn").contains(c.toUpperCase))
    })
      fil_col_df.reduce(_.union(_)).show(false)
    /*
    +-----+----------------------------------------------------------------------------+
    |carid|condn                                                                       |
    +-----+----------------------------------------------------------------------------+
    |100bw|RESALE IN (2015)                                                            |
    |25xw |RESALE IN (2017)                                                            |
    |masy |RESALE IN (2018)                                                            |
    |mxw  |RESALE IN (2019)                                                            |
    |100bw|((FACTS >= 1) AND (FACTS <= 200))                                           |
    |25xw | FACTS NOT IN (234,435,456)                                                 |
    |25xw |((FACTS >= 220) AND (FACTS <= 500))                                         |
    |masy |(((FACTS >= 600) AND (FACTS <= 700)) OR ((FACTS >= 810) AND (FACTS <= 920)))|
    |mxw  |(FACTS == 1541)                                                             |
    +-----+----------------------------------------------------------------------------+
    */

  }
    def parseWhereClause(df: DataFrame)(implicit sparkSession: SparkSession): Seq[(String, Seq[String])] = {

      val parseWhere = df.collect().map({ tblrow =>
        val id = tblrow.getAs[String]("carid")
        val query = tblrow.getAs[String]("filter_condition")
        val q = sparkSession.sessionState.sqlParser.parsePlan("select * from tbl where "+query.replace("\"","'"))
        val w = q.children.collect{case f:Filter  =>
          val filter_condns = f.condition.productIterator.flatMap{
            case And(l,r) => Seq(l.simpleString.replace("'",""),r.simpleString.replace("'",""))
            case o:Predicate => Seq(o.simpleString.replace("'",""))
          }
          filter_condns.map(filter_condn => {
            val not_condn = filter_condn.contains("NOT") match {
              case true => filter_condn.replace ("NOT", "").replace ("IN", "NOT IN").replace ("=", "!=")
              case false => filter_condn
            }
            not_condn.replace("=", "==")
              .replace("&&", "AND")
              .replace("||", "OR").replace("<==", "<=").replace(">==", ">=")

          })
        }.toList.flatten
        (id,w)
      })
      parseWhere
    }

}
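Building on the filtered output above, a minimal sketch (my addition, assuming the Car_transactions table name from the question; run it inside main after the code above) that stitches the FACTS conditions back into the single OR-combined WHERE clause the question asks for. Note the == operators printed by the parser would still need to be mapped back to SQL's = before execution:

    // Collect the FACTS conditions for every carid and join them with OR.
    val factsConditions = f
      .filter(upper($"condn").contains("FACTS"))
      .select($"condn").as[String]
      .collect()
    val finalQuery =
      s"SELECT * FROM Car_transactions WHERE ${factsConditions.mkString("(", ") OR (", ")")}"
    println(finalQuery)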

Edit:

package sample

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._

object splitObj {
  def main(args: Array[String]) = {
    implicit val sparkSession = SparkSession.builder().master("local").getOrCreate()
    import sparkSession.implicits._
    val tagsDF = sparkSession.read.option("multiLine", true).option("inferSchema", true).json("src/main/resources/sample.json")
    val df = tagsDF.select(explode($"CarBrands") as "car_brands").select($"car_brands.*")
    df.show(false)
 /* +-------+-----------------------------------------------------------------------------------------------------------------------+
    |carid  |filter_condition                                                                                                       |
    +-------+-----------------------------------------------------------------------------------------------------------------------+
    |100bw  | (YEAR == "2009" AND FACTS BETWEEN 0001 AND 200 AND STORE=="UK" AND RESALE in ("2015"))                                |
    |mxw    | (YEAR != "2013" AND  (FACTS =="1541" AND STORE=="US" AND FACTS !="200" AND YEAR == "2014") AND RESALE in  ("2019"))   |
    |100bxxw| (YEAR == "2009" OR STORE=="UK" OR RESALE in ("2015"))                                                                 |
    |25xw   | (YEAR == "2010" AND FACTS NOT IN (234,435,456) AND FACTS between 220 AND 500 AND RESALE in ("2017"))                  |
    |masy   | (YEAR == "2010" AND STORE=="USA" AND (FACTS BETWEEN 600 AND 700 OR FACTS BETWEEN 810 AND 920) AND RESALE in ("2018")) |
    |mxw    | (YEAR != "2013" AND  (FACTS =="1541" AND STORE=="US" AND FACTS !="200" AND YEAR == "2014") AND RESALE in  ("2019"))   |
    +-------+-----------------------------------------------------------------------------------------------------------------------+*/
    val df_where = parseWhereClause(df).toDF("carid","where_condition")
    val f = df.join(df_where,Seq("carid"),"inner").select($"carid",explode($"where_condition").as("condn"))
    f.show(200,false)
  /*+-------+---------------------------+
    |carid  |condn                      |
    +-------+---------------------------+
    |100bw  |'YEAR='2009'               |
    |100bw  |'FACTS between 1 and 200   |
    |100bw  |'STORE='UK'                |
    |100bw  |'RESALE IN (2015)          |
    |mxw    |NOT ('YEAR = 2013)         |
    |mxw    |'FACTS='1541'              |
    |mxw    |'STORE='US'                |
    |mxw    |NOT ('FACTS = 200)         |
    |mxw    |'YEAR='2014'               |
    |mxw    |'RESALE IN (2019)          |
    |mxw    |NOT ('YEAR = 2013)         |
    |mxw    |'FACTS='1541'              |
    |mxw    |'STORE='US'                |
    |mxw    |NOT ('FACTS = 200)         |
    |mxw    |'YEAR='2014'               |
    |mxw    |'RESALE IN (2019)          |
    |100bxxw|'YEAR='2009'               |
    |100bxxw|'STORE='UK'                |
    |100bxxw|'RESALE IN (2015)          |
    |25xw   |'YEAR='2010'               |
    |25xw   |NOT 'FACTS IN (234,435,456)|
    |25xw   |'FACTS between 220 and 500 |
    |25xw   |'RESALE IN (2017)          |
    |masy   |'YEAR='2010'               |
    |masy   |'STORE='USA'               |
    |masy   |'FACTS between 600 and 700 |
    |masy   |'FACTS between 810 and 920 |
    |masy   |'RESALE IN (2018)          |
    |mxw    |NOT ('YEAR = 2013)         |
    |mxw    |'FACTS='1541'              |
    |mxw    |'STORE='US'                |
    |mxw    |NOT ('FACTS = 200)         |
    |mxw    |'YEAR='2014'               |
    |mxw    |'RESALE IN (2019)          |
    |mxw    |NOT ('YEAR = 2013)         |
    |mxw    |'FACTS='1541'              |
    |mxw    |'STORE='US'                |
    |mxw    |NOT ('FACTS = 200)         |
    |mxw    |'YEAR='2014'               |
    |mxw    |'RESALE IN (2019)          |
    +-------+---------------------------+

    */

    val col_list = Seq("resale","facts")
    val fil_col_df = col_list.map({c =>
      f.filter(upper($"condn").contains(c.toUpperCase))
    })
      fil_col_df.reduce(_.union(_)).show(200,false)

    /*
    +-------+---------------------------+
    |carid  |condn                      |
    +-------+---------------------------+
    |100bw  |'RESALE IN (2015)          |
    |mxw    |'RESALE IN (2019)          |
    |mxw    |'RESALE IN (2019)          |
    |100bxxw|'RESALE IN (2015)          |
    |25xw   |'RESALE IN (2017)          |
    |masy   |'RESALE IN (2018)          |
    |mxw    |'RESALE IN (2019)          |
    |mxw    |'RESALE IN (2019)          |
    |100bw  |'FACTS between 1 and 200   |
    |mxw    |'FACTS='1541'              |
    |mxw    |NOT ('FACTS = 200)         |
    |mxw    |'FACTS='1541'              |
    |mxw    |NOT ('FACTS = 200)         |
    |25xw   |NOT 'FACTS IN (234,435,456)|
    |25xw   |'FACTS between 220 and 500 |
    |masy   |'FACTS between 600 and 700 |
    |masy   |'FACTS between 810 and 920 |
    |mxw    |'FACTS='1541'              |
    |mxw    |NOT ('FACTS = 200)         |
    |mxw    |'FACTS='1541'              |
    |mxw    |NOT ('FACTS = 200)         |
    +-------+---------------------------+
    */

  }
  def parseExpressions(expression: Expression): Seq[String] = {
    expression match{
      case And(l,r) => (l,r) match {
        case (gte: GreaterThanOrEqual,lte: LessThanOrEqual) => Seq(s"""${gte.left.toString} between ${gte.right.toString} and ${lte.right.toString}""")
        case (_,_) => Seq(l,r).flatMap(parseExpressions)
      }
      case Or(l,r) => Seq(l,r).flatMap(parseExpressions)
      case EqualTo(l,r) =>
        val prettyLeft = if(l.resolved && l.dataType == StringType) s"'${l.toString}'" else l.toString
        val prettyRight = if(r.resolved && r.dataType == StringType) s"'${r.toString}'" else r.toString
        Seq(s"$prettyLeft=$prettyRight")
      case inn: IsNotNull => Seq(s"${inn.child.toString} is not null")
      case p: Predicate => Seq(p.toString)
    }
  }

  def parseWhereClause(df: DataFrame)(implicit sparkSession: SparkSession): Seq[(String, Seq[String])] = {
    val parseWhere = df.collect().map({ tblrow =>
      val id = tblrow.getAs[String]("carid")
      val query = tblrow.getAs[String]("filter_condition")
      // Parse the condition with Spark's own SQL parser and keep the Filter node.
      val q = sparkSession.sessionState.sqlParser.parsePlan("select * from tbl where " + query.replace("\"", "'"))
      val w: Seq[Expression] = q.children.collect { case f: Filter =>
        f.condition.productIterator.flatMap {
          case o: Predicate => Seq(o)
        }
      }.toList.flatten
      val output = w.flatMap(parseExpressions)
      (id, output)
    })
    parseWhere
  }

}
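To see what parseExpressions does in isolation, a hedged check (assumes a spark-shell session named spark with splitObj on the classpath): parse a single condition, pull out the Filter predicate, and print the reconstructed clauses.

import org.apache.spark.sql.catalyst.plans.logical.Filter

val plan = spark.sessionState.sqlParser.parsePlan(
  "select * from tbl where FACTS between 220 and 500 and YEAR = '2010'")
val conditions = plan.collect { case f: Filter => f.condition }
conditions.flatMap(splitObj.parseExpressions).foreach(println)
// expected, roughly:
//   'FACTS between 220 and 500
//   'YEAR='2010'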
