在Kafka的官方合流中,这就是高级消费者的写法。
https://cwiki.apache.org/confluence/display/kafka/consumer+group+example
具体来说,我们在一个分区下创建多个流来使用。
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
随后,我们在consumertest中的每个流中迭代。我想了解手动提交在这种情况下是如何工作的?手动提交由 consumerConnector.commitOffsets()
我们通过多个流进行消费,而consumerconnector对所有流都是公用的。第一个问题是CommitofSets将如何运作?它会穿越所有的溪流吗?如果是这样的话,是否只有一个流才能使用手动提交?我用的是Kafka0.8.2.2
暂无答案!
目前还没有任何答案,快来回答吧!