我们用Kafka作为我们喷气机工作的来源。我们希望将kafka中接收到的消息分发给在不同机器上运行的每个jet示例。
我们能够使用groupconfig(设置名称和密码)和joinconfig(添加ip地址)成功地加入来自不同机器的成员。
当我们将一条消息发送到kafka源主题时,要么所有的机器都读取同一条消息,要么同一条消息在一台机器上被处理两次。
例如,我们有一个kafka主题,它由4个分区和2台不同的机器创建,运行的正好是一个jet集群。当我们运行应用程序并示例化jet示例时,我们看到2个jet成员被连接起来,分区在2个jet成员之间被拆分。
当发送“message\ one”这样的消息时,会发生以下两种情况之一:
消息由两个jet示例读取
消息只被一个jet示例读取两次
在我们的例子中,两者都不需要,因为我们不希望消息被复制。
如何防止消息处理两次?
下面是代码段:
JetConfig jetConfig = new JetConfig();
jetConfig.getInstanceConfig().setCooperativeThreadCount(1);
jetConfig.getInstanceConfig().setFlowControlPeriodMs(200);
jetConfig.getHazelcastConfig().getGroupConfig().setName("TEST_NAME");
jetConfig.getHazelcastConfig().getGroupConfig().setPassword("TEST_PASSWORD");
jetConfig.getHazelcastConfig().getNetworkConfig().setReuseAddress(true);
final JoinConfig join = jetConfig.getHazelcastConfig().getNetworkConfig().getJoin();
join.getMulticastConfig().setEnabled(false);
join.getTcpIpConfig().setEnabled(true).addMember("test.ip.address.one");
join.getTcpIpConfig().setEnabled(true).addMember("test.ip.address.two");
jetConfig.getHazelcastConfig().getNetworkConfig().getInterfaces()
.setEnabled(true)
.addInterface("test.ip.address.one")
.addInterface("test.ip.address.two");
final JetInstance jet =Jet.newJetInstance(jetConfig);
final Properties consumeProps = new Properties();
final Properties produceProps=new Properties();
consumeProps.setProperty("bootstrap.servers", kafkaBootstrapServers);
consumeProps.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
consumeProps.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
consumeProps.setProperty("auto.offset.reset", "latest");
consumeProps.setProperty("enable.auto.commit", "true");
consumeProps.setProperty("group.id", kafkaLoadBalancerGroupName);
consumeProps.setProperty("partition.assignment.strategy", RoundRobinAssignor.class.getCanonicalName());
produceProps.setProperty("bootstrap.servers", kafkaBootstrapServers);
produceProps.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
produceProps.setProperty("value.serializer", StringSerializer.class.getCanonicalName());
final Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.kafka(consumeProps, kafkaSourceTopic))
.filter(Objects::nonNull)
.map(TestClass::parseMessage)
.filter(Objects::nonNull)
.drainTo(KafkaSinks.kafka(produceProps, kafkaEnrichedTopic));
jet.newJob(p).join();
暂无答案!
目前还没有任何答案,快来回答吧!