I am applying the union operator to two DataStreams of generic record (GenericRecord) type.
package com.gslab.com.dataSets;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<String> controlMessageList = new ArrayList<String>();
        controlMessageList.add("controlMessage1");
        controlMessageList.add("controlMessage2");

        List<String> dataMessageList = new ArrayList<String>();
        dataMessageList.add("Person1");
        dataMessageList.add("Person2");
        dataMessageList.add("Person3");
        dataMessageList.add("Person4");

        DataStream<String> controlMessageStream = env.fromCollection(controlMessageList);
        DataStream<String> dataMessageStream = env.fromCollection(dataMessageList);

        // Wrap each control message in an Avro GenericRecord (single field TYPE)
        DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc")));
                gr.put("TYPE", value);
                return gr;
            }
        });

        // Wrap each data message in an Avro GenericRecord (fields FIRSTNAME and LASTNAME)
        DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc")));
                gr.put("FIRSTNAME", value);
                gr.put("LASTNAME", value + ": lastname");
                return gr;
            }
        });

        // Print the data records before the union
        dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data before union: " + value);
                return value;
            }
        });

        // Broadcast the control stream, union it with the data stream, and print every record
        controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data after union: " + value);
                return value;
            }
        });

        env.execute("stream");
    }
}
Output:
05/09/2016 13:02:13 Map(2/2) switched to FINISHED
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"}
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"}
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"}
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"}
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"}
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"}
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"}
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED
05/09/2016 13:02:13 Map(1/2) switched to FINISHED
05/09/2016 13:02:13 Map(2/2) switched to FINISHED
05/09/2016 13:02:13 Job execution switched to status FINISHED.
As you can see, the records from dataMessageGenericRecordStream are wrong after the union: every field value is replaced by the value of the first field.
Answer 1:
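I ran into a similar problem with the DataSet API. I read some Avro files as generic records and saw the same strange behavior. I used this workaround: instead of reading them as GenericRecords, I read them as specific records (e.g. MyAvroObject) and then used a map to convert/cast them to GenericRecords.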
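I wrote some code to test your use case with the DataSet API, and it works with the workaround above:
where Query and SearchEngineQuery are my Avro objects (corresponding to your control message list and data message list).
Output: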
Answer 2:
I spent several days investigating a different problem (which also involved GenericRecord) and found the root cause and a solution.
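Root cause: in Apache Avro's Schema class, the Field position is transient and is not serialized by Kryo, so when a record is deserialized in the Flink pipeline every field's position is initialized to 0. Every field of the deserialized record then reads the value stored at position 0, which is why all fields show the first field's value.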
See JIRA AVRO-1476, which describes this and specifically mentions Kryo serialization.
This was fixed in Avro 1.7.7.
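The mechanism can be reproduced without Flink or Avro. The sketch below is a simplified, hypothetical model: `Field` and `Record` are stand-ins written for this example, not Avro's real `Schema.Field` and `GenericData.Record`, and plain Java serialization takes the place of Kryo (both skip `transient` fields by default, so a skipped `int` comes back as 0). Values are stored and read by field position, so once the positions are reset every field reads slot 0, which mirrors the "data after union" lines above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class TransientFieldDemo {
    // Hypothetical stand-in for Avro's Schema.Field (NOT the real class):
    // the position is transient, as described for Avro before 1.7.7.
    static class Field implements Serializable {
        final String name;
        transient int pos;
        Field(String name, int pos) { this.name = name; this.pos = pos; }
    }

    // Hypothetical stand-in for GenericData.Record: values live in an array indexed by field position.
    static class Record implements Serializable {
        final List<Field> fields;
        final Object[] values;

        Record(List<Field> fields) {
            this.fields = fields;
            this.values = new Object[fields.size()];
        }

        void put(String name, Object value) {
            for (Field f : fields) {
                if (f.name.equals(name)) {
                    values[f.pos] = value;
                    return;
                }
            }
            throw new IllegalArgumentException("unknown field: " + name);
        }

        // Reads each value through its field's position, like a positional get.
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("{");
            for (int i = 0; i < fields.size(); i++) {
                Field f = fields.get(i);
                if (i > 0) sb.append(", ");
                sb.append('"').append(f.name).append("\": \"").append(values[f.pos]).append('"');
            }
            return sb.append('}').toString();
        }
    }

    // Java serialization stands in for Kryo here: transient fields are skipped and restored as 0.
    static Record roundTrip(Record record) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(record);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Record) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Record r = new Record(Arrays.asList(new Field("FIRSTNAME", 0), new Field("LASTNAME", 1)));
        r.put("FIRSTNAME", "Person1");
        r.put("LASTNAME", "Person1: lastname");
        System.out.println("before: " + r);
        System.out.println("after:  " + roundTrip(r));
    }
}
```

The "after" line prints FIRSTNAME and LASTNAME both as "Person1", the same pattern as in the question's output.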
Solution: Flink must use Avro 1.7.7 (or later). I verified this fix on my local machine by replacing the Avro classes in flink-dist_2.11-1.1.3.jar, and it fixed my problem.
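Staying with the same simplified model (hypothetical classes, Java serialization standing in for Kryo), the sketch below contrasts a position stored as an ordinary field with one marked `transient`. It only illustrates why keeping the position across serialization avoids the reset; it is not a copy of the actual change made in Avro 1.7.7.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PersistentFieldDemo {
    // Hypothetical field whose position is an ordinary field: it is written and restored.
    static class Field implements Serializable {
        final String name;
        final int pos;
        Field(String name, int pos) { this.name = name; this.pos = pos; }
    }

    // Hypothetical field whose position is transient: it is skipped and restored as 0.
    static class TransientField implements Serializable {
        final String name;
        transient int pos;
        TransientField(String name, int pos) { this.name = name; this.pos = pos; }
    }

    // Serialize to bytes and read back, standing in for shipping a record between operators.
    static Object roundTrip(Object obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Field kept = (Field) roundTrip(new Field("LASTNAME", 1));
        TransientField lost = (TransientField) roundTrip(new TransientField("LASTNAME", 1));
        System.out.println("non-transient position after round trip: " + kept.pos); // 1
        System.out.println("transient position after round trip: " + lost.pos);     // 0
    }
}
```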
I updated the JIRA issue: https://issues.apache.org/jira/browse/flink-5039
There is now a PR: https://github.com/apache/flink/pull/2953
I hope it will be included in the Flink 1.1.4 and 1.2.0 releases.