如何在akka actor系统终止时运行调度函数

cnh2zyt3  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(342)

我有一个actor系统,我用它来调度函数的执行,该函数从flink的map操作符生成kafka主题中的事件。在例外情况下,actor系统终止,并在akka文件中声明(参见https://doc.akka.io/docs/akka/current/scheduler.html#from-akka actor actorsystem)所有计划的任务都应该执行。在我的例子中,当函数被执行时,会抛出一个java.lang.noclassdeffounderror,它与函数中使用的类有关。

new RichMapFunction[String, String] {
      implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
      var myActorSystem: ActorSystem = _
      var kafkaProducer: KafkaProducer[String, String] = _
      var runtimeContext: RuntimeContext = _

      override def map(value: String): String = {
        value match {
          case "stop" =>
            throw new Exception("Stop command received")
          case _ =>
            myActorSystem.scheduler.scheduleOnce(FiniteDuration(5L, MINUTES)){
              kafkaProducer.send(new ProducerRecord[String, String]("test", value.reverse))
            }
        }

        s"scheduled function on event $value"
      }

      override def open(parameters: Configuration): Unit = {
        myActorSystem = ActorSystem("testSystem")
        kafkaProducer = {
          val props = new Properties()
          props.put("bootstrap.servers", "localhost:9092")
          // props.put("acks", "all")
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
          new KafkaProducer[String, String](props)
        }
        runtimeContext = getRuntimeContext
      }

      override def close(): Unit = {
        println("Terminate actor system...")
        myActorSystem.terminate()
      }
    }
xdnvmnnf

xdnvmnnf1#

actor系统终止是异步的,所以我使用了下面的代码。

Await.result(myActorSystem.terminate(), scala.concurrent.duration.Duration.Inf)

相关问题