使用scala通过flink timewindow累计字数

ni65a41a  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(413)

我有推特流api,我从那里检索推特。
我也有一个我想要考虑的单词列表。
我要做的是将最精确的值存储到我的cassandra数据库中,该值对应于这个词在一天中被使用的次数。
我在考虑使用窗口函数每5秒钟合并一次结果,然后在数据库中写入这个合并值。
我不知道这是不是最好的办法。如果这是最好的方法,我试着在文档后面做一个简单的例子,但是它没有每5秒钟对单词进行分组。

val env = StreamExecutionEnvironment.getExecutionEnvironment

    val counts =
      env.fromElements("foo bar test test baz foo", "yes no no yes", "hi hello hi hello")
      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .filter(word => Words.listOfWords.contains(word) || Words.listOfWords2.contains(word))
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5)).sum( 1)

    counts.print()
    env.execute("test-code")

  }
vktxenjb

vktxenjb1#

好吧,目前它将不起作用,因为您正在创建 DataStream 从元素,这不是窗口化的最佳方法,因为您实际上没有5秒的运行时间来创建多个窗口,所以所有消息都将转到同一个窗口。但是,如果您在实际的twitterapi上运行它,通常应该将这些项目正确地分组到windows中。

相关问题