我正在测试一个kafka连接器的弹性,我想在它运行时杀死一个worker,从而杀死连接器示例。最简单的方法可能是强制分布式模式在多个节点上运行,然后只杀死该节点上的工作进程(对吧?)。如何让kafka connect在不仅仅是它启动的节点上生成worker?这是worker config中定义的吗?
np8igboo1#
是的,处理故障和自动重启工作负载正是kafka connect所能做的。您可以将其作为一个集群运行,通常每个节点有一个worker。然后每个worker运行一个或多个任务,这由connect管理。如果某个辅助进程死亡,则它正在运行的所有任务都将以负载平衡的方式在其他可用辅助进程上重新启动。查看架构参考了解更多信息。要将worker定义为在集群中,请为它们分配相同的 group.id . 有关更多信息,请参阅配置文档。
group.id
juzqafwq2#
所以最后我做的是:将kafka connect分布式模式所需的所有jar复制到我想要在其上运行的两个节点(在hdp2.5.3中,只能在一个节点上获得这些jar)。在这两个节点上,我都使用指向jar的属性文件运行了start脚本。使用rest接口,我发布了带有任务的连接器,我可以看到一个工人拥有连接器示例,另一个工人拥有它的任务。我删除了任务工作者节点(使用 ps -ef | grep connect ),看到它在剩下的节点上重生了。我重置了测试并尝试终止连接器示例节点,令我惊讶的是,连接器示例在另一个节点上重新启动。综上所述,我的弹性测试,Kafka连接似乎是在玩打鼹鼠;无论任务或连接器在哪里,您都可以将它们删除,它们将在其他地方重生。
ps -ef | grep connect
2条答案
按热度按时间np8igboo1#
是的,处理故障和自动重启工作负载正是kafka connect所能做的。您可以将其作为一个集群运行,通常每个节点有一个worker。然后每个worker运行一个或多个任务,这由connect管理。如果某个辅助进程死亡,则它正在运行的所有任务都将以负载平衡的方式在其他可用辅助进程上重新启动。查看架构参考了解更多信息。
要将worker定义为在集群中,请为它们分配相同的
group.id
. 有关更多信息,请参阅配置文档。juzqafwq2#
所以最后我做的是:
将kafka connect分布式模式所需的所有jar复制到我想要在其上运行的两个节点(在hdp2.5.3中,只能在一个节点上获得这些jar)。
在这两个节点上,我都使用指向jar的属性文件运行了start脚本。
使用rest接口,我发布了带有任务的连接器,我可以看到一个工人拥有连接器示例,另一个工人拥有它的任务。
我删除了任务工作者节点(使用
ps -ef | grep connect
),看到它在剩下的节点上重生了。我重置了测试并尝试终止连接器示例节点,令我惊讶的是,连接器示例在另一个节点上重新启动。
综上所述,我的弹性测试,Kafka连接似乎是在玩打鼹鼠;无论任务或连接器在哪里,您都可以将它们删除,它们将在其他地方重生。