我使用cppkafka库(librdkafka的 Package 器)为kafka消息编写了c++消费者。我在运行程序时遇到问题。有时在分配主题之前存在分段错误(核心转储)问题,但有时运行时没有任何错误。下面是我的main()代码。
bool running = true;
int main() {
string topic = "topic";
string stop = "STOP";
int m_count = 0;
signal(SIGINT, [](int) { running = false; });
Configuration config = {
{ "metadata.broker.list", "kafka-1:19092"},
{"group.id", topic},
{"fetch.wait.max.ms", 10},
{"enable.auto.commit", false},
{"auto.offset.reset", "latest"}
};
Consumer consumer(config);
consumer.set_assignment_callback([](const TopicPartitionList& partitions) {
cout << "Got assigned: " << partitions << endl;
});
consumer.set_revocation_callback([](const TopicPartitionList& partitions) {
cout << "Got revoked: " << partitions << endl;
});
consumer.subscribe({ topic });
cout << "Consuming messages from topic " << topic << endl;
auto ms = std::chrono::microseconds(10);
while (running) {
Message msg = consumer.poll();
if (msg) {
if (msg.get_error()) {
if (!msg.is_eof()) {
cout << "[+] Received error notification: " << msg.get_error() << endl;
}
}
else {
m_count ++;
cout << msg.get_payload()<<" OFSET: "<< msg.get_offset() <<" TIME: "<<getMs() - msg.get_timestamp().get().get_timestamp().count()<< endl;
consumer.commit(msg);
}
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!