使用map()时如何处理spark/scala中的上下文

b1zrtrql  于 2021-06-26  发布在  Hive
关注(0)|答案(0)|浏览(262)

我对scala不太熟悉,对spark也不太熟悉,我正在尝试开发一个基本的测试来理解Dataframe实际上是如何工作的。我的目标是基于另一个表的一些注册表的值来更新我的mydf。
一方面,我有自己的应用程序:

object TestApp {
  def main(args: Array[String]) {
    val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    implicit val hiveContext : SQLContext = new HiveContext(sc)
    val test: Test = new Test()
    test.test
  }
}

另一方面,我有我的测试课程:

class Test(implicit sqlContext: SQLContext) extends Serializable {

  val hiveContext: SQLContext = sqlContext
  import hiveContext.implicits._

  def test(): Unit = {
    val myDF = hiveContext.read.table("myDB.Customers").sort($"cod_a", $"start_date".desc)
    myDF.map(myMap).take(1)
  }
  def myMap(row: Row): Row = {
    def _myMap: (String, String) = {
      val investmentDF: DataFrame = hiveContext.read.table("myDB.Investment")
      var target: (String, String) = casoX(investmentDF, row.getAs[String]("cod_a"), row.getAs[String]("cod_p"))
      target
    }
    def casoX(df: DataFrame, codA: String, codP: String)(implicit hiveContext: SQLContext): (String, String) = {
      var rows: Array[Row] = null
      if (codP != null) {
        println(df)
        rows = df.filter($"cod_a" === codA && $"cod_p" === codP).orderBy($"sales".desc).select($"cod_t", $"nom_t").collect
      } else {
        rows = df.filter($"cod_a" === codA).orderBy($"sales".desc).select($"cod_t", $"nom_t").collect
      }
      if (rows.length > 0) (row(0).asInstanceOf[String], row(1).asInstanceOf[String]) else null
    }
    val target: (String, String) = _myMap
    Row(row(0), row(1), row(2), row(3), row(4), row(5), row(6), target._1, target._2, row(9))
  }
}
``` `Well, when I execute it, I have a NullPointerException on the instruction val investmentDF: DataFrame = hiveContext.read.table("myDB.Investment")` ,更准确地说 `hiveContext.read` 如果我在“test”函数中分析hivecontext,我可以访问它的sparkcontext,并且可以毫无问题地加载df。然而,如果我在得到nullpointerexception之前分析我的hivecontext对象,它的sparkcontext是空的,并且我假设由于sparkcontext是不可序列化的(并且由于我在map函数中,我丢失了hivecontext对象的一部分,对吗?)
不管怎样,我不知道我的代码到底出了什么问题,我应该如何修改它以获得investmentdf而不产生任何nullpointerexception?
谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题