在flink中解析json时如何处理异常

nle07wnf  于 2021-06-21  发布在  Flink
关注(0)|答案(3)|浏览(2086)

我正在使用Flink1.4.2读取Kafka的数据,并将其解析为 ObjectNode 使用 JSONDeserializationSchema . 如果传入的记录不是有效的json,那么我的flink作业将失败。我想跳过那破记录,而不是不及格。

FlinkKafkaConsumer010<ObjectNode> kafkaConsumer =
                new FlinkKafkaConsumer010<>(TOPIC, new JSONDeserializationSchema(), consumerProperties);
DataStream<ObjectNode> messageStream = env.addSource(kafkaConsumer);
messageStream.print();

如果kafka中的数据不是有效的json,我将得到以下异常。

Job execution switched to status FAILING.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'This': was expecting ('true', 'false' or 'null')
 at [Source: [B@4f522623; line: 1, column: 6]
Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
7uzetpgm

7uzetpgm1#

flink改进了flinkkafkaconsumer的空记录处理
DeserializationSchema 遇到损坏的消息。它可以抛出一个 IOException 这会导致管道重新启动,或者它会返回 null flink kafka的消费者会悄悄跳过损坏的消息。
有关更多详细信息,请参见此链接。

1tuwyuhd

1tuwyuhd2#

最简单的解决方案是实现自己的 DeserializationSchema Package JSONDeserializationSchema . 然后可以捕获异常并忽略它或执行自定义操作。

c0vxltue

c0vxltue3#

按照@twalthr的建议,我实现了自己的 DeserializationSchema 通过复制 JSONDeserializationSchema 并添加了异常处理。

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;

public class CustomJSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {

    private ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }

        ObjectNode objectNode;
        try {
            objectNode = mapper.readValue(message, ObjectNode.class);
        } catch (Exception e) {
            ObjectMapper errorMapper = new ObjectMapper();
            ObjectNode errorObjectNode = errorMapper.createObjectNode();
            errorObjectNode.put("jsonParseError", new String(message));
            objectNode = errorObjectNode;
        }
        return objectNode;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

}

在我的流媒体工作中。

messageStream
        .filter((event) -> {
            if(event.has("jsonParseError")) {
                LOG.warn("JsonParseException was handled: " + event.get("jsonParseError").asText());
                return false;
            }
            return true;
        }).print();

相关问题