我正在尝试使用vertx-redis-client库实现一个比较和删除(CAD)函数。
this answer如何做到这一点。我已经实现了Lua脚本方法,它工作得很好。然而,由于redis安装可能会禁止使用任意Lua脚本运行eval
命令,我认为我还必须使用WATCH/GET/MULTI/compare/DELETE/UNWATCH命令序列来实现此功能。
然而,我真的很难用vert.x响应式编程的正确方法来完成这件事。
下面是我的代码,它实现了这两种方法来执行CAD操作:
private Future<Boolean> deleteLua(final String key, final String value) {
final RedisAPI redis = RedisAPI.api(client);
final Promise<Boolean> promise = Promise.promise();
redis.eval(List.of("if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]); return 1; else return 0; end", "1", key, value))
.onSuccess(res -> {
System.out.println("Success: " + res);
promise.complete(res.toInteger() == 1);
})
.onFailure(err -> {
System.out.println("Error: " + err);
promise.fail(err);
});
return promise.future();
}
private Future<Boolean> deleteMulti(final String key, final String value) {
final RedisAPI redis = RedisAPI.api(client);
final Promise<Boolean> promise = Promise.promise();
redis.watch(List.of(key))
.onSuccess(watchResult -> {
redis.get(key)
.onSuccess(getResult -> {
redis.multi()
.onSuccess(multiResult -> {
if (value.equals(String.valueOf(getResult))) {
redis.del(List.of(key))
.onSuccess(delResult -> {
System.out.println("key deleted");
});
} else {
System.out.println("value did not match, key not deleted");
promise.complete(false);
}
redis.exec().onSuccess(execResult -> {
System.out.println("execResult: " + execResult);
redis.unwatch().onSuccess(unwatchResult -> {
System.out.println("unwatched key");
promise.complete(true);
});
});
})
.onFailure(multiErr -> {
System.out.println("The multi failed: " + multiErr);
promise.fail(multiErr);
});
});
});
return Future.succeededFuture(true);
}
这两种方法似乎都能工作,但我真的不确定deleteMulti
方法。首先,它只是看起来很丑。。它是如此无休止的嵌套!我相信我可以使用compose
在某种程度上解决这个问题,但我已经尝试过这样做,但一直碰壁,不知道如何做到这一点。例如,我需要multi
的onSuccess
块中的getResult
,但当我尝试使用compose
结构时,一切都变得有点混乱,我不清楚在每个compose块中到底有什么结果。另一件我不太清楚的事情是Redis交易在这里是如何工作的。我的redis
对象是否跟踪会话,以便我对watch
的调用对于我使用同一对象进行的任何后续redis调用都是“活动的”?如何治疗unwatch
?我目前只在onSuccess
中调用watch->get->multi->exec块,但是如果get或multi失败了怎么办?我是否也需要取消对失败的监视,或者只是通过某种方式使会话无效来处理?这实际上似乎不太可能,因为这都是React式的,并且方法(实际创建RedisAPI redis
对象的地方)可能会在所有操作完成之前返回。
所以是的...很困惑。
任何帮助,使这个代码块的形状将非常感谢。
1条答案
按热度按时间4dbbbstv1#
使用
onSuccess()
和onFailure()
是处理期货的一种非常乏味的方法。我建议您熟悉组合子方法,最著名的是map()
和flatMap()
。这些就是我所说的 * 顺序组合运算符 * --也就是说,它们允许表达“首先执行A,然后执行B”。map()
是同步的,而flatMap()
是异步的。(还要注意,compose()
与flatMap()
相同,所以这是风格问题。我更喜欢使用flatMap()
,所以这就是你在下面看到的。)还有其他的操作符用于错误处理等,但这两个是我认为最重要的。也就是说,下面是我的实现:
您会注意到,我在这里没有做任何错误处理。相反,我对代码进行了结构化,以便每当发生错误时(即,
Future
完成时出现错误),它就会通过链传播。该方法返回一个future,当“比较并删除”操作成功时,该future以true
完成,当操作失败时(键不存在,或者有人在事务中间更改了值),该future以false
完成。未来可能会以失败告终,以防出现一些错误。为了测试,我还写了这个简单的函数:
有了这个,我可以在链中插入任意延迟,以便能够在其他人在交易过程中更改值时轻松触发这种情况。例如,我可以插入这一行
在对
redis.del()
和redis.exec()
的调用之间。除了使用future combinators而不是success/failure回调函数之外,还有一个重要的区别。我的实现做的第一件事是从Redis客户端获取一个 * 连接 *。Vert.x Redis客户端管理一个连接池,如果你使用
Redis
对象发送命令,每个命令可能会在不同的连接上执行。另一方面,当使用RedisConnection
对象时,将使用单个连接来发送所有命令,这是Redis事务正常工作所必需的。如您所见,我的方法所做的最后一件事是“关闭”连接(实际上意味着将连接返回到池)。