commitfailederror

s5a0g9ez  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(180)

问题:打电话时 commit() ,如果消息执行需要很长时间,我将收到错误消息。
以下是我使用的代码:

consumer = KafkaConsumer(topic,
      group_id='my-group',
      value_deserializer=lambda m: json.loads(m.decode('utf-8')),
      bootstrap_servers=kafka_server_url,
      api_version=(0, 10, 1),
      auto_offset_reset='earliest',
      enable_auto_commit= False,
      #consumer_timeout_ms=3000,
      session_timeout_ms=250000, #The timeout used to detect failures when using Kafka group management facilities. 
      request_timeout_ms=300000,
      heartbeat_interval_ms=80000,
      #max_poll_records=100,#The maximum number of records returned in a single call to poll(). Default: 500
      enable_auto_commit=False,
      retry_backoff_ms = 1000
      )
errored = False
while errored is False:
  logging.warn("Polling new messages from queue ......")
  messages = []
  crs = [] # Store all consumer records
  try:
    tpd = (consumer.poll(max_records=1))
    [crs.extend(tp) for tp in tpd.values()] # List of cr's
    [messages.extend([cr.value]) for cr in crs] 
    if(len(messages)):
      consumer.commit()
      logging.warn("{} messages available in queue".format(len(messages)))
      for msg in messages:
        if(self.__validators.is_valid_message(msg)):
          self.process_message(msg);
        else:
          continue
      # If auto_commit_enable is False, remember to commit() periodically    
      logging.warn("All messages processed. Commiting kafka consumer")

  except Exception as e:
    message = "Error occured while processing message Exception {}".format(e)
    logging.critical(message,exc_info=True)
    errored = True
    break
  logging.info("Sleeping for 5 Seconds")
  time.sleep(3)

def process_message(self,message):
 print("Processed")

以下是错误跟踪:
回溯(最后一次调用):文件“message\u consumer.py”,第413行,在consumer\u messages consumer.commit()文件“/usr/lib/python3.6/site packages/kafka/consumer/group.py”,第472行,在commit self.\u coordinator.commit\u offsets\u sync(offsets)文件“/usr/lib/python3.6/site packages/kafka/coordinator/consumer.py”,第398行,在提交中#偏移量#sync raise future.exception#pylint:disable msg=raising bad type kafka.errors.commitfailedrorr:commitfailedrorr:commit无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间间隔比配置的session.timeout.ms长,这通常意味着poll循环花费了太多的时间来处理消息。您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。
我的消息未提交,正在从kafka队列重新处理。我该怎么解决这个问题?请帮忙。
任何帮助都将不胜感激。提前谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题