将Spring Cloud Stream v2迁移到Spring Cloud Stream v4 for org.springframework.cloud.stream.messaging

ocebsuys  于 2024-01-05  发布在  Spring
关注(0)|答案(1)|浏览(125)

简介

我目前正在使用来自Spring-cloud-stream v2的Sink接口:https://javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/index.html
我写了单元测试来测试我的代码。

我的项目在迁移前使用这些注解的代码:

StreamUtils.java

  1. package com.enterprise.production.event.consumer;
  2. import com.enterprise.production.model.message.EnergyMeasure;
  3. import com.enterprise.production.model.message.EnergyProductionMessage;
  4. import com.enterprise.production.model.message.Units;
  5. import com.enterprise.production.model.time.InstantPeriod;
  6. import com.enterprise.production.repository.entity.ProductionEntity;
  7. import com.enterprise.production.stream.service.ProductionMessageConverter;
  8. import com.fasterxml.jackson.databind.ObjectMapper;
  9. import com.google.common.collect.ImmutableMap;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.cloud.stream.messaging.Sink;
  13. import org.springframework.messaging.Message;
  14. import org.springframework.messaging.support.MessageBuilder;
  15. import org.springframework.stereotype.Component;
  16. import java.io.IOException;
  17. import java.time.Instant;
  18. import java.time.ZoneId;
  19. import java.time.temporal.ChronoUnit;
  20. import java.util.ArrayList;
  21. import java.util.Arrays;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.stream.Collectors;
  25. import java.util.stream.IntStream;
  26. import static com.enterprise.production.event.consumer.ProductionMessageConsumerIT.NOW;
  27. @Component
  28. @Slf4j
  29. public class StreamUtils {
  30. @Autowired
  31. private Sink sink;
  32. @Autowired
  33. ProductionMessageConverter productionMessageConverter;
  34. @Autowired
  35. ObjectMapper objectMapper;
  36. public List<ProductionEntity> generateAndSend2MessagesWith2Productions(int start, int end, String deviceId, String username, String tenantId, String unitsValue, String startDate) {
  37. List<ProductionEntity> result = new ArrayList<>();
  38. IntStream.rangeClosed(start, end)
  39. .boxed()
  40. .collect(Collectors.toList())
  41. .stream()
  42. .forEach(i -> {
  43. int dateInc = i * 4;
  44. Instant period1From = Instant.parse(startDate).plus(dateInc, ChronoUnit.DAYS);
  45. Instant period1To = Instant.parse(startDate).plus(dateInc + 1, ChronoUnit.DAYS);
  46. Instant period2From = Instant.parse(startDate).plus(dateInc + 2, ChronoUnit.DAYS);
  47. Instant period2To = Instant.parse(startDate).plus(dateInc + 3, ChronoUnit.DAYS);
  48. Instant expiry = Instant.parse("2019-01-01T00:00:00.000Z");
  49. // dateInc = dateInc + 4;
  50. EnergyMeasure production1 = EnergyMeasure.builder()
  51. .period(new InstantPeriod(period1From, period1To))
  52. .expiry(expiry)
  53. .production(1.6)
  54. .networkConsumption(8.0)
  55. .selfConsumption(1.6)
  56. .build();
  57. EnergyMeasure production2 = EnergyMeasure.builder()
  58. .period(new InstantPeriod(period2From, period2To))
  59. .expiry(expiry)
  60. .production(1.6)
  61. .networkConsumption(0.0)
  62. .selfConsumption(1.6)
  63. .build();
  64. Map<String, Units> units = ImmutableMap.<String, Units>builder()
  65. .put("production", new Units(unitsValue))
  66. .put("selfConsumption", new Units(unitsValue))
  67. .put("networkConsumption", new Units(unitsValue))
  68. .build();
  69. EnergyProductionMessage message = EnergyProductionMessage.builder()
  70. .deviceId(deviceId)
  71. .username(username)
  72. .tenantId(tenantId)
  73. .units(units)
  74. .solarEnergies(Arrays.asList(production1, production2))
  75. .build();
  76. List<ProductionEntity> productions = productionMessageConverter.convert(message);
  77. productions.forEach(productionEntity -> productionEntity.setExpiry(computeTtl(productionEntity.getBeginTs())));
  78. result.addAll(productions);
  79. sendMessage(message);
  80. });
  81. return result;
  82. }
  83. private long computeTtl(Instant fromBeginTs) {
  84. return fromBeginTs.atZone(ZoneId.of("UTC")).plusMonths(25).toEpochSecond()
  85. - NOW.toEpochMilli() / 1000;
  86. }
  87. public void sendMessage(EnergyProductionMessage message) {
  88. sink.input().send(getMessage(message));
  89. }
  90. public Message<EnergyProductionMessage> getMessage(EnergyProductionMessage message){
  91. return MessageBuilder.withPayload(message).build();
  92. }
  93. public void sendMessage(String payload) throws IOException {
  94. EnergyProductionMessage energyProductionMessage = objectMapper.readValue(payload,
  95. EnergyProductionMessage.class);
  96. Message<EnergyProductionMessage> message = MessageBuilder.withPayload(energyProductionMessage).build();
  97. sink.input().send(message);
  98. }
  99. }

