java CompletableFuture与Kafka的回调方法?

ivqmmu1c  于 2023-03-28  发布在  Java
关注(0)|答案(1)|浏览(387)

我们正在构建一个围绕Kafka的小型 Package 库。我们使用旧版本,在调用kafkaTemplate.send(...);时没有.whenComplete((response, throwable) -> {...});
我们的图书馆正在做以下工作:
1定义一个RetryTemplate bean:

@Bean
public RetryTemplate prodRetryTemplate() {    
   final RetryTemplate template = RetryTemplate.builder().maxAttempts(3)
         .retryOn(TopicAuthorizationException.class).exponentialBackoff(1000, 2, 2000).build();
   return template;
}

2定义异步CompletableFuture方法:

@Override
@Async(EXECUTOR_SERVICE) //we had defined Executor bean, it is ommited here
public CompletableFuture<DtoResponse> asyncSend(final Integer key, final String topic, final String message)
      throws Exception {

   final ProducerRecord<K, V> producerRecord = buildRecord(key, topic, message);

   DtoRespons[] responseHolder = new DtoRespons[1];
   template.execute(ctx ->{

            final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);
            future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

               @Override
               public void onSuccess(final SendResult<Integer, String> result) {
                  log.info("success...");
                  responseHolder[0] = DtoResponse.builder()
                        .error(null)
                        .success(true)
                        .msg("PUBLISHED")
                        .build();
               }

               @SneakyThrows
               @Override
               public void onFailure(final Throwable ex) {
                  log.error("error...)
                  throwError(producerRecord, eventName, ex); //do additional logging
               }
            });

      return null; //to satisfy execute method from RetryTemplate
   });

   return CompletableFuture.completedFuture(responseHolder[0]);
}

我的问题很简单,有没有可能一旦调用asyncSend,它就会返回null,即在调用onSuccessonFailure之前执行return CompletableFuture.completedFuture(null);行?
也就是说,从客户端:

public void myService(){  
  MyLib lib = new MyLib();
  CompletableFuture<DtoResponse> future = lib.sendAsync(key, topic, message);
  future.whenComplete((result, throwable) -> { 
     //is it possible that result and throwable both be null ??

    });
}

更新1:
根据加里教授的评论。我不确定这是否是你的意思:

@Override
@Async(EXECUTOR_SERVICE)
public CompletableFuture<SendResult<Integer, String>> asyncSend(final Integer key, final String topic, final String message)
      throws Exception {
   final ProducerRecord<Integer, String> producerRecord = buildRecord(key, topic, message); //this can throw ex

   ListenableFuture<SendResult<Integer, String>> x = template.execute(ctx -> {
      final ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(producerRecord);

      return future;
   });
   CompletableFuture<SendResult<Integer, String>> completable = x.completable();

   return completable;
}

然后客户端代码将不得不检查throwable.whenComplete()中的SendResult<Integer, String>。但这不是我需要的。使用SendResult<Integer, String>,客户端代码仍然无法判断消息是否已发送。如何避免这一点?如果我已经以可理解的方式解释了这一点,请让我知道。另外,我们的库需要在onFailureonSuccess中进行重要的日志记录
更新2:
加里的编辑。
感谢加里的帮助。然而,首先这是可行的,但是当抛出异常时会出现问题。在这种情况下,whenComplete不会被调用,也不会调用exceptionally通道。让我来解释一下。
因此,由于我们在.yaml文件中使用了安全的Kafka,因此有以下属性:

properties:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "OAUTHBEARER"
  sasl.login.callback.handler.class: "path.to.LoginCallBack"
  sasl.jaas.config: >-
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
      required
      oauth.token.endpoint.uri="sso-uri"
      oauth.ssl.endpoint.identification.algorithm=""
      oauth.ssl.truststore.type="JKS"
      oauth.client.id="kafka-client-id"
      oauth.client.secret="someSecret"
      oauth.username="username"
      oauth.password="password";

我已经将值oauth.client.id更改为dummy,以便激发org.apache.kafka.common.KafkaException: Failed to construct kafka producer并查看流将在何处结束。
当到达kafkaTemplate.send("so75823718", "foo");行时,它抛出异常,并且该异常不是whenComplete中的ex

