我们有一个生产者、一个消费者和一个分区。消费者/生产者都是spring引导应用程序。消费者应用程序在我的本地机器上运行,而生产者和kafka&zookeeper在远程机器上运行。
在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在那之后,我的消费者没有收到任何信息。我尝试重新启动消费者,但没有成功。问题是什么和/或如何解决?
使用者配置:
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
input:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
server:
port: 0
生产者配置:
cloud:
stream:
defaultBinder: kafka
bindings:
output:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
编辑2:
5分钟后,消费者应用程序死亡,但以下情况除外:
2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255 WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256 INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel
2条答案
按热度按时间ar7v8xwq1#
嗯,看起来已经有一个错误报告了
spring-cloud-stream-binder-kafka
陈述resetOffset
属性无效。因此,消费者总是请求偏移量为的消息latest
.正如在git问题上提到的,唯一的解决方法是通过kafka consumer cli工具来解决这个问题。
ca1c2owp2#
看看上面关于调试的建议是否揭示了任何进一步的信息。看起来您从kafkatopicprovisioner收到了一些超时异常。但是当你重新启动消费者的时候就会发生这种情况。似乎消费者在与经纪人沟通时遇到了一些问题,你需要了解那里发生了什么。