Kafka与交易

qnyhuwrf  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(383)

我想在事务中使用springkafka,但我不太明白它应该如何配置以及如何工作。
这是我的配置

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    props.put(ProducerConfig.ACKS_CONFIG, "all");

此配置用于事务id前缀为的defaultkafkaproducerfactory:

defaultKafkaProducerFactory.setTransactionIdPrefix("my_app.");

问题1:
我应该如何选择此事务id前缀?如果我理解正确,spring将使用这个前缀为创建的每个生产者生成一个事务id。
为什么不能直接使用“uuid.randomuuid()?
问题2:
如果生产者被销毁,它将生成一个新的事务id。因此,如果应用程序崩溃,重新启动时它将重用旧的事务id。
这正常吗???
问题3:
我正在使用一个部署在云上的应用程序,它可以自动放大/缩小。这意味着我的前缀无法修复,因为每个示例上的所有生产者都将有冲突的事务id。
我应该添加一个随机部分吗?当示例缩小/增大或崩溃并重新启动时,是否需要恢复相同的前缀?
问题4:
最后但并非最不重要的是,我们正在使用Kafka的凭据。这似乎不起作用:

Current ACLs for resource `TransactionalId:my_app*`:
    User:CN... has Allow permission for operations: All from hosts: *

我应该如何设置我的ACL知道我的事务ID是生成的?
编辑1
进一步阅读后,如果我理解正确。
如果有一个c0(consumer)从p0(partition)读取,如果代理启动consumer重新平衡。p0可以分配给另一个消费者c1。此使用者c1应使用与前一个c0相同的事务id以防止重复(僵尸围栏)?
在SpringKafka你是如何做到这一点的?事务id似乎与使用者无关,因此与分区读取无关。
谢谢

oknwwptz

oknwwptz1#

你不能使用随机tid,因为僵尸围栏-如果服务器崩溃,你可能会有一个部分事务在主题中,这将永远不会完成,没有更多的将被消耗从任何分区与该事务的写入。
这是出于上述原因设计的。
同样,你不能随机化;出于上述原因。
例如,cloudfoundry有一个指示示例索引的环境变量。如果您使用的云平台不包含类似的内容,那么您必须以某种方式模拟它。然后,在事务id中使用它:

spring.kafka.producer.transaction-id-prefix=foo-${instance.index}-

acls-我不能回答这个问题;我不熟悉Kafka的权限;最好单独问一个问题。
我认为我们需要向spring添加一些逻辑,以确保相同的事务id总是用于特定的主题/分区。
https://github.com/spring-projects/spring-kafka/issues/800#issuecomment-419501929
编辑
自从这个答案(kip-447)以来,情况发生了变化;如果您的代理是2.5.0或更高版本-请参阅。https://docs.spring.io/spring-kafka/docs/2.5.5.release/reference/html/#exactly-一次又一次https://docs.spring.io/spring-kafka/docs/2.6.0-snapshot/reference/html/#exactly-一次

相关问题