我们正在用kafka构建一个异常管理工具。将有源连接器-这将拉记录从物理文件。另一方面,将有sink connect(mongodb sinkconnect),它将从主题中提取记录并将其推送到mongodb。一切正常。
我们需要在不同的主题中捕获事件(用于审计目的)。例如,
源任务(文件轮询任务)启动事件示例(如果收到文件)
源任务(文件轮询任务)结束事件示例(如果文件已完全处理)
sink task(pushing records to mongodb task)start事件示例,mongodb connect开始处理文件a的记录
sink task(pushing records to mongodb task)end事件示例,将文件a的记录完全推送到mongodb
我这里有几个问题:1.通过在sourcetask中示例化kafkaproducer,我们可以将事件发送到不同的主题,一旦文件被完全处理,我们就可以发送一个事件
public class FileSourceTask extends SourceTask {
private Producer<Key, Event> auditProducer;
public void start(Map<String, String> props) {
auditProducer = new KafkaProducer<Key, Event>(auditProps);
}
public List<SourceRecord> poll() {
List<SourceRecord> results = this.filePoller.poll();
if(results.isEmpty() && eventNotSentForCurrentFile) {
Event event = new Event();
auditProducer.send(
new ProducerRecord<Key, Event>(this.props.get("event.topic"), key, event));
}
// futher processing
}
上述方法正确吗?
上面的解决方案工作得很好-因为它只运行一个任务(maxtasks=1),但是对于我们的用例,在sink任务(mongodb connect)中实现这一点非常困难。由于此主题是分区的,因此将创建许多任务。无法跟踪接收器任务的开始事件和结束事件
请提出解决这个问题的方法。
非常感谢。
1条答案
按热度按时间oxcyiej71#
我认为,您可以围绕kafka connect restapi构建一些东西
https://docs.confluent.io/current/connect/restapi.html#get--连接器-(字符串名称)-状态
但是有了它,您需要保持观察器处于连接器状态,并且一旦连接器的所有任务完成,您就可以执行该操作。