将数据流从vm套接字/远程套接字发送到运行在主机操作系统上的flink程序

lokaqttq  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(325)

正如flink文档中提到的,我可以通过使用打开本地套接字从文本服务器读取文本输入

amar@admin:~$ nc -l 12345

然后在flink程序上用

DataStream<String> text = env.socketTextStream("localhost", 12345);

text.print();

env.execute();

然而,由于我正在模拟一些场景,所以我想从一个vm(最终是各种vm)获取数据流,并将其发送到运行在主机操作系统上的cep程序。
所以,我安装了vm,使用vagrant和ssh vagrant ssh 访客操作系统的主机名是precise64
使用ifconfig=10.0.2.15的ip地址
现在,我想做的是,看看我是否可以从vm发送一些数据,并在flink程序中接收它们,就像我在本地环境中所做的那样。
我使用打开了访客操作系统上的netcat套接字

vagrant@precise64:~$ nc -l 12345

我试图用在主机程序上接收它,但出现了错误

DataStream<String> text = env.socketTextStream("precise64", 12345);

text.print();

env.execute();

我也试过了precise64@10.0.2.15 但我认为我做错了。
有什么想法吗,我应该如何将数据流从vm发送到主机flink程序
欢迎提出建议,提前谢谢!

yduiuuwa

yduiuuwa1#

你可以试试这个:
1.程序:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}

2.运行上述程序后,您可以启动此程序。

nc -lk 9999

这会有用的。

相关问题