我不熟悉spring集成框架。我试图订阅来自googlepubsub的消息,运行一个小的转换并发布到redis服务器。
我已经写了下面的bean和它的工作以及,但我有几个疑问。
1 . 如何编写消息转换以及如何在下面的代码中集成。传入消息在json 2中。使用this.redistemplate()发布消息是一种很好的做法,也可以使用任何更好的做法。此场景中的音量很大(数据来自sesor)
@Configuration 公共类appconfiguration{
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel, PubSubTemplate pubSubTemplate) {
System.out.println("I am executed");
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "spring-cleint");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -> {
System.out.println("Message arrived! part 2 Payload: " + new String((byte[]) message.getPayload()));
this.redisTemplate().opsForHash().put("test", "test", "tesst");
BasicAcknowledgeablePubsubMessage originalMessage = message.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
originalMessage.ack();
};
}
@Bean
public JedisConnectionFactory redisConnectionFactory() {
System.out.println("Conn Factory");
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName("localhost");
config.setPort(6379);
return new JedisConnectionFactory(config);
}
@Bean
public JedisPoolConfig jedisPoolConfig() {
System.out.println("Configuration execued");
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxIdle(0);
poolConfig.setMaxTotal(3);
poolConfig.setTestOnBorrow(false);
poolConfig.setTestOnReturn(false);
poolConfig.setTestOnCreate(false);
poolConfig.setTestWhileIdle(false);
poolConfig.setMinEvictableIdleTimeMillis(6);
poolConfig.setTimeBetweenEvictionRunsMillis(3);
poolConfig.setNumTestsPerEvictionRun(-1);
poolConfig.setFairness(true);
return poolConfig;
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory());
GenericJackson2JsonRedisSerializer valSerializer = new GenericJackson2JsonRedisSerializer();
template.setValueSerializer(valSerializer);
template.setKeySerializer(new RedisSerializer<Object>() {
@Override
public byte[] serialize(Object t) throws SerializationException {
return (t == null ? null : (":" + t.toString()).getBytes());
}
@Override
public Object deserialize(byte[] bytes) throws SerializationException {
return (bytes == null ? null : new String(bytes));
}
});
template.setHashValueSerializer(valSerializer);
return template;
}
}
暂无答案!
目前还没有任何答案,快来回答吧!