有没有办法阻止自定义kafka反序列化程序无限次重试?

jfgube3f  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(233)

我正在使用一个自定义的kafka反序列化程序来从json封送一个对象。

val props = Map(
  "bootstrap.servers" -> kafkaQueue.kafkaHost,
  "group.id" -> adapterServer.adapterConfig.kafkaGroup,
  "enable.auto.commit" -> "false",
  "max.poll.records" -> "1",
  "auto.offset.reset" -> "earliest",
  "auto.commit.interval.ms" -> "1000",
  "key.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
  "value.deserializer" -> "com.mystuff.CustomJSONDeserializer"
)
new KafkaConsumer[Array[Byte], MyMessage](props)

我看到的一件事是,如果有人将坏的json发布到主题中,kafka会尝试用我的自定义反序列化程序对其进行反序列化,但却做不到。customjsondeserializer抛出了一个异常,但是kafka一直在尝试。
所以它只是无限旋转,试图重新反序列化坏的json,基本上卡住了。由于这一切都发生在Kafka内部,我不知道如何阻止它,并告诉它继续下一个信息。
我怎样才能避免这种情况?

gzszwxb4

gzszwxb41#

如果捕获到异常,应用程序可以查找受影响的消息以避免它。实际上,0.10.0.0使用者中有一个bug掩盖了反序列化错误,所以我不确定早期版本是否正确地传播了异常。从即将发布的0.10.0.1版本开始,我们将至少记录有关导致错误的消息的主题、分区和偏移量的信息。

相关问题