Flink不再默默地谈论Kafka的主题

jdzmm42g  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(237)

我是Flink的新手。我实现了一个应用程序,它使用来自kafka的消息,然后对它们进行处理。该应用程序在k8s独立模式下运行,flink版本为1.12.2,使用k8s ha。
下面是我用来创建消费者的代码

private static FlinkKafkaConsumer<byte[]> createKafkaConsumer(
        TopologyConfig config,
        String kafkaTopic,
        String kafkaGroup) {

        Properties consumerProperties = getKafkaDeserializerProperties(config, kafkaGroup);

        FlinkKafkaConsumer<byte[]> kafkaConsumer = new FlinkKafkaConsumer<>(
            kafkaTopic,
            new BytesKafkaDeserializer(),
            consumerProperties);
        kafkaConsumer.setStartFromLatest();
        return kafkaConsumer;
    }

    private static Properties getKafkaDeserializerProperties(TopologyConfig config, String groupName) {
        Properties consumerProperties = newCommonKafkaProperties(config);
        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
            config.common.kafkaConsumerGroup + groupName);
        consumerProperties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "50000000");
        consumerProperties.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "1048576");
        consumerProperties.setProperty(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        consumerProperties.setProperty(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        return consumerProperties;
    }

这是执行 ByteArrayDeserializer ```
public class BytesKafkaDeserializer extends AbstractDeserializationSchema<byte[]> {

@Override
public byte[] deserialize(byte[] bytes) throws IOException {
    return bytes;
}

}

当我启动它时,flink可以很好地获取消息。但是,过了一段时间,flink就不再默默地使用消息,没有任何异常或日志。它是非常随机的,不可能有目的地重新制作。
我想弄清楚为什么会发生这种情况,我们是否有一个修复或解决办法。
任何帮助都将不胜感激!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题