我正在构建一个关于Flink1.2的教程,我想运行一些简单的窗口示例。其中之一是会话窗口。
我要运行的代码如下:
import <package>.Session
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import scala.util.Try
object SessionWindowExample {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.socketTextStream("localhost", 9000)
//session map
val values = source.map(value => {
val columns = value.split(",")
val endSignal = Try(Some(columns(2))).getOrElse(None)
Session(columns(0), columns(1).toDouble, endSignal)
})
val keyValue = values.keyBy(_.sessionId)
// create global window
val sessionWindowStream = keyValue.
window(GlobalWindows.create()).
trigger(PurgingTrigger.of(new SessionTrigger[GlobalWindow]()))
sessionWindowStream.sum("value").print()
env.execute()
}
}
你会注意到我需要示例化一个 new SessionTrigger
对象,基于这个类:
import <package>.Session
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.Window
class SessionTrigger[W <: Window] extends Trigger[Session,W] {
override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
if(element.endSignal.isDefined) TriggerResult.FIRE
else TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
}
然而,intelij一直抱怨: Class 'SessionTrigger' must either be declared abstract or implement abstract member 'clear(window: W, ctx: TriggerContext):void' in 'org.apache.flink.streaming.api.windowing.triggers.Trigger'
.
我试着在课堂上加上这个:
override def clear(window: W, ctx: TriggerContext): Unit = ctx.deleteEventTimeTimer(4)
但它不起作用。这是我得到的错误:
03/27/2017 15:48:38 TriggerWindow(GlobalWindows(), ReducingStateDescriptor{serializer=co.uk.DRUK.flink.windowing.SessionWindowExample.SessionWindowExample$$anon$2$$anon$1@1aec64d0, reduceFunction=org.apache.flink.streaming.api.functions.aggregation.SumAggregator@1a052a00}, PurgingTrigger(co.uk.DRUK.flink.windowing.SessionWindowExample.SessionTrigger@f2f2cc1), WindowedStream.reduce(WindowedStream.java:276)) -> Sink: Unnamed(4/4) switched to CANCELED
03/27/2017 15:48:38 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at co.uk.DRUK.flink.windowing.SessionWindowExample.SessionWindowExample$$anonfun$1.apply(SessionWindowExample.scala:27)
at co.uk.DRUK.flink.windowing.SessionWindowExample.SessionWindowExample$$anonfun$1.apply(SessionWindowExample.scala:24)
at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:521)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Process finished with exit code 1
有人知道为什么吗?
1条答案
按热度按时间ve7v8dk21#
例外情况清楚地表明
这显然Map到下面的代码行
所以下一个显而易见的事情就是记录你的
columns
以及value
之后我怀疑
value
只是不包含第二个逗号分隔的列至少对于某些值。