通过flink、scala、addsource和readcsvfile读取csv文件

rjzwgtxy  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(472)

我想通过flink、scala语言和addsource和readcsvfile函数来读取csv文件。我没有找到任何简单的例子。我只发现:https://github.com/dataartisans/flink-training-exercises/blob/master/src/main/scala/com/dataartisans/flinktraining/exercises/datastream_scala/cep/longrides.scala 这对我来说太复杂了。
在定义中:streamexecutionenvironment.addsource(sourcefunction)我应该只使用readcsvfile作为sourcefunction吗?
读完之后,我想使用cep(复杂事件处理)。

waxmsbnn

waxmsbnn1#

readcsvfile()仅作为flink的dataset(批处理)api的一部分提供,不能与datastream(流式处理)api一起使用。下面是readcsvfile()的一个很好的例子,尽管它可能与您要做的事情无关。
readtextfile()和readfile()是streamexecutionenvironment上的方法,它们不实现sourcefunction接口——它们不打算与addsource()一起使用,而是代替它。下面是使用datastreamapi使用readtextfile()加载csv的示例。
另一个选择是使用表api和csvtablesource。这里有一个例子和一些关于它做什么和不做什么的讨论。如果您走这条路线,那么在使用cep之前,您需要使用streamtableenvironment.toappendstream()将表流转换为数据流。
请记住,所有这些方法都只需读取一次文件,并从其内容创建一个有界流。如果您想要一个读取无限csv流并等待追加新行的源,则需要另一种方法。您可以使用自定义源、sockettextstream或类似kafka的东西。

ymdaylpp

ymdaylpp2#

import org.apache.flink.types.Row
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.api.common.typeinfo.Types

object CepTest2 {

  def main(args: Array[String]) {

    println("Start ...")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //val tableEnv = StreamTableEnvironment.getTableEnvironment(env)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val csvtable = CsvTableSource
      .builder
      .path("/home/esa/Log_EX1_gen_track_5.csv")
      .ignoreFirstLine
      .fieldDelimiter(",")
      .field("time", Types.INT)
      .field("id", Types.STRING)
      .field("sources", Types.STRING)
      .field("targets", Types.STRING)
      .field("attr", Types.STRING)
      .field("data", Types.STRING)
      .build

    tableEnv.registerTableSource("test", csvtable)

    val tableTest = tableEnv.scan("test").where("id='5'").select("id,sources,targets")

    val stream = tableEnv.toAppendStream[Row](tableTest)

    stream.print
    env.execute()
  }
}

Error:(56, 46) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
    val stream = tableEnv.toAppendStream[Row](tableTest)

Error:(56, 46) not enough arguments for method toAppendStream: (implicit evidence$3: org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row])org.apache.flink.streaming.api.scala.DataStream[org.apache.flink.types.Row].
Unspecified value parameter evidence$3.
    val stream = tableEnv.toAppendStream[Row](tableTest)

我已经尝试解决上述错误很久了,但还没有成功。你能告诉我怎么解决这个问题吗?

相关问题