由于这样或那样的原因,我们最近使用不同于最初编写的库重新编写了消费者和生产者。不过,我在换岗后遇到了一些问题。
使用Kafka0.9.0.2:
8个分区
我们从一个主题消费,处理消息,然后推送到另一个主题。
由于大量处理导致在提交偏移量之前会话超时,因此更新了以下配置选项:
消费者: session.timeout.ms
:1米40秒
经纪人: group.max.session.timeout.ms
:2m和 group.min.session.timeout.ms
:6秒
我遇到的问题是,我的消费者每启动几次,它就会在试图获取消息时挂起。
没有错误或异常,它最终不会超时,它只是静止不动。考虑到我的过程,实现消费者重启频繁,这是一个突破性的问题,我的想法。我不知道这是一个配置更新,需要作出,或者如果我没有妥善处理关机,这是造成某种超时,在经纪上超过。
consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.host,
group_id=self.group,
enable_auto_commit=False,
max_partition_fetch_bytes=100*1048576,
auto_offset_reset='earliest',
session_timeout_ms=100000)
print 'Consumer created'
for message in consumer:
print message.offset
print message.value
我给Kafka加了10万条留言。
我打开此服务并允许消费约1000条消息。
我重新启动这个过程来模拟正在发生的事情。
进程挂起,输出: Consumer created
有没有人看到他们的消费者在创业时犹豫不决?
Kafka的日志里有什么我该找的吗?
我也注意到分区的消耗越来越慢。
暂无答案!
目前还没有任何答案,快来回答吧!