./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
根据官方的quickstart直接运行示例程序fololws.log。原因似乎是java.net.connectexception。我确定端口没有被使用,防火墙已经关闭。
root@maple-PC:/home/maple/Downloads/flink-1.4.2# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: b371f7847302f8930115f093c7e32d3d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-198847679] with leader session id 00000000-0000-0000-0000-000000000000.
12/15/2018 01:06:20 Job execution switched to status RUNNING.
12/15/2018 01:06:20 Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
12/15/2018 01:06:20 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@9cfdeb73, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@71687585}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to SCHEDULED
12/15/2018 01:06:20 Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
12/15/2018 01:06:20 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@9cfdeb73, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@71687585}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to DEPLOYING
12/15/2018 01:06:21 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@9cfdeb73, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@71687585}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to RUNNING
12/15/2018 01:06:21 Source: Socket Stream -> Flat Map(1/1) switched to RUNNING
12/15/2018 01:06:21 Source: Socket Stream -> Flat Map(1/1) switched to FAILED
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
12/15/2018 01:06:21 Job execution switched to status FAILING.
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
12/15/2018 01:06:21 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@9cfdeb73, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@71687585}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to CANCELING
12/15/2018 01:06:21 TriggerWindow(TumblingProcessingTimeWindows(5000), ReducingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@9cfdeb73, reduceFunction=org.apache.flink.streaming.examples.socket.SocketWindowWordCount$1@71687585}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:241)) -> Sink: Unnamed(1/1) switched to CANCELED
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.ConnectException: 拒绝连接 (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
3条答案
按热度按时间41ik7eoe1#
从上一个堆栈可以看出,
sockettextstreamfunction
在尝试连接时被拒绝了连接localhost:9000 with 插座。所以这是发送方(程序)或接收方(本地机器的端口)的问题。也许你可以附上9000的端口状态,让我们知道更多,这可以帮助解决这个问题。lhcgjxsq2#
我相信这是flink/flink演示的问题。我也有同样的问题。本地主机到本地主机。如果我多次运行这个演示,我会一次接一次地失败,但是它会连接起来并正常工作。nc一直在那里等着。我是从
ncat -v -l 9000
所以当有人试图联系的时候它就显示出来了。ncat输出使用
-v
显示在我成功运行演示之前,没有任何东西试图连接到ncat。所以,我做了一个基本测试:https://www.linode.com/docs/networking/diagnostics/netcat/netcat/#using-netcat作为服务器
结果是,这是完美的工作!我可以在两扇打开的Windows之间连接,尽情地自言自语。
我真的认为这就是一个例子(也许它在放弃之前等待的时间不够长?)
设置
操作系统
centos7.4,linux myŠU主机3.10.0-957.5.1.el7.x86ŠU 64Š1 smp 2月1日星期五14:54:57 utc 2019 x86ŠU 64 x86ŠU 64 gnu/linux
ncat
系统上的试用版本:7.50
还尝试了从nmap.org下载的版本:7.80。
java
java版本“1.8.0\u 221”java(tm)se运行时环境(内部版本1.8.0\u 221-b11)java热点(tm)64位服务器虚拟机(内部版本25.221-b11,混合模式)
aoyhnmkz3#
这些套接字流示例期望服务器(netcat)已经启动,并且在flink作业启动之前绑定到端口。通常这是通过
但有些版本的netcat需要
请参阅apache flink(稳定版本1.6.2)不起作用,以了解更多关于此问题的讨论。