有一个java flink代码,我想用一个随机数来求keyby
,所以我实现了KeySelector
,下面代码中注解掉的那一行是什么,但是会有一些问题。
1.当我把parallelism设置为1时,它会工作,但是当parallelism大于1时,会有一个错误,它发生在print“=process”之前。
2.当我预先在map方法中设置随机数,并将keyby作为MaxwellSend对象的字段时,无论并行度如何,它都会工作。keyBy((KeySelector<MaxwellSend, Integer>) value -> new Random().nextInt(10)+1)
这似乎不起作用。似乎所有的数据都进入了同一个分区,为什么在Keyby中生成随机数不起作用?
Caused by: java.lang.NullPointerException
at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:99)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:750)
DataStreamSource<String> streamSource = env.addSource(consumer);
SingleOutputStreamOperator<MaxwellSend> map =
streamSource.map(data -> {
MaxwellSend maxwellSend = mapper.readValue(data, MaxwellSend.class);
maxwellSend.setRandomId(new Random().nextInt(10)+1);
return maxwellSend;
});
SingleOutputStreamOperator<DataToMySQL> process = map
// .keyBy((KeySelector<MaxwellSend, Integer>) value -> new Random().nextInt(10)+1)
.keyBy(MaxwellSend::getRandomId)
.timeWindow(Time.seconds(2))
.process(new ProcessWindowFunction<MaxwellSend, DataToMySQL, Integer, TimeWindow>() {
@Override
public void process(Integer tuple, Context context, Iterable<MaxwellSend> elements, Collector<DataToMySQL> out) throws Exception {
System.out.println("=====process");
}
});
process.addSink(new MySQLSink());
1条答案
按热度按时间zynd9foi1#
你的代码有几个问题,关于你的问题的一些输入...
1.始终包含您在问题中使用的Flink版本。这有助于当(除其他外)有一个堆栈跟踪进入Flink代码。
1.您不想为密钥使用随机值,请参阅this answer了解更多详细信息。
3.如果你使用
keyBy((KeySelector<MaxwellSend, Integer>) value -> new Random().nextInt(10)+1)
之类的东西,并且你的并行度不明显大于10,你可以很容易地将所有键分配(通过哈希分区)到同一个插槽。