Changing the scope of a local variable in Scala

shyt4zoc posted on 2021-06-07 in Kafka

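Below is a Scala snippet from the error handler I built for a streaming application. It uses Akka Streams to consume messages ('errormsg') from a Kafka topic and writes them to a table in Kudu.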

val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = Consumer.committableSource(
    consumerSettings,
    Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))

  val cdrs: Source[Errors, Consumer.Control] = kafkaMessages.map(msg => {
    val bytes: Array[Byte] = msg.record.value()
    val errormsg = (bytes.map(_.toChar)).mkString
    new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
  })

  cdrs.to(new ErrorKuduSink(session, table)).run()

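I want to go one step further and use the variable 'errormsg' in a few lines that send me an email.
How can I get 'errormsg' out of that block (or merge the snippet below into it) so that the variable is in scope?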

send a new Mail (
    from = ("errorhandler@domain.com"),
    to = "myemailadres@domain.com",
    subject = "Encountered error",
    message = errormsg
  )
Answer #1 (4smxwvx5)

Here I suggest using a mutable list, which solves your problem easily:

val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = Consumer.committableSource(
    consumerSettings,
    Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))

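  import scala.collection.mutable.ListBuffer
  // MutableList no longer exists in Scala 2.13; ListBuffer works in both 2.12 and 2.13.
  // The buffer is filled asynchronously as the stream processes messages, so it may still be empty right after run() returns.
  val errorMessages: ListBuffer[String] = ListBuffer.empty[String]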

  val cdrs: Source[Errors, Consumer.Control] = kafkaMessages.map(msg => {
    val bytes: Array[Byte] = msg.record.value()
    val errormsg = (bytes.map(_.toChar)).mkString
    errorMessages += errormsg
    new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
  })

  cdrs.to(new ErrorKuduSink(session, table)).run()
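One caveat with this approach: the stream runs asynchronously, so `errorMessages` is written on the stream's threads as messages arrive while your code reads it from another thread. Reading it right after `run()` can miss messages, and `MutableList`/`ListBuffer` are not thread-safe. A minimal plain-Scala sketch (no Akka or Kafka), assuming a `java.util.concurrent.ConcurrentLinkedQueue` is an acceptable substitute for the mutable list; `handle`, `simulate` and `snapshot` are hypothetical helpers, with Futures standing in for the running stream:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CollectErrorsSketch {
  // Thread-safe replacement for the mutable list: the writers below run on
  // pool threads while the caller reads from its own thread.
  val errorMessages = new ConcurrentLinkedQueue[String]()

  // Hypothetical stand-in for the body of the map stage: record the message
  // and pass it through unchanged.
  def handle(errormsg: String): String = {
    errorMessages.add(errormsg)
    errormsg
  }

  // Simulates messages being handled asynchronously, as in a running stream.
  def simulate(messages: Seq[String]): Future[Seq[String]] =
    Future.traverse(messages)(m => Future(handle(m)))

  // Copies the current contents; may be incomplete if called before simulate finishes.
  def snapshot(): List[String] =
    errorMessages.toArray(new Array[String](0)).toList

  def main(args: Array[String]): Unit = {
    val done = simulate(Seq("error A", "error B", "error C"))
    Await.result(done, 5.seconds) // wait before reading, otherwise the snapshot may be partial
    println(snapshot().sorted)
  }
}
```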
Answer #2 (fzsnzjdm)

Solution 1: send the email inside the map method (this sends one email per Kafka message)

def sendEmail(errormsg: String): Unit = ???

val cdrs: Source[Errors, Consumer.Control] = 
  kafkaMessages.map { msg => 
    val bytes: Array[Byte] = msg.record.value()
    val errormsg = (bytes.map(_.toChar)).mkString
    sendEmail(errormsg) // call function that sends email
    new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
  }
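Solution 1 is simply a side effect inside `map`. The sketch below mimics that stage on an ordinary `List` so it runs without Akka; `Errors` (with assumed field names) and the recording `sendEmail` are hypothetical stand-ins, not the asker's real classes:

```scala
import scala.collection.mutable.ListBuffer

object EmailPerMessageSketch {
  // Hypothetical stand-in for the asker's Errors class; field names are assumed.
  final case class Errors(id: Int, fileName: String, cdr: String, cdrType: String, status: Int, errormsg: String)

  // Hypothetical sendEmail that records the body instead of sending mail.
  val sent: ListBuffer[String] = ListBuffer.empty[String]
  def sendEmail(errormsg: String): Unit = sent += errormsg

  // Same shape as the map stage in Solution 1, applied to an ordinary List of payloads.
  def process(payloads: List[Array[Byte]]): List[Errors] =
    payloads.map { bytes =>
      val errormsg = bytes.map(_.toChar).mkString
      sendEmail(errormsg) // side effect: one email per message
      Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
    }

  def main(args: Array[String]): Unit = {
    val out = process(List("disk full", "timeout").map(_.getBytes("US-ASCII")))
    println(s"${out.size} records, ${sent.size} emails")
  }
}
```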

Solution 2: if you want to use errormsg in more complex ways in downstream stages, return a tuple from the map stage:

val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = 
  Consumer.committableSource(consumerSettings, Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))

val cdrs: Source[Errors, Consumer.Control] = 
  kafkaMessages.map { msg => 
    val bytes: Array[Byte] = msg.record.value()
    val errormsg = (bytes.map(_.toChar)).mkString
    (new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg), errormsg) // we are returning a tuple so type of downstream elements will be (Errors, String)
  }.map { case i@(errors, errormsg) => 
    sendEmail(errormsg)
    i
  }.map { tuple =>
    // ... further processing of the (Errors, String) pair ...
    tuple
  }.map(_._1) // we no longer need the tuple, so keep the original Errors element and continue processing it

cdrs.to(new ErrorKuduSink(session, table)).run()
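The tuple-passing pattern from Solution 2 can be tried without Kafka by running the same three `map` stages over an `Iterator`. A sketch under those assumptions; `Errors` and its field names are hypothetical, and `sendEmail` is passed in as a function so the calls can be inspected:

```scala
object TuplePassingSketch {
  // Hypothetical stand-in for the asker's Errors class; field names are assumed.
  final case class Errors(id: Int, fileName: String, cdr: String, cdrType: String, status: Int, errormsg: String)

  // Iterator.map is lazy, so each element flows through all three stages
  // one at a time, roughly like elements in a stream.
  def run(payloads: Iterator[Array[Byte]], sendEmail: String => Unit): List[Errors] =
    payloads
      .map { bytes =>
        val errormsg = bytes.map(_.toChar).mkString
        (Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg), errormsg) // elements are now (Errors, String)
      }
      .map { case pair @ (_, errormsg) =>
        sendEmail(errormsg) // errormsg is reachable again through the tuple
        pair
      }
      .map(_._1) // drop the String and continue with Errors only
      .toList
}
```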

Solution 3: for more complex processing (for example, batching several errormsg values into a single email), you may need to build a RunnableGraph:

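import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink}
import scala.concurrent.duration._

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Consumer.committableSource(consumerSettings, Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))
    .map { msg =>
      val bytes: Array[Byte] = msg.record.value()
      val errormsg = (bytes.map(_.toChar)).mkString
      (new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg), errormsg)
    }
  val kuduout = new ErrorKuduSink(session, table)
  val emailout = Sink.foreach[Seq[String]] { errormsgs =>
    sendEmail(errormsgs.mkString("\n")) // one email per batch; sendEmail takes a single String
  }
  val f1 = Flow[(Errors, String)]
    .map(_._1) // take errors

  val f2 = Flow[(Errors, String)]
    .map(_._2) // take errormsgs
    .groupedWithin(100, 1.hour) // emit a batch after 100 messages or 1 hour, whichever comes first

  val bcast = builder.add(Broadcast[(Errors, String)](2)) // element type must match the source

  in ~> bcast
  bcast ~> f1 ~> kuduout
  bcast ~> f2 ~> emailout
  ClosedShape
})

g.run() // needs an implicit Materializer in scope (in Akka 2.6+ an implicit ActorSystem suffices)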
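The batching in Solution 3 comes from `groupedWithin(100, 1.hour)`, which emits a batch when 100 elements have arrived or the hour is up, whichever comes first. A rough plain-Scala sketch of the count half only, assuming each batch becomes one email whose body is the messages joined by newlines (`emailBodies` is a hypothetical helper):

```scala
object BatchedEmailSketch {
  // Builds one email body per batch of at most batchSize messages.
  // This models only the count limit of groupedWithin(100, 1.hour);
  // the time-based flush is not modeled here.
  def emailBodies(errormsgs: Seq[String], batchSize: Int): List[String] =
    errormsgs.grouped(batchSize).map(_.mkString("\n")).toList

  def main(args: Array[String]): Unit = {
    val msgs = (1 to 250).map(i => s"error $i")
    // Print the number of messages in each email body.
    emailBodies(msgs, 100).foreach(body => println(body.split("\n").length))
  }
}
```

With 250 messages and a batch size of 100 this produces three emails of 100, 100 and 50 lines.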
