我使用的是python 3.9.16和kafka-python 2.0.2版本。我的MacBook Pro iOS 11.6.5。
刚接触Kafka,现在只是在玩它。我不确定问题是什么,也不确定为什么我的解决方法有效。
我试图做的是寻找主题的特定偏移量,但我经常遇到ValueError。
这是我的代码。
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
#import pdb
#pdb.set_trace()
myTP = TopicPartition('my-topic', 0)
consumer.assign([myTP])
print ("this is the consumer assignment: {}".format(consumer.assignment()))
#print ("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 22)
#print ("not sure why this will work but printing position: {} ".format(consumer.position(myTP)))
for blah in consumer:
print ("{}, {}".format(blah.offset, blah.value))
所以大多数时候,当我运行它时,我会得到这个ValueError。有时候,它会神秘地工作,没有我的变通办法,但我不知道为什么。
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
(value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
...
...
...
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes
我发现的解决方法是,如果我在seek命令之前和之后打印位置,它似乎一直都能工作,但我不知道为什么。有人能给我解释一下吗我是否需要建立一些短暂的延迟来使此工作?打印我在Consumer中的位置是否会重置Consumer中的某些内容,从而使其工作?
$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
not sure why this will work but printing position: 34
not sure why this will work but printing position: 22
22, b'{"number": 8}'
23, b'{"number": 9}'
24, b'{"number": 0}'
25, b'{"number": 1}'
26, b'{"number": 2}'
27, b'{"number": 3}'
28, b'{"number": 4}'
29, b'{"number": 5}'
30, b'{"number": 6}'
31, b'{"number": 7}'
32, b'{"number": 8}'
33, b'{"number": 9}'
编辑:完整的traceback在这里:
$ python tkCons.py
this is the consumer assignment: {TopicPartition(topic='my-topic', partition=0)}
Traceback (most recent call last):
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 20, in _unpack
(value,) = f(data)
struct.error: unpack requires a buffer of 4 bytes
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/my_secret_username/kafka/tkCons.py", line 34, in <module>
for blah in consumer:
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1193, in __next__
return self.next_v2()
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/consumer/group.py", line 702, in _poll_once
self._client.poll(timeout_ms=timeout_ms)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 602, in poll
self._poll(timeout / 1000)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/client_async.py", line 687, in _poll
self._pending_completion.extend(conn.recv())
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1053, in recv
responses = self._recv()
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/conn.py", line 1127, in _recv
return self._protocol.receive_bytes(recvd_data)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 132, in receive_bytes
resp = self._process_response(self._rbuffer)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/parser.py", line 138, in _process_response
recv_correlation_id = Int32.decode(read_buffer)
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 64, in decode
return _unpack(cls._unpack, data.read(4))
File "/Users/my_secret_username/venvs/kafka/lib/python3.9/site-packages/kafka/protocol/types.py", line 23, in _unpack
raise ValueError("Error encountered when attempting to convert value: "
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10539a930>', hit error: unpack requires a buffer of 4 bytes
1条答案
按热度按时间t98cgbkg1#
我建议升级到最新的Python版本,然后再试一次。
错误来自内部字节解包函数。