如何将librdkafka负载转换为json来获取参数值?

qij5mzcb  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(424)

我正在处理librdkafka的consumer.c示例文件,并试图找出如何将rkm有效负载(在第244行打印出来)转换为json,以便从json中获取参数值。
现在我正在使用jansson,但是有一些问题,如果需要的话我可以扩展。
librdkafka或标准c库中是否有我不知道的特性?

relj7zay

relj7zay1#

只需将消息负载(二进制)和长度传递给jansson,如下所示:

/* For each consumed message .. */
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);

if (!rkmessage) {
   /* No message available */
   continue;

} else if (rkmessage->err) {
   /* Handle consumer event/error (typically not fatal) */
   handle_consumer_error(rkmessage->err, "%s", rd_kafka_message_errstr(rkmessage));

} else if (rkmessage->len > 0) {
   /* Parse JSON value */
   json_error_t err;
   json_t *json = json_loadb(rkm->payload, rkm->len, JSON_ALLOW_NUL, &err);
   if (!json) {
      handle_consumer_error(RD_KAFKA_RESP_ERR__BAD_MSG,
                            "Failed to parse JSON for message at %s [%"PRId32"] offset %"PRIu64: line %d: %s\n",
                            rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition),
                            rkmessage->offset, err.line, err.text);
   } else {
      /* Process the message */
      process_message(json);

      json_decref(json);
   }
}

rd_kafka_message_destroy(rkmessage);

有关更多信息,请参阅jansson文档:https://jansson.readthedocs.io/en/latest/tutorial.html

相关问题