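I have a Flink consumer that behaves as follows: when the job starts, it consumes records at about 400 messages/second, but later, once numBuffersOut exceeds 1,000,000, the rate drops to about 200 messages/second. Parallelism is set to 1, the consumer is Avro-based, and checkpointing is disabled. Has anyone else run into the same problem?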
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", bootstrapAddress);
kafkaProps.setProperty("zookeeper.connect", zookeeperConnect);
kafkaProps.setProperty("group.id", groupId);
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
ConfluentAvroDeserializationSchema deserializer = new ConfluentAvroDeserializationSchema(schemaRegURL);
FlinkKafkaConsumer010<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer010<GenericRecord>(topic, deserializer, kafkaProps);
flinkKafkaConsumer.setStartFromLatest();
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(12000000L);
flinkKafkaConsumer.setRateLimiter(rateLimiter);
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
// ConfluentAvroDeserializationSchema
@Override
public GenericRecord deserialize(byte[] message) {
    // Create the decoder lazily on the first record; later records reuse it.
    if (kafkaAvroDecoder == null) {
        System.out.println("Kafka serizlier");
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
        this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
    }
    return (GenericRecord) this.kafkaAvroDecoder.fromBytes(message);
}
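One quick check is whether the rate limiter configured with setRate(12000000L) could explain the drop by itself. The sketch below assumes the rate is a global bytes-per-second budget divided evenly across parallel subtasks, which is my understanding of GuavaFlinkConnectorRateLimiter; the class and method names here are hypothetical and are not part of Flink. It computes the average message size at which the limiter would start throttling at a given message rate.

```java
public class RateLimitCheck {

    /**
     * Average message size (in bytes) at which a byte-based rate limit
     * would begin to cap throughput at the given message rate.
     * Assumption: the global rate is split evenly across parallel subtasks.
     */
    public static double breakEvenMessageBytes(long globalRateBytesPerSec,
                                               int parallelism,
                                               double messagesPerSec) {
        if (parallelism <= 0 || messagesPerSec <= 0) {
            throw new IllegalArgumentException("parallelism and messagesPerSec must be positive");
        }
        // Byte budget available to a single subtask per second.
        double perSubtaskBytesPerSec = (double) globalRateBytesPerSec / parallelism;
        // Spread that budget over the observed number of messages per second.
        return perSubtaskBytesPerSec / messagesPerSec;
    }

    public static void main(String[] args) {
        long rate = 12_000_000L; // value passed to setRate(...) in the question
        System.out.println("Break-even size at 400 msg/s: "
                + breakEvenMessageBytes(rate, 1, 400.0) + " bytes");
        System.out.println("Break-even size at 200 msg/s: "
                + breakEvenMessageBytes(rate, 1, 200.0) + " bytes");
    }
}
```

If the Avro records are far smaller than these break-even sizes, the limiter is probably not the bottleneck under this assumption, and the slowdown more likely comes from somewhere downstream of the source.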