无法将消息从Kafka资源正确分发到黑兹卡斯特喷气机群

kqlmhetl  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(172)

我们用Kafka作为我们喷气机工作的来源。我们希望将kafka中接收到的消息分发给在不同机器上运行的每个jet示例。
我们能够使用groupconfig(设置名称和密码)和joinconfig(添加ip地址)成功地加入来自不同机器的成员。
当我们将一条消息发送到kafka源主题时,要么所有的机器都读取同一条消息,要么同一条消息在一台机器上被处理两次。
例如,我们有一个kafka主题,它由4个分区和2台不同的机器创建,运行的正好是一个jet集群。当我们运行应用程序并示例化jet示例时,我们看到2个jet成员被连接起来,分区在2个jet成员之间被拆分。
当发送“message\ one”这样的消息时,会发生以下两种情况之一:
消息由两个jet示例读取
消息只被一个jet示例读取两次
在我们的例子中,两者都不需要,因为我们不希望消息被复制。
如何防止消息处理两次?
下面是代码段:

JetConfig jetConfig = new JetConfig();
    jetConfig.getInstanceConfig().setCooperativeThreadCount(1);
    jetConfig.getInstanceConfig().setFlowControlPeriodMs(200);
    jetConfig.getHazelcastConfig().getGroupConfig().setName("TEST_NAME");
    jetConfig.getHazelcastConfig().getGroupConfig().setPassword("TEST_PASSWORD");
    jetConfig.getHazelcastConfig().getNetworkConfig().setReuseAddress(true);

    final JoinConfig join = jetConfig.getHazelcastConfig().getNetworkConfig().getJoin();
    join.getMulticastConfig().setEnabled(false);
    join.getTcpIpConfig().setEnabled(true).addMember("test.ip.address.one");
    join.getTcpIpConfig().setEnabled(true).addMember("test.ip.address.two");
    jetConfig.getHazelcastConfig().getNetworkConfig().getInterfaces()
            .setEnabled(true)
            .addInterface("test.ip.address.one")
            .addInterface("test.ip.address.two");

    final JetInstance jet =Jet.newJetInstance(jetConfig);

    final Properties consumeProps = new Properties();
    final Properties produceProps=new Properties();
    consumeProps.setProperty("bootstrap.servers", kafkaBootstrapServers);           
    consumeProps.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());          
    consumeProps.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
    consumeProps.setProperty("auto.offset.reset", "latest");
    consumeProps.setProperty("enable.auto.commit", "true");
    consumeProps.setProperty("group.id", kafkaLoadBalancerGroupName);
    consumeProps.setProperty("partition.assignment.strategy", RoundRobinAssignor.class.getCanonicalName());

    produceProps.setProperty("bootstrap.servers", kafkaBootstrapServers);   
    produceProps.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
    produceProps.setProperty("value.serializer", StringSerializer.class.getCanonicalName());

    final Pipeline p = Pipeline.create();
    p.drawFrom(KafkaSources.kafka(consumeProps, kafkaSourceTopic))
     .filter(Objects::nonNull)
     .map(TestClass::parseMessage)
     .filter(Objects::nonNull)
     .drainTo(KafkaSinks.kafka(produceProps, kafkaEnrichedTopic));
    jet.newJob(p).join();

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题