flink cep未知错误由intellij ide发出警报

nqwrtyyt  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(505)

我开始用scala语言学习apache flink的cep库,并试图通过执行 CEP.pattern(input,pattern) 如教程中所示https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html,ide说它“无法解析重载方法”,指的是 pattern 方法。根据实施 readTextFile 以及 Pattern[String].begin('line').where(_.length == 10) ,我分别用来创建输入和模式,方法的参数或泛型类型应该没有任何问题。
这是我写的代码。我知道它不完整,但自从这个问题出现后,我无论如何都无法完成它。

package FlinkCEPClasses

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

class FlinkCEPPipeline {

  var props : Properties = new Properties()

  var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)

  var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines")

  var patt : Pattern[String,String] = Pattern.begin[String]("igual").where(_.length == 10)

  // Problem appears at the following line. A red subscript appears at the pattern method, 
  // saying the following: "Cannot resolve overloaded method"

  var CEPstream = CEP.pattern(input,patt)

  input.writeAsText("/home/luca/Desktop/flinkcepout",FileSystem.WriteMode.OVERWRITE)

  env.execute()

我的build.sbt文件内容如下:

name := "FlinkCEP"

version := "0.1"

scalaVersion := "2.12.10"

// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.0"

libraryDependencies += "log4j" % "log4j" % "1.2.17"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.2" % Test```

我编写这段代码的目的只是看到它运行一个简单的“where”条件,除此之外,它不应该有任何更大的实用程序。我使用intellij作为ide。另外,我不确定scala的cep库是否可以使用。如果有人能解释一下我会很感激的。

9jyewag0

9jyewag01#

在看了@davidanderson的github示例之后,我终于解决了这个问题。因为我做了很多改变,我不能确定我的解决方案是否适合你,但我改变了 import org.apache.flink.streaming.api.datastream.DataStreamimport org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, _} . 注意模棱两可的导入,确保导入的是真正需要的类。
我将列出我的所有导入和build.sbt文件,以便您可以完全访问我的配置。

进口

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.cep.scala.PatternStream
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, _}

构建.sbt

name := "FlinkCEP"

version := "0.1"

scalaVersion := "2.12.10"

// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
//libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-cep
libraryDependencies += "org.apache.flink" %% "flink-cep" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-runtime
libraryDependencies += "org.apache.flink" %% "flink-runtime" % "1.9.0" % Test

// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.0"

libraryDependencies += "log4j" % "log4j" % "1.2.17"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.2" % Test
lx0bsm1f

lx0bsm1f2#

试试这个:

import org.apache.flink.cep.scala.PatternStream

...

val CEPstream: PatternStream[String] = CEP.pattern[String](input, patt)

有关将cep与scala结合使用的简单示例,请参见github。

相关问题