confluent cloud apache kafka consumer-topic[test-1]不存在,missingtopicsfaltal为true

wsewodh2  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(439)

我是一个新手,尝试使用合流云apachekafka在两个spring-boot微服务之间进行通信。
在合流云上使用kafka时,在servicea将消息发布到主题之后,我在我的使用者(serviceb)上遇到以下错误。但是,当我登录到合流云时,我看到消息已经成功发布到主题。

org.springframework.context.ApplicationContextException: Failed to start bean 
'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is 
 java.lang.IllegalStateException: Topic(s) [topic-1] is/are not present and 
 missingTopicsFatal is true

当我在本地服务器上运行kafka时,我不会遇到这个问题。servicea能够将消息发布到本地kafka服务器上的主题,serviceb能够成功地使用该消息。
我在application.properties中提到了我的本地kafka服务器配置(如注解掉的代码)
服务a:生产商
应用程序属性

app.topic=test-1

# Remote

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

# Local

# ssl.endpoint.identification.algorithm=https

# security.protocol=SASL_SSL

# sasl.mechanism=PLAIN

# request.timeout.ms=20000

# bootstrap.servers=localhost:9092

# retry.backoff.ms=500

# sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

发件人.java

public class Sender {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Value("${app.topic}")
private String topic;

public void send(String data){
    Message<String> message = MessageBuilder
            .withPayload(data)
            .setHeader(KafkaHeaders.TOPIC, topic)
            .build();
    kafkaTemplate.send(message);
  }
}

kafkaproducerconfig.java文件

@Configuration
@EnableKafka
public class KafkaProducerConfig {

@Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate(producerFactory());
 }

}

服务b:消费者
应用程序属性

app.topic=test-1

# Remote

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

# Local

# ssl.endpoint.identification.algorithm=https

# security.protocol=SASL_SSL

# sasl.mechanism=PLAIN

# request.timeout.ms=20000

# bootstrap.servers=localhost:9092

# retry.backoff.ms=500

# sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

kafkaconsumerconfig.java文件

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
  @Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "confluent_cli_consumer_040e5c14-0c18-4ae6-a10f-8c3ff69cbc1a"); // confluent cloud consumer group-id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory(
            consumerConfigs(),
            new StringDeserializer(), new StringDeserializer());
}

@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
 }
}

Kafka消费者.java

@Service
public class KafkaConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaListener.class);

@Value("{app.topic}")
private String kafkaTopic;

  @KafkaListener(topics = "${app.topic}", containerFactory = "kafkaListenerContainerFactory")
  public void receive(@Payload String data) {
    LOG.info("received data='{}'", data);
  }
}
nbysray5

nbysray51#

用户名和密码是jaas配置的一部分,所以将它们放在一行中

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaclient1" password="kafkaclient1-secret";

我还建议您验证您的属性文件是否正确加载到客户端

t1rydlwq

t1rydlwq2#

@板球007的答案是正确的。您需要在sasl.jaas.config属性值中嵌入用户名和密码(尤其是集群api密钥和api密码)。
您可以通过以下官方示例再次检查java客户机应如何连接到confluent cloud:https://github.com/confluentinc/examples/blob/5.3.1-post/clients/cloud/java/src/main/java/io/confluent/examples/clients/cloud
谢谢,
--里卡多

cclgggtu

cclgggtu3#

请参阅引导文档。
不能直接将任意kafka属性放在application.properties文件中。
自动配置支持的属性显示在appendix-application-properties.html中。请注意,在大多数情况下,这些属性(连字符或camelcase)直接Map到apachekafka虚线属性。有关详细信息,请参阅apachekafka文档。
这些属性中的前几个应用于所有组件(生产者、消费者、管理员和流),但是如果希望使用不同的值,可以在组件级别指定。apache kafka将属性的重要性指定为high、medium或low。spring boot auto configuration支持所有高重要性属性、一些选定的中、低属性以及任何没有默认值的属性。
只有kafka支持的属性的子集可以直接通过kafkaproperties类获得。如果要使用不直接支持的其他属性配置生产者或使用者,请使用以下属性:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

这将公共prop.one kafka属性设置为first(适用于producer、consumers和admins),prop.two admin属性设置为second,prop.three consumer属性设置为third,prop.four producer属性设置为fourth,prop.five streams属性设置为fifth。
...

相关问题