我正在尝试用redis流实现一个java应用程序,其中每个consomer只使用一条消息。就像管道/队列一样,每个使用者只接收一条消息,然后对其进行处理,完成后,使用者接收流中到目前为止尚未处理的下一条消息。有效的方法是每个消息只被一个消费者使用(使用xreadgroup)。
我从redislabs的教程开始
代码:
RedisClient redisClient = RedisClient.create("redis://pw@host:port");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();
try {
syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY, "0-0"), ID_READ_GROUP);
} catch (RedisBusyException redisBusyException) {
System.out.println(String.format("\t Group '%s' already exists", ID_READ_GROUP));
}
System.out.println("Waiting for new messages ");
while (true) {
List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
Consumer.from(ID_READ_GROUP, ID_WORKER), ReadArgs.StreamOffset.lastConsumed(STREAM_KEY));
if (!messages.isEmpty()) {
System.out.println(messages.size()); //
for (StreamMessage<String, String> message : messages) {
System.out.println(message.getId());
Thread.sleep(5000);
syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
}
}
}
我当前的问题是,一个消费者从队列中获取了多条消息,在某些情况下,其他消费者正在等待,而一个消费者同时处理10条消息。
提前谢谢!
1条答案
按热度按时间hvvq6cgz1#
请注意,xreadgroup可以
COUNT
争论。查看javadoc如何在
Lettuce
xreadgroup,通过传递xreadargs。