Flink Kafka消费者group.id不工作

amrnrhlw  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(251)

我使用Apache fink和FlinkKafkaConsumer,我有2个消费者读了1个主题(1partition)。在同一时间,两个消费者读了同一个事件。我希望消费者1接收事件1,消费者2不能接收事件1。我使用group.id,但它不工作。我参考Flink的文档,group.id不支持这个。
例如:topic(e1,e2,e3,e4,e5)
结果:
消费者1:e1、e2、e3、e4、e5
消费者2:e1、e2、e3、e4、e5
期望:
消费者1:e1,e3
消费者2:e2、e4、e5
这是我的代码

String kafkaBootstrapServers = config.get(SOURCE_KAFKA_BOOTSTRAP_SERVERS);
String kafkaGroupId = config.get(SOURCE_KAFKA_GROUPID);
String topic = config.get(SOURCE_KAFKA_TOPIC);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
properties.setProperty("group.id", kafkaGroupId);

properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
properties.setProperty("ssl.truststore.location", config.get(SOURCE_KAFKA_TRUSTSTORE_LOCATION));
properties.setProperty("ssl.truststore.password", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS));
properties.put("sasl.jaas.config",
       String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkascram\" password=\"%s\";", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS)));

FlinkKafkaConsumer<EventBase> kafkaConsumer = new FlinkKafkaConsumer<>(topic, new SimpleSyslogEventSchema(), properties);
kafkaConsumer.setStartFromGroupOffsets();
return kafkaConsumer;

字符串

5vf7fwbs

5vf7fwbs1#

FlinkKafkaConsumer在Flink 1.14中被弃用,并被新的KafkaSource API取代,您可能希望使用它来代替旧的,弃用的API。同一消费者的类似实现看起来像这样:

KafkaSource.builder<EventBase>()
    .setBootstrapServers(kafkaBootstrapServers)
    .setTopics(topic)
    .setGroupId(kafkaGroupId)
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(SimpleSyslogEventSchema())
    // Add your SASL Configuration here
    .setProperty("security.protocol", "SASL_SSL")
    .setProperty("sasl.mechanism", "SCRAM-SHA-512") 
    .setProperty("ssl.endpoint.identification.algorithm", "")
    .setProperty("ssl.truststore.location", config.get(SOURCE_KAFKA_TRUSTSTORE_LOCATION)). 
    .setProperty("ssl.truststore.password", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS))
    .setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkascram\" password=\"%s\";", config.get(SOURCE_KAFKA_TRUSTSTORE_PASS)));
    .build();

字符串
从简化的Angular 来看,您现有的group.id看起来对旧的FlinkKafkaConsumer API是正确的,但是需要注意的是,this * 的值必须 * 是相同的(最好是作业的某个常量),以确保消费者组偏移量如您所期望的那样共享。

相关问题