我是Kafka的新人。通常我会编写一个小的java演示应用程序,设置一个kafka消费者并从一个3Kafka服务器集群获取数据。而且效果很好。我将设置如下服务器 props.put("bootstrap.servers", "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092")
,并将订阅以下主题 consumer.subscribe(Arrays.asList("test_topic_1","test_topic_2","test_topic_3"))
. 现在我需要使用来自两个不同集群的数据。
因此kafka服务器将“192.168.22.1:9092192.168.22.2:9092192.168.22.3:9092”作为一个集群,“192.168.22.4:9092192.168.22.5:9092192.168.22.6:9092”作为另一个集群。
主题将是“test\u topic \u 1”、“test\u topic \u 2”、“test\u topic \u 3”(来自群集1)、“test\u topic \u 4”、“test\u topic \u 5”、“test\u topic \u 6”(来自群集2)。
我可以在同一个java应用程序中这样做吗?我试过了,但只能使用来自1个集群的数据。我该怎么做?太好了,谢谢。
谢谢@yaswanth,我用了两个示例。请看下面我的代码。
public class Consumer {
public static void main(String[] args) {
System.out.println("begin consumer");
consume();
consume2();
System.out.println("finish consumer");
}
public static void consume() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.116.13:9092");
props.put("group.id", "group-test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test_topic_1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic()+"---------------------------"+record.value());
}
}
}
public static void consume2() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.116.37:9092");
props.put("group.id", "group-test2");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props);
consumer2.subscribe(Arrays.asList("test_topic_2"));
while (true) {
ConsumerRecords<String, String> records = consumer2.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic()+"---------------------------"+record.value());
}
}
}
}
谢谢你的帮助,@yaswanth,很管用。
1条答案
按热度按时间lndjwyie1#
这行不通,因为
consume2()
从来没有人打过电话。consume()
以及consume2()
似乎是正确的。你得跑了consume()
以及consume2()
在两个不同的线程中。你能做到的。从主线程开始两个不同的线程并进行 Packageconsume()
以及consume2()
在一个Runnable
或者Callable
.给你个主意,
注意:上面的代码只是给你一个如何实现它的想法。您可以参数化属性并将其作为构造函数arg传递,并且只有一个consumer类的实现。