I am running into a problem with Flink's SerializationSchema.
Here is my main code:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DeserializationSchema<Row> sourceDeserializer =
        new JsonRowDeserializationSchema.Builder( /* Extract TypeInformation<Row> from an avsc schema file */ ).build();

DataStream<Row> myDataStream = env.addSource(new MyCustomSource(sourceDeserializer));

final SinkFunction<Row> sink =
        new MyCustomSink(new JsonRowSerializationSchema.Builder(myDataStream.getType()).build());
myDataStream.addSink(sink).name("MyCustomSink");

env.execute("MyJob");
```
And here is my custom sink function:
```
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class MyCustomSink implements SinkFunction<Row> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSink.class);

    /** Serializer used to turn the incoming Row back into a JSON byte array. */
    private final SerializationSchema<Row> serializationSchema;

    public MyCustomSink(final SerializationSchema<Row> serializationSchema) {
        this.serializationSchema = serializationSchema;
    }

    @Override
    public void invoke(final Row value, final Context context) throws Exception {
        try {
            LOGGER.info("MyCustomSink- invoke : [{}]", new String(serializationSchema.serialize(value)));
        } catch (Exception e) {
            LOGGER.error("MyCustomSink- Error while sending data : " + e);
        }
    }
}
```
And here is my custom source function (not sure whether it is relevant to the problem):
```
import java.io.InputStream;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.io.ByteStreams;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyCustomSource<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {

    /** Logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomSource.class);

    /** The JSON deserializer */
    private final DeserializationSchema<T> deserializationSchema;

    public MyCustomSource(final DeserializationSchema<T> deserializer) {
        this.deserializationSchema = deserializer;
    }

    @Override
    public void open(final Configuration parameters) {
        ...
    }

    @Override
    public void run(final SourceContext<T> ctx) throws Exception {
        LOGGER.info("run");
        InputStream data = ...; // Retrieve the input JSON data
        final T row = deserializationSchema.deserialize(ByteStreams.toByteArray(data));
        ctx.collect(row);
    }

    @Override
    public void cancel() {
        ...
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return deserializationSchema.getProducedType();
    }
}
```
Now I run the job and send some data through the pipeline, in the following order:

==>
```
{
  "id": "sensor1",
  "data": {
    "rotation": 250
  }
}
```
Here, the data is correctly printed by my sink: `MyCustomSink- invoke : [{"id":"sensor1","data":{"rotation":250}}]`
==>
```
{
  "id": "sensor1"
}
```
Here, the data is correctly printed by my sink: `MyCustomSink- invoke : [{"id":"sensor1","data":null}]`
==>
```
{
  "id": "sensor1",
  "data": {
    "rotation": 250
  }
}
```
Here, the serialization fails. The error log printed is:
```
MyCustomSink- Error while sending data : java.lang.RuntimeException: Could not serialize row 'sensor1,250'. Make sure that the schema matches the input.
```
I completely fail to understand why I get this behaviour. Does anyone have an idea?
Note: I am using Flink 1.9.2.
-- EDIT --
I added the custom source part.
-- EDIT 2 --
After further investigation, this behaviour seems to be caused by the `private transient ObjectNode node` field of `JsonRowSerializationSchema`. If I understand correctly, it is there as an optimization, but it appears to be the cause of my problem.
Is this the expected behaviour? If so, what is the correct way to use this class in my case (and otherwise, is there a way to work around the problem)?
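One conceivable bypass, assuming the reused internal node really is the culprit, would be to rebuild the serializer for every record. A rough, untested sketch (it trades performance for correctness; the wrapper class name is hypothetical):

```
// Rough sketch of a possible workaround: build a fresh JsonRowSerializationSchema
// for every record so the internal, reused ObjectNode never carries state from a
// previous row. Assumes Flink 1.9.x APIs; untested.
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

@SuppressWarnings("serial")
public class FreshJsonRowSerializationSchema implements SerializationSchema<Row> {

    private final TypeInformation<Row> typeInfo;

    public FreshJsonRowSerializationSchema(final TypeInformation<Row> typeInfo) {
        this.typeInfo = typeInfo;
    }

    @Override
    public byte[] serialize(final Row row) {
        // A new delegate per call means no cached ObjectNode is ever reused.
        return new JsonRowSerializationSchema.Builder(typeInfo).build().serialize(row);
    }
}
```

It would be plugged in as `new MyCustomSink(new FreshJsonRowSerializationSchema(myDataStream.getType()))`; upgrading Flink, if possible, is of course the cleaner fix.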
1 Answer
This is a bug in `JsonRowSerializationSchema` that was fixed in more recent Flink versions. I believe this PR addresses the problem described above.