Flink Kafka consumer throughput decreases over time

cclgggtu · published 2021-06-24 in Flink

I have a Flink Kafka consumer with the properties shown below. When the job starts, Flink consumes records at roughly 400 messages/sec, but once numBuffersOut exceeds 1,000,000 the rate drops to about 200 messages/sec. Parallelism is set to 1, the consumer is Avro-based, and checkpointing is disabled. Has anyone else run into the same problem?

// Kafka consumer setup
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", bootstrapAddress);
kafkaProps.setProperty("zookeeper.connect", zookeeperConnect);
kafkaProps.setProperty("group.id", groupId);
// checkpointing is disabled, so offsets are committed by the Kafka client's
// auto-commit every 5 seconds
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaProps.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

// Avro deserialization backed by the Confluent schema registry
ConfluentAvroDeserializationSchema deserializer = new ConfluentAvroDeserializationSchema(schemaRegURL);
FlinkKafkaConsumer010<GenericRecord> flinkKafkaConsumer =
        new FlinkKafkaConsumer010<>(topic, deserializer, kafkaProps);
flinkKafkaConsumer.setStartFromLatest();

// connector-side rate limiter
FlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
rateLimiter.setRate(12000000L);
flinkKafkaConsumer.setRateLimiter(rateLimiter);
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
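
For context, a minimal sketch of how this source might be wired into the job, assuming a standard StreamExecutionEnvironment; the variable name env and the print sink are illustrative, not taken from the original post:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);                 // single parallel instance, as described above
// checkpointing is simply never enabled, so offsets rely on Kafka's auto-commit

DataStream<GenericRecord> records = env.addSource(flinkKafkaConsumer);
records.print();                       // placeholder sink for illustration only
env.execute("avro-consumer-job");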
// from ConfluentAvroDeserializationSchema
@Override
public GenericRecord deserialize(byte[] message) {
    // lazily create the Confluent Avro decoder on first use inside the task
    if (kafkaAvroDecoder == null) {
        System.out.println("Kafka serializer");
        SchemaRegistryClient schemaRegistry =
                new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
        this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
    }
    return (GenericRecord) this.kafkaAvroDecoder.fromBytes(message);
}
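
For completeness, a hedged sketch of the remaining methods such a schema typically needs, assuming the class implements Flink's DeserializationSchema<GenericRecord>; only deserialize is shown in the post, so the bodies below are illustrative:

@Override
public boolean isEndOfStream(GenericRecord nextElement) {
    return false;                      // keep consuming the topic indefinitely
}

@Override
public TypeInformation<GenericRecord> getProducedType() {
    // GenericRecord has no Flink-native type info, so this falls back to generic (Kryo) serialization
    return TypeExtractor.getForClass(GenericRecord.class);
}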

No answers yet.
