我是Flink的新手。我实现了一个应用程序,它使用来自kafka的消息,然后对它们进行处理。该应用程序在k8s独立模式下运行,flink版本为1.12.2,使用k8s ha。
下面是我用来创建消费者的代码
private static FlinkKafkaConsumer<byte[]> createKafkaConsumer(
TopologyConfig config,
String kafkaTopic,
String kafkaGroup) {
Properties consumerProperties = getKafkaDeserializerProperties(config, kafkaGroup);
FlinkKafkaConsumer<byte[]> kafkaConsumer = new FlinkKafkaConsumer<>(
kafkaTopic,
new BytesKafkaDeserializer(),
consumerProperties);
kafkaConsumer.setStartFromLatest();
return kafkaConsumer;
}
private static Properties getKafkaDeserializerProperties(TopologyConfig config, String groupName) {
Properties consumerProperties = newCommonKafkaProperties(config);
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
config.common.kafkaConsumerGroup + groupName);
consumerProperties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "50000000");
consumerProperties.setProperty(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "1048576");
consumerProperties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
consumerProperties.setProperty(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
return consumerProperties;
}
这是执行 ByteArrayDeserializer
```
public class BytesKafkaDeserializer extends AbstractDeserializationSchema<byte[]> {
@Override
public byte[] deserialize(byte[] bytes) throws IOException {
return bytes;
}
}
当我启动它时,flink可以很好地获取消息。但是,过了一段时间,flink就不再默默地使用消息,没有任何异常或日志。它是非常随机的,不可能有目的地重新制作。
我想弄清楚为什么会发生这种情况,我们是否有一个修复或解决办法。
任何帮助都将不胜感激!
暂无答案!
目前还没有任何答案,快来回答吧!