c++中使用rdkafkacpp.h的基本kafka使用者

qyuhtwio  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(658)

我对使用c++的rdkafkacpp.hkafka库相当陌生。
我参考了一些可用的在线资源,介绍了如何在windows上使用标准kafka environment setup.bat文件设置kafka consumer,这些文件提供于:https://kafka.apache.org/quickstart (我只是玩了一下,能够测试从生产者终端发送的消息是否出现在消费者终端上)
我阅读了Kafka的基本理论,并浏览了以下文档:https://docs.confluent.io/4.0.0/clients/librdkafka
基本阅读之后,我尝试使用内置库函数编写示例代码,如下所示。但是,我在试图“set the bootstrap server”属性为“conf”的代码行收到一个运行时异常。该异常是访问冲突异常。“在mykafka.exe中的0x00007ffb878ec3f9(msvcr120.dll)处引发异常:0xc000005:访问冲突读取位置0x0000008e5a900000。“
我怀疑kafka消费者使用库函数“实现”的顺序遗漏了一些步骤,或者必须重新排序。
我试图保持实现简单,只是一个生产者在我的本地机器(localhost:9092),只有这个消费者(mykafka.exe)。此外,一个主题“快速启动事件”已经在终端上启动。
非常感谢您的帮助!
p、 s:visualstudio2019用于此代码开发。


# include <iostream>

# include "..\include\librdkafka\rdkafkacpp.h"

# include <process.h> // to use exit()

using namespace std;
using namespace RdKafka;

int configAsKafkaConsumer()
{
    string host = "localhost:9092";
    string errstr;

    /* Set properties */
    cout << "Inside configAsKafkaConsumer()" << endl;

    // Create configuration objects
    RdKafka::Conf* conf = NULL;
    conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    if (conf != NULL)
    {
        cout << "conf != NULL" << endl;
    }

    // THIS IS WHERE I'M GETTING THE RUNTIME EXCEPTION!!
    if (conf->set("bootstrap.servers", "localhost:9092", errstr) != RdKafka::Conf::CONF_OK)
    {
        cerr << "Failed to set config of broker: " << errstr << endl;
        exit(1);
    }

    if (conf->set("client.id", host, errstr) != RdKafka::Conf::CONF_OK) 
    {
        cout << "client.id:" << endl;
        exit(1);
    }

    if (conf->set("group.id", "foo", errstr) != RdKafka::Conf::CONF_OK)
    {
        cout << "group.id:" << endl;
        exit(1);
    }

    // Create a consumer handle
    Consumer* ConsumerHandle = RdKafka::Consumer::create(conf, errstr);

    string errstr;
    string prodTopic = "quickstart-events"; 

    cout << "Creating topic handle" << endl;

    RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    // Create topic handle.
    RdKafka::Topic* topic = RdKafka::Topic::create(ConsumerHandle, prodTopic,
        tconf, errstr);

    if (!topic) 
    {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        exit(1);
    }

    cout << "Starting the consumer handle" << endl;
    ConsumerHandle->start(topic, 1, 0);

    cout << "Consuming the message" << endl;
    Message* msg = ConsumerHandle->consume(topic, 1, 10000);

    cout << "Message is: " << msg->payload() << endl;

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题