下面是我为流应用程序制作的错误处理程序的scala代码片段。它使用akka流来使用kafka主题中的消息('errormsg'),并将它们写入kudu中的一个表中。
val kafkaMessages: Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = Consumer.committableSource(
consumerSettings,
Subscriptions.topics(conf.getString("kafka.topics.errorRawCdr")))
val cdrs: Source[Errors, Consumer.Control] = kafkaMessages.map(msg => {
val bytes: Array[Byte] = msg.record.value()
val errormsg = (bytes.map(_.toChar)).mkString
new Errors(1235, "filename", "cdr", "cdr_type", 0, errormsg)
})
cdrs.to(new ErrorKuduSink(session, table)).run()
我想再进一步使用变量'errormsg',作为给我发邮件的几行的一部分。
如何转义'errormsg'(或者合并下面的代码段),以便变量范围是合适的?
send a new Mail (
from = ("errorhandler@domain.com"),
to = "myemailadres@domain.com",
subject = "Encountered error",
message = errormsg
)
2条答案
按热度按时间4smxwvx51#
在这里,我建议使用可变列表轻松解决您的问题:
fzsnzjdm2#
解决方案1:在Map方法中发送电子邮件(将在每个Kafka消息中发送电子邮件)
解决方案2:如果你想更复杂地使用
errormsg
在下游阶段,您需要从Map阶段返回一个元组:解决方案3:如果您想要更复杂的处理(例如在一封电子邮件中批处理多个errormsg),您可能需要创建一个runnablegraph