简介
我目前正在使用 Spring Cloud Stream v2 中的 Sink 接口:https://javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/index.html
我写了单元测试来测试我的代码。
我的项目在迁移前使用这些注解的代码:
StreamUtils.java
package com.enterprise.production.event.consumer;
import com.enterprise.production.model.message.EnergyMeasure;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.model.message.Units;
import com.enterprise.production.model.time.InstantPeriod;
import com.enterprise.production.repository.entity.ProductionEntity;
import com.enterprise.production.stream.service.ProductionMessageConverter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static com.enterprise.production.event.consumer.ProductionMessageConsumerIT.NOW;
/**
 * Test helper that builds {@link EnergyProductionMessage} fixtures and sends them to the
 * input channel of the Spring Cloud Stream v2 {@link Sink} binding.
 */
@Component
@Slf4j
public class StreamUtils {

    @Autowired
    private Sink sink;

    @Autowired
    ProductionMessageConverter productionMessageConverter;

    @Autowired
    ObjectMapper objectMapper;

    /**
     * Builds and sends one message for every index in {@code [start, end]} (inclusive); each
     * message carries two energy measures.
     *
     * <p>For every message the entities produced by {@code productionMessageConverter} are
     * collected, with their expiry overwritten by {@link #computeTtl(Instant)} applied to the
     * entity's begin timestamp.
     *
     * @param start      first index (inclusive)
     * @param end        last index (inclusive); nothing is built or sent when {@code end < start}
     * @param deviceId   device id set on every message
     * @param username   username set on every message
     * @param tenantId   tenant id set on every message
     * @param unitsValue value wrapped in {@link Units} for all three unit keys
     * @param startDate  ISO-8601 instant (parsed with {@link Instant#parse}) the periods are offset from
     * @return the converted entities of all messages, in sending order
     */
    public List<ProductionEntity> generateAndSend2MessagesWith2Productions(int start, int end, String deviceId, String username, String tenantId, String unitsValue, String startDate) {
        List<ProductionEntity> result = new ArrayList<>();
        // Iterate the primitive range directly; boxing into an intermediate list adds nothing.
        IntStream.rangeClosed(start, end).forEach(i -> {
            EnergyProductionMessage message = buildMessage(i, deviceId, username, tenantId, unitsValue, startDate);
            List<ProductionEntity> productions = productionMessageConverter.convert(message);
            productions.forEach(productionEntity -> productionEntity.setExpiry(computeTtl(productionEntity.getBeginTs())));
            result.addAll(productions);
            sendMessage(message);
        });
        return result;
    }

    /**
     * Builds the message for index {@code i}: two one-day measures starting {@code 4 * i} and
     * {@code 4 * i + 2} days after {@code startDate}.
     */
    private EnergyProductionMessage buildMessage(int i, String deviceId, String username, String tenantId, String unitsValue, String startDate) {
        // Parsed per message (not hoisted out of the loop) so an empty range never parses startDate.
        Instant base = Instant.parse(startDate);
        int dateInc = i * 4;
        Instant period1From = base.plus(dateInc, ChronoUnit.DAYS);
        Instant period1To = base.plus(dateInc + 1, ChronoUnit.DAYS);
        Instant period2From = base.plus(dateInc + 2, ChronoUnit.DAYS);
        Instant period2To = base.plus(dateInc + 3, ChronoUnit.DAYS);
        Instant expiry = Instant.parse("2019-01-01T00:00:00.000Z");

        EnergyMeasure production1 = EnergyMeasure.builder()
                .period(new InstantPeriod(period1From, period1To))
                .expiry(expiry)
                .production(1.6)
                .networkConsumption(8.0)
                .selfConsumption(1.6)
                .build();
        EnergyMeasure production2 = EnergyMeasure.builder()
                .period(new InstantPeriod(period2From, period2To))
                .expiry(expiry)
                .production(1.6)
                .networkConsumption(0.0)
                .selfConsumption(1.6)
                .build();

        Map<String, Units> units = ImmutableMap.<String, Units>builder()
                .put("production", new Units(unitsValue))
                .put("selfConsumption", new Units(unitsValue))
                .put("networkConsumption", new Units(unitsValue))
                .build();

        return EnergyProductionMessage.builder()
                .deviceId(deviceId)
                .username(username)
                .tenantId(tenantId)
                .units(units)
                .solarEnergies(Arrays.asList(production1, production2))
                .build();
    }

