我在用Kafka来建立我的制作人。我正在使用@kafkaclient注解来设置所有producer配置micronaut kafka使我能够设置所有参数来设置事务性producer。当我推送消息时,我得到一个异常消息
io.micronaut.messaging.exceptions.MessagingClientException: Exception sending producer record: Cannot perform 'send' before completing a call to initTransactions when transactions are enabled.
回到mirconaut文档部分,看起来它要求您使用kafkaproducerapi来实现这个特性。
据我所知,kafkaproducer.inittransactions()方法需要在启动事务之前被调用,并且看起来不会发生这种情况。
有没有人在实施这项计划时遇到过类似的问题?
1条答案
按热度按时间hec6srdp1#
我猜,您是在使用单节点集群进行开发,对吗?如果是这样,您应该在本地集群上配置transaction.state.log.min.isr=1和transaction.state.log.replication.factor=1。默认情况下,它们都预配置为3。
还有一段来自合流https://docs.confluent.io/current/streams/developer-guide/config-streams.html
加工保证
应使用的加工保证。可能的值为“至少\u至少\u一次”(默认值)和“恰好\u一次”。请注意,如果只启用了一次处理,则参数commit.interval.ms的默认值将更改为100ms。此外,使用者配置为隔离级别=“read\u committed”,生产者配置为retries=integer.max\u value,并根据默认值enable.idempotence=true。请注意,“精确\u once”处理在默认情况下需要至少包含三个代理的集群,这是推荐的生产设置。对于开发,可以通过将transaction.state.log.replication.factor和transaction.state.log.min.isr中的代理设置调整为要使用的代理数来更改此设置。