当我向消费者查询已分配的主题分区列表时,结果中所有分区的偏移量都是 -1001。而如果我打印收到的消息的偏移量,它们的值却是正确的。
这是我用来消费消息的代码:
/**
 * Print every element of a topic-partition list on one line.
 *
 * Each element is written as " <topic> [<partition>] offset <offset>"; every
 * element after the first is preceded by a comma. A newline is written after
 * the last element (or on its own when the list is empty).
 *
 * @param fp          Stream the line is written to.
 * @param partitions  List to print; it is only read, never modified.
 */
static void print_partition_list(FILE *fp,
                                 const rd_kafka_topic_partition_list_t *partitions) {
    int i;

    for (i = 0; i < partitions->cnt; i++) {
        /* librdkafka declares the offset field as int64_t, which is not
         * `long long` on every platform; cast so the argument matches %lld. */
        fprintf(fp, "%s %s [%d] offset %lld",
                i > 0 ? "," : "",
                partitions->elems[i].topic,
                partitions->elems[i].partition,
                (long long)partitions->elems[i].offset);
    }
    fprintf(fp, "\n");
}
/**
 * Consumer-group rebalance callback.
 *
 * Logs the rebalance event to stderr and updates the consumer's assignment:
 * - on RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS the given partitions are printed
 *   and assigned;
 * - on RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS the given partitions are printed
 *   and the assignment is cleared;
 * - on any other code the error string is printed and the assignment is
 *   cleared.
 *
 * @param rk          Consumer handle the assignment is applied to.
 * @param err         Rebalance event / error code.
 * @param partitions  Partitions being assigned or revoked.
 * @param opaque      Unused.
 */
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) {
    fprintf(stderr, "%% Consumer group rebalanced: ");

    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
        fprintf(stderr, "assigned:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, partitions);
        return;
    }

    if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
        fprintf(stderr, "revoked:\n");
        print_partition_list(stderr, partitions);
        /* Passing NULL clears the current assignment. */
        rd_kafka_assign(rk, NULL);
        return;
    }

    /* Anything else is reported as a failed rebalance. */
    fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
    rd_kafka_assign(rk, NULL);
}
/**
 * Consume messages from one topic as a member of a consumer group.
 *
 * Configures a consumer with manual commits, subscribes to the topic and then
 * polls in a loop. For every message received it prints the current
 * assignment, the message's topic/partition/offset, key and value, and
 * commits the message. The loop ends after 10 iterations, where an empty poll
 * and a successfully processed message each count as one iteration.
 *
 * @return 0 on normal completion, 1 if configuration, consumer creation,
 *         subscription or the assignment query fails.
 */
