Kafka同步生产者需要较长的时间对第一个请求

zf2sa74q  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(465)

我正在SpringBoot微服务中使用SpringCloudStreamKafka同步生产者。每次我们部署服务时,对kafka的第一次调用都需要20多秒才能将消息发布到topic。但所有随后的通话几乎不需要3到4毫秒。这个问题也会随机发生,而且是间歇性的,但主要发生在我们重新启动服务时。我们使用kafka版本0.9.0.1和gradle依赖,如下所示{

  1. compile('org.springframework.cloud:spring-cloud-starter-stream-kafka')
  2. }
  3. dependencyManagement {
  4. imports {
  5. mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR3"
  6. }
  7. }

这是申请表。基督教青年会

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. output:
  6. content-type: application/json
  7. destination: SOPOrderReceiveTopic
  8. kafka:
  9. binder:
  10. brokers: "localhost:9092,localhost:9093"
  11. headers: eventType
  12. requiredAcks: -1
  13. zkNodes: "localhost:2181"
  14. bindings:
  15. output:
  16. producer:
  17. configuration:
  18. max:
  19. block:
  20. ms: 20000
  21. reconnect:
  22. backoff:
  23. ms: 5000
  24. request:
  25. timeout:
  26. ms: 30000
  27. retries: 3
  28. retry:
  29. backoff:
  30. ms: 10000
  31. timeout:
  32. ms: 30000
  33. sync: true

我使用org.springframework.cloud.stream.messaging.source作为输出通道,这是用于发布消息的方法

  1. public void publish(Message event) {
  2. try {
  3. boolean result = source.output().send(event, orderEventConfig.getTimeoutMs());
  4. logger.log(LoggingEventType.INFORMATION, "MESSAGE SENT TO KAFKA : " + result);
  5. } catch (Exception publishingExceptionMessage) {
  6. logger.log(LoggingEventType.ERROR, "publish event to kafka failed!", publishingExceptionMessage);
  7. throw new PublishEventException("publish event to kafka failed for eventPayload: " + event.getPayload(),
  8. ThreadVariables.getTenantId());
  9. }
  10. }

我知道sync producer在性能方面比较慢,因为它保证了消息的顺序和持久性,但是为什么只有第一个请求需要这么长时间呢?这个问题是已知问题吗?在最新的Kafka版本中是固定的吗。有人能建议吗。谢谢

nr9pn0ug

nr9pn0ug1#

使用以下依赖项下载的spring cloud stream版本似乎有问题,

  1. imports {
  2. mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR3"
  3. }

尝试升级spring云流并检查。它应该修复springboot服务启动后kafka服务器上第一次发布调用的延迟。

  1. dependencies {
  2. compile('org.springframework.cloud:spring-cloud-stream-binder-kafka')
  3. }
  4. ext { springCloudVersion = 'Dalston.RELEASE' }
  5. dependencyManagement {
  6. imports {
  7. mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
  8. }
  9. }
展开查看全部

相关问题