我们正在构建一个围绕Kafka的小型 Package 库。我们使用旧版本,在调用kafkaTemplate.send(...);
时没有.whenComplete((response, throwable) -> {...});
。
我们的图书馆正在做以下工作:
1定义一个RetryTemplate
bean:
@Bean
public RetryTemplate prodRetryTemplate() {
final RetryTemplate template = RetryTemplate.builder().maxAttempts(3)
.retryOn(TopicAuthorizationException.class).exponentialBackoff(1000, 2, 2000).build();
return template;
}
2定义异步CompletableFuture方法:
@Override
@Async(EXECUTOR_SERVICE) //we had defined Executor bean, it is ommited here
public CompletableFuture<DtoResponse> asyncSend(final Integer key, final String topic, final String message)
throws Exception {
final ProducerRecord<K, V> producerRecord = buildRecord(key, topic, message);
DtoRespons[] responseHolder = new DtoRespons[1];
template.execute(ctx ->{
final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(final SendResult<Integer, String> result) {
log.info("success...");
responseHolder[0] = DtoResponse.builder()
.error(null)
.success(true)
.msg("PUBLISHED")
.build();
}
@SneakyThrows
@Override
public void onFailure(final Throwable ex) {
log.error("error...)
throwError(producerRecord, eventName, ex); //do additional logging
}
});
return null; //to satisfy execute method from RetryTemplate
});
return CompletableFuture.completedFuture(responseHolder[0]);
}
我的问题很简单,有没有可能一旦调用asyncSend
,它就会返回null
,即在调用onSuccess
或onFailure
之前执行return CompletableFuture.completedFuture(null);
行?
也就是说,从客户端:
public void myService(){
MyLib lib = new MyLib();
CompletableFuture<DtoResponse> future = lib.sendAsync(key, topic, message);
future.whenComplete((result, throwable) -> {
//is it possible that result and throwable both be null ??
});
}
更新1:
根据加里教授的评论。我不确定这是否是你的意思:
@Override
@Async(EXECUTOR_SERVICE)
public CompletableFuture<SendResult<Integer, String>> asyncSend(final Integer key, final String topic, final String message)
throws Exception {
final ProducerRecord<Integer, String> producerRecord = buildRecord(key, topic, message); //this can throw ex
ListenableFuture<SendResult<Integer, String>> x = template.execute(ctx -> {
final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);
return future;
});
CompletableFuture<SendResult<Integer, String>> completable = x.completable();
return completable;
}
然后客户端代码将不得不检查throwable
和.whenComplete()
中的SendResult<Integer, String>
。但这不是我需要的。使用SendResult<Integer, String>
,客户端代码仍然无法判断消息是否已发送。如何避免这一点?如果我已经以可理解的方式解释了这一点,请让我知道。另外,我们的库需要在onFailure
和onSuccess
中进行重要的日志记录
更新2:
加里的编辑。
感谢加里的帮助。然而,首先这是可行的,但是当抛出异常时会出现问题。在这种情况下,whenComplete
不会被调用,也不会调用exceptionally
通道。让我来解释一下。
因此,由于我们在.yaml
文件中使用了安全的Kafka,因此有以下属性:
properties:
security.protocol: "SASL_SSL"
sasl.mechanism: "OAUTHBEARER"
sasl.login.callback.handler.class: "path.to.LoginCallBack"
sasl.jaas.config: >-
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
required
oauth.token.endpoint.uri="sso-uri"
oauth.ssl.endpoint.identification.algorithm=""
oauth.ssl.truststore.type="JKS"
oauth.client.id="kafka-client-id"
oauth.client.secret="someSecret"
oauth.username="username"
oauth.password="password";
我已经将值oauth.client.id
更改为dummy
,以便激发org.apache.kafka.common.KafkaException: Failed to construct kafka producer
并查看流将在何处结束。
当到达kafkaTemplate.send("so75823718", "foo");
行时,它抛出异常,并且该异常不是whenComplete
中的ex
@Bean
ApplicationRunner runner(Foo foo) {
return args -> {
try {
CompletableFuture<SendResult<String, String>> future
= foo.asyncSend("foo");
future.whenComplete((sr, ex) -> {
if (sr != null){
System.out.println("success");
} else if (ex != null){
System.out.println("fail");
}
});
} catch (Exception e){
System.out.println("not expected nor desired behavior
to have exception here");
e.printStackTrace();
}
}
}
我已经尝试了一些方法,但由于它们根本无法编译,所以不值得展示。所以我现在的问题是,是否有可能不强制asyncSend
的用户使用try/catch,而只是使用whenComplete()
或acceptThen/exceptionally
,并且所有错误处理都在asyncSend
中完成?
更新3:
真的跳,这是一个最终的编辑。
所以我想我是这样管理的:
一、模板
@Value("${kafkaExceptions:}")
private List<String> exceptions = new ArrayList<>();
@Bean
public RetryTemplate template() throws ClassNotFoundException {
exceptions.add(TopicAuthorizationException.class.getName());
RetryTemplateBuilder retryTemplateBuilder = RetryTemplate.builder().maxAttempts(retries).retryOn(TopicAuthorizationException.class);
for (String exceptionClassName : exceptions){
retryTemplateBuilder.retryOn((Class<? extends Throwable>) Class.forName(exceptionClassName));
}
return retryTemplateBuilder.exponentialBackoff(1000, 2, 2000).build();
}
然后
@Async("exec")
public CompletableFuture<SendResult<K, V>> asyncSend(K key, String topic, V message, S) throws Exception {
return template.execute(ctx -> {
try {
ProducerRecord<K, V> producerRecord = buildHeader(key, topic, msg); //can throw custom exception
return kafkaTemplate.send(producerRecord).completable();
} catch (Exception e){
if (reThrow(e)) throw e;
return CompletableFuture.failedFuture(new CustomException(Enum.KAFKA_EX_PROD_PUBLISH_ERROR.findErrorMessage(),e));
}
});
}
private boolean reThrow(Exception e){
String retriableException = e.getClass().getName();
return retriableExceptions.contains(retriableException);
}
我用这种方法控制execute
调用,只需要两次确认。
1.我能确定如果在.whenComplete
中,sr
是非空的,这意味着调用了onSuccess
(从ListenableFutureCallback)回调?如果sr
是非空的,将有一些业务逻辑。
1.我可以肯定,如果在.whenComplete
中,th
是非空的,那么这是因为onFailure
(来自ListenableFutureCallback)发生了,或者我完成了future,但失败了?
1条答案
按热度按时间vtwuwzda1#
completedFuture
您正在返回一个已经完成的future,而不管回调是否已经运行;所以呼叫者将总是看到一个完整的未来。
应该从execute方法返回
future
,然后从asyncSend
返回future.completable()
。请注意,从3.0版开始,模板返回
CompletableFuture
而不是ListenableFuture
。编辑
我刚刚测试了一下,它的工作原理和预期的一样...
编辑2
...并在
send()
上捕获直接异常...