apache flink scala流媒体

moiiocjp  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(259)

不能使任何例子编译更不用说工作。我做错什么了?从快速启动模板开始。

package dav.network

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.api.datastream._
import java.util.Properties
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.connectors.cassandra._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.api.common.functions._

object WordCount {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.createRemoteEnvironment(
      "localhost",
      8040,
      "./target/scala-2.11/flink-test-assembly-0.1-SNAPSHOT.jar");

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val kafkaConsumer =
      new FlinkKafkaConsumer011[String]("topic",
                                        new SimpleStringSchema(),
                                        properties);
    val counts = env.addSource(kafkaConsumer)
    .flatMap { _.toLowerCase.split("\\W+") }
    .map { (_, 1) }
    .groupBy(0)
    .sum(1);
    env.execute
  }
}

无法编译:

[info] Loading settings from assembly.sbt ...
[info] Loading project definition from /.../flink-test/project
[info] Loading settings from build.sbt,idea.sbt ...
[info] Set current project to flink-test (in build file:/.../flink-test/)
[success] Total time: 0 s, completed Oct 21, 2018 7:46:13 PM
[info] Updating ...
[info] Done updating.
[info] Compiling 3 Scala sources to /.../flink-test/target/scala-2.11/classes ...
[error] /.../flink-test/src/main/scala/dav/network/WordCount.scala:30:16: missing parameter type for expanded function ((x$1) => x$1.toLowerCase.split("\\W+"))
[error]     .flatMap { _.toLowerCase.split("\\W+") }
[error]                ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 4 s, completed Oct 21, 2018 7:46:17 PM

唯一可以编译的是: val counts: DataStream[(String)] = env.addSource(kafkaConsumer); val counts1=counts.map((_)); .map[String](new MapFunction[String, (String)] { override def map(x: String): (String) = (x) });

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题