我正在编写一些测试代码来试验Kafka Consumer subscription()
函数。
我不明白这个功能的目的是什么。
这里有一个指向JavaDoc参考的链接,它没有提供太多有用的信息。
虽然我使用Rust-rdkafka在Rust中编写代码,但Java的文档似乎是相同的,当然还有底层的C++ rdkafka实现。
我调用的函数是这样的:
let subscription = consumer.subscription().unwrap();
它返回以下数据:
element.topic: "kafka_test"
element.partition: -1
element.offset: Invalid
element.metadata: ""
element.topic: "kafka_test_1.json"
element.partition: -1
element.offset: Invalid
element.metadata: ""
我不明白的是为什么它不返回偏移量,元数据或分区。我试着调用poll()
几次来获取记录,结果还是一样。我也尝试过提交当前偏移量,但输出仍然相同。
Poll返回数据--所以主题确实被创建了,并且它们确实有数据。
这里有一个小的测试代码(不完全完整)。
fn main() {
// This is actually different each time the program is run
let group_id = "test_group_id" + run_counter;
let mut client_config = ClientConfig::new();
let client_config = client_config
.set("bootstrap.servers", ...)
.set("security.protocol", ...)
.set("sasl.mechanisms", ...)
.set("sasl.username", ...)
.set("sasl.password", ...)
.set("group.id", group_id)
.set("enable.auto.commit", "false")
.set("auto.offset.reset", "earliest")
.set_log_level(rdkafka::config::RDKafkaLogLevel::Debug);
let consumer: BaseConsumer = client_config
.create()
.expect("invalid producer config");
consumer.subscribe(&topic_names).expect("consumer subscribe failed");
test_function(&consumer, std::time::Duration::from_secs(30));
}
pub fn test_function(
consumer: &BaseConsumer,
poll_duration: Option<Duration>
)
{
let something = consumer.poll(poll_duration);
log::info!("something={:?}", something); // gets a valid record... (not "None")
// Test subscription()
let subscription = consumer.subscription().unwrap();
// Print elements of topic map
// As above, offset is invalid, partition is invalid ... why?
let elements = subscription.elements();
for element in elements.into_iter() {
let topic = element.topic();
let partition = element.partition();
let offset = element.offset();
let metadata = element.metadata();
log::info!("element.topic: {:?}", topic);
log::info!("element.partition: {:?}", partition);
log::info!("element.offset: {:?}", offset);
log::info!("element.metadata: {:?}", metadata);
}
}
1条答案
按热度按时间7xllpg7q1#
它并不意味着获取主题元数据,而只是获取分配给客户机的主题的名称(您可以看到Javadoc返回字符串集合,而不是主题对象)。
如果在subscribe函数中定义了一个字符串或静态列表,那么它就不是很有用,但是如果使用正则表达式模式,它可以帮助验证您是否阅读了正确的数据。
您需要使用
something
变量来获取这些值。有一些示例代码可以打印这些值。
您也可以使用
StreamConsumer
而不是较低级别的基本值