Migrating from Spring v2 to Spring v4 for org.springframework.cloud.stream.annotation

ltqd579y  posted on 2024-01-05  in Spring

Introduction

I am currently using EnableBinding and StreamListener from Spring v2: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/org/springframework/cloud/stream/annotation/package-summary.html

My project code uses these annotations:

    import java.time.Clock;
    import java.time.Duration;
    import java.time.Instant;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Service;
    import com.enterprise.production.model.exceptions.ProductionStoreException;
    import com.enterprise.production.model.message.EnergyProductionMessage;
    import com.enterprise.production.stream.service.ProductionStoreService;
    import lombok.extern.slf4j.Slf4j;

    @EnableBinding(Sink.class)
    @Slf4j
    @Service
    public class ProductionMessageConsumer {
        @Autowired
        private ProductionStoreService productionService;
        @Autowired
        private Clock clock;

        @StreamListener(target = Sink.INPUT)
        public void handleEnergyProductionMessage(@Payload EnergyProductionMessage energyProductionMessage) throws ProductionStoreException {
            Instant start = clock.instant();
            log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                    energyProductionMessage.getDeviceId());
            log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());
            productionService.saveProductions(energyProductionMessage);
            log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                    energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                            .between(start, clock.instant()).toMillis());
            Instant startNormalization = clock.instant();
            log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                    energyProductionMessage.getDeviceId());
            productionService.saveProductions30m(energyProductionMessage);
            log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                    energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                            .between(startNormalization, clock.instant()).toMillis());
        }

        @StreamListener("errorChannel")
        public void error(Message<?> message) {
            log.error("Fail to read message with error '{}'", message.getPayload());
        }
    }


Problem

I need to migrate to Spring v4, but these annotations are not available in Spring v4: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/4.0.0/org/springframework/cloud/stream/annotation/package-summary.html

Question

Does anyone know how to migrate these annotations from Spring v2 to Spring v4?

ldxq2e6h  #1

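The annotations you are using are deprecated, as stated in the corresponding section of the Spring Cloud Stream 3.2.x documentation:
Annotation-based programming model. Basically the @EnableBinding, @StreamListener and all related annotations are now deprecated in favor of the functional programming model. See Spring Cloud Function support for more details.
When you follow the link above, you will see:
Starting with Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use the built-in support for Spring Cloud Function, where they can be expressed as beans of type java.util.function.[Supplier/Function/Consumer].
To migrate to v4, declare the logic of your @StreamListener method as a Spring bean defined in a class annotated with @org.springframework.context.annotation.Configuration: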

    import java.time.Clock;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.function.Consumer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import com.enterprise.production.model.message.EnergyProductionMessage;
    import com.enterprise.production.stream.service.ProductionStoreService;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    @Configuration
    public class ProductionMessageConsumerConfiguration {
        @Autowired
        private ProductionStoreService productionService;
        @Autowired
        private Clock clock;

        // Replaces the former @StreamListener method with a functional Consumer bean.
        // Note: if ProductionStoreException is a checked exception, the lambda must
        // catch or wrap it, because Consumer.accept declares no checked exceptions.
        @Bean
        public Consumer<EnergyProductionMessage> consumeEnergyProductionMessage() {
            return energyProductionMessage -> {
                Instant start = clock.instant();
                log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                        energyProductionMessage.getDeviceId());
                log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());
                productionService.saveProductions(energyProductionMessage);
                log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                        energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                                .between(start, clock.instant()).toMillis());
                Instant startNormalization = clock.instant();
                log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                        energyProductionMessage.getDeviceId());
                productionService.saveProductions30m(energyProductionMessage);
                log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                        energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                                .between(startNormalization, clock.instant()).toMillis());
            };
        }
    }

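One thing the snippet above does not show is the binding configuration. With the functional model the input binding is no longer called "input" (the value of Sink.INPUT); it is named after the bean, following the pattern <functionName>-in-<index>, so any existing spring.cloud.stream.bindings.input.* properties need to be renamed. The plain-Java sketch below only illustrates that naming convention; the BindingNames class and the "energy-productions" destination are hypothetical and stand in for whatever your application.properties already uses.

```java
public class BindingNames {

    // Spring Cloud Stream names the input binding of a functional bean
    // "<functionName>-in-<index>".
    public static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Property key that maps a binding to a broker destination.
    public static String destinationKey(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        // Bean method name taken from the answer above.
        String function = "consumeEnergyProductionMessage";
        String binding = inputBinding(function, 0);

        // Selects which functional bean to bind when several are present.
        System.out.println("spring.cloud.function.definition=" + function);
        // "energy-productions" is a hypothetical destination name.
        System.out.println(destinationKey(binding) + "=energy-productions");
    }
}
```

Running it prints the two property lines you would expect to place in application.properties, with the destination replaced by your real topic or queue name.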
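For the error-handling part, by default the framework logs the failure together with the message payload, but you can handle it manually by following the instructions in this section.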
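If you would rather deal with failures inside the function itself, one option is to wrap the consumer. This is a minimal plain-Java sketch and not the mechanism described in the Spring documentation; withErrorHandler and the String payload are hypothetical. Keep in mind that an exception caught this way never reaches the binder, so the message is treated as processed successfully and any retry or dead-letter configuration is not triggered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class ErrorHandlingSketch {

    // Hypothetical helper: wraps a consumer so that runtime failures are passed
    // to a handler instead of propagating to the caller.
    public static <T> Consumer<T> withErrorHandler(Consumer<T> delegate,
                                                   BiConsumer<T, RuntimeException> onError) {
        return payload -> {
            try {
                delegate.accept(payload);
            } catch (RuntimeException e) {
                onError.accept(payload, e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> failures = new ArrayList<>();

        // Stand-in for the real message handler; fails on an empty payload.
        Consumer<String> risky = payload -> {
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
        };

        Consumer<String> safe = withErrorHandler(risky, (payload, e) -> failures.add(e.getMessage()));
        safe.accept("ok");
        safe.accept("");
        System.out.println(failures); // prints [empty payload]
    }
}
```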

zzzyeukh  #2

These annotations are deprecated. You can define the handler as a Consumer instead:

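    import java.util.function.Consumer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Service;

    @Service
    public class ProductionMessageConsumer {
        @Autowired
        private ProductionStoreService productionService;

        // Functional replacement for the @StreamListener method.
        @Bean
        public Consumer<EnergyProductionMessage> handleEnergyProductionMessage() {
            return energyProductionMessage -> {
                // ...
                productionService.saveProductions(energyProductionMessage);
                // ....
            };
        }
    }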

Also check this answer: @EnableBinding is deprecated as of 3.1 in favor of the functional programming model.
