我使用Apache fink和FlinkKafkaConsumer,我有2个消费者读了1个主题(1partition)。在同一时间,两个消费者读了同一个事件。我希望消费者1接收事件1,消费者2不能接收事件1。我使用group.id,但它不工作。我参考Flink的文档,group.id不支持这个。
例如:topic(e1,e2,e3,e4,e5)
结果:
消费者1:e1、e2、e3、e4、e5
消费者2:e1、e2、e3、e4、e5
期望:
消费者1:e1,e3
消费者2:e2、e4、e5
这是我的代码
String kafkaBootstrapServers = config.get(SOURCE_KAFKA_BOOTSTRAP_SERVERS);
String kafkaGroupId = config.get(SOURCE_KAFKA_GROUPID);
String topic = config.get(SOURCE_KAFKA_TOPIC);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
properties.setProperty("group.id", kafkaGroupId);
properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
properties.setProperty("ssl.truststore.location", config.get(SOURCE_KAFKA_TRUSTSTORE_LOCATION));
properties.setProperty("ssl.truststore.password", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS));
properties.put("sasl.jaas.config",
String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkascram\" password=\"%s\";", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS)));
FlinkKafkaConsumer<EventBase> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleSyslogEventSchema(), properties);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;
字符串
1条答案
按热度按时间5vf7fwbs1#
FlinkKafkaConsumer
在Flink 1.14中被弃用,并被新的KafkaSource
API取代,您可能希望使用它来代替旧的,弃用的API。同一消费者的类似实现看起来像这样:字符串
从简化的Angular 来看,您现有的
group.id
看起来对旧的FlinkKafkaConsumer
API是正确的,但是需要注意的是,this * 的值必须 * 是相同的(最好是作业的某个常量),以确保消费者组偏移量如您所期望的那样共享。