我正在构建一个工具来使用来自kafka的数据,并将它们插入mongodb,在它们之间进行一些操作。
现在我在做:
// Poll during X ms
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
// For each record, insert it into Mongo
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message of length ["+ record.value().length() +"] received.");
Tools._insertReport(record.value());
}
我正在寻找一个类似于动态集合的解决方案,我可以在其中堆积记录,并且insert方法将插入记录,然后将其从堆中移除?就像一个内部消息队列。。
Java8流是这样的吗?如果没有,有明显的解决办法吗?
编辑1:
两种解决方案似乎都可行。kafka连接器和rxjava,因为rxjava看起来更像我要找的东西,所以我将在这里查看并发布我的研究结果。谢谢大家。
3条答案
按热度按时间mnemlml81#
所以是的,streams一点也不是,但是rxjava是一个合适的答案,可以回答我的用例。更确切地说,出版的主题是。
我创建了一个类,启动了发布主题,并使用
.onNext()
函数将我的Kafka消费记录传递给主题。定义如下:下面是我如何将数据推送到它的:
我不知道是否做得好,但它很有效。。。这是个开始。我不知道如何使用运算符,也不知道如何从这里更改日期类型。但这仍然是一个开始
jaql4c8m2#
我不知道你到底想要达到什么目的,但是为了阅读kafka的信息并将它们写入mongodb,我建议你使用kafka连接mongodb连接器!所有的排队都是由kafka本地连接完成的,无需编写代码。
您会发现许多mongodb连接器适合您的情况,这里有两个:
https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/readme.md
https://docs.lenses.io/connectors/sink/mongo.html
后者可能是一个很好的选择,首先尝试,它更简单的使用和镜头管理其他几个连接器。
flvlnr443#
Java8流是这样的吗?
不是真的,不是。
显而易见的解决办法是创建一个固定的大小
List<ConsumerRecord>
,然后定期检查该列表的大小。。。当满的时候,循环并刷新到mongo,否则一次只做一个记录。不过,最好使用kafka connect,因为它可以更合理地管理异常、重试和消息转换。