我正在建立一条从Kafka到hdfs的flink管道。我想在addsink()步骤之后处理元素。这是因为我想设置一个触发器文件来指示某个分区/小时的数据写入(到接收器)已经完成。如何做到这一点?目前我正在使用Flume。
datastream messagestream=env.addsource(flinkkafkaconsumer011);
//一些聚合将消息流转换为keyedstream
keyedstream.addsink(sink);
//3.之后如何处理元素。?
我正在建立一条从Kafka到hdfs的flink管道。我想在addsink()步骤之后处理元素。这是因为我想设置一个触发器文件来指示某个分区/小时的数据写入(到接收器)已经完成。如何做到这一点?目前我正在使用Flume。
datastream messagestream=env.addsource(flinkkafkaconsumer011);
//一些聚合将消息流转换为keyedstream
keyedstream.addsink(sink);
//3.之后如何处理元素。?
1条答案
按热度按时间ttp71kqs1#
flink API不支持将作业图扩展到接收器之外(但是,您可以分叉流,并在向接收器写入数据的同时执行其他处理。)
使用流文件接收器,您可以观察零件文件在完成时转换到完成状态。有关更多信息,请参阅javadoc。
状态存在于单个操作符中——只有该操作符(例如,processfunction)可以修改它。如果要在接收器完成后修改键控值状态,则没有直接的方法。一个想法是在processfunction中添加一个处理时间计时器,该计时器具有键控状态,该状态定期唤醒并检查新完成的零件文件,并根据它们的存在修改状态。或者,如果粒度错误,则编写一个执行类似操作的自定义源,并将信息流式传输或广播到processfunction中(该函数必须是协处理器函数或keyedbroadcastprocessfunction),以用于执行必要的状态更新。