多线程kafka使用者不并行处理所有分区

j91ykkif  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(333)

我创建了一个多线程kafka使用者,其中每个分区分配一个线程(我总共有100个分区)。我也跟着https://cwiki.apache.org/confluence/display/kafka/consumer+group+example 链接。
下面是我的消费者的init方法。

consumer =  kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        System.out.println("Kafka Consumer initialized.");
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicName, 100);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicName);

        executor = Executors.newFixedThreadPool(100);

在上面的init方法中,我得到了kafka流的列表(总共100个),这些流应该连接到每个分区(正如预期的那样)。
然后我使用下面的代码片段将每个流提交到不同的线程。
公共对象调用(){

for (final KafkaStream stream : streams) {
        executor.execute(new StreamWiseConsumer(stream));
    }
    return true;
  }

下面是streamwiseconsumer类。

public class StreamWiseConsumer extends Thread {

    ConsumerIterator<byte[], byte[]> consumerIterator;
    private KafkaStream m_stream;

    public StreamWiseConsumer(ConsumerIterator<byte[], byte[]> consumerIterator) {
        this.consumerIterator = consumerIterator;
    }

    public StreamWiseConsumer(KafkaStream kafkaStream) {
        this.m_stream = kafkaStream;
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> consumerIterator  = m_stream.iterator();

        while(!Thread.currentThread().isInterrupted() && !interrupted) {
            try {
                if (consumerIterator.hasNext()) {
                    String reqId = UUID.randomUUID().toString();
                    System.out.println(reqId+ " : Event received by threadId : "+Thread.currentThread().getId());
                    MessageAndMetadata<byte[], byte[]> messageAndMetaData = consumerIterator.next();
                    byte[] keyBytes = messageAndMetaData.key();
                    String key = null;
                    if (keyBytes != null) {
                        key = new String(keyBytes);
                    }
                    byte[] eventBytes = messageAndMetaData.message();
                    if (eventBytes == null){
                        System.out.println("Topic: No event fetched for transaction Id:" + key);
                        continue;
                    }
                    String event = new String(eventBytes).trim();
                    // Some Processing code
                    System.out.println(reqId+" : Processing completed for threadId = "+Thread.currentThread().getId());
                    consumer.commitOffsets();
            } catch (Exception ex) {

            }

        }
    }
}

理想情况下,它应该并行地从所有100个分区开始处理。但是它从一个线程中随机抽取一些事件并处理它,然后另一个线程从另一个分区开始处理。它看起来像是顺序处理,但是有不同的线程。我希望所有100个线程都能进行处理。我是不是漏了什么?
日志链接的pfb。https://drive.google.com/file/d/14b7gqpmwurzuwewsdhnw8q01t_cq30es/view?usp=sharinghttps用法:/drive.google.com/file/d/1po\u iesojfquerw0y-m9wrub-1yjuewhf/view?usp=sharing

62lalag4

62lalag41#

我怀疑这是否是垂直缩放Kafka流的正确方法。
kafka流本身就支持多线程消费。
使用num.stream.threads配置增加用于处理的线程数。
如果希望100个线程处理100个分区,请将num.stream.threads设置为100。

相关问题