java—在被动redis主题中发布的消息不会发送到客户端

jexiocij  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(360)

在实现graphql订阅的过程中,我提出了一个穷人的pub/sub解决方案,自定义主题如下:

// The map is a custom class, just to handle multiple clients subscribed to the same data
// and we just create one FluxSink per client which is internally stored in a Set
private static final ConcurrentMultiMap<Long, FluxSink<String>> subscribers =
      new ConcurrentMultiMap<>();

public Publisher<String> subscribeToMessages(final Long id) {
  return Flux.create(
      newSubscriber ->
          subscribers.add(
              id,
              newSubscriber.onDispose(() -> subscribers.remove(id, newSubscriber))),
      OverflowStrategy.LATEST);
}

public void publish(final Long id, final String message) {
  Optional.of(id)
      .map(subscribers::get)
      .ifPresent(
          subscribers ->
              subscribers.forEach(
                  subscriber -> subscriber.next(message)));
}

接下来,我想用一个使用 RedissonReactiveClient 当我偶然发现 RTopicReactive::getMessages 返回 Flux . 基本上,前面的代码现在看起来如下所示:

public Publisher<String> subscribeToMessages(final Long id) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  return topic.getMessages(String.class);
}

public void publish(final Long id, final String message) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  topic.publish(message);
}

但不幸的是,这并不像预期的那样有效。据我所知,这些更新是正确发布的(因为我连接了redis示例 SUBSCRIBE d)甚至在让我的graphql客户机订阅服务器之后 Mono 退回人 publish 指示有一个订户,即我的graphql客户端。但是,它不会更新其数据,并且浏览器中的“网络”选项卡不会显示来自服务器的已发布数据的websocket消息。因此,我假设通过 FluxsubscribeToMessages 方法的格式不正确。
当我试图使用 getMessages 题目的方法?

ztmd8pv5

ztmd8pv51#

您忘记在两个发布服务器上调用subscribe。应该是这样的:

public void publish(final Long id, final String message) {  
   val topic = redissonReactiveClient.getTopic("messages/" + id.toString());  
   topic.publish(message).doOnSuccess(res -> {  

       // ...   

   }).subscribe();
}

public void subscribeToMessages(final Long id) {
   val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
   return topic.getMessages(String.class).doOnNext(res -> {  

       // ...   

   }).subscribe();
}

相关问题