    /**
     * Returns the number of seconds from {@code NOW} to the point 25 months (UTC calendar)
     * after {@code fromBeginTs}.
     */
    private long computeTtl(Instant fromBeginTs) {
        // NOW has no sub-second part, so getEpochSecond() equals toEpochMilli() / 1000.
        return fromBeginTs.atZone(ZoneId.of("UTC")).plusMonths(25).toEpochSecond()
                - NOW.getEpochSecond();
    }

    /**
     * Wraps {@code message} with {@link #getMessage} and sends it to the sink's input channel.
     *
     * @param message payload to send
     */
    public void sendMessage(EnergyProductionMessage message) {
        sink.input().send(getMessage(message));
    }

    /**
     * Wraps {@code message} as the payload of a Spring {@link Message}.
     *
     * @param message payload
     * @return a message built by {@link MessageBuilder} with that payload
     */
    public Message<EnergyProductionMessage> getMessage(EnergyProductionMessage message) {
        return MessageBuilder.withPayload(message).build();
    }

    /**
     * Deserializes {@code payload} into an {@link EnergyProductionMessage} with the injected
     * {@code objectMapper} and sends it to the sink's input channel.
     *
     * @param payload JSON text of the message
     * @throws IOException if {@code objectMapper} cannot read the payload
     */
    public void sendMessage(String payload) throws IOException {
        EnergyProductionMessage energyProductionMessage = objectMapper.readValue(payload,
                EnergyProductionMessage.class);
        // Reuse the typed overload instead of repeating the build-and-send sequence.
        sendMessage(energyProductionMessage);
    }
}
字符串
ProductionMessageConsumerIT.java
package com.enterprise.production.event.consumer;
import com.enterprise.production.Application;
import com.enterprise.production.client.ClientException;
import com.enterprise.production.model.message.EnergyMeasure;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.model.message.Units;
import com.enterprise.production.model.time.InstantPeriod;
import com.enterprise.production.repository.ProductionRepository;
import com.enterprise.production.repository.entity.ProductionEntity;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
/**
 * Integration test for the production message consumer, run with the JUnit 4
 * {@link SpringRunner} against the full {@link Application} context plus {@link TestConfig},
 * with the {@code it} profile active and WireMock auto-configured on a random port.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {Application.class, ProductionMessageConsumerIT.TestConfig.class})
@AutoConfigureWireMock(port = 0)
@ActiveProfiles("it")
public class ProductionMessageConsumerIT {

    /** Fixed "current" instant used by the test clock (epoch second 1531667721). */
    public static final Instant NOW = Instant.parse("2018-07-15T15:15:21Z");

    public static final String CAS_TABLE_ENERGIES = "energies_v2";

    public static final String CAS_TABLE_ENERGIES30m = "energies30m_v2";

    /** Extra test configuration that supplies a primary {@link Clock} bean fixed at {@link #NOW}. */
    @Configuration
    static class TestConfig {

        /**
         * @return a clock that always reports {@link #NOW} in UTC
         */
        @Bean
        @Primary
        public Clock clockIT() {
            final Clock frozenClock = Clock.fixed(NOW, ZoneOffset.UTC);
            return frozenClock;
        }
    }

    @Autowired
    ProductionRepository productionRepository;

    @Autowired
    StreamUtils streamUtils;

    @Autowired
    ResourceLoader resourceLoader;

    // Some tests ...
}
型
我的项目在迁移后使用这些注解的代码:
StreamUtils.java
package com.enterprise.production.event.consumer;
import com.enterprise.production.model.message.EnergyMeasure;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.model.message.Units;
import com.enterprise.production.model.time.InstantPeriod;
import com.enterprise.production.repository.entity.ProductionEntity;
import com.enterprise.production.stream.service.ProductionMessageConverter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static com.enterprise.production.event.consumer.ProductionMessageConsumerIT.NOW;
/**
 * Test helper that builds {@link EnergyProductionMessage} fixtures and sends them to an
 * injected {@link DirectWithAttributesChannel}.
 */
@Component
@Slf4j
public class StreamUtils {

    @Autowired
    ProductionMessageConverter productionMessageConverter;

    @Autowired
    ObjectMapper objectMapper;

    // NOTE(review): context startup is reported to fail here with "required a bean of type
    // 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' that could not
    // be found". Presumably the channel has to be obtained another way in v4 (the functional
    // model / StreamBridge) — confirm before relying on this field.
    @Autowired
    private DirectWithAttributesChannel sink;

