我创建了一个多线程kafka使用者,其中每个分区分配一个线程(我总共有100个分区)。我也跟着https://cwiki.apache.org/confluence/display/kafka/consumer+group+example 链接。
下面是我的消费者的init方法。
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
System.out.println("Kafka Consumer initialized.");
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topicName, 100);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicName);
executor = Executors.newFixedThreadPool(100);
在上面的init方法中,我得到了kafka流的列表(总共100个),这些流应该连接到每个分区(正如预期的那样)。
然后我使用下面的代码片段将每个流提交到不同的线程。
公共对象调用(){
for (final KafkaStream stream : streams) {
executor.execute(new StreamWiseConsumer(stream));
}
return true;
}
下面是streamwiseconsumer类。
public class StreamWiseConsumer extends Thread {
ConsumerIterator<byte[], byte[]> consumerIterator;
private KafkaStream m_stream;
public StreamWiseConsumer(ConsumerIterator<byte[], byte[]> consumerIterator) {
this.consumerIterator = consumerIterator;
}
public StreamWiseConsumer(KafkaStream kafkaStream) {
this.m_stream = kafkaStream;
}
@Override
public void run() {
ConsumerIterator<byte[], byte[]> consumerIterator = m_stream.iterator();
while(!Thread.currentThread().isInterrupted() && !interrupted) {
try {
if (consumerIterator.hasNext()) {
String reqId = UUID.randomUUID().toString();
System.out.println(reqId+ " : Event received by threadId : "+Thread.currentThread().getId());
MessageAndMetadata<byte[], byte[]> messageAndMetaData = consumerIterator.next();
byte[] keyBytes = messageAndMetaData.key();
String key = null;
if (keyBytes != null) {
key = new String(keyBytes);
}
byte[] eventBytes = messageAndMetaData.message();
if (eventBytes == null){
System.out.println("Topic: No event fetched for transaction Id:" + key);
continue;
}
String event = new String(eventBytes).trim();
// Some Processing code
System.out.println(reqId+" : Processing completed for threadId = "+Thread.currentThread().getId());
consumer.commitOffsets();
} catch (Exception ex) {
}
}
}
}
理想情况下,它应该并行地从所有100个分区开始处理。但是它从一个线程中随机抽取一些事件并处理它,然后另一个线程从另一个分区开始处理。它看起来像是顺序处理,但是有不同的线程。我希望所有100个线程都能进行处理。我是不是漏了什么?
日志链接的pfb。https://drive.google.com/file/d/14b7gqpmwurzuwewsdhnw8q01t_cq30es/view?usp=sharinghttps用法:/drive.google.com/file/d/1po\u iesojfquerw0y-m9wrub-1yjuewhf/view?usp=sharing
1条答案
按热度按时间62lalag41#
我怀疑这是否是垂直缩放Kafka流的正确方法。
kafka流本身就支持多线程消费。
使用num.stream.threads配置增加用于处理的线程数。
如果希望100个线程处理100个分区,请将num.stream.threads设置为100。