@Bean
ApplicationRunner runner(Foo foo) {
  return args -> {
        try {
            CompletableFuture<SendResult<String, String>> future 
             = foo.asyncSend("foo");
            future.whenComplete((sr, ex) -> {
                if (sr != null){
                    System.out.println("success");
                } else if (ex != null){
                    System.out.println("fail");
                }
            });
        } catch (Exception e){
            System.out.println("not expected nor desired behavior 
            to have exception here");
            e.printStackTrace();
        }
    }
}

我已经尝试了一些方法,但由于它们根本无法编译,所以不值得展示。所以我现在的问题是,是否有可能不强制asyncSend的用户使用try/catch,而只是使用whenComplete()acceptThen/exceptionally,并且所有错误处理都在asyncSend中完成?
更新3:
真的跳,这是一个最终的编辑。
所以我想我是这样管理的:
一、模板

@Value("${kafkaExceptions:}")
private List<String> exceptions = new ArrayList<>();

    @Bean
    public RetryTemplate template() throws ClassNotFoundException {
       exceptions.add(TopicAuthorizationException.class.getName());
       RetryTemplateBuilder retryTemplateBuilder = RetryTemplate.builder().maxAttempts(retries).retryOn(TopicAuthorizationException.class);
       for (String exceptionClassName : exceptions){
          retryTemplateBuilder.retryOn((Class<? extends Throwable>) Class.forName(exceptionClassName));
       }
       return retryTemplateBuilder.exponentialBackoff(1000, 2, 2000).build();
    }

然后

@Async("exec")
public CompletableFuture<SendResult<K, V>> asyncSend(K key, String topic, V message, S) throws Exception {

   return template.execute(ctx -> {

      try {
         ProducerRecord<K, V> producerRecord = buildHeader(key, topic, msg); //can throw custom exception

         return kafkaTemplate.send(producerRecord).completable();
      } catch (Exception e){
         
         if (reThrow(e)) throw e;

         return CompletableFuture.failedFuture(new CustomException(Enum.KAFKA_EX_PROD_PUBLISH_ERROR.findErrorMessage(),e));

      }
   });

}

private boolean reThrow(Exception e){
   String retriableException = e.getClass().getName();
   return retriableExceptions.contains(retriableException);
}

我用这种方法控制execute调用,只需要两次确认。
1.我能确定如果在.whenComplete中,sr是非空的,这意味着调用了onSuccess(从ListenableFutureCallback)回调?如果sr是非空的,将有一些业务逻辑。
1.我可以肯定,如果在.whenComplete中,th是非空的,那么这是因为onFailure(来自ListenableFutureCallback)发生了,或者我完成了future,但失败了?

vtwuwzda

vtwuwzda1#

completedFuture
您正在返回一个已经完成的future,而不管回调是否已经运行;所以呼叫者将总是看到一个完整的未来。
应该从execute方法返回future,然后从asyncSend返回future.completable()
请注意,从3.0版开始,模板返回CompletableFuture而不是ListenableFuture

编辑

我刚刚测试了一下,它的工作原理和预期的一样...

@SpringBootApplication
@EnableAsync
public class So75823718Application {

    public static void main(String[] args) {
        SpringApplication.run(So75823718Application.class, args);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so75823718").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(Foo foo) {
        return args -> {
            CompletableFuture<SendResult<String, String>> future = foo.asyncSend("foo");
            future.whenComplete((sr, ex) -> System.out.println(future + ": " + sr.getRecordMetadata()));
        };
    }

    @Bean
    TaskExecutor exec() {
        return new SimpleAsyncTaskExecutor();
    }

}

@Component
class Foo {

    RetryTemplate template = new RetryTemplate();

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Async("exec")
    public CompletableFuture<SendResult<String, String>> asyncSend(final String message) throws Exception {

        return this.template.execute(ctx -> {
            final ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("so75823718", "foo");

            return future;
        }).completable();
    }

}
java.util.concurrent.CompletableFuture@7ebce1f7[Completed normally]: so75823718-0@12

编辑2

...并在send()上捕获直接异常...

public CompletableFuture<SendResult<String, String>> asyncSend(final String message) throws Exception {

    CompletableFuture<SendResult<String, String>> future = new CompletableFuture<>();
    try {
        CompletableFuture<SendResult<String, String>> fut = this.template.execute(ctx -> {
            return kafkaTemplate.send("so75823718", "foo");
        }).completable();
        fut.whenComplete((sr, ex) -> {
            if (ex != null) {
                future.completeExceptionally(ex);
            }
            else {
                future.complete(sr);
            }
        });
    }
    catch (Exception ex) {
        future.completeExceptionally(ex);
    }
    return future;
}

相关问题