我想通过flink、scala语言和addsource和readcsvfile函数来读取csv文件。我没有找到任何简单的例子。我只发现:https://github.com/dataartisans/flink-training-exercises/blob/master/src/main/scala/com/dataartisans/flinktraining/exercises/datastream_scala/cep/longrides.scala 这对我来说太复杂了。
在定义中:streamexecutionenvironment.addsource(sourcefunction)我应该只使用readcsvfile作为sourcefunction吗?
读完之后,我想使用cep(复杂事件处理)。
2条答案
按热度按时间waxmsbnn1#
readcsvfile()仅作为flink的dataset(批处理)api的一部分提供,不能与datastream(流式处理)api一起使用。下面是readcsvfile()的一个很好的例子,尽管它可能与您要做的事情无关。
readtextfile()和readfile()是streamexecutionenvironment上的方法,它们不实现sourcefunction接口——它们不打算与addsource()一起使用,而是代替它。下面是使用datastreamapi使用readtextfile()加载csv的示例。
另一个选择是使用表api和csvtablesource。这里有一个例子和一些关于它做什么和不做什么的讨论。如果您走这条路线,那么在使用cep之前,您需要使用streamtableenvironment.toappendstream()将表流转换为数据流。
请记住,所有这些方法都只需读取一次文件,并从其内容创建一个有界流。如果您想要一个读取无限csv流并等待追加新行的源,则需要另一种方法。您可以使用自定义源、sockettextstream或类似kafka的东西。
ymdaylpp2#
我已经尝试解决上述错误很久了,但还没有成功。你能告诉我怎么解决这个问题吗?