我正在使用Flink1.4.2读取Kafka的数据,并将其解析为 ObjectNode
使用 JSONDeserializationSchema
. 如果传入的记录不是有效的json,那么我的flink作业将失败。我想跳过那破记录,而不是不及格。
FlinkKafkaConsumer010<ObjectNode> kafkaConsumer =
new FlinkKafkaConsumer010<>(TOPIC, new JSONDeserializationSchema(), consumerProperties);
DataStream<ObjectNode> messageStream = env.addSource(kafkaConsumer);
messageStream.print();
如果kafka中的数据不是有效的json,我将得到以下异常。
Job execution switched to status FAILING.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'This': was expecting ('true', 'false' or 'null')
at [Source: [B@4f522623; line: 1, column: 6]
Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
3条答案
按热度按时间7uzetpgm1#
flink改进了flinkkafkaconsumer的空记录处理
当
DeserializationSchema
遇到损坏的消息。它可以抛出一个IOException
这会导致管道重新启动,或者它会返回null
flink kafka的消费者会悄悄跳过损坏的消息。有关更多详细信息,请参见此链接。
1tuwyuhd2#
最简单的解决方案是实现自己的
DeserializationSchema
PackageJSONDeserializationSchema
. 然后可以捕获异常并忽略它或执行自定义操作。c0vxltue3#
按照@twalthr的建议,我实现了自己的
DeserializationSchema
通过复制JSONDeserializationSchema
并添加了异常处理。在我的流媒体工作中。