Kafka消费者“订阅”功能的目的是什么?

hi3rlvi2  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(134)

我正在编写一些测试代码来试验Kafka Consumer subscription()函数。
我不明白这个功能的目的是什么。
这里有一个指向JavaDoc参考的链接,它没有提供太多有用的信息。
虽然我使用Rust-rdkafka在Rust中编写代码,但Java的文档似乎是相同的,当然还有底层的C++ rdkafka实现。
我调用的函数是这样的:

let subscription = consumer.subscription().unwrap();

它返回以下数据:

element.topic: "kafka_test"
element.partition: -1
element.offset: Invalid
element.metadata: ""
element.topic: "kafka_test_1.json"
element.partition: -1
element.offset: Invalid
element.metadata: ""

我不明白的是为什么它不返回偏移量,元数据或分区。我试着调用poll()几次来获取记录,结果还是一样。我也尝试过提交当前偏移量,但输出仍然相同。
Poll返回数据--所以主题确实被创建了,并且它们确实有数据。
这里有一个小的测试代码(不完全完整)。

fn main() {

    // This is actually different each time the program is run
    let group_id = "test_group_id" + run_counter;

    let mut client_config = ClientConfig::new();
    let client_config = client_config
        .set("bootstrap.servers", ...)
        .set("security.protocol", ...)
        .set("sasl.mechanisms", ...)
        .set("sasl.username", ...)
        .set("sasl.password", ...)
        .set("group.id", group_id)
        .set("enable.auto.commit", "false")
        .set("auto.offset.reset", "earliest")
        .set_log_level(rdkafka::config::RDKafkaLogLevel::Debug);

    let consumer: BaseConsumer = client_config
        .create()
        .expect("invalid producer config");
    
    consumer.subscribe(&topic_names).expect("consumer subscribe failed");

    test_function(&consumer, std::time::Duration::from_secs(30));
}

pub fn test_function(
    consumer: &BaseConsumer,
    poll_duration: Option<Duration>
)
{
    
    let something = consumer.poll(poll_duration);
    log::info!("something={:?}", something); // gets a valid record... (not "None")

    // Test subscription()
    let subscription = consumer.subscription().unwrap();

    // Print elements of topic map
    // As above, offset is invalid, partition is invalid ... why?
    let elements = subscription.elements();
    for element in elements.into_iter() {

        let topic = element.topic();
        let partition = element.partition();
        let offset = element.offset();
        let metadata = element.metadata();

        log::info!("element.topic: {:?}", topic);
        log::info!("element.partition: {:?}", partition);
        log::info!("element.offset: {:?}", offset);
        log::info!("element.metadata: {:?}", metadata);
    }
}
7xllpg7q

7xllpg7q1#

它并不意味着获取主题元数据,而只是获取分配给客户机的主题的名称(您可以看到Javadoc返回字符串集合,而不是主题对象)。
如果在subscribe函数中定义了一个字符串或静态列表,那么它就不是很有用,但是如果使用正则表达式模式,它可以帮助验证您是否阅读了正确的数据。
您需要使用something变量来获取这些值。
有一些示例代码可以打印这些值。
您也可以使用StreamConsumer而不是较低级别的基本值

相关问题