字符串

ProductionMessageConsumerIT.java

  1. package com.enterprise.production.event.consumer;
  2. import com.enterprise.production.Application;
  3. import com.enterprise.production.client.ClientException;
  4. import com.enterprise.production.model.message.EnergyMeasure;
  5. import com.enterprise.production.model.message.EnergyProductionMessage;
  6. import com.enterprise.production.model.message.Units;
  7. import com.enterprise.production.model.time.InstantPeriod;
  8. import com.enterprise.production.repository.ProductionRepository;
  9. import com.enterprise.production.repository.entity.ProductionEntity;
  10. import com.google.common.collect.ImmutableMap;
  11. import org.apache.commons.lang3.builder.EqualsBuilder;
  12. import org.json.JSONArray;
  13. import org.json.JSONException;
  14. import org.json.JSONObject;
  15. import org.junit.Test;
  16. import org.junit.jupiter.api.Assertions;
  17. import org.junit.runner.RunWith;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.boot.test.context.SpringBootTest;
  20. import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
  21. import org.springframework.context.annotation.Bean;
  22. import org.springframework.context.annotation.Configuration;
  23. import org.springframework.context.annotation.Primary;
  24. import org.springframework.core.io.Resource;
  25. import org.springframework.core.io.ResourceLoader;
  26. import org.springframework.test.context.ActiveProfiles;
  27. import org.springframework.test.context.junit4.SpringRunner;
  28. import java.io.IOException;
  29. import java.nio.file.Files;
  30. import java.time.Clock;
  31. import java.time.Instant;
  32. import java.time.ZoneId;
  33. import java.time.ZoneOffset;
  34. import java.time.temporal.ChronoUnit;
  35. import java.util.*;
  36. import java.util.concurrent.TimeUnit;
  37. import java.util.stream.Stream;
  38. import static com.github.tomakehurst.wiremock.client.WireMock.*;
  39. @RunWith(SpringRunner.class)
  40. @SpringBootTest(classes = {Application.class, ProductionMessageConsumerIT.TestConfig.class})
  41. @AutoConfigureWireMock(port = 0)
  42. @ActiveProfiles("it")
  43. public class ProductionMessageConsumerIT {
  44. public static final Instant NOW = Instant.parse("2018-07-15T15:15:21Z");
  45. @Autowired
  46. ProductionRepository productionRepository;
  47. @Autowired
  48. StreamUtils streamUtils;
  49. @Autowired
  50. ResourceLoader resourceLoader;
  51. @Configuration
  52. static class TestConfig {
  53. @Bean
  54. @Primary
  55. public Clock clockIT() {
  56. return Clock.fixed(NOW, ZoneOffset.UTC); // timestamp 1531667721
  57. }
  58. }
  59. public static final String CAS_TABLE_ENERGIES = "energies_v2";
  60. public static final String CAS_TABLE_ENERGIES30m = "energies30m_v2";
  61. // Some tests ...
  62. }

我的项目在迁移后使用这些注解的代码:

