Flink KeyBy随机数

3qpi33ja  于 2023-09-28  发布在  Apache
关注(0)|答案(1)|浏览(127)

有一个java flink代码,我想用一个随机数来求keyby,所以我实现了KeySelector,下面代码中注解掉的那一行是什么,但是会有一些问题。
1.当我把parallelism设置为1时,它会工作,但是当parallelism大于1时,会有一个错误,它发生在print“=process”之前。
2.当我预先在map方法中设置随机数,并将keyby作为MaxwellSend对象的字段时,无论并行度如何,它都会工作。
keyBy((KeySelector<MaxwellSend, Integer>) value -> new Random().nextInt(10)+1)这似乎不起作用。似乎所有的数据都进入了同一个分区,为什么在Keyby中生成随机数不起作用?

Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336)
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159)
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:99)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:750)
DataStreamSource<String> streamSource = env.addSource(consumer);
        SingleOutputStreamOperator<MaxwellSend> map =
                streamSource.map(data -> {
                    MaxwellSend maxwellSend = mapper.readValue(data, MaxwellSend.class);
                    maxwellSend.setRandomId(new Random().nextInt(10)+1);
                    return maxwellSend;
                });
        SingleOutputStreamOperator<DataToMySQL> process = map
            //  .keyBy((KeySelector<MaxwellSend, Integer>) value -> new Random().nextInt(10)+1)
                .keyBy(MaxwellSend::getRandomId)
                .timeWindow(Time.seconds(2))
                .process(new ProcessWindowFunction<MaxwellSend, DataToMySQL, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer tuple, Context context, Iterable<MaxwellSend> elements, Collector<DataToMySQL> out) throws Exception {
                        System.out.println("=====process");
                    }
                });

        process.addSink(new MySQLSink());
zynd9foi

zynd9foi1#

你的代码有几个问题,关于你的问题的一些输入...
1.始终包含您在问题中使用的Flink版本。这有助于当(除其他外)有一个堆栈跟踪进入Flink代码。
1.您不想为密钥使用随机值,请参阅this answer了解更多详细信息。
3.如果你使用keyBy((KeySelector<MaxwellSend, Integer>) value -> new Random().nextInt(10)+1)之类的东西,并且你的并行度不明显大于10,你可以很容易地将所有键分配(通过哈希分区)到同一个插槽。

相关问题