apache-kafka 如何在部署期间实现Kafka Streams应用程序的高可用性?

vddsk6oq  于 2022-11-01  发布在  Apache
关注(0)|答案(2)|浏览(165)
  • 主要问题 *:我们在Kubernetes上运行Kafka Streams(Java)应用程序,以便在Kafka集群(运行Confluent Community Edition v7.0/Kafka v3.0)中使用、处理和生成真实的数据。**我们如何在部署应用程序时限制使用记录时的停机时间?**我们的初始目标是每个任务的单次停机时间约为2 sec

我们的目标是对生产环境的更改进行持续部署,但部署会造成应用程序中记录使用的停机时间,从而导致生成的真实的记录出现延迟,因此破坏性太大。
我们尝试了不同的策略来了解它如何影响延迟(停机时间)。

  • 策略1:*
  • 终止所有应用程序示例(共6个)
  • 立即启动所有新应用程序示例
  • 结果:已使用记录的测量最大延迟:85 sec
  • 策略二:*
  • 启动一个新的应用程序示例
  • 等待3 minutes以允许在新应用程序示例中恢复本地状态
  • 3 minutes之后终止一个旧应用程序示例
  • 重复此操作,直到所有旧应用程序示例终止
  • 结果:已使用记录的测量最大延迟:39 sec
  • 策略三:*
  • 与策略#2相同,但将等待时间增加到15 minutes
  • 结果:已使用记录的测量最大延迟:7 sec。然而,每个应用示例15 minutes将导致15分钟x 6个示例= 90 minutes来部署变更+额外的30 minutes来完成增量重新平衡协议。我们发现部署时间非常长。

我们一直在阅读KIP-429: Kafka Consumer Incremental Rebalance Protocol,并尝试配置应用程序以支持我们的用例。
以下是我们为策略2和策略3所做的关键Kafka Streams配置:

acceptable.recovery.lag: 6000
num.standby.replicas: 2
max.warmup.replicas: 6
probing.rebalance.interval.ms: 60000
num.stream.threads: 2

输入主题的平均速率为12 partitions,消息速率为800 records/s。有3个Kafka流键值状态存储,其中两个存储的速率与输入主题的速率相同。这两个存储的速率大约为20GB size。键集的大小大约为4000。理论上,每个分区的更改日志主题上的上述acceptable.recovery.lag的延迟时间应为~60秒。
以下是针对策略#3的每个应用示例的一些指标(运行时间、接收消息的速率和接收消息的延迟):

值得注意的观察结果是:
1a-第一个新应用程序示例启动
1b- Kafka立即重新平衡,2个旧应用程序示例分配了更多任务,2个旧应用程序示例失去了任务
1c-同时,记录的最大延迟从0.2 sec增加到3.5 sec(这表明重新平衡大约需要3 sec
2-发生探测重新平衡,Kafka Streams不知何故决定从一个旧应用示例中撤销任务,并将其分配给已经拥有最多任务的应用示例
3-最后一个旧应用程序示例已终止
4-所有分区在升级前重新平衡,增量重新平衡已完成(最后一个应用程序示例终止后约33 minutes
Other-为第一个新应用示例分配任务需要大约40 minutes。此外,每个任务都被重新分配多次,导致3 sec的许多小中断。
如果需要的话,我们可以为其他策略提供更多关于拓扑、主题、配置和度量图的详细信息(这个线程已经很大了)。

oalqel3c

oalqel3c1#

我无法发表评论(因为我对SO还比较陌生),所以我会在这里回复。你关于StatefulSet和滚动重启的观点是正确的,但是你可以通过在StatefulSet中使用podManagementPolicy: Parallel来解决这个问题。这使得它与部署相同,因为所有的pod都将同时出现。
https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#pod-management-policies

nx7onnlm

nx7onnlm2#

一些建议,一般建议,为您的Kafka流应用程序(不是专业的,但我个人观察)。
1.将您的pod从Deployment更改为StatefulSet,并添加此配置group.instance.id: "${hostname}"。这样,pod将保持pod的相同名称,您将能够使用Kafka Consumer Incremental Rebalance Protocol
1.当你现在使用StatefulSet时,将状态存储持久化在一个永久存储中,它将从Kafka中删除状态存储重载。但是注意你需要根据Kubernetes接收到的信号正确关闭Kafka流,并调整terminationGracePeriodSeconds以允许正确关闭。如果没有,Kafka Stream将检测到检查点不干净,并将重新获取状态存储(如果您的状态存储是持久化的,则不太清楚,但我认为它已经完成了)。
我唯一观察到的是,一个简单的消费者组重新平衡比你预期的2秒要长,这似乎是一个复杂的目标(就像你说的重新平衡需要3秒)。但使用增量重新平衡,我认为你会有一些分区在重新平衡期间被处理(目前没有亲自测量)。

相关问题