Flink SerializationSchema: could not serialize row error

3lxsmp7m · posted 2021-06-21 in Flink

I am running into a problem with Flink's SerializationSchema.
Here is my main code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DeserializationSchema<Row> sourceDeserializer = new JsonRowDeserializationSchema.Builder( /*Extract TypeInformation<Row> from an avsc schema file*/ ).build();
DataStream<Row> myDataStream = env.addSource( new MyCustomSource(sourceDeserializer) ) ;
final SinkFunction<Row> sink = new MyCustomSink(new JsonRowSerializationSchema.Builder(myDataStream.getType()).build());
myDataStream.addSink(sink).name("MyCustomSink");

env.execute("MyJob");

And here is my custom sink function:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class MyCustomSink implements SinkFunction<Row> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSink.class);
    private final boolean print;
    private final SerializationSchema<Row> serializationSchema;

    public MyCustomSink(final SerializationSchema<Row> serializationSchema) {
        this.serializationSchema = serializationSchema;
    }

    @Override
    public void invoke(final Row value, final Context context) throws Exception {

        try {
            LOGGER.info("MyCustomSink- invoke : [{}]", new String(serializationSchema.serialize(value)));
        }catch (Exception e){
            LOGGER.error("MyCustomSink- Error while sending data : " + e);
        }
    }
}

And here is my custom source function (not sure whether it is relevant to the problem):

import java.io.InputStream;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.io.ByteStreams;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyCustomSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {

    /**logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSource.class);

    /**the JSON deserializer */
    private final DeserializationSchema<T> deserializationSchema;

    public MyCustomSource(final DeserializationSchema<T> deserializer) {
        this.deserializationSchema = deserializer;
    }

    @Override
    public void open(final Configuration parameters) {
        ...
    }

    @Override
    public void run(final SourceContext<T> ctx) throws Exception {
        LOGGER.info("run");
        InputStream data = ...; // Retrieve the input json data
        final T row = deserializationSchema
                        .deserialize(ByteStreams.toByteArray(data));
        ctx.collect(row);

    }

    @Override
    public void cancel() {
        ...
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return deserializationSchema.getProducedType();
    }
}

Now, I run the code and send the following data to the pipeline, in sequence:

==> 
{
    "id": "sensor1",
    "data":{
        "rotation": 250
    }
}

Here, the data is correctly printed by my sink: `MyCustomSink- invoke : [{"id":"sensor1","data":{"rotation":250}}]`
==>
{
"id": "sensor1"
}

Here, the data is correctly printed by my sink: `MyCustomSink- invoke : [{"id":"sensor1","data":null}]`
==> 
{
    "id": "sensor1",
    "data":{
        "rotation": 250
    }
}

Here, the serialization fails. The error log printed is:

MyCustomSink- Error while sending data : java.lang.RuntimeException: Could not serialize row 'sensor1,250'. Make sure that the schema matches the input.

I really don't understand why I get this behavior. Does anyone have an idea?
Note:
Using Flink 1.9.2
--EDIT--
I added the CustomSource part.
--EDIT 2--
After further investigation, it seems this behavior is caused by the `private transient ObjectNode node` field of `JsonRowSerializationSchema`. If I understand correctly, this node is reused across serializations as an optimization, but it appears to be the cause of my problem.
Is this the normal behavior? If so, what would be the correct usage of this class in my case (otherwise, is there a way to work around the problem)?
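To illustrate the kind of failure that a reused, uncleared mutable node can cause, here is a plain-Java sketch (not actual Flink code; all class and method names here are made up for illustration). A `Map` stands in for the cached `ObjectNode`: when it is not reset between `serialize()` calls, a field from an earlier record leaks into the output for a later record that no longer carries it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReusedNodeDemo {

    // Simulates the single node that the serializer caches and reuses.
    private final Map<String, Object> node = new LinkedHashMap<>();

    // Buggy variant: entries written by earlier records are never cleared,
    // so stale fields survive into the next serialization.
    public String serializeBuggy(Map<String, Object> record) {
        node.putAll(record);
        return node.toString();
    }

    // Fixed variant: reset the reused node before populating it.
    public String serializeFixed(Map<String, Object> record) {
        node.clear();
        node.putAll(record);
        return node.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> full = new LinkedHashMap<>();
        full.put("id", "sensor1");
        full.put("rotation", 250);

        Map<String, Object> partial = new LinkedHashMap<>();
        partial.put("id", "sensor1");

        ReusedNodeDemo buggy = new ReusedNodeDemo();
        buggy.serializeBuggy(full);
        // Second record has no "rotation", yet the stale value leaks in.
        System.out.println("buggy: " + buggy.serializeBuggy(partial));

        ReusedNodeDemo fixed = new ReusedNodeDemo();
        fixed.serializeFixed(full);
        System.out.println("fixed: " + fixed.serializeFixed(partial));
    }
}
```

This mirrors the symptom above only in spirit: in the real schema the reused `ObjectNode` can hold structure from a previous record, so a record whose shape differs from the previous one may fail to serialize.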

wkftcu5l1#

This is a bug in `JsonRowSerializationSchema` that was fixed in recent Flink versions - I believe this PR resolves the issue above.
