如何从dataframe列动态创建temp表?

hivapdat  于 2021-06-25  发布在  Hive
关注(0)|答案(2)|浏览(363)

我在java8中使用spark-sql-2.4.1v。我有一个需求,我需要创建一个可清空的表,它的数据是按“country\u id”分组的。
为了让我的Spark平行,我需要为每个国家创造一个诱人的。i、 e.想为每个“国家/地区id”动态创建一个诱人的。
怎么做,有什么建议吗。

hec6srdp

hec6srdp1#

{      package spark

import org.apache.spark.sql.SparkSession

object DynamicTempTable extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("Mapper")
    .getOrCreate()

  import spark.implicits._

  case class Country(
                     ID: Int,
                     name: String
                    )

  val countryDF = Seq(
    Country(1, "c1"),
    Country(2, "c2"),
    Country(3, "c3"),
    Country(4, "c4"),
    Country(5, "c5"),
    Country(6, "c6"),
    Country(7, "c7"),
    Country(8, "c8"),
    Country(9, "c9"),
    Country(10, "c10")
  ).toDF()

  val listCountries = countryDF.select('ID).distinct().rdd.map(r => r(0)).collect()
  println(s"list countries: $listCountries")

  listCountries.foreach(i => {
    countryDF.filter('ID.equalTo(i)).createTempView(s"tmp_table_country_$i")
  })
// or parallel
listCountries.par.foreach(i => {
countryDF.filter('ID.equalTo(i)).createTempView(s"tmp_table_country_$i") })
}

}

r7s23pms

r7s23pms2#

如果你有国家目录

{    package spark

import org.apache.spark.sql.{SparkSession}

object DynamicTempTable extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("Mapper")
    .getOrCreate()

  import spark.implicits._

  case class Country(
                     ID: Int,
                     name: String
                    )

  case class CountryData(
                      ID: Int,
                      capital: String,
                      population: Long
                    )

  case class CountryTable(
                      ID: Int,
                      name: String,
                      nameTable: String
                    )

  val countryDF = Seq(
    Country(1, "c1"),
    Country(2, "c2"),
    Country(3, "c3"),
    Country(4, "c4"),
    Country(5, "c5"),
    Country(6, "c6"),
    Country(7, "c7"),
    Country(8, "c8"),
    Country(9, "c9"),
    Country(10, "c10")
  ).toDF()

  val countryData = Seq(
    CountryData(1, "cp1", 11),
    CountryData(2, "cp2", 22),
    CountryData(3, "cp3", 33),
    CountryData(4, "cp4", 44),
    CountryData(5, "cp5", 55),
    CountryData(6, "cp6", 66),
    CountryData(7, "cp7", 77),
    CountryData(8, "cp8", 88),
    CountryData(9, "cp9", 99),
    CountryData(10, "cp10", 1010)
  ).toDF()

  import scala.collection.mutable.ListBuffer
  var tableToCountry = new ListBuffer[CountryTable]()

  countryDF.collect().foreach(i => {

    val nameTempTable = s"${i.getAs[String]("name")}_temp_table"
    val countryId = i.getAs[Int]("ID")
    val countryName = i.getAs[String]("name")

    countryData.filter('ID.equalTo(countryId)).createOrReplaceTempView(nameTempTable)

    tableToCountry += CountryTable(countryId, countryName, nameTempTable)

  })

  val tcDF = tableToCountry.toDF()
  tcDF.show(false)
//  +---+----+--------------+
//  |ID |name|nameTable     |
//  +---+----+--------------+
//  |1  |c1  |c1_temp_table |
//  |2  |c2  |c2_temp_table |
//  |3  |c3  |c3_temp_table |
//  |4  |c4  |c4_temp_table |
//  |5  |c5  |c5_temp_table |
//  |6  |c6  |c6_temp_table |
//  |7  |c7  |c7_temp_table |
//  |8  |c8  |c8_temp_table |
//  |9  |c9  |c9_temp_table |
//  |10 |c10 |c10_temp_table|
//  +---+----+--------------+

  tcDF.createOrReplaceTempView("table_to_country")

  spark.table("table_to_country").show(false)
//  +---+----+--------------+
//  |ID |name|nameTable     |
//  +---+----+--------------+
//  |1  |c1  |c1_temp_table |
//  |2  |c2  |c2_temp_table |
//  |3  |c3  |c3_temp_table |
//  |4  |c4  |c4_temp_table |
//  |5  |c5  |c5_temp_table |
//  |6  |c6  |c6_temp_table |
//  |7  |c7  |c7_temp_table |
//  |8  |c8  |c8_temp_table |
//  |9  |c9  |c9_temp_table |
//  |10 |c10 |c10_temp_table|
//  +---+----+--------------+

  println(s"~~~~> check result for table ${tcDF.select('nameTable).take(1)(0).mkString}")
  spark.table(tcDF.select('nameTable).take(1)(0).mkString).show(false)

//  ~~~~> check result for table c1_temp_table
//  +---+-------+----------+
//  |ID |capital|population|
//  +---+-------+----------+
//  |1  |cp1    |11        |
//  +---+-------+----------+

}

}

相关问题