flink(kinesis数据分析)事件会话窗口

vx6bjr1n  于 2021-06-26  发布在  Java
关注(0)|答案(0)|浏览(318)

我是flink(kinesis数据分析aws服务)的新手。我在实现eventtimesessionwindows时遇到了困难。下面是我的代码:

SingleOutputStreamOperator<DeserializedObj> deserializedObjSingleOutputStreamOperator = kinesisStream
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<DeserializedObj>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamp) -> Long.parseLong(event.timestamp)));

        SingleOutputStreamOperator<String> sessionStream = deserializedObjSingleOutputStreamOperator
                .keyBy("anonymousId")
                .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
                .process(new ProcessWindowFunction<DeserializedObj, String, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<DeserializedObj> iterable, Collector<String> out) throws Exception {
                        long count = 0;
                        for (DeserializedObj obj : iterable) {
                            count++;
                        }
                        out.collect("Window: " + tuple.getField(0) + " " + context.window() + "count: " + count);
                    }
                });

基本上,我想创建一个基于会话(间隔30分钟)的窗口,显示在动觉流上流动的用户活动。我的事件不能保证以正确的顺序进行,因为它们是在源端触发的。我想在我的反序列化dobj中使用“timestamp”列来确保我可以将事件存储到正确的帧(会话)中。我想等待几秒钟(4)以确保我没有丢失任何事件(因为它们是延迟事件),然后再处理该窗口。
目前,我得到下面的错误,它不给任何线索发生了什么。我从apacheflink Jmeter 板下的exceptions选项卡中捕获了这个错误。

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2d7aff90[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3aa696a6[Wrapped task = org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer@fc8fb39]] rejected from java.util.concurrent.ThreadPoolExecutor@520831ff[Shutting down, pool size = 8, active threads = 8, queued tasks = 0, completed tasks = 14]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:473)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:345)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)```

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题