kafka侦听器-在spring boot中配置侦听器?

bvk5enib  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(634)

我正在尝试配置confluent-ConsumerTimesTampsInteptor以支持confluent kafka复制,并配置了JavaSpring引导应用程序,如下所述
应用程序属性


# consumer timestamp interceptor

interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
timestamp.producer.security.protocol=PLAINTEXT
timestamp.producer.sasl.mechanism=NONE

# timestamp.producer.ssl.endpoint.identification.algorithm=

# timestamp.producer.sasl.jaas.config=

src.consumer.group.id=lkc-302y2

消费者.java

package com.example.demo.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.example.demo.model.ProductKafka;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.logging.Logger;

@Service
public class Consumer {
    private static final Logger LOGGER = Logger.getLogger(Consumer.class.getName());
    public static ArrayDeque<ProductKafka> consumeQueue = new ArrayDeque<>();

    @KafkaListener(autoStartup = "false", topics = "#{'${spring.kafka.topics}'.split('\\\\ ')}", groupId = "#{'${spring.kafka.groupId}'}")
    public void consume(ProductKafka productKafka) throws IOException {
        consumeQueue.offer(productKafka);

        LOGGER.info(String.format("#### -> Logger Consumed message -> %s", productKafka.toString()));
        System.out.printf("#### -> Consumed message -> %s", productKafka.toString());
    }
}

下面是项目结构
在此处输入图像描述
但是它不起作用。
我们的kafka管理员提到,虽然我添加了拦截器,但是kafka消费者没有配置为使用拦截器。
我不知道如何让消费者使用拦截器

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题