如何在不自动提交的情况下长时间(4-60分钟)处理kafka消息,并在不重新平衡的情况下提交它

vwkv1x7d  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(393)

如何在不自动提交的情况下使用kafka消息,对其进行长时间(4-60分钟)的处理,并在不进行重新平衡的情况下进行提交,以及分区重新分配或阻止其他组使用者使用其他消息。
我正在使用Python3.8 kafka使用者,以:
一次使用一条消息,无需自动提交。
开始一个长时间运行的脚本(阅读python的标准输出)
有条件地提交消息。
我的问题是,kafka分区经常被重新平衡到另一个消费群体成员。
在浏览了大量文档之后,我尝试使用这些配置属性:
会话\u超时\u毫秒
请求\u超时\u毫秒
最大轮询间隔

from kafka import KafkaConsumer, OffsetAndMetadata, TopicPartition

def consume_one_message_at_a_time(conf):

conf.models_dir = f'{conf.project_root}/{conf.models_dir}'
group_id = conf.group_id
group_conf = conf.consumer_groups[group_id]

kafka_brokers = conf.KAFKA_BROKERS
topic = group_conf.subscribe[0].name

print(f'KAFKA_BROKERS: {kafka_brokers}\n Topic {topic}\n group id: {group_id}')

consumer = KafkaConsumer(
    topic,
    bootstrap_servers=kafka_brokers,
    group_id=group_id,
    enable_auto_commit=False,
    max_poll_records=1,
    max_poll_interval_ms=1800000,
    # session_timeout_ms=1800000,
    # request_timeout_ms=1800002,
    # connections_max_idle_ms=1800003
    # heartbeat_interval_ms=1800000,
)

print(f'bootstrap_servers: {kafka_brokers} subscribing to {topic}')
consumer.subscribe([topic])

for message in consumer:
    print(f"message is of type: {type(message)}")

    if not group_conf.use_cmd:
        do_something_time_consuming(message)
    else:
        if group_id == 'bots' and check_bot_id(message):

            bot_action(conf, group_conf, message)
        else:
            print(f'no action for group_id: {group_id}')
            print(f'key  : {message.key}')
            print(f'value: {message.value}')

    meta = consumer.partitions_for_topic(message.topic)

    partition = TopicPartition(message.topic, message.partition)
    offsets = OffsetAndMetadata(message.offset + 1, meta)
    options = {partition: offsets}

    print(f'\noptions: {options}\n')

    response = consumer.commit(offsets=options)

当其他组成员订阅或完成其作业并使用我时,会出现以下错误:

Traceback (most recent call last):
  File "./consumer_one_at_a_time.py", line 148, in <module>
    consume_one_message_at_a_time(_conf)
  File "./consumer_one_at_a_time.py", line 141, in consume_one_message_at_a_time
    response = consumer.commit(offsets=options)
  File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 526, in commit
    self._coordinator.commit_offsets_sync(offsets)
  File "/usr/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 518, in commit_offsets_sync
    raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.

添加这些配置后,我发现新的消费者被阻止了!i、 e.在提交消息之前,不要使用消息!

session_timeout_ms=1800000,
request_timeout_ms=1800002,
connections_max_idle_ms=1800003

# heartbeat_interval_ms=1800000,

我读到后台线程应该发送心跳信号。有没有一种不用轮询就能发送心跳的方法?

4nkexdtk

4nkexdtk1#

有没有一种不用轮询就能发送心跳的方法?
已经是这样了。从版本0.10.1.0开始,心跳信号在kafka中通过单独的线程发送(有关详细信息,请查看此项)
一般来说,在以下情况下会发生再平衡:
新消费者加入消费群体
添加新分区
完全关闭耗电元件
Kafka认为消费者已经死亡
正在过期session.timeout.ms而不发送心跳信号
过期max.poll.timeout.ms而不发送轮询请求
看来你的处境是最后一个了。您对记录进行投票,但在中不再投票 max.poll.interval.ms (在你的情况下是30分钟)因为长时间运行的过程。要解决此问题:
你可以增加 max.poll.interval.ms . 但这会导致太长时间的重新平衡。因为 rebalance.timeout = max.poll.interval.ms . 重新平衡开始后,将撤消使用者组中的所有使用者,kafka将等待所有仍在向poll()发送heartbeat的使用者(通过在该点轮询consumers send joingrouprequest),直到重新平衡超时过期(等于) max.poll.interval.ms . 假设你设定了 max.poll.interval.ms 到60分钟,你的过程需要50分钟才能完成。如果在你漫长过程的第五分钟,因为我上面提到的任何一个原因而开始重新平衡,那么Kafka将等待你的消费者投票45分钟。在此期间,所有的消费者都将被撤销(对于这个消费群体来说,消费将完全停止),所以这不是一个好主意(当然这取决于你的需要)
另一个解决方案是不使用kafka进行这种长时间运行的操作。因为kafka不适合长时间运行的处理。您可以使用kafka将有关长进程的元数据持久化为消息消费的一部分,然后在不使用kafka的情况下执行适当的操作。

相关问题