我对scala不太熟悉,对spark也不太熟悉,我正在尝试开发一个基本的测试来理解Dataframe实际上是如何工作的。我的目标是基于另一个表的一些注册表的值来更新我的mydf。
一方面,我有自己的应用程序:
object TestApp {
def main(args: Array[String]) {
val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
implicit val hiveContext : SQLContext = new HiveContext(sc)
val test: Test = new Test()
test.test
}
}
另一方面,我有我的测试课程:
class Test(implicit sqlContext: SQLContext) extends Serializable {
val hiveContext: SQLContext = sqlContext
import hiveContext.implicits._
def test(): Unit = {
val myDF = hiveContext.read.table("myDB.Customers").sort($"cod_a", $"start_date".desc)
myDF.map(myMap).take(1)
}
def myMap(row: Row): Row = {
def _myMap: (String, String) = {
val investmentDF: DataFrame = hiveContext.read.table("myDB.Investment")
var target: (String, String) = casoX(investmentDF, row.getAs[String]("cod_a"), row.getAs[String]("cod_p"))
target
}
def casoX(df: DataFrame, codA: String, codP: String)(implicit hiveContext: SQLContext): (String, String) = {
var rows: Array[Row] = null
if (codP != null) {
println(df)
rows = df.filter($"cod_a" === codA && $"cod_p" === codP).orderBy($"sales".desc).select($"cod_t", $"nom_t").collect
} else {
rows = df.filter($"cod_a" === codA).orderBy($"sales".desc).select($"cod_t", $"nom_t").collect
}
if (rows.length > 0) (row(0).asInstanceOf[String], row(1).asInstanceOf[String]) else null
}
val target: (String, String) = _myMap
Row(row(0), row(1), row(2), row(3), row(4), row(5), row(6), target._1, target._2, row(9))
}
}
``` `Well, when I execute it, I have a NullPointerException on the instruction val investmentDF: DataFrame = hiveContext.read.table("myDB.Investment")` ,更准确地说 `hiveContext.read` 如果我在“test”函数中分析hivecontext,我可以访问它的sparkcontext,并且可以毫无问题地加载df。然而,如果我在得到nullpointerexception之前分析我的hivecontext对象,它的sparkcontext是空的,并且我假设由于sparkcontext是不可序列化的(并且由于我在map函数中,我丢失了hivecontext对象的一部分,对吗?)
不管怎样,我不知道我的代码到底出了什么问题,我应该如何修改它以获得investmentdf而不产生任何nullpointerexception?
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!