在java中实现微批处理

nwlls2ji  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(354)

我正在开发一个基于Kafka的应用程序,Kafka监听器将在其中监听录音;一旦Kafka收到一个记录,我可以需要写到一个文件记录。这里要将记录写入一个文件,我们要使用具有batchsize和超时设置的微批处理。例如,batchsize为10,timeout设置为1000 ms,这意味着等待10条记录,然后以1000 ms的等待时间写入文件。如果在任何情况下,kafka在1000 ms内仅收到5条记录,则在该批中仅写入5条记录。
我在java中的效率有多高。

ymdaylpp

ymdaylpp1#

在这种情况下,一种常见的方法是将所有记录放在一个队列中。并且有一个线程,它将在队列达到10或1000毫秒后获取这些记录,具体取决于先到的是什么。
消费者代码:

CountDownLatch countDownLatch = new CountDownLatch(10);
 countDownLatch.await(1000, TimeUnit.MILLISECONDS);
 int queueSize = queue.size();
 for(int i = 0; i < queueSize; ++i) {
     ... do your work here or put in a batch a do it right after loop
 }

生产商代码:

Record record = ...receive new record...
 queue.put(record);
 consumer.getCountDownLatch().countDown();

作为一个队列,我建议使用unbound-one,比如 LinkedTransferQueue ,因为您不想在达到10个任务时停止生产者,所以仍然需要使用kafka的结果。
另一个选择是React流。

s2j5cfk0

s2j5cfk02#

听起来你应该使用kafka连接api。这是apachekafka的一部分,旨在支持您描述的流程。
这里有一个开发者指南。

相关问题