我有一个关于scala和sbt的项目
我有制片人
如果Kafka无法联系到我,我会尝试重新发送消息
package com.example
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NonFatal
object producer extends App {
private val decider: Supervision.Decider = {
case NonFatal(ex) =>
println("Non fatal exception in flow. Skip message and resuming flow.",
ex)
Supervision.Restart
case ex: Throwable =>
println("Other exception in flow. Stopping flow.", ex)
Supervision.Stop
}
implicit val system = ActorSystem("QuickStart")
private val strategy =
ActorMaterializerSettings(system).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(strategy)
val config = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers("10.20.10.193:9092")
.withProperty("message.send.max.retries", "3")
.withProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000")
//.withProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "5000")
val done =
Source
.single("11")
.map(value => new ProducerRecord[String, String]("example", value))
.runWith(Producer.plainSink(producerSettings))
Await.result(done, 1000 seconds)
}
我定义了一个属性:
.withProperty("message.send.max.retries", "3")
但它不起作用
当我用一个坏的kafka主机运行produser时,输出是
[INFO ] - 2018-07-30 23:00:36,951 - suppression - akka.event.slf4j.Slf4jLogger - Slf4jLogger started
(Non fatal exception in flow. Skip message and resuming flow.,org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.)
(Non fatal exception in flow. Skip message and resuming flow.,org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.)
日志中只有两次重试,而不是三次
暂无答案!
目前还没有任何答案,快来回答吧!