在spark streaming中使用mapgroupwithstate时使用批处理查找表,给出java.lang.nullpointerexception

tkqqtvp1  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(271)

假设我有一个事件流,如下json,它被转换成case类

e.g.
{"id" : "IdOfUser" , "title": "Title1"}
 to 
case class UserEvent(id: Int, title: String)

.
我在mongodb中还有一个表,其中包含每个标题的元数据。它充当查找表,例如

val lookup_df =  sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.TitleMetaData")))

我还使用mapgroupwithstate跟踪状态(https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html)
我的有状态会话如下所示。

case class UserSession(var visited: collection.Map[String, Boolean],
                       var size:Int
                       )

我的主要问题似乎是,每当我试图在Map函数中使用一个操作时,lookuptable就会消失。当我将Dataframe作为对象中的全局变量读取时,会出现如下代码。这个代码给了我一个空的异常。

object StreamState{ 
      val sparkSession = SparkSession.builder()
      .master("local")
      .appName("StreamState")
      .getOrCreate()

      val new_data = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.TitleMetaData")))
    }

此streamstate对象具有以下功能(updateuserstatewithevent在运行收集时引发错误)
错误发生在这里。def updateuserstatewithevent命令(statez:usersession, event:userevent):usersession={import sparksession.implicits.\uval current\u event=lookup\u df.filter($“title”==event.url)//此收集行给我一个错误val size=current\u event.select(“size”).as[int].collect()(0)val empty\u map=mapstring,boolean state.visited=empty\u map state.size=size state}
我已经把这个助手函数做了初步的设计,这样我们就不会被逻辑所束缚。state的size对象正被从一开始就读入的表(从mongodb)更改。这是我的Map函数。

def updateAcrossEvents(user:Int,
                           events: Iterator[UserEvent],
                           oldState: GroupState[UserSession]):UserSession = {

      var state:UserSession = if (oldState.exists) {

        println(oldState.get.visited)
        oldState.get
      }
      else {
        val empty_map = Map[String, Boolean]()

        val empty_session = UserSession(empty_map, 0)
        empty_session
      }

      import sparkSession.implicits._
      for (event <- events) {
        state = updateUserStateWithEvent(state, event)
        oldState.update(state)
      }
      state
    }

最后,这里是我的流状态对象的主要函数(这是从https://blog.yuvalitzchakov.com/exploring-stateful-streaming-with-spark-structured-streaming/)您可以在该博客中找到deserializeuserevent。

def main(args: Array[String]): Unit = {

      sparkSession.sparkContext.setLogLevel("WARN")
      import sparkSession.implicits._

      val userEventsStream = sparkSession.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 12345)
        .load()
        .as[String]

      val finishedUserSessionsStream: Dataset[UserSession] =
        userEventsStream
          .map(deserializeUserEvent)
          .groupByKey(_.id)
          .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
            updateAcrossEvents)

      finishedUserSessionsStream.writeStream
        .outputMode(OutputMode.Update())
        .format("console")
        .option("checkpointLocation", "checkpoint")
        .option("truncate",false)
        .start()
        .awaitTermination()

}

这是给我中止任务java.lang.nullpointerexception
我想知道为什么会发生这个错误,以及我如何修复它。它与我们的Map函数接受事件迭代器有关系吗,或者全局变量在流的上下文中不起作用吗?
以下是您可以从中获取的完整代码

import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.sql._

import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, SparkSession}

import user.{UserEvent, UserSession,USession}

import scala.collection.{mutable, _}
import argonaut.Argonaut._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

import scala.collection.Map

object StreamStateThree {

  val sparkSession = SparkSession.builder()
    .master("local")
    .appName("StreamStateThree")
    //.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/blog.articles")
    //.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/blog.vectors")
    .getOrCreate()

  sparkSession.sparkContext.setLogLevel("WARN")
  import sparkSession.implicits._
  val sc = sparkSession.sparkContext

  val new_data = sparkSession.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.cleanVectors")))
  sparkSession.sparkContext.broadcast(new_data)

  def updateUserStateWithEvent(statez:USession, event:UserEvent):USession = {
    println("Updating")

    val empty_map = Map[String, Boolean]()
    val empty_rec: Array[String] = Array("")
    val current_event = new_data.filter($"title" === event.url)
    val size:Int = current_event.select("size").as[Int].collect()(0)
    //.limit(1)
    val empty_session = USession(empty_map,-7)
    empty_session
  }

  def updateAcrossEvents(user:Int,
                         events: Iterator[UserEvent],
                         oldState: GroupState[USession]):USession = {
    var state:USession = if (oldState.exists) {
      println("State exists with the following visited")
      oldState.get
    }
    else {
      println("State does not exist")

      val empty_map = Map[String, Boolean]()
      val empty_session = USession(empty_map,-7)
      empty_session
    }
    // we simply specify an old date that we can compare against and
    // immediately update based on the values in our data

    for (event <- events) {
      state = updateUserStateWithEvent(state, event)
      oldState.update(state)
    }
    state
  }

  def deserializeUserEvent(json: String): UserEvent = {
    json.decodeEither[UserEvent] match {
      case Right(userEvent) => userEvent
      case Left(error) =>
        println(s"Failed to parse user event: $error")
        UserEvent.empty
    }
  }

def main(args: Array[String]): Unit = {
    //new_data2.show(20,false)
  val userEventsStream = sparkSession.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 12346)
    .load()
    .as[String]

  val finishedUserSessionsStream =
    userEventsStream
      .map(deserializeUserEvent)
      .groupByKey(_.id)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents)

  finishedUserSessionsStream.writeStream
    .outputMode(OutputMode.Update())
    .format("console")
    .option("checkpointLocation", "checkpoint")
    .option("truncate",false)
    .start()
    .awaitTermination()
}

}

下面是案例类供您使用

package user
import org.apache.spark.sql.Dataset

import scala.collection.Map
case class USession(var visited: collection.Map[String, Boolean],
                       var size : Int)

和事件

package user

import argonaut.Argonaut._
import argonaut.CodecJson

case class UserEvent(id: Int, url: String)
object UserEvent {
  implicit def codec: CodecJson[UserEvent] =
    casecodec2(UserEvent.apply, UserEvent.unapply)("id", "url")

  lazy val empty = UserEvent(-1, "")
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题