    /**
     * Builds and sends one message for every index in {@code [start, end]} (inclusive); each
     * message carries two energy measures.
     *
     * <p>For every message the entities produced by {@code productionMessageConverter} are
     * collected, with their expiry overwritten by {@link #computeTtl(Instant)} applied to the
     * entity's begin timestamp.
     *
     * @param start      first index (inclusive)
     * @param end        last index (inclusive); nothing is built or sent when {@code end < start}
     * @param deviceId   device id set on every message
     * @param username   username set on every message
     * @param tenantId   tenant id set on every message
     * @param unitsValue value wrapped in {@link Units} for all three unit keys
     * @param startDate  ISO-8601 instant (parsed with {@link Instant#parse}) the periods are offset from
     * @return the converted entities of all messages, in sending order
     */
    public List<ProductionEntity> generateAndSend2MessagesWith2Productions(int start, int end, String deviceId, String username, String tenantId, String unitsValue, String startDate) {
        List<ProductionEntity> result = new ArrayList<>();
        // Iterate the primitive range directly; boxing into an intermediate list adds nothing.
        IntStream.rangeClosed(start, end).forEach(i -> {
            EnergyProductionMessage message = buildMessage(i, deviceId, username, tenantId, unitsValue, startDate);
            List<ProductionEntity> productions = productionMessageConverter.convert(message);
            productions.forEach(productionEntity -> productionEntity.setExpiry(computeTtl(productionEntity.getBeginTs())));
            result.addAll(productions);
            sendMessage(message);
        });
        return result;
    }

    /**
     * Builds the message for index {@code i}: two one-day measures starting {@code 4 * i} and
     * {@code 4 * i + 2} days after {@code startDate}.
     */
    private EnergyProductionMessage buildMessage(int i, String deviceId, String username, String tenantId, String unitsValue, String startDate) {
        // Parsed per message (not hoisted out of the loop) so an empty range never parses startDate.
        Instant base = Instant.parse(startDate);
        int dateInc = i * 4;
        Instant period1From = base.plus(dateInc, ChronoUnit.DAYS);
        Instant period1To = base.plus(dateInc + 1, ChronoUnit.DAYS);
        Instant period2From = base.plus(dateInc + 2, ChronoUnit.DAYS);
        Instant period2To = base.plus(dateInc + 3, ChronoUnit.DAYS);
        Instant expiry = Instant.parse("2019-01-01T00:00:00.000Z");

        EnergyMeasure production1 = EnergyMeasure.builder()
                .period(new InstantPeriod(period1From, period1To))
                .expiry(expiry)
                .production(1.6)
                .networkConsumption(8.0)
                .selfConsumption(1.6)
                .build();
        EnergyMeasure production2 = EnergyMeasure.builder()
                .period(new InstantPeriod(period2From, period2To))
                .expiry(expiry)
                .production(1.6)
                .networkConsumption(0.0)
                .selfConsumption(1.6)
                .build();

        Map<String, Units> units = ImmutableMap.<String, Units>builder()
                .put("production", new Units(unitsValue))
                .put("selfConsumption", new Units(unitsValue))
                .put("networkConsumption", new Units(unitsValue))
                .build();

        return EnergyProductionMessage.builder()
                .deviceId(deviceId)
                .username(username)
                .tenantId(tenantId)
                .units(units)
                .solarEnergies(Arrays.asList(production1, production2))
                .build();
    }

    /**
     * Returns the number of seconds from {@code NOW} to the point 25 months (UTC calendar)
     * after {@code fromBeginTs}.
     */
    private long computeTtl(Instant fromBeginTs) {
        // NOW has no sub-second part, so getEpochSecond() equals toEpochMilli() / 1000.
        return fromBeginTs.atZone(ZoneId.of("UTC")).plusMonths(25).toEpochSecond()
                - NOW.getEpochSecond();
    }

    /**
     * Wraps {@code message} with {@link #getMessage} and sends it on the injected channel.
     *
     * @param message payload to send
     */
    public void sendMessage(EnergyProductionMessage message) {
        sink.send(getMessage(message));
    }

    /**
     * Wraps {@code message} as the payload of a Spring {@link Message}.
     *
     * @param message payload
     * @return a message built by {@link MessageBuilder} with that payload
     */
    public Message<EnergyProductionMessage> getMessage(EnergyProductionMessage message) {
        return MessageBuilder.withPayload(message).build();
    }

