IOException when connecting to the Twitter Streaming API with Apache Flink

vwoqyblh posted on 2021-06-24 in Flink

I wrote a small Scala program that uses the Apache Flink streaming API to read tweets from Twitter.

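```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap
import org.apache.flink.streaming.connectors.twitter.TwitterSource
import org.apache.flink.util.Collector

object TwitterWordCount {
  // Properties file holding the Twitter OAuth credentials
  private val properties = "/home/twitter-login.properties"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Each element of the source is one raw tweet as a JSON string
    val twitterStream = env.addSource(new TwitterSource(properties))

    // Keep only the text of tweets whose author's language is English
    val tweets = twitterStream
      .flatMap(new JSONParseFlatMap[String, String] {
        override def flatMap(in: String, out: Collector[String]): Unit = {
          if (getString(in, "user.lang") == "en") {
            out.collect(getString(in, "text"))
          }
        }
      })

    tweets.print()
    env.execute("tweets")
  }
}
```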

When I run it, I get the following output:

```
14:35:48,353 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Establishing a connection
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection request: [route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection leased: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 1 of 2; total allocated: 1 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.DefaultClientConnectionOperator - Connecting to stream.twitter.com:80
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Received message SendHeartbeat at akka://flink/user/taskmanager_1 from Actor[akka://flink/deadLetters].
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
14:35:49,487 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Handled message SendHeartbeat in 1 ms from Actor[akka://flink/deadLetters].
14:35:49,487 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) at akka://flink/user/jobmanager from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received hearbeat message from cb51cdb1bd08879df10bd2198b8e043a.
14:35:49,488 DEBUG org.apache.flink.runtime.instance.InstanceManager - Received heartbeat from TaskManager cb51cdb1bd08879df10bd2198b8e043a @ localhost - 8 slots - URL: akka://flink/user/taskmanager_1
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) in 0 ms from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d shut down
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection [id: 4][route: {}->http://stream.twitter.com] can be kept alive for 9223372036854775807 MILLISECONDS
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection released: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:52,359 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient IOException caught when establishing connection to https://stream.twitter.com/1.1/statuses/filter.json?delimited=length
14:35:53,613 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient failed to establish connection properly
14:35:53,613 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Done processing, preparing to close connection
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager is shutting down
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager shut down
```

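The program then tries to re-establish the connection, so these four log messages are emitted again and again.
Strangely, when I run the example that ships with the Apache Flink project, everything works (I took the latest version of master from GitHub). I even use the same properties file. If I copy that example class into my own project, the problem described above appears there as well.
I used the Flink archetype to create my own project. I tried versions 0.9.1 and 0.10-SNAPSHOT. The dependencies `flink-scala`, `flink-streaming-scala`, `flink-clients` and `flink-connector-twitter` are used in the matching versions.
Has anyone run into a similar problem and can point me in the right direction?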


Answer #1 (4bbkushb)

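Debugging `com.twitter.hbc.httpclient.ClientBase` revealed the following exception: `org.apache.http.conn.ConnectTimeoutException: Connect to stream.twitter.com:80 timed out`. According to a post on the Twitter developer forum, this happens because of a bug in Apache HttpClient 4.2. Indeed, resolving the dependency tree of my project showed that the Flink runtime depends on `com.amazonaws:aws-java-sdk:1.8.1`, which in turn depends on `org.apache.httpcomponents:httpclient:4.2`.
Adding HttpClient 4.2.6 to my project's dependencies solved the problem for now.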
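Before blaming the library, it can help to rule out a plain network or firewall problem by opening a raw TCP connection to the same host and port with a timeout. The following is a minimal sketch using only `java.net.Socket`. The object name `ConnectProbe` and the 5-second timeout are illustrative choices, not part of the original answer.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object ConnectProbe {
  /** Attempts a plain TCP connect within timeoutMs milliseconds.
    * Right(()) on success, Left(description) on any IOException
    * (timeout, connection refused, unknown host, ...). */
  def probe(host: String, port: Int, timeoutMs: Int): Either[String, Unit] = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      Right(())
    } catch {
      case e: IOException => Left(s"${e.getClass.getSimpleName}: ${e.getMessage}")
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Same host/port as in the ConnectTimeoutException above
    println(probe("stream.twitter.com", 80, 5000))
  }
}
```

If the raw connection succeeds while the HTTP client still times out, the problem is more likely in the client library than in the network.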
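The project in this thread was built with Maven. For readers who build with sbt instead (an assumption, not the setup described above), the equivalent of adding HttpClient 4.2.6 might look like this `build.sbt` fragment:

```scala
// build.sbt: declare Apache HttpClient 4.2.6 directly, as the answer above did in Maven
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.2.6"

// Optionally force every transitive request for httpclient
// (e.g. the 4.2 pulled in via aws-java-sdk) to resolve to the same version
dependencyOverrides += "org.apache.httpcomponents" % "httpclient" % "4.2.6"
```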


Answer #2 (ckx4rj1h)

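Thanks @peedeex21, your solution helped me! Adding the explicit dependency to `pom.xml` fixes the problem when running from Eclipse. However, when using a Flink cluster and submitting the program with `flink run`, the version packaged with the Flink distribution still wins.
I downloaded `httpclient-4.2.6.jar` into `flink/lib` and renamed it with a leading "a" (`ahttpclient-4.2.6.jar`), so that it is added first to the Flink runtime classpath (which is assembled by `bin/config.sh`). I hope this helps someone.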
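To check which copy of HttpClient actually wins on a given classpath, you can ask the JVM where a class was loaded from. The following is a minimal sketch using only JDK reflection. The object name `WhichJar` is hypothetical. The probe class `org.apache.http.impl.client.DefaultHttpClient` is simply one class that ships in HttpClient 4.x; any other class from that jar would work as well.

```scala
object WhichJar {
  /** Location (jar file or directory) that className was loaded from.
    * None if the class cannot be loaded, or if it has no CodeSource
    * (as is the case for JDK classes on the bootstrap class path). */
  def locate(className: String): Option[String] =
    try {
      // initialize = false: load the class without running its static initializers
      val cls = Class.forName(className, false, getClass.getClassLoader)
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException | _: LinkageError => None
    }

  def main(args: Array[String]): Unit = {
    val probe = "org.apache.http.impl.client.DefaultHttpClient"
    println(s"$probe -> ${locate(probe).getOrElse("not found (or bootstrap class)")}")
  }
}
```

Run it with the same classpath as the job and compare the printed path with the jar you expect. For the cluster case, the same `locate` call could be placed inside an operator so that it executes on the TaskManager rather than in the client.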
