我是flink(kinesis数据分析aws服务)的新手。我在实现eventtimesessionwindows时遇到了困难。下面是我的代码:
SingleOutputStreamOperator<DeserializedObj> deserializedObjSingleOutputStreamOperator = kinesisStream
.assignTimestampsAndWatermarks(WatermarkStrategy
.<DeserializedObj>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> Long.parseLong(event.timestamp)));
SingleOutputStreamOperator<String> sessionStream = deserializedObjSingleOutputStreamOperator
.keyBy("anonymousId")
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.process(new ProcessWindowFunction<DeserializedObj, String, Tuple, TimeWindow>() {
@Override
public void process(Tuple tuple, Context context, Iterable<DeserializedObj> iterable, Collector<String> out) throws Exception {
long count = 0;
for (DeserializedObj obj : iterable) {
count++;
}
out.collect("Window: " + tuple.getField(0) + " " + context.window() + "count: " + count);
}
});
基本上,我想创建一个基于会话(间隔30分钟)的窗口,显示在动觉流上流动的用户活动。我的事件不能保证以正确的顺序进行,因为它们是在源端触发的。我想在我的反序列化dobj中使用“timestamp”列来确保我可以将事件存储到正确的帧(会话)中。我想等待几秒钟(4)以确保我没有丢失任何事件(因为它们是延迟事件),然后再处理该窗口。
目前,我得到下面的错误,它不给任何线索发生了什么。我从apacheflink Jmeter 板下的exceptions选项卡中捕获了这个错误。
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2d7aff90[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3aa696a6[Wrapped task = org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer@fc8fb39]] rejected from java.util.concurrent.ThreadPoolExecutor@520831ff[Shutting down, pool size = 8, active threads = 8, queued tasks = 0, completed tasks = 14]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:473)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:345)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)```
暂无答案!
目前还没有任何答案,快来回答吧!