我想用flink替换我的基于akka流的流处理器的一部分。当前是否可以使用akka流作为flink的源,然后使用flink作为同一代码库中akka流的源?
akka溪流的水流如下:
// Kafka Source -> Read Flow involving Azure cosmosDB -> Evaluate Flow -> Write Flow -> Logger Flow -> Sink
lazy private val converterGraph: Graph[ClosedShape.type, Future[Done]] =
GraphDSL.create(sink) { implicit builder => out =>
source.watchTermination()(Keep.none) ~> prepareFlow ~> readFlow ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
ClosedShape
}
上述流程定义如下:
def prepareFlow: Flow[FromSource, ToRead, NotUsed]
def readFlow: Flow[ToRead, ToEvaluate, NotUsed]
而不是现在 readFlow
作为一个akka流,我想用一个flink流处理器来代替它。所以 prepareFlow
将是flink的一个输入 readFlow
,其输出将被输入到 evaluateFlow
.
基本上,有没有可能这样做:
prepareFlow ~> [Flink source ->read -> result] ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
我看到apachebahir中有一个flink-akka连接器(sink),但不确定它是否可以仅用于akka参与者或流。
1条答案
按热度按时间hm2xizp91#
你可以把衣服包起来
prepareFlow
作为一个定制的flink从cosmosdb阅读Source
(通过延长SourceFunction
),并将整个evaluate write logger流 Package 为自定义流SinkFunction
.由于flink本身是分布式的,所以您将把akka流集成到flink作业中,但反之亦然。我看到这种方法的主要问题是,阿克卡流有反压力的盒子,但Flink本身主要是阻止。例如,sourcefunction.run()方法需要有一个内部无限循环在每次迭代中生成消息,所以您必须在那里阻塞,等待akka stream在那里生成下一条消息。