StreamUtils.java

  1. package com.enterprise.production.event.consumer;
  2. import com.enterprise.production.model.message.EnergyMeasure;
  3. import com.enterprise.production.model.message.EnergyProductionMessage;
  4. import com.enterprise.production.model.message.Units;
  5. import com.enterprise.production.model.time.InstantPeriod;
  6. import com.enterprise.production.repository.entity.ProductionEntity;
  7. import com.enterprise.production.stream.service.ProductionMessageConverter;
  8. import com.fasterxml.jackson.databind.ObjectMapper;
  9. import com.google.common.collect.ImmutableMap;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
  13. import org.springframework.messaging.Message;
  14. import org.springframework.messaging.support.MessageBuilder;
  15. import org.springframework.stereotype.Component;
  16. import java.io.IOException;
  17. import java.time.Instant;
  18. import java.time.ZoneId;
  19. import java.time.temporal.ChronoUnit;
  20. import java.util.ArrayList;
  21. import java.util.Arrays;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.stream.Collectors;
  25. import java.util.stream.IntStream;
  26. import static com.enterprise.production.event.consumer.ProductionMessageConsumerIT.NOW;
  27. @Component
  28. @Slf4j
  29. public class StreamUtils {
  30. @Autowired
  31. ProductionMessageConverter productionMessageConverter;
  32. @Autowired
  33. ObjectMapper objectMapper;
  34. @Autowired
  35. private DirectWithAttributesChannel sink;
  36. public List<ProductionEntity> generateAndSend2MessagesWith2Productions(int start, int end, String deviceId, String username, String tenantId, String unitsValue, String startDate) {
  37. List<ProductionEntity> result = new ArrayList<>();
  38. IntStream.rangeClosed(start, end)
  39. .boxed()
  40. .collect(Collectors.toList())
  41. .stream()
  42. .forEach(i -> {
  43. int dateInc = i * 4;
  44. Instant period1From = Instant.parse(startDate).plus(dateInc, ChronoUnit.DAYS);
  45. Instant period1To = Instant.parse(startDate).plus(dateInc + 1, ChronoUnit.DAYS);
  46. Instant period2From = Instant.parse(startDate).plus(dateInc + 2, ChronoUnit.DAYS);
  47. Instant period2To = Instant.parse(startDate).plus(dateInc + 3, ChronoUnit.DAYS);
  48. Instant expiry = Instant.parse("2019-01-01T00:00:00.000Z");
  49. // dateInc = dateInc + 4;
  50. EnergyMeasure production1 = EnergyMeasure.builder()
  51. .period(new InstantPeriod(period1From, period1To))
  52. .expiry(expiry)
  53. .production(1.6)
  54. .networkConsumption(8.0)
  55. .selfConsumption(1.6)
  56. .build();
  57. EnergyMeasure production2 = EnergyMeasure.builder()
  58. .period(new InstantPeriod(period2From, period2To))
  59. .expiry(expiry)
  60. .production(1.6)
  61. .networkConsumption(0.0)
  62. .selfConsumption(1.6)
  63. .build();
  64. Map<String, Units> units = ImmutableMap.<String, Units>builder()
  65. .put("production", new Units(unitsValue))
  66. .put("selfConsumption", new Units(unitsValue))
  67. .put("networkConsumption", new Units(unitsValue))
  68. .build();
  69. EnergyProductionMessage message = EnergyProductionMessage.builder()
  70. .deviceId(deviceId)
  71. .username(username)
  72. .tenantId(tenantId)
  73. .units(units)
  74. .solarEnergies(Arrays.asList(production1, production2))
  75. .build();
  76. List<ProductionEntity> productions = productionMessageConverter.convert(message);
  77. productions.forEach(productionEntity -> productionEntity.setExpiry(computeTtl(productionEntity.getBeginTs())));
  78. result.addAll(productions);
  79. sendMessage(message);
  80. });
  81. return result;
  82. }
  83. private long computeTtl(Instant fromBeginTs) {
  84. return fromBeginTs.atZone(ZoneId.of("UTC")).plusMonths(25).toEpochSecond()
  85. - NOW.toEpochMilli() / 1000;
  86. }
  87. public void sendMessage(EnergyProductionMessage message) {
  88. sink.send(getMessage(message));
  89. }
  90. public Message<EnergyProductionMessage> getMessage(EnergyProductionMessage message) {
  91. return MessageBuilder.withPayload(message).build();
  92. }
  93. public void sendMessage(String payload) throws IOException {
  94. EnergyProductionMessage energyProductionMessage = objectMapper.readValue(payload,
  95. EnergyProductionMessage.class);
  96. Message<EnergyProductionMessage> message = MessageBuilder.withPayload(energyProductionMessage).build();
  97. sink.send(message);
  98. }
  99. }

