Java8流是我正在寻找的解决方案吗?

e0bqpujr  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(417)

我正在构建一个工具来使用来自kafka的数据,并将它们插入mongodb,在它们之间进行一些操作。
现在我在做:

// Poll during X ms
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
// For each record, insert it into Mongo
for (ConsumerRecord<String, String> record : records) {
  System.out.println("Message of length ["+ record.value().length() +"] received.");
  Tools._insertReport(record.value());
}

我正在寻找一个类似于动态集合的解决方案,我可以在其中堆积记录,并且insert方法将插入记录,然后将其从堆中移除?就像一个内部消息队列。。
Java8流是这样的吗?如果没有,有明显的解决办法吗?
编辑1:
两种解决方案似乎都可行。kafka连接器和rxjava,因为rxjava看起来更像我要找的东西,所以我将在这里查看并发布我的研究结果。谢谢大家。

mnemlml8

mnemlml81#

所以是的,streams一点也不是,但是rxjava是一个合适的答案,可以回答我的用例。更确切地说,出版的主题是。
我创建了一个类,启动了发布主题,并使用 .onNext() 函数将我的Kafka消费记录传递给主题。定义如下:

public static void _initRx(){
        RxRecordsList = PublishSubject.create();
        RxRecordsList.subscribe(_initRxRecordConsumer());
    }
    private static Observer<ConsumerRecord<String,String>> _initRxRecordConsumer(){
        return new Observer<ConsumerRecord<String,String>>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Rx Subscrition : OK");
            }

            @Override
            public void onNext(ConsumerRecord record) {
                System.out.println("Message received - Length : "+record.toString().length());
                MongoHelper._insertReport(record.value().toString());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error: "+ e);
            }

            @Override
            public void onComplete() {
                System.out.println("Stream ended");
            }
        };
    }

下面是我如何将数据推送到它的:

while(true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
  for (ConsumerRecord<String, String> record : records) {
    RxHDB_Records.RxRecordsList.onNext(record);
  }
}

我不知道是否做得好,但它很有效。。。这是个开始。我不知道如何使用运算符,也不知道如何从这里更改日期类型。但这仍然是一个开始

jaql4c8m

jaql4c8m2#

我不知道你到底想要达到什么目的,但是为了阅读kafka的信息并将它们写入mongodb,我建议你使用kafka连接mongodb连接器!所有的排队都是由kafka本地连接完成的,无需编写代码。
您会发现许多mongodb连接器适合您的情况,这里有两个:
https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/readme.md
https://docs.lenses.io/connectors/sink/mongo.html
后者可能是一个很好的选择,首先尝试,它更简单的使用和镜头管理其他几个连接器。

flvlnr44

flvlnr443#

Java8流是这样的吗?
不是真的,不是。
显而易见的解决办法是创建一个固定的大小 List<ConsumerRecord> ,然后定期检查该列表的大小。。。当满的时候,循环并刷新到mongo,否则一次只做一个记录。
不过,最好使用kafka connect,因为它可以更合理地管理异常、重试和消息转换。

相关问题