Brave tracing

a7qyws3x  posted on 2021-06-04  in Kafka

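I am using the Brave library https://github.com/openzipkin/brave for tracing, and now I want to use it for a Kafka consumer as well. I want to avoid adding Spring Cloud Sleuth and leverage only the Brave Kafka instrumentation https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients.
For the Kafka consumer I use @KafkaListener. The code looks like this:
TestKafkaEndpoint.java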

@Service
public class TestKafkaEndpoint {

    @KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
    public void procesMyRequest(@Payload final MyRequest request) {
       // do some magic...
    }
}

and the configuration class TestKafkaConfig.java

@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {

    @Bean
    public ConsumerFactory<String, MyRequest> testConsumerFactory() {
        final Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
        return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(testConsumerFactory());
        factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
        return factory;
    }
}

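But I don't know how to plug the Brave Kafka tracing into consumption when the consumers are created through the Kafka consumer factory. Does anyone have experience with this and has gotten it to work?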

0aydgbwb  #1

I'm not very familiar with it, but it looks like TracingConsumer is a simple Consumer wrapper: https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L179
You should be able to create a subclass of DefaultKafkaConsumerFactory and override the createConsumer method. The listener container uses...

this.consumer =
        KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                this.consumerGroupId,
                this.containerProperties.getClientId(),
                KafkaMessageListenerContainer.this.clientIdSuffix,
                consumerProperties);

... so in your override, call super.createConsumer(...) and wrap the result in a TracingConsumer.
If you are using version 2.5.3 or later, you can instead add a ConsumerPostProcessor to the DefaultKafkaConsumerFactory (DKCF).
That is what Sleuth does:
https://github.com/spring-cloud/spring-cloud-sleuth/blob/6e306e594d20361483fd19739e0f5f8e82354bf5/spring-cloud-sleuth-brave/src/main/java/org/springframework/cloud/sleuth/brave/instrument/messaging/TraceMessagingAutoConfiguration.java#L263-L285
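
To make the two options concrete, here is a minimal, self-contained sketch of the wrapping pattern. It is not Spring Kafka or Brave code: SimpleConsumer, PlainConsumerFactory, traced and the other names are hypothetical stand-ins, so that the snippet runs on a plain JDK. In a real application the stand-in factory would be DefaultKafkaConsumerFactory, and the traced(...) helper would presumably delegate to Brave's KafkaTracing, for example kafkaTracing.consumer(delegate).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Sketch only: every type below is a hypothetical stand-in, not a Spring Kafka or Brave class. */
final class TracingWrapSketch {

    /** Stand-in for org.apache.kafka.clients.consumer.Consumer. */
    interface SimpleConsumer {
        String poll();
    }

    /** Stand-in for DefaultKafkaConsumerFactory. */
    static class PlainConsumerFactory {
        private final List<UnaryOperator<SimpleConsumer>> postProcessors = new ArrayList<>();

        /** Mirrors the idea of registering a consumer post-processor on the factory. */
        void addPostProcessor(UnaryOperator<SimpleConsumer> postProcessor) {
            postProcessors.add(postProcessor);
        }

        /** Creates the raw consumer, then applies each post-processor in registration order. */
        SimpleConsumer createConsumer() {
            SimpleConsumer consumer = () -> "record";
            for (UnaryOperator<SimpleConsumer> postProcessor : postProcessors) {
                consumer = postProcessor.apply(consumer);
            }
            return consumer;
        }
    }

    /** Stand-in for the tracing decorator; real code would wrap the delegate with Brave instead. */
    static SimpleConsumer traced(SimpleConsumer delegate) {
        return () -> "traced(" + delegate.poll() + ")";
    }

    /** Option 1: subclass the factory, call super.createConsumer(), and wrap the result. */
    static class TracingConsumerFactory extends PlainConsumerFactory {
        @Override
        SimpleConsumer createConsumer() {
            return traced(super.createConsumer());
        }
    }

    /** Option 2: keep the stock factory and register the wrapper as a post-processor. */
    static PlainConsumerFactory factoryWithPostProcessor() {
        PlainConsumerFactory factory = new PlainConsumerFactory();
        factory.addPostProcessor(TracingWrapSketch::traced);
        return factory;
    }

    public static void main(String[] args) {
        // Both options hand the listener a wrapped consumer; each line prints "traced(record)".
        System.out.println(new TracingConsumerFactory().createConsumer().poll());
        System.out.println(factoryWithPostProcessor().createConsumer().poll());
    }
}
```

Either way, the listener container factory from the question would keep calling setConsumerFactory(...) as before. Only the consumer factory bean changes, so the @KafkaListener method itself stays untouched.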
