如何连接流和数据集?我有一个流,文件中有一个静态数据。我想用文件中的数据来丰富流的数据。示例:在流中我得到机场代码,在文件中我有机场的名称和文件中的代码。现在我想将流数据加入到文件中,形成一个具有机场名称的新流。请提供如何实现这一目标的步骤。
myss37ts1#
根据具体要求,有许多方法可以使用flink进行河流富集。https://www.youtube.com/watch?v=cjs18ikluiy 这是康斯坦丁·可耐福的一篇很好的演讲,涵盖了许多不同的方法,以及它们之间的权衡。在一个简单的例子中,富集数据是不可变的,并且相当小,我只使用 RichFlatMap 并将整个文件加载到 open() 方法。看起来像这样:
RichFlatMap
open()
public class EnrichmentWithPreloading extends RichFlatMapFunction<Event, EnrichedEvent> { private Map<Long, SensorReferenceData> referenceData; @Override public void open(final Configuration parameters) throws Exception { super.open(parameters); referenceData = loadReferenceData(); } @Override public void flatMap( final Event event, final Collector<EnrichedEvent> collector) throws Exception { SensorReferenceData sensorReferenceData = referenceData.get(sensorMeasurement.getSensorId()); collector.collect(new EnrichedEvent(event, sensorReferenceData)); } }
您将在中找到更多其他方法的代码示例https://github.com/knaufk/enrichments-with-flink.更新:如果您更愿意预加载一些更大的分区引用数据以与流连接,那么有几种方法可以实现这一点,我在上面共享的视频和repo中介绍了其中一些方法。对于那些特定的需求,我建议使用自定义分区器;在同一个github repo中有一个例子。其思想是对丰富数据进行切分,并使用相关的参考数据将每个流事件导向示例。在我看来,这比试图让表api作为一个连接来进行这种特殊的扩展要简单。
1条答案
按热度按时间myss37ts1#
根据具体要求,有许多方法可以使用flink进行河流富集。https://www.youtube.com/watch?v=cjs18ikluiy 这是康斯坦丁·可耐福的一篇很好的演讲,涵盖了许多不同的方法,以及它们之间的权衡。
在一个简单的例子中,富集数据是不可变的,并且相当小,我只使用
RichFlatMap
并将整个文件加载到open()
方法。看起来像这样:您将在中找到更多其他方法的代码示例https://github.com/knaufk/enrichments-with-flink.
更新:
如果您更愿意预加载一些更大的分区引用数据以与流连接,那么有几种方法可以实现这一点,我在上面共享的视频和repo中介绍了其中一些方法。对于那些特定的需求,我建议使用自定义分区器;在同一个github repo中有一个例子。其思想是对丰富数据进行切分,并使用相关的参考数据将每个流事件导向示例。
在我看来,这比试图让表api作为一个连接来进行这种特殊的扩展要简单。