我正在处理librdkafka的consumer.c示例文件,并试图找出如何将rkm有效负载(在第244行打印出来)转换为json,以便从json中获取参数值。现在我正在使用jansson,但是有一些问题,如果需要的话我可以扩展。librdkafka或标准c库中是否有我不知道的特性?
relj7zay1#
只需将消息负载(二进制)和长度传递给jansson,如下所示:
/* For each consumed message .. */ rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000); if (!rkmessage) { /* No message available */ continue; } else if (rkmessage->err) { /* Handle consumer event/error (typically not fatal) */ handle_consumer_error(rkmessage->err, "%s", rd_kafka_message_errstr(rkmessage)); } else if (rkmessage->len > 0) { /* Parse JSON value */ json_error_t err; json_t *json = json_loadb(rkm->payload, rkm->len, JSON_ALLOW_NUL, &err); if (!json) { handle_consumer_error(RD_KAFKA_RESP_ERR__BAD_MSG, "Failed to parse JSON for message at %s [%"PRId32"] offset %"PRIu64: line %d: %s\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition), rkmessage->offset, err.line, err.text); } else { /* Process the message */ process_message(json); json_decref(json); } } rd_kafka_message_destroy(rkmessage);
有关更多信息,请参阅jansson文档:https://jansson.readthedocs.io/en/latest/tutorial.html
1条答案
按热度按时间relj7zay1#
只需将消息负载(二进制)和长度传递给jansson,如下所示:
有关更多信息,请参阅jansson文档:https://jansson.readthedocs.io/en/latest/tutorial.html