问题:打电话时 commit()
,如果消息执行需要很长时间,我将收到错误消息。
以下是我使用的代码:
consumer = KafkaConsumer(topic,
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
bootstrap_servers=kafka_server_url,
api_version=(0, 10, 1),
auto_offset_reset='earliest',
enable_auto_commit= False,
#consumer_timeout_ms=3000,
session_timeout_ms=250000, #The timeout used to detect failures when using Kafka group management facilities.
request_timeout_ms=300000,
heartbeat_interval_ms=80000,
#max_poll_records=100,#The maximum number of records returned in a single call to poll(). Default: 500
enable_auto_commit=False,
retry_backoff_ms = 1000
)
errored = False
while errored is False:
logging.warn("Polling new messages from queue ......")
messages = []
crs = [] # Store all consumer records
try:
tpd = (consumer.poll(max_records=1))
[crs.extend(tp) for tp in tpd.values()] # List of cr's
[messages.extend([cr.value]) for cr in crs]
if(len(messages)):
consumer.commit()
logging.warn("{} messages available in queue".format(len(messages)))
for msg in messages:
if(self.__validators.is_valid_message(msg)):
self.process_message(msg);
else:
continue
# If auto_commit_enable is False, remember to commit() periodically
logging.warn("All messages processed. Commiting kafka consumer")
except Exception as e:
message = "Error occured while processing message Exception {}".format(e)
logging.critical(message,exc_info=True)
errored = True
break
logging.info("Sleeping for 5 Seconds")
time.sleep(3)
def process_message(self,message):
print("Processed")
以下是错误跟踪:
回溯(最后一次调用):文件“message\u consumer.py”,第413行,在consumer\u messages consumer.commit()文件“/usr/lib/python3.6/site packages/kafka/consumer/group.py”,第472行,在commit self.\u coordinator.commit\u offsets\u sync(offsets)文件“/usr/lib/python3.6/site packages/kafka/coordinator/consumer.py”,第398行,在提交中#偏移量#sync raise future.exception#pylint:disable msg=raising bad type kafka.errors.commitfailedrorr:commitfailedrorr:commit无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间间隔比配置的session.timeout.ms长,这通常意味着poll循环花费了太多的时间来处理消息。您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。
我的消息未提交,正在从kafka队列重新处理。我该怎么解决这个问题?请帮忙。
任何帮助都将不胜感激。提前谢谢。
暂无答案!
目前还没有任何答案,快来回答吧!