Can I consume data from two different Kafka server clusters in the same Java application?

cigdeys3 posted on 2021-06-07 in Kafka

I'm new to Kafka. I usually write a small Java demo application that sets up a Kafka consumer and fetches data from a 3-broker Kafka cluster, and it works well. I set the servers like props.put("bootstrap.servers", "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092") and subscribe to topics like consumer.subscribe(Arrays.asList("test_topic_1","test_topic_2","test_topic_3")). Now I need to consume data from two different clusters.
So the Kafka servers would be "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092" as one cluster and "192.168.22.4:9092,192.168.22.5:9092,192.168.22.6:9092" as the other cluster.
The topics would be "test_topic_1", "test_topic_2", "test_topic_3" (from cluster 1) and "test_topic_4", "test_topic_5", "test_topic_6" (from cluster 2).
Can I do this in the same Java application? I tried, but I could only consume data from one cluster. How can I do it? Thanks in advance.
Thanks @yaswanth, I used two instances. Please see my code below.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {
        System.out.println("begin consumer");
        consume();
        consume2();
        System.out.println("finish consumer");
    }

    public static void consume() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.116.13:9092");
        props.put("group.id", "group-test1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test_topic_1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic() + "---------------------------" + record.value());
            }
        }
    }

    public static void consume2() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.116.37:9092");
        props.put("group.id", "group-test2");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props);
        consumer2.subscribe(Arrays.asList("test_topic_2"));
        while (true) {
            ConsumerRecords<String, String> records = consumer2.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic() + "---------------------------" + record.value());
            }
        }
    }
}
Thanks for your help @yaswanth, it works.


lndjwyie 1#

This won't work because consume2() never gets called: consume() loops forever and never returns, so the program never reaches consume2(). consume() and consume2() themselves look correct, but you need to run consume() and consume2() on two different threads. You can do that: from the main thread, start two separate threads and wrap consume() and consume2() each in a Runnable or a Callable.
To give you an idea:

public class Consumer2 implements Runnable {

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.116.37:9092");
        props.put("group.id", "group-test2");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer2 = new KafkaConsumer<String, String>(props);
        consumer2.subscribe(Arrays.asList("test_topic_2"));
        while (true) {
            ConsumerRecords<String, String> records = consumer2.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic()+"---------------------------"+record.value());
            }
        }
    }
}

public class Consumer1 implements Runnable {

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.116.13:9092");
        props.put("group.id", "group-test1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test_topic_1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic() + "---------------------------" + record.value());
            }
        }
    }
}

public class AppStarter {

    public void init() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<?> future1 = executorService.submit(new Consumer1());
        Future<?> future2 = executorService.submit(new Consumer2());

        try {
            // Block the main thread; the consumer loops run until the application is stopped.
            future1.get();
            future2.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        AppStarter appStarter = new AppStarter();
        appStarter.init();
    }
}

Note: the code above is only meant to give you an idea of how to implement it. You can parameterize the properties and pass them as constructor args, so that a single consumer class implementation is enough.
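For example, here is a minimal sketch of that parameterized version. The class name ClusterConsumer, the group ids, and the topic lists are just placeholders, and it assumes a Kafka client recent enough to support poll(Duration) and Java 9+ for List.of:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical parameterized consumer: one class, configured once per cluster.
public class ClusterConsumer implements Runnable {

    private final String bootstrapServers;
    private final String groupId;
    private final List<String> topics;

    public ClusterConsumer(String bootstrapServers, String groupId, List<String> topics) {
        this.bootstrapServers = bootstrapServers;
        this.groupId = groupId;
        this.topics = topics;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Each thread owns its own KafkaConsumer instance; KafkaConsumer is not thread-safe.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic() + "---" + record.value());
                }
            }
        }
    }

    public static void main(String[] args) {
        // One thread per cluster; bootstrap servers, group ids and topics are placeholders.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(new ClusterConsumer(
                "192.168.22.1:9092,192.168.22.2:9092,192.168.22.3:9092",
                "group-cluster-1",
                List.of("test_topic_1", "test_topic_2", "test_topic_3")));
        pool.submit(new ClusterConsumer(
                "192.168.22.4:9092,192.168.22.5:9092,192.168.22.6:9092",
                "group-cluster-2",
                List.of("test_topic_4", "test_topic_5", "test_topic_6")));
    }
}

With this, consuming from another cluster is just one more submit() call with its own bootstrap servers, group id and topic list.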
