这可能是一个eisenbug,所以我不希望有很难的答案,但更多的提示,以寻找能够复制的错误。
我有一个事件驱动的,基于kafka的系统,由几个服务组成。目前,它们被组织在线性管道中。一个主题,一个事件类型。每个服务都可以看作是从一个事件类型到一个或多个事件类型的转换。
每个转换都作为一个python进程执行,有自己的使用者和生产者。它们都共享相同的代码和配置,因为这些都是从服务实现中抽象出来的。
现在,有什么问题。在我们的暂存环境中,有时(假设每50条消息中就有一条消息),kafka上有一条消息,但消费者根本不处理它。即使你等了几个小时,它也会挂起来。这种情况不会发生在本地环境中,我们也无法在其他任何地方复制。
一些更相关的信息:
这些服务经常被重新启动以进行调试,但是挂起似乎与重新启动无关。
当消息挂起并重新启动服务时,服务将处理消息。
这些服务是完全无状态的,因此没有缓存或其他奇怪的事情发生(我希望如此)
当这种情况发生时,我有证据表明他们还没有处理较旧的消息(当他们生成输出时我会记录,而这种情况正好发生在使用者循环结束之前)
在当前部署中,使用者组中只有一个使用者,因此在相同的服务中没有并行处理,也没有服务的水平扩展
我如何消费:
我使用pykafka,这是消费者循环:
def create_consumer(self):
consumer = self.client.topics[bytes(self.input_topic, "UTF-8")].get_simple_consumer(
consumer_group=bytes(self.consumer_group, "UTF-8"),
auto_commit_enable=True,
offsets_commit_max_retries=self.service_config.kafka_offsets_commit_max_retries,
)
return consumer
def run(self):
consumer = self.create_consumer()
while not self.stop_event.wait(1):
message = consumer.consume()
results = self._process_message(message)
self.output_results(results)
我的假设是,要么是我使用消息的方式有问题,要么是消费者群体偏移量的状态不一致,但我不能真正地围绕这个问题来思考。
我也在考虑搬到浮士德去解决这个问题。考虑到我的代码库和架构决策,转换应该不会太难,但在开始这样的工作之前,我想确定我的方向是正确的。现在,它只是一个盲目的镜头,希望一些造成问题的细节会消失。
暂无答案!
目前还没有任何答案,快来回答吧!