spring集成从google pub sub读取消息并发布到redis服务器

igsr9ssn  于 2021-06-10  发布在  Redis
关注(0)|答案(0)|浏览(267)

我不熟悉spring集成框架。我试图订阅来自googlepubsub的消息,运行一个小的转换并发布到redis服务器。
我已经写了下面的bean和它的工作以及,但我有几个疑问。
1 . 如何编写消息转换以及如何在下面的代码中集成。传入消息在json 2中。使用this.redistemplate()发布消息是一种很好的做法,也可以使用任何更好的做法。此场景中的音量很大(数据来自sesor)
@Configuration 公共类appconfiguration{

@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
        @Qualifier("pubsubInputChannel") MessageChannel inputChannel, PubSubTemplate pubSubTemplate) {
    System.out.println("I am executed");
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "spring-cleint");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
}

@Bean
public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
    return message -> {
        System.out.println("Message arrived! part 2 Payload: " + new String((byte[]) message.getPayload()));
        this.redisTemplate().opsForHash().put("test", "test", "tesst");

        BasicAcknowledgeablePubsubMessage originalMessage = message.getHeaders()
                .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
        originalMessage.ack();
    };
}

@Bean
public JedisConnectionFactory redisConnectionFactory() {

    System.out.println("Conn Factory");

    RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
    config.setHostName("localhost");
    config.setPort(6379);

    return new JedisConnectionFactory(config);
}

@Bean
public JedisPoolConfig jedisPoolConfig() {
    System.out.println("Configuration execued");
    JedisPoolConfig poolConfig = new JedisPoolConfig();

    poolConfig.setMaxIdle(0);
    poolConfig.setMaxTotal(3);
    poolConfig.setTestOnBorrow(false);
    poolConfig.setTestOnReturn(false);
    poolConfig.setTestOnCreate(false);
    poolConfig.setTestWhileIdle(false);
    poolConfig.setMinEvictableIdleTimeMillis(6);
    poolConfig.setTimeBetweenEvictionRunsMillis(3);
    poolConfig.setNumTestsPerEvictionRun(-1);
    poolConfig.setFairness(true);
    return poolConfig;

}

@Bean
public RedisTemplate<String, Object> redisTemplate() {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(redisConnectionFactory());

    GenericJackson2JsonRedisSerializer valSerializer = new GenericJackson2JsonRedisSerializer();
    template.setValueSerializer(valSerializer);

    template.setKeySerializer(new RedisSerializer<Object>() {

        @Override
        public byte[] serialize(Object t) throws SerializationException {
            return (t == null ? null : (":" + t.toString()).getBytes());
        }

        @Override
        public Object deserialize(byte[] bytes) throws SerializationException {
            return (bytes == null ? null : new String(bytes));
        }
    });
    template.setHashValueSerializer(valSerializer);

    return template;
}

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题