kafka生产者和消费者

0aydgbwb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(517)

我有一个cloudera集群,在3台不同的机器上有3个代理。我是从集群中的第四个发展而来的。
我创建了如下主题:create topic/usr/bin/kafka topics--zookeeperhost:2181,host2:2181,hosts3:2181/kafka--创建--分区10--复制因子2--主题主题名
我在zookeeper中的根目录不是根目录,而是/kafka
这是我的制作人代码:

class Kafkaproducer(object):
    def __init__(self,**kwargs):
        if kwargs:
            try:
                self.producer = KafkaProducer(**kwargs)
            except Exception as ex:
                print "unable to create Producer Object " + str(ex)
            self.iw = Imageworker()
            log = Logger()
            self.logs = log.logger('Producer')

    def set_topic(self, topic):
        """
        Set Topic for Producer
        :param self:
        :param topic: Topic String for Kafka
        :return: no value
        """
        self.topic = topic
        print self.producer.partitions_for(topic )

    def send_message(self, file):
        """
        send a single message to kafka broker
        :param self:
        :param file: absolute filepath from file to send to broker
        :return: no value
        """
        print self.topic
        try:
            print "create json message .. "
            message = self.iw.read_image_file(file)
        except Exception as ex:
            print "unable to read file" + str(ex)
        try:
            print "send message"+ self.iw.get_imagename(file)
            self.producer.send(self.topic, message)
        except Exception as Ex:
            print "unable to send kafka message " + str(ex)

    def _handle_fetch_response(self):
        print "error"

    def send_message_synchron(self, file ):
        """

        :param data:
        :return:
        """
        try:
            print "create json message .. "
            message = self.iw.read_image_file(file)

        except Exception as ex:
            print "unable to read file" + str(ex)
        try:
            #print "send message "+ self.iw.get_imagename(file)
            future = self.producer.send(self.topic, message)
            future.error_on_callbacks=True
            #result = future.get(timeout=1000)
            result = future.succeeded()

            print future.is_done
            if result:
                print future.value
                print result
                print "success!!!"
                meta = future.get(timeout=100)
        except Exception as ex:
            print "unable to send kafka message " + str(ex)
        try:
            if future.is_done:
                print "Message send successful "
        except KafkaError:
            log.exception()
            print "Error in Kafka"
            pass

    def flush_producer(self):
        self.producer.flush()

我可以用send\ messages函数异步发送消息。我还从使用的主题中获取分区数。问题是,信息消失了。
我已经用我的python消费者和下面的语句检查了两次:
/opt/cloudera/parcels/kafka-2.2.0-1.2.2.0.p0.68/lib/kafka/bin/kafka-run-class.sh kafka.tools.getoffsetshell--代理列表myhosts--主题名称
此外,我想用我的同步功能发送消息,以获得未来的结果。在这里,我无法得到一个未来的结果。行result=future.get(timeout=1000)失败。
希望有人对这种情况有想法。谢谢,
jö注册护士

k7fdbhmy

k7fdbhmy1#

发现了问题,但不知道如何解决。我从属性文件中读取生产者配置

bootstrap_servers=['h1:9092' ,'h2:9092','h3:9092']
api_version=(0,10)
value_serializer=str.encode
buffer_memory=200000000
retries=5
max_block_ms=10000

producer = Kafkaproducer(**dic)  # do not work
roducer = Kafkaproducer(bootstrap_servers=['h1:9092' ,'h2:9092','h3:9092'],api_version=(0,10)...   # works well

在消费者网站上,我可以使用consumer=kafkaconsumer(**dic)
在修复了生产者调用之后,同步错误战也消失了。但是为什么我不能用字典给制作人打电话呢?
-->{'retries':5,'max_block_ms':10000,'buffer_memory':200000000,'bootstrap_servers':['h1:9092','h2:9092','h3:9092'],'value_serializer':'str.encode','api_version':(0,10)}
谢谢您

相关问题