我正在运行一个poc来发送大的(5兆字节)kafka消息。
我无法发送大消息,我不知道错误是在代理配置中还是在客户端中。
我用docker桌面上的keubernetes和这个舵图:https://github.com/helm/charts/tree/master/incubator/kafka 我正在使用这个init命令:
helm install --name wielder-kafka --namespace kafka incubator/kafka
--set configurationOverride=
"{"replica.fetch.max.bytes":15048576,"message.max.bytes":15048576}"
我不确定是否进行了配置!有没有办法查一下?
还有这个python脚本(从pod运行)。
# !/usr/bin/env python
import json
import traceback
from time import sleep
from kafka import KafkaProducer
from kafka.errors import KafkaError
a = '/storage/stam/ENERGY.json'
with open(a) as json_file:
data = json.load(json_file)
# print(data["1"])
producer = KafkaProducer(
bootstrap_servers="wielder-kafka.kafka.svc.cluster.local:9092",
max_request_size=15048576
)
j = json.dumps(data)
print(type(j))
# en = f'fool {str("batata")}'.encode('utf-8')
en = j.encode('utf-8')
future = producer.send(topic="grid_1", value=en)
try:
record_metadata = future.get(timeout=1000)
except KafkaError:
# Decide what to do if produce request failed...
print(traceback.format_exc())
result = 'Fail'
finally:
producer.close()
它工作得很好,但是对于大的json文件,我得到了下面的错误。
root@pep-55c4dd9ff5-rnqjc:/storage/cluster# python3.6 json_to_kafka.py
<class 'str'>
Traceback (most recent call last):
File "json_to_kafka.py", line 34, in <module>
record_metadata = future.get(timeout=1000)
File "/usr/local/lib/python3.6/site-packages/kafka/producer/future.py", line 65, in get
raise self.exception # pylint: disable-msg=raising-bad-type
kafka.errors.MessageSizeTooLargeError: [Error 10] MessageSizeTooLargeError
root@pep-55c4dd9ff5-rnqjc:/storage/cluster#
以下是kubernetes存储信息:
➜ ~ kubectl get pv -n kafka -o wide
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
pep-pv 200Mi RWO Retain Bound wielder-services/pep-pvc pep-storage 5d18h
pvc-1069afcd-0c5d-11ea-8ffb-025000000001 1Gi RWO Delete Bound kafka/datadir-wielder-kafka-0 hostpath 5d18h
pvc-5ad5e6c5-0c5d-11ea-8ffb-025000000001 1Gi RWO Delete Bound kafka/datadir-wielder-kafka-1 hostpath 5d18h
pvc-6eaa98ae-0c5d-11ea-8ffb-025000000001 1Gi RWO Delete Bound kafka/datadir-wielder-kafka-2 hostpath 5d18h
如何发送用python生成的大型kafka消息
暂无答案!
目前还没有任何答案,快来回答吧!