从Kafka connect中删除源连接器后,此连接器的已保存偏移量仍然存在于Kafka中。当再次启动具有相同名称的相同连接器时,它将从上次保存的偏移量开始处理数据。更改连接器的名称不是一个解决方案,因为它很难自动提交连接器。是否有解决此问题的方法(重命名源连接器不是一个选项)
e5njpo681#
连接Connect使用的偏移量主题;它是JSON,因此可以检查其格式。构造一系列将源的偏移量设置为0的消息有关使用FileStreamSource的完整示例,请参见see this post有一个KIP-199和JIRA也一直在要求围绕这一点的工具,也要求删除源连接器应该删除它的偏移量。
4urapxun2#
在3.6.0版本中,Kafka Connect将添加原生支持,用于通过REST API重置sink和source连接器的偏移量,作为KIP-875的一部分。如果您运行的是3.6.0或更高版本,请首先向/connectors/{name}/stop端点发出一个PUT请求以停止(但不删除)连接器,然后通过向/connectors/{name}/offsets端点发出一个PUT请求来重置其偏移量。
/connectors/{name}/stop
/connectors/{name}/offsets
nwnhqdif3#
我发现,如果你知道Kafka Connector正在使用的主题,这可能会起作用:Kafka : Reset offset of a specific partition of topic,这里还有另一个可能有帮助的参考:https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b我想我也遇到过类似的问题,我所做的就是删除以偏移量开头的主题,这意味着我必须停止Kafka Connect,删除适用的偏移量主题,然后重新启动Kafka Connect。不确定这是否是最好的选择,但这里有一些选项可以满足您的请求。
lyfkaqu14#
重置kafka-connect连接器的使用者组的偏移量。所有连接器的使用者组都不同-因此应该没问题,不会影响任何其他连接器。
4条答案
按热度按时间e5njpo681#
连接Connect使用的偏移量主题;它是JSON,因此可以检查其格式。
构造一系列将源的偏移量设置为0的消息
有关使用FileStreamSource的完整示例,请参见see this post
有一个KIP-199和JIRA也一直在要求围绕这一点的工具,也要求删除源连接器应该删除它的偏移量。
4urapxun2#
在3.6.0版本中,Kafka Connect将添加原生支持,用于通过REST API重置sink和source连接器的偏移量,作为KIP-875的一部分。
如果您运行的是3.6.0或更高版本,请首先向
/connectors/{name}/stop
端点发出一个PUT请求以停止(但不删除)连接器,然后通过向/connectors/{name}/offsets
端点发出一个PUT请求来重置其偏移量。nwnhqdif3#
我发现,如果你知道Kafka Connector正在使用的主题,这可能会起作用:Kafka : Reset offset of a specific partition of topic,这里还有另一个可能有帮助的参考:https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
我想我也遇到过类似的问题,我所做的就是删除以偏移量开头的主题,这意味着我必须停止Kafka Connect,删除适用的偏移量主题,然后重新启动Kafka Connect。不确定这是否是最好的选择,但这里有一些选项可以满足您的请求。
lyfkaqu14#
重置kafka-connect连接器的使用者组的偏移量。所有连接器的使用者组都不同-因此应该没问题,不会影响任何其他连接器。