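I have a problem with a Kafka producer built with Spring Cloud Stream and Spring Boot. When I configure it in application.yml, it always sends messages to the wrong topic. I use the binding name "kafka_demo_topic_out_0" as a placeholder for my topic, but the messages do not arrive at the intended destination kafka_demo_topic; they end up in a topic named kafka_demo_topic_out_0 instead. Here is my producer code: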
package org.heller.kafka.demo.producer;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.heller.kafka.demo.producer.pojo.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

@Component
@EnableScheduling
public class KafkaProducer {

    private AtomicLong idGenerator = new AtomicLong();

    @Autowired
    StreamBridge streamBridge;

    @Scheduled(fixedDelay = 1000)
    public void scheduleFixedRateTask() throws Exception {
        Message message = constructMessage();
        System.out.println("sending message" + message);
        streamBridge.send("kafka_demo_topic_out_0", new ObjectMapper().writeValueAsString(message));
    }

    private Message constructMessage() {
        Message message = new Message();
        message.setId(idGenerator.getAndIncrement());
        message.setUuid( UUID.randomUUID().toString());
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        message.setDate(now.format(formatter));
        return message;
    }
}
Here is my application.yml:
spring:
  cloud:
    stream:
      kafka:
        binder:
          autoAddPartitions: true
          brokers: localhost:9092
          auto-create-topics: false
      bindings:
        kafka_demo_topic_out_0:
          producer:
            headerMode: raw
          destination: kafka_demo_topic
          content-type: application/json
          binder: kafka
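The symptom described here (messages landing in a topic named after the binding) matches how a binding's destination defaults to the binding name when no `destination` is found for it. The sketch below is a simplified, standalone model of that fallback, not the framework's code; the class `DestinationFallback` and the method `resolveDestination` are hypothetical names used only for illustration.

```java
import java.util.Map;

// Toy model (assumption: not Spring Cloud Stream's real implementation).
// It only illustrates the fallback rule: a binding with no configured
// destination uses its own name as the topic.
final class DestinationFallback {

    // bindings maps a binding name to its configured destination.
    static String resolveDestination(Map<String, String> bindings, String bindingName) {
        return bindings.getOrDefault(bindingName, bindingName);
    }

    public static void main(String[] args) {
        Map<String, String> configured = Map.of("kafka_demo_topic_out_0", "kafka_demo_topic");

        // Configuration is picked up: the configured destination wins.
        System.out.println(resolveDestination(configured, "kafka_demo_topic_out_0")); // prints kafka_demo_topic

        // Configuration is not picked up (wrong key, wrong nesting, file not loaded):
        // the binding name itself becomes the topic.
        System.out.println(resolveDestination(Map.of(), "kafka_demo_topic_out_0")); // prints kafka_demo_topic_out_0
    }
}
```

Under this reading, a topic named kafka_demo_topic_out_0 suggests the `destination` entry was never associated with the binding name passed to `streamBridge.send(...)`.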
I use the Spring Cloud Stream Kafka starter:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>4.1.0</version>
</dependency>
Thanks for your help.
Update: StreamBridge somehow ignores the configuration in application.yml. I tried the following configuration, but it still created a new topic named kafkaDemoTopic.
spring:
  cloud:
    stream:
      function:
        definition: kafkaDemoTopic
      kafka:
        binder:
          autoAddPartitions: true
          brokers: localhost:9092
          auto-create-topics: false
      bindings:
        kafkaDemoTopic-out-0:
          headerMode: raw
          destination: kafka_demo_topic
          content-type: application/json
          binder: kafka
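A topic named kafkaDemoTopic would be consistent with the producer sending to "kafkaDemoTopic" while the configured binding key is "kafkaDemoTopic-out-0", so the two names never match. The fragment below is a minimal sketch of a layout worth trying, not a confirmed fix. It assumes the producer calls `streamBridge.send("kafkaDemoTopic-out-0", ...)`, that `bindings` sits directly under `spring.cloud.stream`, and that no `function.definition` entry is needed because no functional bean (Supplier, Function, Consumer) is involved.

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: false
      bindings:
        # Assumption: this key is exactly the first argument of streamBridge.send(...)
        kafkaDemoTopic-out-0:
          destination: kafka_demo_topic
          content-type: application/json
```

If the key and the `send` argument differ in any character, the binding is treated as a new one and its name is used as the topic.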
1 Answer
I found the solution. My application.yml:
My Kafka producer: