基于Spring Boot 的Kafka主题创建

1wnzp6jl  于 2022-12-11  发布在  Apache
关注(0)|答案(2)|浏览(169)

我试图在Kafka中创建一个新的主题使用Spring Boot 。
在谷歌搜索,我得到了下面的答案

// NOTE(review): a @Bean factory method runs exactly once, during application
// context startup. Spring will try to resolve the 'Topic' argument as a bean
// from the context at that time -- this method cannot be driven by request
// data at runtime to create topics on demand.
@Configuration
public class KafkaTopicConfiguration {
  
  @Bean
  public NewTopic createTopic(Topic topic) {
    return TopicBuilder.name(topic.getTopicName())
      .partitions(topic.getPartitions())
      .replicas(topic.getReplicas())
      .build();
  }
}

但是我无法使用Spring Boot REST POST Call实现以下内容。

// NOTE(review): invoking a @Configuration bean method by hand at runtime does
// NOT register a bean or create a Kafka topic -- NewTopic beans are only
// examined by KafkaAdmin during context initialization. To create topics
// dynamically from a controller, use AdminClient.createTopics(...) instead.
@PostMapping("/api/v1/kafkatopic/")
    public NewTopic createTopic(@RequestBody Topic topic)
    {
        return kt.createTopic(topic);
    }

//kt being object of the Configuration Class

但是,如果主题详细信息已经沿着如下代码(硬编码)传递,则相同的代码也可以工作。

// NOTE(review): this variant works because the topic details are fixed at
// startup -- KafkaAdmin picks up the NewTopic bean while the application
// context initializes, which is the only time @Bean methods are evaluated.
@Bean
  public NewTopic createTopic() {
    return TopicBuilder.name("test-topic")
      .partitions(6)
      .replicas(3)
      .build();
  }

谁来帮帮忙。
先谢了

zbdgwd5y

zbdgwd5y1#

NewTopic Bean仅在应用程序初始化期间创建。在运行时调用Bean工厂方法不会执行任何操作。
要动态创建主题,您需要使用AdminClient。Spring Boot 会自动配置KafkaAdmin bean。
您可以使用AdminClient的属性来建立它。
然后使用客户端创建主题。

// Create topics at runtime with AdminClient, reusing the auto-configured
// KafkaAdmin's properties. BUG FIX: the original snippet was missing the
// closing parenthesis of the try-with-resources header before '{'.
try (AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
    ...
}
jucafojl

jucafojl2#

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.bti.dto.Payload;

 @EnableKafka
 @Configuration
 public class KafkaProducerConfig {
//@Value(value = "${kafka.bootstrapAddress:}")
//private String bootstrapAddress;

@Bean
public ProducerFactory<String, Payload> producerFactory() {
Map<String, Object> configProps = new HashMap<String, Object>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"http://localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
 StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
JsonSerializer.class);
 // configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Payload> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}



**KafkaProducer** 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import     
 org.springframework.util.concurrent.ListenableFutureCallback;

import com.bti.dto.Payload;
@Component
public class KafkaProducer {

private static final Logger LOGGER =             
LoggerFactory.getLogger(KafkaProducer.class);

@Autowired
private KafkaTemplate<String, Payload> requestKafkaTemplate;

public void pushPayload(Payload payload, String topicName) {
try {
//PayloadMapper.validatePayload(payload);

ListenableFuture<SendResult<String, Payload>> future = 
  requestKafkaTemplate.send(topicName, payload);

future.addCallback(new             
ListenableFutureCallback<SendResult<String, 
Payload>>() {

    @Override
    public void onSuccess(SendResult<String, Payload> result) {
    }

    @Override
    public void onFailure(Throwable ex) {
    }

});
  } catch (Exception ex) {
if (ex instanceof NullPointerException) {
    LOGGER.error("Null Pointer Exception Occoured while 
 Converting 
      Exception in KafKa Consumer");
      } else {
          LOGGER.error("Exception in sentToKafka: " + 
     ex.getMessage());
      }
       }
     }
   }
**KafkaConsumerConfig** 
@Configuration
public class KafkaConsumerConfig {

//  @Value(value = "${kafka.bootstrapAddress}")
//    private String bootstrapAddress;
//
//    @Value(value = "test-consumer-group")
//    private String consumerGroupId;

@Bean
public ConsumerFactory<String, Payload> consumerFactory() {
    Map<String, Object> configs = new HashMap<>(); 
    
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,     
  "http://localhost:9092");
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,     
    StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
    JsonDeserializer.class );
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer- 
    group");
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
    Integer.MAX_VALUE);

    return new DefaultKafkaConsumerFactory<>(configs,new     
    StringDeserializer(),new JsonDeserializer<>(Payload.class));
     }

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Payload> 
 kafkaListenerContainerFactory()
  {
    ConcurrentKafkaListenerContainerFactory<String, Payload> 
    factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
 }
}

**KafkaConsumer** 

@Component
public class KafkaConsumer {

  private static final org.slf4j.Logger LOGGER =
      LoggerFactory.getLogger(KafkaConsumer.class);

  // ObjectMapper is thread-safe and expensive to build; cache one instance
  // instead of constructing a new one per record.
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Lets tests block until at least one record has been processed.
  CountDownLatch poLatch = new CountDownLatch(1);

  /**
   * Receives {@code Payload} records from the "abc" topic and logs their body.
   * The latch is released whether or not processing succeeds.
   */
  @KafkaListener(topics = "abc", containerFactory = "kafkaListenerContainerFactory")
  public void receive(ConsumerRecord<?, ?> consumerRecord) throws Exception {
    try {
      Payload payload = (Payload) consumerRecord.value();
      LOGGER.info("Object received from topic {}: {}", consumerRecord.topic(),
          MAPPER.writeValueAsString(payload.getBody().getObject()));
    } catch (Exception ex) {
      // BUG FIX: the original logged the literal string
      // "i2Exception.getMessage()" and dropped the exception entirely.
      LOGGER.error("Failed to process record from topic " + consumerRecord.topic(), ex);
    } finally {
      this.poLatch.countDown();
    }
  }
}

相关问题