在实现graphql订阅的过程中,我提出了一个穷人的pub/sub解决方案,自定义主题如下:
// The map is a custom class, just to handle multiple clients subscribed to the same data
// and we just create one FluxSink per client which is internally stored in a Set
private static final ConcurrentMultiMap<Long, FluxSink<String>> subscribers =
new ConcurrentMultiMap<>();
public Publisher<String> subscribeToMessages(final Long id) {
return Flux.create(
newSubscriber ->
subscribers.add(
id,
newSubscriber.onDispose(() -> subscribers.remove(id, newSubscriber))),
OverflowStrategy.LATEST);
}
public void publish(final Long id, final String message) {
Optional.of(id)
.map(subscribers::get)
.ifPresent(
subscribers ->
subscribers.forEach(
subscriber -> subscriber.next(message)));
}
接下来,我想用一个使用 RedissonReactiveClient
当我偶然发现 RTopicReactive::getMessages
返回 Flux
. 基本上,前面的代码现在看起来如下所示:
public Publisher<String> subscribeToMessages(final Long id) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
return topic.getMessages(String.class);
}
public void publish(final Long id, final String message) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
topic.publish(message);
}
但不幸的是,这并不像预期的那样有效。据我所知,这些更新是正确发布的(因为我连接了redis示例 SUBSCRIBE
d)甚至在让我的graphql客户机订阅服务器之后 Mono
退回人 publish
指示有一个订户,即我的graphql客户端。但是,它不会更新其数据,并且浏览器中的“网络”选项卡不会显示来自服务器的已发布数据的websocket消息。因此,我假设通过 Flux
在 subscribeToMessages
方法的格式不正确。
当我试图使用 getMessages
题目的方法?
1条答案
按热度按时间ztmd8pv51#
您忘记在两个发布服务器上调用subscribe。应该是这样的: