kafka schema.registry.url,但不是已知的配置

6mzjoqzu  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(558)

尝试使用schema registry发布主题上的json消息,但出现以下错误。Spring Boot法
..
提供了配置“schema.registry.url”,但不是已知的配置
应用程序yml fle

server:
  port: 9080
spring:
  kafka:
    properties:
      bootstrap.servers: server1:8080 
      schema.registry.url: https://bctdsdg:8081/
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: JsonSerializer.class        
    ssl:
      keystore-location: classpath:cert.jks
      keystore-password: pwd
      key-password: pwd
      truststore-location: classpath:dev_cacerts.jks
      truststore-password: pwd

Kafka配置

@Configuration
@EnableKafka
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
      public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.schemaregjson.serdes.JsonSerializer.class);
        props.put("schema.registry.url", "https://bctdsdg:8081/");
        props.putAll(kafkaProperties.getSsl().buildProperties());
        props.putAll(kafkaProperties.getProperties());
        return props;
      }

     @Bean
        public ProducerFactory<String, User> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

    @Bean
    public KafkaTemplate<String, User> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
vlju58qv

vlju58qv1#


@SpringBootApplication
public class SchemaregjsonApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SchemaregjsonApplication.class, args);
    }

    @Autowired
    private JsonProducer jsonproducer;
    User user = new User().withAge(34).withFirstname("Don").withLastname("Joe");
    @Override
    public void run(String... args) throws Exception {
        jsonproducer.send(user);
    }
}

相关问题