int main()
{
    rd_kafka_t *rk;
    rd_kafka_conf_t *conf;
    rd_kafka_resp_err_t err;
    char errstr[512];
    const char *brokers = "localhost:9092";
    const char *groupid = "OffsetTest";
    const char *topics[] = { "OffsetTesting" };
    rd_kafka_topic_partition_list_t *subscription;
    /* Configuration properties as { name, value } pairs, applied in order. */
    const char *settings[][2] = {
        { "bootstrap.servers", brokers },
        { "group.id", groupid },
        { "enable.auto.commit", "false" },
        { "auto.offset.reset", "earliest" },
        { "offset.store.method", "broker" },
    };
    size_t s;

    conf = rd_kafka_conf_new();
    for (s = 0; s < sizeof(settings) / sizeof(settings[0]); s++) {
        if (rd_kafka_conf_set(conf, settings[s][0], settings[s][1],
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
            fprintf(stderr, "%s\n", errstr);
            rd_kafka_conf_destroy(conf);
            return 1;
        }
    }
    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
        return 1;
    }
    /* The configuration object must not be used after a successful
     * rd_kafka_new(); drop the pointer. */
    conf = NULL;

    rd_kafka_poll_set_consumer(rk);

    subscription = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(subscription, topics[0], RD_KAFKA_PARTITION_UA);
    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,
                "%% Failed to subscribe to %d topics: %s\n",
                subscription->cnt, rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }
    fprintf(stderr,
            "%% Subscribed to %d topic(s), "
            "waiting for rebalance and messages...\n",
            subscription->cnt);
    /* The subscription list is not needed past this point and must not be
     * touched again after it is destroyed. */
    rd_kafka_topic_partition_list_destroy(subscription);

    int runningCounter = 0;
    while (runningCounter != 10) {
        rd_kafka_message_t *rkm;
        rd_kafka_topic_partition_list_t *list;

        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm) {
            /* Nothing received within the poll timeout: back off, and count
             * the empty poll as one iteration. */
            Sleep(2000);
            runningCounter++;
            continue;
        }
        if (rkm->err) {
            /* Consumer errors are reported but do not count as an iteration. */
            fprintf(stderr,
                    "%% Consumer error: %s\n",
                    rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }

        /* NOTE: the list returned by rd_kafka_assignment() only names the
         * assigned partitions; its offset fields are left unset (-1001).
         * rd_kafka_position() is the call that reports current offsets. */
        err = rd_kafka_assignment(rk, &list);
        if (err) {
            fprintf(stderr,
                    "%% Failed to get assignment: %s\n",
                    rd_kafka_err2str(err));
            /* Release everything still owned here before bailing out. */
            rd_kafka_message_destroy(rkm);
            rd_kafka_consumer_close(rk);
            rd_kafka_destroy(rk);
            return 1;
        }
        print_partition_list(stderr, list);
        rd_kafka_topic_partition_list_destroy(list);

        /* Cast the offset so the argument matches %lld. */
        printf("Message on %s [%d] at offset %lld:\n",
               rd_kafka_topic_name(rkm->rkt), rkm->partition,
               (long long)rkm->offset);

        /* Key and payload are printed with an explicit length; they are not
         * assumed to be NUL-terminated. */
        if (rkm->key) {
            printf(" Key: %.*s\n",
                   (int)rkm->key_len, (const char *)rkm->key);
        }
        if (rkm->payload) {
            printf(" Value: %.*s\n",
                   (int)rkm->len, (const char *)rkm->payload);
        } else if (rkm->key) {
            /* No payload: only the length is reported, and only for keyed
             * messages. */
            printf(" Value: (%d bytes)\n", (int)rkm->len);
        }

        /* Third argument 0: do not commit asynchronously. */
        rd_kafka_commit_message(rk, rkm, 0);
        rd_kafka_message_destroy(rkm);
        runningCounter++;
    }

    fprintf(stderr, "%% Closing consumer\n");
    rd_kafka_consumer_close(rk);
    rd_kafka_destroy(rk);
    return 0;
}
我知道这里有一个类似问题的答案(librdkafka: committed_offset 总是 -1001),但它对我没有帮助。我已经在 rebalance_cb 中分配了主题分区列表。
更新:
这是消费 2 条示例消息时的输出:
> %4|1580198390.566|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property offset.store.method is deprecated: Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).
> % Subscribed to 1 topic(s), waiting for rebalance and messages...
> % Consumer group rebalanced: assigned:
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001,
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001,
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
>
> Message on NewTestingTopic [0] at offset 25:
> Key: 0
> Value: ExampleMessage 0
>
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001,
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
>
> Message on NewTestingTopic [3] at offset 41:
> Key: 1
> Value: ExampleMessage 1
1条答案
按热度按时间c6ubokkw1#
我相信这可能是故意的。
rd_kafka_assignment() 方法返回的是通过 rd_kafka_assign() 设置的分配。当消费者在组内被分配到分区时,分配只是一个分区列表,不包含偏移量。同样,在 Java 客户端库中,assignment() 返回的是 Set<TopicPartition>,其中也没有偏移量。
在 librdkafka 中,rd_kafka_assignment() 给出的是一个 rd_kafka_topic_partition_list_t,类似于 Set<TopicPartition>。主要区别在于它复用了 rd_kafka_topic_partition_t 类型,而该类型带有几个额外的字段,例如 offset。
rd_kafka_topic_partition_t 类型在很多地方都会用到,但并非它的所有字段在每种上下文中都有意义。分配(assignment)上下文就是这种情况,因此有些字段被设置为“空”值,对偏移量而言就是 -1001(即 RD_KAFKA_OFFSET_INVALID)。
如果要获取当前分配的各分区的偏移量,需要使用 rd_kafka_position()。同样,在 Java 中可以使用 position()。