我正在尝试(没有成功)在apacheflink中运行一个简单的helloworld类型的程序。代码接收来自apachekafka的消息,在每个字母后添加“.”,并将新字符串打印到stdout。代码正确地从kafka获取消息,但是添加“.”的map函数失败。我在repl提示符下尝试了这个函数,scala代码在那里正常工作。scala代码:
scala> input = "hello"
input: String = hello
scala> val output = input.flatMap(value => value + ".")
output: String = h.e.l.l.o
flink程序:flink代码截断行读取
val messageStream = env.addSource(new FlinkKafkaConsumer09("CL", new SimpleStringSchema, properties))
我不知道哪里出了问题,我尝试了apache文档,但没有结果。你能给我的任何帮助都会受到欢迎的。
3条答案
按热度按时间i2byvkas1#
谢谢你的帮助,我算是解决了问题。尽管flatmap函数在scala提示符下工作,但它在flink中不能正常工作,因为flink要求向flatmap传递一个带有重写的新flatmap函数。仍然不确定为什么它会在flink中的scala提示符下工作,但是代码现在编译并按预期运行。
r7xajy2e2#
首先,我建议学习一些函数式编程和操作的基础知识,比如map/flatmap/reduce等。
所有这些函数都适用于集合。在scala示例中,@pedrofuria指出
flatMap
函数到String
它是char
s在flink的例子中
messageStream
可以抽象为字符串的集合,因此要执行您描述的操作,您应该执行以下操作:我用了
mkString
而不是flatMap
因为前者比h、 e.l.l.o(如你所写)
它产生
h、 e.l.l.o。
但是再一次真正从函数式编程的基础开始。
tuwxkamq3#
因为scala提示符中的flatmap与flink程序中的flatmap不同。
scala提示符中的flatmap只是scala中的一个函数。
flink flatmap可以在如下输入时应用:
参见:scala shell