获取队列中的所有kafka消息并停止java流式传输

ijnw1ujt  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(289)

我需要在晚上执行一个作业,它将获取kafka队列中的所有消息,并对它们执行一个进程。我可以得到消息,但Kafka流正在等待更多的消息,我无法继续我的进程。我有以下代码:

...
private ConsumerConnector consumerConnector;
private final static String TOPIC = "test";

public MessageStreamConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "test-group");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }
public List<String> getMessages() {
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(TOPIC, new Integer(1));
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                        .createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                List<String> messages = new ArrayList<>();
                while (it.hasNext())
                    messages.add(new String(it.next().message()));
                return messages;
            }

代码能够获取消息,但当它处理最后一条消息时,它将保留在行中:

while (it.hasNext())

问题是,我怎样才能从Kafka那里得到所有的信息,停止流并继续我的其他任务。
我希望你能帮助我
谢谢

yjghlzjz

yjghlzjz1#

我目前正在使用kafka 0.10.0.1进行开发,发现了有关使用consumer property auto.offset.reset的混合信息,所以我做了一些实验来找出实际发生的情况。
基于这些,我现在这样理解:当设置属性时:

auto.offset.reset=earliest

这会将使用者定位到分配的分区中的第一条可用消息(在分区上未进行任何提交时),或者将使用者定位到最后提交的分区偏移量(请注意,您总是提交上次读取偏移量+1,否则您将在每次重新启动时重新读取最后提交的消息)消费者)
或者,不设置auto.offset.reset,这意味着将使用默认值“latest”。
在这种情况下,您不会收到任何有关连接使用者的旧消息-只会收到连接使用者后发布到主题的消息。
最后,如果要确保接收某个主题和指定分区的所有可用消息,则必须调用seektobeginning()。
似乎建议首先调用poll(0l),以确保您的使用者获得分配的分区(或者在ConsumerBalanceListener中实现您的代码!),然后将每个分配的分区搜索到“开始”:

kafkaConsumer.poll(0L);
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
57hvy0tb

57hvy0tb2#

像这样的办法也许行得通。基本上,这个想法是使用Kafka消费者和民意调查,直到你得到一些记录,然后停止时,你得到一个空批。

package kafka.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer1 extends Thread
{
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final DateFormat df;
    private final String logTag;
    private boolean noMoreData = false;
    private boolean gotData = false;
    private int messagesReceived = 0;
    AtomicBoolean isRunning = new AtomicBoolean(true);
    CountDownLatch shutdownLatch = new CountDownLatch(1);

    public Consumer1(Properties props)
    {
        logTag = "Consumer1";

        consumer = new KafkaConsumer<>(props);
        this.topic = props.getProperty("topic");
        this.df = new SimpleDateFormat("HH:mm:ss");

        consumer.subscribe(Collections.singletonList(this.topic));
    }

    public void getMessages() {
        System.out.println("Getting messages...");
        while (noMoreData == false) {
            //System.out.println(logTag + ": Doing work...");

            ConsumerRecords<Integer, String> records = consumer.poll(1000);
            Date now = Calendar.getInstance().getTime();
            int recordsCount = records.count();
            messagesReceived += recordsCount;
            System.out.println("recordsCount: " + recordsCount);
            if (recordsCount > 0) {
               gotData = true;
            }

            if (gotData && recordsCount == 0) {
                noMoreData = true;
            }

            for (ConsumerRecord<Integer, String> record : records) {
                int kafkaKey = record.key();
                String kafkaValue = record.value();
                System.out.println(this.df.format(now) + " " + logTag + ":" +
                        " Received: {" + kafkaKey + ":" + kafkaValue + "}" +
                        ", partition(" + record.partition() + ")" +
                        ", offset(" + record.offset() + ")");
            }
        }
        System.out.println("Received " + messagesReceived + " messages");
    }

    public void processMessages() {
        System.out.println("Processing messages...");
    }

    public void run() {
        getMessages();
        processMessages();
    }
}
ekqde3dh

ekqde3dh3#

Kafka流似乎从一开始就不支持消费。
您可以创建一个本地kafka消费者并设置 auto.offset.reset 到最早,它将从一开始就消耗消息。

相关问题