如何使用 Spring Cloud Stream 将消息发送到指定的 Kafka 主题

ijxebb2r  于 11个月前  发布在  Apache
关注(0)|答案(1)|浏览(112)

我在使用 Spring Cloud 和 Spring Boot 编写 Kafka 生产者时遇到了问题。当我在 application.yml 中创建配置时,消息总是被发送到错误的主题。我使用 "kafka_demo_topic_out_0" 作为主题的占位符(绑定名称),但消息没有被发送到正确的目标 kafka_demo_topic,而是被发送到了 kafka_demo_topic_out_0。下面是我的生产者代码:

package org.heller.kafka.demo.producer;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.heller.kafka.demo.producer.pojo.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Scheduled producer that builds a {@link Message}, serializes it to a JSON string and
 * hands it to {@link StreamBridge} under the binding name {@code kafka_demo_topic_out_0}.
 *
 * <p>Which Kafka topic the binding name resolves to is decided by the application
 * configuration, not by this class.
 */
@Component
@EnableScheduling
public class KafkaProducer {

    /** Binding name passed to {@link StreamBridge#send(String, Object)}. */
    private static final String BINDING_NAME = "kafka_demo_topic_out_0";

    /** Shared JSON serializer; created once instead of on every scheduled run. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Timestamp layout used for {@code Message.date}; the formatter is immutable, so it is shared. */
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Source of message ids; the first id handed out is 0. */
    private final AtomicLong idGenerator = new AtomicLong();

    @Autowired
    StreamBridge streamBridge;

    /**
     * Builds one message, prints it to standard output and sends its JSON form.
     *
     * <p>Triggered by Spring's scheduler with {@code fixedDelay = 1000}.
     *
     * @throws Exception if the message cannot be serialized to JSON
     */
    @Scheduled(fixedDelay = 1000)
    public void scheduleFixedRateTask() throws Exception {
        Message message = constructMessage();
        System.out.println("sending message" + message);

        streamBridge.send(BINDING_NAME, OBJECT_MAPPER.writeValueAsString(message));
    }

    /**
     * Creates a message carrying the next sequential id, a random UUID and the
     * current local date-time formatted as {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @return the populated message
     */
    private Message constructMessage() {
        Message message = new Message();
        message.setId(idGenerator.getAndIncrement());
        message.setUuid(UUID.randomUUID().toString());
        message.setDate(LocalDateTime.now().format(DATE_FORMAT));
        return message;
    }

}

下面是我的 application.yml:

# Spring Cloud Stream settings as posted in the question.
# NOTE(review): as indented here, `bindings` sits at the same level as `stream`
# (i.e. spring.cloud.bindings), and `destination`, `content-type` and `binder`
# sit at the same level as the binding name rather than beneath it -- confirm
# whether this nesting is what the binder actually reads.
spring:
    cloud:
      stream:
        kafka:
          binder:
             autoAddPartitions: true
             brokers: localhost:9092
             auto-create-topics: false
      bindings:
        # Binding name used by the producer's StreamBridge.send(...) call.
        kafka_demo_topic_out_0:
          producer:
              headerMode: raw
        # Intended target topic for the binding above.
        destination: kafka_demo_topic
        content-type: application/json
        binder: kafka


我使用的是 Spring Cloud Stream 的 Kafka starter 依赖:

<!-- Spring Cloud Stream Kafka starter, version pinned explicitly to 4.1.0. -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>4.1.0</version>
</dependency>


谢谢你的帮助
更新:Spring Cloud Stream 的 StreamBridge 不知为何忽略了 application.yml 中的配置。我尝试了下面这个配置,但它仍然创建了一个名为 kafkaDemoTopic 的新主题。

