How to publish large (5 MB) messages using the Helm Kafka chart and a Python pod

Posted by yjghlzjz on 2021-06-04 in Kafka

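I'm running a PoC to send large (5 MB) Kafka messages.
I can't send large messages, and I don't know whether the problem is in the broker configuration or in the client.
I'm running Kubernetes on Docker Desktop with this Helm chart: https://github.com/helm/charts/tree/master/incubator/kafka and I install it with this command: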

helm install --name wielder-kafka --namespace kafka incubator/kafka \
  --set configurationOverride='{"replica.fetch.max.bytes":15048576,"message.max.bytes":15048576}'
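A minimal sketch of the same install built as an argument list, so no shell quoting is involved. It assumes the chart reads broker settings from a map named `configurationOverrides` (an assumption to confirm against the chart's values.yaml; the command above uses `configurationOverride`, and Helm generally does not reject keys a chart never reads). Each dot inside a Kafka property name is escaped with a backslash so `--set` treats `message.max.bytes` as a single key rather than a nested path.

```python
import shlex

# Values copied from the question; 15048576 bytes is roughly 14.4 MiB.
BROKER_OVERRIDES = {
    "message.max.bytes": 15048576,
    "replica.fetch.max.bytes": 15048576,
}


def helm_set_args(overrides, values_key="configurationOverrides"):
    """Build --set flags, escaping dots so each Kafka property stays one key.

    values_key is an assumption: confirm it against the chart's values.yaml.
    """
    args = []
    for prop, value in sorted(overrides.items()):
        escaped = prop.replace(".", "\\.")
        args += ["--set", "{}.{}={}".format(values_key, escaped, value)]
    return args


def helm_install_argv(overrides):
    """Helm 2 style install (uses --name, like the question's command)."""
    return [
        "helm", "install", "--name", "wielder-kafka",
        "--namespace", "kafka", "incubator/kafka",
    ] + helm_set_args(overrides)


if __name__ == "__main__":
    argv = helm_install_argv(BROKER_OVERRIDES)
    # Print a copy-pasteable shell command; pass argv to subprocess.run to execute it.
    print(" ".join(shlex.quote(arg) for arg in argv))
```

Building the argv in Python sidesteps the nested-quote problem entirely; the printed command is only for inspection.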

I'm not sure whether the configuration was actually applied. Is there a way to check?
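One way to check, sketched with kafka-python's `KafkaAdminClient.describe_configs` and assuming it runs from a pod inside the cluster, that the broker has id `0`, and that the topic is `grid_1` as in the script below. It reads the live broker settings plus the topic's own `max.message.bytes`, since a topic-level value overrides the broker's `message.max.bytes` for that topic. The parsing helper assumes each response resource looks like `(error_code, error_message, resource_type, resource_name, config_entries)` with entries starting `(name, value, ...)`; that layout may differ between kafka-python versions.

```python
BOOTSTRAP = "wielder-kafka.kafka.svc.cluster.local:9092"
# Broker-level names plus the topic-level max.message.bytes.
WANTED = {"message.max.bytes", "replica.fetch.max.bytes", "max.message.bytes"}


def pick_configs(resources, wanted=WANTED):
    """Map (resource_name, config_name) -> value for the wanted settings.

    Assumes each resource is (error_code, error_message, resource_type,
    resource_name, config_entries) and each entry starts with (name, value).
    """
    found = {}
    for error_code, error_message, _rtype, name, entries in (r[:5] for r in resources):
        if error_code:
            raise RuntimeError("{}: {}".format(name, error_message))
        for entry in entries:
            if entry[0] in wanted:
                found[(name, entry[0])] = entry[1]
    return found


def describe(broker_id="0", topic="grid_1", bootstrap=BOOTSTRAP):
    """Ask the cluster for the live broker and topic configuration."""
    # Imported here so pick_configs can be exercised without kafka-python.
    from kafka.admin import ConfigResource, ConfigResourceType, KafkaAdminClient

    admin = KafkaAdminClient(bootstrap_servers=bootstrap)
    try:
        responses = admin.describe_configs([
            ConfigResource(ConfigResourceType.BROKER, broker_id),
            ConfigResource(ConfigResourceType.TOPIC, topic),
        ])
        return pick_configs([r for resp in responses for r in resp.resources])
    finally:
        admin.close()


if __name__ == "__main__":
    for (resource, key), value in sorted(describe().items()):
        print(resource, key, value)
```

If `message.max.bytes` still shows the broker default, the Helm override never reached the broker.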
And this Python script (run from a pod):


#!/usr/bin/env python3

import json
import traceback

from kafka import KafkaProducer
from kafka.errors import KafkaError

a = '/storage/stam/ENERGY.json'

# Load the JSON document that should become a single Kafka message.
with open(a) as json_file:
    data = json.load(json_file)

# max_request_size raises the client-side limit (kafka-python defaults to 1048576 bytes).
producer = KafkaProducer(
    bootstrap_servers="wielder-kafka.kafka.svc.cluster.local:9092",
    max_request_size=15048576
)

j = json.dumps(data)

print(type(j))

# Kafka record values are bytes, so encode the serialized JSON.
en = j.encode('utf-8')

future = producer.send(topic="grid_1", value=en)

try:
    # Block until the broker acknowledges (or rejects) the record.
    record_metadata = future.get(timeout=1000)
except KafkaError:
    # Decide what to do if the produce request failed...
    print(traceback.format_exc())
    result = 'Fail'
finally:
    producer.close()

It works fine for small messages, but with large JSON files I get the following error:

root@pep-55c4dd9ff5-rnqjc:/storage/cluster# python3.6 json_to_kafka.py
<class 'str'>
Traceback (most recent call last):
  File "json_to_kafka.py", line 34, in <module>
    record_metadata = future.get(timeout=1000)
  File "/usr/local/lib/python3.6/site-packages/kafka/producer/future.py", line 65, in get
    raise self.exception # pylint: disable-msg=raising-bad-type
kafka.errors.MessageSizeTooLargeError: [Error 10] MessageSizeTooLargeError

root@pep-55c4dd9ff5-rnqjc:/storage/cluster#
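`[Error 10]` corresponds to Kafka's `MESSAGE_TOO_LARGE` error code. Because kafka-python can raise the same exception class from its own client-side `max_request_size` check, it can help to compare the encoded payload against every limit involved. A small sketch; the limit values in the `__main__` block are placeholders taken from the numbers the question intends to configure, not measured settings.

```python
import json


def encoded_size(data):
    """Bytes sent as the record value, encoded the same way as the script."""
    return len(json.dumps(data).encode("utf-8"))


def exceeded_limits(size, limits):
    """Names of the limits that size exceeds, smallest limit first.

    Record and batch headers add some overhead, so a value close to a
    limit can still be rejected.
    """
    ordered = sorted(limits.items(), key=lambda item: item[1])
    return [name for name, limit in ordered if size > limit]


if __name__ == "__main__":
    with open("/storage/stam/ENERGY.json") as json_file:
        size = encoded_size(json.load(json_file))
    # Placeholder limits: replace with the values actually in effect.
    limits = {
        "producer max_request_size": 15048576,
        "broker message.max.bytes": 15048576,
        "topic max.message.bytes": 15048576,
    }
    print(size, exceeded_limits(size, limits))
```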

Here is the Kubernetes storage information:

➜  ~ kubectl get pv -n kafka -o wide
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                           STORAGECLASS   REASON   AGE
pep-pv                                     200Mi      RWO            Retain           Bound    wielder-services/pep-pvc        pep-storage             5d18h
pvc-1069afcd-0c5d-11ea-8ffb-025000000001   1Gi        RWO            Delete           Bound    kafka/datadir-wielder-kafka-0   hostpath                5d18h
pvc-5ad5e6c5-0c5d-11ea-8ffb-025000000001   1Gi        RWO            Delete           Bound    kafka/datadir-wielder-kafka-1   hostpath                5d18h
pvc-6eaa98ae-0c5d-11ea-8ffb-025000000001   1Gi        RWO            Delete           Bound    kafka/datadir-wielder-kafka-2   hostpath                5d18h

How can I send large Kafka messages generated in Python?
