spring boot kafka类反序列化-不在受信任的包中

mkh04yzy  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(405)

我知道这个问题很常见,但在遵循不同的解决方案后,我找不到任何有效的解决方案。在kafka中接收消息时,我想反序列化字符串和自定义类对象。用绳子都很好,但我的课不行。我在消费者配置中添加了受信任的包(使用 com.springmiddleware.entities 作为我的班级所在的包裹):

@Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");

        return props;
    }

我有这个在我的房间里 application.yml 文件:

spring:
kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'com.springmiddleware.entities'

并将这些线条添加到 application.properties ```
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.springmiddleware.entities
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

但以下错误不断出现:
org.apache.kafka.common.errors.serializationexception:在偏移量1处反序列化分区topic2-0的键/值时出错。如有需要,请查询过去的记录继续消费。原因:java.lang.illegalargumentexception:类“com.springmiddleware.entities.crime”不在受信任的包:[java.util,java.lang]中。如果您认为此类可以安全地反序列化,请提供其名称。如果序列化仅由受信任的源完成,则还可以启用trust all(*)。
更新
接收者配置:

@EnableKafka
@Configuration
public class ReceiverConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> consumerConfigs() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
    props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
    return props;
}

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new StringDeserializer(),
            new JsonDeserializer<>());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
更新2

Listener Class (Receiver):

@KafkaListener(topics = "${app.topic.foo}")

@Service
public class Receiver {

 private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

@KafkaHandler
public void listen(@Payload Crime message) {

        System.out.println("Received " + message);
}

@KafkaHandler
public void listen(@Payload String message) {

    System.out.println("Received " +  message);

}

bwntbbo3

bwntbbo31#

就用这个吧 JsonDeserializer 建造师
从版本2.2开始,您可以显式配置反序列化程序,通过使用一个具有布尔useheadersifpresent的重载构造函数(默认情况下为true),使用提供的目标类型并忽略标头中的类型信息。
下面的示例演示如何执行此操作:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

您的代码:

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new StringDeserializer(),
            new JsonDeserializer<>(Object.class,false));
}

现在使用 @KafkaListener 在班级层面

@KafkaListener(topics = "myTopic")
@Service
public class MultiListenerBean {

@KafkaHandler
public void listen(Cat cat) {
    ...
}

@KafkaHandler
public void listen(Hat hat) {
    ...
}

@KafkaHandler(isDefault = true)
public void delete(Object obj) {
    ...
   }

}

相关问题