kafka消费者:主题控制阅读

x8diyxa7  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(398)

我有下面的kafka消费代码,其中3个线程正在读取kafka主题中的3个分区。
有没有办法,只有在线程当前正在处理的消息得到处理之后,才能从kafka主题中读取新消息。
例如,假设topic中有100条消息,那么是否有任何方法一次只能读取和处理3条消息。现在,当这3条消息得到处理,那么只有下3条消息应该被读取,以此类推。

public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

// now launch all the threads
//
executor = Executors.newFixedThreadPool(3);

// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
    executor.submit(new ConsumerTest(stream, threadNumber));
    threadNumber++;
   }
}
62o28rlo

62o28rlo1#

如果consumertest中的迭代器正在同步处理消息,那么一次只能使用3条消息。默认情况下,enable.auto.commit为true。确保没有将其设置为false,否则需要添加提交偏移量的逻辑。
前-

ConsumerIterator<byte[], byte[]> streamIterator= stream.iterator(); 
 while (streamIterator.hasNext()) { 
   String kafkaMsg= new String(streamIterator.next().message()); 
 }
mpgws1up

mpgws1up2#

嗯,默认情况下,消费者彼此并不了解,因此他们无法“同步”他们的工作。你可以把你的三条信息 Package 成一条(从而保证它们都会被依次回复),或者引入更多的(“子”)主题。
另一种可能性(如果您真的需要保证您的三条消息将被单个消费者使用)可能是您的所有消费者同步他们的工作,或者通知跟踪您工作的控制器。
但是感觉你“做错了”,实际上队列中的消息是无状态的,只有它们在主题中的顺序决定了它们的“处理顺序”。处理消息的时间应该无关紧要。

相关问题