scala和ApacheFlink的新成员,为什么我的map函数在repl中正确运行,但在flink中失败

jhiyze9q  于 2021-06-25  发布在  Flink
关注(0)|答案(3)|浏览(457)

我正在尝试(没有成功)在apacheflink中运行一个简单的helloworld类型的程序。代码接收来自apachekafka的消息,在每个字母后添加“.”,并将新字符串打印到stdout。代码正确地从kafka获取消息,但是添加“.”的map函数失败。我在repl提示符下尝试了这个函数,scala代码在那里正常工作。scala代码:

scala> input = "hello"
input: String = hello
scala> val output = input.flatMap(value => value + ".")
output: String = h.e.l.l.o

flink程序:flink代码截断行读取

val messageStream = env.addSource(new FlinkKafkaConsumer09("CL", new SimpleStringSchema, properties))

我不知道哪里出了问题,我尝试了apache文档,但没有结果。你能给我的任何帮助都会受到欢迎的。

i2byvkas

i2byvkas1#

谢谢你的帮助,我算是解决了问题。尽管flatmap函数在scala提示符下工作,但它在flink中不能正常工作,因为flink要求向flatmap传递一个带有重写的新flatmap函数。仍然不确定为什么它会在flink中的scala提示符下工作,但是代码现在编译并按预期运行。

r7xajy2e

r7xajy2e2#

首先,我建议学习一些函数式编程和操作的基础知识,比如map/flatmap/reduce等。
所有这些函数都适用于集合。在scala示例中,@pedrofuria指出 flatMap 函数到 String 它是 char s
在flink的例子中 messageStream 可以抽象为字符串的集合,因此要执行您描述的操作,您应该执行以下操作:

val stream = messageStream.map(str => str.mkString("."))

我用了 mkString 而不是 flatMap 因为前者比
h、 e.l.l.o(如你所写)
它产生
h、 e.l.l.o。
但是再一次真正从函数式编程的基础开始。

tuwxkamq

tuwxkamq3#

因为scala提示符中的flatmap与flink程序中的flatmap不同。
scala提示符中的flatmap只是scala中的一个函数。
flink flatmap可以在如下输入时应用:

val input = benv.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,")
val counts = input
            .flatMap { _.toLowerCase.split("\\W+") }
            .map { (_, 1) }.groupBy(0).sum(1)

参见:scala shell

相关问题