我用的是勇敢的图书馆https://github.com/openzipkin/brave 为了追踪,现在我想用它也Kafka消费者。我想避免添加Spring侦探和杠杆只是勇敢的Kafka仪器https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients.
对于Kafka消费者,我使用@kafkalistener。代码如下所示:
testkafkaendpoint.java测试卡
@Service
public class TestKafkaEndpoint {
@KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
public void procesMyRequest(@Payload final MyRequest request) {
// do some magic...
}
}
和配置类testkafkanconfig.java
@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {
@Bean
public ConsumerFactory<String, MyRequest> testConsumerFactory() {
final Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(testConsumerFactory());
factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
return factory;
}
但我不知道如何使用Kafka消费时,使用Kafka工厂或杠杆Kafka。有没有人有这方面的经验,并得到了它的工作?
1条答案
按热度按时间0aydgbwb1#
我不太熟悉,但看起来
TracingConsumer
是一个简单的消费者 Package :https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/tracingconsumer.java#l69-179年您应该能够创建
DefaultKafkaConsumerFactory
; 覆盖createConsumer
方法-侦听器容器使用。。。... 调用super.createconsumer(…)并将其 Package 在
TracingConsumer
.如果您使用的是2.5.3或更高版本,则可以添加
ConsumerPostProcessor
向dkcf。侦探就是这么做的:
https://github.com/spring-cloud/spring-cloud-sleuth/blob/6e306e594d20361483fd19739e0f5f8e82354bf5/spring-cloud-sleuth-brave/src/main/java/org/springframework/cloud/sleuth/brave/instrument/messaging/tracemessagingautoconfiguration.java#l263-l285型