我正在使用kafka connect api实现一个自定义源连接器,它可以用来轮询restapi并将json响应接收到kafka主题中。现在我想知道如何实现sourcetask的轮询间隔,jdbc连接器如何提供一个轮询间隔。在某个地方我必须让线程进入睡眠状态,但在哪里我必须这样做呢?
nwlqm0z11#
我用自己的方法解决了这个用例 SourceTask 通过添加类型为 long 存储时间戳。一开始 poll() 调用字段尚未初始化,因此将轮询已配置的RESTAPI。当第一次调用 long 字段get用当前时间戳初始化。在以下所有方面 poll() 调用上一次调用的这个时间戳被检查了。如果自上一个 poll() 小于两次轮询之间的配置间隔,我将线程发送到睡眠状态,因为配置的毫秒已过。
SourceTask
long
poll()
x6h2sr282#
使用 max.poll.interval.ms .请参考此链接:https://kafka.apache.org/documentation/
max.poll.interval.ms
2条答案
按热度按时间nwlqm0z11#
我用自己的方法解决了这个用例
SourceTask
通过添加类型为long
存储时间戳。一开始poll()
调用字段尚未初始化,因此将轮询已配置的RESTAPI。当第一次调用long
字段get用当前时间戳初始化。在以下所有方面poll()
调用上一次调用的这个时间戳被检查了。如果自上一个poll()
小于两次轮询之间的配置间隔,我将线程发送到睡眠状态,因为配置的毫秒已过。x6h2sr282#
使用
max.poll.interval.ms
.请参考此链接:https://kafka.apache.org/documentation/