apache flink-无法从twitter获取数据

dm7nw8vv  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(493)

我正在尝试使用ApacheFlink通过twitter流api获取一些消息。
但是,我的代码没有在输出文件中写入任何内容。我正在计算特定单词的输入数据。
请看我的例子:

  1. import java.util.Properties
  2. import org.apache.flink.api.scala._
  3. import org.apache.flink.streaming.connectors.twitter._
  4. import org.apache.flink.api.java.utils.ParameterTool
  5. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  6. import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint}
  7. import org.apache.flink.streaming.api.windowing.time.Time
  8. import scala.collection.JavaConverters._
  9. //////////////////////////////////////////////////////
  10. // Create an Endpoint to Track our terms
  11. class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable {
  12. @Override
  13. def createEndpoint(): StreamingEndpoint = {
  14. //val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0))
  15. val endpoint = new StatusesFilterEndpoint()
  16. //endpoint.locations(List(chicago).asJava)
  17. endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava)
  18. endpoint
  19. }
  20. }
  21. object Connection {
  22. def main(args: Array[String]): Unit = {
  23. val props = new Properties()
  24. val params: ParameterTool = ParameterTool.fromArgs(args)
  25. val env = StreamExecutionEnvironment.getExecutionEnvironment
  26. env.getConfig.setGlobalJobParameters(params)
  27. env.setParallelism(params.getInt("parallelism", 1))
  28. props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key"))
  29. props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key"))
  30. props.setProperty(TwitterSource.TOKEN, params.get("token"))
  31. props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret"))
  32. val source = new TwitterSource(props)
  33. val epInit = new myFilterEndpoint()
  34. source.setCustomEndpointInitializer(epInit)
  35. val streamSource = env.addSource(source)
  36. streamSource.map(s => (0, 1))
  37. .keyBy(0)
  38. .timeWindow(Time.minutes(2), Time.seconds(30))
  39. .sum(1)
  40. .map(t => t._2)
  41. .writeAsText(params.get("output"))
  42. env.execute("Twitter Count")
  43. }
  44. }

关键是,我没有错误信息,我可以看到在我的 Jmeter 板。我的消息源正在向我的triggerwindow发送数据。但它不能接收任何数据:

我一次有两个问题。
第一:如果没有收到任何信息,为什么我的源代码会向triggerwindow发送字节?
第二:我的代码有什么问题,我不能从twitter获取数据吗?

bn31dyow

bn31dyow1#

应用程序源没有将实际记录发送到窗口,您可以通过查看“已发送的记录”列看到该窗口。发送的字节属于flink在任务之间不时发送的控制消息。更具体地说,是 LatencyMarker 用于测量flink作业的端到端延迟的消息。
我觉得代码不错。我甚至试过你的代码为我工作。因此,我得出结论,twitter连接凭据肯定有问题。请重新检查您是否输入了正确的凭据。

相关问题