如何在librdkafka中使用kafka的producerapi和windows上的c++代码

zxlwwiss  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(444)

我想写一个客户作为制片人。我按照示例创建了一个新的win32控制台项目。我发现api不适合我,除非我在程序末尾添加getline()函数。
如果删除getline(),product(..)方法仍然返回success的结果。但是,我在kafka控制台的命令窗口中看不到任何响应
我有点糊涂了。是这样吗?不使用getline()如何发送消息?有人知道吗?
我找到了为什么它不起作用。删除producer对象似乎太快,导致producer无法向代理发送消息。
当我在produce方法和delete producer对象之间添加sleep 1000时,producer可以正确地发送消息。
所以,问题是如何立即发送消息。在销毁producer对象之前,如何确保这些消息已完全发送?
如何解决这个问题,实际上我不喜欢在源代码中添加sleep()。
win10+vs2015+kafka_2.10-0.9.0.1+zookeeper-3.4.6+librdkafka请检查以下代码

// kafka_test_win32_nomfc.cpp 
//

# include "stdafx.h"

# include <iostream>

# include "librdkafka/rdkafkacpp.h"

int static producer_1()
{
    std::string brokers = "127.0.0.1";
    std::string errstr;
    std::string topic_str = "linli";
    std::string mode;
    std::string debug;
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;
    // MyHashPartitionerCb hash_partitioner;
    int use_ccb = 0;

    /*
    * Create configuration objects
    */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("metadata.broker.list", brokers, errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    /*
    * Create topic handle.
    */
    RdKafka::Topic *topic = NULL;
    if (!topic_str.empty()) {
        topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
        if (!topic) {
            std::cerr << "Failed to create topic: " << errstr << std::endl;
            exit(1);
        }
    }

    RdKafka::ErrorCode resp = producer->produce(topic, partition,
        RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
        const_cast<char *>("hello worlf"), 11,
        NULL, NULL);

    delete topic;
    delete producer;
    return 0;
}

int static producer_2()
{
    std::string brokers = "127.0.0.1";
    std::string errstr;
    std::string topic_str = "linli";
    std::string mode;
    std::string debug;
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;
    // MyHashPartitionerCb hash_partitioner;
    int use_ccb = 0;

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("metadata.broker.list", brokers, errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    RdKafka::ErrorCode resp = producer->produce(topic_str, partition,
        RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
        (void *)"hi", 2,
        NULL, 0, 0, NULL);

    std::string errs(RdKafka::err2str(resp));
    std::cout << errs << std::endl;
    //producer->poll(0);

    delete producer;

    return 0;
}

int main()
{

    producer_2();

    return 0;
}
yiytaume

yiytaume1#

librdkafka product()api(c和c++)是异步的,您的消息最初只在内部生产者队列中排队,稍后才排队(请参阅 queue.buffering.max.ms configuration属性(默认值为1秒)可以与其他消息组合到一个消息批(messageset)中,并从后台线程发送到代理。
您的程序调用 produce() 然后快速退出,早在后台生产者线程有机会向代理发送消息之前,更不用说从代理接收确认了。
要确保所有未完成的邮件都已发送,请致电 flush() ,然后终止应用程序。
如果你的申请是长期的,你应该打电话 poll() 定期为您注册的任何交货报告回拨提供服务。

相关问题