ProductionMessageConsumerIT.java

  1. package com.enterprise.production.event.consumer;
  2. import com.enterprise.production.Application;
  3. import com.enterprise.production.client.ClientException;
  4. import com.enterprise.production.model.message.EnergyMeasure;
  5. import com.enterprise.production.model.message.EnergyProductionMessage;
  6. import com.enterprise.production.model.message.Units;
  7. import com.enterprise.production.model.time.InstantPeriod;
  8. import com.enterprise.production.repository.ProductionRepository;
  9. import com.enterprise.production.repository.entity.ProductionEntity;
  10. import com.google.common.collect.ImmutableMap;
  11. import org.apache.commons.lang3.builder.EqualsBuilder;
  12. import org.json.JSONArray;
  13. import org.json.JSONException;
  14. import org.json.JSONObject;
  15. import org.junit.jupiter.api.Assertions;
  16. import org.junit.jupiter.api.Test;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
  19. import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
  20. import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
  21. import org.springframework.boot.test.context.SpringBootTest;
  22. import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
  23. import org.springframework.context.annotation.Bean;
  24. import org.springframework.context.annotation.Configuration;
  25. import org.springframework.context.annotation.Primary;
  26. import org.springframework.core.io.Resource;
  27. import org.springframework.core.io.ResourceLoader;
  28. import org.springframework.test.context.ActiveProfiles;
  29. import java.io.IOException;
  30. import java.nio.file.Files;
  31. import java.time.Clock;
  32. import java.time.Instant;
  33. import java.time.ZoneId;
  34. import java.time.ZoneOffset;
  35. import java.time.temporal.ChronoUnit;
  36. import java.util.*;
  37. import java.util.concurrent.TimeUnit;
  38. import java.util.stream.Stream;
  39. import static com.github.tomakehurst.wiremock.client.WireMock.*;
  40. import static org.junit.jupiter.api.Assertions.assertThrows;
  41. @SpringBootTest(classes = {Application.class, ProductionMessageConsumerIT.TestConfig.class})
  42. @AutoConfigureWireMock(port = 0)
  43. @ActiveProfiles("it")
  44. public class ProductionMessageConsumerIT {
  45. public static final Instant NOW = Instant.parse("2018-07-15T15:15:21Z");
  46. public static final String CAS_TABLE_ENERGIES = "energies_v2";
  47. public static final String CAS_TABLE_ENERGIES30m = "energies30m_v2";
  48. @Autowired
  49. ProductionRepository productionRepository;
  50. @Autowired
  51. StreamUtils streamUtils;
  52. @Autowired
  53. ResourceLoader resourceLoader;
  54. // Some Tests ...
  55. @Configuration
  56. static class TestConfig {
  57. @Bean
  58. @Primary
  59. public Clock clockIT() {
  60. return Clock.fixed(NOW, ZoneOffset.UTC); // timestamp 1531667721
  61. }
  62. }
  63. }

问题

接口Sink不在Spring-cloud-stream https://javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/4.0.0/org/springframework/cloud/stream/messaging/package-summary.html的新版本4.0.0上,所以我需要将代码中过去的Sink替换为Spring-cloud-stream的新类DirectWithAttributesChannel,但是当我这样做时,我得到了这个错误:

  1. Field sink in com.enterprise.production.event.consumer.StreamUtils required a bean of type 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' that could not be found.
  2. The injection point has the following annotations:
  3. - @org.springframework.beans.factory.annotation.Autowired(required=true)
  4. Action:
  5. Consider defining a bean of type 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' in your configuration.

提问

有人知道如何将Sink接口从spring-cloud-stream v2迁移到spring-cloud-stream v4吗?

anauzrmj

anauzrmj1#

EnableBinding,Sink,SourceProcessor组件组成的基于注解的编程模型早已被弃用,并在Spring Cloud Stream的4.x版本中完全删除。您应该迁移到基于Spring Cloud Function架构的功能模型。在功能模型中,Sink可以转换为java.util.function.Consumer,一个Source作为一个java.util.function.Supplier和一个Processor通过java.util.function.Function。除了这些,还有一个StreamBridge API用于按需发送数据。所有这些都可以在参考文档中找到。由于您使用的是4.0.x,这里是最新4.0.x的参考文档:https://docs.spring.io/spring-cloud-stream/docs/4.0.3/reference/html/
在旧模型中,当您使用Sink时,您通常将其与EnableBinding(Sink.class)一起使用,然后使用类似StreamListener的东西来接收通过sink发送的消息。sink负责从消息传递中间件消费记录并发送到输入通道,然后由StreamListener侦听。所有这些基于注解的编程模型和组件都是从框架中删除,优先于功能模型。通过查看您提供的代码示例,看起来您正在尝试生成一些数据,然后将其发送到输入通道。如果您的用例到此结束,那么我建议直接使用Spring Integration,因为它对消息传递通道级别的工作提供了一流的支持。Spring Cloud Stream是一个更高的-在内部使用Spring Integration的抽象层。
如果您想在应用程序中生成数据后将其发送到中间件目的地,则可以使用StreamBridge

  1. @Autowired
  2. StreamBridge streamBridge;
  3. ...
  4. streamBridge.send("binding-name", data);
  5. }

字符串
或者如果你想简单地监听中间件目的地(例如Kafka主题),那么你可以使用标准的java.util.function.Consumer(或者如果你想把数据发送到另一个目的地,则使用Function)。

  1. @Bean
  2. public Consumer<Data> consume() {
  3. data -> {
  4. // custom code, maybe StreamBridge.send...
  5. }
  6. }
  7. spring.cloud.stream.bindings.consume-in-0.destination: <Middleware destination>


  1. @Bean
  2. public Function<DataIn, DataOut> function() {
  3. return in -> {
  4. ...
  5. return dataOut;
  6. }
  7. }
  8. spring.cloud.stream.bindings.function-in-0.destination: <Middleware destination>
  9. spring.cloud.stream.bindings.function-out-0.destination: <Middleware destination>

展开查看全部

相关问题