我喜欢用quarkus中redis.subscribe的响应做一个sse。
我有一个来自quarkus快速启动的简单sse示例
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("{name}/streaming")
public Multi<String> greeting(@org.jboss.resteasy.annotations.jaxrs.PathParam String name) {
return Multi.createFrom().publisher(vertx.periodicStream(2000).toMulti())
.map(l -> String.format("Hello %s! (%s)%n", name, new Date()));
}
这个效果不错,每2秒钟我就收到一个问候。。。。在我的web浏览器中
现在我尝试订阅redis,所以我应该会收到redis的消息。
redis示例:
(cmd window 1)
SUBSCRIBE message-channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "message-channel"
3) (integer) 1
(cmd window 2)
PUBLISH message-channel HelloWorld
(integer) 1
(cmd window 1)
1) "message"
2) "message-channel"
3) "HelloWorld"
现在我用quarkus sse尝试这个:
@Inject
ReactiveRedisClient reactiveRedisClient;
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
@Path("sse/redissse")
public Multi<String> redissse() {
List<String> subscriberList = new ArrayList();
subscriberList.add("message-channel");
return reactiveRedisClient.subscribe(subscriberList)
.onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
.onItem().castTo(String.class);
}
我收到的是一个例外:
WARNING [io.ver.red.cli.imp.RedisConnectionImpl] (vert.x-eventloop-thread-0) No handler waiting for message: [subscribe, message-channel, 1]
有人能支持我吗?有一个简单的例子吗?我对此一无所知,我无法接收带有“订阅”发布的redis消息。
任何建议。。。
2条答案
按热度按时间5vf7fwbs1#
现在我要做的是:
现在它可以工作了,但是如果我从多个浏览器中执行sse,它们就会共享事件。因此,每个用户中只有一个在其他用户(浏览器)之后收到一个事件。
ldfqzlk82#
我没有使用redis pub sub,但我确实使用了redis streams,我要做的是这样的:
`
我想既然pub-sub是非阻塞的,它只运行一次,就不会等到另一条消息到达。你必须实现你自己的目标
while(true)` 以React的方式循环。