Spring Cloud Stream (Kafka Streams binder) application cannot support multiple processors

wyyhbhjk  posted on 2021-07-06 in Java

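We have written a basic streams processor using Spring Cloud Stream (3.0.9.RELEASE) and the Kafka Streams binder. The actual project contains several classes annotated with @SpringBootApplication, each with its own function, but only one of them is used at a time.
The problem we are facing is that when we bundle the application into a jar and run it as a k8s deployment, running more than one replica at a time causes the extra replicas to terminate after a few minutes, with no meaningful error message or apparent cause. For example, if we run 5 replicas, 1 stays up indefinitely and processes events, but the other 4 (although they also start processing events at first) exit shortly afterwards. Obviously there is no problem if we set the deployment to run only 1 replica, but the processor needs to be scalable.
This section of the documentation on setting the application ID seems relevant: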

For production deployments, it is highly recommended to explicitly specify the application ID
through configuration. This is especially going to be very critical if you are auto scaling your
application in which case you need to make sure that you are deploying each instance with the
same application ID.

However, neither setting this value at the function level (for our normalize function):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            functions:
              normalize:
                applicationId: normalizer-full

nor setting it at the binder level:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            applicationId: normalizer-full

seems to keep the additional pods running.
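For context, as far as I understand it, the binder's applicationId ends up as the Kafka Streams application.id setting, which Kafka Streams also uses as the consumer group ID. Replicas that share it should therefore join one group and split the input partitions between them. The sketch below uses only the JDK to show the value every replica would start with; the helper name streamsProperties and the broker address localhost:9092 are assumptions for illustration, not part of our project.

```java
import java.util.Properties;

class ApplicationIdSketch {
    // "application.id" is the Kafka Streams configuration key for the application ID.
    static final String APPLICATION_ID_KEY = "application.id";

    /** Hypothetical helper: builds the properties one replica would start with. */
    static Properties streamsProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty(APPLICATION_ID_KEY, applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Five replicas of the same deployment, all configured identically.
        for (int replica = 0; replica < 5; replica++) {
            Properties props = streamsProperties("normalizer-full", "localhost:9092");
            System.out.println("replica " + replica + " -> " + props.getProperty(APPLICATION_ID_KEY));
        }
    }
}
```

If the replicas really do start with the same value, the application ID alone would not explain why only one pod survives.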
Thanks for your help.
--Edit--
Here is a capture of the trace logs when the processor shuts down after a few minutes:

2020-11-20 06:32:09.721 DEBUG 1 --- [extShutdownHook] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@34f7cfd9, started on Fri Nov 20 06:26:59 GMT 2020
2020-11-20 06:32:09.722 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
2020-11-20 06:32:09.722 DEBUG 1 --- [extShutdownHook] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@5e1dde44, started on Fri Nov 20 06:27:23 GMT 2020, parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@34f7cfd9
2020-11-20 06:32:09.722 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'lifecycleProcessor'
2020-11-20 06:32:09.723 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@38830ea: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,org.springframework.context.event.internalEventListenerProcessor,org.springframework.context.event.internalEventListenerFactory,KStreamBinderConfiguration,org.springframework.boot.autoconfigure.internalCachingMetadataReaderFactory,org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor,org.springframework.boot.context.internalConfigurationPropertiesBinderFactory,org.springframework.boot.context.internalConfigurationPropertiesBinder,org.springframework.boot.context.properties.BoundConfigurationProperties,org.springframework.boot.context.properties.ConfigurationBeanFactoryMetadata,org.springframework.cloud.stream.binder.kafka.streams.MultiBinderPropertiesConfiguration,provisioningProvider,kStreamBinder]; parent: org.springframework.beans.factory.support.DefaultListableBeanFactory@7cb502c
2020-11-20 06:32:09.723 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'org.springframework.boot.autoconfigure.internalCachingMetadataReaderFactory': [org.springframework.context.annotation.internalConfigurationAnnotationProcessor]
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'lifecycleProcessor'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'outputBindingLifecycle'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'inputBindingLifecycle'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'streamsBuilderFactoryManager'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean '_org.springframework.integration.errorLogger'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'integrationHeaderChannelRegistry'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'stream-builder-normalize'
2020-11-20 06:32:09.724 DEBUG 1 --- [extShutdownHook] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483547
...
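One observation about this capture: the thread name [extShutdownHook] looks like a left-truncated "SpringContextShutdownHook", the name Spring gives its JVM shutdown hook thread (the default Spring Boot console pattern keeps only the last 15 characters of the thread name). If that reading is right, the context was closed because the JVM itself was asked to exit, for example after the container received SIGTERM, rather than because of an exception inside the processor. The sketch below uses only the JDK to show the mechanism; the helper names registerHook and truncateLeft are made up for illustration.

```java
class ShutdownHookSketch {
    /** Hypothetical helper: registers a named JVM shutdown hook and returns it. */
    static Thread registerHook(String name, Runnable onShutdown) {
        Thread hook = new Thread(onShutdown, name);
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    /** Mimics a log pattern that keeps only the last maxLen characters of a thread name. */
    static String truncateLeft(String threadName, int maxLen) {
        return threadName.length() <= maxLen
                ? threadName
                : threadName.substring(threadName.length() - maxLen);
    }

    public static void main(String[] args) {
        Thread hook = registerHook("SpringContextShutdownHook",
                () -> System.out.println("hook running in " + Thread.currentThread().getName()));
        // Prints the thread name as a 15-character log column would show it.
        System.out.println("[" + truncateLeft(hook.getName(), 15) + "]");
        // The hook body runs when the JVM exits: normal return from main,
        // System.exit, or a termination signal such as SIGTERM.
    }
}
```

So it may be worth checking what asks the pods to stop (for example the pod events and any probes on the deployment) in addition to the application logs.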
