How do I seek to a specific offset in a Kafka consumer without running into a ValueError?

Posted by wvyml7n5 on 2023-06-21 in Apache

I'm using Python 3.9.16 and kafka-python 2.0.2 on a MacBook Pro running macOS 11.6.5.
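I'm new to Kafka and just playing around with it for now. I'm not sure what the problem is, or why my workaround works.
What I'm trying to do is seek to a specific offset in a topic, but I keep running into a ValueError.
Here is my code: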

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

#import pdb
#pdb.set_trace()

myTP = TopicPartition('my-topic', 0)

consumer.assign([myTP])
print ("this is the consumer assignment: {}".format(consumer.assignment()))
#print ("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 22) 
#print ("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))

for blah in consumer:
    print ("{}, {}".format(blah.offset, blah.value))

Most of the time when I run it, I get this ValueError. Occasionally it mysteriously works even without my workaround, and I don't know why.

this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
    (value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
...
...
...
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes

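The workaround I found: if I print the position before and after the seek() call, it seems to work every time, but I don't know why. Can someone explain this? Do I need to add a short delay to make this work? Does printing the position reset something inside the consumer that makes it work?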

$ python tkCons.py 
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
not sure why this will work but printing position: 34 
not sure why this will work but printing position: 22 
22, b'{"number": 8}'
23, b'{"number": 9}'
24, b'{"number": 0}'
25, b'{"number": 1}'
26, b'{"number": 2}'
27, b'{"number": 3}'
28, b'{"number": 4}'
29, b'{"number": 5}'
30, b'{"number": 6}'
31, b'{"number": 7}'
32, b'{"number": 8}'
33, b'{"number": 9}'
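The workaround described above can be packaged as a small helper. This is a minimal sketch, not a confirmed fix: it only repeats the observation that calling `position()` before and after `seek()` avoids the error. It assumes `consumer` behaves like kafka-python's `KafkaConsumer` (`assign()`, `position()`, `seek()`, `poll()`) and that `tp` is a `TopicPartition`; the helper name `seek_with_position_workaround` is hypothetical. In kafka-python, `position()` may perform a blocking offset lookup against the broker when no position is known yet, which is one plausible reason it changes the timing, but that is an assumption, not a diagnosis.

```python
from typing import Any, List, Tuple


def seek_with_position_workaround(consumer: Any, tp: Any, offset: int,
                                  timeout_ms: int = 1000) -> Tuple[int, List[Any]]:
    """Hypothetical helper: assign one partition, seek to `offset`, poll once.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer and
    `tp` like kafka.TopicPartition; neither is imported here.
    """
    consumer.assign([tp])
    # Workaround from the question: ask for the position before seeking.
    # If no position is known yet, this may trigger a blocking offset lookup.
    consumer.position(tp)
    consumer.seek(tp, offset)
    # Read the position back; after seek() it should equal `offset`.
    after = consumer.position(tp)
    # poll() returns a dict of {TopicPartition: [ConsumerRecord, ...]}.
    batch = consumer.poll(timeout_ms=timeout_ms)
    return after, batch.get(tp, [])
```

With a live broker you would pass `KafkaConsumer(bootstrap_servers=['localhost:9092'])` and `TopicPartition('my-topic', 0)` from the question. `poll()` is used here instead of iterating over the consumer, so a single call returns whatever records arrive within `timeout_ms` (possibly none).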

Edit: here is the full traceback:

$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
    (value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/my_secret_username/kafka/tkCons.py", line 34, in <module>
    for blah in consumer:
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 702, in _poll_once
    self._client.poll(timeout_ms=timeout_ms)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 602, in poll
    self._poll(timeout / 1000)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 687, in _poll
    self._pending_completion.extend(conn.recv())
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1053, in recv
    responses = self._recv()
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1127, in _recv
    return self._protocol.receive_bytes(recvd_data)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 132, in receive_bytes
    resp = self._process_response(self._rbuffer)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 138, in _process_response
    recv_correlation_id = Int32.decode(read_buffer)
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 64, in decode
    return _unpack(cls._unpack, data.read(4))
  File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 23, in _unpack
    raise ValueError("Error encountered when attempting to convert value: "
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes
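In the working run, the position before the seek was 34 and the records stopped at offset 33, so 34 is most likely the partition's end offset. Separately from the ValueError, a seek target can be checked against the partition's available range before seeking. This sketch assumes kafka-python's `KafkaConsumer.beginning_offsets()` and `end_offsets()`, which take a list of partitions and return a `{TopicPartition: offset}` dict; the helper name `seek_if_in_range` is hypothetical, and the partition must already be assigned.

```python
from typing import Any


def seek_if_in_range(consumer: Any, tp: Any, offset: int) -> int:
    """Hypothetical helper: seek `tp` to `offset` only if earliest <= offset <= end.

    Assumes `consumer` behaves like kafka-python's KafkaConsumer and that
    `tp` is already assigned to it.
    """
    earliest = consumer.beginning_offsets([tp])[tp]
    # The end offset is the offset the next produced record will receive.
    end = consumer.end_offsets([tp])[tp]
    if not earliest <= offset <= end:
        raise IndexError(f"offset {offset} outside [{earliest}, {end}] for {tp}")
    consumer.seek(tp, offset)
    return offset
```

Seeking exactly to the end offset is allowed; the consumer then simply waits for new records. An out-of-range target raises `IndexError` here instead of being left to the consumer's `auto_offset_reset` handling.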
Answer 1 (t98cgbkg):

I'd suggest upgrading to the latest Python version and trying again.
The error comes from an internal byte-unpacking function.
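To illustrate the answer's point: in the full traceback, `Int32.decode` calls `data.read(4)` on the response buffer, gets back `b''`, and `struct` refuses to unpack fewer than 4 bytes, which kafka-python then re-raises as a ValueError. This is a simplified re-creation of that step using only the standard library, not kafka-python's actual code; `decode_int32` is a hypothetical name. Kafka's wire protocol encodes INT32 fields as 4-byte big-endian integers.

```python
import io
import struct

# Kafka INT32 fields are 4-byte big-endian signed integers.
_INT32 = struct.Struct(">i")


def decode_int32(buf: io.BytesIO) -> int:
    """Hypothetical re-creation of reading one INT32 from a response buffer."""
    data = buf.read(4)  # returns fewer than 4 bytes (possibly b'') if the buffer is short
    try:
        (value,) = _INT32.unpack(data)
    except struct.error as exc:
        # Wrap the low-level error, similar in spirit to kafka-python's ValueError.
        raise ValueError(f"could not decode {data!r} as INT32: {exc}") from exc
    return value


print(decode_int32(io.BytesIO(b"\x00\x00\x00\x2a")))  # 42
try:
    decode_int32(io.BytesIO(b""))  # empty buffer, as in the traceback
except ValueError as err:
    print(err)  # could not decode b'' as INT32: unpack requires a buffer of 4 bytes
```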