# Second configuration attempt from the question.
# NOTE(review): as indented here, `bindings` and the trailing `binder` sit at the
# same level as `stream` (spring.cloud.bindings / spring.cloud.binder) rather than
# beneath it -- confirm whether this nesting is what the binder actually reads.
spring:
    cloud:
      stream:
       function:
          definition: kafkaDemoTopic
       kafka:
          binder:
             autoAddPartitions: true
             brokers: localhost:9092
             auto-create-topics: false
      bindings:
        # Output binding derived from the function definition name above.
        kafkaDemoTopic-out-0:
              headerMode: raw
              # Intended target topic for this binding.
              destination: kafka_demo_topic
              content-type: application/json
      binder: kafka

li9yvcax

li9yvcax1#

我找到了解决方案。下面是我的 application.yml:

# Spring Cloud Stream configuration for the Kafka producer.
spring:
  cloud:
    stream:
      bindings:
        # Binding name passed to StreamBridge.send(...); maps to the Kafka topic below.
        kafkaDemoTopic:
          destination: kafka_demo_topic
      # Kafka binder settings must be nested under `stream` so that they resolve to
      # spring.cloud.stream.kafka.binder.*; placed beside `stream` they are ignored.
      kafka:
        binder:
          autoAddPartitions: true
          brokers: localhost:9092
          autoCreateTopics: false

我的 Kafka 生产者:

package org.heller.kafka.demo.producer;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.heller.kafka.demo.producer.pojo.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Scheduled producer that builds a {@link Message}, serializes it to a JSON string and
 * hands it to {@link StreamBridge} under the binding name {@code kafkaDemoTopic}.
 *
 * <p>Which Kafka topic the binding name resolves to is decided by the application
 * configuration, not by this class.
 */
@Component
@EnableScheduling
public class KafkaProducer {

    /** Binding name passed to {@link StreamBridge#send(String, Object)}. */
    private static final String BINDING_NAME = "kafkaDemoTopic";

    /** Shared JSON serializer; created once instead of on every scheduled run. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Timestamp layout used for {@code Message.date}; the formatter is immutable, so it is shared. */
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Source of message ids; the first id handed out is 0. */
    private final AtomicLong idGenerator = new AtomicLong();

    @Autowired
    StreamBridge streamBridge;

    /**
     * Builds one message, prints it to standard output and sends its JSON form.
     *
     * <p>Triggered by Spring's scheduler with {@code fixedDelay = 1000}.
     *
     * @throws Exception if the message cannot be serialized to JSON
     */
    @Scheduled(fixedDelay = 1000)
    public void scheduleFixedRateTask() throws Exception {
        Message message = constructMessage();
        System.out.println("sending message" + message);

        streamBridge.send(BINDING_NAME, OBJECT_MAPPER.writeValueAsString(message));
    }

    /**
     * Creates a message carrying the next sequential id, a random UUID and the
     * current local date-time formatted as {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @return the populated message
     */
    private Message constructMessage() {
        Message message = new Message();
        message.setId(idGenerator.getAndIncrement());
        message.setUuid(UUID.randomUUID().toString());
        message.setDate(LocalDateTime.now().format(DATE_FORMAT));
        return message;
    }

}


我的 pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the kafka_demo application.
  Inherits from spring-boot-starter-parent 3.2.0, targets Java 17, and declares the
  Spring Cloud Stream / Kafka binder artifacts with explicitly pinned versions.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.heller.kafka.demo</groupId>
    <artifactId>kafka_demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka_demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Spring Cloud artifacts below are all pinned to 4.1.0 by hand. -->
        <!-- NOTE(review): the starter presumably already pulls in spring-cloud-stream and the
             Kafka binder transitively; confirm whether the separate entries are required. -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>4.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>4.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>4.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-commons</artifactId>
            <version>4.1.0</version>
        </dependency>
        <!-- Jackson is used by the producer to serialize messages to JSON. -->
        <!-- NOTE(review): the version is pinned here instead of being inherited from the
             Boot parent; confirm the override is intentional. -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.16.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin; version comes from the parent POM. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

相关问题