我正在尝试配置confluent-ConsumerTimesTampsInteptor以支持confluent kafka复制,并配置了JavaSpring引导应用程序,如下所述
应用程序属性
# consumer timestamp interceptor
interceptor.classes=io.confluent.connect.replicator.offsets.ConsumerTimestampsInterceptor
timestamp.producer.security.protocol=PLAINTEXT
timestamp.producer.sasl.mechanism=NONE
# timestamp.producer.ssl.endpoint.identification.algorithm=
# timestamp.producer.sasl.jaas.config=
src.consumer.group.id=lkc-302y2
消费者.java
package com.example.demo.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.example.demo.model.ProductKafka;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.logging.Logger;
@Service
public class Consumer {
private static final Logger LOGGER = Logger.getLogger(Consumer.class.getName());
public static ArrayDeque<ProductKafka> consumeQueue = new ArrayDeque<>();
@KafkaListener(autoStartup = "false", topics = "#{'${spring.kafka.topics}'.split('\\\\ ')}", groupId = "#{'${spring.kafka.groupId}'}")
public void consume(ProductKafka productKafka) throws IOException {
consumeQueue.offer(productKafka);
LOGGER.info(String.format("#### -> Logger Consumed message -> %s", productKafka.toString()));
System.out.printf("#### -> Consumed message -> %s", productKafka.toString());
}
}
下面是项目结构
在此处输入图像描述
但是它不起作用。
我们的kafka管理员提到,虽然我添加了拦截器,但是kafka消费者没有配置为使用拦截器。
我不知道如何让消费者使用拦截器
暂无答案!
目前还没有任何答案,快来回答吧!