spring boot kafka consumer-如何正确使用spring boot中的kafka消息

e0bqpujr  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(239)

我正在开发一个spring-boot应用程序,它应该使用kafka消息。我有一个奇怪的结果:当我使用kafka-console-producer.sh发送消息时,我的消费者只检测并打印其他消息。例如,在kafka控制台producer中,我会键入“one”->enter->“two”->enter->“three”->enter。在我的Spring Boot消费者,我只会看到“二”,“四”,等等。。。
my consumeconfigfactory.java:

import java.util.Properties;

import javax.annotation.PostConstruct;

import kafka.consumer.ConsumerConfig;

import org.springframework.stereotype.Component;

@Component
public class ConsumerConfigFactory {

    private static final String ZK_CONNECT = "10.211.55.2:2181";

    private ConsumerConfig consumerConfig;

    @PostConstruct
    private void createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", ZK_CONNECT);
        props.put("group.id", "Video-cg-0");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "100");
        consumerConfig = new ConsumerConfig(props);
    }

    public ConsumerConfig getConsumerConfig() {
        return consumerConfig;
    }
}

我的consumerthreadpool.java:

import static kafka.consumer.Consumer.createJavaConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.utiltube.kafka.config.ConsumerConfigFactory;

@Component
public class ConsumerThreadPool {

    private static final String TOPIC = "test";
    private static final Integer NUM_THREADS = 1;

    @Autowired
    private ConsumerConfigFactory consumerConfigFactory;

    private ConsumerConnector consumer;
    private ExecutorService threadPool;

    public ConsumerThreadPool() {
        threadPool = Executors.newFixedThreadPool(NUM_THREADS);
    }

    @PostConstruct
    public void startConsuming() {
        ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig();
        consumer = createJavaConsumerConnector(consumerConfig);

        consume();
    }

    public void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, NUM_THREADS);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            threadPool.submit(new VideoConsumer(stream, threadNumber));
            threadNumber++;
        }
    }
}

我的消费者.java:

import java.io.IOException;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.utiltube.kafka.video.model.Video;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class VideoConsumer implements Runnable {
    private ObjectMapper objectMapper;
    private KafkaStream<byte[], byte[]> kafkaStream;
    private int threadNumber;

    public VideoConsumer(KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
        this.objectMapper = new ObjectMapper();
    }
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();

        while (it.hasNext()) {
            byte[] messageData = it.next().message();
            try {
                //String videoFromMessage = objectMapper.readValue(messageData, String.class);
                //byte[] videoFromMessage = it.next().message();
                //System.out.print("got message");
                String videoFromMessage = new String(it.next().message());
                System.out.print("Thread:" + threadNumber + ".Consuming video: " + videoFromMessage + "\n");
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }

        System.out.println("Shutting down Thread: " + kafkaStream);
    }
}

为了捕捉和显示每一条信息,你能改变什么?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题