Kafka multithreaded consumer throws ClosedChannelException

ttygqcqt  posted on 2021-06-08  in  Kafka
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);

private void run() {
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    String topic = "mytopic";
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    topicCount.put(topic, 2);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    int thread = 0;
    LOGGER.info("size: {}", streams.size());
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    for (final KafkaStream stream : streams) {
        final int tid = thread++;
        LOGGER.info("submit thread {}", tid);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : (Iterable<MessageAndMetadata<byte[], byte[]>>) stream) {
                    byte[] key = messageAndMetadata.key();
                    String message = new String(messageAndMetadata.message());
                    LOGGER.info("key: {} message: {} thread: {}", key, message, tid);
                }
            }
        });
    }

    if (consumerConnector != null)
        consumerConnector.shutdown();
}

public static void main(String[] args) {
    new MessageHandler().run();
}

}
After running this consumer, I get the following exception:

WARN  2016-08-13 22:46:56.969] [testgroup_debian-1471099616127-8c8586c4-leader-finder-thread] kafka.utils.Logging$class.warn(Logging.scala:89) [Fetching topic metadata with correlation id 0 for topics [Set(mytopic)] from broker [BrokerEndPoint(0,debian,9092)] failed]
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

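Why am I getting this exception? The broker and ZooKeeper configuration should be fine, because I can send and receive messages with the console producer/consumer.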

blmhpbnm #1

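I think I found the problem. The call

consumerConnector.shutdown();

runs right after the tasks are submitted, so it closes the connection before any messages are consumed. executorService.execute() only hands the work to the pool and returns immediately; it does not wait for the consumer threads to finish.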
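To illustrate the ordering, here is a minimal sketch that uses only the JDK, so it runs without a broker. The blocking queues are hypothetical stand-ins for the KafkaStream objects, and the END marker, the ShutdownOrderSketch class and the consume method are made-up names, not part of the Kafka API. The point is only that the submitting thread waits before it shuts anything down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownOrderSketch {
    // Hypothetical end-of-stream marker; stands in for the connector closing a stream.
    private static final String END = "__end__";

    /** Handles every message on `threads` workers, then shuts down; returns the handled count. */
    static int consume(List<String> messages, int threads) {
        // Hypothetical stand-ins for the KafkaStream objects: one blocking queue per worker.
        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            streams.add(new LinkedBlockingQueue<String>());
        }
        // Spread the messages over the queues round-robin.
        for (int i = 0; i < messages.size(); i++) {
            streams.get(i % threads).add(messages.get(i));
        }

        final AtomicInteger handled = new AtomicInteger();
        final CountDownLatch allHandled = new CountDownLatch(messages.size());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (final BlockingQueue<String> stream : streams) {
            pool.execute(() -> {
                try {
                    // Blocks on take(), much like a stream iterator, until END arrives.
                    for (String m = stream.take(); !END.equals(m); m = stream.take()) {
                        handled.incrementAndGet();
                        allHandled.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            // execute() has already returned; wait here instead of shutting down at once.
            allHandled.await();
            // Only now end every stream (the "connector shutdown") and drain the pool.
            for (BlockingQueue<String> stream : streams) {
                stream.add(END);
            }
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(consume(Arrays.asList("a", "b", "c"), 2)); // prints 3
    }
}
```

In a real consumer there is usually no fixed message count to wait for, so the wait would more likely be on an external stop signal (for example a JVM shutdown hook or a timed sleep), followed by consumerConnector.shutdown() and then shutting down the executor.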

p4tfgftt #2

The consumer is single-threaded and not thread-safe. If you want to consume from two threads, each thread needs its own ConsumerConnector instance.
