Kafka consumer client does not return messages

Posted by uubf1zoe on 2021-06-04 in Kafka

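The Python client works fine when run standalone, but it fails to retrieve any messages when run as a multiprocessing worker process with the same configuration.
The client always prints the message from the "msg is None" block, meaning poll() never returns a message. Any help diagnosing this problem would be much appreciated.
The worker basically looks like this: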

from multiprocessing import Process
...
class saListener(Process):
    def __init__(self, n):
        self.ClientName = "saListener-" + str(n)
        ...
        schema_registry_client = SchemaRegistryClient(schema_registry_conf)
        value_avro_deserializer = AvroDeserializer(ccloud_lib.value_schema, schema_registry_client)
        conf["value.deserializer"] = value_avro_deserializer
        # The consumer is created here, i.e. in the parent process
        self.cons = DeserializingConsumer(conf)
        Process.__init__(self)

    def connect(self):
        # Also called in the parent process, before start()
        self.cons.subscribe([self.topic])

    def run(self):
        # Runs in the child process
        while True:
            msg = self.cons.poll(5.0)
            if msg is None:
                print(self.ClientName + ":Waiting for message or event/error in poll()")

The controller looks like this:

for n in range(instances):
    lnr_instance = saListener(n)
    lnr_instance.connect()
    lnr_instance.start()
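
One possible cause, offered as an assumption rather than a confirmed diagnosis: in the code above the consumer is created in __init__ and subscribed in connect(), both of which run in the parent process, while poll() runs in the child after start(). librdkafka client handles are not fork-safe, because the background threads that do the network I/O do not carry over into a forked child. A minimal sketch of the alternative is to build and subscribe the consumer inside run(). The names ListenerWorker, make_consumer, max_polls and handle are hypothetical; in the real worker make_consumer would return DeserializingConsumer(conf).

```python
from multiprocessing import Process


class ListenerWorker(Process):
    """Hypothetical variant of saListener: the consumer is built in the child."""

    def __init__(self, n, topic, make_consumer, max_polls=None):
        super().__init__()
        self.client_name = "saListener-" + str(n)
        self.topic = topic
        # Assumption of this sketch: make_consumer is a zero-argument callable
        # that returns an object with subscribe(), poll() and close().
        self.make_consumer = make_consumer
        self.max_polls = max_polls  # None means poll forever

    def run(self):
        # run() executes in the child process, so the client handle and its
        # background threads are created after the fork instead of inherited.
        consumer = self.make_consumer()
        consumer.subscribe([self.topic])
        polls = 0
        try:
            while self.max_polls is None or polls < self.max_polls:
                msg = consumer.poll(5.0)
                polls += 1
                if msg is None:
                    print(self.client_name + ": waiting for message or event/error in poll()")
                    continue
                self.handle(msg)
        finally:
            # Leave the group cleanly so partitions are reassigned promptly.
            consumer.close()

    def handle(self, msg):
        # Placeholder for the real message processing.
        print(self.client_name + ": got " + repr(msg))
```

With this shape the controller only needs to construct each worker and call start(); there is no separate connect() step in the parent.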

Client configuration:

"bootstrap.servers" : "srv1:909,srv2:909",
"group.id" : "ainvil9_intraday_group",
"debug" : "all",
"max.poll.interval.ms" : "30000",
"enable.auto.commit" : "true",
"fetch.wait.max.ms" : "1000",
"session.timeout.ms" : "10000",
"auto.commit.interval.ms" : "500",
"sasl.mechanism" : "GSSAPI",
"security.protocol" : "SASL_PLAINTEXT",
"sasl.kerberos.service.name" : "kafka",
"ssl.ca.location" : "security/ca.cert.pem",
"sasl.kerberos.kinit.cmd" : "kinit -R -p -kt security/kafka_ist_producer.keytab kafka_ist_producer@DMS",
"sasl.kerberos.keytab" : "security/kafka_ist_producer.keytab",
"sasl.kerberos.principal" : "kafka_ist_producer@DMS"

It looks like the consumers running as multiprocessing workers are unable to get offsets:

%7|1596766163.070|HEARTBEAT|rdkafka#consumer-2| [thrd:main]: GroupCoordinator/9: Heartbeat for group "ainvil9_intraday_group" generation id 1
 %7|1596766163.070|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.070|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.070|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/9: Sent HeartbeatRequest (v3, 97 bytes @ 0, CorrId 12)
 %7|1596766163.070|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/9: Sent HeartbeatRequest (v3, 97 bytes @ 0, CorrId 12)
 %7|1596766163.073|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/9: Received HeartbeatResponse (v3, 6 bytes, CorrId 12, rtt 3.03ms)
 %7|1596766163.073|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/9: Received HeartbeatResponse (v3, 6 bytes, CorrId 12, rtt 3.08ms)
 %7|1596766163.181|COMMIT|rdkafka#consumer-3| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.181|UNASSIGN|rdkafka#consumer-3| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.573|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.573|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.573|COMMIT|rdkafka#consumer-4| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.573|UNASSIGN|rdkafka#consumer-4| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.573|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.573|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.682|COMMIT|rdkafka#consumer-3| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
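
To see at a glance which client instances are only heartbeating and committing without ever being assigned partitions, the debug lines can be grouped by instance. This is a rough sketch that assumes the "%level|timestamp|TAG|client| [thrd:name]: text" layout seen in the excerpt above; the function name summarize_debug_log is hypothetical.

```python
import re
from collections import Counter, defaultdict

# Layout assumed from the excerpt:
# %<level>|<epoch seconds>|<TAG>|<client name>| [thrd:<thread>]: <text>
LINE_RE = re.compile(
    r"^\s*%(?P<level>\d)\|(?P<ts>\d+\.\d+)\|(?P<tag>[A-Z0-9_]+)\|(?P<client>[^|]+)\|"
    r"\s*\[thrd:(?P<thread>[^\]]+)\]:\s*(?P<text>.*)$"
)


def summarize_debug_log(lines):
    """Count debug tags per client instance; lines that do not match are skipped."""
    summary = defaultdict(Counter)
    for line in lines:
        match = LINE_RE.match(line)
        if match:
            summary[match.group("client")][match.group("tag")] += 1
    return dict(summary)
```

Feeding it the captured stderr lines, for example summarize_debug_log(open("consumer.log")) with a hypothetical log file, gives one counter per rdkafka#consumer-N instance. An instance that shows only HEARTBEAT, COMMIT and UNASSIGN entries and no fetch-related tags has most likely never received an assignment.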