    /**
     * Deserializes {@code payload} into an {@link EnergyProductionMessage} with the injected
     * {@code objectMapper} and sends it on the injected channel.
     *
     * @param payload JSON text of the message
     * @throws IOException if {@code objectMapper} cannot read the payload
     */
    public void sendMessage(String payload) throws IOException {
        EnergyProductionMessage energyProductionMessage = objectMapper.readValue(payload,
                EnergyProductionMessage.class);
        // Reuse the typed overload instead of repeating the build-and-send sequence.
        sendMessage(energyProductionMessage);
    }
}
型
ProductionMessageConsumerIT.java
package com.enterprise.production.event.consumer;
import com.enterprise.production.Application;
import com.enterprise.production.client.ClientException;
import com.enterprise.production.model.message.EnergyMeasure;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.model.message.Units;
import com.enterprise.production.model.time.InstantPeriod;
import com.enterprise.production.repository.ProductionRepository;
import com.enterprise.production.repository.entity.ProductionEntity;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEntityManager;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.test.context.ActiveProfiles;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
 * Integration test for the production message consumer, booted from the full
 * {@link Application} context plus {@link TestConfig}, with the {@code it} profile active
 * and WireMock auto-configured on a random port.
 */
@SpringBootTest(classes = {Application.class, ProductionMessageConsumerIT.TestConfig.class})
@AutoConfigureWireMock(port = 0)
@ActiveProfiles("it")
public class ProductionMessageConsumerIT {

    /** Fixed "current" instant used by the test clock (epoch second 1531667721). */
    public static final Instant NOW = Instant.parse("2018-07-15T15:15:21Z");

    public static final String CAS_TABLE_ENERGIES = "energies_v2";

    public static final String CAS_TABLE_ENERGIES30m = "energies30m_v2";

    /** Extra test configuration that supplies a primary {@link Clock} bean fixed at {@link #NOW}. */
    @Configuration
    static class TestConfig {

        /**
         * @return a clock that always reports {@link #NOW} in UTC
         */
        @Bean
        @Primary
        public Clock clockIT() {
            final Clock frozenClock = Clock.fixed(NOW, ZoneOffset.UTC);
            return frozenClock;
        }
    }

    @Autowired
    ProductionRepository productionRepository;

    @Autowired
    StreamUtils streamUtils;

    @Autowired
    ResourceLoader resourceLoader;

    // Some Tests ...
}
型
问题
Sink 接口在 Spring Cloud Stream 的新版本 4.0.0 中已不存在(参见 https://javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/4.0.0/org/springframework/cloud/stream/messaging/package-summary.html ),所以我需要把代码中原来的 Sink 替换为 Spring Cloud Stream 的新类 DirectWithAttributesChannel。但是当我这样做时,却得到了下面的错误:
Field sink in com.enterprise.production.event.consumer.StreamUtils required a bean of type 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' that could not be found.
The injection point has the following annotations:
- @org.springframework.beans.factory.annotation.Autowired(required=true)
Action:
Consider defining a bean of type 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' in your configuration.
型
提问
有人知道如何将 Sink 接口从 spring-cloud-stream v2 迁移到 spring-cloud-stream v4 吗?
1条答案
按热度按时间anauzrmj1#
由 EnableBinding、Sink、Source 和 Processor 等组件组成的基于注解的编程模型早已被弃用,并在 Spring Cloud Stream 的 4.x 版本中被完全删除。您应该迁移到基于 Spring Cloud Function 的函数式模型。在函数式模型中,Sink 对应 java.util.function.Consumer,Source 对应 java.util.function.Supplier,Processor 对应 java.util.function.Function。除此之外,还有一个 StreamBridge API 用于按需发送数据。所有这些都可以在参考文档中找到。由于您使用的是 4.0.x,这里是最新 4.0.x 的参考文档:https://docs.spring.io/spring-cloud-stream/docs/4.0.3/reference/html/
在旧模型中,使用 Sink 时,通常会将其与 EnableBinding(Sink.class) 一起使用,然后用类似 StreamListener 的组件来接收经由 sink 传入的消息。sink 负责从消息中间件消费记录并将其发送到输入通道,再由 StreamListener 监听该通道。所有这些基于注解的编程模型和组件都已从框架中删除,由函数式模型取代。从您提供的代码示例来看,您似乎是在生成一些数据,然后将其发送到输入通道。如果您的用例到此为止,那么我建议直接使用 Spring Integration,因为它对消息通道级别的操作提供了一流的支持;Spring Cloud Stream 则是在内部使用 Spring Integration 的更高层抽象。如果您想在应用程序中生成数据后将其发送到中间件目的地,则可以使用 StreamBridge。
或者,如果您只想监听中间件目的地(例如 Kafka 主题),则可以使用标准的 java.util.function.Consumer(如果还想把数据发送到另一个目的地,则使用 Function)。
或
型